XRootD
Loading...
Searching...
No Matches
XrdPfcFile.hh
Go to the documentation of this file.
1#ifndef __XRDPFC_FILE_HH__
2#define __XRDPFC_FILE_HH__
3//----------------------------------------------------------------------------------
4// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5// Author: Alja Mrak-Tadel, Matevz Tadel
6//----------------------------------------------------------------------------------
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//----------------------------------------------------------------------------------
20
23
24#include "XrdOuc/XrdOucCache.hh"
25#include "XrdOuc/XrdOucIOVec.hh"
26
27#include "XrdPfcInfo.hh"
28#include "XrdPfcStats.hh"
29
30#include <functional>
31#include <map>
32#include <set>
33#include <string>
34
35class XrdJob;
36class XrdOucIOVec;
37
38namespace XrdCl
39{
40class Log;
41}
42
43namespace XrdPfc
44{
45class BlockResponseHandler;
46class DirectResponseHandler;
47class IO;
48
49struct ReadVBlockListRAM;
50struct ReadVChunkListRAM;
51struct ReadVBlockListDisk;
52struct ReadVChunkListDisk;
53}
54
55
56namespace XrdPfc
57{
58class File;
59
61{
63 int m_n_chunks = 0; // Only set for ReadV().
64 unsigned short m_seq_id;
65 XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
66
67 ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
68 m_seq_id(sid), m_iocb(iocb)
69 {}
70};
71
72// -------------------------------------------------------------
73
75{
77 ReadReqRH *m_rh; // Internal callback created in IO::Read().
78
79 long long m_bytes_read = 0;
80 int m_error_cond = 0; // to be set to -errno
83
85 bool m_sync_done = false;
86 bool m_direct_done = true;
87
89 m_io(io), m_rh(rh)
90 {}
91
93
94 bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
96};
97
98// -------------------------------------------------------------
99
101{
103 char *m_buf; // Where to place the data chunk.
104 long long m_off; // Offset *within* the corresponding block.
105 int m_size; // Size of the data chunk.
106
107 ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
108 m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
109 {}
110};
111
112using vChunkRequest_t = std::vector<ChunkRequest>;
113using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
114
115// ================================================================
116
117class Block
118{
119public:
121 IO *m_io; // IO that handled current request, used for == / != comparisons only
122 void *m_req_id; // Identity of requestor -- used for stats.
123
124 char *m_buff;
125 long long m_offset;
129 int m_errno; // stores negative errno
135
137
138 Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
139 bool m_prefetch, bool cks_net) :
140 m_file(f), m_io(io), m_req_id(rid),
141 m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
144 {}
145
146 char* get_buff() const { return m_buff; }
147 int get_size() const { return m_size; }
148 int get_req_size() const { return m_req_size; }
149 long long get_offset() const { return m_offset; }
150
151 File* get_file() const { return m_file; }
152 IO* get_io() const { return m_io; }
153 void* get_req_id() const { return m_req_id; }
154
155 bool is_finished() const { return m_downloaded || m_errno != 0; }
156 bool is_ok() const { return m_downloaded; }
157 bool is_failed() const { return m_errno != 0; }
158
159 void set_downloaded() { m_downloaded = true; }
160 void set_error(int err) { m_errno = err; }
161 int get_error() const { return m_errno; }
162
163 void reset_error_and_set_io(IO *io, void *rid)
164 {
165 m_errno = 0;
166 m_io = io;
167 m_req_id = rid;
168 }
169
170 bool req_cksum_net() const { return m_req_cksum_net; }
171 bool has_cksums() const { return ! m_cksum_vec.empty(); }
175};
176
177using BlockList_t = std::list<Block*>;
178using BlockList_i = std::list<Block*>::iterator;
179
180// ================================================================
181
183{
184public:
186
188
189 void Done(int result) override;
190};
191
192// ----------------------------------------------------------------
193
195{
196public:
202 int m_errno = 0;
203
204 DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
205 m_file(file), m_read_req(rreq), m_to_wait(to_wait)
206 {}
207
208 void Done(int result) override;
209};
210
211// ================================================================
212
213class File
214{
217public:
218 // Constructor and Open() are private.
219
221 static File* FileOpen(const std::string &path, long long offset, long long fileSize);
222
224 ~File();
225
228
230 void BlocksRemovedFromWriteQ(std::list<Block*>&);
231
233 int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
234
236 int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
237
238 //----------------------------------------------------------------------
240 //----------------------------------------------------------------------
241 void ioUpdated(IO *io);
242
243 //----------------------------------------------------------------------
246 //----------------------------------------------------------------------
247 bool ioActive(IO *io);
248
249 //----------------------------------------------------------------------
252 //----------------------------------------------------------------------
254
255 //----------------------------------------------------------------------
258 //----------------------------------------------------------------------
260
261 //----------------------------------------------------------------------
263 //----------------------------------------------------------------------
264 void Sync();
265
266 void WriteBlockToDisk(Block* b);
267
268 void Prefetch();
269
270 float GetPrefetchScore() const;
271
273 const char* lPath() const;
274
275 std::string& GetLocalPath() { return m_filename; }
276
279
280 long long GetFileSize() { return m_file_size; }
281
282 void AddIO(IO *io);
285 void RemoveIO(IO *io);
286
288
289 std::string GetRemoteLocations() const;
290 const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
291 size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
292 int GetBlockSize() const { return m_cfi.GetBufferSize(); }
293 int GetNBlocks() const { return m_cfi.GetNBlocks(); }
294 int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
295 long long GetPrefetchedBytes() const { return m_prefetch_bytes; }
296 const Stats& RefStats() const { return m_stats; }
297
298 int Fstat(struct stat &sbuff);
299
300 // These three methods are called under Cache's m_active lock
301 int get_ref_cnt() { return m_ref_cnt; }
302 int inc_ref_cnt() { return ++m_ref_cnt; }
303 int dec_ref_cnt() { return --m_ref_cnt; }
304
306 bool is_in_emergency_shutdown() { return m_in_shutdown; }
307
308private:
310 File(const std::string &path, long long offset, long long fileSize);
311
313 bool Open();
314
315 static const char *m_traceID;
316
317 int m_ref_cnt;
318
319 XrdOssDF *m_data_file;
320 XrdOssDF *m_info_file;
321 Info m_cfi;
322
323 std::string m_filename;
324 long long m_offset;
325 long long m_file_size;
326
327 // IO objects attached to this file.
328
329 typedef std::set<IO*> IoSet_t;
330 typedef IoSet_t::iterator IoSet_i;
331
332 IoSet_t m_io_set;
333 IoSet_i m_current_io;
334 int m_ios_in_detach;
335
336 // FSync
337
338 std::vector<int> m_writes_during_sync;
339 int m_non_flushed_cnt;
340 bool m_in_sync;
341 bool m_detach_time_logged;
342 bool m_in_shutdown;
343
344 // Block state and management
345
346 typedef std::list<int> IntList_t;
347 typedef IntList_t::iterator IntList_i;
348
349 typedef std::map<int, Block*> BlockMap_t;
350 typedef BlockMap_t::iterator BlockMap_i;
351
352 BlockMap_t m_block_map;
353 XrdSysCondVar m_state_cond;
354 long long m_block_size;
355 int m_num_blocks;
356
357 // Stats
358
359 Stats m_stats;
360 Stats m_last_stats;
361
362 std::set<std::string> m_remote_locations;
363 void insert_remote_location(const std::string &loc);
364
365 // Prefetch
366
367 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
368
369 PrefetchState_e m_prefetch_state;
370
371 long long m_prefetch_bytes;
372 int m_prefetch_read_cnt;
373 int m_prefetch_hit_cnt;
374 float m_prefetch_score; // cached
375
376 void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
377 void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
378 void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
379
380 // Helpers
381
382 bool overlap(int blk, // block to query
383 long long blk_size, //
384 long long req_off, // offset of user request
385 int req_size, // size of user request
386 // output:
387 long long &off, // offset in user buffer
388 long long &blk_off, // offset in block
389 int &size);
390
391 // Read & ReadV
392
393 Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
394
395 void ProcessBlockRequest (Block *b);
396 void ProcessBlockRequests(BlockList_t& blks);
397
398 void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
399
400 int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
401
402 int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
403 ReadReqRH *rh, const char *tpfx);
404
405 void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
406 void ProcessBlockError(Block *b, ReadRequest *rreq);
407 void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
408 void FinalizeReadRequest(ReadRequest *rreq);
409
410 void ProcessBlockResponse(Block *b, int res);
411
412 // Block management
413
414 void inc_ref_count(Block* b);
415 void dec_ref_count(Block* b, int count = 1);
416 void free_block(Block*);
417
418 bool select_current_io_or_disable_prefetching(bool skip_current);
419
420 int offsetIdx(int idx) const;
421};
422
423//------------------------------------------------------------------------------
424
425inline void File::inc_ref_count(Block* b)
426{
427 // Method always called under lock.
428 b->m_refcnt++;
429}
430
431//------------------------------------------------------------------------------
432
433inline void File::dec_ref_count(Block* b, int count)
434{
435 // Method always called under lock.
436 assert(b->is_finished());
437 b->m_refcnt -= count;
438 assert(b->m_refcnt >= 0);
439
440 if (b->m_refcnt == 0)
441 {
442 free_block(b);
443 }
444}
445
446}
447
448#endif
#define stat(a, b)
Definition XrdPosix.hh:96
XrdOucString File
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
bool is_finished() const
bool is_ok() const
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
bool is_failed() const
long long m_offset
File * get_file() const
vCkSum_t m_cksum_vec
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
void Done(int result) override
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
XrdSysTrace * GetTrace()
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
XrdSysError * GetLog()
int GetNBlocks() const
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
long long GetPrefetchedBytes() const
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
long long GetFileSize()
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
void initiate_emergency_shutdown()
int inc_ref_cnt()
int GetPrefetchCountOnIO(IO *io)
const Stats & RefStats() const
void Sync()
Sync file cache inf o and output data with disk.
int dec_ref_cnt()
int get_ref_cnt()
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
~File()
Destructor.
Definition XrdPfcFile.cc:77
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool is_in_emergency_shutdown()
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
const AStat * GetLastAccessStats() const
Get latest access stats.
long long GetBufferSize() const
Get prefetch buffer size.
int GetNDownloadedBlocks() const
Get number of downloaded blocks.
size_t GetAccessCnt() const
Get number of accesses.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Statistics of cache utilisation by a File object.
XrdSysError Log
Definition XrdConfig.cc:112
std::vector< ChunkRequest > vChunkRequest_t
std::vector< ChunkRequest >::iterator vChunkRequest_i
std::list< Block * > BlockList_t
std::vector< uint32_t > vCkSum_t
std::list< Block * >::iterator BlockList_i
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
ReadRequest * m_read_req
Access statistics.
Definition XrdPfcInfo.hh:61
XrdOucCacheIOCB * m_iocb
Definition XrdPfcFile.hh:65
unsigned short m_seq_id
Definition XrdPfcFile.hh:64
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition XrdPfcFile.hh:67
void update_error_cond(int ec)
Definition XrdPfcFile.hh:92
ReadRequest(IO *io, ReadReqRH *rh)
Definition XrdPfcFile.hh:88
bool is_complete() const
Definition XrdPfcFile.hh:94
int return_value() const
Definition XrdPfcFile.hh:95
long long m_bytes_read
Definition XrdPfcFile.hh:79