XRootD
Loading...
Searching...
No Matches
XrdCl::XRootDMsgHandler Class Reference

Handle/Process/Forward XRootD messages. More...

#include <XrdClXRootDMsgHandler.hh>

+ Inheritance diagram for XrdCl::XRootDMsgHandler:
+ Collaboration diagram for XrdCl::XRootDMsgHandler:

Public Member Functions

 XRootDMsgHandler (Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
 
 ~XRootDMsgHandler ()
 Destructor.
 
virtual uint16_t Examine (std::shared_ptr< Message > &msg)
 
time_t GetExpiration ()
 Get a timestamp after which we give up.
 
const MessageGetRequest () const
 Get the request pointer.
 
virtual uint16_t GetSid () const
 
virtual uint16_t InspectStatusRsp ()
 
virtual bool IsRaw () const
 Are we a raw writer or not?
 
virtual void OnStatusReady (const Message *message, XRootDStatus status)
 The requested action has been performed and the status is available.
 
virtual uint8_t OnStreamEvent (StreamEvent event, XRootDStatus status)
 
void PartialReceived ()
 
virtual void Process ()
 Process the message if it was "taken" by the examine action.
 
virtual XRootDStatus ReadMessageBody (Message *msg, Socket *socket, uint32_t &bytesRead)
 
void SetChunkList (ChunkList *chunkList)
 Set the chunk list.
 
void SetCrc32cDigests (std::vector< uint32_t > &&crc32cDigests)
 
void SetExpiration (time_t expiration)
 Set a timestamp after which we give up.
 
void SetFollowMetalink (bool followMetalink)
 
void SetHostList (HostList *hostList)
 Set host list.
 
void SetKernelBuffer (XrdSys::KernelBuffer *kbuff)
 Set the kernel buffer.
 
void SetLoadBalancer (const HostInfo &loadBalancer)
 Set the load balancer.
 
void SetOksofarAsAnswer (bool oksofarAsAnswer)
 
void SetRedirectAsAnswer (bool redirectAsAnswer)
 
void SetRedirectCounter (uint16_t redirectCounter)
 Set the redirect counter.
 
void SetStateful (bool stateful)
 
void WaitDone (time_t now)
 
XRootDStatus WriteMessageBody (Socket *socket, uint32_t &bytesWritten)
 
- Public Member Functions inherited from XrdCl::MsgHandler
virtual ~MsgHandler ()
 Event types that the message handler may receive.
 
virtual void OnReadyToSend (Message *msg)
 

Friends

class HandleRspJob
 

Additional Inherited Members

- Public Types inherited from XrdCl::MsgHandler
enum  Action {
  None = 0x0000 ,
  Nop = 0x0001 ,
  Ignore = 0x0002 ,
  RemoveHandler = 0x0004 ,
  Raw = 0x0008 ,
  NoProcess = 0x0010 ,
  Corrupted = 0x0020 ,
  More = 0x0040
}
 Actions to be taken after a message is processed by the handler. More...
 
enum  StreamEvent {
  Ready = 1 ,
  Broken = 2 ,
  Timeout = 3 ,
  FatalError = 4
}
 Events that may have occurred to the stream. More...
 

Detailed Description

Handle/Process/Forward XRootD messages.

Definition at line 119 of file XrdClXRootDMsgHandler.hh.

Constructor & Destructor Documentation

◆ XRootDMsgHandler()

XrdCl::XRootDMsgHandler::XRootDMsgHandler ( Message msg,
ResponseHandler respHandler,
const URL url,
std::shared_ptr< SIDManager sidMgr,
LocalFileHandler lFileHandler 
)
inline

Constructor

Parameters
msgmessage that has been sent out
respHandlerresponse handler to be called then the final final response arrives
urlthe url the message has been sent to
sidMgrthe sid manager used to allocate SID for the initial message

Definition at line 134 of file XrdClXRootDMsgHandler.hh.

138 :
139 pRequest( msg ),
140 pResponseHandler( respHandler ),
141 pUrl( *url ),
142 pEffectiveDataServerUrl( 0 ),
143 pSidMgr( sidMgr ),
144 pLFileHandler( lFileHandler ),
145 pExpiration( 0 ),
146 pRedirectAsAnswer( false ),
147 pOksofarAsAnswer( false ),
148 pHasLoadBalancer( false ),
149 pHasSessionId( false ),
150 pChunkList( 0 ),
151 pKBuff( 0 ),
152 pRedirectCounter( 0 ),
153 pNotAuthorizedCounter( 0 ),
154
155 pAsyncOffset( 0 ),
156 pAsyncChunkIndex( 0 ),
157
158 pPgWrtCksumBuff( 4 ),
159 pPgWrtCurrentPageOffset( 0 ),
160 pPgWrtCurrentPageNb( 0 ),
161
162 pOtherRawStarted( false ),
163
164 pFollowMetalink( false ),
165
166 pStateful( false ),
167
168 pAggregatedWaitTime( 0 ),
169
170 pMsgInFly( false ),
171
172 pTimeoutFence( false ),
173
174 pDirListStarted( false ),
175 pDirListWithStat( false ),
176
177 pCV( 0 ),
178
179 pSslErrCnt( 0 )
180 {
181 pPostMaster = DefaultEnv::GetPostMaster();
182 if( msg->GetSessionId() )
183 pHasSessionId = true;
184
185 Log *log = DefaultEnv::GetLog();
186 log->Debug( ExDbgMsg, "[%s] MsgHandler created: %p (message: %s ).",
187 pUrl.GetHostId().c_str(), this,
188 pRequest->GetObfuscatedDescription().c_str() );
189
190 ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
191 if( ntohs( hdr->requestid ) == kXR_pgread )
192 {
193 ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
194 pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
195 ntohl( pgrdreq->rlen ) ) );
196 }
197
198 if( ntohs( hdr->requestid ) == kXR_readv )
199 pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
200 else if( ntohs( hdr->requestid ) == kXR_read )
201 pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
202 else
203 pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
204 }
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_pgread
Definition XProtocol.hh:142
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint64_t ExDbgMsg
XrdSysError Log
Definition XrdConfig.cc:112

References XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Message::GetSessionId(), kXR_pgread, kXR_read, kXR_readv, ClientPgReadRequest::offset, ClientRequestHdr::requestid, and ClientPgReadRequest::rlen.

+ Here is the call graph for this function:

◆ ~XRootDMsgHandler()

XrdCl::XRootDMsgHandler::~XRootDMsgHandler ( )
inline

Destructor.

Definition at line 209 of file XrdClXRootDMsgHandler.hh.

210 {
211 DumpRedirectTraceBack();
212
213 if( !pHasSessionId )
214 delete pRequest;
215 delete pEffectiveDataServerUrl;
216
217 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
218 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
219 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
220 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
221 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
222 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
223
224 Log *log = DefaultEnv::GetLog();
225 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: %p.",
226 pUrl.GetHostId().c_str(), this );
227 }
std::vector< ChunkInfo > ChunkList
List of chunks.

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), and XrdCl::DefaultEnv::GetLog().

+ Here is the call graph for this function:

Member Function Documentation

◆ Examine()

uint16_t XrdCl::XRootDMsgHandler::Examine ( std::shared_ptr< Message > &  msg)
virtual

Examine an incoming message, and decide on the action to be taken

Parameters
msgthe message, may be zero if receive failed
Returns
action type that needs to be take wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 109 of file XrdClXRootDMsgHandler.cc.

110 {
111 //--------------------------------------------------------------------------
112 // if the MsgHandler is already being used to process another request
113 // (kXR_oksofar) we need to wait
114 //--------------------------------------------------------------------------
115 if( pOksofarAsAnswer )
116 {
117 XrdSysCondVarHelper lck( pCV );
118 while( pResponse ) pCV.Wait();
119 }
120 else
121 {
122 if( pResponse )
123 {
124 Log *log = DefaultEnv::GetLog();
125 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
126 "it already owns a response: %p (message: %s ).",
127 pUrl.GetHostId().c_str(), this,
128 pRequest->GetObfuscatedDescription().c_str() );
129 }
130 }
131
132 if( msg->GetSize() < 8 )
133 return Ignore;
134
135 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
136 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
137 uint16_t status = 0;
138 uint32_t dlen = 0;
139
140 //--------------------------------------------------------------------------
141 // We only care about async responses, but those are extracted now
142 // in the SocketHandler.
143 //--------------------------------------------------------------------------
144 if( rsp->hdr.status == kXR_attn )
145 {
146 return Ignore;
147 }
148 //--------------------------------------------------------------------------
149 // We got a sync message - check if it belongs to us
150 //--------------------------------------------------------------------------
151 else
152 {
153 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
154 rsp->hdr.streamid[1] != req->header.streamid[1] )
155 return Ignore;
156
157 status = rsp->hdr.status;
158 dlen = rsp->hdr.dlen;
159 }
160
161 //--------------------------------------------------------------------------
162 // We take the ownership of the message and decide what we will do
163 // with the handler itself, the options are:
164 // 1) we want to either read in raw mode (the Raw flag) or have the message
165 // body reconstructed for us by the TransportHandler by the time
166 // Process() is called (default, no extra flag)
167 // 2) we either got a full response in which case we don't want to be
168 // notified about anything anymore (RemoveHandler) or we got a partial
169 // answer and we need to wait for more (default, no extra flag)
170 //--------------------------------------------------------------------------
171 pResponse = msg;
172 pBodyReader->SetDataLength( dlen );
173
174 Log *log = DefaultEnv::GetLog();
175 switch( status )
176 {
177 //------------------------------------------------------------------------
178 // Handle the cached cases
179 //------------------------------------------------------------------------
180 case kXR_error:
181 case kXR_redirect:
182 case kXR_wait:
183 return RemoveHandler;
184
185 case kXR_waitresp:
186 {
187 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
188 "message %s", pUrl.GetHostId().c_str(),
189 pRequest->GetObfuscatedDescription().c_str() );
190
191 pResponse.reset();
192 return Ignore; // This must be handled synchronously!
193 }
194
195 //------------------------------------------------------------------------
196 // Handle the potential raw cases
197 //------------------------------------------------------------------------
198 case kXR_ok:
199 {
200 //----------------------------------------------------------------------
201 // For kXR_read we read in raw mode
202 //----------------------------------------------------------------------
203 uint16_t reqId = ntohs( req->header.requestid );
204 if( reqId == kXR_read )
205 {
206 return Raw | RemoveHandler;
207 }
208
209 //----------------------------------------------------------------------
210 // kXR_readv is the same as kXR_read
211 //----------------------------------------------------------------------
212 if( reqId == kXR_readv )
213 {
214 return Raw | RemoveHandler;
215 }
216
217 //----------------------------------------------------------------------
218 // For everything else we just take what we got
219 //----------------------------------------------------------------------
220 return RemoveHandler;
221 }
222
223 //------------------------------------------------------------------------
224 // kXR_oksofars are special, they are not full responses, so we reset
225 // the response pointer to 0 and add the message to the partial list
226 //------------------------------------------------------------------------
227 case kXR_oksofar:
228 {
229 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
230 "%s", pUrl.GetHostId().c_str(),
231 pRequest->GetObfuscatedDescription().c_str() );
232
233 if( !pOksofarAsAnswer )
234 {
235 pPartialResps.emplace_back( std::move( pResponse ) );
236 }
237
238 //----------------------------------------------------------------------
239 // For kXR_read we either read in raw mode if the message has not
240 // been fully reconstructed already, if it has, we adjust
241 // the buffer offset to prepare for the next one
242 //----------------------------------------------------------------------
243 uint16_t reqId = ntohs( req->header.requestid );
244 if( reqId == kXR_read )
245 {
246 pTimeoutFence.store( true, std::memory_order_relaxed );
247 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
248 }
249
250 //----------------------------------------------------------------------
251 // kXR_readv is similar to read, except that the payload is different
252 //----------------------------------------------------------------------
253 if( reqId == kXR_readv )
254 {
255 pTimeoutFence.store( true, std::memory_order_relaxed );
256 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
257 }
258
259 return ( pOksofarAsAnswer ? None : NoProcess );
260 }
261
262 case kXR_status:
263 {
264 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
265 "%s", pUrl.GetHostId().c_str(),
266 pRequest->GetObfuscatedDescription().c_str() );
267
268 uint16_t reqId = ntohs( req->header.requestid );
269 if( reqId == kXR_pgwrite )
270 {
271 //--------------------------------------------------------------------
272 // In case of pgwrite by definition this wont be a partial response
273 // so we can already remove the handler from the in-queue
274 //--------------------------------------------------------------------
275 return RemoveHandler;
276 }
277
278 //----------------------------------------------------------------------
279 // Otherwise (pgread), first of all we need to read the body of the
280 // kXR_status response, we can handle the raw data (if any) only after
281 // we have the whole kXR_status body
282 //----------------------------------------------------------------------
283 pTimeoutFence.store( true, std::memory_order_relaxed );
284 return None;
285 }
286
287 //------------------------------------------------------------------------
288 // Default
289 //------------------------------------------------------------------------
290 default:
291 return RemoveHandler;
292 }
293 return RemoveHandler;
294 }
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char streamid[2]
Definition XProtocol.hh:914
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_attn
Definition XProtocol.hh:901
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ClientRequestHdr header
Definition XProtocol.hh:846
@ kXR_pgwrite
Definition XProtocol.hh:138
ServerResponseHeader hdr
@ Ignore
Ignore the message.
const uint64_t XRootDMsg

References ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, kXR_attn, kXR_error, kXR_ok, kXR_oksofar, kXR_pgwrite, kXR_read, kXR_readv, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::MsgHandler::None, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseHeader::status, ClientRequestHdr::streamid, ServerResponseHeader::streamid, XrdSysCondVar::Wait(), XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ GetExpiration()

time_t XrdCl::XRootDMsgHandler::GetExpiration ( )
inlinevirtual

Get a timestamp after which we give up.

Implements XrdCl::MsgHandler.

Definition at line 330 of file XrdClXRootDMsgHandler.hh.

331 {
332 return pExpiration;
333 }

◆ GetRequest()

const Message * XrdCl::XRootDMsgHandler::GetRequest ( ) const
inline

Get the request pointer.

Definition at line 356 of file XrdClXRootDMsgHandler.hh.

357 {
358 return pRequest;
359 }

◆ GetSid()

uint16_t XrdCl::XRootDMsgHandler::GetSid ( ) const
virtual

Get handler sid

return sid of the corresponding request, otherwise 0

Implements XrdCl::MsgHandler.

Definition at line 389 of file XrdClXRootDMsgHandler.cc.

390 {
391 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
392 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
393 }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::XRootDMsgHandler::InspectStatusRsp ( )
virtual

Reexamine the incoming message, and decide on the action to be taken

In case of kXR_status the message can be only fully examined after reading the whole body (without raw data).

Parameters
msgthe message, may be zero if receive failed
Returns
action type that needs to be take wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 299 of file XrdClXRootDMsgHandler.cc.

300 {
301 if( !pResponse )
302 return 0;
303
304 Log *log = DefaultEnv::GetLog();
305 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
306
307 //--------------------------------------------------------------------------
308 // Additional action is only required for kXR_status
309 //--------------------------------------------------------------------------
310 if( rsp->hdr.status != kXR_status ) return 0;
311
312 //--------------------------------------------------------------------------
313 // Ignore malformed status response
314 //--------------------------------------------------------------------------
315 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
316 {
317 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
318 return Corrupted;
319 }
320
321 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
322 uint16_t reqId = ntohs( req->header.requestid );
323 //--------------------------------------------------------------------------
324 // Unmarshal the status body
325 //--------------------------------------------------------------------------
326 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
327
328 if( !st.IsOK() && st.code == errDataError )
329 {
330 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
331 st.GetErrorMessage().c_str() );
332 return Corrupted;
333 }
334
335 if( !st.IsOK() )
336 {
337 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
338 pUrl.GetHostId().c_str() );
339 pStatus = st;
340 HandleRspOrQueue();
341 return Ignore;
342 }
343
344 //--------------------------------------------------------------------------
345 // Common handling for partial results
346 //--------------------------------------------------------------------------
347 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
349 {
350 pPartialResps.push_back( std::move( pResponse ) );
351 }
352
353 //--------------------------------------------------------------------------
354 // Decide the actions that we need to take
355 //--------------------------------------------------------------------------
356 uint16_t action = 0;
357 if( reqId == kXR_pgread )
358 {
359 //----------------------------------------------------------------------
360 // The message contains only Status header and body but no raw data
361 //----------------------------------------------------------------------
362 if( !pPageReader )
363 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
364 pPageReader->SetRsp( rspst );
365
366 action |= Raw;
367
369 action |= NoProcess;
370 else
371 action |= RemoveHandler;
372 }
373 else if( reqId == kXR_pgwrite )
374 {
375 // if data corruption has been detected on the server side we will
376 // send some additional data pointing to the pages that need to be
377 // retransmitted
378 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
379 pResponse->GetCursor() )
380 action |= More;
381 }
382
383 return action;
384 }
ServerResponseStatus status
struct ServerResponseBody_Status bdy
struct ServerResponseHeader hdr
@ More
there are more (non-raw) data to be read
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
const uint16_t errDataError
data is corrupted
@ kXR_PartialResult

