XRootD
Loading...
Searching...
No Matches
XrdOssCsiPages.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O s s C s i P a g e s . c c */
4/* */
5/* (C) Copyright 2021 CERN. */
6/* */
7/* This file is part of the XRootD software suite. */
8/* */
9/* XRootD is free software: you can redistribute it and/or modify it under */
10/* the terms of the GNU Lesser General Public License as published by the */
11/* Free Software Foundation, either version 3 of the License, or (at your */
12/* option) any later version. */
13/* */
14/* In applying this licence, CERN does not waive the privileges and */
15/* immunities granted to it by virtue of its status as an Intergovernmental */
16/* Organization or submit itself to any jurisdiction. */
17/* */
18/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
19/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
20/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
21/* License for more details. */
22/* */
23/* You should have received a copy of the GNU Lesser General Public License */
24/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
25/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
26/* */
27/* The copyright holder's institutional names and contributor's names may not */
28/* be used to endorse or promote products derived from this software without */
29/* specific prior written permission of the institution or contributor. */
30/******************************************************************************/
31
32#include "XrdOssCsiTrace.hh"
33#include "XrdOssCsiPages.hh"
34#include "XrdOuc/XrdOucCRC.hh"
35
36#include <sys/types.h>
37#include <sys/stat.h>
38#include <fcntl.h>
39
40#include <assert.h>
41
43
// Construct the per-file page/checksum manager.
//
// fn  : name copied into fn_; it appears in this file's trace messages
// ts  : tag store; ownership is moved into ts_
// wh  : stored as writeHoles_
// am  : stored as allowMissingTags_
// dpe : stored as disablePgExtend_ (when set, StoreRange returns -ESPIPE for a pgWrite
//       reaching past a tracked length that is not page aligned)
// dlw : stored inverted as loosewriteConfigured_ (presumably "disable loose write" - confirm)
// tid : copied into tident_; tident then points at that copy's character data
//
// The remaining flags start cleared; Open() assigns hasMissingTags_, rdonly_ and loosewrite_.
// NOTE(review): tident is initialised from tident_, so this relies on tident_ being declared
// before tident in the class definition - confirm against the header.
//
44XrdOssCsiPages::XrdOssCsiPages(const std::string &fn, std::unique_ptr<XrdOssCsiTagstore> ts, bool wh, bool am, bool dpe, bool dlw, const char *tid) :
45 ts_(std::move(ts)),
46 writeHoles_(wh),
47 allowMissingTags_(am),
48 disablePgExtend_(dpe),
49 hasMissingTags_(false),
50 rdonly_(false),
51 loosewriteConfigured_(!dlw),
52 loosewrite_(false),
53 tscond_(0),
54 tsforupdate_(false),
55 fn_(fn),
56 tident_(tid),
57 tident(tident_.c_str()),
58 lastpgforloose_(0),
59 checklastpg_(false)
60{
61 // empty constructor
62}
63
64int XrdOssCsiPages::Open(const char *path, off_t dsize, int flags, XrdOucEnv &envP)
65{
66 EPNAME("Pages::Open");
67 hasMissingTags_ = false;
68 rdonly_ = false;
69 int ret = ts_->Open(path, dsize, flags, envP);
70 if (ret == -ENOENT)
71 {
72 // no existing tag
74 {
75 TRACE(Info, "Opening with missing tagfile: " << fn_);
76 hasMissingTags_ = true;
77 return 0;
78 }
79 TRACE(Warn, "Could not open tagfile for " << fn_ << " error " << ret);
80 return -EDOM;
81 }
82 if (ret<0) return ret;
83 if ((flags & O_ACCMODE) == O_RDONLY) rdonly_ = true;
84 loosewrite_ = (dsize==0 && ts_->GetTrackedTagSize()==0) ? false : loosewriteConfigured_;
85 return 0;
86}
87
89{
91 {
92 hasMissingTags_ = false;
93 return 0;
94 }
95 return ts_->Close();
96}
97
99{
100 if (!hasMissingTags_) ts_->Flush();
101}
102
104{
105 if (hasMissingTags_) return 0;
106 return ts_->Fsync();
107}
108
110{
111 if (hasMissingTags_) return -ENOENT;
112
114 while (tsforupdate_)
115 {
116 tscond_.Wait();
117 }
118 off_t tagsize = ts_->GetTrackedTagSize();
119 off_t datasize = ts_->GetTrackedDataSize();
120 if (forupdate)
121 {
122 tsforupdate_ = true;
123 }
124 rsizes = std::make_pair(tagsize,datasize);
125 return 0;
126}
127
129{
131 return ts_->SetTrackedSize(sz);
132}
133
135{
136 // nothing to do if there is no tag file
137 if (hasMissingTags_) return 0;
138
140 const int ret = ts_->ResetSizes(sz);
143 return ret;
144}
145
// LockTruncateSize: forward a truncation to the tag store.
//
// sz      - new size, passed to ts_->Truncate()
// datatoo - passed through unchanged to ts_->Truncate()
//
// Returns whatever ts_->Truncate() returns.
//
146int XrdOssCsiPages::LockTruncateSize(const off_t sz, const bool datatoo)
147{
// NOTE(review): the embedded listing numbers skip 148 here, so a statement is missing from
// this extract (the function name suggests a lock is taken first) - confirm against the
// upstream source before relying on this body.
149 return ts_->Truncate(sz,datatoo);
150}
151
153{
155 return ts_->SetUnverified();
156}
157
159{
161 assert(tsforupdate_ == true);
162
163 tsforupdate_ = false;
165}
166
167// Used by Write: At this point the user's data has not yet been written to the file.
168//
// fd     - data file, handed to the unaligned updater
// buff   - data about to be written
// offset - file offset at which the write starts
// blen   - number of bytes to be written
// rg     - range guard supplying the tracked sizes for this operation
//
// Returns -EINVAL for a negative offset, 0 for an empty write or when there is no tag
// file, otherwise the result of UpdateRangeAligned() or UpdateRangeUnaligned().
//
169int XrdOssCsiPages::UpdateRange(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, XrdOssCsiRangeGuard &rg)
170{
171 if (offset<0)
172 {
173 return -EINVAL;
174 }
175
176 if (blen == 0)
177 {
178 return 0;
179 }
180
181 // if the tag file is missing we don't need to store anything
182 if (hasMissingTags_)
183 {
184 return 0;
185 }
186
187 // update of file where checksums are based on the file data supplied: as there's no separate
188 // source of checksum information mark this file as having unverified checksums
// NOTE(review): no statement follows the comment above in this extract (listing number 189
// is skipped) - confirm against the upstream source.
190
191 const Sizes_t sizes = rg.getTrackinglens();
192
193 const off_t trackinglen = sizes.first;
// a write reaching past the tracked length grows the tracked size before the tags are updated
194 if (offset+blen > static_cast<size_t>(trackinglen))
195 {
// NOTE(review): the return value of LockSetTrackedSize() is not checked, and listing
// number 197 is skipped after it in this extract.
196 LockSetTrackedSize(offset+blen);
198 }
199
200 int ret;
// the unaligned path is taken when any of these hold:
//  - the write does not start on a page boundary
//  - the write ends before the tracked length and its length is not a whole number of pages
//  - the tracked length is not page aligned and the write starts beyond it
201 if ((offset % XrdSys::PageSize) != 0 ||
202 (offset+blen < static_cast<size_t>(trackinglen) && (blen % XrdSys::PageSize) != 0) ||
203 ((trackinglen % XrdSys::PageSize) !=0 && offset > trackinglen))
204 {
205 ret = UpdateRangeUnaligned(fd, buff, offset, blen, sizes);
206 }
207 else
208 {
209 ret = UpdateRangeAligned(buff, offset, blen, sizes);
210 }
211
212 return ret;
213}
214
215// Used by Read: At this point the user's buffer has already been filled from the file.
216// offset: offset within the file at which the read starts
217// blen : the length of the read already read into the buffer
218// (which may be less than what was originally requested)
219//
220int XrdOssCsiPages::VerifyRange(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, XrdOssCsiRangeGuard &rg)
221{
222 EPNAME("VerifyRange");
223
224 if (offset<0)
225 {
226 return -EINVAL;
227 }
228
229 // if the tag file is missing we don't verify anything
230 if (hasMissingTags_)
231 {
232 return 0;
233 }
234
235 const Sizes_t sizes = rg.getTrackinglens();
236 const off_t trackinglen = sizes.first;
237
238 if (offset >= trackinglen && blen == 0)
239 {
240 return 0;
241 }
242
243 if (blen == 0)
244 {
245 // if offset is before the tracked len we should not be requested to verify zero bytes:
246 // the file may have been truncated
247 TRACE(Warn, "Verify request for zero bytes " << fn_ << ", file may be truncated");
248 return -EDOM;
249 }
250
251 if (offset+blen > static_cast<size_t>(trackinglen))
252 {
253 TRACE(Warn, "Verify request for " << (offset+blen-trackinglen) << " bytes from " << fn_ << " beyond tracked length");
254 return -EDOM;
255 }
256
257 int vret;
258 if ((offset % XrdSys::PageSize) != 0 || (offset+blen != static_cast<size_t>(trackinglen) && (blen % XrdSys::PageSize) != 0))
259 {
260 vret = VerifyRangeUnaligned(fd, buff, offset, blen, sizes);
261 }
262 else
263 {
264 vret = VerifyRangeAligned(buff, offset, blen, sizes);
265 }
266
267 return vret;
268}
269
270// apply_sequential_aligned_modify: Internal func used during Write/pgWrite
271// (both aligned/unaligned cases) to update multiple tags.
272//
273// Write series of crc32c values to a file's tag file, with optional pre-block or lastblock
274// values. Tries to reduce the number of calls to WriteTags() by using an internal buffer
275// to make all the crc32c values available from a contiguous start location.
276//
277// buff: start of data buffer: Page aligned within the file, used if calculating crc32
278// startp: page index corresponding to the start of buff
279// nbytes: length of buffer
280// csvec: optional pre-computed crc32 values covering nbytes of buffer
281// preblockset: true/false. A value for a crc32 value to be placed at startp-1
282// lastblockset:true/false. If true the last page of buff must be partial, and instead of
283// calculating or fetching a crc32 value from csvec[], a supplied
284// value is used.
285// cspre: value to use for preblock crc32 (used if preblockset is true)
286// cslast: value to use for last partial-block crc32 (used if lastblockset is true)
287//
289 const void *const buff, const off_t startp, const size_t nbytes, const uint32_t *csvec,
290 const bool preblockset, const bool lastblockset, const uint32_t cspre, const uint32_t cslast)
291{
292 EPNAME("apply_sequential_aligned_modify");
293
294 if (lastblockset && (nbytes % XrdSys::PageSize)==0)
295 {
296 return -EINVAL;
297 }
298 if (preblockset && startp==0)
299 {
300 return -EINVAL;
301 }
302
303 uint32_t calcbuf[stsize_];
304 const size_t calcbufsz = sizeof(calcbuf)/sizeof(uint32_t);
305 const uint8_t *const p = (uint8_t*)buff;
306
307 // will be using calcbuf
308 bool useinternal = true;
309 if (csvec && !preblockset && !lastblockset)
310 {
311 useinternal = false;
312 }
313
314 bool dopre = preblockset;
315 const off_t sp = preblockset ? startp-1 : startp;
316
317 size_t blktowrite = ((nbytes+XrdSys::PageSize-1)/XrdSys::PageSize) + (preblockset ? 1 : 0);
318 size_t nblkwritten = 0;
319 size_t calcbytot = 0;
320 while(blktowrite>0)
321 {
322 size_t blkwcnt = blktowrite;
323 if (useinternal)
324 {
325 size_t cidx = 0;
326 size_t calcbycnt = nbytes - calcbytot;
327 if (nblkwritten == 0 && dopre)
328 {
329 calcbycnt = std::min(calcbycnt, (calcbufsz-1)*XrdSys::PageSize);
330 blkwcnt = (calcbycnt+XrdSys::PageSize-1)/XrdSys::PageSize;
331 calcbuf[cidx] = cspre;
332 cidx++;
333 blkwcnt++;
334 dopre = false;
335 }
336 else
337 {
338 calcbycnt = std::min(calcbycnt, calcbufsz*XrdSys::PageSize);
339 blkwcnt = (calcbycnt+XrdSys::PageSize-1)/XrdSys::PageSize;
340 }
341 if ((calcbycnt % XrdSys::PageSize)!=0 && lastblockset)
342 {
343 const size_t x = calcbycnt / XrdSys::PageSize;
344 calcbycnt = XrdSys::PageSize * x;
345 calcbuf[cidx + x] = cslast;
346 }
347 if (csvec)
348 {
349 memcpy(&calcbuf[cidx], &csvec[calcbytot/XrdSys::PageSize], 4*((calcbycnt+XrdSys::PageSize-1)/XrdSys::PageSize));
350 }
351 else
352 {
353 XrdOucCRC::Calc32C(&p[calcbytot], calcbycnt, &calcbuf[cidx]);
354 }
355 calcbytot += calcbycnt;
356 }
357 const ssize_t wret = ts_->WriteTags(useinternal ? calcbuf : &csvec[nblkwritten], sp+nblkwritten, blkwcnt);
358 if (wret<0)
359 {
360 TRACE(Warn, TagsWriteError(sp+nblkwritten, blkwcnt, wret));
361 return wret;
362 }
363 blktowrite -= blkwcnt;
364 nblkwritten += blkwcnt;
365 }
366 return nblkwritten;
367}
368
369//
370// FetchRangeAligned
371//
372// Used by pgRead or Read (via VerifyRangeAligned) when the read offset is at a page boundary within the file
373// AND the length is a multiple of page size or the read is up to exactly the end of file.
374//
// buff   - data already read from the file
// offset - file offset of buff; blen - number of bytes in buff
// csvec  - optional output for the tags read; when NULL a fixed size stack buffer is reused
// opts   - when XrdOssDF::Verify is set the tags are compared against crc32c values
//          computed from buff
//
// Returns 0 on success, the negative value from ReadTags() on a tag read failure, or
// -EDOM when a computed crc differs from the stored tag.
//
375int XrdOssCsiPages::FetchRangeAligned(const void *const buff, const off_t offset, const size_t blen, const Sizes_t & /* sizes */, uint32_t *const csvec, const uint64_t opts)
376{
377 EPNAME("FetchRangeAligned");
378 uint32_t rdvec[stsize_],vrbuf[stsize_];
379
// p1 is the first page, nfull the number of complete pages and p2_off the byte count
// of a trailing partial page (0 if there is none)
380 const off_t p1 = offset / XrdSys::PageSize;
381 const off_t p2 = (offset+blen) / XrdSys::PageSize;
382 const size_t p2_off = (offset+blen) % XrdSys::PageSize;
383 const size_t nfull = p2-p1;
384
385 uint32_t *rdbuf;
386 size_t rdbufsz;
387 if (csvec == NULL)
388 {
389 // use fixed sized stack buffer
390 rdbuf = rdvec;
391 rdbufsz = sizeof(rdvec)/sizeof(uint32_t);
392 }
393 else
394 {
395 // use supplied buffer, assumed to be large enough
396 rdbuf = csvec;
397 rdbufsz = (p2_off==0) ? nfull : (nfull+1);
398 }
399
400 // always use stack based, fixed sized buffer for verify
401 const size_t vrbufsz = sizeof(vrbuf)/sizeof(uint32_t);
402
403 // pointer to data
404 const uint8_t *const p = (uint8_t*)buff;
405
406 // process full pages + any partial page
407 size_t toread = (p2_off>0) ? nfull+1 : nfull;
408 size_t nread = 0;
409 while(toread>0)
410 {
// rdbuf is indexed modulo rdbufsz: rcnt is capped so a single ReadTags() call never runs
// past the end of rdbuf. With a caller supplied csvec rdbufsz equals the total tag count.
411 const size_t rcnt = std::min(toread, rdbufsz-(nread%rdbufsz));
412 const ssize_t rret = ts_->ReadTags(&rdbuf[nread%rdbufsz], p1+nread, rcnt);
413 if (rret<0)
414 {
415 TRACE(Warn, TagsReadError(p1+nread, rcnt, rret));
416 return rret;
417 }
418 if ((opts & XrdOssDF::Verify))
419 {
// verify the tags just read, at most vrbufsz pages per pass
420 size_t toverif = rcnt;
421 size_t nverif = 0;
422 while(toverif>0)
423 {
424 const size_t vcnt = std::min(toverif, vrbufsz);
// only the final pass can include the trailing partial page, which contributes p2_off bytes
425 const size_t databytes = (nread+nverif+vcnt <= nfull) ? (vcnt*XrdSys::PageSize) : ((vcnt-1)*XrdSys::PageSize+p2_off);
426 XrdOucCRC::Calc32C(&p[XrdSys::PageSize*(nread+nverif)],databytes,vrbuf);
427 if (memcmp(vrbuf, &rdbuf[(nread+nverif)%rdbufsz], 4*vcnt))
428 {
// locate the first differing page so the trace message can name it
429 size_t badpg;
430 for(badpg=0;badpg<vcnt;++badpg) { if (memcmp(&vrbuf[badpg],&rdbuf[(nread+nverif+badpg)%rdbufsz],4)) break; }
431 TRACE(Warn, CRCMismatchError( (nread+nverif+badpg<nfull) ? XrdSys::PageSize : p2_off,
432 (p1+nread+nverif+badpg),
433 vrbuf[badpg],
434 rdbuf[(nread+nverif+badpg)%rdbufsz] ));
435 return -EDOM;
436 }
437 toverif -= vcnt;
438 nverif += vcnt;
439 }
440 }
441 toread -= rcnt;
442 nread += rcnt;
443 }
444
445 return 0;
446}
447
448int XrdOssCsiPages::VerifyRangeAligned(const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes)
449{
450 return FetchRangeAligned(buff,offset,blen,sizes,NULL,XrdOssDF::Verify);
451}
452
453int XrdOssCsiPages::StoreRangeAligned(const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes, uint32_t *csvec)
454{
455 EPNAME("StoreRangeAligned");
456
457 // if csvec given store those values
458 // if no csvec then calculate against data and store
459
460 const off_t p1 = offset / XrdSys::PageSize;
461 const off_t trackinglen = sizes.first;
462
463 if (offset > trackinglen)
464 {
465 const int ret = UpdateRangeHoleUntilPage(NULL, p1, sizes);
466 if (ret<0)
467 {
468 TRACE(Warn, "Error updating tags for holes, error=" << ret);
469 return ret;
470 }
471 }
472
473 const ssize_t aret = apply_sequential_aligned_modify(buff, p1, blen, csvec, false, false, 0U, 0U);
474 if (aret<0)
475 {
476 TRACE(Warn, "Error updating tags, error=" << aret);
477 return aret;
478 }
479
480 return 0;
481}
482
483// Used by Read for aligned reads. See StoreRangeAligned for conditions.
484//
485int XrdOssCsiPages::UpdateRangeAligned(const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes)
486{
487 return StoreRangeAligned(buff, offset, blen, sizes, NULL);
488}
489
490//
491// LockTrackinglen: obtain current tracking counts and lock the following as necessary:
492// tracking counts and file byte range [offset, offend). Lock will be applied
493// at the page level.
494//
// rg - range guard that receives the page range and the tracked sizes
495// offset - byte offset of first page to apply lock
496// offend - end of range byte (excluding byte at end) of page at which to end lock
497// rdonly - will be a read-only operation
498//
499void XrdOssCsiPages::LockTrackinglen(XrdOssCsiRangeGuard &rg, const off_t offset, const off_t offend, const bool rdonly)
500{
501 // no need to lock if we don't have a tag file
502 if (hasMissingTags_) return;
503
504 // in case of empty range the tracking len is not copied
505 if (offset == offend) return;
506
507 {
// NOTE(review): listing number 508 is skipped here, so the first statement of this scope
// is missing from the extract - confirm against the upstream source.
509
510 Sizes_t sizes;
// for a non read-only operation the sizes are requested "for update"
511 (void)TrackedSizesGet(sizes, !rdonly);
512
513 // tag tracking data filesize, as recorded in the tagfile and for which the tagfile
514 // should be appropriately sized, is sizes.first: usually the same as the in
515 // memory "actual" data filesize (sizes.second), but may differ after crashes or write failure.
516 const off_t trackinglen = sizes.first;
517
// when the range starts beyond the tracked length the page range begins at the tracked length
518 const off_t p1 = (offset>trackinglen ? trackinglen : offset) / XrdSys::PageSize;
// a writer whose range ends within the tracked length does not keep the sizes held for update
519 bool unlock = false;
520 if (!rdonly && offend <= trackinglen)
521 {
522 unlock = true;
523 }
524
525 off_t p2 = offend / XrdSys::PageSize;
526 const size_t p2_off = offend % XrdSys::PageSize;
527
528 // range is exclusive
529 if (p2_off ==0) p2--;
530
531 ranges_.AddRange(p1, p2, rg, rdonly);
532
533 if (unlock)
534 {
// NOTE(review): the body of this branch is empty in the extract (listing number 535 is
// skipped) - confirm against the upstream source.
536 }
// the last argument tells the guard whether the sizes are still held for update
537 rg.SetTrackingInfo(this, sizes, (!rdonly && !unlock));
538 }
539
540 rg.Wait();
541}
542
// truncate: bring the tags in line with a data file about to be truncated or extended to len.
//
// fd  - data file, read to verify and recompute the crc of the new last page
// len - new length of the data file
// rg  - range guard supplying the tracked sizes for this operation
//
// Returns -EINVAL for a negative len, 0 on success or when there is no tag file, -EDOM
// when the existing content of the new last page does not match its stored tag,
// otherwise the negative error of the failing read or tag operation.
//
543int XrdOssCsiPages::truncate(XrdOssDF *const fd, const off_t len, XrdOssCsiRangeGuard &rg)
544{
545 EPNAME("truncate");
546
547 if (len<0) return -EINVAL;
548
549 // nothing to truncate if there is no tag file
550 if (hasMissingTags_) return 0;
551
552 const Sizes_t sizes = rg.getTrackinglens();
553
554 const off_t trackinglen = sizes.first;
// p_until is the page containing the new end of file, p_off the byte count within that page
555 const off_t p_until = len / XrdSys::PageSize;
556 const size_t p_off = len % XrdSys::PageSize;
557
// extending: update the tags for the hole up to the new last page first
558 if (len>trackinglen)
559 {
560 int ret = UpdateRangeHoleUntilPage(fd,p_until,sizes);
561 if (ret<0)
562 {
563 TRACE(Warn, "Error updating tags for holes, error=" << ret);
564 return ret;
565 }
566 }
567
// a new length ending part way into a page needs a tag for that partial page
568 if (len != trackinglen && p_off != 0)
569 {
570 const off_t tracked_page = trackinglen / XrdSys::PageSize;
571 const size_t tracked_off = trackinglen % XrdSys::PageSize;
// toread is the number of bytes of page p_until that currently hold tracked data:
// tracked_off when p_until is the tracked last page; otherwise nothing when extending
// and a whole page when shrinking
572 size_t toread = tracked_off;
573 if (len>trackinglen)
574 {
575 if (p_until != tracked_page) toread = 0;
576 }
577 else
578 {
579 if (p_until != tracked_page) toread = XrdSys::PageSize;
580 }
581 uint8_t b[XrdSys::PageSize];
582 if (toread>0)
583 {
// read the existing bytes and check them against the stored tag before reusing them
584 ssize_t rret = XrdOssCsiPages::fullread(fd, b, p_until*XrdSys::PageSize, toread);
585 if (rret<0)
586 {
587 TRACE(Warn, PageReadError(toread, p_until, rret));
588 return rret;
589 }
590 const uint32_t crc32c = XrdOucCRC::Calc32C(b, toread, 0U);
591 uint32_t crc32v;
592 rret = ts_->ReadTags(&crc32v, p_until, 1);
593 if (rret<0)
594 {
595 TRACE(Warn, TagsReadError(p_until, 1, rret));
596 return rret;
597 }
598 if (crc32v != crc32c)
599 {
600 TRACE(Warn, CRCMismatchError(toread, p_until, crc32c, crc32v));
601 return -EDOM;
602 }
603 }
// bytes between the existing data and the new end of file are treated as zeros
604 if (p_off > toread)
605 {
606 memset(&b[toread],0,p_off-toread);
607 }
// store the crc covering exactly the first p_off bytes of the page
608 const uint32_t crc32c = XrdOucCRC::Calc32C(b, p_off, 0U);
609 const ssize_t wret = ts_->WriteTags(&crc32c, p_until, 1);
610 if (wret < 0)
611 {
612 TRACE(Warn, TagsWriteError(p_until, 1, wret));
613 return wret;
614 }
615 }
616
// NOTE(review): the result of LockTruncateSize() is discarded, so a failure to truncate
// the tag store is reported as success; listing number 618 is also skipped after this
// call in the extract - confirm against the upstream source.
617 LockTruncateSize(len,true);
619 return 0;
620}
621
622// used by pgRead: At this point the user's buffer has already been filled from the file
623// offset: offset within the file at which the read starts
624// blen : the length of the read already read into the buffer
625// (which may be less than what was originally requested)
626//
628 XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen,
629 uint32_t *csvec, const uint64_t opts, XrdOssCsiRangeGuard &rg)
630{
631 EPNAME("FetchRange");
632 if (offset<0)
633 {
634 return -EINVAL;
635 }
636
637 // if the tag file is missing there is nothing to fetch or verify
638 // but if we should return a list of checksums calculate them from the data
639 if (hasMissingTags_)
640 {
641 if (csvec)
642 {
643 pgDoCalc(buff, offset, blen, csvec);
644 }
645 return 0;
646 }
647
648 const Sizes_t sizes = rg.getTrackinglens();
649 const off_t trackinglen = sizes.first;
650
651 if (offset >= trackinglen && blen == 0)
652 {
653 return 0;
654 }
655
656 if (blen == 0)
657 {
658 // if offset is before the tracked len we should not be requested to verify zero bytes:
659 // the file may have been truncated
660 TRACE(Warn, "Fetch request for zero bytes " << fn_ << ", file may be truncated");
661 return -EDOM;
662 }
663
664 if (offset+blen > static_cast<size_t>(trackinglen))
665 {
666 TRACE(Warn, "Fetch request for " << (offset+blen-trackinglen) << " bytes from " << fn_ << " beyond tracked length");
667 return -EDOM;
668 }
669
670 if (csvec == NULL && !(opts & XrdOssDF::Verify))
671 {
672 // if the crc values are not wanted nor checks against data, then
673 // there's nothing more to do here
674 return 0;
675 }
676
677 int fret;
678 if ((offset % XrdSys::PageSize) != 0 || (offset+blen != static_cast<size_t>(trackinglen) && (blen % XrdSys::PageSize) != 0))
679 {
680 fret = FetchRangeUnaligned(fd, buff, offset, blen, sizes, csvec, opts);
681 }
682 else
683 {
684 fret = FetchRangeAligned(buff,offset,blen,sizes,csvec,opts);
685 }
686 return fret;
687}
688
689// Used by pgWrite: At this point the user's data has not yet been written to the file.
690//
// fd     - data file, handed to the unaligned store
// buff   - data about to be written
// offset - file offset at which the write starts
// blen   - number of bytes to be written
// csvec  - crc32c values for the data; filled from buff when opts has XrdOssDF::doCalc
// opts   - XrdOssDF option bits; only doCalc is examined here
// rg     - range guard supplying the tracked sizes for this operation
//
// Returns -EINVAL for a negative offset, 0 for an empty write or when there is no tag
// file, -ESPIPE when extending is disabled and the write would pass a non page-aligned
// tracked length, otherwise the result of StoreRangeAligned() or StoreRangeUnaligned().
//
691int XrdOssCsiPages::StoreRange(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, uint32_t *csvec, const uint64_t opts, XrdOssCsiRangeGuard &rg)
692{
693 if (offset<0)
694 {
695 return -EINVAL;
696 }
697
698 if (blen == 0)
699 {
700 return 0;
701 }
702
703 // if the tag file is missing there is nothing to store
704 // but do calculate checksums to return, if requested to do so
705 if (hasMissingTags_)
706 {
707 if (csvec && (opts & XrdOssDF::doCalc))
708 {
709 pgDoCalc(buff, offset, blen, csvec);
710 }
711 return 0;
712 }
713
714 const Sizes_t sizes = rg.getTrackinglens();
715 const off_t trackinglen = sizes.first;
716
717 // in the original specification of pgWrite there was the idea of a logical-eof, set by
718 // a pgWrite with non-page aligned length: We support an option to approximate that
719 // by disallowing pgWrite past the current (non page aligned) eof.
720 if (disablePgExtend_ && (trackinglen % XrdSys::PageSize) !=0 && offset+blen > static_cast<size_t>(trackinglen))
721 {
722 return -ESPIPE;
723 }
724
725 // if doCalc is set and we have a csvec buffer fill it with calculated values
726 if (csvec && (opts & XrdOssDF::doCalc))
727 {
728 pgDoCalc(buff, offset, blen, csvec);
729 }
730
731 // if no vector of crc have been given and not specifically requested to calculate,
732 // then mark this file as having unverified checksums
733 if (!csvec && !(opts & XrdOssDF::doCalc))
734 {
// NOTE(review): the body of this branch is empty in the extract (listing number 735 is
// skipped) - confirm against the upstream source.
736 }
737
// a write reaching past the tracked length grows the tracked size before the tags are stored
738 if (offset+blen > static_cast<size_t>(trackinglen))
739 {
// NOTE(review): the return value of LockSetTrackedSize() is not checked, and listing
// number 741 is skipped after it in this extract.
740 LockSetTrackedSize(offset+blen);
742 }
743
744 int ret;
// the unaligned path is taken when any of these hold:
//  - the write does not start on a page boundary
//  - the write ends before the tracked length and its length is not a whole number of pages
//  - the tracked length is not page aligned and the write starts beyond it
745 if ((offset % XrdSys::PageSize) != 0 ||
746 (offset+blen < static_cast<size_t>(trackinglen) && (blen % XrdSys::PageSize) != 0) ||
747 ((trackinglen % XrdSys::PageSize) !=0 && offset > trackinglen))
748 {
749 ret = StoreRangeUnaligned(fd,buff,offset,blen,sizes,csvec);
750 }
751 else
752 {
753 ret = StoreRangeAligned(buff,offset,blen,sizes,csvec);
754 }
755
756 return ret;
757}
758
760{
761 if (hasMissingTags_)
762 {
763 return 0;
764 }
765 bool iv;
766 {
768 iv = ts_->IsVerified();
769 }
770 if (iv)
771 {
772 return XrdOss::PF_csVer;
773 }
774 return XrdOss::PF_csVun;
775}
776
777void XrdOssCsiPages::pgDoCalc(const void *buffer, off_t offset, size_t wrlen, uint32_t *csvec)
778{
779 const size_t p_off = offset % XrdSys::PageSize;
780 const size_t p_alen = (p_off > 0) ? (XrdSys::PageSize - p_off) : wrlen;
781 if (p_alen < wrlen)
782 {
783 XrdOucCRC::Calc32C((uint8_t *)buffer+p_alen, wrlen-p_alen, &csvec[1]);
784 }
785 XrdOucCRC::Calc32C((void*)buffer, std::min(p_alen, wrlen), csvec);
786}
787
788int XrdOssCsiPages::pgWritePrelockCheck(const void *buffer, off_t offset, size_t wrlen, const uint32_t *csvec, uint64_t opts)
789{
790 // do verify before taking locks to allow for faster fail
791 if (csvec && (opts & XrdOssDF::Verify))
792 {
793 uint32_t valcs;
794 const size_t p_off = offset % XrdSys::PageSize;
795 const size_t p_alen = (p_off > 0) ? (XrdSys::PageSize - p_off) : wrlen;
796 if (p_alen < wrlen)
797 {
798 if (XrdOucCRC::Ver32C((uint8_t *)buffer+p_alen, wrlen-p_alen, &csvec[1], valcs)>=0)
799 {
800 return -EDOM;
801 }
802 }
803 if (XrdOucCRC::Ver32C((void*)buffer, std::min(p_alen, wrlen), csvec, valcs)>=0)
804 {
805 return -EDOM;
806 }
807 }
808
809 return 0;
810}
811
812//
813// Do some consistency checks and repair in case the datafile length is inconsistent
814// with the length in the tag file. Called on open() and after write failures.
815// Disabled if loosewrite_ off, or file readonly. Sets lastpgforloose_.
816//
817// May be called under tscond_ lock from LockResetSizes, or directly from
818// CsiFile just after open.
819//
821{
822 EPNAME("BasicConsistencyCheck");
823
824 if (!loosewrite_ || rdonly_) return;
825
826 uint8_t b[XrdSys::PageSize];
827 static const uint8_t bz[XrdSys::PageSize] = {0};
828
829 const off_t tagsize = ts_->GetTrackedTagSize();
830 const off_t datasize = ts_->GetTrackedDataSize();
831
832 off_t taglp = 0, datalp = 0;
833 size_t tag_len = 0, data_len = 0;
834
835 if (tagsize>0)
836 {
837 taglp = (tagsize - 1) / XrdSys::PageSize;
838 tag_len = tagsize % XrdSys::PageSize;
839 tag_len = tag_len ? tag_len : XrdSys::PageSize;
840 }
841 if (datasize>0)
842 {
843 datalp = (datasize - 1) / XrdSys::PageSize;
844 data_len = datasize % XrdSys::PageSize;
845 data_len = data_len ? data_len : XrdSys::PageSize;
846 }
847
848 lastpgforloose_ = taglp;
849 checklastpg_ = true;
850
851 if (datasize>0 && taglp > datalp)
852 {
853 ssize_t rlen = XrdOssCsiPages::maxread(fd, b, XrdSys::PageSize * datalp, XrdSys::PageSize);
854 if (rlen<0)
855 {
856 TRACE(Warn, PageReadError(XrdSys::PageSize, datalp, rlen));
857 return;
858 }
859
860 memset(&b[rlen], 0, XrdSys::PageSize-rlen);
861 const uint32_t data_crc = XrdOucCRC::Calc32C(b, data_len, 0u);
862 const uint32_t data_crc_z = XrdOucCRC::Calc32C(b, XrdSys::PageSize, 0u);
863 uint32_t tagv;
864 ssize_t rret = ts_->ReadTags(&tagv, datalp, 1);
865 if (rret<0)
866 {
867 TRACE(Warn, TagsReadError(datalp, 1, rret));
868 return;
869 }
870
871 if (tagv == data_crc_z)
872 {
873 // expected
874 }
875 else if (tagv == data_crc)
876 {
877 // should set tagv to data_crc_z
878 TRACE(Warn, "Resetting tag for page at " << datalp*XrdSys::PageSize << " to zero-extended");
879 const ssize_t wret = ts_->WriteTags(&data_crc_z, datalp, 1);
880 if (wret < 0)
881 {
882 TRACE(Warn, TagsWriteError(datalp, 1, wret));
883 return;
884 }
885 }
886 else
887 {
888 // something else wrong
889 TRACE(Warn, CRCMismatchError(data_len, datalp, data_crc, tagv) << " (ignoring)");
890 }
891 }
892 else if (tagsize>0 && taglp < datalp)
893 {
894 // datafile has more pages than recorded in the tag file:
895 // the tag file should have a crc corresponding to the relevant data fragment that is tracked in the last page.
896 // If it has the crc for a whole page (and there is no non-zero content later in the page) reset it.
897 // This is so that a subsequent UpdateRangeHoleUntilPage can zero-extend the CRC and get a consistent CRC.
898
899 ssize_t rlen = XrdOssCsiPages::maxread(fd, b, XrdSys::PageSize * taglp, XrdSys::PageSize);
900 if (rlen<0)
901 {
902 TRACE(Warn, PageReadError(XrdSys::PageSize, taglp, rlen));
903 return;
904 }
905
906 memset(&b[rlen], 0, XrdSys::PageSize-rlen);
907 const uint32_t tag_crc = XrdOucCRC::Calc32C(b, tag_len, 0u);
908 const uint32_t tag_crc_z = XrdOucCRC::Calc32C(b, XrdSys::PageSize, 0u);
909 const uint32_t dp_ext_is_zero = !memcmp(&b[tag_len], bz, XrdSys::PageSize-tag_len);
910 uint32_t tagv;
911 ssize_t rret = ts_->ReadTags(&tagv, taglp, 1);
912 if (rret<0)
913 {
914 TRACE(Warn, TagsReadError(taglp, 1, rret));
915 return;
916 }
917
918 if (tagv == tag_crc)
919 {
920 // expected
921 }
922 else if (tagv == tag_crc_z && dp_ext_is_zero)
923 {
924 // should set tagv to tag_crc
925 TRACE(Warn, "Resetting tag for page at " << taglp*XrdSys::PageSize << " to not zero-extended");
926 const ssize_t wret = ts_->WriteTags(&tag_crc, taglp, 1);
927 if (wret < 0)
928 {
929 TRACE(Warn, TagsWriteError(taglp, 1, wret));
930 return;
931 }
932 }
933 else
934 {
935 // something else wrong
936 TRACE(Warn, CRCMismatchError(tag_len, taglp, tag_crc, tagv) << " dp_ext_is_zero=" << dp_ext_is_zero << " (ignoring)");
937 }
938 }
939}
#define tident
#define EPNAME(x)
XrdOucTrace OssCsiTrace
uint32_t crc32c(uint32_t crc, void const *buf, size_t len)
struct myOpts opts
#define TRACE(act, x)
Definition XrdTrace.hh:63
XrdSysCondVar tscond_
int UpdateRangeAligned(const void *, off_t, size_t, const Sizes_t &)
int StoreRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &, const uint32_t *)
ssize_t apply_sequential_aligned_modify(const void *, off_t, size_t, const uint32_t *, bool, bool, uint32_t, uint32_t)
XrdOssCsiRanges ranges_
static ssize_t maxread(XrdOssDF *fd, void *buff, const off_t off, const size_t sz, size_t tg=0)
std::string TagsReadError(off_t start, size_t n, int ret)
std::unique_ptr< XrdOssCsiTagstore > ts_
int FetchRangeAligned(const void *, off_t, size_t, const Sizes_t &, uint32_t *, uint64_t)
int LockTruncateSize(off_t, bool)
XrdOssCsiPages(const std::string &fn, std::unique_ptr< XrdOssCsiTagstore > ts, bool wh, bool am, bool dpe, bool dlw, const char *)
int UpdateRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &)
std::string TagsWriteError(off_t start, size_t n, int ret)
int StoreRangeAligned(const void *, off_t, size_t, const Sizes_t &, uint32_t *)
int UpdateRangeHoleUntilPage(XrdOssDF *, off_t, const Sizes_t &)
static ssize_t fullread(XrdOssDF *fd, void *buff, const off_t off, const size_t sz)
std::pair< off_t, off_t > Sizes_t
int FetchRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &, uint32_t *, uint64_t)
int truncate(XrdOssDF *, off_t, XrdOssCsiRangeGuard &)
XrdSysMutex rangeaddmtx_
int LockResetSizes(XrdOssDF *, off_t)
int VerifyRangeAligned(const void *, off_t, size_t, const Sizes_t &)
int VerifyRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &)
int LockSetTrackedSize(off_t)
int FetchRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
std::string CRCMismatchError(size_t blen, off_t pgnum, uint32_t got, uint32_t expected)
void BasicConsistencyCheck(XrdOssDF *)
void LockTrackinglen(XrdOssCsiRangeGuard &, off_t, off_t, bool)
std::string PageReadError(size_t blen, off_t pgnum, int ret)
int Open(const char *path, off_t dsize, int flags, XrdOucEnv &envP)
int TrackedSizesGet(Sizes_t &, bool)
const bool loosewriteConfigured_
static int pgWritePrelockCheck(const void *, off_t, size_t, const uint32_t *, uint64_t)
const std::string fn_
int StoreRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
static const size_t stsize_
int UpdateRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
static void pgDoCalc(const void *, off_t, size_t, uint32_t *)
int VerifyRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
const std::pair< off_t, off_t > & getTrackinglens() const
void SetTrackingInfo(XrdOssCsiPages *p, const std::pair< off_t, off_t > &tsizes, bool locked)
void AddRange(const off_t start, const off_t end, XrdOssCsiRangeGuard &rg, bool rdonly)
static const uint64_t doCalc
pgw: Calculate checksums
Definition XrdOss.hh:225
static const uint64_t Verify
all: Verify checksums
Definition XrdOss.hh:223
static const int PF_csVer
verified file checksums present
Definition XrdOss.hh:778
static const int PF_csVun
unverified file checksums present
Definition XrdOss.hh:779
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static bool Ver32C(const void *data, size_t count, const uint32_t csval, uint32_t *csbad=0)
Definition XrdOucCRC.cc:222
static const int PageSize