REST-for-Physics  v2.3
Rare Event Searches ToolKit for Physics
TRestThread.cxx
1
25
26#include "TRestThread.h"
27
28using namespace std;
29
30#ifdef TIME_MEASUREMENT
31#include <chrono>
32using namespace chrono;
33#endif
34
39 fOutputEvent = nullptr;
40 fInputEvent = nullptr;
41
42 fAnalysisTree = nullptr;
43 fEventTree = nullptr;
44
45 fOutputFile = nullptr;
46
47 fProcessChain.clear();
48
49 isFinished = false;
50
51 fCompressionLevel = 1;
53}
54
66 RESTInfo << "thread " << fThreadId << ": validating process chain" << RESTendl;
67
68 // add the non-general processes to list for validation
69 vector<TRestEventProcess*> processes;
70 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
71 RESTValue inEvent = fProcessChain[i]->GetInputEvent();
72 RESTValue outEvent = fProcessChain[i]->GetOutputEvent();
73
74 if (outEvent.type == "TRestEvent" && inEvent.type == "TRestEvent") {
75 RESTInfo << "general process: " << fProcessChain[i]->GetName() << RESTendl;
76 continue;
77 } else if (outEvent.cl && outEvent.cl->InheritsFrom("TRestEvent") && inEvent.cl &&
78 inEvent.cl->InheritsFrom("TRestEvent")) {
79 processes.push_back(fProcessChain[i]);
80 } else {
81 RESTError << "Process: " << fProcessChain[i]->ClassName()
82 << " not properly written, the input/output event is illegal!" << RESTendl;
83 RESTError << "Hint: they must be inherited from TRestEvent" << RESTendl;
84 abort();
85 }
86 }
87
88 if (processes.size() > 0) {
89 // verify that the input event of first process is OK
90 if (input != nullptr) {
91 if ((string)input->ClassName() != processes[0]->GetInputEvent().type) {
92 RESTError << "(ValidateChain): Input event type does not match!" << RESTendl;
93 cout << "Input type of the first non-external process in chain: "
94 << processes[0]->GetInputEvent().type << endl;
95 cout << "The event type from file: " << input->ClassName() << endl;
96 cout << "No events will be processed. Please correct event process "
97 "input/output."
98 << endl;
99 }
100 }
101
102 // verify that the output event type is good to be the input event of the next process
103 for (unsigned int i = 0; i < processes.size() - 1; i++) {
104 string outEventType = processes[i]->GetOutputEvent().type;
105 string nextinEventType = processes[i + 1]->GetInputEvent().type;
106 if (outEventType != nextinEventType && outEventType != "TRestEvent" &&
107 nextinEventType != "TRestEvent") {
108 RESTError << "(ValidateChain): Event process input/output does not match" << RESTendl;
109 RESTError << "The event output for process " << processes[i]->GetName() << " is "
110 << outEventType << RESTendl;
111 RESTError << "The event input for process " << processes[i + 1]->GetName() << " is "
112 << nextinEventType << RESTendl;
113 RESTError << "No events will be processed. Please correctly connect the process chain!"
114 << RESTendl;
115 GetChar();
116 return -1;
117 }
118 }
119 }
120 return 0;
121}
122
123void TRestThread::SetThreadId(Int_t id) {
124 fThreadId = id;
125 if (fThreadId != 0 && fVerboseLevel > TRestStringOutput::REST_Verbose_Level::REST_Essential)
127}
128
152 RESTDebug << "Processing ..." << RESTendl;
153 for (int i = 0; i < 5; i++) {
154 TRestEvent* ProcessedEvent = fInputEvent;
155 RESTDebug << "Test run " << i << " : Input Event ---- " << fInputEvent->ClassName() << "("
156 << fInputEvent << ")" << RESTendl;
157 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
158 RESTDebug << "t" << fThreadId << "p" << j << ": " << fProcessChain[j]->ClassName() << RESTendl;
159
160 fProcessChain[j]->SetObservableValidation(true);
161
162 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
163 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
164 if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
165 // if the output of ProcessEvent() is NULL we assume the event is cut.
166 // we try to use GetOutputEvent()
167 if (ProcessedEvent == nullptr) {
168 ProcessedEvent = fProcessChain[j]->GetOutputEvent();
169 }
170 // if still null we perform another try
171 if (ProcessedEvent == nullptr) {
172 RESTDebug << " ---- NULL" << RESTendl;
173 break;
174 }
175 // check if the output event is same as the processed event
176 TRestEvent* outputevent = fProcessChain[j]->GetOutputEvent();
177 if (outputevent != ProcessedEvent) {
178 RESTWarning
179 << "Test run, in " << fProcessChain[j]->ClassName()
180 << " : output event is different with process returned event! Please check to assign "
181 "the TRestEvent datamember as inputEvent in ProcessEvent() method"
182 << RESTendl;
183 }
184
185 fProcessChain[j]->EndOfEventProcess();
186
187 fProcessChain[j]->SetObservableValidation(false);
188
189 RESTDebug << " .... " << ProcessedEvent->ClassName() << "(" << ProcessedEvent << ")" << RESTendl;
190 }
191
192 fOutputEvent = ProcessedEvent;
193 if (fOutputEvent != nullptr) {
194 RESTDebug << "Output Event ---- " << fOutputEvent->ClassName() << "(" << fOutputEvent << ")"
195 << RESTendl;
196 break;
197 } else {
198 RESTDebug << "Null output, trying again" << RESTendl;
199 }
200 fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree);
201 }
202 if (fOutputEvent == nullptr) {
203 // fOutputEvent = fProcessChain[fProcessChain.size() - 1]->GetOutputEvent();
204 return false;
205 }
206 return true;
207}
208
223void TRestThread::PrepareToProcess(bool* outputConfig) {
224 RESTDebug << "Entering TRestThread::PrepareToProcess" << RESTendl;
225
226 string threadFileName;
227 if (fHostRunner->GetOutputDataFile()->GetName() == (string) "/dev/null") {
228 threadFileName = "/dev/null";
229 } else {
230 threadFileName = REST_TMP_PATH + "rest_thread_tmp" + ToString(this) + ".root";
231 }
232
233 bool outputConfigToDel = false;
234 if (outputConfig == nullptr) {
235 outputConfigToDel = true;
236 outputConfig = new bool[4];
237 for (int i = 0; i < 4; i++) {
238 outputConfig[i] = true;
239 }
240 }
241 if (outputConfig[3] == false) {
242 cout << "Error! output analysis must be on!" << endl;
243 exit(1);
244 }
245
246 if (fProcessChain.size() > 0) {
247 RESTDebug << "TRestThread: Creating file : " << threadFileName << RESTendl;
248 fOutputFile = new TFile(threadFileName.c_str(), "recreate");
249 fOutputFile->SetCompressionLevel(fCompressionLevel);
250 fAnalysisTree = new TRestAnalysisTree("AnalysisTree_" + ToString(fThreadId), "dummyTree");
251 fAnalysisTree->DisableQuickObservableValueSetting();
252
253 RESTDebug << "TRestThread: Finding first input event of process chain..." << RESTendl;
254 if (fHostRunner->GetInputEvent() == nullptr) {
255 RESTError
256 << "Input event is not initialized from TRestRun! Please check your input file and file "
257 "reading process!"
258 << RESTendl;
259 exit(1);
260 }
261 fInputEvent = (TRestEvent*)fHostRunner->GetInputEvent()->Clone();
262 string chainInputType = fProcessChain[0]->GetInputEvent().type;
263 // we check if the process chain input type matches fInputEvent
264 if (chainInputType != "TRestEvent" && chainInputType != (string)fInputEvent->ClassName()) {
265 cout << "REST ERROR: Input event type does not match!" << endl;
266 cout << "Process input type : " << chainInputType
267 << ", File input type : " << fInputEvent->ClassName() << endl;
268 exit(1);
269 }
270
271 RESTDebug << "TRestThread: Reading input event and input observable..." << RESTendl;
272 if (fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree) != 0) {
273 RESTError << "In thread " << fThreadId << ")::Failed to read input event, process cannot start!"
274 << RESTendl;
275 exit(1);
276 }
277
278 RESTDebug << "TRestThread: Init process..." << RESTendl;
279 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
280 fProcessChain[i]->SetAnalysisTree(fAnalysisTree);
281 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
282 fProcessChain[i]->SetFriendProcess(fProcessChain[j]);
283 }
284 RESTDebug << "InitProcess() process for " << fProcessChain[i]->ClassName() << RESTendl;
285 fProcessChain[i]->InitProcess();
286 }
287
288 // test run
289 if (fHostRunner->UseTestRun()) {
290 RESTDebug << "Test Run..." << RESTendl;
291 if (!TestRun()) {
292 RESTError << "In thread " << fThreadId << ")::test run failed!" << RESTendl;
293 RESTError << "One of the processes has NULL pointer fOutputEvent!" << RESTendl;
295 RESTError << "To see more detail, turn on debug mode for "
296 "TRestProcessRunner!"
297 << RESTendl;
298 exit(1);
299 }
300 RESTDebug << "Test Run complete!" << RESTendl;
301 } else {
302 RESTDebug << "Initializing output event" << RESTendl;
303 string chainOutputType = fProcessChain[fProcessChain.size() - 1]->GetOutputEvent().type;
304 fOutputEvent = REST_Reflection::Assembly(chainOutputType);
305 if (fOutputEvent == nullptr) {
306 exit(1);
307 }
308 }
309
311 // create dummy tree to store events
312 fEventTree = new TTree((TString) "EventTree_" + ToString(fThreadId), "dummyTree");
313 vector<pair<TString, TRestEvent*>> branchesToAdd;
314 // avoid duplicated branch
315 // if event type is same, we only create branch for the last of this type
316 // event
317 if (outputConfig[1] == true) {
318 // outputConfig[1]: the input events of each processes in the process chain(can be many)
319
320 // the input event of process chain
321 TRestEvent* evt = fInputEvent;
322 {
323 TString BranchName = (TString)evt->GetName() + "Branch";
324 if (branchesToAdd.size() == 0)
325 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
326 else
327 for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
328 if (branchesToAdd[j].first == BranchName)
329 branchesToAdd[j].second = evt;
330 else if (j == branchesToAdd.size() - 1)
331 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
332 }
333 }
334 // the input event of other processes
335 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
336 TRestEvent* evt = fProcessChain[i]->GetOutputEvent();
337 if (evt != nullptr) {
338 TString BranchName = (TString)evt->GetName() + "Branch";
339 if (branchesToAdd.size() == 0)
340 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
341 else
342 for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
343 if (branchesToAdd[j].first == BranchName)
344 branchesToAdd[j].second = evt;
345 else if (j == branchesToAdd.size() - 1)
346 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
347 }
348 }
349 }
350 }
351
352 if (outputConfig[2] == true) {
353 // outputConfig[2]: output event: the output event of the last process in the process chain
354 TRestEvent* evt = fOutputEvent;
355 {
356 TString BranchName = (TString)evt->GetName() + "Branch";
357 if (branchesToAdd.size() == 0)
358 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
359 else
360 for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
361 if (branchesToAdd[j].first == BranchName)
362 branchesToAdd[j].second = evt;
363 else if (j == branchesToAdd.size() - 1)
364 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
365 }
366 }
367 }
368
369 auto iter = branchesToAdd.begin();
370 while (iter != branchesToAdd.end()) {
371 fEventTree->Branch(iter->first, iter->second->ClassName(), iter->second);
372 iter++;
373 }
374
375 if (fEventTree->GetListOfBranches()->GetLast() < 0) // if no event branches are created
376 {
377 delete fEventTree;
378 fEventTree = nullptr;
379 }
380
381 // fAnalysisTree->CreateBranches();
382
383 // create output temp file for process-defined output object
384
385 fOutputFile->cd();
386 fOutputFile->Clear();
387 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
388 fProcessChain[i]->InitProcess();
389 }
390
391 RESTDebug << "Thread " << fThreadId << " Ready!" << RESTendl;
392 } else {
393 string tmp = fHostRunner->GetInputEvent()->ClassName();
394 fInputEvent = REST_Reflection::Assembly(tmp);
395 fOutputEvent = fInputEvent;
396 fOutputFile = new TFile(threadFileName.c_str(), "recreate");
397 fOutputFile->SetCompressionLevel(fCompressionLevel);
398 fOutputFile->cd();
399
400 RESTDebug << "Creating Analysis Tree..." << RESTendl;
401 fAnalysisTree = new TRestAnalysisTree("AnalysisTree_" + ToString(fThreadId), "dummyTree");
402 fAnalysisTree->DisableQuickObservableValueSetting();
403 fEventTree = new TTree((TString) "EventTree_" + ToString(fThreadId), "dummyTree");
404 // fEventTree->CreateEventBranches();
405
406 if (outputConfig[2]) {
407 TString BranchName = (TString)fInputEvent->GetName() + "Branch";
408 if (fEventTree->GetBranch(BranchName) == nullptr) // avoid duplicated branch
409 {
410 fEventTree->Branch(BranchName, fInputEvent->ClassName(), fInputEvent);
411 }
412 }
413 // currently, external process analysis is not supported!
414 }
415 if (outputConfigToDel) delete outputConfig;
416}
417
434 isFinished = false;
435
436 while (fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree) == 0) {
437 ProcessEvent();
438 /*if (fOutputEvent != nullptr) */ fHostRunner->FillThreadEventFunc(this);
439 }
440
441 // fHostRunner->WriteThreadFileFunc(this);
442 isFinished = true;
443}
444
450 t = thread(&TRestThread::StartProcess, this);
451 t.detach();
452 // t.join();
453}
454
463 RESTDebug << "Entering TRestThread::AddProcess" << RESTendl;
464
465 fProcessChain.push_back(process);
466 // if (Console::CompatibilityMode && process->GetVerboseLevel() >= REST_Debug) {
467 // warning << "REST WARNING! Cannot use \"debug\" output level for process " << process->GetName()
468 // << endl;
469 // process->SetVerboseLevel(REST_Info);
470 //}
471 if (process->GetVerboseLevel() > fVerboseLevel) {
472 SetVerboseLevel(process->GetVerboseLevel());
473 }
474}
475
484 TRestEvent* ProcessedEvent = fInputEvent;
485 fProcessNullReturned = false;
486
488#ifdef TIME_MEASUREMENT
489 vector<int> processtime(fProcessChain.size());
490#endif
491
492 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
493 cout << "------- Starting process " + (string)fProcessChain[j]->GetName() + " -------" << endl;
494
495#ifdef TIME_MEASUREMENT
496 high_resolution_clock::time_point t1 = high_resolution_clock::now();
497#endif
498
499 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
500 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
501 if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
502 fProcessChain[j]->EndOfEventProcess();
503
504#ifdef TIME_MEASUREMENT
505 high_resolution_clock::time_point t2 = high_resolution_clock::now();
506 processtime[j] = (int)duration_cast<microseconds>(t2 - t1).count();
507#endif
508
509 if (ProcessedEvent == nullptr) {
510 cout << "------- End of process " + (string)fProcessChain[j]->GetName() +
511 " (NULL returned) -------"
512 << endl;
513 fProcessNullReturned = true;
514 break;
515 } else {
516 cout << "------- End of process " + (string)fProcessChain[j]->GetName() + " -------" << endl;
517 }
518
519 if (fProcessChain[j]->GetVerboseLevel() >= TRestStringOutput::REST_Verbose_Level::REST_Extreme &&
520 j < fProcessChain.size() - 1) {
521 GetChar();
522 }
523 }
524
525#ifdef TIME_MEASUREMENT
526 cout << "Process timing summary : " << endl;
527 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
528 cout << fProcessChain[j]->ClassName() << "(" << fProcessChain[j]->GetName()
529 << ") : " << processtime[j] / (double)1000 << " ms" << endl;
530 }
531#endif
532
533 if (fHostRunner->UseTestRun()) {
534 fOutputEvent = ProcessedEvent;
535 } else {
536 cout << "Transferring output event..." << endl;
537 if (ProcessedEvent != nullptr) {
538 ProcessedEvent->CloneTo(fOutputEvent);
539 }
540 }
541
542 GetChar(
543 "======= End of Event " +
544 ((fOutputEvent == nullptr) ? ToString(fInputEvent->GetID()) : ToString(fOutputEvent->GetID())) +
545 " =======");
546 } else {
547 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
548 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
549 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
550 if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
551 fProcessChain[j]->EndOfEventProcess();
552 if (ProcessedEvent == nullptr) {
553 fProcessNullReturned = true;
554 break;
555 }
556 }
557
558 if (fHostRunner->UseTestRun()) {
559 fOutputEvent = ProcessedEvent;
560 } else {
561 if (ProcessedEvent != nullptr) {
562 ProcessedEvent->CloneTo(fOutputEvent);
563 }
564 }
565 }
566}
567
574 RESTInfo << "TRestThread : Writing temp file. Thread id : " << fThreadId << RESTendl;
575
576 if (fOutputFile == nullptr) return;
577
578 fOutputFile->cd();
579 Int_t nErrors = 0;
580 Int_t nWarnings = 0;
581 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
582 // The processes must call object->Write in this method
583 fProcessChain[i]->EndProcess();
584 if (fProcessChain[i]->GetError()) nErrors++;
585 if (fProcessChain[i]->GetWarning()) nWarnings++;
586 }
587
588 if (nWarnings)
589 RESTWarning << nWarnings
590 << " process warnings were found! Use run0->PrintWarnings(); to get additional info."
591 << RESTendl;
592 if (nErrors)
593 RESTError << nErrors << " process errors were found! Use run0->PrintErrors(); to get additional info."
594 << RESTendl;
595
596 delete fAnalysisTree;
597}
std::string type
Type of the wrapped object.
TClass * cl
Pointer to the corresponding TClass helper, if the wrapped object is in class type.
REST core data-saving helper based on TTree.
void DisableQuickObservableValueSetting()
It will disable quick observable value setting.
A base class for any REST event process.
A base class for any REST event.
Definition: TRestEvent.h:38
virtual void CloneTo(TRestEvent *target)
Clone the content of this TRestEvent object to another.
Definition: TRestEvent.cxx:98
TRestStringOutput::REST_Verbose_Level GetVerboseLevel()
returns the verboselevel in type of REST_Verbose_Level enumerator
Int_t GetNextevtFunc(TRestEvent *targetevt, TRestAnalysisTree *targettree)
Get next event and copy it to the address of targetevt.
void FillThreadEventFunc(TRestThread *t)
Calling back the FillEvent() method in TRestThread.
@ REST_Essential
+show some essential information, as well as warnings
@ REST_Debug
+show the defined debug messages
void AddProcess(TRestEventProcess *process)
Add a process.
bool TestRun()
Make a test run of our process chain.
void Initialize()
Set variables by default during initialization.
Definition: TRestThread.cxx:38
void StartProcess()
The main function of this class. Thread will run this function until the end.
void EndProcess()
Write and close the output file.
void ProcessEvent()
Process a single event.
void PrepareToProcess(bool *outputConfig=nullptr)
Prepare some thing before we can start process.
Int_t ValidateChain(TRestEvent *input)
Check if the input/output of each process in the process chain matches.
Definition: TRestThread.cxx:65
void StartThread()
Create a thread with the method StartProcess().
TRestReflector Assembly(const std::string &typeName)
Assembly an object of type: typeName, returning the allocated memory address and size.
Int_t GetChar(std::string hint="Press a KEY to continue ...")
Helps to pause the program, printing a message before pausing.