References ServerResponseStatus::bdy, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponseStatus::hdr, ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, XrdCl::Status::IsOK(), XrdProto::kXR_PartialResult, kXR_pgread, kXR_pgwrite, kXR_status, XrdCl::MsgHandler::More, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseBody_Status::resptype, ServerResponseHeader::status, ServerResponseV2::status, XrdCl::XRootDTransport::UnMarshalStatusBody(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ IsRaw()

bool XrdCl::XRootDMsgHandler::IsRaw ( ) const
virtual

Are we a raw writer or not?

Reimplemented from XrdCl::MsgHandler.

Definition at line 929 of file XrdClXRootDMsgHandler.cc.

930 {
931 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
932 uint16_t reqId = ntohs( req->header.requestid );
933 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
934 return true;
935 // checkpoint + execute
936 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
937 {
938 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
939 reqId = ntohs( xeq->header.requestid );
940 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
941 }
942
943 return false;
944 }
static const int kXR_ckpXeq
Definition XProtocol.hh:216
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_chkpoint
Definition XProtocol.hh:124
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849

References ClientRequest::chkpoint, XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_chkpoint, kXR_ckpXeq, kXR_pgwrite, kXR_truncate, kXR_write, kXR_writev, ClientChkPointRequest::opcode, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ OnStatusReady()

void XrdCl::XRootDMsgHandler::OnStatusReady ( const Message message,
XRootDStatus  status 
)
virtual

The requested action has been performed and the status is available.

Implements XrdCl::MsgHandler.

Definition at line 896 of file XrdClXRootDMsgHandler.cc.

898 {
899 Log *log = DefaultEnv::GetLog();
900
901 //--------------------------------------------------------------------------
902 // We were successful, so we now need to listen for a response
903 //--------------------------------------------------------------------------
904 if( status.IsOK() )
905 {
906 log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
907 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
908
909 log->Debug( ExDbgMsg, "[%s] Moving MsgHandler: %p (message: %s ) from out-queue to in-queue.",
910 pUrl.GetHostId().c_str(), this,
911 pRequest->GetObfuscatedDescription().c_str() );
912
913 pMsgInFly = true;
914 return;
915 }
916
917 //--------------------------------------------------------------------------
918 // We have failed, recover if possible
919 //--------------------------------------------------------------------------
920 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
921 "recover.", pUrl.GetHostId().c_str(),
922 message->GetObfuscatedDescription().c_str() );
923 HandleError( status );
924 }

References XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Status::IsOK(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ OnStreamEvent()

uint8_t XrdCl::XRootDMsgHandler::OnStreamEvent ( StreamEvent  event,
XRootDStatus  status 
)
virtual

Handle an event other that a message arrival

Parameters
eventtype of the event
statusstatus info

Reimplemented from XrdCl::MsgHandler.

Definition at line 859 of file XrdClXRootDMsgHandler.cc.

861 {
862 Log *log = DefaultEnv::GetLog();
863 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
864 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
865
866 if( event == Ready )
867 return 0;
868
869 if( pTimeoutFence.load( std::memory_order_relaxed ) )
870 return 0;
871
872 HandleError( status );
873 return RemoveHandler;
874 }
@ Ready
The stream has become connected.

References XrdCl::Log::Dump(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::MsgHandler::Ready, XrdCl::MsgHandler::RemoveHandler, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ PartialReceived()

void XrdCl::XRootDMsgHandler::PartialReceived ( )

Bookkeeping after partial response has been received:

  • take down the timeout fence after oksofar response has been handled
  • reset status-response-body marshaled flag

Definition at line 1106 of file XrdClXRootDMsgHandler.cc.

1107 {
1108 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1109 }

Referenced by XrdCl::Stream::ForceError(), XrdCl::Stream::OnError(), and XrdCl::Stream::OnIncoming().

+ Here is the caller graph for this function:

◆ Process()

void XrdCl::XRootDMsgHandler::Process ( )
virtual

Process the message if it was "taken" by the examine action.

Process the message if it was "taken" by the examine action

Parameters
msgthe message to be processed

Reimplemented from XrdCl::MsgHandler.

Definition at line 398 of file XrdClXRootDMsgHandler.cc.

399 {
400 Log *log = DefaultEnv::GetLog();
401
402 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
403
404 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
405
406 //--------------------------------------------------------------------------
407 // If it is a local file, it can be only a metalink redirector
408 //--------------------------------------------------------------------------
409 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
410 pHosts->back().protocol = kXR_PROTOCOLVERSION;
411
412 //--------------------------------------------------------------------------
413 // We got an answer, check who we were talking to
414 //--------------------------------------------------------------------------
415 else
416 {
417 AnyObject qryResult;
418 int *qryResponse = 0;
419 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
420 qryResult.Get( qryResponse );
421 pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
422 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
423 qryResult.Get( qryResponse );
424 pHosts->back().protocol = *qryResponse; delete qryResponse;
425 }
426
427 //--------------------------------------------------------------------------
428 // Process the message
429 //--------------------------------------------------------------------------
430 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
431 if( !st.IsOK() )
432 {
433 pStatus = Status( stFatal, errInvalidMessage );
434 HandleResponse();
435 return;
436 }
437
438 //--------------------------------------------------------------------------
439 // we have an response for the message so it's not in fly anymore
440 //--------------------------------------------------------------------------
441 pMsgInFly = false;
442
443 //--------------------------------------------------------------------------
444 // Reset the aggregated wait (used to omit wait response in case of Metalink
445 // redirector)
446 //--------------------------------------------------------------------------
447 if( rsp->hdr.status != kXR_wait )
448 pAggregatedWaitTime = 0;
449
450 switch( rsp->hdr.status )
451 {
452 //------------------------------------------------------------------------
453 // kXR_ok - we're done here
454 //------------------------------------------------------------------------
455 case kXR_ok:
456 {
457 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
458 pUrl.GetHostId().c_str(),
459 pRequest->GetObfuscatedDescription().c_str() );
460 pStatus = Status();
461 HandleResponse();
462 return;
463 }
464
465 case kXR_status:
466 {
467 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
468 pUrl.GetHostId().c_str(),
469 pRequest->GetObfuscatedDescription().c_str() );
470 pStatus = Status();
471 HandleResponse();
472 return;
473 }
474
475 //------------------------------------------------------------------------
476 // kXR_ok - we're serving partial result to the user
477 //------------------------------------------------------------------------
478 case kXR_oksofar:
479 {
480 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
481 pUrl.GetHostId().c_str(),
482 pRequest->GetObfuscatedDescription().c_str() );
483 pStatus = Status( stOK, suContinue );
484 HandleResponse();
485 return;
486 }
487
488 //------------------------------------------------------------------------
489 // kXR_error - we've got a problem
490 //------------------------------------------------------------------------
491 case kXR_error:
492 {
493 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
494 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
495 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
496 "[%d] %s", pUrl.GetHostId().c_str(),
497 pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
498 errmsg );
499 delete [] errmsg;
500
501 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
502 return;
503 }
504
505 //------------------------------------------------------------------------
506 // kXR_redirect - they tell us to go elsewhere
507 //------------------------------------------------------------------------
508 case kXR_redirect:
509 {
510 if( rsp->hdr.dlen <= 4 )
511 {
512 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
513 pUrl.GetHostId().c_str() );
514 pStatus = Status( stError, errInvalidResponse );
515 HandleResponse();
516 return;
517 }
518
519 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
520 urlInfoBuff[rsp->hdr.dlen-4] = 0;
521 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
522 std::string urlInfo = urlInfoBuff;
523 delete [] urlInfoBuff;
524 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
525 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
526 pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
527 rsp->body.redirect.port );
528
529 //----------------------------------------------------------------------
530 // Check if we can proceed
531 //----------------------------------------------------------------------
532 if( !pRedirectCounter )
533 {
534 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
535 "message %s, the last known error is: %s",
536 pUrl.GetHostId().c_str(),
537 pRequest->GetObfuscatedDescription().c_str(),
538 pLastError.ToString().c_str() );
539
540
541 pStatus = Status( stFatal, errRedirectLimit );
542 HandleResponse();
543 return;
544 }
545 --pRedirectCounter;
546
547 //----------------------------------------------------------------------
548 // Keep the info about this server if we still need to find a load
549 // balancer
550 //----------------------------------------------------------------------
551 uint32_t flags = pHosts->back().flags;
552 if( !pHasLoadBalancer )
553 {
554 if( flags & kXR_isManager )
555 {
556 //------------------------------------------------------------------
557 // If the current server is a meta manager then it supersedes
558 // any existing load balancer, otherwise we assign a load-balancer
559 // only if it has not been already assigned
560 //------------------------------------------------------------------
561 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
562 {
563 pLoadBalancer = pHosts->back();
564 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
565 "as a load-balancer for message %s",
566 pUrl.GetHostId().c_str(),
567 pRequest->GetObfuscatedDescription().c_str() );
568 HostList::iterator it;
569 for( it = pHosts->begin(); it != pHosts->end(); ++it )
570 it->loadBalancer = false;
571 pHosts->back().loadBalancer = true;
572 }
573 }
574 }
575
576 //----------------------------------------------------------------------
577 // If the redirect comes from a data server safe the URL because
578 // in case of a failure we will use it as the effective data server URL
579 // for the tried CGI opaque info
580 //----------------------------------------------------------------------
581 if( flags & kXR_isServer )
582 pEffectiveDataServerUrl = new URL( pHosts->back().url );
583
584 //----------------------------------------------------------------------
585 // Build the URL and check it's validity
586 //----------------------------------------------------------------------
587 std::vector<std::string> urlComponents;
588 std::string newCgi;
589 Utils::splitString( urlComponents, urlInfo, "?" );
590
591 std::ostringstream o;
592
593 o << urlComponents[0];
594 if( rsp->body.redirect.port > 0 )
595 o << ":" << rsp->body.redirect.port << "/";
596 else if( rsp->body.redirect.port < 0 )
597 {
598 //--------------------------------------------------------------------
599 // check if the manager wants to enforce write recovery at himself
600 // (beware we are dealing here with negative flags)
601 //--------------------------------------------------------------------
602 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
603 pHosts->back().flags |= kXR_recoverWrts;
604
605 //--------------------------------------------------------------------
606 // check if the manager wants to collapse the communication channel
607 // (the redirect host is to replace the current host)
608 //--------------------------------------------------------------------
609 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
610 {
611 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
612 pPostMaster->CollapseRedirect( pUrl, url );
613 }
614
615 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
616 {
617 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
618 if( Utils::CheckEC( pRequest, url ) )
619 pRedirectAsAnswer = true;
620 }
621 }
622
623 URL newUrl = URL( o.str() );
624 if( !newUrl.IsValid() )
625 {
626 pStatus = Status( stError, errInvalidRedirectURL );
627 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
628 pUrl.GetHostId().c_str(), urlInfo.c_str() );
629 HandleResponse();
630 return;
631 }
632
633 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
634 newUrl.SetUserName( pUrl.GetUserName() );
635
636 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
637 newUrl.SetPassword( pUrl.GetPassword() );
638
639 //----------------------------------------------------------------------
640 // Forward any "xrd.*" params from the original client request also to
641 // the new redirection url
642 // Also, we need to preserve any "xrdcl.*' as they are important for
643 // our internal workflows.
644 //----------------------------------------------------------------------
645 std::ostringstream ossXrd;
646 const URL::ParamsMap &urlParams = pUrl.GetParams();
647
648 for(URL::ParamsMap::const_iterator it = urlParams.begin();
649 it != urlParams.end(); ++it )
650 {
651 if( it->first.compare( 0, 4, "xrd." ) &&
652 it->first.compare( 0, 6, "xrdcl." ) )
653 continue;
654
655 ossXrd << it->first << '=' << it->second << '&';
656 }
657
658 std::string xrdCgi = ossXrd.str();
659 pRedirectUrl = newUrl.GetURL();
660
661 URL cgiURL;
662 if( urlComponents.size() > 1 )
663 {
664 pRedirectUrl += "?";
665 pRedirectUrl += urlComponents[1];
666 std::ostringstream o;
667 o << "fake://fake:111//fake?";
668 o << urlComponents[1];
669
670 if( urlComponents.size() == 3 )
671 o << '?' << urlComponents[2];
672
673 if (!xrdCgi.empty())
674 {
675 o << '&' << xrdCgi;
676 pRedirectUrl += '&';
677 pRedirectUrl += xrdCgi;
678 }
679
680 cgiURL = URL( o.str() );
681 }
682 else {
683 if (!xrdCgi.empty())
684 {
685 std::ostringstream o;
686 o << "fake://fake:111//fake?";
687 o << xrdCgi;
688 cgiURL = URL( o.str() );
689 pRedirectUrl += '?';
690 pRedirectUrl += xrdCgi;
691 }
692 }
693
694 //----------------------------------------------------------------------
695 // Check if we need to return the URL as a response
696 //----------------------------------------------------------------------
697 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
698 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
699 !newUrl.IsLocalFile() )
700 pRedirectAsAnswer = true;
701
702 if( pRedirectAsAnswer )
703 {
704 pStatus = Status( stError, errRedirect );
705 HandleResponse();
706 return;
707 }
708
709 //----------------------------------------------------------------------
710 // Rewrite the message in a way required to send it to another server
711 //----------------------------------------------------------------------
712 newUrl.SetParams( cgiURL.GetParams() );
713 Status st = RewriteRequestRedirect( newUrl );
714 if( !st.IsOK() )
715 {
716 pStatus = st;
717 HandleResponse();
718 return;
719 }
720
721 //----------------------------------------------------------------------
722 // Make sure we don't change the protocol by accident (root vs roots)
723 //----------------------------------------------------------------------
724 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
725 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
726 newUrl.SetProtocol( "roots" );
727
728 //----------------------------------------------------------------------
729 // Send the request to the new location
730 //----------------------------------------------------------------------
731 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
732 return;
733 }
734
735 //------------------------------------------------------------------------
736 // kXR_wait - we wait, and re-issue the request later
737 //------------------------------------------------------------------------
738 case kXR_wait:
739 {
740 uint32_t waitSeconds = 0;
741
742 if( rsp->hdr.dlen >= 4 )
743 {
744 char *infoMsg = new char[rsp->hdr.dlen-3];
745 infoMsg[rsp->hdr.dlen-4] = 0;
746 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
747 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
748 "message %s: %s", pUrl.GetHostId().c_str(),
749 rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
750 infoMsg );
751 delete [] infoMsg;
752 waitSeconds = rsp->body.wait.seconds;
753 }
754 else
755 {
756 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
757 "message %s", pUrl.GetHostId().c_str(),
758 pRequest->GetObfuscatedDescription().c_str() );
759 }
760
761 pAggregatedWaitTime += waitSeconds;
762
763 // We need a special case if the data node comes from metalink
764 // redirector. In this case it might make more sense to try the
765 // next entry in the Metalink than wait.
766 if( OmitWait( *pRequest, pLoadBalancer.url ) )
767 {
768 int maxWait = DefaultMaxMetalinkWait;
769 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
770 if( pAggregatedWaitTime > maxWait )
771 {
772 UpdateTriedCGI();
773 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
774 return;
775 }
776 }
777
778 //----------------------------------------------------------------------
779 // Some messages require rewriting before they can be sent again
780 // after wait
781 //----------------------------------------------------------------------
782 Status st = RewriteRequestWait();
783 if( !st.IsOK() )
784 {
785 pStatus = st;
786 HandleResponse();
787 return;
788 }
789
790 //----------------------------------------------------------------------
791 // Register a task to resend the message in some seconds, if we still
792 // have time to do that, and report a timeout otherwise
793 //----------------------------------------------------------------------
794 time_t resendTime = ::time(0)+waitSeconds;
795
796 if( resendTime < pExpiration )
797 {
798 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
799 pUrl.GetHostId().c_str(), this,
800 pRequest->GetObfuscatedDescription().c_str() );
801
802 TaskManager *taskMgr = pPostMaster->GetTaskManager();
803 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
804 }
805 else
806 {
807 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
808 pUrl.GetHostId().c_str(),
809 pRequest->GetObfuscatedDescription().c_str() );
810 HandleError( Status( stError, errOperationExpired) );
811 }
812 return;
813 }
814
815 //------------------------------------------------------------------------
816 // kXR_waitresp - the response will be returned in some seconds as an
817 // unsolicited message. Currently all messages of this type are handled
818 // one step before in the XrdClStream::OnIncoming as they need to be
819 // processed synchronously.
820 //------------------------------------------------------------------------
821 case kXR_waitresp:
822 {
823 if( rsp->hdr.dlen < 4 )
824 {
825 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
826 pUrl.GetHostId().c_str() );
827 pStatus = Status( stError, errInvalidResponse );
828 HandleResponse();
829 return;
830 }
831
832 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
833 "message %s", pUrl.GetHostId().c_str(),
834 rsp->body.waitresp.seconds,
835 pRequest->GetObfuscatedDescription().c_str() );
836 return;
837 }
838
839 //------------------------------------------------------------------------
840 // Default - unrecognized/unsupported response, declare an error
841 //------------------------------------------------------------------------
842 default:
843 {
844 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
845 "message %s", pUrl.GetHostId().c_str(),
846 rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
847 pStatus = Status( stError, errInvalidResponse );
848 HandleResponse();
849 return;
850 }
851 }
852
853 return;
854 }
#define kXR_isManager
union ServerResponse::@0 body
#define kXR_collapseRedir
#define kXR_attrMeta
#define kXR_recoverWrts
#define kXR_isServer
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
#define kXR_ecRedir
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
void RegisterTask(Task *task, time_t time, bool own=true)
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:458
const std::string & GetPassword() const
Get the password.
Definition XrdClURL.hh:153
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
const std::string & GetUserName() const
Get the username.
Definition XrdClURL.hh:135
bool IsLocalFile() const
Definition XrdClURL.cc:467
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:445
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
const uint16_t suContinue
const uint16_t errRedirect
const uint16_t errInvalidMessage
URL url
URL of the host.
std::string ToString() const
Create a string representation.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version

