Using RSB to Receive and Publish Data » History » Version 8

J. Wienke, 06/30/2011 03:51 PM

1 1 J. Wienke
h1. Using RSB to Receive and Publish Data
2 1 J. Wienke
3 1 J. Wienke
This wiki page describes the first steps to integrate an existing component with RSB. We assume that this component requires data for processing, which is already available in RSB and wants to provide processed data for other components via RSB.
4 2 J. Wienke
5 2 J. Wienke
h2. Receiving Data
6 2 J. Wienke
7 5 J. Wienke
To receive data in RSB made available by other components you need to know the [[Glossary#scope|Scope]] under which the data are available, e.g. @/example/informer@ for example dummy data. Furthermore, you need to know how the data is represented. As data is transmitted over the network and should be available in multiple programming languages, this includes two aspects:
8 2 J. Wienke
9 2 J. Wienke
# which class/type represent the data in your programming language, e.g. @std::string@ is used for textual content in C++
10 2 J. Wienke
# how is the data transmitted over the network, i.e. how is your programming language class serialized into bytes and deserialized back to a class
11 2 J. Wienke
12 2 J. Wienke
While knowledge about the first item is essential to use RSB, the second point is only important slightly advanced use cases, where programming language classes will be used, which are not directly known to the RSB framework. Support for primary data types like strings is already built in (see [[Types]]).
13 2 J. Wienke
14 1 J. Wienke
In RSB terms the programming language representation is called [[Glossary#data-type|data type]] and the serialized representation is called [[Glossary#wire-schema|wire schema]].
15 3 J. Wienke
16 3 J. Wienke
So, assuming a simple component that works with a processing loop (see pseudo-code below), what are the required steps to receive data from RSB?
17 3 J. Wienke
<pre>
18 3 J. Wienke
<code class="cpp">
19 3 J. Wienke
while (true) {
20 3 J. Wienke
21 3 J. Wienke
    // receive new data
22 3 J. Wienke
23 3 J. Wienke
    // do some heavy processing with the data to generate new results
24 3 J. Wienke
25 3 J. Wienke
    // inform the world about the newly generated data
26 3 J. Wienke
27 3 J. Wienke
}
28 3 J. Wienke
</code>
29 3 J. Wienke
</pre>
30 1 J. Wienke
31 5 J. Wienke
Receiving data in RSB means _listening_ for new data published by other components, hence the "Listener":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1Listener.html class is used for this purpose. Creating client objects in RSB is the responsibility of a dedicated factory, hence we use this code to get a new listener:
32 5 J. Wienke
<pre>
33 5 J. Wienke
<code class="cpp">
34 5 J. Wienke
Factory &factory = Factory::getInstance();
35 5 J. Wienke
ListenerPtr listener = factory.createListener(Scope("/example/informer"));
36 5 J. Wienke
</code>
37 5 J. Wienke
</pre>
38 5 J. Wienke
39 6 J. Wienke
The Listener itself notifies interested clients asynchronously each time it receives new data in the bus. To synchronize the asynchronous reception with the main loop of our component we use a "SynchronizedQueue":https://code.cor-lab.de/embedded/rsc/classrsc_1_1threading_1_1SynchronizedQueue.html available in project:rsc. As we expect @std::string@ as data type in this example, we create a queue containing shared pointers to strings (for safe memory management).
40 6 J. Wienke
<pre>
41 6 J. Wienke
<code class="cpp">
42 6 J. Wienke
boost::shared_ptr<rsc::threading::SynchronizedQueue<boost::shared_ptr<string> > > queue(
43 6 J. Wienke
        new rsc::threading::SynchronizedQueue<boost::shared_ptr<string> >);
44 6 J. Wienke
</code>
45 6 J. Wienke
</pre>
46 6 J. Wienke
We also need to use a shared pointer to handle the queue instance as it will be passed around.
47 6 J. Wienke
48 6 J. Wienke
No we need to tell the Listener to fill the queue with new data it receives. This is the task of "QueuePushHandler":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1QueuePushHandler.html. An instance of this class will be installed in the listener:
49 6 J. Wienke
<pre>
50 6 J. Wienke
<code class="cpp">
51 6 J. Wienke
listener->addHandler(HandlerPtr(new QueuePushHandler<string> (queue)));
52 6 J. Wienke
</code>
53 6 J. Wienke
</pre>
54 7 J. Wienke
55 7 J. Wienke
To receive data in the main loop (first comment in the first listing) we can now try to pop elements from the queue. The operation will wait until data is available:
56 7 J. Wienke
<pre>
57 7 J. Wienke
<code class="cpp">
58 7 J. Wienke
boost::shared_ptr<string> data = queue->pop();
59 7 J. Wienke
</code>
60 7 J. Wienke
</pre>
61 8 J. Wienke
Afterwards we can process the data and generate new output which should be published for other components.
62 7 J. Wienke
63 1 J. Wienke
h2. Publishing Generated Results
64 8 J. Wienke
65 8 J. Wienke
Publishing data essentially means _informing_ other components about the data. Hence, we need an instance of "Informer":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1Informer.html.
66 8 J. Wienke
<pre>
67 8 J. Wienke
<code class="cpp">
68 8 J. Wienke
Informer<string>::Ptr informer = factory.createInformer<string> (Scope("/result/scope"));
69 8 J. Wienke
</code>
70 8 J. Wienke
</pre>
71 8 J. Wienke
As you can see, the Informer also needs a scope that defines at which channel on the bus the result data is available for other listeners.
72 8 J. Wienke
73 8 J. Wienke
In the main loop we can now publish generated data (also of type string):
74 8 J. Wienke
<pre>
75 8 J. Wienke
<code class="cpp">
76 8 J. Wienke
boost::shared_ptr<string> resultData(new string("heavily processed result"));
77 8 J. Wienke
informer->publish(resultData);
78 8 J. Wienke
</code>
79 8 J. Wienke
</pre>
80 8 J. Wienke
Again we use a shared pointer for safe memory management with efficient pointer handling. That's it!