rsbtest.py
1 |
'''
|
---|---|
2 |
Created on Jan 16, 2013
|
3 |
|
4 |
@author: dklotz
|
5 |
'''
|
6 |
|
7 |
import rsb |
8 |
import logging |
9 |
import time |
10 |
import rst |
11 |
import rstsandbox |
12 |
from rst.vision.HeadObject_pb2 import HeadObject |
13 |
from rst.vision.HeadObjects_pb2 import HeadObjects |
14 |
|
15 |
class TestHandler: |
16 |
def __init__(self): |
17 |
self.eventCount = 0 |
18 |
|
19 |
def handle(self, event): |
20 |
# count the number of events received
|
21 |
self.eventCount += 1 |
22 |
|
23 |
if __name__ == '__main__': |
24 |
# Configure the logger
|
25 |
FORMAT = '%(asctime)-15s %(levelname)s %(pathname)s:%(lineno)s -- %(message)s'
|
26 |
logging.basicConfig(format=FORMAT) |
27 |
#logging.getLogger("rsb.transport.socket.BusConnection").setLevel(logging.DEBUG)
|
28 |
|
29 |
# Set up the converter for the received type
|
30 |
converter = rsb.converter.ProtocolBufferConverter(messageClass = HeadObjects) |
31 |
rsb.converter.registerGlobalConverter(converter) |
32 |
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources() |
33 |
|
34 |
# Create the listener & handler
|
35 |
listener = rsb.createListener("/nao/identities")
|
36 |
|
37 |
# Create an informer
|
38 |
informer = rsb.createInformer("/test/scope", dataType=str) |
39 |
|
40 |
handler = TestHandler() |
41 |
|
42 |
listener.addHandler(handler.handle) |
43 |
|
44 |
# Wait for events; clean up when interrupted.
|
45 |
try:
|
46 |
while True: |
47 |
print("Events received so far: %s" % handler.eventCount)
|
48 |
informer.publishData("Foo bar!")
|
49 |
time.sleep(1)
|
50 |
finally:
|
51 |
listener.deactivate() |
52 |
informer.deactivate() |