Using RSB to Receive and Publish Data » History » Version 10

J. Wienke, 06/30/2011 03:53 PM


Using RSB to Receive and Publish Data

This wiki page describes the first steps to integrate an existing component with RSB. It is based on this example in C++. The concepts, however, are the same in other language implementations of RSB.

We assume that this component requires input data that is already available in RSB and that it wants to provide its processed data to other components via RSB.

Receiving Data

To receive data that other components make available in RSB, you need to know the Scope under which the data is published, e.g. /example/informer for some dummy example data. Furthermore, you need to know how the data is represented. As data is transmitted over the network and should be available in multiple programming languages, this includes two aspects:

  1. which class/type represents the data in your programming language, e.g. std::string is used for textual content in C++
  2. how the data is transmitted over the network, i.e. how your programming language class is serialized into bytes and deserialized back into a class

While knowledge about the first item is essential to use RSB, the second item only matters in slightly more advanced use cases, in which programming language classes are used that are not directly known to the RSB framework. Support for primitive data types like strings is already built in (see Types).

In RSB terms the programming language representation is called data type and the serialized representation is called wire schema.
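To make the distinction between data type and wire schema concrete, the following is a minimal sketch of what a conversion between the two involves. It is not RSB's converter API: the names WireData, serializeString and deserializeString, as well as the schema label "utf-8-string", are assumptions chosen for illustration only.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical container for serialized data: the bytes that travel over
// the network plus a label naming their wire schema.
struct WireData {
    std::string wireSchema;
    std::vector<std::uint8_t> bytes;
};

// Data type -> wire schema: turn a std::string into raw bytes.
WireData serializeString(const std::string &text) {
    WireData wire;
    wire.wireSchema = "utf-8-string"; // illustrative label, assumed here
    wire.bytes.assign(text.begin(), text.end());
    return wire;
}

// Wire schema -> data type: rebuild the std::string from the bytes.
std::string deserializeString(const WireData &wire) {
    if (wire.wireSchema != "utf-8-string") {
        throw std::runtime_error("unexpected wire schema: " + wire.wireSchema);
    }
    return std::string(wire.bytes.begin(), wire.bytes.end());
}
```

The receiving side only has to agree on the wire schema, not on the sender's programming language, which is why the two notions are kept separate.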

So, assuming a simple component that works with a processing loop (see pseudo-code below), what are the required steps to receive data from RSB?

while (true) {

    // receive new data

    // do some heavy processing with the data to generate new results

    // inform the world about the newly generated data

}

Receiving data in RSB means listening for new data published by other components, so the Listener class is used for this purpose. Client objects in RSB are created by a dedicated factory, hence we use this code to get a new listener:

Factory &factory = Factory::getInstance();
ListenerPtr listener = factory.createListener(Scope("/example/informer"));

The Listener itself notifies interested clients asynchronously each time it receives new data from the bus. To synchronize the asynchronous reception with the main loop of our component, we use a SynchronizedQueue, which is available in Robotics Systems Commons. As we expect std::string as the data type in this example, we create a queue containing shared pointers to strings (for safe memory management).

boost::shared_ptr<rsc::threading::SynchronizedQueue<boost::shared_ptr<string> > > queue(
        new rsc::threading::SynchronizedQueue<boost::shared_ptr<string> >);


We also need to use a shared pointer to handle the queue instance as it will be passed around.
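For readers unfamiliar with this kind of queue, here is a minimal sketch of the idea using only the C++11 standard library. It is not the implementation of rsc::threading::SynchronizedQueue: the class name BlockingQueue and the helper makeStringQueue are hypothetical, and the sketch uses std::shared_ptr where the code above uses boost::shared_ptr.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for a synchronized queue: push() may be called from
// one thread while another thread blocks in pop() until an element arrives.
template<typename T>
class BlockingQueue {
public:
    void push(T element) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            elements_.push(std::move(element));
        }
        // Wake up one thread waiting in pop().
        condition_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate guards against spurious wake-ups.
        condition_.wait(lock, [this] { return !elements_.empty(); });
        T element = std::move(elements_.front());
        elements_.pop();
        return element;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::queue<T> elements_;
};

// The queue itself is held by a shared pointer because both the receiving
// side and the main loop need access to the same instance.
typedef std::shared_ptr<BlockingQueue<std::shared_ptr<std::string> > > StringQueuePtr;

StringQueuePtr makeStringQueue() {
    return std::make_shared<BlockingQueue<std::shared_ptr<std::string> > >();
}
```

Elements leave the queue in the order in which they were pushed, and pop() blocks while the queue is empty.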

Now we need to tell the Listener to fill the queue with the new data it receives. This is the task of QueuePushHandler. An instance of this class will be installed in the listener:

listener->addHandler(HandlerPtr(new QueuePushHandler<string> (queue)));
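Conceptually, installing a handler registers a callback that the listener invokes for every piece of received data. The sketch below illustrates this pattern with standard C++ only. MiniListener, deliver and makeQueuePushHandler are hypothetical names, not RSB classes, and delivery happens synchronously here, whereas the real Listener notifies its handlers asynchronously.

```cpp
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <vector>

typedef std::shared_ptr<std::string> StringPtr;
typedef std::function<void(StringPtr)> Handler;

// Hypothetical listener stand-in: it stores handlers and calls each of them
// whenever new data is delivered.
class MiniListener {
public:
    void addHandler(Handler handler) {
        handlers_.push_back(handler);
    }

    // In a real system this would be triggered by data arriving from the bus.
    void deliver(StringPtr data) {
        for (const Handler &handler : handlers_) {
            handler(data);
        }
    }

private:
    std::vector<Handler> handlers_;
};

// Builds a handler that appends every received element to the given queue,
// which is the role QueuePushHandler plays in the text above.
Handler makeQueuePushHandler(std::shared_ptr<std::deque<StringPtr> > queue) {
    return [queue](StringPtr data) { queue->push_back(data); };
}
```

The plain std::deque keeps the sketch short but is not thread-safe, so with asynchronous delivery a synchronized queue is needed instead.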

To receive data in the main loop (first comment in the first listing) we can now try to pop elements from the queue. The operation will wait until data is available:

boost::shared_ptr<string> data = queue->pop();


Afterwards we can process the data and generate new output which should be published for other components.

Publishing Generated Results

Publishing data essentially means informing other components about the data. Hence, we need an instance of Informer.

Informer<string>::Ptr informer = factory.createInformer<string> (Scope("/result/scope"));


As you can see, the Informer also needs a scope, which defines on which channel of the bus the result data is available for other listeners.

In the main loop we can now publish generated data (also of type string):

boost::shared_ptr<string> resultData(new string("heavily processed result"));
informer->publish(resultData);


Again we use a shared pointer for safe memory management with efficient pointer handling. That's it!
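To illustrate how the scope connects an informer to its listeners, here is a small in-process sketch. MiniBus and MiniInformer are hypothetical stand-ins, not RSB classes. The sketch matches scopes by exact string comparison only, which is a simplification made here and says nothing about how RSB itself matches scopes.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <vector>

typedef std::shared_ptr<std::string> StringPtr;
typedef std::function<void(StringPtr)> Callback;

// Hypothetical in-process bus: maps a scope name to the callbacks
// subscribed to exactly that scope.
class MiniBus {
public:
    void subscribe(const std::string &scope, Callback callback) {
        subscribers_[scope].push_back(callback);
    }

    void publish(const std::string &scope, StringPtr data) {
        std::map<std::string, std::vector<Callback> >::const_iterator it =
                subscribers_.find(scope);
        if (it == subscribers_.end()) {
            return; // nobody listens on this scope
        }
        for (const Callback &callback : it->second) {
            callback(data); // the pointer is shared, the string is not copied
        }
    }

private:
    std::map<std::string, std::vector<Callback> > subscribers_;
};

// Hypothetical informer stand-in: bound to one scope at construction.
class MiniInformer {
public:
    MiniInformer(std::shared_ptr<MiniBus> bus, const std::string &scope) :
            bus_(bus), scope_(scope) {
    }

    void publish(StringPtr data) {
        bus_->publish(scope_, data);
    }

private:
    std::shared_ptr<MiniBus> bus_;
    std::string scope_;
};
```

Because the data is handed over as a shared pointer, every subscriber receives the same string instance, and it is released once the last holder lets go of it.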

Don't forget to have a look at the full example code.