89 template <
typename ...Ts>
96 template<
int C,
typename A,
typename... As>
99 std::get<C>(this->mChannels) = authority.
subscribe(this->mChannelIDs[C], &ChannelSynchronizer::channelCallback<C, A>,
this, t);
102 subscribeToChannelHelper<C+1, As...>(authority, t);
111 template<
int C,
typename A,
typename... As>
112 bool isValidHelper()
const 114 if(!std::get<C>(this->mChannelReads).
isValid())
119 return isValidHelper<C + 1, As...>();
123 bool isValidHelper()
const 128 template<
int C,
typename A,
typename... As>
129 bool waitForPublisherHelper(
const Duration& timeout )
const 136 return waitForPublisherHelper<C + 1, As...>(timeout);
140 bool waitForPublisherHelper(
const Duration& timeout)
const 146 template<
int C,
typename A,
typename... As>
147 void unsubscribeHelper(
Authority& authority)
const 149 authority.
unsubscribe<A>(std::get<C>(this->mChannels).getID());
150 unsubscribeHelper<C + 1, As...>(authority);
154 void unsubscribeHelper(
Authority& authority)
const 158 template<
int C,
typename A,
typename... As>
159 bool getNearestChannelReadObjectsOfAllChannels(
160 const int& idOfChannelWhoseCallbackGotCalledFirst,
162 const Time& timestamp,
163 const uint32_t sequenceID = 0)
165 if(C == idOfChannelWhoseCallbackGotCalledFirst)
170 return getNearestChannelReadObjectsOfAllChannels<C + 1, As...>(idOfChannelWhoseCallbackGotCalledFirst, tmpChannelReads, timestamp, sequenceID);
175 if(this->mSynchronizationMode == SynchronizationMode::BY_TIMESTAMP)
177 std::get<C>(tmpChannelReads) = std::get<C>(this->mChannels).read(timestamp, this->mTolerance);
179 else if(this->mSynchronizationMode == SynchronizationMode::BY_SEQUENCE_ID)
181 std::get<C>(tmpChannelReads) = std::get<C>(this->mChannels).read(sequenceID, this->mTolerance);
189 return getNearestChannelReadObjectsOfAllChannels<C + 1, As...>(idOfChannelWhoseCallbackGotCalledFirst, tmpChannelReads, timestamp, sequenceID);
193 bool getNearestChannelReadObjectsOfAllChannels(
194 const int& idOfChannelWhoseCallbackGotCalledFirst,
196 const Time& timestamp,
197 const uint32_t sequenceID = 0)
202 template<
int C,
typename A,
typename... As>
203 bool checkForNewTimestampsAndUpdate(std::tuple<
ChannelRead<Ts>...>& tmpChannelReads)
212 if(this->mLastTimes[C] == newTimestamp)
216 this->mLastTimes[C] = newTimestamp;
218 return checkForNewTimestampsAndUpdate<C + 1, As...>(tmpChannelReads);
222 bool checkForNewTimestampsAndUpdate(std::tuple<
ChannelRead<Ts>...>& tmpChannelReads)
227 template<
int C,
typename A,
typename... As>
228 bool getOldestTimestampOfChannelReadsHelper(
Time& oldestTimestamp)
230 const Time& timestamp = std::get<C>(this->mChannels).
read().getTimestamp();
231 if(timestamp < oldestTimestamp)
234 oldestTimestamp = timestamp;
237 return getOldestTimestampOfChannelReadsHelper<C + 1, As...>(oldestTimestamp);
241 bool getOldestTimestampOfChannelReadsHelper(
Time& oldestTimestamp)
246 template<
int C,
typename A,
typename... As>
247 bool getOldestSequenceIDOfChannelReadsHelper(uint32_t& oldestSequenceID)
249 const uint32_t sequenceID = std::get<C>(this->mChannels).
read()->sequenceID;
250 if(sequenceID < oldestSequenceID)
252 oldestSequenceID = sequenceID;
256 return getOldestSequenceIDOfChannelReadsHelper<C + 1, As...>(oldestSequenceID);
260 bool getOldestSequenceIDOfChannelReadsHelper(uint32_t& oldestSequenceID)
302 struct StringArgumentHelper
304 StringArgumentHelper() {}
305 StringArgumentHelper(
const std::string& str) : string(str) {}
306 StringArgumentHelper(
const char* cStr) : string(cStr) {}
311 template<std::size_t I = 0, std::size_t end,
typename... Tp>
312 inline typename std::enable_if<I >= end,
void>
::type 313 stringArgumentHelperUnpack(std::tuple<Tp...>& t, std::vector<std::string>& resultVector) { }
315 template<std::size_t I = 0, std::size_t end,
typename... Tp>
317 stringArgumentHelperUnpack(std::tuple<Tp...>& t, std::vector<std::string>& resultVector)
319 auto d = std::get<I>(t);
320 resultVector.push_back(d.string);
321 stringArgumentHelperUnpack<I + 1, end, Tp...>(t, resultVector);
330 template<
int N,
int ...S>
331 struct generateIntegerSequence : generateIntegerSequence<N-1, N-1, S...> { };
334 struct generateIntegerSequence<0, S...> {
335 typedef sequence<S...>
type;
339 void upCallFunctionHelper(sequence<S...>)
345 this->mFunction(std::get<S>(this->mChannelReads)...);
348 template<
int SeqLength =
sizeof...(Ts)>
349 void upCallFunctionHelper()
357 template<
class Class,
class... Args,
int... Is>
358 std::function<void (Args...)> bind_with_variadic_placeholders(
void (
Class::*p)(Args...),
Class* obj, sequence<Is...>)
365 template<
class Class,
class... Args>
366 std::function<void (Args...)> bind_with_variadic_placeholders(
void (
Class::*p)(Args...),
Class* obj)
368 return bind_with_variadic_placeholders(p, obj,
typename generateIntegerSequence<
sizeof...(Args)>::
type());
372 template<
int C,
typename A>
375 std::tuple<ChannelRead<Ts>...> tmpChannelReads;
376 std::get<C>(tmpChannelReads) = channelRead;
382 if(!getNearestChannelReadObjectsOfAllChannels<0, Ts...>(C, tmpChannelReads, channelRead.
getTimestamp(), channelRead->sequenceID))
385 this->call(tmpChannelReads);
398 if(!checkForNewTimestampsAndUpdate<0, Ts...>(tmpChannelReads))
406 this->mChannelReads = tmpChannelReads;
413 upCallFunctionHelper();
421 : mLastTimes(
std::vector<
Time>(sizeof...(Ts),
Time::unixEpoch())),
424 assert(
sizeof...(Ts) > 0 &&
"Number of template arguments of ChannelSynchronizer has to be > 0!");
433 unsubscribeHelper<0, Ts...>(authority);
445 StringArgumentHelper<Ts>... channelIDs,
448 std::tuple<StringArgumentHelper<Ts>...> tuple(channelIDs...);
449 this->mChannelIDs.clear();
451 stringArgumentHelperUnpack<0,
sizeof...(Ts), StringArgumentHelper<Ts>...>(tuple, this->mChannelIDs);
452 this->mFunction = NULL;
454 this->mTolerance = t;
456 subscribeToChannelHelper<0, Ts...>(authority, t);
468 StringArgumentHelper<Ts>... channelIDs,
472 std::tuple<StringArgumentHelper<Ts>...> tuple(channelIDs...);
474 std::vector<std::string> tmpChannelIDs;
475 stringArgumentHelperUnpack<0,
sizeof...(Ts), StringArgumentHelper<Ts>...>(tuple, tmpChannelIDs);
477 subscribe(authority, tmpChannelIDs, fn, t);
484 const std::vector<std::string>& channelIDs,
488 this->mChannelIDs = channelIDs;
489 this->mFunction = fn;
490 this->mTolerance = t;
492 subscribeToChannelHelper<0, Ts...>(authority, t);
498 template<
typename Class>
500 StringArgumentHelper<Ts>... channelIDs,
505 std::tuple<StringArgumentHelper<Ts>...> tuple(channelIDs...);
507 std::vector<std::string> tmpChannelIDs;
508 stringArgumentHelperUnpack<0,
sizeof...(Ts), StringArgumentHelper<Ts>...>(tuple, tmpChannelIDs);
510 subscribe(authority, tmpChannelIDs, f, obj, t);
516 template<
typename Class>
518 std::vector<std::string> channelIDs,
523 std::function<void (ChannelRead<Ts>...)> upcallFunction = bind_with_variadic_placeholders<Class, ChannelRead<Ts>...>(f, obj);
524 subscribe(authority, channelIDs, upcallFunction, t);
533 std::tuple<ChannelRead<Ts>... >
read()
535 return this->mChannelReads;
543 return isValidHelper<0, Ts...>();
556 end = Time::eternity();
560 while(!boost::this_thread::interruption_requested())
562 if(this->mSynchronizationMode == ChannelSynchronizerBase::SynchronizationMode::BY_TIMESTAMP)
567 if(getOldestTimestampOfChannelReadsHelper<0, Ts...>(oldestTimestamp))
569 std::tuple<ChannelRead<Ts>...> tmpChannelReads;
570 if(getNearestChannelReadObjectsOfAllChannels<0, Ts...>(-1, tmpChannelReads, oldestTimestamp))
572 this->mChannelReads = tmpChannelReads;
577 catch(XInvalidRead& ex) {}
583 uint32_t oldestSequenceID = std::numeric_limits<uint32_t>::max();
586 if(getOldestSequenceIDOfChannelReadsHelper<0, Ts...>(oldestSequenceID))
588 std::tuple<ChannelRead<Ts>...> tmpChannelReads;
590 if(getNearestChannelReadObjectsOfAllChannels<0, Ts...>(-1, tmpChannelReads,
Time(), oldestSequenceID))
592 this->mChannelReads = tmpChannelReads;
597 catch(XInvalidRead& ex) {}
605 return this->mChannelReads;
610 return waitForPublisherHelper<0, Ts...>(timeout);
615 this->mSynchronizationMode = synchronizationMode;
620 std::tuple<Channel<Ts>...> mChannels;
621 std::tuple<ChannelRead<Ts>...> mChannelReads;
622 std::vector<std::string> mChannelIDs;
623 std::vector<Time> mLastTimes;
627 std::function<void (ChannelRead<Ts>...)> mFunction;
640 #define MIRA_FW_INTERNAL_NUMBER(z, n, data) \ 641 BOOST_PP_COMMA_IF(n) BOOST_PP_CAT(data,n) 643 #define MIRA_CHANNEL_SYNCHRONIZER( TNAME, TNUM ) \ 645 BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,typename type) \ 646 > class TNAME : public ChannelSynchronizer<BOOST_PP_REPEAT(TNUM,MIRA_FW_INTERNAL_NUMBER,type)>\ 650 #define MIRA_FW_GEN_CHANNEL_SYNCHRONIZER( z, n, data) \ 651 MIRA_CHANNEL_SYNCHRONIZER(BOOST_PP_CAT(ChannelSynchronizer,n), n)\ 665 #undef MIRA_FW_INTERNAL_NUMBER 666 #undef MIRA_CHANNEL_SYNCHRONIZER 667 #undef MIRA_FW_GEN_CHANNEL_SYNCHRONIZER void subscribe(Authority &authority, StringArgumentHelper< Ts >... channelIDs, void(Class::*f)(ChannelRead< Ts >...), Class *obj, const Duration &t=Duration::milliseconds(100))
Same as above but with a function and object pointer.
Definition: ChannelSynchronizer.h:499
void subscribe(Authority &authority, StringArgumentHelper< Ts >... channelIDs, std::function< void(ChannelRead< Ts >...)> fn, const Duration &t=Duration::milliseconds(100))
Call this instead of Authority::subscribe()
Definition: ChannelSynchronizer.h:467
Definition: ChannelSynchronizer.h:84
void subscribe(Authority &authority, const std::vector< std::string > &channelIDs, std::function< void(ChannelRead< Ts >...)> fn, const Duration &t=Duration::milliseconds(100))
Same as above but with the channel IDs as std::vector<std::string>.
Definition: ChannelSynchronizer.h:483
tick_type milliseconds() const
Returns normalized number of milliseconds (0..999)
Definition: Time.h:285
Definition: ChannelSynchronizer.h:90
bool isValid() const
Checks if this duration is invalid.
Definition: Time.h:257
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
bool isValid() const
Return true if all ChannelRead objects contain valid data.
Definition: ChannelSynchronizer.h:541
Class object which supports some kind of class reflection.
Definition: Class.h:97
Definition: ChannelSynchronizer.h:85
SynchronizationMode
Definition: ChannelSynchronizer.h:82
Definition: ChannelSynchronizer.h:79
An object that allows read access to data of a channel.
Definition: ChannelReadWrite.h:494
void unsubscribe(const std::string &channelID)
Unsubscribe from a given channel.
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:418
#define MIRA_FW_GEN_CHANNEL_SYNCHRONIZER(z, n, data)
Definition: ChannelSynchronizer.h:650
bool isValid() const
Returns true, if data was assigned to the ChannelRead or ChannelWrite and if this data is locked...
Definition: ChannelReadWrite.h:238
PropertyHint type(const std::string &t)
Sets the attribute "type" to the specified value.
Definition: PropertyHint.h:295
Use this class to represent time durations.
Definition: Time.h:106
Authorities act as a facade to the framework.
Definition: Authority.h:94
std::tuple< ChannelRead< Ts >... > read()
Return (synchronized) ChannelRead objects.
Definition: ChannelSynchronizer.h:533
ChannelSynchronizer()
Definition: ChannelSynchronizer.h:419
Definition: ChannelSynchronizer.h:56
void subscribe(Authority &authority, std::vector< std::string > channelIDs, void(Class::*f)(ChannelRead< Ts >...), Class *obj, const Duration &t=Duration::milliseconds(100))
Same as above but with a function and object pointer.
Definition: ChannelSynchronizer.h:517
Base class for exceptions.
Definition: Exception.h:199
static Duration infinity()
Returns a special duration time representing positive infinity.
Definition: Time.h:242
bool isInfinity() const
Checks if this duration is infinity.
Definition: Time.h:264
static Time now() static Time eternity()
Returns the current utc based time.
Definition: Time.h:481
std::tuple< ChannelRead< Ts >... > waitForData(const Duration &timeout=Duration::infinity())
Return the latest (synchronized) element once it is available.
Definition: ChannelSynchronizer.h:552
void subscribe(Authority &authority, StringArgumentHelper< Ts >... channelIDs, const Duration &t=Duration::milliseconds(100))
Call this instead of Authority::subscribe() Function provided for convenience (if no callback functio...
Definition: ChannelSynchronizer.h:444
void setSynchronizationMode(const SynchronizationMode &synchronizationMode)
Definition: ChannelSynchronizer.h:613
#define MIRA_SLEEP(ms)
Sleeps for ms milliseconds This is a thread interruption point - if interruption of the current threa...
Definition: Thread.h:96
The framework that holds all manager classes and provides startup and shutdown of all framework relat...
Channel< T > subscribe(const std::string &channelID, const Duration &storageDuration=Duration::seconds(0))
Subscribes authority to a given channel.
bool waitForPublisher(const Duration &timeout=Duration::infinity()) const
Definition: ChannelSynchronizer.h:608
void unsubscribe(Authority &authority)
Call this instead of Authority::unsubscribe(...) to unsubscribe all the channels of the synchronizer...
Definition: ChannelSynchronizer.h:431
const Time & getTimestamp() const
Definition: ChannelReadWrite.h:258