47 #ifndef _MIRA_CHANNELVECTORSYNCHRONIZER_H_ 48 #define _MIRA_CHANNELVECTORSYNCHRONIZER_H_ 51 #include <boost/preprocessor/seq.hpp> 52 #include <boost/preprocessor/tuple/elem.hpp> 53 #include <boost/preprocessor/repetition/repeat.hpp> 54 #include <boost/preprocessor/comparison/not_equal.hpp> 55 #include <boost/preprocessor/repetition/for.hpp> 56 #include <boost/preprocessor/comma_if.hpp> 57 #include <boost/preprocessor/arithmetic/inc.hpp> 59 #include <boost/tuple/tuple.hpp> 131 const std::vector<std::string>& channelID0
135 for(
const auto& channelID:(channelID0))
155 const std::vector<std::string>& channelID0
160 for(
const auto& channelID:(channelID0))
172 template<
typename Class>
174 const std::vector<std::string>& channelID0
182 boost::bind(f, obj, _1),t);
187 std::vector<ChannelRead<type0>> c0;
202 typename std::vector<ChannelRead<type0>>::const_iterator cIter0 = c0.begin();
203 std::vector<Time>::iterator lastIter0 =
mLast0.begin();
204 for(; cIter0 != c0.end(); ++cIter0, ++lastIter0){
205 if(*lastIter0 == cIter0->getTimestamp())
209 lastIter0 =
mLast0.begin();
210 for(; cIter0 != c0.end(); ++cIter0, ++lastIter0){
211 *lastIter0 = cIter0->getTimestamp();
225 boost::tuple< std::vector<ChannelRead<type0>>>
read() {
228 MIRA_THROW(XInvalidRead,
"Cannot read from synchronized channel "\
229 <<
"' since there is no data");\
239 if(!channelRead.isValid()){
258 if(!timeout.isValid() || timeout.isInfinity())
259 end = Time::eternity();
263 while(!boost::this_thread::interruption_requested())
268 std::vector<ChannelRead<type0>> c0;
270 timeStamp = channel.read().getTimestamp();
271 if ( timeStamp < oldestTimeStamp )
272 oldestTimeStamp = timeStamp;
275 c0.push_back(channel.read(oldestTimeStamp,
mTolerance));
280 catch(XInvalidRead& ex) {}
295 if(!channel.waitForPublisher( timeout ))
306 boost::function<void ( std::vector<ChannelRead<type0>>)>
mFunction;
315 #define MIRA_FW_INTERNAL_NUMBER(z, n, data) \ 316 BOOST_PP_COMMA_IF(n) BOOST_PP_CAT(data,n) 319 #define MIRA_FW_INTERNAL_INC_NUMBER(z, n, data) \ 320 BOOST_PP_COMMA_IF(n) BOOST_PP_CAT(data,BOOST_PP_INC(n)) 323 #define MIRA_FW_INTERNAL_CHANNEL(z, n, data) \ 324 BOOST_PP_COMMA_IF(n) std::vector<ChannelRead<BOOST_PP_CAT(type,n)>> 327 #define MIRA_FW_INTERNAL_CHANNEL_VAR(z, n, data) \ 328 BOOST_PP_COMMA_IF(n) std::vector<ChannelRead<BOOST_PP_CAT(type,n)>> BOOST_PP_CAT(c,n) 335 #define MIRA_FW_INTERNAL_WAIT_FOR_PUBLISHER(z, n, data) \ 336 foreach(const auto& channel, BOOST_PP_CAT(mChannel,n)){ \ 337 if(!channel.waitForPublisher( timeout )) \ 346 #define MIRA_FW_INTERNAL_ISVALID(z,n,data) \ 347 foreach(const auto& channelRead, BOOST_PP_CAT(data,n)) { \ 348 if(!channelRead.isValid()){ \ 357 #define MIRA_FW_INTERNAL_MEMBER(z, n, data) \ 358 std::vector<Time> BOOST_PP_CAT(mLast,n);\ 359 std::vector<Channel<BOOST_PP_CAT(type,n)>> BOOST_PP_CAT(mChannel,n);\ 360 std::vector<ChannelRead<BOOST_PP_CAT(type,n)>> BOOST_PP_CAT(mChannelRead,n);\ 364 #define MIRA_FW_INTERNAL_DECL_CHANNEL(z, n, data) \ 365 std::vector<ChannelRead<BOOST_PP_CAT(type,n)>> BOOST_PP_CAT(c,n);\ 371 #define MIRA_FW_INTERNAL_PUSH_CHANNEL_READ(z, n, data) \ 372 foreach(auto& channel, BOOST_PP_CAT(mChannel,n)){ \ 373 BOOST_PP_CAT(c,n).push_back(channel.read(ic.getTimestamp(), mTolerance)); \ 383 #define MIRA_FW_INTERNAL_CALL_RET(z, n, data) \ 384 typename std::vector<ChannelRead<BOOST_PP_CAT(type,n)>>::const_iterator BOOST_PP_CAT(cIter,n) = BOOST_PP_CAT(c,n).begin(); \ 385 std::vector<Time>::iterator BOOST_PP_CAT(lastIter,n) = BOOST_PP_CAT(mLast,n).begin();\ 386 for(; BOOST_PP_CAT(cIter,n) != BOOST_PP_CAT(c,n).end(); ++BOOST_PP_CAT(cIter,n), ++BOOST_PP_CAT(lastIter,n)){ \ 387 if(*BOOST_PP_CAT(lastIter,n) == BOOST_PP_CAT(cIter,n)->getTimestamp()) \ 393 #define MIRA_FW_INTERNAL_GET_CHANNELREAD(z, n, data) \ 394 BOOST_PP_CAT(mChannelRead,n) = BOOST_PP_CAT(c,n); 416 #define MIRA_FW_INTERNAL_GENCALLBACK(z, n, 
data) \ 417 void BOOST_PP_CAT(callback,n)(ChannelRead<BOOST_PP_CAT(type,n)> ic) { \ 418 BOOST_PP_REPEAT(data,MIRA_FW_INTERNAL_DECL_CHANNEL,) \ 420 BOOST_PP_REPEAT(data,MIRA_FW_INTERNAL_PUSH_CHANNEL_READ,) \ 425 call(BOOST_PP_REPEAT(data,MIRA_FW_INTERNAL_NUMBER,c));\ 434 #define MIRA_FW_INTERNAL_CALL_ASSIGN(z, n, data) \ 435 BOOST_PP_CAT(cIter,n) = BOOST_PP_CAT(c,n).begin(); \ 436 BOOST_PP_CAT(lastIter,n) = BOOST_PP_CAT(mLast,n).begin();\ 437 for(; BOOST_PP_CAT(cIter,n) != BOOST_PP_CAT(c,n).end(); ++BOOST_PP_CAT(cIter,n), ++BOOST_PP_CAT(lastIter,n)){ \ 438 *BOOST_PP_CAT(lastIter,n) = BOOST_PP_CAT(cIter,n)->getTimestamp(); \ 447 #define MIRA_FW_INTERNAL_OLDESTTIMESTAMP(z, n, data) \ 448 foreach(auto& channel, BOOST_PP_CAT(mChannel,n)){ \ 449 timeStamp = channel.read().getTimestamp(); \ 450 if ( timeStamp < oldestTimeStamp ) \ 451 oldestTimeStamp = timeStamp; \ 458 #define MIRA_FW_INTERNAL_READ_FROM_OLD(z, n, data) \ 459 foreach(auto& channel, BOOST_PP_CAT(mChannel,n)){ \ 460 BOOST_PP_CAT(c,n).push_back(channel.read(oldestTimeStamp, mTolerance)); \ 465 #define MIRA_FW_INTERNAL_SUBSCRIBE(z, n, data) \ 466 foreach(const auto& channelID, (BOOST_PP_CAT(channelID,n))) \ 468 (BOOST_PP_CAT(mLast,n)).push_back(Time::unixEpoch()); \ 469 (BOOST_PP_CAT(mChannelRead,n)).push_back(ChannelRead<BOOST_PP_CAT(type,n)>()); \ 470 (BOOST_PP_CAT(mChannel,n)).push_back(authority.subscribe<BOOST_PP_CAT(type,n)>(channelID, &BOOST_PP_TUPLE_ELEM(2,1,data)<\ 471 BOOST_PP_REPEAT(BOOST_PP_TUPLE_ELEM(2,0,data),MIRA_FW_INTERNAL_NUMBER,type)>::BOOST_PP_CAT(callback,n), this, t)); \ 481 #define MIRA_FW_INTERNAL_UNSUBSCRIBE(z, n, data) \ 482 while(BOOST_PP_CAT(mChannel,n).size() != 0){ \ 483 authority.unsubscribe<BOOST_PP_CAT(type,n)>(BOOST_PP_CAT(mChannel,n).back().getID()); \ 484 BOOST_PP_CAT(mLast,n).pop_back(); \ 485 BOOST_PP_CAT(mChannelRead,n).pop_back(); \ 486 BOOST_PP_CAT(mChannel,n).pop_back(); \ 492 #define MIRA_CHANNEL_SYNCHRONIZER( TNAME, TNUM ) \ 494 
BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,typename type) \ 503 void unsubscribe(Authority& authority)\ 505 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_UNSUBSCRIBE,(TNUM,TNAME))\ 508 void subscribe(Authority& authority,\ 509 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,const std::vector<std::string>& channelID)\ 511 ,const Duration& t = Duration::seconds(0))\ 514 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_SUBSCRIBE,(TNUM,TNAME))\ 518 void subscribe(Authority& authority,\ 519 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,const std::vector<std::string>& channelID)\ 521 ,boost::function<void (BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,))> fn\ 522 ,const Duration& t = Duration::seconds(0))\ 525 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_SUBSCRIBE,(TNUM,TNAME))\ 530 template<typename Class>\ 531 void subscribe(Authority& authority,\ 532 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,const std::vector<std::string>& channelID)\ 534 , void (Class::*f)( BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,) )\ 536 , const Duration& t = Duration::seconds(0)\ 539 subscribe(authority,\ 540 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,channelID),\ 541 boost::bind(f, obj, BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_INC_NUMBER,_)),t);\ 544 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_GENCALLBACK,TNUM)\ 546 void call(BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL_VAR,))\ 548 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CALL_RET,)\ 549 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CALL_ASSIGN,)\ 550 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_GET_CHANNELREAD,)\ 551 if (mFunction) mFunction( BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,c));\ 554 boost::tuple<BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,)> read() { \ 556 MIRA_THROW(XInvalidRead, "Cannot read from synchronized channel "\ 557 << "' since there is no data");\ 558 return boost::make_tuple(BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,mChannelRead));\ 561 bool isValid() const { \ 563 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_ISVALID,mChannelRead) \ 568 
boost::tuple<BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,)> waitForData(const Duration& timeout = Duration::infinity()) \ 574 if(!timeout.isValid() || timeout.isInfinity())\ 575 end = Time::eternity();\ 577 end = Time::now() + timeout;\ 579 while(!boost::this_thread::interruption_requested())\ 581 Time oldestTimeStamp = Time::now();\ 584 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_DECL_CHANNEL,)\ 585 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_OLDESTTIMESTAMP,)\ 586 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_READ_FROM_OLD,)\ 587 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_GET_CHANNELREAD,)\ 589 } catch(XInvalidRead& ex) {}\ 590 if(Time::now()>end) \ 594 return boost::make_tuple(BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,mChannelRead));\ 597 bool waitForPublisher(const Duration& timeout = Duration::infinity()) const\ 599 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_WAIT_FOR_PUBLISHER,) \ 603 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_MEMBER,)\ 604 Duration mTolerance; \ 605 boost::function<void (BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_CHANNEL,))> mFunction; \ 610 #define MIRA_FW_GEN_CHANNEL_SYNCHRONIZER( z, n, data) \ 611 MIRA_CHANNEL_SYNCHRONIZER(BOOST_PP_CAT(ChannelVectorSynchronizer,n), n)\ 619 #undef MIRA_FW_INTERNAL_NUMBER 620 #undef MIRA_FW_INTERNAL_INC_NUMBER 621 #undef MIRA_FW_INTERNAL_CHANNEL 622 #undef MIRA_FW_INTERNAL_CHANNEL_VAR 623 #undef MIRA_FW_INTERNAL_WAIT_FOR_PUBLISHER 624 #undef MIRA_FW_INTERNAL_ISVALID 625 #undef MIRA_FW_INTERNAL_MEMBER 626 #undef MIRA_FW_INTERNAL_DECL_CHANNEL 627 #undef MIRA_FW_INTERNAL_PUSH_CHANNEL_READ 628 #undef MIRA_FW_INTERNAL_CALL_RET 629 #undef MIRA_FW_INTERNAL_GET_CHANNELREAD 630 #undef MIRA_FW_INTERNAL_GENCALLBACK 631 #undef MIRA_FW_INTERNAL_CALL_ASSIGN 632 #undef MIRA_FW_INTERNAL_OLDESTTIMESTAMP 633 #undef MIRA_FW_INTERNAL_READ_FROM_OLD 634 #undef MIRA_FW_INTERNAL_SUBSCRIBE 635 #undef MIRA_FW_INTERNAL_UNSUBSCRIBE 636 #undef MIRA_CHANNEL_SYNCHRONIZER 637 #undef MIRA_FW_GEN_CHANNEL_SYNCHRONIZER tick_type milliseconds() const
Returns normalized number of milliseconds (0..999)
Definition: Time.h:285
Class that can be registered as a filter when subscribing to more than one channel to only get a call...
Definition: ChannelVectorSynchronizer.h:105
void unsubscribe(Authority &authority)
Call this instead of Authority::unsubscribe(...) to unsubscribe all the channels of the synchronizer...
Definition: ChannelVectorSynchronizer.h:116
Specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>.
Definition: IOService.h:67
Class object which supports some kind of class reflection.
Definition: Class.h:97
std::vector< ChannelRead< type0 > > mChannelRead0
Definition: ChannelVectorSynchronizer.h:304
static Time unixEpoch()
Returns the Unix epoch 1.1.1970 0:0:0.000.
Definition: Time.h:511
void unsubscribe(const std::string &channelID)
Unsubscribe from a given channel.
#define MIRA_THROW(ex, msg)
Macro for throwing an exception.
Definition: Exception.h:82
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:418
#define MIRA_FW_GEN_CHANNEL_SYNCHRONIZER(z, n, data)
Definition: ChannelSynchronizer.h:650
void call(std::vector< ChannelRead< type0 >> c0)
Definition: ChannelVectorSynchronizer.h:200
bool waitForPublisher(const Duration &timeout=Duration::infinity()) const
Definition: ChannelVectorSynchronizer.h:288
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:280
Use this class to represent time durations.
Definition: Time.h:106
ChannelVectorSynchronizer1()
Definition: ChannelVectorSynchronizer.h:109
Authorities act as a facade to the framework.
Definition: Authority.h:94
boost::function< void(std::vector< ChannelRead< type0 >>)> mFunction
Definition: ChannelVectorSynchronizer.h:306
void subscribe(Authority &authority, const std::vector< std::string > &channelID0, void(Class::*f)(std::vector< ChannelRead< type0 >>), Class *obj, const Duration &t=Duration::seconds(0))
Same as above but with a function and object pointer.
Definition: ChannelVectorSynchronizer.h:173
static Duration infinity()
Returns a special duration time representing positive infinity.
Definition: Time.h:242
void callback0(ChannelRead< type0 > ic)
Definition: ChannelVectorSynchronizer.h:186
boost::tuple< std::vector< ChannelRead< type0 > > > read()
Return (synchronized) ChannelRead objects.
Definition: ChannelVectorSynchronizer.h:225
std::vector< Channel< type0 > > mChannel0
Definition: ChannelVectorSynchronizer.h:303
bool isValid() const
Return true if all ChannelRead objects contain valid data.
Definition: ChannelVectorSynchronizer.h:237
static Time now() static Time eternity()
Returns the current UTC-based time.
Definition: Time.h:481
std::vector< Time > mLast0
Definition: ChannelVectorSynchronizer.h:302
Duration mTolerance
Definition: ChannelVectorSynchronizer.h:305
The framework that holds all manager classes and provides startup and shutdown of all framework relat...
void subscribe(Authority &authority, const std::vector< std::string > &channelID0, boost::function< void(std::vector< ChannelRead< type0 >>)> fn, const Duration &t=Duration::seconds(0))
Call this instead of Authority::subscribe()
Definition: ChannelVectorSynchronizer.h:154
void subscribe(Authority &authority, const std::vector< std::string > &channelID0, const Duration &t=Duration::seconds(0))
Call this instead of Authority::subscribe(). Function provided for convenience (if no callback functio...
Definition: ChannelVectorSynchronizer.h:130
Channel< T > subscribe(const std::string &channelID, const Duration &storageDuration=Duration::seconds(0))
Subscribes authority to a given channel.
const Time & getTimestamp() const
Definition: ChannelReadWrite.h:258
boost::tuple< std::vector< ChannelRead< type0 > > > waitForData(const Duration &timeout=Duration::infinity())
Return the latest (synchronized) element once it is available.
Definition: ChannelVectorSynchronizer.h:252