XRootD
Loading...
Searching...
No Matches
XrdFfsWcache.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* XrdFfsWcache.cc simple write cache that captures consecutive small writes */
3/* */
4/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
5/* All Rights Reserved */
6/* Author: Wei Yang (SLAC National Accelerator Laboratory, 2009) */
7/* Contract DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30/*
31 When direct_io is not used, the kernel will break large writes into 4Kbyte
32 writes. This significantly reduces the writing performance. This
33 simple cache mechanism is to improve the performance on small writes.
34
35 Note that fuse 2.8.0 pre2 or above and kernel 2.6.27 or above provide
36 a big_writes option to allow > 4KByte writing. It will make this
37 simple write caching obsolete.
38*/
39
40#if defined(__linux__)
41/* For pread()/pwrite() */
42#ifndef _XOPEN_SOURCE
43#define _XOPEN_SOURCE 500
44#endif
45#endif
46
47#include <cstring>
48#include <cstdlib>
49#include <sys/types.h>
50#include <sys/resource.h>
51#include <unistd.h>
52#include <cerrno>
53
54#include <pthread.h>
55
57#ifndef NOXRD
58 #include "XrdFfs/XrdFfsPosix.hh"
59#endif
60
61#ifndef O_DIRECT
62#define O_DIRECT 0
63#endif
64
65#ifdef __cplusplus
66 extern "C" {
67#endif
68
/* Per-fd write-cache buffer size, 128 KiB. Writes of up to half this size
 * are coalesced in XrdFfsWcache_pwrite(); larger writes bypass the cache. */
70ssize_t XrdFfsWcacheBufsize = 131072;
71
/* Per-fd cache slot.
 * NOTE(review): the opening "struct XrdFfsWcacheFilebuf {" line (embedded
 * line 72) is missing from this Doxygen extraction; the trailing index
 * confirms the tag name. Verify against the repository. */
73 off_t offset; /* file offset of the first byte currently held in buf */
74 size_t len; /* number of valid bytes in buf (0 = slot empty) */
75 char *buf; /* cache buffer (Wcache or Rcache sized; see _create) */
76 size_t bufsize; /* allocated size of buf; used as the read-cache block size */
77 pthread_mutex_t *mlock; /* serializes flush/pread/pwrite on this slot */
78};
79
81
82/* #include "xrdposix.h" */
83
/* Initialize the write-cache table.
 *
 * basefd: lowest virtual file descriptor handed out by the XrdFfs Posix
 *         layer; stored in XrdFfsPosix_baseFD and subtracted from every
 *         fd before indexing XrdFfsWcacheFbufs[].
 * maxfd:  number of descriptors, i.e. the size of the cache table.
 *
 * Also chooses the read-cache buffer size (XrdFfsRcacheBufsize):
 *   - default: 128 KiB;
 *   - if XRDCL_EC is set, uses first_token * third_token of its
 *     comma-separated value (presumably ndata * chunksize — TODO confirm);
 *   - XROOTDFS_WCACHESZ, when set, overrides the above.
 *
 * NOTE(review): this listing is a Doxygen extraction; the embedded line
 * numbers jump 101 -> 103 and 104 -> 106, so the statement allocating the
 * XrdFfsWcacheFbufs table and the per-fd offset reset are missing here —
 * verify against the repository.
 * NOTE(review): strtok_r below modifies the string returned by getenv(),
 * i.e. the process environment — consider copying it first.
 */
