XRootD
Loading...
Searching...
No Matches
XrdClReplay.cc File Reference
#include "XrdCl/XrdClOperations.hh"
#include "XrdCl/XrdClUtils.hh"
#include "XrdCl/XrdClFileOperations.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdClAction.hh"
#include "XrdClActionMetrics.hh"
#include "XrdClReplayArgs.hh"
#include <fstream>
#include <vector>
#include <tuple>
#include <unordered_map>
#include <chrono>
#include <iostream>
#include <thread>
#include <iomanip>
#include <atomic>
#include <stdarg.h>
#include <getopt.h>
#include <map>
#include <numeric>
#include <mutex>
#include <condition_variable>
+ Include dependency graph for XrdClReplay.cc:

Go to the source code of this file.

Classes

class  XrdCl::ActionExecutor
 Executes an action registered in the csv file. More...
 
class  XrdCl::barrier_t
 
class  XrdCl::BufferPool
 Buffer pool - to limit memory consumption. More...
 
class  XrdCl::mytimer_t
 Timer helper class. More...
 

Namespaces

namespace  XrdCl
 

Typedefs

using XrdCl::action_list = std::multimap< double, ActionExecutor >
 List of actions: start time - action.
 

Functions

bool XrdCl::AssureFile (const std::string &url, uint64_t size, bool viatruncate, bool verify)
 AssureFile creates input data files on the fly if required.
 
std::thread XrdCl::ExecuteActions (std::unique_ptr< File > file, action_list &&actions, double t0, double speed, ActionMetrics &metric, bool simulate)
 
int main (int argc, char **argv)
 
std::unordered_map< File *, action_listXrdCl::ParseInput (const std::string &path, double &t0, double &t1, std::unordered_map< File *, std::string > &filenames, std::unordered_map< File *, double > &synchronicity, std::unordered_map< File *, size_t > &responseerrors, const std::vector< std::string > &option_regex)
 
std::vector< std::string > XrdCl::ToColumns (const std::string &row)
 Split a row into columns.
 
void usage ()
 

Function Documentation

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 1108 of file XrdClReplay.cc.

