26#include "TRestThread.h"
30#ifdef TIME_MEASUREMENT
32using namespace chrono;
39 fOutputEvent =
nullptr;
40 fInputEvent =
nullptr;
42 fAnalysisTree =
nullptr;
45 fOutputFile =
nullptr;
47 fProcessChain.clear();
51 fCompressionLevel = 1;
66 RESTInfo <<
"thread " << fThreadId <<
": validating process chain" << RESTendl;
69 vector<TRestEventProcess*> processes;
70 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
71 RESTValue inEvent = fProcessChain[i]->GetInputEvent();
72 RESTValue outEvent = fProcessChain[i]->GetOutputEvent();
74 if (outEvent.
type ==
"TRestEvent" && inEvent.
type ==
"TRestEvent") {
75 RESTInfo <<
"general process: " << fProcessChain[i]->GetName() << RESTendl;
77 }
else if (outEvent.
cl && outEvent.
cl->InheritsFrom(
"TRestEvent") && inEvent.
cl &&
78 inEvent.
cl->InheritsFrom(
"TRestEvent")) {
79 processes.push_back(fProcessChain[i]);
81 RESTError <<
"Process: " << fProcessChain[i]->ClassName()
82 <<
" not properly written, the input/output event is illegal!" << RESTendl;
83 RESTError <<
"Hint: they must be inherited from TRestEvent" << RESTendl;
88 if (processes.size() > 0) {
90 if (input !=
nullptr) {
91 if ((
string)input->ClassName() != processes[0]->GetInputEvent().type) {
92 RESTError <<
"(ValidateChain): Input event type does not match!" << RESTendl;
93 cout <<
"Input type of the first non-external process in chain: "
94 << processes[0]->GetInputEvent().type << endl;
95 cout <<
"The event type from file: " << input->ClassName() << endl;
96 cout <<
"No events will be processed. Please correct event process "
103 for (
unsigned int i = 0; i < processes.size() - 1; i++) {
104 string outEventType = processes[i]->GetOutputEvent().type;
105 string nextinEventType = processes[i + 1]->GetInputEvent().type;
106 if (outEventType != nextinEventType && outEventType !=
"TRestEvent" &&
107 nextinEventType !=
"TRestEvent") {
108 RESTError <<
"(ValidateChain): Event process input/output does not match" << RESTendl;
109 RESTError <<
"The event output for process " << processes[i]->GetName() <<
" is "
110 << outEventType << RESTendl;
111 RESTError <<
"The event input for process " << processes[i + 1]->GetName() <<
" is "
112 << nextinEventType << RESTendl;
113 RESTError <<
"No events will be processed. Please correctly connect the process chain!"
123void TRestThread::SetThreadId(Int_t
id) {
152 RESTDebug <<
"Processing ..." << RESTendl;
153 for (
int i = 0; i < 5; i++) {
155 RESTDebug <<
"Test run " << i <<
" : Input Event ---- " << fInputEvent->ClassName() <<
"("
156 << fInputEvent <<
")" << RESTendl;
157 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
158 RESTDebug <<
"t" << fThreadId <<
"p" << j <<
": " << fProcessChain[j]->ClassName() << RESTendl;
160 fProcessChain[j]->SetObservableValidation(
true);
162 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
163 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
164 if (fProcessChain[j]->ApplyCut()) ProcessedEvent =
nullptr;
167 if (ProcessedEvent ==
nullptr) {
168 ProcessedEvent = fProcessChain[j]->GetOutputEvent();
171 if (ProcessedEvent ==
nullptr) {
172 RESTDebug <<
" ---- NULL" << RESTendl;
176 TRestEvent* outputevent = fProcessChain[j]->GetOutputEvent();
177 if (outputevent != ProcessedEvent) {
179 <<
"Test run, in " << fProcessChain[j]->ClassName()
180 <<
" : output event is different with process returned event! Please check to assign "
181 "the TRestEvent datamember as inputEvent in ProcessEvent() method"
185 fProcessChain[j]->EndOfEventProcess();
187 fProcessChain[j]->SetObservableValidation(
false);
189 RESTDebug <<
" .... " << ProcessedEvent->ClassName() <<
"(" << ProcessedEvent <<
")" << RESTendl;
192 fOutputEvent = ProcessedEvent;
193 if (fOutputEvent !=
nullptr) {
194 RESTDebug <<
"Output Event ---- " << fOutputEvent->ClassName() <<
"(" << fOutputEvent <<
")"
198 RESTDebug <<
"Null output, trying again" << RESTendl;
202 if (fOutputEvent ==
nullptr) {
224 RESTDebug <<
"Entering TRestThread::PrepareToProcess" << RESTendl;
226 string threadFileName;
227 if (fHostRunner->GetOutputDataFile()->GetName() == (
string)
"/dev/null") {
228 threadFileName =
"/dev/null";
230 threadFileName = REST_TMP_PATH +
"rest_thread_tmp" + ToString(
this) +
".root";
233 bool outputConfigToDel =
false;
234 if (outputConfig ==
nullptr) {
235 outputConfigToDel =
true;
236 outputConfig =
new bool[4];
237 for (
int i = 0; i < 4; i++) {
238 outputConfig[i] =
true;
241 if (outputConfig[3] ==
false) {
242 cout <<
"Error! output analysis must be on!" << endl;
246 if (fProcessChain.size() > 0) {
247 RESTDebug <<
"TRestThread: Creating file : " << threadFileName << RESTendl;
248 fOutputFile =
new TFile(threadFileName.c_str(),
"recreate");
249 fOutputFile->SetCompressionLevel(fCompressionLevel);
250 fAnalysisTree =
new TRestAnalysisTree(
"AnalysisTree_" + ToString(fThreadId),
"dummyTree");
253 RESTDebug <<
"TRestThread: Finding first input event of process chain..." << RESTendl;
254 if (fHostRunner->GetInputEvent() ==
nullptr) {
256 <<
"Input event is not initialized from TRestRun! Please check your input file and file "
261 fInputEvent = (
TRestEvent*)fHostRunner->GetInputEvent()->Clone();
262 string chainInputType = fProcessChain[0]->GetInputEvent().type;
264 if (chainInputType !=
"TRestEvent" && chainInputType != (
string)fInputEvent->ClassName()) {
265 cout <<
"REST ERROR: Input event type does not match!" << endl;
266 cout <<
"Process input type : " << chainInputType
267 <<
", File input type : " << fInputEvent->ClassName() << endl;
271 RESTDebug <<
"TRestThread: Reading input event and input observable..." << RESTendl;
272 if (fHostRunner->
GetNextevtFunc(fInputEvent, fAnalysisTree) != 0) {
273 RESTError <<
"In thread " << fThreadId <<
")::Failed to read input event, process cannot start!"
278 RESTDebug <<
"TRestThread: Init process..." << RESTendl;
279 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
280 fProcessChain[i]->SetAnalysisTree(fAnalysisTree);
281 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
282 fProcessChain[i]->SetFriendProcess(fProcessChain[j]);
284 RESTDebug <<
"InitProcess() process for " << fProcessChain[i]->ClassName() << RESTendl;
285 fProcessChain[i]->InitProcess();
289 if (fHostRunner->UseTestRun()) {
290 RESTDebug <<
"Test Run..." << RESTendl;
292 RESTError <<
"In thread " << fThreadId <<
")::test run failed!" << RESTendl;
293 RESTError <<
"One of the processes has NULL pointer fOutputEvent!" << RESTendl;
295 RESTError <<
"To see more detail, turn on debug mode for "
296 "TRestProcessRunner!"
300 RESTDebug <<
"Test Run complete!" << RESTendl;
302 RESTDebug <<
"Initializing output event" << RESTendl;
303 string chainOutputType = fProcessChain[fProcessChain.size() - 1]->GetOutputEvent().type;
305 if (fOutputEvent ==
nullptr) {
312 fEventTree =
new TTree((TString)
"EventTree_" + ToString(fThreadId),
"dummyTree");
313 vector<pair<TString, TRestEvent*>> branchesToAdd;
317 if (outputConfig[1] ==
true) {
323 TString BranchName = (TString)evt->GetName() +
"Branch";
324 if (branchesToAdd.size() == 0)
325 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
327 for (
unsigned int j = 0; j < branchesToAdd.size(); j++) {
328 if (branchesToAdd[j].first == BranchName)
329 branchesToAdd[j].second = evt;
330 else if (j == branchesToAdd.size() - 1)
331 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
335 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
336 TRestEvent* evt = fProcessChain[i]->GetOutputEvent();
337 if (evt !=
nullptr) {
338 TString BranchName = (TString)evt->GetName() +
"Branch";
339 if (branchesToAdd.size() == 0)
340 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
342 for (
unsigned int j = 0; j < branchesToAdd.size(); j++) {
343 if (branchesToAdd[j].first == BranchName)
344 branchesToAdd[j].second = evt;
345 else if (j == branchesToAdd.size() - 1)
346 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
352 if (outputConfig[2] ==
true) {
356 TString BranchName = (TString)evt->GetName() +
"Branch";
357 if (branchesToAdd.size() == 0)
358 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
360 for (
unsigned int j = 0; j < branchesToAdd.size(); j++) {
361 if (branchesToAdd[j].first == BranchName)
362 branchesToAdd[j].second = evt;
363 else if (j == branchesToAdd.size() - 1)
364 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
369 auto iter = branchesToAdd.begin();
370 while (iter != branchesToAdd.end()) {
371 fEventTree->Branch(iter->first, iter->second->ClassName(), iter->second);
375 if (fEventTree->GetListOfBranches()->GetLast() < 0)
378 fEventTree =
nullptr;
386 fOutputFile->Clear();
387 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
388 fProcessChain[i]->InitProcess();
391 RESTDebug <<
"Thread " << fThreadId <<
" Ready!" << RESTendl;
393 string tmp = fHostRunner->GetInputEvent()->ClassName();
395 fOutputEvent = fInputEvent;
396 fOutputFile =
new TFile(threadFileName.c_str(),
"recreate");
397 fOutputFile->SetCompressionLevel(fCompressionLevel);
400 RESTDebug <<
"Creating Analysis Tree..." << RESTendl;
401 fAnalysisTree =
new TRestAnalysisTree(
"AnalysisTree_" + ToString(fThreadId),
"dummyTree");
403 fEventTree =
new TTree((TString)
"EventTree_" + ToString(fThreadId),
"dummyTree");
406 if (outputConfig[2]) {
407 TString BranchName = (TString)fInputEvent->GetName() +
"Branch";
408 if (fEventTree->GetBranch(BranchName) ==
nullptr)
410 fEventTree->Branch(BranchName, fInputEvent->ClassName(), fInputEvent);
415 if (outputConfigToDel)
delete outputConfig;
436 while (fHostRunner->
GetNextevtFunc(fInputEvent, fAnalysisTree) == 0) {
463 RESTDebug <<
"Entering TRestThread::AddProcess" << RESTendl;
465 fProcessChain.push_back(process);
485 fProcessNullReturned =
false;
488#ifdef TIME_MEASUREMENT
489 vector<int> processtime(fProcessChain.size());
492 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
493 cout <<
"------- Starting process " + (string)fProcessChain[j]->GetName() +
" -------" << endl;
495#ifdef TIME_MEASUREMENT
496 high_resolution_clock::time_point t1 = high_resolution_clock::now();
499 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
500 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
501 if (fProcessChain[j]->ApplyCut()) ProcessedEvent =
nullptr;
502 fProcessChain[j]->EndOfEventProcess();
504#ifdef TIME_MEASUREMENT
505 high_resolution_clock::time_point t2 = high_resolution_clock::now();
506 processtime[j] = (int)duration_cast<microseconds>(t2 - t1).count();
509 if (ProcessedEvent ==
nullptr) {
510 cout <<
"------- End of process " + (string)fProcessChain[j]->GetName() +
511 " (NULL returned) -------"
513 fProcessNullReturned =
true;
516 cout <<
"------- End of process " + (string)fProcessChain[j]->GetName() +
" -------" << endl;
520 j < fProcessChain.size() - 1) {
525#ifdef TIME_MEASUREMENT
526 cout <<
"Process timing summary : " << endl;
527 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
528 cout << fProcessChain[j]->ClassName() <<
"(" << fProcessChain[j]->GetName()
529 <<
") : " << processtime[j] / (double)1000 <<
" ms" << endl;
533 if (fHostRunner->UseTestRun()) {
534 fOutputEvent = ProcessedEvent;
536 cout <<
"Transferring output event..." << endl;
537 if (ProcessedEvent !=
nullptr) {
538 ProcessedEvent->
CloneTo(fOutputEvent);
543 "======= End of Event " +
544 ((fOutputEvent ==
nullptr) ? ToString(fInputEvent->GetID()) : ToString(fOutputEvent->GetID())) +
547 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
548 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
549 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
550 if (fProcessChain[j]->ApplyCut()) ProcessedEvent =
nullptr;
551 fProcessChain[j]->EndOfEventProcess();
552 if (ProcessedEvent ==
nullptr) {
553 fProcessNullReturned =
true;
558 if (fHostRunner->UseTestRun()) {
559 fOutputEvent = ProcessedEvent;
561 if (ProcessedEvent !=
nullptr) {
562 ProcessedEvent->
CloneTo(fOutputEvent);
574 RESTInfo <<
"TRestThread : Writing temp file. Thread id : " << fThreadId << RESTendl;
576 if (fOutputFile ==
nullptr)
return;
581 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
583 fProcessChain[i]->EndProcess();
584 if (fProcessChain[i]->GetError()) nErrors++;
585 if (fProcessChain[i]->GetWarning()) nWarnings++;
589 RESTWarning << nWarnings
590 <<
" process warnings were found! Use run0->PrintWarnings(); to get additional info."
593 RESTError << nErrors <<
" process errors were found! Use run0->PrintErrors(); to get additional info."
596 delete fAnalysisTree;
std::string type
Type of the wrapped object.
TClass * cl
Pointer to the corresponding TClass helper, if the wrapped object is in class type.
REST core data-saving helper based on TTree.
void DisableQuickObservableValueSetting()
It will disable quick observable value setting.
A base class for any REST event process.
A base class for any REST event.
virtual void CloneTo(TRestEvent *target)
Clone the content of this TRestEvent object to another.
Int_t GetNextevtFunc(TRestEvent *targetevt, TRestAnalysisTree *targettree)
Get next event and copy it to the address of targetevt.
void FillThreadEventFunc(TRestThread *t)
Calling back the FillEvent() method in TRestThread.
@ REST_Essential
+show some essential information, as well as warnings
@ REST_Extreme
show everything
@ REST_Debug
+show the defined debug messages
void AddProcess(TRestEventProcess *process)
Add a process.
bool TestRun()
Make a test run of our process chain.
void Initialize()
Set variables by default during initialization.
void StartProcess()
The main function of this class. Thread will run this function until the end.
void EndProcess()
Write and close the output file.
void ProcessEvent()
Process a single event.
void PrepareToProcess(bool *outputConfig=nullptr)
Prepare some thing before we can start process.
Int_t ValidateChain(TRestEvent *input)
Check if the input/output of each process in the process chain matches.
void StartThread()
Create a thread with the method StartProcess().
TRestReflector Assembly(const std::string &typeName)
Assembly an object of type: typeName, returning the allocated memory address and size.
Int_t GetChar(std::string hint="Press a KEY to continue ...")
Helps to pause the program, printing a message before pausing.