XRootD
Loading...
Searching...
No Matches
XrdPfcFile.hh
Go to the documentation of this file.
1#ifndef __XRDPFC_FILE_HH__
2#define __XRDPFC_FILE_HH__
3//----------------------------------------------------------------------------------
4// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5// Author: Alja Mrak-Tadel, Matevz Tadel
6//----------------------------------------------------------------------------------
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//----------------------------------------------------------------------------------
20
23
24#include "XrdOuc/XrdOucCache.hh"
25#include "XrdOuc/XrdOucIOVec.hh"
26
27#include "XrdPfcInfo.hh"
28#include "XrdPfcStats.hh"
29
30#include <functional>
31#include <map>
32#include <set>
33#include <string>
34
35class XrdJob;
36class XrdOucIOVec;
37
38namespace XrdCl
39{
40class Log;
41}
42
43namespace XrdPfc
44{
45class BlockResponseHandler;
46class DirectResponseHandler;
47class IO;
48
49struct ReadVBlockListRAM;
50struct ReadVChunkListRAM;
51struct ReadVBlockListDisk;
52struct ReadVChunkListDisk;
53}
54
55
56namespace XrdPfc
57{
58class File;
59
61{
63 int m_n_chunks = 0; // Only set for ReadV().
64 unsigned short m_seq_id;
65 XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
66
67 ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
68 m_seq_id(sid), m_iocb(iocb)
69 {}
70};
71
72// -------------------------------------------------------------
73
75{
77 ReadReqRH *m_rh; // Internal callback created in IO::Read().
78
79 long long m_bytes_read = 0;
80 int m_error_cond = 0; // to be set to -errno
82
84 bool m_sync_done = false;
85 bool m_direct_done = true;
86
88 m_io(io), m_rh(rh)
89 {}
90
91 void update_error_cond(int ec) { if (m_error_cond == 0 ) m_error_cond = ec; }
92
93 bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
95};
96
97// -------------------------------------------------------------
98
100{
102 char *m_buf; // Where to place the data chunk.
103 long long m_off; // Offset *within* the corresponding block.
104 int m_size; // Size of the data chunk.
105
106 ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
107 m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
108 {}
109};
110
111using vChunkRequest_t = std::vector<ChunkRequest>;
112using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
113
114// ================================================================
115
116class Block
117{
118public:
120 IO *m_io; // IO that handled current request, used for == / != comparisons only
121 void *m_req_id; // Identity of requestor -- used for stats.
122
123 char *m_buff;
124 long long m_offset;
128 int m_errno; // stores negative errno
134
136
137 Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
138 bool m_prefetch, bool cks_net) :
139 m_file(f), m_io(io), m_req_id(rid),
140 m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
143 {}
144
145 char* get_buff() const { return m_buff; }
146 int get_size() const { return m_size; }
147 int get_req_size() const { return m_req_size; }
148 long long get_offset() const { return m_offset; }
149
150 File* get_file() const { return m_file; }
151 IO* get_io() const { return m_io; }
152 void* get_req_id() const { return m_req_id; }
153
154 bool is_finished() const { return m_downloaded || m_errno != 0; }
155 bool is_ok() const { return m_downloaded; }
156 bool is_failed() const { return m_errno != 0; }
157
158 void set_downloaded() { m_downloaded = true; }
159 void set_error(int err) { m_errno = err; }
160 int get_error() const { return m_errno; }
161
162 void reset_error_and_set_io(IO *io, void *rid)
163 {
164 m_errno = 0;
165 m_io = io;
166 m_req_id = rid;
167 }
168
169 bool req_cksum_net() const { return m_req_cksum_net; }
170 bool has_cksums() const { return ! m_cksum_vec.empty(); }
174};
175
176using BlockList_t = std::list<Block*>;
177using BlockList_i = std::list<Block*>::iterator;
178
179// ================================================================
180
182{
183public:
185
187
188 void Done(int result) override;
189};
190
191// ----------------------------------------------------------------
192
194{
195public:
201 int m_errno = 0;
202
203 DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
204 m_file(file), m_read_req(rreq), m_to_wait(to_wait)
205 {}
206
207 void Done(int result) override;
208};
209
210// ================================================================
211
212class File
213{
216public:
217 // Constructor and Open() are private.
218
220 static File* FileOpen(const std::string &path, long long offset, long long fileSize);
221
223 ~File();
224
227
229 void BlocksRemovedFromWriteQ(std::list<Block*>&);
230
232 int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
233
235 int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
236
237 //----------------------------------------------------------------------
239 //----------------------------------------------------------------------
240 void ioUpdated(IO *io);
241
242 //----------------------------------------------------------------------
245 //----------------------------------------------------------------------
246 bool ioActive(IO *io);
247
248 //----------------------------------------------------------------------
251 //----------------------------------------------------------------------
253
254 //----------------------------------------------------------------------
257 //----------------------------------------------------------------------
259
260 //----------------------------------------------------------------------
262 //----------------------------------------------------------------------
263 void Sync();
264
265 void WriteBlockToDisk(Block* b);
266
267 void Prefetch();
268
269 float GetPrefetchScore() const;
270
272 const char* lPath() const;
273
274 std::string& GetLocalPath() { return m_filename; }
275
278
279 long long GetFileSize() { return m_file_size; }
280
281 void AddIO(IO *io);
284 void RemoveIO(IO *io);
285
287
288 std::string GetRemoteLocations() const;
289 const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
290 size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
291 int GetBlockSize() const { return m_cfi.GetBufferSize(); }
292 int GetNBlocks() const { return m_cfi.GetNBlocks(); }
293 int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
294 const Stats& RefStats() const { return m_stats; }
295
296 // These three methods are called under Cache's m_active lock
297 int get_ref_cnt() { return m_ref_cnt; }
298 int inc_ref_cnt() { return ++m_ref_cnt; }
299 int dec_ref_cnt() { return --m_ref_cnt; }
300
302 bool is_in_emergency_shutdown() { return m_in_shutdown; }
303
304private:
306 File(const std::string &path, long long offset, long long fileSize);
307
309 bool Open();
310
311 static const char *m_traceID;
312
313 int m_ref_cnt;
314
315 XrdOssDF *m_data_file;
316 XrdOssDF *m_info_file;
317 Info m_cfi;
318
319 std::string m_filename;
320 long long m_offset;
321 long long m_file_size;
322
323 // IO objects attached to this file.
324
325 typedef std::set<IO*> IoSet_t;
326 typedef IoSet_t::iterator IoSet_i;
327
328 IoSet_t m_io_set;
329 IoSet_i m_current_io;
330 int m_ios_in_detach;
331
332 // FSync
333
334 std::vector<int> m_writes_during_sync;
335 int m_non_flushed_cnt;
336 bool m_in_sync;
337 bool m_detach_time_logged;
338 bool m_in_shutdown;
339
340 // Block state and management
341
342 typedef std::list<int> IntList_t;
343 typedef IntList_t::iterator IntList_i;
344
345 typedef std::map<int, Block*> BlockMap_t;
346 typedef BlockMap_t::iterator BlockMap_i;
347
348 BlockMap_t m_block_map;
349 XrdSysCondVar m_state_cond;
350 long long m_block_size;
351 int m_num_blocks;
352
353 // Stats
354
355 Stats m_stats;
356 Stats m_last_stats;
357
358 std::set<std::string> m_remote_locations;
359 void insert_remote_location(const std::string &loc);
360
361 // Prefetch
362
363 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
364
365 PrefetchState_e m_prefetch_state;
366
367 int m_prefetch_read_cnt;
368 int m_prefetch_hit_cnt;
369 float m_prefetch_score; // cached
370
371 void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
372 void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
373 void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
374
375 // Helpers
376
377 bool overlap(int blk, // block to query
378 long long blk_size, //
379 long long req_off, // offset of user request
380 int req_size, // size of user request
381 // output:
382 long long &off, // offset in user buffer
383 long long &blk_off, // offset in block
384 int &size);
385
386 // Read & ReadV
387
388 Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
389
390 void ProcessBlockRequest (Block *b);
391 void ProcessBlockRequests(BlockList_t& blks);
392
393 void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
394
395 int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
396
397 int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
398 ReadReqRH *rh, const char *tpfx);
399
400 void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
401 void ProcessBlockError(Block *b, ReadRequest *rreq);
402 void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
403 void FinalizeReadRequest(ReadRequest *rreq);
404
405 void ProcessBlockResponse(Block *b, int res);
406
407 // Block management
408
409 void inc_ref_count(Block* b);
410 void dec_ref_count(Block* b, int count = 1);
411 void free_block(Block*);
412
413 bool select_current_io_or_disable_prefetching(bool skip_current);
414
415 int offsetIdx(int idx) const;
416};
417
418//------------------------------------------------------------------------------
419
420inline void File::inc_ref_count(Block* b)
421{
422 // Method always called under lock.
423 b->m_refcnt++;
424}
425
426//------------------------------------------------------------------------------
427
428inline void File::dec_ref_count(Block* b, int count)
429{
430 // Method always called under lock.
431 assert(b->is_finished());
432 b->m_refcnt -= count;
433 assert(b->m_refcnt >= 0);
434
435 if (b->m_refcnt == 0)
436 {
437 free_block(b);
438 }
439}
440
441}
442
443#endif
XrdOucString File
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
bool is_finished() const
bool is_ok() const
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
bool is_failed() const
long long m_offset
File * get_file() const
vCkSum_t m_cksum_vec
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
void Done(int result) override
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
XrdSysTrace * GetTrace()
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition XrdPfcFile.cc:99
float GetPrefetchScore() const
XrdSysError * GetLog()
int GetNBlocks() const
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
void AddIO(IO *io)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
long long GetFileSize()
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
void initiate_emergency_shutdown()
int inc_ref_cnt()
int GetPrefetchCountOnIO(IO *io)
const Stats & RefStats() const
void Sync()
Sync file cache inf o and output data with disk.
int dec_ref_cnt()
int get_ref_cnt()
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
~File()
Destructor.
Definition XrdPfcFile.cc:76
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool is_in_emergency_shutdown()
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
const AStat * GetLastAccessStats() const
Get latest access stats.
long long GetBufferSize() const
Get prefetch buffer size.
int GetNDownloadedBlocks() const
Get number of downloaded blocks.
size_t GetAccessCnt() const
Get number of accesses.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Statistics of cache utilisation by a File object.
XrdSysError Log
Definition XrdConfig.cc:112
std::vector< ChunkRequest > vChunkRequest_t
std::vector< ChunkRequest >::iterator vChunkRequest_i
std::list< Block * > BlockList_t
std::vector< uint32_t > vCkSum_t
std::list< Block * >::iterator BlockList_i
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
ReadRequest * m_read_req
Access statistics.
Definition XrdPfcInfo.hh:61
XrdOucCacheIOCB * m_iocb
Definition XrdPfcFile.hh:65
unsigned short m_seq_id
Definition XrdPfcFile.hh:64
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition XrdPfcFile.hh:67
void update_error_cond(int ec)
Definition XrdPfcFile.hh:91
ReadRequest(IO *io, ReadReqRH *rh)
Definition XrdPfcFile.hh:87
bool is_complete() const
Definition XrdPfcFile.hh:93
int return_value() const
Definition XrdPfcFile.hh:94
long long m_bytes_read
Definition XrdPfcFile.hh:79