XRootD
Loading...
Searching...
No Matches
XrdXrootdAioTask Class Reference (abstract)

#include <XrdXrootdAioTask.hh>

+ Inheritance diagram for XrdXrootdAioTask:
+ Collaboration diagram for XrdXrootdAioTask:

Public Member Functions

void Completed (XrdXrootdAioBuff *aioP)
 
const char * ID ()
 
void Init (XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
 
virtual void Read (long long offs, int dlen)=0
 
virtual void Recycle (bool release)=0
 
XrdXrootdProtocol * urProtocol ()
 
virtual int Write (long long offs, int dlen)=0
 
- Public Member Functions inherited from XrdJob
 XrdJob (const char *desc="")
 
virtual ~XrdJob ()
 
virtual void DoIt ()=0
 

Protected Member Functions

 XrdXrootdAioTask (const char *what="aio request")
 
virtual ~XrdXrootdAioTask ()
 
virtual void CopyF2L ()=0
 
virtual int CopyL2F ()=0
 
virtual bool CopyL2F (XrdXrootdAioBuff *aioP)=0
 
bool Drain ()
 
int gdDone () override
 
void gdFail () override
 
XrdXrootdAioBuff * getBuff (bool wait)
 
void SendError (int rc, const char *eText)
 
void SendFSError (int rc)
 
bool Validate (XrdXrootdAioBuff *aioP)
 

Protected Attributes

union { 
 
XrdXrootdNormAio * nextNorm
 
XrdXrootdPgrwAio * nextPgrw
 
XrdXrootdAioTask * nextTask
 
};  
 
union { 
 
XrdXrootdAioBuff * finalRead
 
XrdXrootdAioBuff * pendWrite
 
};  
 
XrdSysMutex aioMutex
 
XrdSysCondVar2 aioReady
 
char aioState
 
XrdXrootdFile * dataFile
 
int dataLen
 
XrdLink * dataLink
 
off_t dataOffset
 
off_t highOffset
 
RAtomic_uchar inFlight
 
RAtomic_bool isDone
 
XrdXrootdAioBuff * pendQ
 
XrdXrootdAioBuff * pendQEnd
 
XrdXrootdProtocol * Protocol
 
XrdXrootdResponse Response
 
char Status
 

Static Protected Attributes

static const int aioDead = 0x01
 
static const int aioHeld = 0x02
 
static const int aioPage = 0x04
 
static const int aioRead = 0x08
 
static const int aioSchd = 0x10
 
static const int Offline = 0
 
static const int Running = 1
 
static const char * TraceID = "AioTask"
 
static const int Waiting = 2
 

Friends

class XrdXrootdAioFob
 

Additional Inherited Members

- Public Attributes inherited from XrdJob
const char * Comment
 
XrdJob * NextJob
 

Detailed Description

Definition at line 46 of file XrdXrootdAioTask.hh.

Constructor & Destructor Documentation

◆ XrdXrootdAioTask()

XrdXrootdAioTask::XrdXrootdAioTask ( const char * what = "aio request")
inline protected

Definition at line 69 of file XrdXrootdAioTask.hh.

70 : XrdJob(what), aioReady(aioMutex) {}
XrdJob(const char *desc="")
Definition XrdJob.hh:51
XrdSysCondVar2 aioReady

References XrdJob::XrdJob(), aioMutex, and aioReady.

+ Here is the call graph for this function:

◆ ~XrdXrootdAioTask()

virtual XrdXrootdAioTask::~XrdXrootdAioTask ( )
inline protected virtual

Definition at line 71 of file XrdXrootdAioTask.hh.

71{}

Member Function Documentation

◆ Completed()

void XrdXrootdAioTask::Completed ( XrdXrootdAioBuff * aioP)

Definition at line 73 of file XrdXrootdAioTask.cc.

74{
75// Lock this code path
76//
77 aioMutex.Lock();
78
79// If this request is not running and completed then take a shortcut.
80//
81 if (Status == Offline && isDone)
82 {aioP->Recycle();
83 inFlight--;
84 aioMutex.UnLock();
85 if (inFlight <= 0) Recycle(true);
86 return;
87 }
88
89// Add this element to the end of the queue
90//
91 aioP->next = 0;
92 if (!pendQ) pendQEnd = pendQ = aioP;
93 else {pendQEnd->next = aioP;
94 pendQEnd = aioP;
95 }
96
97// Check if the request is waiting for our buffer tell it now has one. Otherwise,
98// if the task is offline then it cannot be done (see above); so schedule it.
99//
100 if (Status != Running)
101 {if (Status == Waiting) aioReady.Signal();
102 else Sched->Schedule(this);
103 Status = Running;
104 }
105
106 aioMutex.UnLock();
107}
void Schedule(XrdJob *jp)
XrdXrootdAioBuff * next
virtual void Recycle() override
static const int Offline
static const int Waiting
virtual void Recycle(bool release)=0
XrdXrootdAioBuff * pendQ
static const int Running
XrdXrootdAioBuff * pendQEnd
RAtomic_uchar inFlight
XrdScheduler * Sched

References aioMutex, aioReady, inFlight, isDone, XrdXrootdAioBuff::next, Offline, pendQ, pendQEnd, XrdXrootdAioBuff::Recycle(), Recycle(), Running, XrdXrootd::Sched, Status, and Waiting.

+ Here is the call graph for this function:

◆ CopyF2L()

virtual void XrdXrootdAioTask::CopyF2L ( )
protected pure virtual

◆ CopyL2F() [1/2]

virtual int XrdXrootdAioTask::CopyL2F ( )
protected pure virtual

Referenced by gdDone().

+ Here is the caller graph for this function:

◆ CopyL2F() [2/2]

virtual bool XrdXrootdAioTask::CopyL2F ( XrdXrootdAioBuff * aioP)
protected pure virtual

◆ Drain()

bool XrdXrootdAioTask::Drain ( )
protected

Definition at line 113 of file XrdXrootdAioTask.cc.

114{
115 XrdXrootdAioBuff *aioP;
116 int maxWait = 6; // Max seconds to wait for outstanding requests
117
118// Reap as many aio objects as you can
119//
120 aioMutex.Lock();
121 while(inFlight > 0)
122 {while((aioP = pendQ))
123 {if (!(pendQ = aioP->next)) pendQEnd = 0;
124 aioMutex.UnLock(); // Open a window of opportunity
125 inFlight--;
126 aioP->Recycle();
127 aioMutex.Lock();
128 }
129 if (inFlight <= 0 || !Wait4Buff(maxWait)) break;
130 }
131
132// If there are still in flight requests, issue message and we will run the
133// drain in the background.
134//
135 if (inFlight > 0)
136 {char buff[128];
137 snprintf(buff, sizeof(buff),
138 "aio%c overdue %d inflight request%s for",
139 (aioState & aioRead ? 'R' : 'W'), int(inFlight),
140 (inFlight > 1 ? "s" : ""));
141 eLog.Emsg("AioTask", buff, dataLink->ID, dataFile->FileKey);
142 }
143
144// Indicate we are going offline and tell the caller if we need to stay
145// alive to drain the tardy requests in the background.
146//
147 Status = Offline;
148 isDone = true;
149 aioMutex.UnLock();
150 return inFlight <= 0;
151}
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdXrootdFile * dataFile
static const int aioRead
XrdSysError eLog

References aioMutex, aioRead, aioState, dataFile, dataLink, XrdXrootd::eLog, inFlight, isDone, XrdXrootdAioBuff::next, Offline, pendQ, pendQEnd, XrdXrootdAioBuff::Recycle(), and Status.

Referenced by gdDone(), and gdFail().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdDone()

int XrdXrootdAioTask::gdDone ( )
override protected

Definition at line 157 of file XrdXrootdAioTask.cc.

158{
159 XrdXrootdAioBuff *bP = pendWrite;
160 int rc;
161
162// Do some debugging
163//
164 TRACEP(DEBUG,"gdDone: "<<(void *)this<<" pendWrite "
165 <<(pendWrite != 0 ? "set":"not set"));
166
167// This is a callback indicating the pending aio object has all of the data.
168// Resume sending data to the destination.
169//
170 pendWrite = 0;
171 if (!bP) rc = CopyL2F();
172 else {if (CopyL2F(bP) && (inFlight || !isDone)) rc = CopyL2F();
173 else rc = 0;
174 }
175
176// Do some debugging
177//
178 TRACEP(DEBUG,"gdDone: "<<(void *)this<<" ending rc="<<rc);
179
180// If we are not pausing for data to be delivered. Drain any outstanding aio
181// requests and discard left over bytes, if any. Note we must copy the left
182// over length as we may recycle before discarding as discard must be last.
183//
184 if (rc <= 0)
185 {XrdXrootdProtocol* prot = Protocol;
186 int dlen = dataLen;
187 if (!inFlight) Recycle(true);
188 else Recycle(Drain());
189 if (!rc && dlen) return prot->getDump(Comment, dlen);
190 }
191 return rc;
192}
#define DEBUG(x)
#define TRACEP(act, x)
const char * Comment
Definition XrdJob.hh:47
virtual int CopyL2F()=0
XrdXrootdProtocol * Protocol
int getDump(const char *dtype, int dlen)

References XrdJob::Comment, CopyL2F(), dataLen, DEBUG, Drain(), XrdXrootdProtocol::getDump(), inFlight, isDone, Protocol, Recycle(), and TRACEP.

Referenced by XrdXrootdNormAio::Write(), and XrdXrootdPgrwAio::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdFail()

void XrdXrootdAioTask::gdFail ( )
override protected

Definition at line 198 of file XrdXrootdAioTask.cc.

199{
200 char eBuff[512];
201
202// Do some tracing
203//
204 TRACEP(DEBUG,"gdFail: "<<(void *)this);
205
206// Format message for display
207//
208 snprintf(eBuff, sizeof(eBuff), "link error aborted %s for", Comment);
209 eLog.Emsg("AioTask", eBuff, dataLink->ID, dataFile->FileKey);
210
211// This is a callback indicating the link is dead. Terminate this operation.
212//
213 isDone = true;
214 aioState |= aioDead;
215 dataLen = 0;
216 if (pendWrite) {pendWrite->Recycle(); pendWrite = 0;}
217
218// If this is a read, cancel all queued read requests
219//
220 if (aioState & aioRead) dataFile->aioFob->Reset(Protocol);
221
222// If we still have any requests in flight drain them.
223//
224 if (!inFlight) Recycle(true);
225 else Recycle(Drain());
226}
static const int aioDead

References aioDead, aioRead, aioState, XrdJob::Comment, dataFile, dataLen, dataLink, DEBUG, Drain(), XrdXrootd::eLog, inFlight, isDone, Protocol, Recycle(), and TRACEP.

+ Here is the call graph for this function:

◆ getBuff()

XrdXrootdAioBuff * XrdXrootdAioTask::getBuff ( bool wait)
protected

Definition at line 232 of file XrdXrootdAioTask.cc.

233{
234 XrdXrootdAioBuff* aioP;
235
236// Try to get the next buffer
237//
238 aioMutex.Lock();
239do{if ((aioP = pendQ))
240 {if (!(pendQ = aioP->next)) pendQEnd = 0;
241 aioMutex.UnLock();
242 inFlight--;
243 return aioP;
244 }
245
246// If the caller does not want to wait or if there is nothing in flight, return
247//
248 if (!wait || !inFlight)
249 {aioMutex.UnLock();
250 return 0;
251 }
252
253// So, wait for a buffer to arrive
254//
255 } while(Wait4Buff());
256
257// We timed out and this is considered an error
258//
259 aioMutex.UnLock();
260 SendError(ETIMEDOUT, (aioState & aioRead ? "aio file read timed out"
261 : "aio file write timed out"));
262 return 0;
263}
void SendError(int rc, const char *eText)

References aioMutex, aioRead, aioState, inFlight, XrdXrootdAioBuff::next, pendQ, pendQEnd, and SendError().

+ Here is the call graph for this function:

◆ ID()

const char * XrdXrootdAioTask::ID ( )

Definition at line 269 of file XrdXrootdAioTask.cc.

269{return dataLink->ID;}

References dataLink.

◆ Init()

void XrdXrootdAioTask::Init ( XrdXrootdProtocol * protP,
XrdXrootdResponse & resp,
XrdXrootdFile * fP )

Definition at line 275 of file XrdXrootdAioTask.cc.

278{
279
280// Reset the object
281//
282 pendQEnd = pendQ = 0;
283 finalRead = 0; // Also sets pendWrite
284 Protocol = protP;
285 dataLink = resp.theLink();
286 Response = resp;
287 dataFile = fP;
288 aioState = 0;
289 inFlight = 0;
290 isDone = false;
291 Status = Running;
292}
XrdXrootdResponse Response

References aioState, dataFile, dataLink, inFlight, isDone, pendQ, pendQEnd, Protocol, Response, Running, Status, and XrdXrootdResponse::theLink().

Referenced by XrdXrootdNormAio::Alloc(), and XrdXrootdPgrwAio::Alloc().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Read()

virtual void XrdXrootdAioTask::Read ( long long offs,
int dlen )
pure virtual

Implemented in XrdXrootdNormAio, and XrdXrootdPgrwAio.

◆ Recycle()

virtual void XrdXrootdAioTask::Recycle ( bool release)
pure virtual

Implemented in XrdXrootdNormAio, and XrdXrootdPgrwAio.

Referenced by Completed(), gdDone(), gdFail(), XrdXrootdAioFob::Reset(), and XrdXrootdAioFob::Reset().

+ Here is the caller graph for this function:

◆ SendError()

void XrdXrootdAioTask::SendError ( int rc,
const char * eText )
protected

Definition at line 298 of file XrdXrootdAioTask.cc.

299{
300 char eBuff[1024];
301
302// If there is no error text, use the rc
303//
304 if (!eText) eText = (rc ? XrdSysE2T(rc) : "invalid error code");
305
306// Format message for display
307//
308 snprintf(eBuff, sizeof(eBuff), "async %s failed for %s;",
309 (aioState & aioRead ? "read" : "write"), dataLink->ID);
310 eLog.Emsg("AioTask", eBuff, eText, dataFile->FileKey);
311
312// If this request is still active, send the error to the client
313//
314 if (!isDone)
315 {XErrorCode eCode = static_cast<XErrorCode>(XProtocol::mapError(rc));
316 if (Response.Send(eCode, eText))
317 {aioState |= aioDead;
318 dataLen = 0;
319 } else if (aioState & aioRead) dataLen = 0;
320 isDone = true;
321 }
322}
XErrorCode
Definition XProtocol.hh:989
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
static int mapError(int rc)

References aioDead, aioRead, aioState, dataFile, dataLen, dataLink, XrdXrootd::eLog, isDone, XProtocol::mapError(), Response, and XrdSysE2T().

Referenced by getBuff(), SendFSError(), and Validate().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SendFSError()

void XrdXrootdAioTask::SendFSError ( int rc)
protected

Definition at line 328 of file XrdXrootdAioTask.cc.

329{
330 XrdOucErrInfo &myError = dataFile->XrdSfsp->error;
331 int eCode;
332
333// We can only handle actual errors. Under some conditions a redirect (e.g.
334// Xcache) can return other error codes. We treat these as server errors.
335//
336 if (rc != SFS_ERROR)
337 {char eBuff[256];
338 snprintf(eBuff, sizeof(eBuff), "fs returned unexpected rc %d", rc);
339 SendError(EFAULT, eBuff);
340 if (myError.extData()) myError.Reset();
341 return;
342 }
343
344// Handle file system error but only if we are still alive
345//
346 if (!isDone)
347 {const char *eMsg = myError.getErrText(eCode);
348 eLog.Emsg("AioTask", dataLink->ID, eMsg, dataFile->FileKey);
349 int rc = XProtocol::mapError(eCode);
350 if (Response.Send((XErrorCode)rc, eMsg))
351 {aioState |= aioDead;
352 dataLen = 0;
353 } else if (aioState & aioRead) dataLen = 0;
354 isDone = true;
355 }
356
357// Clear error message and recycle aio object if need be
358//
359 if (myError.extData()) myError.Reset();
360}
#define eMsg(x)
#define SFS_ERROR
const char * getErrText()
void Reset()
Reset object to no message state. Call this method to release appendages.

References aioDead, aioRead, aioState, dataFile, dataLen, dataLink, XrdXrootd::eLog, eMsg, XrdOucErrInfo::extData(), XrdOucErrInfo::getErrText(), isDone, XProtocol::mapError(), XrdOucErrInfo::Reset(), Response, SendError(), and SFS_ERROR.

+ Here is the call graph for this function:

◆ urProtocol()

XrdXrootdProtocol * XrdXrootdAioTask::urProtocol ( )
inline

Definition at line 63 of file XrdXrootdAioTask.hh.

63{return Protocol;}

References Protocol.

Referenced by XrdXrootdAioBuff::Alloc(), and XrdXrootdAioPgrw::Alloc().

+ Here is the caller graph for this function:

◆ Validate()

bool XrdXrootdAioTask::Validate ( XrdXrootdAioBuff * aioP)
protected

Definition at line 366 of file XrdXrootdAioTask.cc.

367{
368 ssize_t aioResult = aioP->Result;
369 off_t aioOffset = aioP->sfsAio.aio_offset;
370 int aioLength = aioP->sfsAio.aio_nbytes;
371
372// Step 1: Check if this request is already completed. This may be the case
373// if we had a previous error.
374//
375 if (isDone) return false;
376
377// Step 2: Check if an error occurred as this will terminate the request even
378// if we have not sent all of the data.
379//
380 if (aioP->Result < 0)
381 {SendError(-aioP->Result, 0);
382 return false;
383 }
384
385// Step 3: Check for a short read which signals that no more data past this
386// offset is forthcoming. Save it as we will send a final response
387// using this element. We discard zero length reads. It's an error if we
388// get more than one short read with data or if its offset is less than
389// the highest full read element.
390//
391 if (aioResult < aioLength)
392 {dataLen = 0;
393 if (!aioResult)
394 {if ((finalRead && aioOffset < finalRead->sfsAio.aio_offset)
395 || aioOffset < highOffset) SendError(EFAULT, "embedded null block");
396 return false;
397 } else {
398 if (aioOffset < highOffset)
399 {SendError(ENODEV, "embedded short block");
400 return false;
401 } else {
402 if (finalRead) SendError(ENODEV, "multiple short blocks");
403 else {finalRead = aioP;
404 highOffset = aioOffset;
405 }
406 }
407 }
408 return false;
409 }
410
411// Step 4: This is a full read and its offset must be lower than the offset of
412// any short read we have encountered.
413//
414 if (finalRead && aioOffset >= finalRead->sfsAio.aio_offset)
415 {SendError(ENODEV, "read offset past EOD");
416 return false;
417 }
418 if (aioOffset > highOffset) highOffset = aioOffset;
419 return true;
420}
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
ssize_t Result
Definition XrdSfsAio.hh:65
struct aiocb sfsAio
Definition XrdSfsAio.hh:62

References aiocb::aio_nbytes, aiocb::aio_offset, dataLen, highOffset, isDone, XrdSfsAio::Result, SendError(), and XrdSfsAio::sfsAio.

+ Here is the call graph for this function:

◆ Write()

virtual int XrdXrootdAioTask::Write ( long long offs,
int dlen )
pure virtual

Implemented in XrdXrootdNormAio, and XrdXrootdPgrwAio.

Friends And Related Symbol Documentation

◆ XrdXrootdAioFob

friend class XrdXrootdAioFob
friend

Definition at line 49 of file XrdXrootdAioTask.hh.

References XrdXrootdAioFob.

Referenced by XrdXrootdAioFob.

Member Data Documentation

◆ [union]

union { ... } XrdXrootdAioTask

◆ [union]

union { ... } XrdXrootdAioTask

◆ aioDead

const int XrdXrootdAioTask::aioDead = 0x01
static protected

Definition at line 115 of file XrdXrootdAioTask.hh.

Referenced by gdFail(), SendError(), and SendFSError().

◆ aioHeld

const int XrdXrootdAioTask::aioHeld = 0x02
static protected

Definition at line 116 of file XrdXrootdAioTask.hh.

Referenced by XrdXrootdNormAio::Recycle(), and XrdXrootdPgrwAio::Recycle().

◆ aioMutex

XrdSysMutex XrdXrootdAioTask::aioMutex
protected

Definition at line 86 of file XrdXrootdAioTask.hh.

Referenced by XrdXrootdAioTask(), Completed(), Drain(), and getBuff().

◆ aioPage

const int XrdXrootdAioTask::aioPage = 0x04
static protected

Definition at line 117 of file XrdXrootdAioTask.hh.

Referenced by XrdXrootdPgrwAio::Read().

◆ aioRead

◆ aioReady

XrdSysCondVar2 XrdXrootdAioTask::aioReady
protected

Definition at line 87 of file XrdXrootdAioTask.hh.

Referenced by XrdXrootdAioTask(), and Completed().

◆ aioSchd

const int XrdXrootdAioTask::aioSchd = 0x10
static protected

Definition at line 119 of file XrdXrootdAioTask.hh.

◆ aioState

◆ dataFile

◆ dataLen

◆ dataLink

◆ dataOffset

off_t XrdXrootdAioTask::dataOffset
protected

◆ highOffset

off_t XrdXrootdAioTask::highOffset
protected

◆ inFlight

RAtomic_uchar XrdXrootdAioTask::inFlight
protected

Definition at line 107 of file XrdXrootdAioTask.hh.

Referenced by Completed(), Drain(), gdDone(), gdFail(), getBuff(), and Init().

◆ isDone

◆ Offline

const int XrdXrootdAioTask::Offline = 0
static protected

Definition at line 123 of file XrdXrootdAioTask.hh.

Referenced by Completed(), and Drain().

◆ pendQ

XrdXrootdAioBuff* XrdXrootdAioTask::pendQ
protected

Definition at line 88 of file XrdXrootdAioTask.hh.

Referenced by Completed(), Drain(), getBuff(), and Init().

◆ pendQEnd

XrdXrootdAioBuff* XrdXrootdAioTask::pendQEnd
protected

Definition at line 89 of file XrdXrootdAioTask.hh.

Referenced by Completed(), Drain(), getBuff(), and Init().

◆ Protocol

◆ Response

XrdXrootdResponse XrdXrootdAioTask::Response
protected

Definition at line 111 of file XrdXrootdAioTask.hh.

Referenced by Init(), SendError(), and SendFSError().

◆ Running

const int XrdXrootdAioTask::Running = 1
static protected

Definition at line 124 of file XrdXrootdAioTask.hh.

Referenced by Completed(), and Init().

◆ Status

char XrdXrootdAioTask::Status
protected

◆ TraceID

const char * XrdXrootdAioTask::TraceID = "AioTask"
static protected

Definition at line 84 of file XrdXrootdAioTask.hh.

◆ Waiting

const int XrdXrootdAioTask::Waiting = 2
static protected

Definition at line 125 of file XrdXrootdAioTask.hh.

Referenced by Completed().


The documentation for this class was generated from the following files: