MIRA
ChannelManager.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 by
3  * MetraLabs GmbH (MLAB), GERMANY
4  * and
5  * Neuroinformatics and Cognitive Robotics Labs (NICR) at TU Ilmenau, GERMANY
6  * All rights reserved.
7  *
8  * Contact: info@mira-project.org
9  *
10  * Commercial Usage:
11  * Licensees holding valid commercial licenses may use this file in
12  * accordance with the commercial license agreement provided with the
13  * software or, alternatively, in accordance with the terms contained in
14  * a written agreement between you and MLAB or NICR.
15  *
16  * GNU General Public License Usage:
17  * Alternatively, this file may be used under the terms of the GNU
18  * General Public License version 3.0 as published by the Free Software
19  * Foundation and appearing in the file LICENSE.GPL3 included in the
20  * packaging of this file. Please review the following information to
21  * ensure the GNU General Public License version 3.0 requirements will be
22  * met: http://www.gnu.org/copyleft/gpl.html.
23  * Alternatively you may (at your option) use any later version of the GNU
24  * General Public License if such license has been publicly approved by
25  * MLAB and NICR (or its successors, if any).
26  *
27  * IN NO EVENT SHALL "MLAB" OR "NICR" BE LIABLE TO ANY PARTY FOR DIRECT,
28  * INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF
29  * THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF "MLAB" OR
30  * "NICR" HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  * "MLAB" AND "NICR" SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING,
33  * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
34  * FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
35  * ON AN "AS IS" BASIS, AND "MLAB" AND "NICR" HAVE NO OBLIGATION TO
36  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS OR MODIFICATIONS.
37  */
38 
47 #ifndef _MIRA_CHANNELMANAGER_H_
48 #define _MIRA_CHANNELMANAGER_H_
49 
50 #include <platform/Types.h>
51 #include <error/Exceptions.h>
52 #include <thread/Thread.h>
53 #include <transform/Pose.h>
54 
55 #include <fw/Channel.h>
56 
57 namespace mira {
58 
60 
/// Serializable exception (derived from XRuntime) signalling that a requested
/// channel does not exist; obtainConcreteChannel() catches it to create the
/// missing channel on demand.
64 MIRA_DEFINE_SERIALIZABLE_EXCEPTION(XUnknownChannel, XRuntime)
65 
66 
76 {
77 public:
/**
 * Information about a publisher or subscriber entry of a channel.
 * Entries are ordered and compared by their id only; the internal flag
 * does not take part in either comparison.
 */
struct ChannelInfo
{
    /// Creates an entry for the given id; the entry is non-internal by default.
    ChannelInfo(const std::string& iID, bool iInternal = false)
        : id(iID)
        , internal(iInternal)
    {
    }

    /// Strict weak ordering by id (lexicographic string order).
    bool operator<(const ChannelInfo& other) const
    {
        return id.compare(other.id) < 0;
    }

    /// Two entries are equal when their ids are equal.
    bool operator==(const ChannelInfo& other) const
    {
        return id.compare(other.id) == 0;
    }

    std::string id;   ///< id this entry was created with
    bool internal;    ///< flag passed at construction (false unless specified)
};
100 
/// Constructor; declared here, defined outside this header.
101  ChannelManager();
/// Destructor; declared here, defined outside this header.
102  ~ChannelManager();
103 
/// Reflect method: registers the "IChannelManager" interface and the
/// methods createChannel, writeChannel (three signatures),
/// writeChannelWithFrameID, readChannel and getChannelList on the reflector.
/// Each registration passes a description followed by, per parameter,
/// its name, a description and a sample value.
105  template<typename Reflector>
106  void reflect(Reflector& r)
107  {
108  r.interface("IChannelManager");
109 
// createChannel: publishes an internal, void channel with an empty publisher
// id under the fully qualified channel name, then tries to promote it.
110  r.method("createChannel",
111  [this](const std::string& channelID, const std::string& type) {
112  std::string id = ResourceName::makeFullyQualified(channelID, "/");
113  auto ch = publish<void>(id, "", true, type); // publish void, internal, without publisher
114  ChannelPromoteByTypename::instance().promoteChannel(ch, type); // try to automatically promote to typed
115  },
116  "Publish a channel with given type. If type is registered for automatic promotion, "
117  "the channel will be promoted to typed immediately, else remains untyped.",
118  "channelID", "ID of channel to create", "/namespace/Rect2i",
119  "type", "Typename of channel type", "mira::Rect<int,2>");
120 
// writeChannel (5 arguments): forwards to the writeChannel overload taking
// timestamp, frame ID and sequence ID.
121  r.method("writeChannel",
122  [this](const std::string& channelID, const json::Value& value,
123  Time timestamp, std::string frameID, uint32 sequenceID) {
124  writeChannel(channelID, value, std::move(timestamp),
125  std::move(frameID), sequenceID);
126  },
127  "Writes a json representation to the channel, "
128  "with specified timestamp, frame and sequence ID.",
129  "channelID", "ID of channel to write to", "/robot/RobotFrame",
130  "value", "value to write", JSONSerializer().serialize(Pose2()),
131  "time", "data timestamp", Time::now(),
132  "frameID", "data frame ID", "/robot/OdometryFrame",
133  "sequenceID", "data sequence ID", 0);
// writeChannel (2 arguments): relies on the default timestamp argument of
// the (channelID, value, time) overload.
134  r.method("writeChannel",
135  [this](const std::string& channelID, const json::Value& value) {
136  writeChannel(channelID, value);
137  },
138  "Writes a json representation to the channel "
139  "with timestamp taken from clock 'now', default frame ID and increasing sequence ID.",
140  "channelID", "ID of channel to write to", "/robot/RobotFrame",
141  "value", "value to write", JSONSerializer().serialize(Pose2()));
// writeChannel (3 arguments, explicit timestamp).
142  r.method("writeChannel",
143  [this](const std::string& channelID, const json::Value& value, Time timestamp) {
144  writeChannel(channelID, value, std::move(timestamp));
145  },
146  "Writes a json representation to the channel with specified timestamp, "
147  "default frame ID and increasing sequence ID.",
148  "channelID", "ID of channel to write to", "/robot/RobotFrame",
149  "value", "value to write", JSONSerializer().serialize(Pose2()),
150  "time", "data timestamp", Time::now());
// The std::string argument selects the (channelID, value, frameID, time)
// overload of writeChannel; its timestamp argument is left at the default.
151  r.method("writeChannelWithFrameID", // needs a different name to distinguish JSONRPC signature from above
152  [this](const std::string& channelID, const json::Value& value, std::string frameID) {
153  writeChannel(channelID, value, std::move(frameID));
154  },
155  "Writes a json representation to the channel with specified frame ID, "
156  "timestamp taken from clock 'now' and increasing sequence ID.",
157  "channelID", "ID of channel to write to", "/robot/RobotFrame",
158  "value", "value to write", JSONSerializer().serialize(Pose2()),
159  "frameID", "data frame ID", "/robot/OdometryFrame");
160 
// readChannel and getChannelList are bound directly to the member functions.
161  r.method("readChannel", &ChannelManager::readChannel, this,
162  "Reads the latest channel data as json representation",
163  "channelID", "ID of channel to read", "/robot/RobotFrame");
164  r.method("getChannelList", &ChannelManager::getChannelList, this,
165  "get list of channels",
166  "publishedOnly", "if true, include only channels with (local) publishers", false,
167  "subscribedOnly", "if true, include only channels with (local) subscribers", false);
168  }
169 
170 public:
191  template<typename T>
192  ConcreteChannel<T>* publish(const std::string& channelID,
193  const std::string& publisherID,
194  bool internal, const Typename& type,
195  bool noAutoPromoteCheck = false)
196  {
197  // If exists && id and T are not the same throw Exception
198  ConcreteChannel<T>* channel = obtainConcreteChannel<T>(channelID);
199  channel->fixateType(); // fixate the type
200 
201  if ( channel->isTyped() ) {
202  Typename channelType = channel->getTypename();
203  if ((channelType != type) &&
204  // typed publish to polymorphic channel might specify a different type
205  // (when the polymorphic channel is typed, its type has been fixated),
206  // so untyped publish must match it)
207  ((channelType[channelType.size()-1] != '*') || (typeName<T>() == "void")))
208  MIRA_THROW(XBadCast, "Invalid publish<void> to typed channel '" << channelID <<
209  "'. Typename does not match. ('" <<
210  type << "' != '" << channel->getTypename() << "')");
211 
212  if (mCheckChannelRegistrations && !noAutoPromoteCheck)
213  ChannelPromoteByTypename::instance().checkForPromoter(channel);
214  } else {
215  if (!channel->getTypename().empty() && (channel->getTypename() != type))
216  MIRA_THROW(XBadCast, "Invalid publish<void> to untyped channel '" << channelID <<
217  "' (with set typename). Typename does not match. ('" <<
218  type << "' != '" << channel->getTypename() << "')");
219 
220  channel->setTypename(type);
221  }
222 
223  {
224  insertPublishedChannel(channelID, ChannelInfo(publisherID, internal));
225  MIRA_LOG(DEBUG) << "'" << publisherID << "' published channel '" << channelID << "'";
226  }
227 
228  if (!internal)
229  remotePublishChannel(channelID, type);
230 
231  return channel;
232  }
233 
235  void enableAutoPromoteChannels(bool enable) { mAutoPromoteChannels = enable; }
236 
238  bool autoPromoteChannelsEnabled() { return mAutoPromoteChannels; }
239 
243  void enableCheckChannelRegistrations(bool enable) { mCheckChannelRegistrations = enable; }
244 
246  bool checkChannelRegistrationsEnabled() { return mCheckChannelRegistrations; }
247 
248 public:
249 
270  template<typename T>
271  ConcreteChannel<T>* subscribe(const std::string& channelID,
272  const std::string& subscriberID,
273  const Duration& storageDuration, bool internal)
274  {
275  ConcreteChannel<T>* channel = obtainConcreteChannel<T>(channelID);
276 
277  assert(channel->getBuffer()!=NULL);
278  channel->getBuffer()->setStorageDuration(storageDuration);
279  insertSubscribedChannel(channelID, ChannelInfo(subscriberID, internal));
280  MIRA_LOG(DEBUG) << "'" << subscriberID << "' subscribed to channel '" << channelID << "'";
281 
282  if (!internal)
283  remoteSubscribeChannel(channelID);
284 
285  return channel;
286  }
287 
// NOTE(review): only declarations are visible here; the descriptions below
// are inferred from names and signatures - confirm against the implementation.

/// Presumably removes the given publisher from every channel it published.
294  void removePublisherFromAllChannels(const std::string& publisherID, bool internal);
295 
/// Presumably removes the given publisher from the single channel channelID.
303  void removePublisher(const std::string& channelID, const std::string& publisherID, bool internal);
304 
/// Presumably removes the given subscriber from every channel it subscribed to.
309  void removeSubscriberFromAllChannels(const std::string& subscriberID);
310 
/// Presumably removes the given subscriber from the single channel channelID.
316  void removeSubscriber(const std::string& channelID, const std::string& subscriberID);
317 
/// Presumably decrements the subscriber count of the channel - TODO confirm.
323  void deductNumSubscribers(const std::string& channelID);
324 
332  template<typename T>
333  ConcreteChannel<T>* getConcreteChannel(const std::string& channelID)
334  {
335  return channel_cast<T>(getAbstractChannel(channelID));
336  }
337 
338 public:
339 
// NOTE(review): only declarations are visible here; unless stated otherwise
// the descriptions are inferred from names and signatures - confirm against
// the implementation.

/// Returns a set of channel ids (presumably all channels of this manager).
343  std::set<std::string> getChannels() const;
344 
/// Get a list of all channels managed by this manager. As described where the
/// method is registered in reflect(): publishedOnly includes only channels with
/// (local) publishers, subscribedOnly only channels with (local) subscribers.
356  std::set<std::string> getChannelList(bool publishedOnly = false,
357  bool subscribedOnly = false) const;
358 
/// Returns ids of subscribed channels; includeInternal presumably adds
/// channels that only have internal subscribers.
365  std::set<std::string> getSubscribedChannels(bool includeInternal = false) const;
366 
/// Returns ids of the channels the given subscriber is subscribed to (presumably).
376  std::set<std::string> getSubscribedChannelsBy(const std::string& subscriberID) const;
377 
/// Presumably true if subscriberID is subscribed to channelID.
381  bool isSubscribedOn(const std::string& subscriberID,
382  const std::string& channelID) const;
383 
/// Returns a map from channel id to Typename for published channels;
/// includeInternal presumably adds internally published channels.
390  std::map<std::string, Typename> getPublishedChannels(bool includeInternal = false) const;
391 
/// Returns ids of the channels published by the given publisher (presumably).
401  std::set<std::string> getPublishedChannelsBy(const std::string& publisherID) const;
402 
/// Presumably true if publisherID has published channelID.
406  bool hasPublished(const std::string& publisherID, const std::string& channelID) const;
407 
/// Returns (string, Typename) pairs for the channels matching the given type;
/// the string is presumably the channel id.
414  std::list<std::pair<std::string, Typename>> getChannelsOfType(const Typename& type) const;
415 
422  template <typename T>
423  std::list<std::pair<std::string, Typename>> getChannelsOfType() const {
424  return getChannelsOfType(typeName<T>());
425  }
426 
// NOTE(review): only declarations are visible here; the descriptions are
// inferred from names and signatures - confirm against the implementation.

/// Presumably true if the channel has at least one subscriber.
434  bool hasSubscriber(const std::string& channelID) const;
435 
/// Presumably true if the channel has at least one publisher; includeInternal
/// presumably decides whether internal publishers count.
443  bool hasPublisher(const std::string& channelID, bool includeInternal) const;
444 
/// Number of publishers of the channel (presumably).
448  uint32 getNrPublishers(const std::string& channelID) const;
449 
/// Ids of the publishers of the channel (presumably); internal ones only on request.
453  std::set<std::string> getPublishers(const std::string& channelID,
454  bool includeInternal = false) const;
455 
/// Number of subscribers of the channel (presumably).
459  uint32 getNrSubscribers(const std::string& channelID) const;
460 
/// Ids of the subscribers of the channel (presumably); internal ones only on request.
464  std::set<std::string> getSubscribers(const std::string& channelID,
465  bool includeInternal = false) const;
466 
/// Presumably true if a channel with the given id exists.
471  bool hasChannel(const std::string& channelID) const;
472 
/// Returns an integer type id for the channel (meaning of the value not visible here).
477  int getTypeId(const std::string& channelID) const;
478 
/// Sets the typename of the channel (presumably forwards to the channel's setTypename).
482  void setTypename(const std::string& channelID, const Typename& typenam);
483 
/// Returns the typename of the channel.
488  Typename getTypename(const std::string& channelID) const;
489 
/// Sets the type meta information of the channel.
494  void setTypeMeta(const std::string& channelID, TypeMetaPtr meta);
495 
/// Returns the type meta information of the channel.
499  TypeMetaPtr getTypeMeta(const std::string& channelID) const;
500 
/// Presumably the timestamp of the most recent slot in the channel.
506  Time getLastSlotTime(const std::string& channelID) const;
507 
/// Presumably the number of slots currently held by the channel.
511  std::size_t getNrOfSlots(const std::string& channelID) const;
512 
/// Presumably the number of data changes (writes) seen by the channel.
517  uint64 getNrOfDataChanges(const std::string& channelID) const;
518 
519 public:
520 
525  void ensureChannel(const std::string& channelID)
526  {
527  obtainConcreteChannel<void>(channelID);
528  }
529 
// NOTE(review): only declarations are visible here; the descriptions are
// inferred from names and signatures - confirm against the implementation.

/// Sets the storage duration of the channel (subscribe() sets the same
/// property on the channel's buffer).
535  void setStorageDuration(const std::string& channelID, const Duration& storageDuration);
536 
/// Presumably enables/disables automatic increase of the storage duration.
542  void setAutoIncreaseStorageDuration(const std::string& channelID, bool increase);
543 
/// Presumably sets the minimum number of slots of the channel.
549  void setMinSlots(const std::string& channelID, std::size_t minSlots);
550 
/// Presumably sets the maximum number of slots of the channel.
556  void setMaxSlots(const std::string& channelID, std::size_t maxSlots);
557 
/// Returns the storage duration of the channel.
559  Duration getStorageDuration(const std::string& channelID);
560 
/// Presumably true if the storage duration is increased automatically.
562  bool isAutoIncreasingStorageDuration(const std::string& channelID);
563 
/// Presumably returns the minimum number of slots of the channel.
565  std::size_t getMinSlots(const std::string& channelID);
566 
/// Presumably returns the maximum number of slots of the channel.
568  std::size_t getMaxSlots(const std::string& channelID);
569 
570 public:
571 
// The descriptions below follow the texts used where these methods are
// registered in reflect().

/// Writes a json representation to the channel, with specified timestamp,
/// frame and sequence ID.
575  void writeChannel(const std::string& channelID,
576  const json::Value& value,
577  Time time,
578  std::string frameID,
579  uint32 sequenceID);
580 
/// Writes a json representation to the channel with the given timestamp
/// (defaults to Time::now()), default frame ID and increasing sequence ID.
586  void writeChannel(const std::string& channelID,
587  const json::Value& value,
588  Time time = Time::now());
589 
/// Writes a json representation to the channel with specified frame ID, the
/// given timestamp (defaults to Time::now()) and increasing sequence ID.
594  void writeChannel(const std::string& channelID,
595  const json::Value& value,
596  std::string frameID,
597  Time time = Time::now());
598 
/// Reads the latest channel data as json representation.
603  json::Value readChannel(const std::string& channelID);
604 
605 private:
606 
/// Looks up the channel with the given id (const access).
/// NOTE(review): presumably locks mChannelsMutex and throws XUnknownChannel
/// for unknown ids, mirroring the NoLocking variant - confirm.
613  const AbstractChannel* getAbstractChannel(const std::string& channelID) const;
614 
/// Looks up the channel with the given id (mutable access); used by
/// getConcreteChannel().
621  AbstractChannelPtr getAbstractChannel(const std::string& channelID);
622 
/// Lookup variant that obtainConcreteChannel() calls while it already holds
/// mChannelsMutex; obtainConcreteChannel() treats an XUnknownChannel thrown
/// from here as "channel does not exist".
629  AbstractChannelPtr getAbstractChannelNoLocking(const std::string& channelID);
630 
/// Const counterpart of the lookup variant above.
637  const AbstractChannel* getAbstractChannelNoLocking(const std::string& channelID) const;
638 
646  template<typename T>
647  ConcreteChannel<T>* obtainConcreteChannel(const std::string& channelID)
648  {
649  boost::mutex::scoped_lock lock(mChannelsMutex);
650 
651  ConcreteChannel<T>* channel = NULL;
652  try
653  {
654  channel = channel_cast<T>(getAbstractChannelNoLocking(channelID));
655  }
656  catch (XUnknownChannel&)
657  {
658  // if we reach here, the channel does not exist, so create a new one
659  channel = new ConcreteChannel<T>(channelID);
660  mChannels.insert(ChannelMap::value_type(channelID, channel));
661  }
662  return channel;
663  }
664 
// Hooks for announcing local channel changes; publish() and subscribe() call
// the first and third one for non-internal publishers/subscribers.
// NOTE(review): presumably these notify remote frameworks - confirm.

/// Called by publish() with the channel id and the published typename.
667  void remotePublishChannel(const std::string& channelID, const Typename& type);
/// Presumably the counterpart of remotePublishChannel() when a publisher is removed.
668  void remoteUnpublishChannel(const std::string& channelID);
/// Called by subscribe() with the channel id.
669  void remoteSubscribeChannel(const std::string& channelID);
/// Presumably the counterpart of remoteSubscribeChannel() when a subscriber is removed.
670  void remoteUnsubscribeChannel(const std::string& channelID);
672 
673 
674 private:
675 
/// Records a publisher entry for the channel; called by publish() with
/// ChannelInfo(publisherID, internal).
676  void insertPublishedChannel(const std::string& channelID, const ChannelInfo& info);
/// Records a subscriber entry for the channel; called by subscribe() with
/// ChannelInfo(subscriberID, internal).
677  void insertSubscribedChannel(const std::string& channelID, const ChannelInfo& info);
678 
679 private:
680 
/// Maps a channel id to its channel object.
681  typedef std::map<std::string, AbstractChannelPtr > ChannelMap;
/// All channels known to this manager; obtainConcreteChannel() inserts into it.
682  ChannelMap mChannels;
/// Locked by obtainConcreteChannel() around lookup and insertion in mChannels.
683  mutable boost::mutex mChannelsMutex;
684 
/// Opaque implementation class, defined outside this header.
685  class Pimpl;
686  Pimpl* p;
687 
// NOTE(review): presumably these guard the subscriber and publisher
// bookkeeping respectively - confirm in the implementation.
688  mutable boost::mutex mSubscriberMutex;
689  mutable boost::mutex mPublisherMutex;
690 
/// Set by enableAutoPromoteChannels().
691  bool mAutoPromoteChannels;
/// Set by enableCheckChannelRegistrations(); read in publish() for typed channels.
692  bool mCheckChannelRegistrations;
693 
694 };
695 
697 
698 }
699 
700 #endif
Serializer for serializing objects in JSON format.
Definition: JSONSerializer.h:93
bool operator==(const ChannelInfo &other) const
Definition: ChannelManager.h:92
Typedefs for OS independent basic data types.
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
std::string Typename
Definition: Typename.h:60
#define MIRA_LOG(level)
Use this macro to log data.
Definition: LoggingCore.h:529
bool checkChannelRegistrationsEnabled()
Query if check for channel registrations in publish<T>(...) is enabled.
Definition: ChannelManager.h:246
#define MIRA_DEFINE_SERIALIZABLE_EXCEPTION(Ex, Base)
Macro for easily defining a new serializable exception class.
Definition: Exceptions.h:66
json::Value readChannel(const std::string &channelID)
Reads data from the channel with given id.
static ResourceName makeFullyQualified(const ResourceName &name, const ResourceName &ns)
Definition: ChannelReadWrite.h:65
void serialize(const std::string &name, const T &value, const std::string &comment="")
Serializes the specified object value under the given name.
Definition: Serializer.h:204
#define MIRA_THROW(ex, msg)
Macro for throwing an exception.
Definition: Exception.h:82
ConcreteChannel< T > * publish(const std::string &channelID, const std::string &publisherID, bool internal, const Typename &type, bool noAutoPromoteCheck=false)
Publishes a channel with the given id.
Definition: ChannelManager.h:192
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:416
void enableAutoPromoteChannels(bool enable)
Enable/disable automatic promotion of channel to typed in publish<void>(...)
Definition: ChannelManager.h:235
std::set< std::string > getChannelList(bool publishedOnly=false, bool subscribedOnly=false) const
Get a list of all channels managed by this manager, can be filtered for only those that have subscrib...
RigidTransform< float, 2 > Pose2
A 2D pose consisting of a translation and rotation.
Definition: Pose.h:62
ChannelInfo()
Definition: ChannelManager.h:81
void enableCheckChannelRegistrations(bool enable)
Enable/disable check for registration of channel types in publish<T>(...) (T!=void) ...
Definition: ChannelManager.h:243
void reflect(Reflector &r)
Reflect method for serialization.
Definition: ChannelManager.h:106
Commonly used exception classes.
Includes, defines and functions for threads.
std::string id
Definition: ChannelManager.h:97
#define MIRA_FRAMEWORK_EXPORT
Definition: FrameworkExports.h:61
PropertyHint type(const std::string &t)
Sets the attribute "type" to the specified value.
Definition: PropertyHint.h:295
Definition: AbstractChannel.h:70
Use this class to represent time durations.
Definition: Time.h:104
bool autoPromoteChannelsEnabled()
Query if automatic promotion of channel to typed in publish<void>(...) is enabled.
Definition: ChannelManager.h:238
ChannelInfo(const std::string &iID, bool iInternal=false)
Definition: ChannelManager.h:83
ConcreteChannel< T > * subscribe(const std::string &channelID, const std::string &subscriberID, const Duration &storageDuration, bool internal)
Subscribes to a channel with the given id.
Definition: ChannelManager.h:271
boost::shared_ptr< TypeMeta > TypeMetaPtr
Definition: MetaSerializer.h:309
Framework channel classes.
Information about a channel that also provides ordering of channels by id.
Definition: ChannelManager.h:79
json_spirit::mValue Value
A value is an abstract description of data in JSON (underlying data can either be one of the JSON bas...
Definition: JSON.h:176
static Time now() static Time eternity()
Returns the current UTC-based time.
Definition: Time.h:479
An exception that is thrown whenever a channel does not exist.
Definition: ChannelManager.h:75
Typedefs for different Pose datatypes that are internally RigidTransforms.
Definition: LoggingCore.h:78
std::list< std::pair< std::string, Typename > > getChannelsOfType() const
Returns a list of all channels that match the specified type.
Definition: ChannelManager.h:423
bool operator<(const ChannelInfo &other) const
Definition: ChannelManager.h:87
void ensureChannel(const std::string &channelID)
Just make sure the specified channel exists (by creating it if needed), without changing any attribut...
Definition: ChannelManager.h:525
ConcreteChannel< T > * getConcreteChannel(const std::string &channelID)
Returns the existing channel with the given id.
Definition: ChannelManager.h:333