XRootD
Loading...
Searching...
No Matches
XrdEcRedundancyProvider.cc
Go to the documentation of this file.
1/************************************************************************
2 * KineticIo - a file io interface library to kinetic devices. *
3 * *
4 * This Source Code Form is subject to the terms of the Mozilla *
5 * Public License, v. 2.0. If a copy of the MPL was not *
6 * distributed with this file, You can obtain one at *
7 * https://mozilla.org/MPL/2.0/. *
8 * *
9 * This program is distributed in the hope that it will be useful, *
10 * but is provided AS-IS, WITHOUT ANY WARRANTY; including without *
11 * the implied warranty of MERCHANTABILITY, NON-INFRINGEMENT or *
12 * FITNESS FOR A PARTICULAR PURPOSE. See the Mozilla Public *
13 * License for more details. *
14 ************************************************************************/
15
17
18#include <isa-l.h>
19#include <cstring>
20#include <sstream>
21#include <algorithm>
22
23namespace XrdEc
24{
25
26//--------------------------------------------------------------------------
30//--------------------------------------------------------------------------
//--------------------------------------------------------------------------
//! Utility that concatenates arbitrary streamable values into one string.
//--------------------------------------------------------------------------
class Convert{
  public:
    //--------------------------------------------------------------------------
    //! Write every argument, in order, to a string stream and return the
    //! accumulated text.
    //!
    //! @param args : values to concatenate; each one must be insertable into
    //!               a std::stringstream with operator<< (the pack may be
    //!               empty)
    //! @return     : the concatenated text, empty if no argument was given
    //--------------------------------------------------------------------------
    template<typename...Args>
    static std::string toString(Args&&...args){
      std::stringstream s;
      argsToStream(s, std::forward<Args>(args)...);
      return s.str();
    }
  private:
    //--------------------------------------------------------------------------
    //! Recursion terminator: no argument is left, nothing to write.
    //--------------------------------------------------------------------------
    static void argsToStream(std::stringstream&) {}

    //--------------------------------------------------------------------------
    //! Write the first argument to the stream, then recurse on the remainder.
    //!
    //! @param stream : stream collecting the output
    //! @param first  : value written by this call
    //! @param rest   : values handed on to the next recursion step
    //--------------------------------------------------------------------------
    template<typename First, typename...Rest >
    static void argsToStream(std::stringstream& stream, First&& first, Rest&&...rest) {
      stream << first;
      argsToStream(stream, std::forward<Rest>(rest)...);
    }
};
68
69
70
71/* This function is (almost) completely ripped from the erasure_code_test.cc file
72 distributed with the isa-l library. */
74 unsigned char* encode_matrix, // in: encode matrix
75 unsigned char* decode_matrix, // in: buffer, out: generated decode matrix
76 unsigned int* decode_index, // out: order of healthy blocks used for decoding [data#1, data#3, ..., parity#1... ]
77 unsigned char* src_err_list, // in: array of #nerrs size [index error #1, index error #2, ... ]
78 unsigned char* src_in_err, // in: array of #data size > [1,0,0,0,1,0...] -> 0 == no error, 1 == error
79 unsigned int nerrs, // #total errors
80 unsigned int nsrcerrs, // #data errors
81 unsigned int k, // #data
82 unsigned int m // #data+parity
83)
84{
85 unsigned i, j, p;
86 unsigned int r;
87 unsigned char* invert_matrix, * backup, * b, s;
88 int incr = 0;
89
90 std::vector<unsigned char> memory((size_t) (m * k * 3));
91 b = &memory[0];
92 backup = &memory[m * k];
93 invert_matrix = &memory[2 * m * k];
94
95 // Construct matrix b by removing error rows
96 for (i = 0, r = 0; i < k; i++, r++) {
97 while (src_in_err[r]) {
98 r++;
99 }
100 for (j = 0; j < k; j++) {
101 b[k * i + j] = encode_matrix[k * r + j];
102 backup[k * i + j] = encode_matrix[k * r + j];
103 }
104 decode_index[i] = r;
105 }
106 incr = 0;
107 while (gf_invert_matrix(b, invert_matrix, k) < 0) {
108 if (nerrs == (m - k)) {
109 return -1;
110 }
111 incr++;
112 memcpy(b, backup, (size_t) (m * k));
113 for (i = nsrcerrs; i < nerrs - nsrcerrs; i++) {
114 if (src_err_list[i] == (decode_index[k - 1] + incr)) {
115 // skip the erased parity line
116 incr++;
117 continue;
118 }
119 }
120 if (decode_index[k - 1] + incr >= m) {
121 return -1;
122 }
123 decode_index[k - 1] += incr;
124 for (j = 0; j < k; j++) {
125 b[k * (k - 1) + j] = encode_matrix[k * decode_index[k - 1] + j];
126 }
127
128 };
129
130 for (i = 0; i < nsrcerrs; i++) {
131 for (j = 0; j < k; j++) {
132 decode_matrix[k * i + j] = invert_matrix[k * src_err_list[i] + j];
133 }
134 }
135 /* src_err_list from encode_matrix * invert of b for parity decoding */
136 for (p = nsrcerrs; p < nerrs; p++) {
137 for (i = 0; i < k; i++) {
138 s = 0;
139 for (j = 0; j < k; j++) {
140 s ^= gf_mul(invert_matrix[j * k + i],
141 encode_matrix[k * src_err_list[p] + j]);
142 }
143
144 decode_matrix[k * p + i] = s;
145 }
146 }
147 return 0;
148}
149
151 objcfg( objcfg ),
152 encode_matrix( objcfg.nbchunks * objcfg.nbdata )
153{
154 // k = data
155 // m = data + parity
156 gf_gen_cauchy1_matrix( encode_matrix.data(), static_cast<int>( objcfg.nbchunks ), static_cast<int>( objcfg.nbdata ) );
157}
158
159
160std::string RedundancyProvider::getErrorPattern( stripes_t &stripes ) const
161{
162 std::string pattern( objcfg.nbchunks, 0 );
163 for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
164 if( !stripes[i].valid ) pattern[i] = '\1';
165
166 return pattern;
167}
168
169
170RedundancyProvider::CodingTable& RedundancyProvider::getCodingTable( const std::string& pattern )
171{
172 std::lock_guard<std::mutex> lock(mutex);
173
174 /* If decode matrix is not already cached we have to construct it. */
175 if( !cache.count(pattern) )
176 {
177 /* Expand pattern */
178 int nerrs = 0, nsrcerrs = 0;
179 unsigned char err_indx_list[objcfg.nbparity];
180 for (std::uint8_t i = 0; i < pattern.size(); i++) {
181 if (pattern[i]) {
182 err_indx_list[nerrs++] = i;
183 if (i < objcfg.nbdata) { nsrcerrs++; }
184 }
185 }
186
187 /* Allocate Decode Object. */
188 CodingTable dd;
189 dd.nErrors = nerrs;
190 dd.blockIndices.resize( objcfg.nbdata );
191 dd.table.resize( objcfg.nbdata * objcfg.nbparity * 32);
192
193 /* Compute decode matrix. */
194 std::vector<unsigned char> decode_matrix(objcfg.nbchunks * objcfg.nbdata);
195
196 if (gf_gen_decode_matrix( encode_matrix.data(), decode_matrix.data(), dd.blockIndices.data(),
197 err_indx_list, (unsigned char*) pattern.c_str(), nerrs, nsrcerrs,
198 static_cast<int>( objcfg.nbdata ), static_cast<int>( objcfg.nbchunks ) ) )
199 throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, errno, "Failed computing decode matrix" ) );
200
201 /* Compute Tables. */
202 ec_init_tables( static_cast<int>( objcfg.nbdata ), nerrs, decode_matrix.data(), dd.table.data() );
203 cache.insert( std::make_pair(pattern, dd) );
204 }
205 return cache.at(pattern);
206}
207
208void RedundancyProvider::replication( stripes_t &stripes )
209{
210 // get index of a valid block
211 void *healthy = nullptr;
212 for( auto itr = stripes.begin(); itr != stripes.end(); ++itr )
213 {
214 if( itr->valid )
215 healthy = itr->buffer;
216 }
217
218 if( !healthy ) throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ) );
219
220 // now replicate, by now 'buffers' should contain all chunks
221 for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
222 {
223 if( !stripes[i].valid )
224 memcpy( stripes[i].buffer, healthy, objcfg.chunksize );
225 }
226}
227
229{
230 /* throws if stripe is not recoverable */
231 std::string pattern = getErrorPattern( stripes );
232
233 /* nothing to do if there are no parity blocks. */
234 if ( !objcfg.nbparity ) return;
235
236 /* in case of a single data block use replication */
237 if ( objcfg.nbdata == 1 )
238 return replication( stripes );
239
240 /* normal operation: erasure coding */
241 CodingTable& dd = getCodingTable(pattern);
242
243 unsigned char* inbuf[objcfg.nbdata];
244 for( uint8_t i = 0; i < objcfg.nbdata; i++ )
245 inbuf[i] = reinterpret_cast<unsigned char*>( stripes[dd.blockIndices[i]].buffer );
246
247 std::vector<unsigned char> memory( dd.nErrors * objcfg.chunksize );
248
249 unsigned char* outbuf[dd.nErrors];
250 for (int i = 0; i < dd.nErrors; i++)
251 {
252 outbuf[i] = &memory[i * objcfg.chunksize];
253 }
254
255 ec_encode_data(
256 static_cast<int>( objcfg.chunksize ), // Length of each block of data (vector) of source or destination data.
257 static_cast<int>( objcfg.nbdata ), // The number of vector sources in the generator matrix for coding.
258 dd.nErrors, // The number of output vectors to concurrently encode/decode.
259 dd.table.data(), // Pointer to array of input tables
260 inbuf, // Array of pointers to source input buffers
261 outbuf // Array of pointers to coded output buffers
262 );
263
264 int e = 0;
265 for (size_t i = 0; i < objcfg.nbchunks; i++)
266 {
267 if( pattern[i] )
268 {
269 memcpy( stripes[i].buffer, outbuf[e], objcfg.chunksize );
270 e++;
271 }
272 }
273}
274
275
276};
Class for computing parities and recovering data.
static std::string toString(Args &&...args)
RedundancyProvider(const ObjCfg &objcfg)
void compute(stripes_t &stripes)
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
std::vector< stripe_t > stripes_t
All stripes in a block.
static int gf_gen_decode_matrix(unsigned char *encode_matrix, unsigned char *decode_matrix, unsigned int *decode_index, unsigned char *src_err_list, unsigned char *src_in_err, unsigned int nerrs, unsigned int nsrcerrs, unsigned int k, unsigned int m)
const uint8_t nbdata
const uint8_t nbchunks
const uint8_t nbparity
const uint64_t chunksize