85void XrdFfsWcache_init(int basefd, int maxfd)
86{
87 int fd;
88/* We are now using virtual file descriptors (from Xrootd Posix interface) in XrdFfsXrootdfs.cc so we need to set
89 * base (lowest) file descriptor, and max number of file descriptors..
90 *
91 struct rlimit rlp;
92
93 getrlimit(RLIMIT_NOFILE, &rlp);
94 XrdFfsWcacheNFILES = rlp.rlim_cur;
95 XrdFfsWcacheNFILES = (XrdFfsWcacheNFILES == (int)RLIM_INFINITY? 4096 : XrdFfsWcacheNFILES);
96 */
97
98 XrdFfsPosix_baseFD = basefd;
99 XrdFfsWcacheNFILES = maxfd;
100
101/* printf("%d %d\n", XrdFfsWcacheNFILES, sizeof(struct XrdFfsWcacheFilebuf)); */
/* Mark every slot empty; buffers and mutexes are created lazily in _create(). */
103 for (fd = 0; fd < XrdFfsWcacheNFILES; fd++)
104 {
106 XrdFfsWcacheFbufs[fd].len = 0;
107 XrdFfsWcacheFbufs[fd].buf = NULL;
108 XrdFfsWcacheFbufs[fd].mlock = NULL;
109 }
110 if (!getenv("XRDCL_EC"))
111 {
112 XrdFfsRcacheBufsize = 1024 * 128;
113 }
114 else
115 {
116 char *savptr;
117 int nbdat = atoi(strtok_r(getenv("XRDCL_EC"), ",", &savptr));
118 strtok_r(NULL, ",", &savptr);
119 int chsz = atoi(strtok_r(NULL, ",", &savptr));
120 XrdFfsRcacheBufsize = nbdat * chsz;
121 }
/* NOTE(review): despite the W in its name, XROOTDFS_WCACHESZ overrides the
 * READ cache size here — confirm whether that is intentional. */
122 if (getenv("XROOTDFS_WCACHESZ"))
123 XrdFfsRcacheBufsize = atoi(getenv("XROOTDFS_WCACHESZ"));
124}
125
126int XrdFfsWcache_create(int fd, int flags)
127/* Create a write cache buffer for a given file descriptor
128 *
129 * fd: file descriptor
130 *
131 * returns: 1 - ok
132 * 0 - error, error code in errno
133 */
/* NOTE(review): Doxygen extraction gaps — embedded line numbers jump
 * 134 -> 136, 137 -> 139, 145 -> 147 and 150 -> 152, so an fd range check,
 * the offset reset and the per-slot bufsize assignments are presumably
 * missing from this listing; verify against the repository. */
134{
/* Map the virtual (Posix-layer) descriptor to a table index. */
136 fd -= XrdFfsPosix_baseFD;
137
139 XrdFfsWcacheFbufs[fd].len = 0;
140 // "flag & O_RDONLY" is not equivalent to ! (flags & O_RDWR) && ! (flags & O_WRONLY)
/* Read-only + O_DIRECT: the buffer serves as a read cache and gets the
 * read-cache size; every other mode gets a write-cache sized buffer. */
141 if ( ! (flags & O_RDWR) &&
142 ! (flags & O_WRONLY) &&
143 (flags & O_DIRECT) ) // Limit the usage scenario of the read cache
144 {
145 XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsRcacheBufsize);
147 }
148 else
149 {
150 XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsWcacheBufsize);
152 }
153 if (XrdFfsWcacheFbufs[fd].buf == NULL)
154 {
155 errno = ENOMEM;
156 return 0;
157 }
/* Each slot gets its own mutex to serialize flush/pread/pwrite on this fd. */
158 XrdFfsWcacheFbufs[fd].mlock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
159 if (XrdFfsWcacheFbufs[fd].mlock == NULL)
160 {
161 errno = ENOMEM;
/* NOTE(review): buf allocated above is not freed on this error path (leak),
 * nor on the pthread_mutex_init failure path below. */
162 return 0;
163 }
164 errno = pthread_mutex_init(XrdFfsWcacheFbufs[fd].mlock, NULL);
165 if (errno)
166 return 0;
167 return 1;
168}
169
/* XrdFfsWcache_destroy: release the cache buffer and mutex for an fd and
 * mark the slot empty.
 * NOTE(review): the signature line (embedded line 170) is missing from this
 * extraction; the trailing index shows it as
 *   void XrdFfsWcache_destroy(int fd)
 * Any unflushed data is discarded — the flush call below is commented out,
 * so the caller is expected to flush before destroying. */
