XRootD
Loading...
Searching...
No Matches
XrdOssAio.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O s s A i o . c c */
4/* */
5/* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <signal.h>
32#include <cstdio>
33#include <unistd.h>
34#include <fcntl.h>
35#ifdef _POSIX_ASYNCHRONOUS_IO
36#ifdef __FreeBSD__
37#include <fcntl.h>
38#endif
39#ifdef __APPLE__
40#include <sys/aio.h>
41#else
42#include <aio.h>
43#endif
44#endif
45
46#include "XrdOss/XrdOssApi.hh"
47#include "XrdOss/XrdOssTrace.hh"
48#include "XrdSys/XrdSysError.hh"
51#include "XrdSfs/XrdSfsAio.hh"
52
53// All AIO interfaces are defined here.
54
55
56// Currently we disable aio support for MacOS because it is way too
57// buggy and incomplete. The two major problems are:
58// 1) No implementation of sigwaitinfo(). Though we can simulate it...
59// 2) Event notification returns an incomplete siginfo structure.
60//
61#ifdef __APPLE__
62#undef _POSIX_ASYNCHRONOUS_IO
63#endif
64
65#ifdef __GNU__
66// Compiler warning:
67// warning: sigwaitinfo is not implemented and will always fail
68#undef HAVE_SIGWTI
69#endif
70
71/******************************************************************************/
72/* G l o b a l s */
73/******************************************************************************/
74
76//define tident aiop->TIdent
77
79
80int XrdOssFile::AioFailure = 0;
81
// Signals used to report AIO completion, one per direction so that reads and
// writes can each be serviced by their own waiting thread. The two highest
// real-time signals are used when SIGRTMAX is defined; otherwise we fall back
// to SIGUSR1 and SIGUSR2. Both variants are real objects (never macros)
// because AioInit() hands their addresses to the threads it starts.
//
#ifdef _POSIX_ASYNCHRONOUS_IO
#ifdef SIGRTMAX
const int OSS_AIO_READ_DONE  = SIGRTMAX-1;
const int OSS_AIO_WRITE_DONE = SIGRTMAX;
#else
const int OSS_AIO_READ_DONE  = SIGUSR1;
const int OSS_AIO_WRITE_DONE = SIGUSR2;
#endif
#endif
91
92/******************************************************************************/
93/* F s y n c */
94/******************************************************************************/
95
96/*
97 Function: Async fsync() a file
98
99 Input: aiop - An aio request object
100*/
101
103{
104
105#ifdef _POSIX_ASYNCHRONOUS_IO
106 int rc;
107
108// Complete the aio request block and do the operation
109//
111 {aiop->sfsAio.aio_fildes = fd;
112 aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_WRITE_DONE;
113 aiop->TIdent = tident;
114
115 // Start the operation
116 //
117 if (!(rc = aio_fsync(O_SYNC, &aiop->sfsAio))) return 0;
118 if (errno != EAGAIN && errno != ENOSYS) return -errno;
119
120 // Aio failed keep track of the problem (msg every 1024 events). Note
121 // that the handling of the counter is sloppy because we do not lock it.
122 //
123 {int fcnt = AioFailure++;
124 if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "fsync async");
125 }
126 }
127#endif
128
129// Execute this request in a synchronous fashion
130//
131 if ((aiop->Result = Fsync())) aiop->Result = -errno;
132
133// Simply call the write completion routine and return as if all went well
134//
135 aiop->doneWrite();
136 return 0;
137}
138
139/******************************************************************************/
140/* R e a d */
141/******************************************************************************/
142
143/*
144 Function: Async read `blen' bytes from the associated file, placing in 'buff'
145
146 Input: aiop - An aio request object
147
148 Output: <0 -> Operation failed, value is negative errno value.
149 =0 -> Operation queued
150 >0 -> Operation not queued, system resources unavailable or
151 asynchronous I/O is not supported.
152*/
153
155{
156
157#ifdef _POSIX_ASYNCHRONOUS_IO
158 EPNAME("AioRead");
159 int rc;
160
161// Complete the aio request block and do the operation
162//
164 {aiop->sfsAio.aio_fildes = fd;
165 aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_READ_DONE;
166 aiop->TIdent = tident;
167 TRACE(Debug, "fd=" <<fd <<" read " <<aiop->sfsAio.aio_nbytes <<'@'
168 <<aiop->sfsAio.aio_offset <<" started; aiocb="
169 <<Xrd::hex1 <<aiop);
170
171 // Start the operation
172 //
173 if (!(rc = aio_read(&aiop->sfsAio))) return 0;
174 if (errno != EAGAIN && errno != ENOSYS) return -errno;
175
176 // Aio failed keep track of the problem (msg every 1024 events). Note
177 // that the handling of the counter is sloppy because we do not lock it.
178 //
179 {int fcnt = AioFailure++;
180 if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "read async");
181 }
182 }
183#endif
184
185// Execute this request in a synchronous fashion
186//
187 aiop->Result = this->Read((void *)aiop->sfsAio.aio_buf,
188 (off_t)aiop->sfsAio.aio_offset,
189 (size_t)aiop->sfsAio.aio_nbytes);
190
191// Simply call the read completion routine and return as if all went well
192//
193 aiop->doneRead();
194 return 0;
195}
196
197/******************************************************************************/
198/* W r i t e */
199/******************************************************************************/
200
201/*
202 Function: Async write `blen' bytes from 'buff' into the associated file
203
204 Input: aiop - An aio request object.
205
206 Output: <0 -> Operation failed, value is negative errno value.
207 =0 -> Operation queued
208 >0 -> Operation not queued, system resources unavailable or
209 asynchronous I/O is not supported.
210*/
211
213{
214#ifdef _POSIX_ASYNCHRONOUS_IO
215 EPNAME("AioWrite");
216 int rc;
217
218// Complete the aio request block and do the operation
219//
221 {aiop->sfsAio.aio_fildes = fd;
222 aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_WRITE_DONE;
223 aiop->TIdent = tident;
224 TRACE(Debug, "fd=" <<fd <<" write " <<aiop->sfsAio.aio_nbytes <<'@'
225 <<aiop->sfsAio.aio_offset <<" started; aiocb="
226 <<Xrd::hex1 <<aiop);
227
228 // Start the operation
229 //
230 if (!(rc = aio_write(&aiop->sfsAio))) return 0;
231 if (errno != EAGAIN && errno != ENOSYS) return -errno;
232
233 // Aio failed keep track of the problem (msg every 1024 events). Note
234 // that the handling of the counter is sloppy because we do not lock it.
235 //
236 {int fcnt = AioFailure++;
237 if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("Write",errno,"write async");
238 }
239 }
240#endif
241
242// Execute this request in a synchronous fashion
243//
244 aiop->Result = this->Write((const void *)aiop->sfsAio.aio_buf,
245 (off_t)aiop->sfsAio.aio_offset,
246 (size_t)aiop->sfsAio.aio_nbytes);
247
248// Simply call the write completion routine and return as if all went well
249//
250 aiop->doneWrite();
251 return 0;
252}
253
254/******************************************************************************/
255/* X r d O s s S y s A I O M e t h o d s */
256/******************************************************************************/
257/******************************************************************************/
258/* G l o b a l s */
259/******************************************************************************/
260
261int XrdOssSys::AioAllOk = 0;
262
#if defined(_POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
// The following supports the sigwaitinfo() emulation. Each pointer designates
// the siginfo_t that the matching signal handler fills in: one for AIO read
// completions and one for AIO write completions.
//
siginfo_t *XrdOssAioInfoR;
siginfo_t *XrdOssAioInfoW;

