Using RSB to Receive and Publish Data » History » Version 11

Version 10 (J. Wienke, 06/30/2011 03:53 PM) → Version 11/12 (J. Wienke, 08/12/2011 03:47 PM)

h1. Using RSB to Receive and Publish Data

This wiki page describes the first steps to integrate an existing component with RSB. It is based on "this example":https://code.cor-lab.de/projects/rsb/repository/entry/trunk/cpp/core/examples/queue_processing/queueprocessing.cpp in C++. The concepts, however, are the same in other language implementations of RSB.

We assume that this component requires data for processing, which is already available in RSB and wants to provide processed data for other components via RSB.

h2. Receiving Data

To receive data in RSB made available by other components you need to know the [[Glossary#scope|Scope]] under which the data are available, e.g. @/example/informer@ for example dummy data. Furthermore, you need to know how the data is represented. As data is transmitted over the network and should be available in multiple programming languages, this includes two aspects:

# which class/type represent the data in your programming language, e.g. @std::string@ is used for textual content in C++
# how is the data transmitted over the network, i.e. how is your programming language class serialized into bytes and deserialized back to a class

While knowledge about the first item is essential to use RSB, the second point is only important slightly advanced use cases, where programming language classes will be used, which are not directly known to the RSB framework. Support for primary data types like strings is already built in (see [[Types]]).

In RSB terms the programming language representation is called [[Glossary#data-type|data type]] and the serialized representation is called [[Glossary#wire-schema|wire schema]].

So, assuming a simple component that works with a processing loop (see pseudo-code below), what are the required steps to receive data from RSB?
<pre>
<code class="cpp">
while (true) {

// receive new data

// do some heavy processing with the data to generate new results

// inform the world about the newly generated data

}
</code>
</pre>

Receiving data in RSB means _listening_ for new data published by other components, hence the "Listener":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1Listener.html class is used for this purpose. Creating client objects in RSB is the responsibility of a dedicated factory, hence we use this code to get a new listener:
<pre>
<code class="cpp">
Factory &factory = Factory::getInstance();
ListenerPtr listener = factory.createListener(Scope("/example/informer"));
</code>
</pre>

The Listener itself notifies interested clients asynchronously each time it receives new data in the bus. To synchronize the asynchronous reception with the main loop of our component we use a "SynchronizedQueue":https://code.cor-lab.de/embedded/rsc/classrsc_1_1threading_1_1SynchronizedQueue.html available in project:rsc. As we expect @std::string@ as data type in this example, we create a queue containing shared pointers to strings (for safe memory management).
<pre>
<code class="cpp">
boost::shared_ptr<rsc::threading::SynchronizedQueue<boost::shared_ptr<string> > > queue(
new rsc::threading::SynchronizedQueue<boost::shared_ptr<string> >);
</code>
</pre>
We also need to use a shared pointer to handle the queue instance as it will be passed around. Please note that the queue can also be limited in size so that only the most recent events will be kept in cases where your processing is slower than the input to the queue.

Now No we need to tell the Listener to fill the queue with new data it receives. This is the task of "QueuePushHandler":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1QueuePushHandler.html. An instance of this class will be installed in the listener:
<pre>
<code class="cpp">
listener->addHandler(HandlerPtr(new QueuePushHandler<string> (queue)));
</code>
</pre>

To receive data in the main loop (first comment in the first listing) we can now try to pop elements from the queue. The operation will wait until data is available:
<pre>
<code class="cpp">
boost::shared_ptr<string> data = queue->pop();
</code>
</pre>
Afterwards we can process the data and generate new output which should be published for other components.

h2. Publishing Generated Results

Publishing data essentially means _informing_ other components about the data. Hence, we need an instance of "Informer":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1Informer.html.
<pre>
<code class="cpp">
Informer<string>::Ptr informer = factory.createInformer<string> (Scope("/result/scope"));
</code>
</pre>
As you can see, the Informer also needs a scope that defines at which channel on the bus the result data is available for other listeners.

In the main loop we can now publish generated data (also of type string):
<pre>
<code class="cpp">
boost::shared_ptr<string> resultData(new string("heavily processed result"));
informer->publish(resultData);
</code>
</pre>
Again we use a shared pointer for safe memory management with efficient pointer handling. That's it!

Don't forget to have a look at the "full example code":https://code.cor-lab.de/projects/rsb/repository/entry/trunk/cpp/core/examples/queue_processing/queueprocessing.cpp.