1109{
1110 XrdCl::ReplayArgs opt(argc, argv);
1111 int rc = 0;
1112
1113 try
1114 {
1115 double t0 = 0;
1116 double t1 = 0;
1117 std::unordered_map<XrdCl::File*, std::string> filenames;
1118 std::unordered_map<XrdCl::File*, double> synchronicity;
1119 std::unordered_map<XrdCl::File*, size_t> responseerrors;
1120 auto actions = XrdCl::ParseInput(opt.path(),
1121 t0,
1122 t1,
1123 filenames,
1124 synchronicity,
1125 responseerrors,
1126 opt.regex()); // parse the input file
1127 std::vector<std::thread> threads;
1128 std::unordered_map<XrdCl::File*, XrdCl::ActionMetrics> metrics;
1129 threads.reserve(actions.size());
1130 double toffset = XrdCl::Action::timeNow() - t0;
1131 XrdCl::mytimer_t timer;
1132 XrdCl::ActionMetrics summetric;
1133 bool sampling_error = false;
1134
1135 for (auto& action : actions)
1136 {
1137 metrics[action.first].fname = filenames[action.first];
1138 metrics[action.first].synchronicity = synchronicity[action.first];
1139 metrics[action.first].errors = responseerrors[action.first];
1140 if (metrics[action.first].errors)
1141 {
1142 sampling_error = true;
1143 }
1144 }
1145
1146 if (sampling_error)
1147 {
1148 std::cerr << "Warning: IO file contains unsuccessful samples!" << std::endl;
1149 if (!opt.suppress_error())
1150 {
1151 std::cerr << "... run with [-f] or [--suppress] option to suppress unsuccessful IO events!"
1152 << std::endl;
1153 exit(-1);
1154 }
1155 }
1156
1157
1158 if (opt.print())
1159 toffset = 0; // indicate not to follow timing
1160
1161 for (auto& action : actions)
1162 {
1163 // execute list of actions against file object
1164 threads.emplace_back(ExecuteActions(std::unique_ptr<XrdCl::File>(action.first),
1165 std::move(action.second),
1166 toffset,
1167 opt.speed(),
1168 metrics[action.first],
1169 opt.print()));
1170 }
1171
1172 for (auto& t : threads) // wait until we are done
1173 t.join();
1174
1175 if (opt.json())
1176 {
1177 std::cout << "{" << std::endl;
1178 if (opt.longformat())
1179 std::cout << " \"metrics\": [" << std::endl;
1180 }
1181
1182 for (auto& metric : metrics)
1183 {
1184 if (opt.longformat())
1185 {
1186 std::cout << metric.second.Dump(opt.json());
1187 }
1188 summetric.add(metric.second);
1189 }
1190
1191 if (opt.summary())
1192 std::cout << summetric.Dump(opt.json());
1193
1194 if (opt.json())
1195 {
1196 if (opt.longformat())
1197 std::cout << " ]," << std::endl;
1198 }
1199
1200 double tbench = timer.elapsed();
1201
1202 if (opt.json())
1203 {
1204 {
1205 std::cout << " \"iosummary\": { " << std::endl;
1206 if (!opt.print())
1207 {
1208 std::cout << " \"player::runtime\": " << tbench << "," << std::endl;
1209 }
1210 std::cout << " \"player::speed\": " << opt.speed() << "," << std::endl;
1211 std::cout << " \"sampled::runtime\": " << t1 - t0 << "," << std::endl;
1212 std::cout << " \"volume::totalread\": " << summetric.getBytesRead() << "," << std::endl;
1213 std::cout << " \"volume::totalwrite\": " << summetric.getBytesWritten() << ","
1214 << std::endl;
1215 std::cout << " \"volume::read\": " << summetric.ios["Read::b"] << "," << std::endl;
1216 std::cout << " \"volume::write\": " << summetric.ios["Write::b"] << "," << std::endl;
1217 std::cout << " \"volume::pgread\": " << summetric.ios["PgRead::b"] << "," << std::endl;
1218 std::cout << " \"volume::pgwrite\": " << summetric.ios["PgWrite::b"] << "," << std::endl;
1219 std::cout << " \"volume::vectorread\": " << summetric.ios["VectorRead::b"] << ","
1220 << std::endl;
1221 std::cout << " \"volume::vectorwrite\": " << summetric.ios["VectorWrite::b"] << ","
1222 << std::endl;
1223 std::cout << " \"iops::read\": " << summetric.ios["Read::n"] << "," << std::endl;
1224 std::cout << " \"iops::write\": " << summetric.ios["Write::n"] << "," << std::endl;
1225 std::cout << " \"iops::pgread\": " << summetric.ios["PgRead::n"] << "," << std::endl;
1226 std::cout << " \"iops::pgwrite\": " << summetric.ios["PgRead::n"] << "," << std::endl;
1227 std::cout << " \"iops::vectorread\": " << summetric.ios["VectorRead::n"] << ","
1228 << std::endl;
1229 std::cout << " \"iops::vectorwrite\": " << summetric.ios["VectorRead::n"] << ","
1230 << std::endl;
1231 std::cout << " \"files::read\": " << summetric.ios["OpenR::n"] << "," << std::endl;
1232 std::cout << " \"files::write\": " << summetric.ios["OpenW::n"] << "," << std::endl;
1233 std::cout << " \"datasetsize::read\": " << summetric.ios["Read::o"] << "," << std::endl;
1234 std::cout << " \"datasetsize::write\": " << summetric.ios["Write::o"] << "," << std::endl;
1235 if (!opt.print())
1236 {
1237 std::cout << " \"bandwidth::mb::read\": "
1238 << summetric.getBytesRead() / tbench / 1000000.0 << "," << std::endl;
1239 std::cout << " \"bandwdith::mb::write\": "
1240 << summetric.getBytesWritten() / tbench / 1000000.0 << "," << std::endl;
1241 std::cout << " \"performancemark\": " << (100.0 * (t1 - t0) / tbench) << ","
1242 << std::endl;
1243 std::cout << " \"gain::read\":"
1244 << (100.0 * summetric.delays["Read::tnomi"] / summetric.delays["Read::tmeas"])
1245 << "," << std::endl;
1246 std::cout << " \"gain::write\":"
1247 << (100.0 * summetric.delays["Write::tnomi"] / summetric.delays["Write::tmeas"])
1248 << std::endl;
1249 }
1250 std::cout << " \"synchronicity::read\":"
1251 << summetric.aggregated_synchronicity.ReadSynchronicity() << "," << std::endl;
1252 std::cout << " \"synchronicity::write\":"
1253 << summetric.aggregated_synchronicity.WriteSynchronicity() << "," << std::endl;
1254 std::cout << " \"response::error:\":" << summetric.ios["All::e"] << std::endl;
1255 std::cout << " }" << std::endl;
1256 std::cout << "}" << std::endl;
1257 }
1258 }
1259 else
1260 {
1261 std::cout << "# =============================================" << std::endl;
1262 if (!opt.print())
1263 std::cout << "# IO Summary" << std::endl;
1264 else
1265 std::cout << "# IO Summary (print mode)" << std::endl;
1266 std::cout << "# =============================================" << std::endl;
1267 if (!opt.print())
1268 {
1269 std::cout << "# Total Runtime : " << std::fixed << tbench << " s" << std::endl;
1270 }
1271 std::cout << "# Sampled Runtime : " << std::fixed << t1 - t0 << " s" << std::endl;
1272 std::cout << "# Playback Speed : " << std::fixed << std::setprecision(2) << opt.speed()
1273 << std::endl;
1274 std::cout << "# IO Volume (R) : " << std::fixed
1276 << " [ std:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::b"])
1277 << " vec:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["VectorRead::b"])
1278 << " page:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["PgRead::b"])
1279 << " ] " << std::endl;
1280 std::cout << "# IO Volume (W) : " << std::fixed
1282 << " [ std:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["Write::b"])
1283 << " vec:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["VectorWrite::b"])
1284 << " page:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["PgWrite::b"])
1285 << " ] " << std::endl;
1286 std::cout << "# IOPS (R) : " << std::fixed << summetric.getIopsRead()
1287 << " [ std:" << summetric.ios["Read::n"]
1288 << " vec:" << summetric.ios["VectorRead::n"]
1289 << " page:" << summetric.ios["PgRead::n"] << " ] " << std::endl;
1290 std::cout << "# IOPS (W) : " << std::fixed << summetric.getIopsWrite()
1291 << " [ std:" << summetric.ios["Write::n"]
1292 << " vec:" << summetric.ios["VectorWrite::n"]
1293 << " page:" << summetric.ios["PgWrite::n"] << " ] " << std::endl;
1294 std::cout << "# Files (R) : " << std::fixed << summetric.ios["OpenR::n"] << std::endl;
1295 std::cout << "# Files (W) : " << std::fixed << summetric.ios["OpenW::n"] << std::endl;
1296 std::cout << "# Datasize (R) : " << std::fixed
1297 << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::o"]) << std::endl;
1298 std::cout << "# Datasize (W) : " << std::fixed
1299 << XrdCl::ActionMetrics::humanreadable(summetric.ios["Write::o"]) << std::endl;
1300 if (!opt.print())
1301 {
1302 std::cout << "# IO BW (R) : " << std::fixed << std::setprecision(2)
1303 << summetric.getBytesRead() / tbench / 1000000.0 << " MB/s" << std::endl;
1304 std::cout << "# IO BW (W) : " << std::fixed << std::setprecision(2)
1305 << summetric.getBytesRead() / tbench / 1000000.0 << " MB/s" << std::endl;
1306 }
1307 std::cout << "# ---------------------------------------------" << std::endl;
1308 std::cout << "# Quality Estimation" << std::endl;
1309 std::cout << "# ---------------------------------------------" << std::endl;
1310 if (!opt.print())
1311 {
1312 std::cout << "# Performance Mark : " << std::fixed << std::setprecision(2)
1313 << (100.0 * (t1 - t0) / tbench) << "%" << std::endl;
1314 std::cout << "# Gain Mark(R) : " << std::fixed << std::setprecision(2)
1315 << (100.0 * summetric.delays["Read::tnomi"] / summetric.delays["Read::tmeas"])
1316 << "%" << std::endl;
1317 std::cout << "# Gain Mark(W) : " << std::fixed << std::setprecision(2)
1318 << (100.0 * summetric.delays["Write::tnomi"] / summetric.delays["Write::tmeas"])
1319 << "%" << std::endl;
1320 }
1321 std::cout << "# Synchronicity(R) : " << std::fixed << std::setprecision(2)
1322 << summetric.aggregated_synchronicity.ReadSynchronicity() << "%" << std::endl;
1323 std::cout << "# Synchronicity(W) : " << std::fixed << std::setprecision(2)
1324 << summetric.aggregated_synchronicity.WriteSynchronicity() << "%" << std::endl;
1325 if (!opt.print())
1326 {
1327 std::cout << "# ---------------------------------------------" << std::endl;
1328 std::cout << "# Response Errors : " << std::fixed << summetric.ios["All::e"] << std::endl;
1329 std::cout << "# =============================================" << std::endl;
1330 if (summetric.ios["All::e"])
1331 {
1332 std::cerr << "Error: replay job failed with IO errors!" << std::endl;
1333 rc = -5;
1334 }
1335 }
1336 if (opt.create() || opt.verify())
1337 {
1338 std::cout << "# ---------------------------------------------" << std::endl;
1339 if (opt.create())
1340 {
1341 std::cout << "# Creating Dataset ..." << std::endl;
1342 }
1343 else
1344 {
1345 std::cout << "# Verifying Dataset ..." << std::endl;
1346 }
1347 uint64_t created_sofar = 0;
1348 for (auto& metric : metrics)
1349 {
1350 if (metric.second.getBytesRead() && !metric.second.getBytesWritten())
1351 {
1352 std::cout << "# ............................................." << std::endl;
1353 std::cout << "# file: " << metric.second.fname << std::endl;
1354 std::cout << "# size: "
1355 << XrdCl::ActionMetrics::humanreadable(metric.second.ios["Read::o"]) << " [ "
1356 << XrdCl::ActionMetrics::humanreadable(created_sofar) << " out of "
1357 << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::o"]) << " ] "
1358 << std::setprecision(2) << " ( "
1359 << 100.0 * created_sofar / summetric.ios["Read::o"] << "% )" << std::endl;
1360 if (!XrdCl::AssureFile(
1361 metric.second.fname, metric.second.ios["Read::o"], opt.truncate(), opt.verify()))
1362 {
1363 if (opt.verify())
1364 {
1365 rc = -5;
1366 }
1367 else
1368 {
1369 std::cerr << "Error: failed to assure that file " << metric.second.fname
1370 << " is stored with a size of "
1371 << XrdCl::ActionMetrics::humanreadable(metric.second.ios["Read::o"])
1372 << " !!!";
1373 rc = -5;
1374 }
1375 }
1376 }
1377 }
1378 }
1379 }
1380 }
1381 catch (const std::invalid_argument& ex)
1382 {
1383 std::cout << ex.what() << std::endl; // print parsing errors
1384 return 1;
1385 }
1386
1387 return rc;
1388}
Args parse for XrdClReplay.
Timer helper class.
double elapsed() const
std::unordered_map< File *, action_list > ParseInput(const std::string &path, double &t0, double &t1, std::unordered_map< File *, std::string > &filenames, std::unordered_map< File *, double > &synchronicity, std::unordered_map< File *, size_t > &responseerrors, const std::vector< std::string > &option_regex)
std::thread ExecuteActions(std::unique_ptr< File > file, action_list &&actions, double t0, double speed, ActionMetrics &metric, bool simulate)
bool AssureFile(const std::string &url, uint64_t size, bool viatruncate, bool verify)
AssureFile creates input data files on the fly if required.
Metrics struct storing all timing and IO information of an action.
synchronicity_t aggregated_synchronicity
void add(const ActionMetrics &other)
std::map< std::string, uint64_t > ios
std::map< std::string, double > delays
std::string Dump(bool json) const
static std::string humanreadable(uint64_t insize)
static double timeNow()
Get curretn unix time in ns precision as a double.

References XrdCl::ActionMetrics::add(), XrdCl::ActionMetrics::aggregated_synchronicity, XrdCl::AssureFile(), XrdCl::ReplayArgs::create(), XrdCl::ActionMetrics::delays, XrdCl::ActionMetrics::Dump(), XrdCl::mytimer_t::elapsed(), XrdCl::ActionMetrics::fname, XrdCl::ActionMetrics::getBytesRead(), XrdCl::ActionMetrics::getBytesWritten(), XrdCl::ActionMetrics::getIopsRead(), XrdCl::ActionMetrics::getIopsWrite(), XrdCl::ActionMetrics::humanreadable(), XrdCl::ActionMetrics::ios, XrdCl::ReplayArgs::json(), XrdCl::ReplayArgs::longformat(), XrdCl::ParseInput(), XrdCl::ReplayArgs::path(), XrdCl::ReplayArgs::print(), XrdCl::ActionMetrics::synchronicity_t::ReadSynchronicity(), XrdCl::ReplayArgs::regex(), XrdCl::ReplayArgs::speed(), XrdCl::ReplayArgs::summary(), XrdCl::ReplayArgs::suppress_error(), XrdCl::Action::timeNow(), XrdCl::ReplayArgs::truncate(), XrdCl::ReplayArgs::verify(), and XrdCl::ActionMetrics::synchronicity_t::WriteSynchronicity().

