REST-for-Physics  v2.3
Rare Event Searches ToolKit for Physics
TRestProcessRunner.cxx
1
24
25#include <TGeoManager.h>
26
27#include "Math/MinimizerOptions.h"
28#include "TBranchRef.h"
29#include "TInterpreter.h"
30#include "TMinuitMinimizer.h"
31#include "TMutex.h"
32#include "TROOT.h"
33#include "TRestManager.h"
34#include "TRestThread.h"
35
36#ifdef WIN32
37#include <io.h>
38#else
39#include "unistd.h"
40#endif // !WIN32
41
42std::mutex mutex_write;
43
44using namespace std;
45#ifdef TIME_MEASUREMENT
46#include <chrono>
47using namespace std::chrono;
48int deltaTime;
49int writeTime;
50int readTime;
51high_resolution_clock::time_point tS, tE;
52#endif
53
54int ncalculated = 10;
55Long64_t bytesReadLast = 0;
56Double_t progressLast = 0;
57Int_t progressLastPrinted = 0;
58vector<Long64_t> bytesAdded(ncalculated, 0);
59vector<Double_t> progAdded(ncalculated, 0);
60int positionCalculated = 0;
61int printInterval = 200000; // 0.2s
62int inputTreeEntries = 0;
63
64ClassImp(TRestProcessRunner);
65
66TRestProcessRunner::TRestProcessRunner() { Initialize(); }
67
68TRestProcessRunner::~TRestProcessRunner() {}
69
74 fRunInfo = nullptr;
75 fInputEvent = nullptr;
76 fOutputEvent = nullptr;
77 fEventTree = nullptr;
78 fAnalysisTree = nullptr;
79 fOutputDataFile = nullptr;
81
82 fThreads.clear();
83 fProcessInfo.clear();
84
85 fThreadNumber = 0;
86 fFirstEntry = 0;
87 fNFilesSplit = 0;
88 fEventsToProcess = 0;
89 fProcessedEvents = 0;
90 fProcessNumber = 0;
91 fProcStatus = kNormal;
92 fFileSplitSize = 10000000000LL; // 10GB maximum
93 fFileCompression = 2; // default compression level
94
95 fUseTestRun = true;
96 fUsePauseMenu = true;
97 fValidateObservables = false;
98 fSortOutputEvents = true;
99 fInputAnalysisStorage = true;
100 fInputEventStorage = true;
101 fOutputEventStorage = true;
102 fOutputAnalysisStorage = true;
103}
104
115 RESTInfo << RESTendl;
116 if (fHostmgr != nullptr) {
117 fRunInfo = fHostmgr->GetRunInfo();
118 if (fRunInfo == nullptr) {
119 RESTError << "File IO has not been specified, " << RESTendl;
120 RESTError << "please make sure the \"TRestFiles\" section is ahead of the "
121 "\"TRestProcessRunner\" section"
122 << RESTendl;
123 exit(0);
124 }
125 } else {
126 RESTError << "manager not initialized!" << RESTendl;
127 exit(0);
128 }
129
130 // Exiting as soon as possible in order to avoid seg.fault at TRestThread.
131 // And sometimes gets stuck - probably a thread not closed? - and the execution
132 // of restManager does not end even if TRestThread calls exit(1)
133 // I believe it is risky to exit() at TRestThread without closing threads.
134 // It is a guess (J.G.)
135 if (!fRunInfo->GetFileProcess() && fRunInfo->GetEntries() == 0) {
136 RESTError << "TRestProcessRunner::BeginOfInit. The input file is a valid REST file but entries are 0!"
137 << RESTendl;
138 exit(1);
139 }
140
142
143 // firstEntry = StringToInteger(GetParameter("firstEntry", "0"));
144 // eventsToProcess = StringToInteger(GetParameter("eventsToProcess", "0"));
145 int lastEntry = StringToInteger(GetParameter("lastEntry", "0"));
146 if (lastEntry - fFirstEntry > 0 && fEventsToProcess == 0) {
147 fEventsToProcess = lastEntry - fFirstEntry;
148 } else if (fEventsToProcess > 0 && lastEntry - fFirstEntry > 0 &&
149 lastEntry - fFirstEntry != fEventsToProcess) {
150 RESTWarning << "Conflict number of events to process!" << RESTendl;
151 } else if (fEventsToProcess > 0 && lastEntry - fFirstEntry == 0) {
152 lastEntry = fFirstEntry + fEventsToProcess;
153 } else if (fEventsToProcess == 0 && fFirstEntry == 0 && lastEntry == 0) {
154 fEventsToProcess = REST_MAXIMUM_EVENTS;
155 lastEntry = REST_MAXIMUM_EVENTS;
156 } else {
157 RESTWarning << "error setting of event number" << RESTendl;
158 fEventsToProcess = fEventsToProcess > 0 ? fEventsToProcess : REST_MAXIMUM_EVENTS;
159 fFirstEntry = fFirstEntry > 0 ? fFirstEntry : 0;
160 lastEntry = lastEntry == fFirstEntry + fEventsToProcess ? lastEntry : REST_MAXIMUM_EVENTS;
161 }
162 fRunInfo->SetCurrentEntry(fFirstEntry);
163
164 if (fFileSplitSize < 50000000LL || fFileSplitSize > 100000000000LL) {
165 RESTWarning << "automatic file splitting size cannot < 10MB or > 100GB, setting to default (10GB)."
166 << RESTendl;
167 fFileSplitSize = 10000000000LL;
168 }
169
170 // fUseTestRun = StringToBool(GetParameter("useTestRun", "ON"));
171 // fUsePauseMenu = StringToBool(GetParameter("usePauseMenu", "OFF"));
173 fProcStatus = kIgnore;
174 if (!fOutputAnalysisStorage) {
175 RESTError << "output analysis must be turned on to process data!" << RESTendl;
176 exit(1);
177 }
178 // fValidateObservables = StringToBool(GetParameter("validateObservables", "OFF"));
179 // fSortOutputEvents = StringToBool(GetParameter("sortOutputEvents", "ON"));
180 // fThreadNumber = StringToDouble(GetParameter("threadNumber", "1"));
181 // if (ToUpper(GetParameter("inputAnalysis", "ON")) == "ON") fOutputItem[0] = true;
182 // if (ToUpper(GetParameter("inputEvent", "OFF")) == "ON") fOutputItem[1] = true;
183 // if (ToUpper(GetParameter("outputEvent", "ON")) == "ON") fOutputItem[2] = true;
184 // fOutputItem[3] = true;
185
186 // fOutputItem = Split(GetParameter("treeBranches",
187 // "inputevent:outputevent:inputanalysis"), ":");
188 if (fThreadNumber < 1) {
189 fThreadNumber = 1;
190 }
191 if (fThreadNumber > 15) {
192 fThreadNumber = 15;
193 }
194
195 for (int i = 0; i < fThreadNumber; i++) {
196 TRestThread* t = new TRestThread();
197 t->SetProcessRunner(this);
198 t->SetVerboseLevel(fVerboseLevel);
199 t->SetThreadId(i);
200 t->SetCompressionLevel(fFileCompression);
201 fThreads.push_back(t);
202 }
203}
204
213Int_t TRestProcessRunner::ReadConfig(const string& keydeclare, TiXmlElement* e) {
214 if (keydeclare == "addProcess") {
215 string active = GetParameter("value", e, "");
216 if (active != "" && ToUpper(active) != "ON") return 0;
217
218 string processName = GetParameter("name", e, "");
219
220 string processType = GetParameter("type", e, "");
221
222 if (processType == "") {
223 RESTWarning << "Bad expression of addProcess" << RESTendl;
224 return 0;
225 } else if (processName == "") {
226 RESTWarning << "Event process " << processType << " has no name, it will be skipped" << RESTendl;
227 return 0;
228 }
229
230 RESTInfo << "adding process " << processType << " \"" << processName << "\"" << RESTendl;
231 vector<TRestEventProcess*> processes;
232 for (int i = 0; i < fThreadNumber; i++) {
233 TRestEventProcess* p = InstantiateProcess(processType, e);
234 if (p != nullptr) {
235 if (p->isExternal()) {
236 fRunInfo->SetExtProcess(p);
237 return 0;
238 }
240 p->singleThreadOnly())) {
241 fUsePauseMenu = false;
242 fProcStatus = kIgnore;
243 if (fThreadNumber > 1) {
244 RESTInfo << "multi-threading is disabled due to process \"" << p->GetName() << "\""
245 << RESTendl;
246 RESTInfo << "This process is in debug mode or is single thread only" << RESTendl;
247 for (i = fThreadNumber; i > 1; i--) {
248 // delete (*fThreads.end());
249 fThreads.erase(fThreads.end() - 1);
250 fThreadNumber--;
251 }
252 }
253 }
254 processes.push_back(p);
255 } else {
256 return 1;
257 }
258 }
259
260 for (int i = 0; i < fThreadNumber; i++) {
261 TRestEventProcess* p = processes[i];
262 for (int j = 0; j < fThreadNumber; j++) {
263 p->SetParallelProcess(processes[j]);
264 }
265 fThreads[i]->AddProcess(p);
266 }
267
268 fProcessNumber++;
269 RESTDebug << "process \"" << processType << "\" has been added!" << RESTendl;
270 return 0;
271 }
272
273 return 0;
274}
275
283 RESTDebug << "Validating process chain..." << RESTendl;
284
285 if (fRunInfo->GetFileProcess() != nullptr) {
286 fInputEvent = fRunInfo->GetFileProcess()->GetOutputEvent();
287 } else {
288 if (fThreads[0]->GetProcessnum() > 0 &&
289 fThreads[0]->GetProcess(0)->GetInputEvent().address != nullptr) {
290 string name = fThreads[0]->GetProcess(0)->GetInputEvent().type;
292 a->Initialize();
293 fRunInfo->SetInputEvent(a);
294 }
295 fInputEvent = fRunInfo->GetInputEvent();
296 }
297 if (fInputEvent == nullptr) {
298 RESTError << "Cannot determine input event, validating process chain failed!" << RESTendl;
299 exit(1);
300 }
301
302 if (fProcessNumber > 0) {
303 if (fThreads[0]->ValidateChain(fInputEvent) == -1) exit(1);
304 }
305
306 ReadProcInfo();
307}
308
315 if (fRunInfo->GetFileProcess() != nullptr) {
316 fProcessInfo["FirstProcess"] = fRunInfo->GetFileProcess()->GetName();
317 } else {
318 if (fProcessNumber > 0) fProcessInfo["FirstProcess"] = fThreads[0]->GetProcess(0)->GetName();
319 }
320 int n = fProcessNumber;
321 fProcessInfo["LastProcess"] =
322 (n == 0 ? fProcessInfo["FirstProcess"] : fThreads[0]->GetProcess(n - 1)->GetName());
323 fProcessInfo["ProcNumber"] = ToString(n);
324}
325
343 RESTDebug << "Creating output File " << fRunInfo->GetOutputFileName() << RESTendl;
344
345 TString filename = fRunInfo->FormFormat(fRunInfo->GetOutputFileName());
346 fOutputDataFileName = filename;
347 fOutputDataFile = new TFile(filename, "recreate");
348 // set compression level here will cause problem in pipeline
349 // we must set in each threadCompression
350 // fOutputDataFile->SetCompressionLevel(fFile);
351 if (!fOutputDataFile->IsOpen()) {
352 RESTError << "Failed to create output file: " << fOutputDataFile->GetName() << RESTendl;
353 exit(1);
354 }
355 RESTInfo << RESTendl;
356 RESTInfo << "TRestProcessRunner : preparing threads..." << RESTendl;
357 fRunInfo->ResetEntry();
358 fRunInfo->SetCurrentEntry(fFirstEntry);
359 for (int i = 0; i < fThreadNumber; i++) {
360 fThreads[i]->PrepareToProcess(&fInputAnalysisStorage);
361 }
362
363 // print metadata
364 if (fRunInfo->GetFileProcess() != nullptr) {
365 RESTEssential << this->ClassName() << ": 1 + " << fProcessNumber << " processes loaded, "
366 << fThreadNumber << " threads prepared!" << RESTendl;
367 } else {
368 RESTEssential << this->ClassName() << ": " << fProcessNumber << " processes loaded, " << fThreadNumber
369 << " threads prepared!" << RESTendl;
370 }
372 if (fRunInfo->GetFileProcess() != nullptr) fRunInfo->GetFileProcess()->PrintMetadata();
373
374 for (int i = 0; i < fProcessNumber; i++) {
375 fThreads[0]->GetProcess(i)->PrintMetadata();
376 }
377 } else {
378 if (fRunInfo->GetFileProcess() != nullptr) {
379 RESTcout << "(external) " << fRunInfo->GetFileProcess()->ClassName() << " : "
380 << fRunInfo->GetFileProcess()->GetName() << RESTendl;
381 }
382 for (int i = 0; i < fProcessNumber; i++) {
383 RESTcout << "++ " << fThreads[0]->GetProcess(i)->ClassName() << " : "
384 << fThreads[0]->GetProcess(i)->GetName() << RESTendl;
385 }
386 }
387 RESTcout << "=" << RESTendl;
388
389 // copy thread's event tree to local
390 fOutputDataFile->cd();
391 TTree* tree = fThreads[0]->GetEventTree();
392 if (tree != nullptr) {
393 fEventTree = (TTree*)tree->Clone();
394 fEventTree->SetName("EventTree");
395 fEventTree->SetTitle("REST Event Tree");
396 fEventTree->SetDirectory(fOutputDataFile);
397 TTree::SetMaxTreeSize(100000000000LL > fFileSplitSize * 2
398 ? 100000000000LL
399 : fFileSplitSize * 2); // the default size is 100GB
400 } else {
401 fEventTree = nullptr;
402 }
403
404 // initialize analysis tree
405 fAnalysisTree = new TRestAnalysisTree("AnalysisTree", "REST Process Analysis Tree");
406 fAnalysisTree->SetDirectory(fOutputDataFile);
407 TRestAnalysisTree::SetMaxTreeSize(100000000000LL > fFileSplitSize * 2 ? 100000000000LL
408 : fFileSplitSize * 2);
409
410 tree = fThreads[0]->GetAnalysisTree();
411 if (tree != nullptr) {
412 fNBranches = tree->GetNbranches();
413 } else {
414 RESTError << "Threads are not initialized! No AnalysisTree!" << RESTendl;
415 exit(1);
416 }
417
418 fOutputDataFile->cd();
419 if (fEventTree != nullptr) {
420 fEventTree->Write(nullptr, kOverwrite);
421 }
422 if (fAnalysisTree != nullptr) {
423 fAnalysisTree->Write(nullptr, kOverwrite);
424 }
425
426 // reset runner
427 this->ResetRunTimes();
428 fProcessedEvents = 0;
429 fRunInfo->ResetEntry();
430 fRunInfo->SetCurrentEntry(fFirstEntry);
431 inputTreeEntries = fRunInfo->GetEntries();
432
433 // set root mutex
435 ROOT::Math::MinimizerOptions::SetDefaultMinimizer("Minuit");
436 TMinuitMinimizer::UseStaticMinuit(false);
437 if (gGlobalMutex == nullptr) {
438 gGlobalMutex = new TMutex(true);
439 gROOTMutex = gGlobalMutex;
440 gInterpreterMutex = gGlobalMutex;
441 }
442
443#ifdef TIME_MEASUREMENT
444 high_resolution_clock::time_point t3 = high_resolution_clock::now();
445#endif
446
447 // start the thread!
448 RESTcout << this->ClassName() << ": Starting the Process.." << RESTendl;
449 for (int i = 0; i < fThreadNumber; i++) {
450 fThreads[i]->StartThread();
451 }
452
453 while (fProcStatus == kPause ||
454 (fRunInfo->GetInputEvent() != nullptr && fEventsToProcess > fProcessedEvents)) {
456
457 if (fProcStatus == kNormal && Console::kbhit()) // if keyboard inputs
458 {
459 int a = Console::ReadKey(); // get char
460
461 if (a == 'p') {
462 fProcStatus = kPause;
463 usleep(100000); // wait 0.1s for the processes to finish;
465 "| |", TRestStringOutput::REST_Display_Orientation::kMiddle);
466 Console::ClearLinesAfterCursor();
467 RESTLog << RESTendl;
468 RESTLog << "-" << RESTendl;
469 RESTLog << "PROCESS PAUSED!" << RESTendl;
470 RESTLog << "-" << RESTendl;
471 RESTLog << " " << RESTendl;
472 }
473 }
474
475 if (fProcStatus == kPause) {
476 PauseMenu();
477 }
478 if (fProcStatus == kStopping) {
479 break;
480 }
481
482 usleep(printInterval);
483
484 // cout << eventsToProcess << " " << fProcessedEvents << " " << lastEntry <<
485 // " " << fCurrentEvent << endl; cout << fProcessedEvents << "\r";
486 // printf("%d processed events now...\r", fProcessedEvents); fflush(stdout);
487 }
488
489 if (fProcStatus != kIgnore && Console::kbhit())
490 while (getchar() != '\n')
491 ; // clear buffer
492
493 RESTEssential << "Waiting for processes to finish ..." << RESTendl;
494
495 while (true) {
496 usleep(100000);
497 bool finish = fThreads[0]->Finished();
498 for (int i = 1; i < fThreadNumber; i++) {
499 finish = finish && fThreads[i]->Finished();
500 }
501 if (finish) break;
502 }
503
504 // make dummy analysis tree filled with observables
505 fAnalysisTree->GetEntry(fAnalysisTree->GetEntries() - 1);
506 // call EndProcess() for all processes
507 for (int i = 0; i < fThreadNumber; i++) {
508 fThreads[i]->EndProcess();
509 }
510 if (fRunInfo->GetFileProcess()) {
511 fRunInfo->GetFileProcess()->EndProcess();
512 }
513 fProcStatus = kFinished;
514
515#ifdef TIME_MEASUREMENT
516 high_resolution_clock::time_point t4 = high_resolution_clock::now();
517 deltaTime = (int)duration_cast<microseconds>(t4 - t3).count();
518#endif
519
520 // reset the mutex to null
521 delete gGlobalMutex;
522 gGlobalMutex = nullptr;
523 gROOTMutex = nullptr;
524 gInterpreterMutex = nullptr;
525
526 RESTcout << this->ClassName() << ": " << fProcessedEvents << " processed events" << RESTendl;
527
528#ifdef TIME_MEASUREMENT
529 RESTInfo << "Total processing time : " << ((Double_t)deltaTime) / 1000. << " ms" << RESTendl;
530 RESTInfo << "Average read time from disk (per event) : "
531 << ((Double_t)readTime) / fProcessedEvents / 1000. << " ms" << RESTendl;
532 RESTInfo << "Average process time (per event) : "
533 << ((Double_t)(deltaTime - readTime - writeTime)) / fProcessedEvents / 1000. << " ms"
534 << RESTendl;
535 RESTInfo << "Average write time to disk (per event) : "
536 << ((Double_t)writeTime) / fProcessedEvents / 1000. << " ms" << RESTendl;
537 RESTInfo << "=" << RESTendl;
538#endif
539
540 if (fRunInfo->GetOutputFileName() != "/dev/null") {
542 }
543}
544
556 TRestStringOutput::REST_Display_Orientation::kMiddle);
557 Console::ClearLinesAfterCursor();
558
559 RESTLog << "--------------MENU--------------" << RESTendl;
560 RESTLog << "\"v\" : change the verbose level" << RESTendl;
561 RESTLog << "\"n\" : push foward one event, then pause" << RESTendl;
562 RESTLog << "\"l\" : print the latest processed event" << RESTendl;
563 RESTLog << "\"d\" : detach the current process" << RESTendl;
564 RESTLog << "\"q\" : stop and quit the process" << RESTendl;
565 RESTLog << "press \"p\" to continue process..." << RESTendl;
566 RESTLog << "-" << RESTendl;
567 RESTLog << RESTendl;
568 RESTLog << RESTendl;
569 int menuupper = 15;
570 int infobar = 11;
571 while (true) {
572 Console::ClearCurrentLine();
573 int b = Console::ReadKey(); // no need to press enter
574
575 if (b == 'v') {
576 Console::CursorUp(infobar);
577 RESTLog.setcolor(COLOR_BOLDGREEN);
578 RESTLog << "Changing verbose level for all the processes..." << RESTendl;
579 RESTLog.setcolor(COLOR_BOLDWHITE);
580 Console::CursorDown(1);
581 Console::ClearLinesAfterCursor();
582 RESTLog << "type \"0\"/\"s\" to set level silent" << RESTendl;
583 RESTLog << "type \"1\"/\"e\" to set level essential" << RESTendl;
584 RESTLog << "type \"2\"/\"i\" to set level info" << RESTendl;
585 RESTLog << "type \"3\"/\"d\" to set level debug" << RESTendl;
586 RESTLog << "type \"4\"/\"x\" to set level extreme" << RESTendl;
587 RESTLog << "type other to return the pause menu" << RESTendl;
588 RESTLog << "-" << RESTendl;
589 RESTLog << RESTendl;
590 RESTLog << RESTendl;
591 while (true) {
592 Console::CursorUp(1);
593 int c = Console::Read();
594 if (c != '\n')
595 while (Console::Read() != '\n')
596 ;
598 if (c == '0' || c == 's') {
600 } else if (c == '1' || c == 'e') {
602 } else if (c == '2' || c == 'i') {
604 } else if (c == '3' || c == 'd') {
606 } else if (c == '4' || c == 'x') {
608 } else {
609 Console::CursorUp(infobar);
610 RESTLog.setcolor(COLOR_BOLDYELLOW);
611 RESTLog << "Verbose level not set!" << RESTendl;
612 RESTLog.setcolor(COLOR_BOLDWHITE);
613 break;
614 }
615
616 gVerbose = l;
617 this->SetVerboseLevel(l);
618 for (int i = 0; i < fThreadNumber; i++) {
619 fThreads[i]->SetVerboseLevel(l);
620 for (int j = 0; j < fThreads[i]->GetProcessnum(); j++) {
621 fThreads[i]->GetProcess(j)->SetVerboseLevel(l);
622 }
623 }
624 Console::CursorUp(infobar);
625 RESTLog.setcolor(COLOR_BOLDGREEN);
626 RESTLog << "Verbose level has been set to " << ToString(static_cast<int>(l)) << "!"
627 << RESTendl;
628 RESTLog.setcolor(COLOR_BOLDWHITE);
629 break;
630 }
631 Console::ClearLinesAfterCursor();
632 break;
633 } else if (b == 'd') {
634 Console::CursorUp(infobar);
635 RESTLog.setcolor(COLOR_BOLDGREEN);
636 RESTLog << "Detaching restManager to backend" << RESTendl;
637 RESTLog.setcolor(COLOR_BOLDWHITE);
638 Console::CursorDown(1);
639 Console::ClearLinesAfterCursor();
640 RESTLog << "type filename for output redirect" << RESTendl;
641 RESTLog << "leave blank to redirect to /dev/null" << RESTendl;
642 RESTLog << " " << RESTendl;
643 RESTLog << " " << RESTendl;
644 RESTLog << " " << RESTendl;
645 RESTLog << " " << RESTendl;
646 RESTLog << "-" << RESTendl;
647 RESTLog << RESTendl;
648 RESTLog << RESTendl;
649
650 string file;
651
652 Console::CursorUp(1);
653 file = Console::ReadLine();
654 if (file == "") file = "/dev/null";
655 if (TRestTools::fileExists(file)) {
656 if (!TRestTools::isPathWritable(file)) {
657 Console::CursorUp(infobar);
658 RESTLog.setcolor(COLOR_BOLDYELLOW);
659 RESTLog << "file not writeable!" << RESTendl;
660 RESTLog.setcolor(COLOR_BOLDWHITE);
661 break;
662 }
663 } else {
665 Console::CursorUp(infobar);
666 RESTLog.setcolor(COLOR_BOLDYELLOW);
667 RESTLog << "path not writeable!" << RESTendl;
668 RESTLog.setcolor(COLOR_BOLDWHITE);
669 break;
670 }
671 }
672
673#ifdef WIN32
674 RESTWarning << "fork not available on windows!" << RESTendl;
675#else
676 pid_t pid;
677 pid = fork();
678 if (pid < 0) {
679 perror("fork error:");
680 exit(1);
681 }
682 // child process
683 if (pid == 0) {
684 RESTcout << "Child process created! pid: " << getpid() << RESTendl;
685 RESTInfo << "Restarting threads" << RESTendl;
686 mutex_write.unlock();
687 for (int i = 0; i < fThreadNumber; i++) {
688 fThreads[i]->StartThread();
689 }
690 RESTInfo << "Re-directing output to " << file << RESTendl;
691 FILE* f = freopen(file.c_str(), "w", stdout);
692 if (f == nullptr) RESTWarning << "Couldnt redirect output for file: " << file << RESTendl;
693 REST_Display_CompatibilityMode = true;
694 }
695 // father process
696 else {
697 exit(0);
698 }
699 fProcStatus = kNormal;
700 RESTInfo << "Continue processing..." << RESTendl;
701
702#endif // WIN32
703
704 break;
705 } else if (b == 'n') {
706 fProcStatus = kStep;
707 break;
708 } else if (b == 'l') {
709 // Console::ClearScreen();
710 fOutputEvent->PrintEvent();
711 break;
712 } else if (b == 'q') {
713 fProcStatus = kStopping;
714 break;
715 } else if (b == 'p') {
716 Console::CursorUp(menuupper);
717 Console::ClearLinesAfterCursor();
719 fProcStatus = kIgnore;
720 } else {
721 fProcStatus = kNormal;
722 }
723 break;
724 } else if (b == '\n') {
725 // CursorUp(1);
726 } else {
727 Console::CursorUp(infobar);
728 RESTLog.setcolor(COLOR_BOLDYELLOW);
729 RESTLog << "Invailed option \"" << (char)b << "\" (key value: " << b << ") !" << RESTendl;
730 RESTLog.setcolor(COLOR_BOLDWHITE);
731 Console::CursorDown(infobar - 1);
732 }
733 }
734}
735
760 mutex_write.lock(); // lock on
761 while (fProcStatus == kPause) {
762 usleep(100000);
763 }
764#ifdef TIME_MEASUREMENT
765 high_resolution_clock::time_point t1 = high_resolution_clock::now();
766#endif
767 int n;
768 if (fProcessedEvents >= fEventsToProcess || targetevt == nullptr || fProcStatus == kStopping) {
769 n = -1;
770 } else {
771 if (!fInputAnalysisStorage) {
772 n = fRunInfo->GetNextEvent(targetevt, nullptr);
773 } else {
774 n = fRunInfo->GetNextEvent(targetevt, targettree);
775 }
776 }
777
778#ifdef TIME_MEASUREMENT
779 high_resolution_clock::time_point t2 = high_resolution_clock::now();
780 readTime += (int)duration_cast<microseconds>(t2 - t1).count();
781#endif
782 mutex_write.unlock();
783 return n;
784}
785
793 if (fSortOutputEvents) {
794 // Make sure the thread has the minimum event id in the all the
795 // threads. Otherwise, just wait.
796 while (true) {
797 bool smallest = true;
798 for (TRestThread* otherT : fThreads) {
799 if (otherT->Finished()) {
800 continue;
801 }
802 if (t->GetThreadId() == otherT->GetThreadId()) {
803 continue;
804 }
805 int id1 = t->GetInputEvent()->GetID();
806 int id2 = otherT->GetInputEvent()->GetID();
807 if (id1 > id2) {
808 smallest = false;
809 } else if (id1 == id2) {
810 cout << "warning! Two events with same event id! output events maybe dis-ordered!"
811 << endl;
812 }
813 }
814
815 if (smallest) break;
816 usleep(1);
817 }
818 }
819
820 // Start event saving, entering mutex lock region.
821 mutex_write.lock();
822#ifdef TIME_MEASUREMENT
823 high_resolution_clock::time_point t5 = high_resolution_clock::now();
824#endif
825 if (t->GetOutputEvent() != nullptr) {
826 fOutputEvent = t->GetOutputEvent();
827 // copy address of analysis tree of the given thread
828 // to the local tree, then fill the local tree
829 TObjArray* branchesT;
830 TObjArray* branchesL;
831
832 if (fAnalysisTree != nullptr) {
833 TRestAnalysisTree* remotetree = t->GetAnalysisTree();
834
835 // t->GetAnalysisTree()->SetEventInfo(t->GetOutputEvent());
836 // branchesT = t->GetAnalysisTree()->GetListOfBranches();
837 // branchesL = fAnalysisTree->GetListOfBranches();
838 // for (int i = 0; i < nBranches; i++) {
839 // TBranch* branchT = (TBranch*)branchesT->UncheckedAt(i);
840 // TBranch* branchL = (TBranch*)branchesL->UncheckedAt(i);
841 // branchL->SetAddress(branchT->GetAddress());
842 //}
843 fAnalysisTree->SetEventInfo(fOutputEvent);
844 for (int n = 0; n < remotetree->GetNumberOfObservables(); n++) {
845 fAnalysisTree->SetObservable(n, remotetree->GetObservable(n));
846 }
847
848 fAnalysisTree->Fill();
849 }
850
851 if (fEventTree != nullptr) {
852 // t->GetEventTree()->FillEvent(t->GetOutputEvent());
853 branchesT = t->GetEventTree()->GetListOfBranches();
854 branchesL = fEventTree->GetListOfBranches();
855 for (int i = 0; i < branchesT->GetLast() + 1; i++) {
856 TBranch* branchT = (TBranch*)branchesT->UncheckedAt(i);
857 TBranch* branchL = (TBranch*)branchesL->UncheckedAt(i);
858 branchL->SetAddress(branchT->GetAddress());
859 }
860 fEventTree->Fill();
861 }
862 fProcessedEvents++;
863
864 // cout << fTempOutputDataFile << " " << fTempOutputDataFile->GetEND() << " " <<
865 // fAnalysisTree->GetDirectory() << endl; cout << fAutoSplitFileSize << endl; switch file if file size
866 // reaches target
867 if (fOutputDataFile->GetEND() > fFileSplitSize) {
868 if (fAnalysisTree->GetDirectory() == (TDirectory*)fOutputDataFile) {
869 fNFilesSplit++;
870 cout << "TRestProcessRunner: file size reaches limit (" << fFileSplitSize
871 << " bytes), switching to new file with index " << fNFilesSplit << endl;
872
873 // wait 0.1s for the process to finish
874 usleep(100000);
875 for (auto th : fThreads) {
876 for (int j = 0; j < fProcessNumber; j++) {
877 auto proc = th->GetProcess(j);
878 proc->NotifyAnalysisTreeReset();
879 }
880 }
881
882 fAnalysisTree->AutoSave();
883 fAnalysisTree->Reset();
884
885 if (fEventTree != nullptr) {
886 fEventTree->AutoSave();
887 fEventTree->Reset();
888 }
889
890 // write some information to the first(main) data file
891 fRunInfo->SetNFilesSplit(fNFilesSplit);
892 if (fOutputDataFile->GetName() != fOutputDataFileName) {
893 auto Mainfile = std::unique_ptr<TFile>{TFile::Open(fOutputDataFileName, "update")};
895 Mainfile->Write(0, TObject::kOverwrite);
896 Mainfile->Close();
897 } else {
899 }
900
901 TFile* newfile = new TFile(fOutputDataFileName + "." + ToString(fNFilesSplit), "recreate");
902
903 TBranch* branch = nullptr;
904 fAnalysisTree->SetDirectory(newfile);
905 TIter nextb1(fAnalysisTree->GetListOfBranches());
906 while ((branch = (TBranch*)nextb1())) {
907 branch->SetFile(newfile);
908 }
909 if (fAnalysisTree->GetBranchRef()) {
910 fAnalysisTree->GetBranchRef()->SetFile(newfile);
911 }
912
913 if (fEventTree != nullptr) {
914 fEventTree->SetDirectory(newfile);
915 TIter nextb2(fEventTree->GetListOfBranches());
916 while ((branch = (TBranch*)nextb2())) {
917 branch->SetFile(newfile);
918 }
919 if (fEventTree->GetBranchRef()) {
920 fEventTree->GetBranchRef()->SetFile(newfile);
921 }
922 }
923
924 fOutputDataFile->Write(nullptr, TObject::kOverwrite);
925 fOutputDataFile->Close();
926 delete fOutputDataFile;
927 fOutputDataFile = newfile;
928 } else {
929 RESTError << "internal error!" << RESTendl;
930 }
931 }
932
933#ifdef TIME_MEASUREMENT
934 high_resolution_clock::time_point t6 = high_resolution_clock::now();
935 writeTime += (int)duration_cast<microseconds>(t6 - t5).count();
936#endif
937
938 if (fProcStatus == kStep) {
939 fProcStatus = kPause;
940 cout << "Processed events:" << fProcessedEvents << endl;
941 }
942 }
943 mutex_write.unlock();
944}
945
952 RESTEssential << "Configuring output file, writing metadata and tree objects" << RESTendl;
953#ifdef TIME_MEASUREMENT
954 fProcessInfo["ProcessTime"] = ToString(deltaTime) + "ms";
955#endif
956 // write the last tree
957 fOutputDataFile->cd();
958 if (fEventTree != nullptr) {
959 fEventTree->Write(nullptr, kOverwrite);
960 }
961 if (fAnalysisTree != nullptr) {
962 fAnalysisTree->Write(nullptr, kOverwrite);
963 }
964
965 // close file
966 fOutputDataFile->Write();
967 fOutputDataFile->Close();
968 delete fOutputDataFile;
969
970 // merge process's data file to the main file
971 // we must call this method before writing process metadata,
972 // otherwise there would be problems of streamer missing
973 // https://github.com/rest-for-physics/framework/issues/348
974 fRunInfo->SetNFilesSplit(fNFilesSplit);
976
977 // write metadata
979
980 // Write geometry
981 if (gGeoManager != nullptr) {
982 gGeoManager->Write("Geometry", TObject::kOverwrite);
983 }
984}
985
990 fOutputDataFile->cd();
991
992 this->Write(nullptr, TObject::kWriteDelete);
993
994 if (fRunInfo->GetFileProcess() != nullptr) {
995 fRunInfo->GetFileProcess()->Write(nullptr, kOverwrite);
996 }
997 for (int i = 0; i < fProcessNumber; i++) {
998 fThreads[0]->GetProcess(i)->Write(nullptr, kOverwrite);
999 }
1000}
1001
1007 RESTEssential << "Merging thread files together" << RESTendl;
1008 // add threads file
1009 // processes may have their own TObject output. They are stored in the threads
1010 // file these files are mush smaller that data file, so they are merged to the
1011 // data file.
1012 vector<string> files_to_merge;
1013 for (int i = 0; i < fThreadNumber; i++) {
1014 TFile* f = fThreads[i]->GetOutputFile();
1015 if (f != nullptr) {
1016 f->Write(nullptr, TObject::kOverwrite);
1017 f->Close();
1018 }
1019 files_to_merge.push_back(f->GetName());
1020 }
1021
1023 fOutputDataFile = fRunInfo->MergeToOutputFile(files_to_merge, (string)fOutputDataFileName);
1024 } else {
1025 RESTError << "Output file: " << fOutputDataFileName << " is lost?" << RESTendl;
1026 }
1027}
1028
1029// tools
1034#ifdef TIME_MEASUREMENT
1035 readTime = 0;
1036 writeTime = 0;
1037 deltaTime = 0;
1038#endif
1039 time_t tt = time(nullptr);
1040 fProcessInfo["ProcessDate"] = Split(ToDateTimeString(tt), " ")[0];
1041}
1042
1050 TRestEventProcess* pc = REST_Reflection::Assembly((string)type);
1051 if (pc == nullptr) return nullptr;
1052
1054 pc->SetRunInfo(this->fRunInfo);
1055 pc->SetHostmgr(fHostmgr);
1056 pc->SetObservableValidation(fValidateObservables);
1058
1059 return pc;
1060}
1061
1062double TRestProcessRunner::GetReadingSpeed() {
1063 Long64_t bytes = 0;
1064 for (auto& n : bytesAdded) bytes += n;
1065 double speedbyte = bytes / (double)printInterval * (double)1000000 / ncalculated;
1066 return speedbyte;
1067}
1068
1074 if (fProcStatus == kNormal || fProcStatus == kIgnore) {
1075 // clearLinesAfterCursor();
1076
1077 // TRestStringOutput cout;
1078 // cout.setcolor(COLOR_BOLDWHITE);
1079 // cout.setorientation(0);
1080 // cout.setborder("|");
1081 // CursorDown(1);
1082
1083 double speedbyte = GetReadingSpeed();
1084
1085 double progsum = 0;
1086 for (auto& n : progAdded) progsum += n;
1087 double progspeed = progsum / ncalculated / printInterval * 1000000;
1088
1089 double prog = 0;
1090 if (fRunInfo->GetFeminosDaqTotalEvents() > 0) {
1091 prog = fProcessedEvents / (double)fRunInfo->GetFeminosDaqTotalEvents() * 100;
1092 } else if (fEventsToProcess == REST_MAXIMUM_EVENTS && fRunInfo->GetFileProcess() != nullptr)
1093 // Nevents is unknown, reading external data file
1094 {
1095 prog = fRunInfo->GetBytesRead() / (double)fRunInfo->GetTotalBytes() * 100;
1096 } else if (fRunInfo->GetFileProcess() != nullptr)
1097 // Nevents is known, reading external data file
1098 {
1099 prog = fProcessedEvents / (double)fEventsToProcess * 100;
1100 } else if (fEventsToProcess == REST_MAXIMUM_EVENTS)
1101 // Nevents is unknown, reading root file
1102 {
1103 prog = fRunInfo->GetCurrentEntry() / (double)inputTreeEntries * 100;
1104 } else {
1105 prog = fProcessedEvents / (double)fEventsToProcess * 100;
1106 }
1107
1108 char* buffer = new char[500]();
1109 if (fRunInfo->GetFileProcess() != nullptr && speedbyte > 0) {
1110 sprintf(buffer, "%d Events (%.1fMB/s), ", fProcessedEvents, speedbyte / 1024 / 1024);
1111 } else {
1112 sprintf(buffer, "%d Events, ", fProcessedEvents);
1113 }
1114
1115 string s1(buffer);
1116
1117 if (fProcStatus == kNormal) {
1118 sprintf(buffer, "%.1f min ETA, (Pause: \"p\") ", (100 - progressLast) / progspeed / 60);
1119 } else {
1120 sprintf(buffer, "%.1f min ETA, (Pause Disabled) ", (100 - progressLast) / progspeed / 60);
1121 }
1122 string s2(buffer);
1123
1124 int barlength = 0;
1125 if (REST_Display_CompatibilityMode) {
1126 barlength = 50;
1127 } else {
1128 barlength = Console::GetWidth() - s1.size() - s2.size() - 9;
1129 }
1130 sprintf(buffer, ("%.1f%[" + MakeProgressBar(prog, barlength) + "]").c_str(), prog);
1131 string s3(buffer);
1132
1133 delete[] buffer;
1134
1135 if (REST_Display_CompatibilityMode) {
1136 if (((int)prog) != progressLastPrinted) {
1137 cout << s1 << s2 << s3 << endl;
1138 progressLastPrinted = (int)prog;
1139 }
1141 printf("%s", (s1 + s2 + s3 + "\r").c_str());
1142 fflush(stdout);
1143 }
1144
1145 bytesAdded[positionCalculated] = fRunInfo->GetBytesRead() - bytesReadLast;
1146 bytesReadLast = fRunInfo->GetBytesRead();
1147 progAdded[positionCalculated] = prog - progressLast;
1148 progressLast = prog;
1149
1150 positionCalculated++;
1151 if (positionCalculated >= ncalculated) positionCalculated -= ncalculated;
1152 }
1153}
1154
1158string TRestProcessRunner::MakeProgressBar(int progress100, int length) {
1159 string progressbar(length, '-');
1160 int n = (double)progress100 / 100 * length;
1161 if (n < 0) n = 0;
1162 if (n > length + 1) n = length + 1;
1163 for (int i = 0; i < n; i++) {
1164 progressbar[i] = '=';
1165 }
1166 if (n < length + 1) progressbar[n] = '>';
1167 return progressbar;
1168}
1169
1170TRestEvent* TRestProcessRunner::GetInputEvent() { return fRunInfo->GetInputEvent(); }
1171
1172TRestAnalysisTree* TRestProcessRunner::GetInputAnalysisTree() { return fRunInfo->GetAnalysisTree(); }
1173
1176
1177 string status;
1178 if (fProcStatus == kNormal)
1179 status = "Normal";
1180 else if (fProcStatus == kStopping)
1181 status = "Terminated";
1182 else
1183 status = "Unknown";
1184
1185 RESTMetadata << "Status : " << status << RESTendl;
1186 RESTMetadata << "Processed events : " << fProcessedEvents << RESTendl;
1187 RESTMetadata << "Analysis tree branches : " << fNBranches << RESTendl;
1188 RESTMetadata << "Thread number : " << fThreadNumber << RESTendl;
1189 RESTMetadata << "Processes in each thread : " << fProcessNumber << RESTendl;
1190 RESTMetadata << "File auto split size: " << fFileSplitSize << RESTendl;
1191 RESTMetadata << "File compression level: " << fFileCompression << RESTendl;
1192 RESTMetadata << "******************************************" << RESTendl;
1193 RESTMetadata << RESTendl;
1194 RESTMetadata << RESTendl;
1195}
REST core data-saving helper based on TTree.
void SetObservable(Int_t id, RESTValue obs)
Set the value of observable whose id is as specified.
A base class for any REST event process.
void SetRunInfo(TRestRun *r)
Set TRestRun for this process.
virtual void EndProcess()
To be executed at the end of the run (outside event loop)
Bool_t singleThreadOnly() const
Return whether this process is single std::thread only.
Bool_t isExternal() const
Return whether this process is external process.
void SetParallelProcess(TRestEventProcess *p)
Add parallel process to this process.
virtual RESTValue GetOutputEvent() const =0
Get pointer to output event. Must be implemented in the derived class.
A base class for any REST event.
Definition: TRestEvent.h:38
virtual void PrintEvent() const
Definition: TRestEvent.cxx:187
virtual void Initialize()=0
Definition: TRestEvent.cxx:73
std::map< std::string, std::string > fVariables
Saving a list of rml variables. name-value std::pair.
virtual void PrintMetadata()
Implemented it in the derived metadata class to print out specific metadata information.
endl_t RESTendl
Termination flag object for TRestStringOutput.
TiXmlElement * fElementGlobal
Saving the global element, to be passed to the resident class, if necessary.
TRestStringOutput::REST_Verbose_Level fVerboseLevel
Verbose level used to print debug info.
void SetHostmgr(TRestManager *m)
Set the host manager for this class.
TRestStringOutput::REST_Verbose_Level GetVerboseLevel()
returns the verboselevel in type of REST_Verbose_Level enumerator
Int_t LoadConfigFromElement(TiXmlElement *eSectional, TiXmlElement *eGlobal, std::map< std::string, std::string > envs={})
Main starter method.
void SetConfigFile(std::string configFilename)
set config file path from external
void ReadAllParameters()
Reflection methods, Set value of a datamember in class according to TRestMetadata::fElement.
TRestManager * fHostmgr
All metadata classes can be initialized and managed by TRestManager.
std::string fConfigFileName
Full name of the rml file.
virtual Int_t Write(const char *name=nullptr, Int_t option=0, Int_t bufsize=0)
overwriting the write() method with fStore considered
std::string GetParameter(std::string parName, TiXmlElement *e, TString defaultValue=PARAMETER_NOT_FOUND_STR)
Returns the value for the parameter named parName in the given section.
void SetVerboseLevel(TRestStringOutput::REST_Verbose_Level v)
sets the verbose level
Running the processes efficiently with fantastic display.
TString fOutputDataFileName
the TFile pointer being used
void ReadProcInfo()
Create a process info list which used called by TRestRun::FormFormat().
void WriteProcessesMetadata()
Write process metadata to fOutputDataFile.
void PrintProcessedEvents(Int_t rateE)
Print number of events processed, file read speed, ETA and a progress bar.
void ResetRunTimes()
Reset running time count to 0.
void BeginOfInit()
Reads information from rml config file.
Int_t ReadConfig(const std::string &keydeclare, TiXmlElement *e)
method to deal with iterated child elements
void MergeOutputFile()
Calls TRestRun::MergeOutputFile() to merge the main file with process's tmp file.
void RunProcess()
The main executer of event process.
void EndOfInit()
Ending of the startup procedure.
TRestEventProcess * InstantiateProcess(TString type, TiXmlElement *ele)
InstantiateProcess in sequential start up.
void PauseMenu()
A pause menu providing some functions during the process.
void Initialize() override
REST run class.
std::string MakeProgressBar(int progress100, int length=100)
Make a string of progress bar with given length and percentage.
Int_t GetNextevtFunc(TRestEvent *targetevt, TRestAnalysisTree *targettree)
Get next event and copy it to the address of targetevt.
void PrintMetadata() override
Implemented it in the derived metadata class to print out specific metadata information.
Bool_t fUseTestRun
Number of files being split.
void FillThreadEventFunc(TRestThread *t)
Calling back the FillEvent() method in TRestThread.
void ConfigOutputFile()
Forming an output file.
void ResetEntry()
Reset file reading progress.
Definition: TRestRun.cxx:781
TString FormFormat(const TString &filenameFormat)
Form output file name according to file info list, proc info list and run data.
Definition: TRestRun.cxx:940
TRestEvent * GetInputEvent() const
Calling GetInputEvent() will return a basic TRestEvent*
Definition: TRestRun.h:153
void SetInputEvent(TRestEvent *event)
Retarget input event in the tree.
Definition: TRestRun.cxx:1245
TFile * MergeToOutputFile(std::vector< std::string > fileFullNames, std::string outputFileName="")
Form REST output file by merging a list of files together.
Definition: TRestRun.cxx:985
void SetExtProcess(TRestEventProcess *p)
Set external file process.
Definition: TRestRun.cxx:1203
Int_t GetNextEvent(TRestEvent *targetEvent, TRestAnalysisTree *targetTree)
Get next event by writing event data into target event and target tree.
Definition: TRestRun.cxx:796
REST_Verbose_Level
Enumerate of verbose level, containing five levels.
@ REST_Essential
+show some essential information, as well as warnings
@ REST_Info
+show most of the information for each steps
@ REST_Debug
+show the defined debug messages
@ REST_Silent
show minimum information of the software, as well as error messages
Threaded worker of a process chain.
Definition: TRestThread.h:24
static std::pair< std::string, std::string > SeparatePathAndName(const std::string &fullname)
Separate path and filename in a full path+filename string, returns a pair of string.
Definition: TRestTools.cxx:813
static bool fileExists(const std::string &filename)
Returns true if the file (or directory) with path filename exists.
Definition: TRestTools.cxx:728
static bool isPathWritable(const std::string &path)
Returns true if the path given by argument is writable.
Definition: TRestTools.cxx:778
TRestReflector Assembly(const std::string &typeName)
Assembly an object of type: typeName, returning the allocated memory address and size.
std::vector< std::string > Split(std::string in, std::string separator, bool allowBlankString=false, bool removeWhiteSpaces=false, int startPos=-1)
Split the input string according to the given separator. Returning a vector of fragments.
std::string ToUpper(std::string in)
Convert string to its upper case. Alternative of TString::ToUpper.
Int_t StringToInteger(std::string in)
Gets an integer from a string.
std::string ToDateTimeString(time_t time)
Format time_t into string.