rsbtest.py

Test case - D. Klotz, 01/16/2013 04:26 PM

Download (1.46 KB)

 
1
'''
Created on Jan 16, 2013

@author: dklotz
'''
6

    
7
import rsb
8
import logging
9
import time
10
import rst
11
import rstsandbox
12
from rst.vision.HeadObject_pb2 import HeadObject
13
from rst.vision.HeadObjects_pb2 import HeadObjects
14

    
15
class TestHandler:
    '''
    Event handler that counts how many events it has been given.

    The running total is kept in the public attribute C{eventCount}, which
    starts at zero and is read by the main loop of this script.
    '''

    def __init__(self):
        # Number of times handle() has been called on this instance.
        self.eventCount = 0

    def handle(self, event):
        '''
        Record the receipt of one event.

        @param event: the received event; its contents are not inspected.
        '''
        # count the number of events received
        self.eventCount += 1
22

    
23
if __name__ == '__main__':
    # Script entry point: register a converter for HeadObjects, listen on
    # "/nao/identities" while publishing a string on "/test/scope" once per
    # second, and print the number of events received so far.

    # Configure the logger
    FORMAT = '%(asctime)-15s %(levelname)s %(pathname)s:%(lineno)s -- %(message)s'
    logging.basicConfig(format=FORMAT)
    # Uncomment to get debug output from the socket transport:
    #logging.getLogger("rsb.transport.socket.BusConnection").setLevel(logging.DEBUG)

    # Set up the converter for the received type
    converter = rsb.converter.ProtocolBufferConverter(messageClass=HeadObjects)
    rsb.converter.registerGlobalConverter(converter)
    # Rebuild the default participant configuration after registering the
    # converter.
    # NOTE(review): presumably needed so that participants created below
    # pick up the new converter -- confirm against the RSB version in use.
    rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()

    handler = TestHandler()

    # Both participants are created inside the try block so that whichever
    # ones exist are deactivated even if a later setup step fails.
    listener = None
    informer = None
    try:
        # Create the listener & handler
        listener = rsb.createListener("/nao/identities")

        # Create an informer
        informer = rsb.createInformer("/test/scope", dataType=str)

        listener.addHandler(handler.handle)

        # Wait for events; clean up when interrupted.
        while True:
            print("Events received so far: %s" % handler.eventCount)
            informer.publishData("Foo bar!")
            time.sleep(1)
    finally:
        # Deactivate in the original order (listener, then informer); the
        # nested finally ensures a failure in the first call does not skip
        # the second.
        try:
            if listener is not None:
                listener.deactivate()
        finally:
            if informer is not None:
                informer.deactivate()