References ServerResponse::body, XrdCl::Utils::CheckEC(), XrdCl::PostMaster::CollapseRedirect(), XrdCl::Log::Debug(), XrdCl::DefaultMaxMetalinkWait, ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::RedirectEntry::EntryRedirect, XrdCl::RedirectEntry::EntryRedirectOnWait, XrdCl::errErrorResponse, XrdCl::errInvalidMessage, XrdCl::errInvalidRedirectURL, XrdCl::errInvalidResponse, XrdCl::errOperationExpired, XrdCl::Log::Error(), XrdCl::errRedirect, XrdCl::errRedirectLimit, XrdCl::ExDbgMsg, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetEnv(), XrdCl::URL::GetHostId(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::URL::GetParams(), XrdCl::URL::GetPassword(), XrdCl::URL::GetProtocol(), XrdCl::PostMaster::GetTaskManager(), XrdCl::URL::GetURL(), XrdCl::URL::GetUserName(), ServerResponse::hdr, ClientRequest::header, XrdCl::URL::IsLocalFile(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), kXR_attrMeta, kXR_collapseRedir, kXR_ecRedir, kXR_error, kXR_isManager, kXR_isServer, kXR_ok, kXR_oksofar, kXR_PROTOCOLVERSION, kXR_recoverWrts, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::PostMaster::QueryTransport(), XrdCl::TaskManager::RegisterTask(), ClientRequestHdr::requestid, XrdCl::XRootDQuery::ServerFlags, XrdCl::URL::SetParams(), XrdCl::URL::SetPassword(), XrdCl::URL::SetProtocol(), XrdCl::URL::SetUserName(), XrdCl::Utils::splitString(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stFatal, XrdCl::stOK, XrdCl::suContinue, XrdCl::Status::ToString(), XrdCl::XRootDTransport::UnMarshallBody(), XrdCl::HostInfo::url, XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ ReadMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::ReadMessageBody ( Message msg,
Socket socket,
uint32_t &  bytesRead 
)
virtual

Read message body directly from a socket - called if Examine returns Raw flag - only socket related errors may be returned here

Parameters
msgthe corresponding message header
socketthe socket to read from
bytesReadnumber of bytes read by the method
Returns
stOK & suDone if the whole body has been processed stOK & suRetry if more data is needed stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 879 of file XrdClXRootDMsgHandler.cc.

882 {
883 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
884 uint16_t reqId = ntohs( req->header.requestid );
885
886 if( reqId == kXR_pgread )
887 return pPageReader->Read( *socket, bytesRead );
888
889 return pBodyReader->Read( *socket, bytesRead );
890 }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_pgread, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ SetChunkList()

void XrdCl::XRootDMsgHandler::SetChunkList ( ChunkList chunkList)
inline

Set the chunk list.

Definition at line 383 of file XrdClXRootDMsgHandler.hh.

384 {
385 pChunkList = chunkList;
386 if( pBodyReader )
387 pBodyReader->SetChunkList( chunkList );
388 if( chunkList )
389 pChunkStatus.resize( chunkList->size() );
390 else
391 pChunkStatus.clear();
392 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetCrc32cDigests()

void XrdCl::XRootDMsgHandler::SetCrc32cDigests ( std::vector< uint32_t > &&  crc32cDigests)
inline

Definition at line 394 of file XrdClXRootDMsgHandler.hh.

395 {
396 pCrc32cDigests = std::move( crc32cDigests );
397 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetExpiration()

void XrdCl::XRootDMsgHandler::SetExpiration ( time_t  expiration)
inline

Set a timestamp after which we give up.

Definition at line 322 of file XrdClXRootDMsgHandler.hh.

323 {
324 pExpiration = expiration;
325 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetFollowMetalink()

void XrdCl::XRootDMsgHandler::SetFollowMetalink ( bool  followMetalink)
inline

Definition at line 415 of file XrdClXRootDMsgHandler.hh.

416 {
417 pFollowMetalink = followMetalink;
418 }

Referenced by XrdCl::MessageUtils::RedirectMessage().

+ Here is the caller graph for this function:

◆ SetHostList()

void XrdCl::XRootDMsgHandler::SetHostList ( HostList hostList)
inline

Set host list.

Definition at line 375 of file XrdClXRootDMsgHandler.hh.

376 {
377 pHosts.reset( hostList );
378 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetKernelBuffer()

void XrdCl::XRootDMsgHandler::SetKernelBuffer ( XrdSys::KernelBuffer kbuff)
inline

Set the kernel buffer.

Definition at line 402 of file XrdClXRootDMsgHandler.hh.

403 {
404 pKBuff = kbuff;
405 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetLoadBalancer()

void XrdCl::XRootDMsgHandler::SetLoadBalancer ( const HostInfo loadBalancer)
inline

Set the load balancer.

Definition at line 364 of file XrdClXRootDMsgHandler.hh.

365 {
366 if( !loadBalancer.url.IsValid() )
367 return;
368 pLoadBalancer = loadBalancer;
369 pHasLoadBalancer = true;
370 }

References XrdCl::URL::IsValid(), and XrdCl::HostInfo::url.

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetOksofarAsAnswer()

void XrdCl::XRootDMsgHandler::SetOksofarAsAnswer ( bool  oksofarAsAnswer)
inline

Treat the kXR_oksofar response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 348 of file XrdClXRootDMsgHandler.hh.

349 {
350 pOksofarAsAnswer = oksofarAsAnswer;
351 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectAsAnswer()

void XrdCl::XRootDMsgHandler::SetRedirectAsAnswer ( bool  redirectAsAnswer)
inline

Treat the kXR_redirect response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 339 of file XrdClXRootDMsgHandler.hh.

340 {
341 pRedirectAsAnswer = redirectAsAnswer;
342 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectCounter()

void XrdCl::XRootDMsgHandler::SetRedirectCounter ( uint16_t  redirectCounter)
inline

Set the redirect counter.

Definition at line 410 of file XrdClXRootDMsgHandler.hh.

411 {
412 pRedirectCounter = redirectCounter;
413 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetStateful()

void XrdCl::XRootDMsgHandler::SetStateful ( bool  stateful)
inline

Definition at line 420 of file XrdClXRootDMsgHandler.hh.

421 {
422 pStateful = stateful;
423 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ WaitDone()

void XrdCl::XRootDMsgHandler::WaitDone ( time_t  now)

Called after the wait time for kXR_wait has elapsed

Parameters
nowcurrent timestamp

Definition at line 1098 of file XrdClXRootDMsgHandler.cc.

1099 {
1100 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1101 }

References XrdCl::RedirectEntry::EntryWait.

◆ WriteMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::WriteMessageBody ( Socket socket,
uint32_t &  bytesWritten 
)
virtual

Write message body directly to a socket - called if IsRaw returns true - only socket related errors may be returned here

Parameters
socketthe socket to read from
bytesWrittennumber of bytes written by the method
Returns
stOK & suDone if the whole body has been processed stOK & suRetry if more data needs to be written stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 949 of file XrdClXRootDMsgHandler.cc.

951 {
952 //--------------------------------------------------------------------------
953 // First check if it is a PgWrite
954 //--------------------------------------------------------------------------
955 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
956 {
957 //------------------------------------------------------------------------
958 // PgWrite will have just one chunk
959 //------------------------------------------------------------------------
960 ChunkInfo chunk = pChunkList->front();
961 //------------------------------------------------------------------------
962 // Calculate the size of the first and last page (in case the chunk is not
963 // 4KB aligned)
964 //------------------------------------------------------------------------
965 int fLen = 0, lLen = 0;
966 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
967
968 //------------------------------------------------------------------------
969 // Set the crc32c buffer if not ready yet
970 //------------------------------------------------------------------------
971 if( pPgWrtCksumBuff.GetCursor() == 0 )
972 {
973 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
974 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
975 }
976
977 uint32_t btsLeft = chunk.length - pAsyncOffset;
978 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
979 if( pglen > btsLeft ) pglen = btsLeft;
980 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
981
982 while( btsLeft > 0 )
983 {
984 // first write the crc32c digest
985 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
986 {
987 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
988 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
989 int btswrt = 0;
990 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
991 if( !st.IsOK() ) return st;
992 bytesWritten += btswrt;
993 pPgWrtCksumBuff.AdvanceCursor( btswrt );
994 if( st.code == suRetry ) return st;
995 }
996 // then write the raw data (one page)
997 int btswrt = 0;
998 Status st = socket->Send( pgbuf, pglen, btswrt );
999 if( !st.IsOK() ) return st;
1000 pgbuf += btswrt;
1001 pglen -= btswrt;
1002 btsLeft -= btswrt;
1003 bytesWritten += btswrt;
1004 pAsyncOffset += btswrt; // update the offset to the raw data
1005 if( st.code == suRetry ) return st;
1006 // if we managed to write all the data ...
1007 if( pglen == 0 )
1008 {
1009 // move to the next page
1010 ++pPgWrtCurrentPageNb;
1011 if( pPgWrtCurrentPageNb < nbpgs )
1012 {
1013 // set the digest buffer
1014 pPgWrtCksumBuff.SetCursor( 0 );
1015 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1016 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1017 }
1018 // set the page length
1019 pglen = XrdSys::PageSize;
1020 if( pglen > btsLeft ) pglen = btsLeft;
1021 // reset offset in the current page
1022 pPgWrtCurrentPageOffset = 0;
1023 }
1024 else
1025 // otherwise just adjust the offset in the current page
1026 pPgWrtCurrentPageOffset += btswrt;
1027
1028 }
1029 }
1030 else if( !pChunkList->empty() )
1031 {
1032 size_t size = pChunkList->size();
1033 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1034 {
1035 char *buffer = (char*)(*pChunkList)[i].buffer;
1036 uint32_t size = (*pChunkList)[i].length;
1037 size_t leftToBeWritten = size - pAsyncOffset;
1038
1039 while( leftToBeWritten )
1040 {
1041 int btswrt = 0;
1042 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1043 bytesWritten += btswrt;
1044 if( !st.IsOK() || st.code == suRetry ) return st;
1045 pAsyncOffset += btswrt;
1046 leftToBeWritten -= btswrt;
1047 }
1048 //----------------------------------------------------------------------
1049 // Remember that we have moved to the next chunk, also clear the offset
1050 // within the buffer as we are going to move to a new one
1051 //----------------------------------------------------------------------
1052 ++pAsyncChunkIndex;
1053 pAsyncOffset = 0;
1054 }
1055 }
1056 else
1057 {
1058 Log *log = DefaultEnv::GetLog();
1059
1060 //------------------------------------------------------------------------
1061 // If the socket is encrypted we cannot use a kernel buffer, we have to
1062 // convert to user space buffer
1063 //------------------------------------------------------------------------
1064 if( socket->IsEncrypted() )
1065 {
1066 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1067 pUrl.GetHostId().c_str() );
1068
1069 char *ubuff = 0;
1070 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1071 if( ret < 0 ) return Status( stError, errInternal );
1072 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1073 return WriteMessageBody( socket, bytesWritten );
1074 }
1075
1076 //------------------------------------------------------------------------
1077 // Send the data
1078 //------------------------------------------------------------------------
1079 while( !pKBuff->Empty() )
1080 {
1081 int btswrt = 0;
1082 Status st = socket->Send( *pKBuff, btswrt );
1083 bytesWritten += btswrt;
1084 if( !st.IsOK() || st.code == suRetry ) return st;
1085 }
1086
1087 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1088 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1089 }
1090
1091 return Status();
1092 }
void AdvanceCursor(uint32_t delta)
Advance the cursor.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetCursor() const
Get append cursor.
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
const uint16_t suRetry
const uint16_t errInternal
Internal error.
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)

References XrdCl::Buffer::AdvanceCursor(), XrdCl::ChunkInfo::buffer, XrdCl::Status::code, XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdSys::KernelBuffer::Empty(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Socket::IsEncrypted(), XrdCl::Status::IsOK(), XrdCl::ChunkInfo::length, XrdSys::Move(), XrdCl::ChunkInfo::offset, XrdSys::PageSize, XrdCl::Socket::Send(), XrdCl::Buffer::SetCursor(), XrdCl::stError, XrdCl::suRetry, WriteMessageBody(), and XrdCl::XRootDMsg.

Referenced by WriteMessageBody().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Friends And Related Symbol Documentation

◆ HandleRspJob

friend class HandleRspJob
friend

Definition at line 121 of file XrdClXRootDMsgHandler.hh.


The documentation for this class was generated from the following files: