REST-for-Physics  v2.3
Rare Event Searches ToolKit for Physics
TRestMessenger.cxx
1/*************************************************************************
2 * This file is part of the REST software framework. *
3 * *
4 * Copyright (C) 2016 GIFNA/TREX (University of Zaragoza) *
5 * For more information see http://gifna.unizar.es/trex *
6 * *
7 * REST is free software: you can redistribute it and/or modify *
8 * it under the terms of the GNU General Public License as published by *
9 * the Free Software Foundation, either version 3 of the License, or *
10 * (at your option) any later version. *
11 * *
12 * REST is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU General Public License for more details. *
16 * *
17 * You should have a copy of the GNU General Public License along with *
18 * REST in $REST_PATH/LICENSE. *
19 * If not, see http://www.gnu.org/licenses/. *
20 * For the list of contributors see $REST_PATH/CREDITS. *
21 *************************************************************************/
22
70
71#include "TRestMessenger.h"
72
73#include <sys/ipc.h>
74#include <sys/shm.h>
75
76#ifdef __APPLE__
77#include <unistd.h>
78#endif
79
80#include "TRestDataBase.h"
81#include "TRestManager.h"
82#include "TRestProcessRunner.h"
83#include "TRestStringOutput.h"
84
85using namespace std;
86ClassImp(TRestMessenger);
87
88TRestMessenger::TRestMessenger() { Initialize(); }
89
90TRestMessenger::~TRestMessenger() {
91 // clear the shared memories
92 if (fMessagePool != nullptr) {
93 if (fMessagePool->owner == this) {
94 unlock(fMessagePool);
95 }
96 shmdt(fMessagePool);
97 }
98}
99
100TRestMessenger::TRestMessenger(int token, string mode) {
101 Initialize();
103 StringToElement("<TRestMessenger token=\"" + ToString(token) + "\" mode=\"" + mode + "\"/>"), nullptr,
104 {});
105}
106
108 fRun = nullptr;
109 fMode = MessagePool_TwoWay;
110}
111
// Flag sets handed to shmget(). Each one combines the octal permission bits 0640
// (owner read/write, group read) with the creation flags that select the behaviour.