extern "C"
{
void XrdOssAioRSH(int, siginfo_t *, void *);
void XrdOssAioWSH(int, siginfo_t *, void *);
}
#endif
271
272/******************************************************************************/
273/* A i o I n i t */
274/******************************************************************************/
275/*
276 Function: Initialize for AIO processing.
277
278 Return: True if successful, false otherwise.
279*/
280
282{
283#if defined(_POSIX_ASYNCHRONOUS_IO)
284 EPNAME("AioInit");
285 extern void *XrdOssAioWait(void *carg);
286 pthread_t tid;
287 int retc;
288
289#ifndef HAVE_SIGWTI
290// For those platforms that do not have sigwaitinfo(), we provide the
291// appropriate emulation using a signal handler. We actually provide for
292// two handlers since we separate reads from writes. To emulate synchronous
293// signals, we prohibit one signal handler from interrupting another one.
294//
295 struct sigaction sa;
296
297 sa.sa_sigaction = XrdOssAioRSH;
298 sa.sa_flags = SA_SIGINFO;
299 sigemptyset(&sa.sa_mask);
300 sigaddset(&sa.sa_mask, OSS_AIO_WRITE_DONE);
301 if (sigaction(OSS_AIO_READ_DONE, &sa, NULL) < 0)
302 {OssEroute.Emsg("AioInit", errno, "creating AIO read signal handler; "
303 "AIO support terminated.");
304 return 0;
305 }
306
307 sa.sa_sigaction = XrdOssAioWSH;
308 sa.sa_flags = SA_SIGINFO;
309 sigemptyset(&sa.sa_mask);
310 sigaddset(&sa.sa_mask, OSS_AIO_READ_DONE);
311 if (sigaction(OSS_AIO_WRITE_DONE, &sa, NULL) < 0)
312 {OssEroute.Emsg("AioInit", errno, "creating AIO write signal handler; "
313 "AIO support terminated.");
314 return 0;
315 }
316#endif
317
318// The AIO signal handler consists of two threads (one for read and one for
319// write) that synchronously wait for AIO events. We assume, blithely, that
320// the first two real-time signals have been blocked for all threads.
321//
322 if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait,
323 (void *)(&OSS_AIO_READ_DONE))) < 0)
324 OssEroute.Emsg("AioInit", retc, "creating AIO read signal thread; "
325 "AIO support terminated.");
326#ifdef __FreeBSD__
327 else {DEBUG("started AIO read signal thread.");
328#else
329 else {DEBUG("started AIO read signal thread; tid=" <<(unsigned int)tid);
330#endif
331 if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait,
332 (void *)(&OSS_AIO_WRITE_DONE))) < 0)
333 OssEroute.Emsg("AioInit", retc, "creating AIO write signal thread; "
334 "AIO support terminated.");
335#ifdef __FreeBSD__
336 else {DEBUG("started AIO write signal thread.");
337#else
338 else {DEBUG("started AIO write signal thread; tid=" <<(unsigned int)tid);
339#endif
340 AioAllOk = 1;
341 }
342 }
343
344// All done
345//
346 return AioAllOk;
347#else
348 return 1;
349#endif
350}
351
352/******************************************************************************/
353/* A i o W a i t */
354/******************************************************************************/
355
356void *XrdOssAioWait(void *mySigarg)
357{
358#ifdef _POSIX_ASYNCHRONOUS_IO
359 EPNAME("AioWait");
360 int mySignum = *((int *)mySigarg);
361 const char *sigType = (mySignum == OSS_AIO_READ_DONE ? "read" : "write");
362 const int isRead = (mySignum == OSS_AIO_READ_DONE);
363 sigset_t mySigset;
364 siginfo_t myInfo;
365 XrdSfsAio *aiop;
366 int rc, numsig;
367 ssize_t retval;
368#ifndef HAVE_SIGWTI
369 extern int sigwaitinfo(const sigset_t *set, siginfo_t *info);
370 extern siginfo_t *XrdOssAioInfoR;
371 extern siginfo_t *XrdOssAioInfoW;
372
373// We will catch one signal at a time. So, the address of siginfo_t can be
374// placed in a global area where the signal handler will find it. We have one
375// two places where this can go.
376//
377 if (isRead) XrdOssAioInfoR = &myInfo;
378 else XrdOssAioInfoW = &myInfo;
379
380// Initialize the signal we will be suspended for
381//
382 sigfillset(&mySigset);
383 sigdelset(&mySigset, mySignum);
384#else
385
386// Initialize the signal we will be waiting for
387//
388 sigemptyset(&mySigset);
389 sigaddset(&mySigset, mySignum);
390#endif
391
392// Simply wait for events and requeue the completed AIO operation
393//
394 do {do {numsig = sigwaitinfo((const sigset_t *)&mySigset, &myInfo);}
395 while (numsig < 0 && errno == EINTR);
396 if (numsig < 0)
397 {OssEroute.Emsg("AioWait",errno,sigType,"wait for AIO signal");
399 break;
400 }
401 if (numsig != mySignum || myInfo.si_code != SI_ASYNCIO)
402 {char buff[80];
403 sprintf(buff, "%d %d", myInfo.si_code, numsig);
404 OssEroute.Emsg("AioWait", "received unexpected signal", buff);
405 continue;
406 }
407
408#ifdef __APPLE__
409 aiop = (XrdSfsAio *)myInfo.si_value.sigval_ptr;
410#else
411 aiop = (XrdSfsAio *)myInfo.si_value.sival_ptr;
412#endif
413
414 while ((rc = aio_error(&aiop->sfsAio)) == EINPROGRESS) {}
415 retval = (ssize_t)aio_return(&aiop->sfsAio);
416
417 DEBUG(sigType <<" completed for " <<aiop->TIdent <<"; rc=" <<rc
418 <<" result=" <<retval <<" aiocb=" <<Xrd::hex1 <<aiop);
419
420 if (retval < 0) aiop->Result = -rc;
421 else aiop->Result = retval;
422
423 if (isRead) aiop->doneRead();
424 else aiop->doneWrite();
425 } while(1);
426#endif
427 return (void *)0;
428}
429
430#if defined( _POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
431/******************************************************************************/
432/* s i g w a i t i n f o */
433/******************************************************************************/
434
435// Some platforms do not have sigwaitinfo() (e.g., MacOS). We provide for
436// emulation here. It's not as good as the kernel version and the
437// implementation is very specific to the task at hand.
438//
// Emulated sigwaitinfo(): suspends the caller with the supplied mask installed
// and then reports the signal number found in the caller's siginfo structure.
// The structure itself is filled in by the signal handler that runs while we
// are suspended, not by this function.
//
int sigwaitinfo(const sigset_t *set, siginfo_t *info)
{
// Installing the mask unblocks the awaited signal so that its handler can
// run. The return value of sigsuspend() is deliberately not inspected.
//
   (void)sigsuspend(set);

// Report whatever signal number is now recorded in the caller's structure
//
   const int caught = info->si_signo;
   return caught;
}
447
448/******************************************************************************/
449/* X r d O s s A i o R S H */
450/******************************************************************************/
451
452// XrdOssAioRSH handles AIO read signals. This handler was setup at AIO
453// initialization time but only when this platform does not have sigwaitinfo().
454//
extern "C"
{
// Signal handler for AIO read completions. It copies the interesting parts of
// the delivered siginfo into the structure designated by XrdOssAioInfoR. The
// signum and ucontext arguments are not used.
//
void XrdOssAioRSH(int signum, siginfo_t *info, void *ucontext)
{
   extern siginfo_t *XrdOssAioInfoR;

// The global read info pointer is expected to designate the waiting thread's
// siginfo structure by the time a read completion signal is delivered.
//
   siginfo_t &target = *XrdOssAioInfoR;

   target.si_signo = info->si_signo;
   target.si_errno = info->si_errno;
   target.si_code  = info->si_code;

// The request pointer is taken from si_addr on Apple platforms and from the
// signal value everywhere else.
//
#ifdef __APPLE__
   target.si_value.sigval_ptr = info->si_addr;
#else
   target.si_value.sival_ptr  = info->si_value.sival_ptr;
#endif
}
}
475
476/******************************************************************************/
477/* X r d O s s A i o W S H */
478/******************************************************************************/
479
480// XrdOssAioWSH handles AIO write signals. This handler was set up at AIO
481// initialization time but only when this platform does not have sigwaitinfo().
482//
extern "C"
{
// Signal handler for AIO write completions. It copies the interesting parts
// of the delivered siginfo into the structure designated by XrdOssAioInfoW.
// The signum and ucontext arguments are not used.
//
void XrdOssAioWSH(int signum, siginfo_t *info, void *ucontext)
{
   extern siginfo_t *XrdOssAioInfoW;

// The global write info pointer is expected to designate the waiting thread's
// siginfo structure by the time a write completion signal is delivered.
//
   siginfo_t &target = *XrdOssAioInfoW;

   target.si_signo = info->si_signo;
   target.si_errno = info->si_errno;
   target.si_code  = info->si_code;

// The request pointer is taken from si_addr on Apple platforms and from the
// signal value everywhere else.
//
#ifdef __APPLE__
   target.si_value.sigval_ptr = info->si_addr;
#else
   target.si_value.sival_ptr  = info->si_value.sival_ptr;
#endif
}
}
503#endif
#define DEBUG(x)
#define EPNAME(x)
XrdSysTrace OssTrace
void * XrdOssAioWait(void *mySigarg)
Definition XrdOssAio.cc:356
XrdSysError OssEroute
bool Debug
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
struct sigevent aio_sigevent
Definition XrdSfsAio.hh:51
int aio_fildes
Definition XrdSfsAio.hh:46
void * aio_buf
Definition XrdSfsAio.hh:47
#define TRACE(act, x)
Definition XrdTrace.hh:63
const char * tident
Definition XrdOss.hh:453
int fd
Definition XrdOss.hh:455
ssize_t Read(off_t, size_t)
Definition XrdOssApi.cc:846
ssize_t Write(const void *, off_t, size_t)
static int AioInit()
Definition XrdOssAio.cc:281
static int AioAllOk
Definition XrdOssApi.hh:201
ssize_t Result
Definition XrdSfsAio.hh:65
const char * TIdent
Definition XrdSfsAio.hh:67
virtual void doneRead()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
virtual void doneWrite()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)