171{
172/* XrdFfsWcache_flush(fd); */
173 fd -= XrdFfsPosix_baseFD;
174
176 XrdFfsWcacheFbufs[fd].len = 0;
177 if (XrdFfsWcacheFbufs[fd].buf != NULL)
178 free(XrdFfsWcacheFbufs[fd].buf);
179 XrdFfsWcacheFbufs[fd].buf = NULL;
180 if (XrdFfsWcacheFbufs[fd].mlock != NULL)
181 {
182 pthread_mutex_destroy(XrdFfsWcacheFbufs[fd].mlock);
183 free(XrdFfsWcacheFbufs[fd].mlock);
184 }
185 XrdFfsWcacheFbufs[fd].mlock = NULL;
186}
187
/* Flush buffered data for fd to the backing store.
 * Returns 0 when nothing is buffered, otherwise the result of the write
 * (positive byte count on success; negative on failure).
 * NOTE(review): embedded line numbers jump 195 -> 198 and 199 -> 201, so the
 * XrdFfsPosix_pwrite() call that assigns rc — and presumably the offset
 * reset inside the if-branch — are missing from this extraction; as listed,
 * rc would be read uninitialized. Verify against the repository. */
188ssize_t XrdFfsWcache_flush(int fd)
189{
190 ssize_t rc;
191 fd -= XrdFfsPosix_baseFD;
192
193 if (XrdFfsWcacheFbufs[fd].len == 0 || XrdFfsWcacheFbufs[fd].buf == NULL )
194 return 0;
195
198 if (rc > 0)
199 {
/* Successful write: mark the buffer empty again. */
201 XrdFfsWcacheFbufs[fd].len = 0;
202 }
203 return rc;
204}
205
206/*
207struct fd_n_offset {
208 int fd;
209 off_t offset;
210 fd_n_offset(int myfd, off_t myoffset) : fd(myfd), offset(myoffset) {}
211};
212
213void *XrdFfsWcache_updateReadCache(void *x)
214{
215 struct fd_n_offset *a = (struct fd_n_offset*) x;
216 size_t bufsize = XrdFfsWcacheFbufs[a->fd].bufsize;
217
218 pthread_mutex_lock(XrdFfsWcacheFbufs[a->fd].mlock);
219 XrdFfsWcacheFbufs[a->fd].offset = (a->offset / bufsize) * bufsize;
220 XrdFfsWcacheFbufs[a->fd].len = XrdFfsPosix_pread(a->fd + XrdFfsPosix_baseFD,
221 XrdFfsWcacheFbufs[a->fd].buf,
222 bufsize,
223 XrdFfsWcacheFbufs[a->fd].offset);
224 pthread_mutex_unlock(XrdFfsWcacheFbufs[a->fd].mlock);
225 return NULL;
226}
227*/
228
229// this is a read cache
/* Read len bytes at offset through the per-fd single-block read cache.
 * Returns the number of bytes delivered, or -1 with errno set (EBADF for an
 * fd below the Posix base). A request is served from the cached block when
 * it falls inside [offset, offset+len) of the cache; otherwise data comes
 * from the uncached path. Reads never cross the cached block's upper
 * boundary in one call.
 * NOTE(review): this Doxygen extraction dropped embedded lines 247,
 * 249-251, 253, 259, 265-267 and 273 — the second half of both if
 * conditions, the XrdFfsPosix_pread() refill call, the bufptr/rc
 * computation, and the uncached-read fallback are all missing here.
 * Verify against the repository before changing anything. */
230ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
231{
232 ssize_t rc;
233 fd -= XrdFfsPosix_baseFD;
234 if (fd < 0)
235 {
236 errno = EBADF;
237 return -1;
238 }
239
240 char *bufptr;
241 size_t bufsize = XrdFfsWcacheFbufs[fd].bufsize;
242
243 pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
244
245 // identify which block to cache
246 if (XrdFfsWcacheFbufs[fd].len == 0 ||
248 {
252 bufsize,
254 } // when XrdFfsWcacheFbufs[fd].len < bufsize, the block is partially cached.
255
256
257 // fetch data from the cache, up to the block's upper boundary.
258 if (XrdFfsWcacheFbufs[fd].offset <= offset &&
260 { // read from cache,
261//----------------------------------------------------------
262// FUSE doesn't like this block of the code, unless direct_io is enabled, or
263// O_DIRECT flags is used. Otherwise, FUSES will stop reading prematurely
264// when two processes read the same file at the same time.
268 memcpy(buf, bufptr, rc);
269//----------------------------------------------------------
270 }
271 else
272 { // offset falls into the uncached part of the partially cached block
274 }
275 pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
276/*
277 // prefetch the next block
278 if ( (offset + rc) ==
279 (XrdFfsWcacheFbufs[fd].offset + bufsize) )
280 {
281 pthread_t thread;
282 pthread_attr_t attr;
283 //size_t stacksize = 4*1024*1024;
284
285 pthread_attr_init(&attr);
286 //pthread_attr_setstacksize(&attr, stacksize);
287 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
288
289 struct fd_n_offset nextblock(fd, (offset + bufsize));
290 if (! pthread_create(&thread, &attr, XrdFfsWcache_updateReadCache, &nextblock))
291 pthread_detach(thread);
292 pthread_attr_destroy(&attr);
293 }
294*/
295 return rc;
296}
297
/* Buffered pwrite: coalesces small sequential writes into the per-fd cache
 * buffer; the buffer is flushed first when a write is non-contiguous with
 * the buffered data or would overflow the buffer.
 * Returns len on success, -1 with errno set on error (EBADF for an fd below
 * the Posix base, ENOSPC when the pre-write flush failed).
 * NOTE(review): Doxygen extraction gaps at embedded lines 312, 324-325 and
 * 338-339 — the uncached pass-through XrdFfsPosix_pwrite() that assigns rc,
 * the second flush condition plus the XrdFfsWcache_flush() call, and the
 * offset/len bookkeeping after the memcpy are missing from this listing.
 * Verify against the repository. */
298ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
299{
300 ssize_t rc;
301 char *bufptr;
302 fd -= XrdFfsPosix_baseFD;
303 if (fd < 0)
304 {
305 errno = EBADF;
306 return -1;
307 }
308
309/* do not use caching under these cases */
310 if (len > (size_t)(XrdFfsWcacheBufsize/2) || fd >= XrdFfsWcacheNFILES)
311 {
313 return rc;
314 }
315
316 pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
317 rc = XrdFfsWcacheFbufs[fd].len;
318/*
319 in the following two cases, a XrdFfsWcache_flush is required:
320 1. current offset isn't pointing to the tail of data in buffer
321 2. adding new data will exceed the current buffer
322*/
323 if (offset != (off_t)(XrdFfsWcacheFbufs[fd].offset + XrdFfsWcacheFbufs[fd].len) ||
326
327 errno = 0;
328 if (rc < 0)
329 {
330 errno = ENOSPC;
331 pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
332 return -1;
333 }
334
/* Append the new data at the tail of the buffered region. */
335 bufptr = &XrdFfsWcacheFbufs[fd].buf[XrdFfsWcacheFbufs[fd].len];
336 memcpy(bufptr, buf, len);
337 if (XrdFfsWcacheFbufs[fd].len == 0)
340
341 pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
342 return (ssize_t)len;
343}
344
345#ifdef __cplusplus
346 }
347#endif
ssize_t XrdFfsPosix_pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
ssize_t XrdFfsPosix_pread(int fildes, void *buf, size_t nbyte, off_t offset)
void XrdFfsWcache_init(int basefd, int maxfd)
void XrdFfsWcache_destroy(int fd)
int XrdFfsWcacheNFILES
ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
ssize_t XrdFfsWcacheBufsize
#define O_DIRECT
pthread_mutex_t * mlock
ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
ssize_t XrdFfsRcacheBufsize
int XrdFfsPosix_baseFD
ssize_t XrdFfsWcache_flush(int fd)
struct XrdFfsWcacheFilebuf * XrdFfsWcacheFbufs
int XrdFfsWcache_create(int fd, int flags)