+ Here is the call graph for this function:

◆ usage()

void usage ( )

Definition at line 1072 of file XrdClReplay.cc.

1073{
1074 std::cerr
1075 << "usage: xrdreplay [-p|--print] [-c|--create-data] [t|--truncate-data] [-l|--long] [-s|--summary] [-h|--help] [-r|--replace <arg>:=<newarg>] [-f|--suppress] <recordfilename>\n"
1076 << std::endl;
1077 std::cerr << " -h | --help : show this help" << std::endl;
1078 std::cerr
1079 << " -f | --suppress : force to run all IO with all successful result status - suppress all others"
1080 << std::endl;
1081 std::cerr
1082 << " - by default the player won't run with an unsuccessfully recorded IO"
1083 << std::endl;
1084 std::cerr << std::endl;
1085 std::cerr
1086 << " -p | --print : print only mode - shows all the IO for the given replay file without actually running any IO"
1087 << std::endl;
1088 std::cerr
1089 << " -s | --summary : print summary - shows all the aggregated IO counter summed for all files"
1090 << std::endl;
1091 std::cerr
1092 << " -l | --long : print long - show all file IO counter for each individual file"
1093 << std::endl;
1094 std::cerr
1095 << " -r | --replace <a>:=<b> : replace in the argument list the string <a> with <b> "
1096 << std::endl;
1097 std::cerr
1098 << " - option is usable several times e.g. to change storage prefixes or filenames"
1099 << std::endl;
1100 std::cerr << std::endl;
1101 std::cerr
1102 << "example: ... --replace file:://localhost:=root://xrootd.eu/ : redirect local file to remote"
1103 << std::endl;
1104 std::cerr << std::endl;
1105 exit(-1);
1106}

Referenced by XrdPfc::Cache::ExecuteCommandUrl(), XrdSysXSLock::Lock(), main(), and XrdSysXSLock::UnLock().

+ Here is the caller graph for this function: