48 #ifndef _MIRA_CHANNEL_H_ 49 #define _MIRA_CHANNEL_H_ 51 #ifndef _MIRA_FRAMEWORK_H_ 52 # error "Channel.h must be included via the Framework.h. You must not include it directly. Use #include <fw/Framework.h> instead" 56 #include <boost/shared_ptr.hpp> 57 #include <boost/type_traits/is_polymorphic.hpp> 104 typedef typename Buffer::ValueType ValueType;
105 typedef typename Buffer::Slot Slot;
108 typedef boost::shared_ptr<Subscriber> SubscriberPtr;
119 "ConcreteChannel must have the same size as AbstractChannel");
121 static_assert(boost::is_polymorphic<ConcreteChannel>::value==
false,
122 "ConcreateChannel and AbstractChannel must not be polymorphic");
133 boost::mutex::scoped_lock lock(mSubscribersMutex);
134 mNrOfSubscribersWithoutChannelSubscriber++;
141 void addSubscriber(SubscriberPtr subscriber)
143 boost::mutex::scoped_lock lock(mSubscribersMutex);
144 mSubscribers.push_back(subscriber);
145 subscriber->attachChannel(
this);
157 ChannelRead<T>
read();
195 ChannelReadInterval<T> readInterval(
const Time& timestamp, std::size_t nrSlots,
196 std::size_t olderSlots, std::size_t newerSlots,
217 ChannelReadInterval<T> readInterval(
const Time& from,
218 const Time& to = Time::eternity());
231 ChannelWrite<T>
write();
234 friend class Channel<T>;
247 struct ChannelCaster {
248 static ConcreteChannel<T>* cast(AbstractChannel* p)
252 promote_channel<T>(p);
254 assert(p->isTyped());
257 if(typeId<T>()!=p->getTypeId())
258 MIRA_THROW(XBadCast,
"Cannot cast channel '" << p->getID()
259 <<
"' (type '" << p->getTypename() <<
"') " 260 <<
" to typed channel of type '" << typeName<T>() <<
"'");
264 return static_cast<ConcreteChannel<T>*
>(p);
272 struct ChannelCaster<void> {
273 static ConcreteChannel<void>* cast(AbstractChannel* p)
277 return static_cast<ConcreteChannel<void>*
>(p);
285 struct ChannelCaster<T*> {
286 static ConcreteChannel<T*>* cast(AbstractChannel* p)
288 static_assert(std::is_base_of<mira::Object,T>::value,
289 "Pointers that are used in channels must be derived from " 290 "mira::Object. Pointers to other classes cannot be used.");
294 promote_channel<T*>(p);
297 assert(p->getBuffer()->isPolymorphic());
301 return static_cast<ConcreteChannel<T*>*
>(p);
308 inline ConcreteChannel<T>* channel_cast(AbstractChannel* p)
310 return ChannelCaster<T>::cast(p);
318 template <
typename T>
323 inline Channel<T> channel_cast(Channel<void> channel);
341 template <
typename T>
347 struct ParamHelper<void> {
401 template <
typename T>
415 mChannel(channel), mAccessFlags(accessFlags) {
416 assert(mChannel!=NULL);
432 return mChannel!=NULL;
443 "No channel assigned to the channel proxy (of type " <<
444 typeName<T>() <<
"). " 445 "You can obtain a channel using Authority::getChannel()!");
459 return mChannel->getID();
469 return mChannel->getTypeId();
483 return mChannel->getTypename();
492 mChannel->setTypename(
type);
500 return mChannel->getTypeMeta();
508 mChannel->setTypeMeta(meta);
526 return mChannel->getNrOfSlots() == 0;
534 return mChannel->hasSubscriber();
542 return mChannel->hasPublisher();
550 return mChannel->getBuffer()->getMaxSlots() == 1;
558 return mChannel->getBuffer()->getMaxSlots();
566 return mChannel->getBuffer()->getMinSlots();
575 return mChannel->getBuffer()->getStorageDuration();
583 return mChannel->getBuffer()->isAutoIncreasingStorageDuration();
591 return mChannel->getBuffer()->getSize();
617 validateReadAccess();
620 if(!timeout.isValid() || timeout.isInfinity())
621 end = Time::eternity();
625 while(!boost::this_thread::interruption_requested())
628 return mChannel->read();
629 }
catch(XInvalidRead& ex) {}
649 validateReadAccess();
652 if(!timeout.isValid() || timeout.isInfinity())
653 end = Time::eternity();
657 while(!boost::this_thread::interruption_requested())
678 validateReadAccess();
679 return mChannel->read();
704 validateReadAccess();
705 return mChannel->read(timestamp, mode, tolerance);
727 validateReadAccess();
728 Time newestSlotTime = mChannel->read()->timestamp;
731 for(; iter != interval.
end(); ++iter){
732 if(iter->sequenceID == sequenceID)
735 MIRA_THROW(XInvalidRead,
"No slot with sequenceID "<<sequenceID<<
736 " found within searchInterval of channel");
745 validateReadAccess();
746 return mChannel->read(timestamp,
NEAREST_SLOT, tolerance);
779 std::size_t olderSlots,
780 std::size_t newerSlots,
782 validateReadAccess();
783 return mChannel->readInterval(timestamp, nrSlots, olderSlots,
784 newerSlots, fillMode);
803 const Time& to=Time::eternity()) {
804 validateReadAccess();
805 return mChannel->readInterval(from, to);
824 validateWriteAccess();
825 return mChannel->write();
854 validateWriteAccess();
858 && (mChannel->getBuffer()->getMaxSlots()>1)
859 && (mChannel->getBuffer()->getSize()>0))
869 return mChannel->write();
886 validateReadAccess();
906 validateReadAccess();
907 ChannelRead<T> value = mChannel->read(timestamp, mode, tolerance);
917 validateReadAccess();
929 GetTime(
const Time& iT0) : t0(iT0) {}
932 typedef float result_type;
933 result_type operator()(
const Stamped<T>& p)
const {
935 return (p.timestamp-t0).totalMilliseconds();
944 typedef const T& result_type;
945 result_type operator()(
const Stamped<T>& p)
const {
972 template <
typename Filter>
974 validateReadAccess();
980 filter.samplesBefore(),
981 filter.samplesAfter());
983 if (!filter.canExtrapolate())
985 int samplesBefore = filter.samplesBefore();
986 int samplesAfter = filter.samplesAfter();
988 for (
auto it = data.
begin(); it != data.
end(); ++it)
990 if (it->timestamp <= timestamp)
992 else if (it->timestamp >= timestamp)
996 if (samplesBefore > 0)
997 MIRA_THROW(XRuntime,
"Filter::samplesBefore: Requirements not met. Missing " << samplesBefore <<
" items.");
998 if (samplesAfter > 0)
999 MIRA_THROW(XRuntime,
"Filter::samplesAfter: Requirements not met. Missing " << samplesAfter <<
" items.");
1007 if (mChannel->getNrOfSlots() == 1)
1014 const Time t0 = data.
begin()->timestamp;
1017 typedef boost::transform_iterator<GetTime, const_iterator> GetTimeIterator;
1018 GetTimeIterator begin1 = GetTimeIterator(data.
begin(), GetTime(t0));
1019 GetTimeIterator end1 = GetTimeIterator(data.
end() , GetTime(t0));
1021 typedef boost::transform_iterator<GetValue, const_iterator> GetValueIterator;
1023 GetValueIterator begin2=GetValueIterator(data.
begin(), GetValue());
1024 GetValueIterator end2 =GetValueIterator(data.
end() , GetValue());
1028 TimeContainer timeContainer(begin1, end1);
1029 ValueContainer valueContainer(begin2, end2);
1031 return makeStamped(filter.template apply<float, T>(timeContainer,
1033 (timestamp-t0).totalMilliseconds()),
1046 template <
typename U =
void>
1048 validateWriteAccess();
1055 template <
typename U =
void>
1057 validateWriteAccess();
1059 *writeSlot = std::move(value);
1072 template <
typename U>
1073 typename std::enable_if<!std::is_base_of_v<StampedHeader, std::decay_t<U>>>
::type 1075 validateWriteAccess();
1077 *writeSlot =
makeStamped(std::forward<U>(value), timestamp);
1090 template <
typename U = T>
1091 typename std::enable_if<!std::is_void<U>::value>
::type 1093 validateWriteAccess();
1100 template <
typename U = T>
1101 typename std::enable_if<!std::is_void<U>::value>
::type 1103 validateWriteAccess();
1105 *writeSlot =
makeStamped(std::move(value), timestamp);
1114 template<
typename U>
1121 void dbgDump(
bool brief=
true) { mChannel->dbgDump(brief); }
1126 void validateReadAccess()
const 1130 MIRA_THROW(XAccessViolation,
"You are not allowed to read from channel '" 1131 << mChannel->getID() <<
"'. Forgot to subscribe?");
1134 void validateWriteAccess()
const 1138 MIRA_THROW(XAccessViolation,
"You are not allowed to write to channel '" 1139 << mChannel->getID() <<
"'. Forgot to publish?");
1144 ConcreteChannel<T>* mChannel;
1151 template<
typename U>
1152 inline Channel<U> channel_cast(Channel<void> channel)
1155 return Channel<U>(channel_cast<U>(channel.mChannel), channel.mAccessFlags);
1163 #include "impl/ChannelReadWrite.hpp" 1164 #include "impl/ChannelReadInterval.hpp" TypeMetaPtr getTypeMeta() const
Return the type meta information for this channel.
Definition: Channel.h:498
std::enable_if<!std::is_base_of_v< StampedHeader, std::decay_t< U > > >::type post(U &&value, const Time ×tamp=Time::now())
Writes the specified data with the specified time stamp into the channel.
Definition: Channel.h:1074
An object that allows read access to a whole interval of channel data.
Definition: ChannelReadInterval.h:72
const std::string & getID() const
Return the channel ID, its name.
Definition: Channel.h:457
Read access (Authority is a subscriber)
Definition: Channel.h:333
bool hasPublisher() const
Returns true, if this channel has at least one publisher.
Definition: Channel.h:540
void finish()
Releases the lock explicitly.
Definition: ChannelReadWrite.h:601
std::size_t getMaxSlots() const
Returns the upper limit of slots that are allowed for this channel.
Definition: Channel.h:556
void post(const Stamped< T > &value)
Writes the specified data into the channel.
Definition: Channel.h:1047
An exception that occurs whenever a channel has no data.
Definition: Channel.h:88
void post(Stamped< T > &&value)
Same as above, for rvalue reference (move semantics)
Definition: Channel.h:1056
Write access (Authority is a publisher)
Definition: Channel.h:334
Macros for generating logical operators for using enum values as flags.
ChannelRead< T > read(uint32 sequenceID, const Duration &searchInterval=Duration::seconds(1))
Obtains read access to the element with the specified sequenceID within the given search interval If ...
Definition: Channel.h:725
void validate() const
Checks if the channel is valid otherwise throws an exception.
Definition: Channel.h:440
#define MIRA_ENUM_TO_FLAGS(EnumType)
Macro that can be used with enums that contain flags.
Definition: EnumToFlags.h:80
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
Subscriber classes used to subscribe on channels for automatic notification.
Channel(ConcreteChannel< T > *channel, ChannelAccessFlags accessFlags)
Is used by Framework to create a channel proxy object.
Definition: Channel.h:414
std::string Typename
Definition: Typename.h:60
void setTypeMeta(TypeMetaPtr meta)
Set the type meta information for this channel.
Definition: Channel.h:506
bool isTyped() const
Returns true, if the channel is typed and false, if it is untyped.
Definition: Channel.h:475
An object that allows exclusive write access to data of a channel.
Definition: ChannelReadWrite.h:652
#define MIRA_DEFINE_SERIALIZABLE_EXCEPTION(Ex, Base)
Macro for easily defining a new serializable exception class.
Definition: Exceptions.h:66
Definition: ChannelReadWrite.h:67
An object that allows read access to data of a channel.
Definition: ChannelReadWrite.h:494
ChannelReadInterval< T > readInterval(const Time ×tamp, std::size_t nrSlots, std::size_t olderSlots, std::size_t newerSlots, IntervalFillMode fillMode=PREFER_NEWER)
Obtains read access to an interval of elements before and after the requested timestamp.
Definition: Channel.h:777
#define MIRA_THROW(ex, msg)
Macro for throwing an exception.
Definition: Exception.h:82
Typename getTypename() const
Return the typename of the channel.
Definition: Channel.h:481
void finish()
Releases the lock explicitly and informs the Channel to signal all Subscribers that new data is avail...
Definition: ChannelReadWrite.h:765
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:418
ChannelRead< T > read(const Time ×tamp, SlotQueryMode mode=NEAREST_SLOT, const Duration &tolerance=Duration::infinity())
Obtains read access to the element at the specified timestamp.
Definition: Channel.h:702
const_iterator end() const
Definition: ChannelReadInterval.h:164
const_iterator begin() const
Definition: ChannelReadInterval.h:163
Classes for automatic locking/unlocking when reading and writing to channels.
MIRA_BASE_EXPORT void write(const Value &value, std::ostream &ioStream, bool formatted=false, int precision=-1)
Writes a json::Value into a given stream using the JSON format.
Commonly used exception classes.
std::size_t getNrOfSlots() const
Returns how many slots (i.e.
Definition: Channel.h:589
PropertyHint type(const std::string &t)
Sets the attribute "type" to the specified value.
Definition: PropertyHint.h:295
Definition: AbstractChannel.h:70
bool hasSoloSlot() const
Returns true, if this channel has one solo slot only.
Definition: Channel.h:548
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:280
Use this class to represent time durations.
Definition: Time.h:106
ChannelRead< T > read(const Time ×tamp, const Duration &tolerance)
Same as above, but always takes the nearest slot.
Definition: Channel.h:744
ChannelRead< T > read()
Obtains read access to the latest data element of this channel.
Definition: Channel.h:677
void reset()
Reset the proxy object so that it becomes invalid.
Definition: Channel.h:422
MIRA_BASE_EXPORT void read(const std::string &s, Value &oValue)
Read a json::Value from a string that contains JSON format.
Typed ChannelBuffer.
Definition: ChannelBuffer.h:892
The slot with smallest time difference to the requested will be chosen.
Definition: ChannelBuffer.h:104
Const iterator for iterating over the interval.
Definition: ChannelReadInterval.h:85
void setTypename(const Typename &type)
Set the typename of this channel.
Definition: Channel.h:490
Mix in for adding a time stamp, an optional frame id and an optional sequence id to data types like P...
Definition: Stamped.h:149
boost::shared_ptr< TypeMeta > TypeMetaPtr
Definition: MetaSerializer.h:309
std::enable_if<!std::is_void< U >::value >::type post(typename ParamHelper< T >::type &&value, const Time ×tamp=Time::now())
Same as above, for rvalue reference (move semantics)
Definition: Channel.h:1102
bool isAutoIncreasingStorageDuration() const
Returns whether the channel buffer is automatically increasing the storage duration.
Definition: Channel.h:581
std::enable_if<!std::is_void< U >::value >::type post(const typename ParamHelper< T >::type &value, const Time ×tamp=Time::now())
This allows post({}, Time()); to deduce we want to post an object of the channel's type...
Definition: Channel.h:1092
Duration getStorageDuration() const
Returns the timeframe that specifies how long a slot is guaranteed to be kept in the channel buffer...
Definition: Channel.h:573
Stamped< typename std::decay< T >::type > makeStamped(T &&value, const Time ×tamp=Time::now(), const std::string &frameID=std::string(), uint32 sequenceID=0)
Declare stamped classes for all standard data types including std::string.
Definition: Stamped.h:471
Base class for exceptions.
Definition: Exception.h:199
Generic buffer class that can be used as a replacement for std::vector whenever copying and reallocat...
Definition: Buffer.h:84
bool hasSubscriber() const
Returns true, if this channel has at least one subscriber.
Definition: Channel.h:532
static Duration infinity()
Returns a special duration time representing positive infinity.
Definition: Time.h:242
ChannelRead< T > waitForData(const Duration &timeout=Duration::infinity()) const
Waits until data in this channel becomes available.
Definition: Channel.h:615
Channel()
Create channel proxy that is not assigned to a channel.
Definition: Channel.h:409
ChannelWrite< T > write()
Obtains exclusive write access to the next free data slot of this channel.
Definition: Channel.h:823
No access at all (Authority is not a publisher nor a subscriber)
Definition: Channel.h:332
ChannelWrite< T > write(bool copyLatestData)
Same as write above with an additional parameter copyLatestData that allows you to specify whether th...
Definition: Channel.h:853
static Time now() static Time eternity()
Returns the current utc based time.
Definition: Time.h:481
std::size_t getMinSlots() const
Returns the number slots that are guaranteed to be kept in the channel buffer.
Definition: Channel.h:564
int getTypeId() const
Returns the type id of the channel.
Definition: Channel.h:467
Prefer filling the interval with slots with timestamp > requested.
Definition: ChannelBuffer.h:116
bool isValid() const
Returns true if the proxy object is valid.
Definition: Channel.h:431
IntervalFillMode
Mode that is used to determine what slots should be added to the interval when not enough slots are a...
Definition: ChannelBuffer.h:111
SlotQueryMode
Mode that is used to determine the slot obtained from a channel when no slot exists at the exact time...
Definition: ChannelBuffer.h:95
Base class for all framework channels.
#define MIRA_SLEEP(ms)
Sleeps for ms milliseconds This is a thread interruption point - if interruption of the current threa...
Definition: Thread.h:96
Implements AbstractChannelSubscriber for a concrete data type.
Definition: ChannelSubscriber.h:82
The framework that holds all manager classes and provides startup and shutdown of all framework relat...
Provides method for generating a unique id for any type.
bool isEmpty() const
Returns if the channel is empty (no data has been published)
Definition: Channel.h:524
bool waitForPublisher(const Duration &timeout=Duration::infinity()) const
Waits until this channel has at least one publisher.
Definition: Channel.h:647
Wraps an STL conform container around a range of values within another container. ...
Definition: IteratorRangeContainer.h:64
Classes for reading whole intervals of data from channels.
ChannelReadInterval< T > readInterval(const Time &from, const Time &to=Time::eternity())
Obtains read access to an interval of elements in a given time span.
Definition: Channel.h:802
ChannelAccessFlags
Flags specifying the access rights of an authority to a channel Can be combined.
Definition: Channel.h:331