XRootD
Loading...
Searching...
No Matches
XrdTpcStream.cc
Go to the documentation of this file.
1
2#include <sstream>
3
4#include "XrdTpcStream.hh"
5
8
9using namespace TPC;
10
12{
13 for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
14 buffer_iter != m_buffers.end();
15 buffer_iter++) {
16 delete *buffer_iter;
17 *buffer_iter = NULL;
18 }
19 m_fh->close();
20}
21
22
23bool
25{
26 // Do not close twice
27 if (!m_open_for_write) {
28 return false;
29 }
30 m_open_for_write = false;
31
32 for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
33 buffer_iter != m_buffers.end();
34 buffer_iter++) {
35 delete *buffer_iter;
36 *buffer_iter = NULL;
37 }
38
39 if (m_fh->close() == SFS_ERROR) {
40 std::stringstream ss;
41 const char *msg = m_fh->error.getErrText();
42 if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
43 ss << "Failure when closing file handle: " << msg << " (code=" << m_fh->error.getErrInfo() << ")";
44 m_error_buf = ss.str();
45 return false;
46 }
47
48 // If there are outstanding buffers to reorder, finalization failed
49 return m_avail_count == m_buffers.size();
50}
51
52
53int
54Stream::Stat(struct stat* buf)
55{
56 return m_fh->stat(buf);
57}
58
59ssize_t
60Stream::Write(off_t offset, const char *buf, size_t size, bool force)
61{
62/*
63 * NOTE: these lines are useful for debuggin the state of the buffer
64 * management code; too expensive to compile in and have a runtime switch.
65 std::stringstream ss;
66 ss << "Offset=" << offset << ", Size=" << size << ", force=" << force;
67 m_log.Emsg("Stream::Write", ss.str().c_str());
68 DumpBuffers();
69*/
70 if (!m_open_for_write) {
71 if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a buffer not opened for write";}
72 return SFS_ERROR;
73 }
74 size_t bytes_accepted = 0;
75 int retval = size;
76 if (offset < m_offset) {
77 if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a prior offset";}
78 return SFS_ERROR;
79 }
80 // If this is write is appending to the stream and
81 // MB-aligned, then we write it to disk; otherwise, the
82 // data will be buffered.
83 if (offset == m_offset && (force || (size && !(size % (1024*1024))))) {
84 retval = WriteImpl(offset, buf, size);
85 bytes_accepted = retval;
86 // On failure, we don't care about flushing buffers from memory --
87 // the stream is now invalid.
88 if (retval < 0) {
89 return retval;
90 }
91 // If there are no in-use buffers, then we don't need to
92 // do any accounting.
93 if (m_avail_count == m_buffers.size()) {
94 return retval;
95 }
96 }
97 // Even if we already accepted the current data, always
98 // iterate through available buffers and try to write as
99 // much out to disk as possible.
100 Entry *avail_entry;
101 bool buffer_was_written;
102 size_t avail_count = 0;
103 do {
104 avail_count = 0;
105 avail_entry = NULL;
106 buffer_was_written = false;
107 for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
108 entry_iter != m_buffers.end();
109 entry_iter++) {
110 // Always try to dump from memory; when size == 0, then we are
111 // going to force a flush even if things are not MB-aligned.
112 int retval2 = (*entry_iter)->Write(*this, size == 0);
113 if (retval2 == SFS_ERROR) {
114 if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
115 return retval2;
116 }
117 buffer_was_written |= retval2 > 0;
118 if ((*entry_iter)->Available()) { // Empty buffer
119 if (!avail_entry) {avail_entry = *entry_iter;}
120 avail_count ++;
121 }
122 else if (bytes_accepted != size && size) {
123 size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted);
124 // Partial accept; buffer should be writable which means we should free it up
125 // for next iteration
126 if (new_accept && new_accept != size - bytes_accepted) {
127 int retval3 = (*entry_iter)->Write(*this, false);
128 if (retval3 == SFS_ERROR) {
129 if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
130 return SFS_ERROR;
131 }
132 buffer_was_written = true;
133 }
134 bytes_accepted += new_accept;
135 }
136 }
137 } while ((avail_count != m_buffers.size()) && buffer_was_written);
138 m_avail_count = avail_count;
139
140 if (bytes_accepted != size && size) { // No place for this data in allocated buffers
141 if (!avail_entry) { // No available buffers to allocate; logic error, should not happen.
142 DumpBuffers();
143 m_error_buf = "No empty buffers available to place unordered data.";
144 return SFS_ERROR;
145 }
146 if (avail_entry->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!?
147 m_error_buf = "Empty re-ordering buffer was unable to to accept data; internal logic error.";
148 return SFS_ERROR;
149 }
150 m_avail_count --;
151 }
152
153 // If we have low buffer occupancy, then release memory.
154 if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) {
155 for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
156 entry_iter != m_buffers.end();
157 entry_iter++) {
158 (*entry_iter)->ShrinkIfUnused();
159 }
160 }
161
162 return retval;
163}
164
165
166ssize_t Stream::WriteImpl(off_t offset, const char *buf, size_t size)
167{
168 ssize_t retval;
169 if (size == 0) {return 0;}
170 retval = m_fh->write(offset, buf, size);
171 if (retval != SFS_ERROR) {
172 m_offset += retval;
173 } else {
174 std::stringstream ss;
175 const char *msg = m_fh->error.getErrText();
176 if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
177 ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
178 m_error_buf = ss.str();
179 }
180 return retval;
181}
182
183
184void
186{
187 m_log.Emsg("Stream::DumpBuffers", "Beginning dump of stream buffers.");
188 {
189 std::stringstream ss;
190 ss << "Stream offset: " << m_offset;
191 m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
192 }
193 size_t idx = 0;
194 for (std::vector<Entry*>::const_iterator entry_iter = m_buffers.begin();
195 entry_iter!= m_buffers.end();
196 entry_iter++) {
197 std::stringstream ss;
198 ss << "Buffer " << idx << ": Offset=" << (*entry_iter)->GetOffset() << ", Size="
199 << (*entry_iter)->GetSize() << ", Capacity=" << (*entry_iter)->GetCapacity();
200 m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
201 idx ++;
202 }
203 m_log.Emsg("Stream::DumpBuffers", "Finish dump of stream buffers.");
204}
205
206
207int
208Stream::Read(off_t offset, char *buf, size_t size)
209{
210 return m_fh->read(offset, buf, size);
211}
#define stat(a, b)
Definition XrdPosix.hh:96
#define SFS_ERROR
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
bool Finalize()
void DumpBuffers() const
int Stat(struct stat *)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)