MIRA
ChannelVectorSynchronizer.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 by
3  * MetraLabs GmbH (MLAB), GERMANY
4  * and
5  * Neuroinformatics and Cognitive Robotics Labs (NICR) at TU Ilmenau, GERMANY
6  * All rights reserved.
7  *
8  * Contact: info@mira-project.org
9  *
10  * Commercial Usage:
11  * Licensees holding valid commercial licenses may use this file in
12  * accordance with the commercial license agreement provided with the
13  * software or, alternatively, in accordance with the terms contained in
14  * a written agreement between you and MLAB or NICR.
15  *
16  * GNU General Public License Usage:
17  * Alternatively, this file may be used under the terms of the GNU
18  * General Public License version 3.0 as published by the Free Software
19  * Foundation and appearing in the file LICENSE.GPL3 included in the
20  * packaging of this file. Please review the following information to
21  * ensure the GNU General Public License version 3.0 requirements will be
22  * met: http://www.gnu.org/copyleft/gpl.html.
23  * Alternatively you may (at your option) use any later version of the GNU
24  * General Public License if such license has been publicly approved by
25  * MLAB and NICR (or its successors, if any).
26  *
27  * IN NO EVENT SHALL "MLAB" OR "NICR" BE LIABLE TO ANY PARTY FOR DIRECT,
28  * INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF
29  * THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF "MLAB" OR
30  * "NICR" HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  * "MLAB" AND "NICR" SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING,
33  * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
34  * FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
35  * ON AN "AS IS" BASIS, AND "MLAB" AND "NICR" HAVE NO OBLIGATION TO
36  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS OR MODIFICATIONS.
37  */
38 
47 #ifndef _MIRA_CHANNELVECTORSYNCHRONIZER_H_
48 #define _MIRA_CHANNELVECTORSYNCHRONIZER_H_
49 
50 #ifndef Q_MOC_RUN
51 #include <boost/preprocessor/seq.hpp>
52 #include <boost/preprocessor/tuple/elem.hpp>
53 #include <boost/preprocessor/repetition/repeat.hpp>
54 #include <boost/preprocessor/comparison/not_equal.hpp>
55 #include <boost/preprocessor/repetition/for.hpp>
56 #include <boost/preprocessor/comma_if.hpp>
57 #include <boost/preprocessor/arithmetic/inc.hpp>
58 
59 #include <boost/tuple/tuple.hpp>
60 #endif
61 
62 #include <fw/Framework.h>
63 
64 namespace mira {
65 
67 
68 #ifdef DOXYGEN
69 // This is a fake class to generate a doxygen documentation
105 template<typename type0> class ChannelVectorSynchronizer1 {
106 public:
107 
108  /* generate constructor which is empty*/
110  {}
111 
116  void unsubscribe(Authority& authority)
117  {
118  while(mChannel0.size() != 0){
119  authority.unsubscribe<type0>(mChannel0.back().getID());
120  mLast0.pop_back();
121  mChannelRead0.pop_back();
122  mChannel0.pop_back();
123  }
124  }
125 
130  void subscribe(Authority& authority,
131  const std::vector<std::string>& channelID0
132  ,const Duration& t = Duration::seconds(0))
133  {
134  mFunction = 0;
135  for(const auto& channelID:(channelID0))
136  {
137  (mLast0).push_back(Time::unixEpoch());
138  (mChannelRead0).push_back(ChannelRead<type0>());
139  (mChannel0).push_back(authority.subscribe<type0>(channelID, &ChannelVectorSynchronizer1<
140  type0>::callback0, this, t));
141  }
142  mTolerance = t;
143  }
144 
154  void subscribe(Authority& authority,
155  const std::vector<std::string>& channelID0
156  ,boost::function<void ( std::vector<ChannelRead<type0>>)> fn
157  ,const Duration& t = Duration::seconds(0))
158  {
159  mFunction = fn;
160  for(const auto& channelID:(channelID0))
161  {
162  (mLast0).push_back(Time::unixEpoch());
163  (mChannelRead0).push_back(ChannelRead<type0>());
164  (mChannel0).push_back(authority.subscribe<type0>(channelID, &ChannelVectorSynchronizer1<
165  type0>::callback0, this, t));
166  } mTolerance = t;
167  }
168 
172  template<typename Class>
173  void subscribe(Authority& authority,
174  const std::vector<std::string>& channelID0
175  , void (Class::*f)( std::vector<ChannelRead<type0>> )
176  , Class* obj
177  , const Duration& t = Duration::seconds(0)
178  )
179  {
180  subscribe(authority,
181  channelID0,
182  boost::bind(f, obj, _1),t);
183  }
184 
185  // Called when vector of channels of type0 has new data
187  std::vector<ChannelRead<type0>> c0;
188 
189  try{
190  for(auto& channel:mChannel0){
191  c0.push_back(channel.read(ic.getTimestamp(), mTolerance));
192  };
193  }
194  catch(...)
195  { return; }
196  /* now we can call the function with all channel elements */
197  call( c0);
198  }
199 
200  void call( std::vector<ChannelRead<type0>> c0)
201  {
202  typename std::vector<ChannelRead<type0>>::const_iterator cIter0 = c0.begin();
203  std::vector<Time>::iterator lastIter0 = mLast0.begin();
204  for(; cIter0 != c0.end(); ++cIter0, ++lastIter0){
205  if(*lastIter0 == cIter0->getTimestamp())
206  return;
207  }
208  cIter0 = c0.begin();
209  lastIter0 = mLast0.begin();
210  for(; cIter0 != c0.end(); ++cIter0, ++lastIter0){
211  *lastIter0 = cIter0->getTimestamp();
212  }
213  mChannelRead0 = c0;
214 
215  if (mFunction)
216  mFunction(c0);
217  }
218 
225  boost::tuple< std::vector<ChannelRead<type0>>> read() {
226  /* BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_ISVALID,mChannelRead)*/
227  if(!isValid()){
228  MIRA_THROW(XInvalidRead, "Cannot read from synchronized channel "\
229  << "' since there is no data");\
230  };
231  return boost::make_tuple( mChannelRead0);
232  }
233 
237  bool isValid() const {
238  for(const auto& channelRead:mChannelRead0) {
239  if(!channelRead.isValid()){
240  return false;
241  }
242  }
243  return true;
244  }
245 
252  boost::tuple< std::vector<ChannelRead<type0>>> waitForData(const Duration& timeout = Duration::infinity())
253  {
254  /* validateReadAccess is private :( */
255  /*BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_VALIDATE,mChannel)*/
256 
257  Time end;
258  if(!timeout.isValid() || timeout.isInfinity())
259  end = Time::eternity();
260  else
261  end = Time::now() + timeout;
262 
263  while(!boost::this_thread::interruption_requested())
264  {
265  Time oldestTimeStamp = Time::now();
266  Time timeStamp;
267  try {
268  std::vector<ChannelRead<type0>> c0;
269  for(auto& channel:mChannel0){
270  timeStamp = channel.read().getTimestamp();
271  if ( timeStamp < oldestTimeStamp )
272  oldestTimeStamp = timeStamp;
273  }
274  for(auto& channel:mChannel0){
275  c0.push_back(channel.read(oldestTimeStamp, mTolerance));
276  };
277  mChannelRead0 = c0;
278  return read();
279  }
280  catch(XInvalidRead& ex) {}
281  if(Time::now()>end) /* handle timeout */
282  break;
283  mira::sleep(mira::Duration::milliseconds(50));;
284  }
285  return boost::make_tuple( mChannelRead0);
286  }
287 
288  bool waitForPublisher(const Duration& timeout = Duration::infinity()) const
289  {
290  /*return BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_WAIT_FOR_PUBLISHER,);*/
291  /* bool result = true; */
292  for(const auto& channel:mChannel0){
293  /* if(result) */
294  /* result = channel.waitForPublisher( timeout ); */
295  if(!channel.waitForPublisher( timeout ))
296  return false;
297  }
298  /* return result; */
299  return true;
300  }
301 
302  std::vector<Time> mLast0;
303  std::vector<Channel<type0>> mChannel0;
304  std::vector<ChannelRead<type0>> mChannelRead0;
306  boost::function<void ( std::vector<ChannelRead<type0>>)> mFunction;
307 };
308 
309 
310 #else
311 
313 
314 // create: data0, data1, ...
315 #define MIRA_FW_INTERNAL_NUMBER(z, n, data) \
316  BOOST_PP_COMMA_IF(n) BOOST_PP_CAT(data,n)
317 
318 // create: data1, data2, ...
319 #define MIRA_FW_INTERNAL_INC_NUMBER(z, n, data) \
320  BOOST_PP_COMMA_IF(n) BOOST_PP_CAT(data,BOOST_PP_INC(n))
321 
322 // create: std::vector<ChannelRead<type0>>, std::vector<ChannelRead<type1>>, ...
323 #define MIRA_FW_INTERNAL_CHANNEL(z, n, data) \
324  BOOST_PP_COMMA_IF(n) std::vector<ChannelRead<BOOST_PP_CAT(type,n)>>
325 
326 // create: std::vector<ChannelRead<type0>> c0, std::vector<ChannelRead<type1>> c1, ...
327 #define MIRA_FW_INTERNAL_CHANNEL_VAR(z, n, data) \
328  BOOST_PP_COMMA_IF(n) std::vector<ChannelRead<BOOST_PP_CAT(type,n)>> BOOST_PP_CAT(c,n)
329 
330 // create:
331 // foreach(const auto& channel, mChannel(n)){
332 // if(!channel.waitForPublisher( timeout ))
333 // return false;
334 // }
335 #define MIRA_FW_INTERNAL_WAIT_FOR_PUBLISHER(z, n, data) \
336  foreach(const auto& channel, BOOST_PP_CAT(mChannel,n)){ \
337  if(!channel.waitForPublisher( timeout )) \
338  return false; \
339  } \
340 
341 // create:
342 // foreach(const auto& channelRead, data(n)){
343 // if(!channelRead.isValid())
344 // return false;
345 // }
346 #define MIRA_FW_INTERNAL_ISVALID(z,n,data) \
347  foreach(const auto& channelRead, BOOST_PP_CAT(data,n)) { \
348  if(!channelRead.isValid()){ \
349  return false; \
350  } \
351  } \
352 
353 // create:
354 // std::vector<Time> mLast(n);
355 // std::vector<Channel<type(n)>> mChannel(n);
356 // std::vector<ChannelRead<type(n)>> mChannelRead(n);
357 #define MIRA_FW_INTERNAL_MEMBER(z, n, data) \
358  std::vector<Time> BOOST_PP_CAT(mLast,n);\
359  std::vector<Channel<BOOST_PP_CAT(type,n)>> BOOST_PP_CAT(mChannel,n);\
360  std::vector<ChannelRead<BOOST_PP_CAT(type,n)>> BOOST_PP_CAT(mChannelRead,n);\
361 
362 // create:
363 // std::vector<ChannelRead<type(n)>> c(n);
364 #define MIRA_FW_INTERNAL_DECL_CHANNEL(z, n, data) \
365  std::vector<ChannelRead<BOOST_PP_CAT(type,n)>> BOOST_PP_CAT(c,n);\
366 
367 // create:
368 // foreach(auto& channel, mChannel(n)){
369 // c(n).push_back(channel.read(ic.getTimestamp(), mTolerance));
370 // }
371 #define MIRA_FW_INTERNAL_PUSH_CHANNEL_READ(z, n, data) \
372  foreach(auto& channel, BOOST_PP_CAT(mChannel,n)){ \
373  BOOST_PP_CAT(c,n).push_back(channel.read(ic.getTimestamp(), mTolerance)); \
374  }; \
375 
376 // create:
377 // typename std::vector<ChannelRead<type0>>::const_iterator cIter(n) = c(n).begin();
378 // std::vector<Time>::iterator lastIter(n) = mLast(n).begin();
379 // for(; cIter(n) != c(n).end(); ++cIter(n), ++lastIter(n)){
380 // if(*lastIter(n) == cIter(n)->getTimestamp())
381 // return;
382 // }
383 #define MIRA_FW_INTERNAL_CALL_RET(z, n, data) \
384  typename std::vector<ChannelRead<BOOST_PP_CAT(type,n)>>::const_iterator BOOST_PP_CAT(cIter,n) = BOOST_PP_CAT(c,n).begin(); \
385  std::vector<Time>::iterator BOOST_PP_CAT(lastIter,n) = BOOST_PP_CAT(mLast,n).begin();\
386  for(; BOOST_PP_CAT(cIter,n) != BOOST_PP_CAT(c,n).end(); ++BOOST_PP_CAT(cIter,n), ++BOOST_PP_CAT(lastIter,n)){ \
387  if(*BOOST_PP_CAT(lastIter,n) == BOOST_PP_CAT(cIter,n)->getTimestamp()) \
388  return; \
389  }
390 
391 // create:
392 // mChannelReadn = c(n);
393 #define MIRA_FW_INTERNAL_GET_CHANNELREAD(z, n, data) \
394  BOOST_PP_CAT(mChannelRead,n) = BOOST_PP_CAT(c,n);
395 
396 // create:
397 // void callback0(ChannelRead<type0> ic) {
398 // std::vector<ChannelRead<type0>> c0;
399 // std::vector<ChannelRead<type1>> c1;
400 // ...
401 //
402 // try{
403 // for(auto& channel:mChannel0){
404 // c0.push_back(channel.read(ic.getTimestamp(), mTolerance));
405 // };
406 // for(auto& channel:mChannel1){
407 // c1.push_back(channel.read(ic.getTimestamp(), mTolerance));
408 // };
409 // ...
410 // }
411 // catch(...)
412 // { return; }
413 // /* now we can call the function with all channel elements */
414 // call( c0, c1, ...);
415 // }
416 #define MIRA_FW_INTERNAL_GENCALLBACK(z, n, data) \
417  void BOOST_PP_CAT(callback,n)(ChannelRead<BOOST_PP_CAT(type,n)> ic) { \
418  BOOST_PP_REPEAT(data,MIRA_FW_INTERNAL_DECL_CHANNEL,) \
419  try{ \
420  BOOST_PP_REPEAT(data,MIRA_FW_INTERNAL_PUSH_CHANNEL_READ,) \
421  }\
422  catch(...)\
423  { return; }\
424  /* now we can call the function with all channel elements */\
425  call(BOOST_PP_REPEAT(data,MIRA_FW_INTERNAL_NUMBER,c));\
426  }\
427 
428 // create:
429 // cItern = cn.begin();
430 // lastItern = mLastn.begin();
431 // for(; cItern != cn.end(); ++cItern, ++lastItern){
432 // *lastItern = cItern->getTimestamp();
433 // }
434 #define MIRA_FW_INTERNAL_CALL_ASSIGN(z, n, data) \
435  BOOST_PP_CAT(cIter,n) = BOOST_PP_CAT(c,n).begin(); \
436  BOOST_PP_CAT(lastIter,n) = BOOST_PP_CAT(mLast,n).begin();\
437  for(; BOOST_PP_CAT(cIter,n) != BOOST_PP_CAT(c,n).end(); ++BOOST_PP_CAT(cIter,n), ++BOOST_PP_CAT(lastIter,n)){ \
438  *BOOST_PP_CAT(lastIter,n) = BOOST_PP_CAT(cIter,n)->getTimestamp(); \
439  } \
440 
441 // create:
442 // foreach(auto& channel, mChannel(n)){
443 // timeStamp = channel.read().getTimestamp();
444 // if ( timeStamp < oldestTimeStamp )
445 // oldestTimeStamp = timeStamp;
446 // }
447 #define MIRA_FW_INTERNAL_OLDESTTIMESTAMP(z, n, data) \
448  foreach(auto& channel, BOOST_PP_CAT(mChannel,n)){ \
449  timeStamp = channel.read().getTimestamp(); \
450  if ( timeStamp < oldestTimeStamp ) \
451  oldestTimeStamp = timeStamp; \
452  } \
453 
454 // create:
455 // foreach(auto& channel, mChannel(n)){
456 // cn.push_back(channel.read(oldestTimeStamp, mTolerance));
457 // };
458 #define MIRA_FW_INTERNAL_READ_FROM_OLD(z, n, data) \
459  foreach(auto& channel, BOOST_PP_CAT(mChannel,n)){ \
460  BOOST_PP_CAT(c,n).push_back(channel.read(oldestTimeStamp, mTolerance)); \
461  };\
462 
463 
464 // data contains 2 tuple elements 1: number of template arguments, 2: Class name
465 #define MIRA_FW_INTERNAL_SUBSCRIBE(z, n, data) \
466  foreach(const auto& channelID, (BOOST_PP_CAT(channelID,n))) \
467  { \
468  (BOOST_PP_CAT(mLast,n)).push_back(Time::unixEpoch()); \
469  (BOOST_PP_CAT(mChannelRead,n)).push_back(ChannelRead<BOOST_PP_CAT(type,n)>()); \
470  (BOOST_PP_CAT(mChannel,n)).push_back(authority.subscribe<BOOST_PP_CAT(type,n)>(channelID, &BOOST_PP_TUPLE_ELEM(2,1,data)<\
471  BOOST_PP_REPEAT(BOOST_PP_TUPLE_ELEM(2,0,data),MIRA_FW_INTERNAL_NUMBER,type)>::BOOST_PP_CAT(callback,n), this, t)); \
472  }
473 
474 // create:
475 // while(mChannel(n).size() != 0){
476 // authority.unsubscribe<typen>(mChannel(n).back().getID());
477 // mLast(n).pop_back();
478 // mChannelRead(n).pop_back();
479 // mChannel(n).pop_back();
480 // }
481 #define MIRA_FW_INTERNAL_UNSUBSCRIBE(z, n, data) \
482  while(BOOST_PP_CAT(mChannel,n).size() != 0){ \
483  authority.unsubscribe<BOOST_PP_CAT(type,n)>(BOOST_PP_CAT(mChannel,n).back().getID()); \
484  BOOST_PP_CAT(mLast,n).pop_back(); \
485  BOOST_PP_CAT(mChannelRead,n).pop_back(); \
486  BOOST_PP_CAT(mChannel,n).pop_back(); \
487  } \
488 
489 
490 // Use this macro to generate a synchronizer class with the given name
491 // and the given number of channel types
492 #define MIRA_CHANNEL_SYNCHRONIZER( TNAME, TNUM ) \
493  template < \
494  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,typename type) \
495  > class TNAME\
496  {\
497  public:\
498  \
499  /* generate constructor which is empty*/ \
500  TNAME() \
501  {} \
502  \
503  void unsubscribe(Authority& authority)\
504  {\
505  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_UNSUBSCRIBE,(TNUM,TNAME))\
506  }\
507  \
508  void subscribe(Authority& authority,\
509  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,const std::vector<std::string>& channelID)\
510  \
511  ,const Duration& t = Duration::seconds(0))\
512  {\
513  mFunction = NULL;\
514  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_SUBSCRIBE,(TNUM,TNAME))\
515  mTolerance = t;\
516  }\
517  \
518  void subscribe(Authority& authority,\
519  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,const std::vector<std::string>& channelID)\
520  \
521  ,boost::function<void (BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,))> fn\
522  ,const Duration& t = Duration::seconds(0))\
523  {\
524  mFunction = fn;\
525  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_SUBSCRIBE,(TNUM,TNAME))\
526  mTolerance = t;\
527  }\
528  \
529  /* generate subscribe method provided for convenience*/\
530  template<typename Class>\
531  void subscribe(Authority& authority,\
532  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,const std::vector<std::string>& channelID)\
533  \
534  , void (Class::*f)( BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,) )\
535  , Class* obj\
536  , const Duration& t = Duration::seconds(0)\
537  )\
538  {\
539  subscribe(authority,\
540  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,channelID),\
541  boost::bind(f, obj, BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_INC_NUMBER,_)),t);\
542  }\
543  /* generate callback functions for every channel*/ \
544  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_GENCALLBACK,TNUM)\
545  \
546  void call(BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL_VAR,))\
547  {\
548  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CALL_RET,)\
549  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CALL_ASSIGN,)\
550  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_GET_CHANNELREAD,)\
551  if (mFunction) mFunction( BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,c));\
552  }\
553  \
554  boost::tuple<BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,)> read() { \
555  if(!isValid()) \
556  MIRA_THROW(XInvalidRead, "Cannot read from synchronized channel "\
557  << "' since there is no data");\
558  return boost::make_tuple(BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,mChannelRead));\
559  }\
560  \
561  bool isValid() const { \
562  /*bool allValid = true;*/ \
563  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_ISVALID,mChannelRead) \
564  /*return allValid;*/ \
565  return true; \
566  }\
567  \
568  boost::tuple<BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,)> waitForData(const Duration& timeout = Duration::infinity()) \
569  {\
570  /* validateReadAccess is private :( */ \
571  /*BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_VALIDATE,mChannel)*/\
572  \
573  Time end;\
574  if(!timeout.isValid() || timeout.isInfinity())\
575  end = Time::eternity();\
576  else\
577  end = Time::now() + timeout;\
578  \
579  while(!boost::this_thread::interruption_requested())\
580  {\
581  Time oldestTimeStamp = Time::now();\
582  Time timeStamp;\
583  try {\
584  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_DECL_CHANNEL,)\
585  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_OLDESTTIMESTAMP,)\
586  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_READ_FROM_OLD,)\
587  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_GET_CHANNELREAD,)\
588  return read();\
589  } catch(XInvalidRead& ex) {}\
590  if(Time::now()>end) /* handle timeout */\
591  break;\
592  MIRA_SLEEP(50);\
593  }\
594  return boost::make_tuple(BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,mChannelRead));\
595  }\
596  \
597  bool waitForPublisher(const Duration& timeout = Duration::infinity()) const\
598  {\
599  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_WAIT_FOR_PUBLISHER,) \
600  return true; \
601  }\
602  \
603  BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_MEMBER,)\
604  Duration mTolerance; \
605  boost::function<void (BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,))> mFunction; \
606  };\
607  \
608 
609 
610 #define MIRA_FW_GEN_CHANNEL_SYNCHRONIZER( z, n, data) \
611  MIRA_CHANNEL_SYNCHRONIZER(BOOST_PP_CAT(ChannelVectorSynchronizer,n), n)\
612  /* NEXT SYNCHRONIZER */\
613 
614 // generate predefined channel synchronizer for 1 vector of channels
615 // MIRA_CHANNEL_SYNCHRONIZER(BOOST_PP_CAT(ChannelVectorSynchronizer, 1), 1)
616 // generate predefined channel synchronizer for 1 to 9 vectors of channels
617 BOOST_PP_REPEAT_FROM_TO(1,10,MIRA_FW_GEN_CHANNEL_SYNCHRONIZER,)
618 
619 #undef MIRA_FW_INTERNAL_NUMBER
620 #undef MIRA_FW_INTERNAL_INC_NUMBER
621 #undef MIRA_FW_INTERNAL_CHANNEL
622 #undef MIRA_FW_INTERNAL_CHANNEL_VAR
623 #undef MIRA_FW_INTERNAL_WAIT_FOR_PUBLISHER
624 #undef MIRA_FW_INTERNAL_ISVALID
625 #undef MIRA_FW_INTERNAL_MEMBER
626 #undef MIRA_FW_INTERNAL_DECL_CHANNEL
627 #undef MIRA_FW_INTERNAL_PUSH_CHANNEL_READ
628 #undef MIRA_FW_INTERNAL_CALL_RET
629 #undef MIRA_FW_INTERNAL_GET_CHANNELREAD
630 #undef MIRA_FW_INTERNAL_GENCALLBACK
631 #undef MIRA_FW_INTERNAL_CALL_ASSIGN
632 #undef MIRA_FW_INTERNAL_OLDESTTIMESTAMP
633 #undef MIRA_FW_INTERNAL_READ_FROM_OLD
634 #undef MIRA_FW_INTERNAL_SUBSCRIBE
635 #undef MIRA_FW_INTERNAL_UNSUBSCRIBE
636 #undef MIRA_CHANNEL_SYNCHRONIZER
637 #undef MIRA_FW_GEN_CHANNEL_SYNCHRONIZER
638 
639 #endif
640 
642 
643 }
644 
645 #endif
tick_type milliseconds() const
Returns normalized number of milliseconds (0..999)
Definition: Time.h:285
Class that can be registered as a filter when subscribing to more than one channel to only get a call...
Definition: ChannelVectorSynchronizer.h:105
void unsubscribe(Authority &authority)
Call this instead of Authority::unsubscribe(...) to unsubscribe all the channels of the synchronizer...
Definition: ChannelVectorSynchronizer.h:116
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
Class object which supports some kind of class reflection.
Definition: Class.h:97
std::vector< ChannelRead< type0 > > mChannelRead0
Definition: ChannelVectorSynchronizer.h:304
static Time unixEpoch()
Returns the unix epoch 1.1.1970 0:0:0.000.
Definition: Time.h:511
void unsubscribe(const std::string &channelID)
Unsubscribe from a given channel.
#define MIRA_THROW(ex, msg)
Macro for throwing an exception.
Definition: Exception.h:82
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:418
#define MIRA_FW_GEN_CHANNEL_SYNCHRONIZER(z, n, data)
Definition: ChannelSynchronizer.h:650
void call(std::vector< ChannelRead< type0 >> c0)
Definition: ChannelVectorSynchronizer.h:200
bool waitForPublisher(const Duration &timeout=Duration::infinity()) const
Definition: ChannelVectorSynchronizer.h:288
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:280
Use this class to represent time durations.
Definition: Time.h:106
ChannelVectorSynchronizer1()
Definition: ChannelVectorSynchronizer.h:109
Authorities act as a facade to the framework.
Definition: Authority.h:94
boost::function< void(std::vector< ChannelRead< type0 >>)> mFunction
Definition: ChannelVectorSynchronizer.h:306
void subscribe(Authority &authority, const std::vector< std::string > &channelID0, void(Class::*f)(std::vector< ChannelRead< type0 >>), Class *obj, const Duration &t=Duration::seconds(0))
Same as above but with a function and object pointer.
Definition: ChannelVectorSynchronizer.h:173
static Duration infinity()
Returns a special duration time representing positive infinity.
Definition: Time.h:242
void callback0(ChannelRead< type0 > ic)
Definition: ChannelVectorSynchronizer.h:186
boost::tuple< std::vector< ChannelRead< type0 > > > read()
Return (synchronized) ChannelRead objects.
Definition: ChannelVectorSynchronizer.h:225
std::vector< Channel< type0 > > mChannel0
Definition: ChannelVectorSynchronizer.h:303
bool isValid() const
Return true if all ChannelRead objects contain valid data.
Definition: ChannelVectorSynchronizer.h:237
static Time now() static Time eternity()
Returns the current utc based time.
Definition: Time.h:481
std::vector< Time > mLast0
Definition: ChannelVectorSynchronizer.h:302
Duration mTolerance
Definition: ChannelVectorSynchronizer.h:305
The framework that holds all manager classes and provides startup and shutdown of all framework relat...
void subscribe(Authority &authority, const std::vector< std::string > &channelID0, boost::function< void(std::vector< ChannelRead< type0 >>)> fn, const Duration &t=Duration::seconds(0))
Call this instead of Authority::subscribe()
Definition: ChannelVectorSynchronizer.h:154
void subscribe(Authority &authority, const std::vector< std::string > &channelID0, const Duration &t=Duration::seconds(0))
Call this instead of Authority::subscribe() Function provided for convenience (if no callback functio...
Definition: ChannelVectorSynchronizer.h:130
Channel< T > subscribe(const std::string &channelID, const Duration &storageDuration=Duration::seconds(0))
Subscribes authority to a given channel.
const Time & getTimestamp() const
Definition: ChannelReadWrite.h:258
boost::tuple< std::vector< ChannelRead< type0 > > > waitForData(const Duration &timeout=Duration::infinity())
Return the latest (synchronized) element once it is available.
Definition: ChannelVectorSynchronizer.h:252