71#include "TRestMessenger.h"
80#include "TRestDataBase.h"
81#include "TRestManager.h"
82#include "TRestProcessRunner.h"
83#include "TRestStringOutput.h"
88TRestMessenger::TRestMessenger() {
Initialize(); }
90TRestMessenger::~TRestMessenger() {
92 if (fMessagePool !=
nullptr) {
93 if (fMessagePool->owner ==
this) {
100TRestMessenger::TRestMessenger(
int token,
string mode) {
103 StringToElement(
"<TRestMessenger token=\"" + ToString(token) +
"\" mode=\"" + mode +
"\"/>"),
nullptr,
109 fMode = MessagePool_TwoWay;
112#define SHMFLAG_CREATEUNIQUE (0640 | IPC_CREAT | IPC_EXCL)
113#define SHMFLAG_CREATEOROPEN (0640 | IPC_CREAT)
114#define SHMFLAG_OPEN (0640)
122 if (
ToUpper(modestr) ==
"HOST") {
123 fMode = MessagePool_Host;
124 }
else if (
ToUpper(modestr) ==
"CLIENT") {
125 fMode = MessagePool_Client;
126 }
else if (
ToUpper(modestr) ==
"TWOWAY" ||
ToUpper(modestr) ==
"AUTO") {
127 fMode = MessagePool_TwoWay;
131 string source =
GetParameter(
"messageSource",
"OUTPUTFILE");
142 bool created =
false;
143 int shmid = shmget(key,
sizeof(
messagepool_t), SHMFLAG_OPEN);
144 if (fMode == MessagePool_Host) {
146 shmid = shmget(key, 30000, SHMFLAG_CREATEUNIQUE);
148 RESTWarning <<
"TRestMessenger: unknown error!" <<
RESTendl;
154 RESTWarning <<
"TRestMessenger: shmget error!" <<
RESTendl;
155 RESTWarning <<
"Shared memory not deleted? type \"ipcrm -m " << shmid <<
"\" in the bash"
159 }
else if (fMode == MessagePool_Client) {
161 RESTWarning <<
"TRestMessenger: shmget error!" <<
RESTendl;
162 RESTWarning <<
"Shared memory not initialized? Launch Host process first!" <<
RESTendl;
165 }
else if (fMode == MessagePool_TwoWay) {
167 shmid = shmget(key, 30000, SHMFLAG_CREATEUNIQUE);
169 RESTWarning <<
"TRestMessenger: unknown error!" <<
RESTendl;
178 if (message ==
nullptr) {
179 printf(
"shmat error\n");
183 if ((
string)this->GetName() ==
"defaultName") SetName(message->name);
187 strcpy(message->name, this->GetName());
188 cout <<
"Created shared memory: " << shmid << endl;
190 if (strcmp(message->name, this->GetName()) != 0) {
191 RESTWarning <<
"TRestMessenger: connected message pool name(" << message->name
192 <<
") is different with this(" << this->GetName() <<
")!" <<
RESTendl;
194 cout <<
"Connected to shared memory: " << shmid << endl;
198 fMessagePool = message;
200 fPoolSource = source;
203bool TRestMessenger::lock(messagepool_t* pool,
int timeoutMs) {
205 while (pool->owner !=
nullptr && pool->owner != (
void*)
this) {
208 if (i >= timeoutMs) {
213 pool->owner = (
void*)
this;
218bool TRestMessenger::unlock(messagepool_t* pool,
int timeoutMs) {
220 while (pool->owner !=
nullptr && pool->owner != (
void*)
this) {
223 if (i >= timeoutMs) {
224 RESTError <<
"unlocking pool failed!" <<
RESTendl;
229 pool->owner =
nullptr;
234void TRestMessenger::AddPool(
string message) {
235 if (!IsConnected()) {
236 RESTWarning <<
"TRestMessenger: Not connected!" <<
RESTendl;
240 if (message.size() > MsgLength) {
241 message = message.substr(0, MsgLength - 2);
244 RESTWarning <<
"cannot add empty message!" <<
RESTendl;
248 messagepool_t* pool = fMessagePool;
250 RESTWarning <<
"cannot add message to pool: " << pool->name <<
": lock failed!" <<
RESTendl;
254 int pos = pool->RequirePos();
256 messagepool_t::message_t msg;
258 strcpy(msg.content, message.c_str());
259 memcpy(&pool->messages[pos], &msg,
sizeof(msg));
261 RESTWarning <<
"cannot send message: message pool is full!" <<
RESTendl;
267void TRestMessenger::SendMessage(
string message) {
268 if (!IsConnected()) {
269 RESTWarning <<
"TRestMessenger: Not connected!" <<
RESTendl;
272 if (fMode == MessagePool_Client) {
273 RESTWarning <<
"TRestMessenger: Forbidden to send message from client!" <<
RESTendl;
278 if (
ToUpper(fPoolSource) ==
"OUTPUTFILE") {
279 if (fRun !=
nullptr) {
280 message = fRun->GetOutputFileName();
282 }
else if (
ToUpper(fPoolSource) ==
"TIME") {
285 message = fPoolSource;
292vector<string> TRestMessenger::ShowMessagePool() {
293 vector<string> result;
295 if (!IsConnected()) {
296 RESTWarning <<
"TRestMessenger: Not connected!" <<
RESTendl;
300 if (!lock(fMessagePool)) {
301 RESTWarning <<
"cannot read message to pool: " << fMessagePool->name <<
": lock failed!" <<
RESTendl;
305 for (
int i = 0; i < Nmsg; i++) {
306 if (!fMessagePool->messages[i].IsEmpty()) {
307 string msg = string(fMessagePool->messages[i].content);
309 result.push_back(msg);
314 unlock(fMessagePool);
318string TRestMessenger::ConsumeMessage() {
319 if (!IsConnected()) {
320 RESTWarning <<
"TRestMessenger: Not connected!" <<
RESTendl;
323 if (fMode == MessagePool_Host) {
324 RESTWarning <<
"TRestMessenger: Forbidden to consume message from host!" <<
RESTendl;
328 if (!lock(fMessagePool)) {
329 RESTWarning <<
"cannot read message to pool: " << fMessagePool->name <<
": lock failed!" <<
RESTendl;
334 for (
int i = 0; i < Nmsg; i++) {
335 if (!fMessagePool->messages[i].IsEmpty()) {
337 if (fMessagePool->messages[i].provider !=
this) {
339 msg = string(fMessagePool->messages[i].content);
341 fMessagePool->messages[i].Reset();
347 unlock(fMessagePool);
355 RESTMetadata <<
"Connected : "
356 <<
" (token: " << fPoolToken <<
", shmid: " << fShmId <<
", source: " << fPoolSource
359 RESTMetadata <<
"Not Connected" <<
RESTendl;
361 RESTMetadata <<
"+++++++++++++++++++++++++++++++++++++++++++++" <<
RESTendl;
void PrintMetadata() override
Implemented it in the derived metadata class to print out specific metadata information.
virtual void Initialize() override
Making default settings.
virtual void InitFromConfigFile() override
To make settings from rml file. This method must be implemented in the derived class.
std::string ToUpper(std::string in)
Convert string to its upper case. Alternative of TString::ToUpper.
Int_t StringToInteger(std::string in)
Gets an integer from a string.
std::string ToDateTimeString(time_t time)
Format time_t into string.
void Reset()
max 100 messages, each 256 char length