Using RSB to Receive and Publish Data » History » Version 9

J. Wienke, 06/30/2011 03:53 PM

h1. Using RSB to Receive and Publish Data

This page describes the first steps for integrating an existing component with RSB. It is based on "this example":https://code.cor-lab.de/projects/rsb/repository/entry/trunk/cpp/core/examples/queue_processing/queueprocessing.cpp in C++. The concepts, however, are the same in the other language implementations of RSB.

We assume that the component needs input data that is already available via RSB and that it wants to provide its processed results to other components via RSB.

h2. Receiving Data

To receive data that other components make available in RSB, you need to know the [[Glossary#scope|Scope]] under which the data is available, e.g. @/example/informer@ for example dummy data. Furthermore, you need to know how the data is represented. As data is transmitted over the network and should be available in multiple programming languages, this includes two aspects:

# which class/type represents the data in your programming language, e.g. @std::string@ is used for textual content in C++
# how the data is transmitted over the network, i.e. how an instance of your programming language class is serialized into bytes and deserialized back into an instance

While knowledge about the first item is essential to use RSB, the second item only matters in slightly more advanced use cases that involve programming language classes not directly known to the RSB framework. Support for primitive data types like strings is already built in (see [[Types]]).

In RSB terms, the programming language representation is called the [[Glossary#data-type|data type]] and the serialized representation is called the [[Glossary#wire-schema|wire schema]].
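To make the distinction between data type and wire schema more concrete, here is a minimal sketch in plain C++ that does not use the RSB API. The names @WireData@, @serializeString@ and @deserializeString@ as well as the label @"utf-8-string"@ are assumptions made up for illustration. The sketch only shows the idea of turning a @std::string@ (data type) into labeled bytes (wire schema) and back.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical container for serialized data: a label naming the wire
// schema plus the raw bytes that would travel over the network.
struct WireData {
    std::string wireSchema;
    std::vector<unsigned char> bytes;
};

// Data type -> wire schema: turn a std::string into labeled bytes.
WireData serializeString(const std::string &text) {
    WireData wire;
    wire.wireSchema = "utf-8-string"; // illustrative label (assumption)
    wire.bytes.assign(text.begin(), text.end());
    return wire;
}

// Wire schema -> data type: rebuild the std::string on the receiving side.
std::string deserializeString(const WireData &wire) {
    if (wire.wireSchema != "utf-8-string") {
        throw std::runtime_error("unexpected wire schema: " + wire.wireSchema);
    }
    return std::string(wire.bytes.begin(), wire.bytes.end());
}
```

For built-in types such as strings, RSB performs this kind of conversion for you, so code like this would only be needed conceptually for classes the framework does not know.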

So, assuming a simple component that works with a processing loop (see pseudo-code below), what are the required steps to receive data from RSB?
<pre>
<code class="cpp">
while (true) {

    // receive new data

    // do some heavy processing with the data to generate new results

    // inform the world about the newly generated data

}
</code>
</pre>

Receiving data in RSB means _listening_ for new data published by other components. Hence, the "Listener":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1Listener.html class is used for this purpose. Creating client objects in RSB is the responsibility of a dedicated factory, so we use this code to get a new listener:
<pre>
<code class="cpp">
// The factory creates all RSB client objects.
Factory &factory = Factory::getInstance();
// Listen on the scope under which the other component publishes its data.
ListenerPtr listener = factory.createListener(Scope("/example/informer"));
</code>
</pre>

The Listener itself notifies interested clients asynchronously each time it receives new data from the bus. To synchronize the asynchronous reception with the main loop of our component, we use a "SynchronizedQueue":https://code.cor-lab.de/embedded/rsc/classrsc_1_1threading_1_1SynchronizedQueue.html available in project:rsc. As we expect @std::string@ as the data type in this example, we create a queue containing shared pointers to strings (for safe memory management).
<pre>
<code class="cpp">
boost::shared_ptr<rsc::threading::SynchronizedQueue<boost::shared_ptr<string> > > queue(
        new rsc::threading::SynchronizedQueue<boost::shared_ptr<string> >);
</code>
</pre>
The queue instance itself is also held in a shared pointer because it will be passed around.
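To illustrate why a blocking queue can bridge the asynchronous listener and the main loop, here is a minimal sketch in standard C++11. It is a simplified stand-in and not the implementation of the RSC @SynchronizedQueue@. The names @BlockingQueue@ and @receiveOne@ are hypothetical, and a helper thread plays the role of the listener.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Simplified stand-in for a synchronized queue (not the RSC class).
template<typename T>
class BlockingQueue {
public:
    // Called from the receiving thread whenever new data arrives.
    void push(const T &item) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            items_.push(item);
        }
        available_.notify_one();
    }

    // Called from the main loop; blocks until an element is available.
    T pop() {
        std::unique_lock<std::mutex> guard(mutex_);
        available_.wait(guard, [this] { return !items_.empty(); });
        T item = items_.front();
        items_.pop();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable available_;
    std::queue<T> items_;
};

// Usage: a helper thread stands in for the asynchronous listener.
std::string receiveOne() {
    auto queue = std::make_shared<BlockingQueue<std::shared_ptr<std::string> > >();
    std::thread producer([queue] {
        queue->push(std::make_shared<std::string>("example payload"));
    });
    std::shared_ptr<std::string> data = queue->pop(); // waits for the push
    producer.join();
    return *data;
}
```

The queue is shared between the producing thread and the consumer through a shared pointer, which mirrors the reason given above for wrapping the queue instance in one.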

Now we need to tell the Listener to fill the queue with the new data it receives. This is the task of "QueuePushHandler":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1QueuePushHandler.html. An instance of this class is installed in the listener:
<pre>
<code class="cpp">
listener->addHandler(HandlerPtr(new QueuePushHandler<string> (queue)));
</code>
</pre>

To receive data in the main loop (first comment in the first listing), we can now pop elements from the queue. The operation blocks until data is available:
<pre>
<code class="cpp">
boost::shared_ptr<string> data = queue->pop();
</code>
</pre>
Afterwards, we can process the data and generate new output, which should be published for other components.

h2. Publishing Generated Results

Publishing data essentially means _informing_ other components about the data. Hence, we need an instance of "Informer":https://code.cor-lab.de/embedded/rsb/cpp/classrsb_1_1Informer.html.
<pre>
<code class="cpp">
Informer<string>::Ptr informer = factory.createInformer<string> (Scope("/result/scope"));
</code>
</pre>
As you can see, the Informer also needs a scope. It defines the channel on the bus on which the result data is available to listeners.

In the main loop we can now publish the generated data (also of type string):
<pre>
<code class="cpp">
boost::shared_ptr<string> resultData(new string("heavily processed result"));
informer->publish(resultData);
</code>
</pre>
Again, we use a shared pointer, which gives us safe memory management and lets us pass the data around without copying it. That's it!
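The three steps of the processing loop can be summarized in a small sketch that is independent of RSB. It uses standard C++11 instead of the Boost types from the listings above. The function @processingLoop@, its callback parameters and the fixed iteration count are assumptions for illustration. In the real component, the role of @receive@ is played by @queue->pop()@ and the role of @publish@ by @informer->publish(...)@.

```cpp
#include <functional>
#include <memory>
#include <string>

typedef std::shared_ptr<std::string> StringPtr;

// Runs a fixed number of receive -> process -> publish rounds.
// A real component would typically loop until it is shut down.
void processingLoop(const std::function<StringPtr()> &receive,
                    const std::function<void(const StringPtr &)> &publish,
                    int iterations) {
    for (int i = 0; i < iterations; ++i) {
        // receive new data (blocks in the real component)
        StringPtr input = receive();
        // placeholder for the heavy processing step
        StringPtr result = std::make_shared<std::string>("processed: " + *input);
        // inform the world about the newly generated data
        publish(result);
    }
}
```

Passing the receive and publish steps as callbacks keeps the loop itself free of middleware details, which can make it easier to test the processing step in isolation.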