// Create a new segment; IPC_EXCL makes the call fail if one already exists for the key.
112#define SHMFLAG_CREATEUNIQUE (0640 | IPC_CREAT | IPC_EXCL)
// Create the segment if it is missing, otherwise open the existing one.
113#define SHMFLAG_CREATEOROPEN (0640 | IPC_CREAT)
// Open an existing segment only; nothing is created.
114#define SHMFLAG_OPEN (0640)
115
116// Example rml structure:
117// <TRestMessenger name="Messager" title="Example" verboseLevel="info"
118// messageSource="outputfile" token="116027" mode="auto"/>
120 fRun = fHostmgr != nullptr ? fHostmgr->GetRunInfo() : nullptr;
121 string modestr = GetParameter("mode", "twoway");
122 if (ToUpper(modestr) == "HOST") {
123 fMode = MessagePool_Host;
124 } else if (ToUpper(modestr) == "CLIENT") {
125 fMode = MessagePool_Client;
126 } else if (ToUpper(modestr) == "TWOWAY" || ToUpper(modestr) == "AUTO") {
127 fMode = MessagePool_TwoWay;
128 }
129
130 string token = GetParameter("token", "116027");
131 string source = GetParameter("messageSource", "OUTPUTFILE");
132 key_t key = StringToInteger(token);
133 /*
134int flag = 0;
135if (fMode == MessagePool_Host) {
136 flag = SHMFLAG_CREATEUNIQUE;
137} else {
138 flag = SHMFLAG_OPEN;
139}
140 */
141
142 bool created = false;
143 int shmid = shmget(key, sizeof(messagepool_t), SHMFLAG_OPEN);
144 if (fMode == MessagePool_Host) {
145 if (shmid == -1) {
146 shmid = shmget(key, 30000, SHMFLAG_CREATEUNIQUE);
147 if (shmid == -1) {
148 RESTWarning << "TRestMessenger: unknown error!" << RESTendl;
149 return;
150 } else {
151 created = true;
152 }
153 } else {
154 RESTWarning << "TRestMessenger: shmget error!" << RESTendl;
155 RESTWarning << "Shared memory not deleted? type \"ipcrm -m " << shmid << "\" in the bash"
156 << RESTendl;
157 return;
158 }
159 } else if (fMode == MessagePool_Client) {
160 if (shmid == -1) {
161 RESTWarning << "TRestMessenger: shmget error!" << RESTendl;
162 RESTWarning << "Shared memory not initialized? Launch Host process first!" << RESTendl;
163 return;
164 }
165 } else if (fMode == MessagePool_TwoWay) {
166 if (shmid == -1) {
167 shmid = shmget(key, 30000, SHMFLAG_CREATEUNIQUE);
168 if (shmid == -1) {
169 RESTWarning << "TRestMessenger: unknown error!" << RESTendl;
170 return;
171 } else {
172 created = true;
173 }
174 }
175 }
176
177 messagepool_t* message = (messagepool_t*)shmat(shmid, nullptr, 0);
178 if (message == nullptr) {
179 printf("shmat error\n");
180 return;
181 }
182
183 if ((string)this->GetName() == "defaultName") SetName(message->name);
184
185 if (created) {
186 message->Reset();
187 strcpy(message->name, this->GetName());
188 cout << "Created shared memory: " << shmid << endl;
189 } else {
190 if (strcmp(message->name, this->GetName()) != 0) {
191 RESTWarning << "TRestMessenger: connected message pool name(" << message->name
192 << ") is different with this(" << this->GetName() << ")!" << RESTendl;
193 }
194 cout << "Connected to shared memory: " << shmid << endl;
195 }
196
197 fShmId = shmid;
198 fMessagePool = message;
199 fPoolToken = token;
200 fPoolSource = source;
201}
202
203bool TRestMessenger::lock(messagepool_t* pool, int timeoutMs) {
204 int i = 0;
205 while (pool->owner != nullptr && pool->owner != (void*)this) {
206 usleep(1000);
207 i++;
208 if (i >= timeoutMs) {
209 return false;
210 }
211 }
212
213 pool->owner = (void*)this;
214
215 return true;
216}
217
218bool TRestMessenger::unlock(messagepool_t* pool, int timeoutMs) {
219 int i = 0;
220 while (pool->owner != nullptr && pool->owner != (void*)this) {
221 usleep(1000);
222 i++;
223 if (i >= timeoutMs) {
224 RESTError << "unlocking pool failed!" << RESTendl;
225 abort();
226 }
227 }
228
229 pool->owner = nullptr;
230
231 return true;
232}
233
234void TRestMessenger::AddPool(string message) {
235 if (!IsConnected()) {
236 RESTWarning << "TRestMessenger: Not connected!" << RESTendl;
237 return;
238 }
239
240 if (message.size() > MsgLength) {
241 message = message.substr(0, MsgLength - 2);
242 }
243 if (message == "") {
244 RESTWarning << "cannot add empty message!" << RESTendl;
245 return;
246 }
247
248 messagepool_t* pool = fMessagePool;
249 if (!lock(pool)) {
250 RESTWarning << "cannot add message to pool: " << pool->name << ": lock failed!" << RESTendl;
251 return;
252 }
253
254 int pos = pool->RequirePos();
255 if (pos != -1) {
256 messagepool_t::message_t msg;
257 msg.provider = this;
258 strcpy(msg.content, message.c_str());
259 memcpy(&pool->messages[pos], &msg, sizeof(msg));
260 } else {
261 RESTWarning << "cannot send message: message pool is full!" << RESTendl;
262 }
263
264 unlock(pool);
265}
266
267void TRestMessenger::SendMessage(string message) {
268 if (!IsConnected()) {
269 RESTWarning << "TRestMessenger: Not connected!" << RESTendl;
270 return;
271 }
272 if (fMode == MessagePool_Client) {
273 RESTWarning << "TRestMessenger: Forbidden to send message from client!" << RESTendl;
274 return;
275 }
276
277 if (message == "") {
278 if (ToUpper(fPoolSource) == "OUTPUTFILE") {
279 if (fRun != nullptr) {
280 message = fRun->GetOutputFileName();
281 }
282 } else if (ToUpper(fPoolSource) == "TIME") {
283 message = ToDateTimeString(time(0));
284 } else {
285 message = fPoolSource;
286 }
287 }
288
289 AddPool(message);
290}
291
292vector<string> TRestMessenger::ShowMessagePool() {
293 vector<string> result;
294
295 if (!IsConnected()) {
296 RESTWarning << "TRestMessenger: Not connected!" << RESTendl;
297 return result;
298 }
299
300 if (!lock(fMessagePool)) {
301 RESTWarning << "cannot read message to pool: " << fMessagePool->name << ": lock failed!" << RESTendl;
302 return result;
303 }
304
305 for (int i = 0; i < Nmsg; i++) {
306 if (!fMessagePool->messages[i].IsEmpty()) {
307 string msg = string(fMessagePool->messages[i].content);
308 if (msg != "") {
309 result.push_back(msg);
310 }
311 }
312 }
313
314 unlock(fMessagePool);
315 return result;
316}
317
318string TRestMessenger::ConsumeMessage() {
319 if (!IsConnected()) {
320 RESTWarning << "TRestMessenger: Not connected!" << RESTendl;
321 return "";
322 }
323 if (fMode == MessagePool_Host) {
324 RESTWarning << "TRestMessenger: Forbidden to consume message from host!" << RESTendl;
325 return "";
326 }
327
328 if (!lock(fMessagePool)) {
329 RESTWarning << "cannot read message to pool: " << fMessagePool->name << ": lock failed!" << RESTendl;
330 return "";
331 }
332
333 string msg = "";
334 for (int i = 0; i < Nmsg; i++) {
335 if (!fMessagePool->messages[i].IsEmpty()) {
336 // the process shall not consume the message provided by itself
337 if (fMessagePool->messages[i].provider != this) {
338 // form the message
339 msg = string(fMessagePool->messages[i].content);
340 // clear this message because it will be consumed
341 fMessagePool->messages[i].Reset();
342 break;
343 }
344 }
345 }
346
347 unlock(fMessagePool);
348 return msg;
349}
350
353
354 if (IsConnected()) {
355 RESTMetadata << "Connected : "
356 << " (token: " << fPoolToken << ", shmid: " << fShmId << ", source: " << fPoolSource
357 << ")" << RESTendl;
358 } else {
359 RESTMetadata << "Not Connected" << RESTendl;
360 }
361 RESTMetadata << "+++++++++++++++++++++++++++++++++++++++++++++" << RESTendl;
362 RESTMetadata << RESTendl;
363 RESTMetadata << RESTendl;
364}
void PrintMetadata() override
Implement this in the derived metadata class to print out specific metadata information.
virtual void Initialize() override
Making default settings.
virtual void InitFromConfigFile() override
To make settings from rml file. This method must be implemented in the derived class.
virtual void PrintMetadata()
Implement this in the derived metadata class to print out specific metadata information.
endl_t RESTendl
Termination flag object for TRestStringOutput.
TiXmlElement * StringToElement(std::string definition)
Parsing a string into TiXmlElement object.
Int_t LoadConfigFromElement(TiXmlElement *eSectional, TiXmlElement *eGlobal, std::map< std::string, std::string > envs={})
Main starter method.
TRestManager * fHostmgr
All metadata classes can be initialized and managed by TRestManager.
std::string GetParameter(std::string parName, TiXmlElement *e, TString defaultValue=PARAMETER_NOT_FOUND_STR)
Returns the value for the parameter named parName in the given section.
std::string ToUpper(std::string in)
Convert string to its upper case. Alternative of TString::ToUpper.
Int_t StringToInteger(std::string in)
Gets an integer from a string.
std::string ToDateTimeString(time_t time)
Format time_t into string.
void Reset()
max 100 messages, each 256 char length