0004-Add-rsb-rpc-pattern.patch

S. Barut, 09/24/2018 01:02 PM

Download (10.8 KB)

View differences:

rsb-cil-test/Program.cs
14 14

  
15 15
using Google.Protobuf;
16 16
using Google.Protobuf.Reflection;
17
using Rsb.Patterns;
17 18

  
18 19
namespace rsbciltest
19 20
{
......
119 120
            repoInformer.Deactivate();
120 121
            Console.ReadKey();
121 122
            repoListener.Deactivate();
123

  
124
            Console.WriteLine("\nRPC pattern test");
125

  
126
            RemoteServer remote = new RemoteServer(new Scope("/test/methods"));
127
            LocalServer server = new LocalServer(new Scope("/test/methods"));
128
            server.AddMethod("echo", new Caller());
129

  
130
            remote.timeout = 1000;
131
            //should have a timeout, since the LocalServer is not activated
132
            try
133
            {
134
                remote.Call<string, string>("echo", "hallo");
135
            }
136
            catch (RSBException e)
137
            {
138
                Console.WriteLine(e.Message);
139
            }
140
            
141
            server.Activate();
142
            // should work even though it is added after activation
143
            server.AddMethod("echo2", new Caller());
144

  
145
            Console.WriteLine("Echo : {0}", remote.Call<string, string>("echo", "hallo"));
146
            Console.WriteLine("Echo2: {0}", remote.Call<string, string>("echo2", "hallo"));
147

  
148
            Console.ReadKey();
149
            server.Deactivate();
122 150
        }
123 151

  
124 152
	}
153
    
154
    public class Caller : DataCallback<string, string>
155
    {
156
        public override string Invoke(string data)
157
        {
158
            return data + data;
159
        }
160
    }
125 161
}
rsb-cil/Rsb/Patterns/DataCallback.cs
1

2
namespace Rsb.Patterns
3
{
4
    public abstract class DataCallback<ReplyType, RequestType> : ICallback
5
    {
6
        public Event InternalInvoke(Event e) {
7
            
8
            if (e.Method != "REQUEST") {
9
                throw new RSBException("Request expected!");
10
            }
11
            ReplyType result = this.Invoke((RequestType)e.Data);
12

  
13
            var reply = new Rsb.Event();
14
            reply.Method = "REPLY";
15
            reply.Scope = e.Scope;
16
            reply.AddCause(e.EventId);
17
            reply.Data = result;
18

  
19
            return reply;
20
        }
21

  
22
        public abstract ReplyType Invoke(RequestType param);
23
    }
24
    
25
}
rsb-cil/Rsb/Patterns/ICallback.cs
1

2
namespace Rsb.Patterns
3
{
4
    public interface ICallback
5
    {
6
        Event InternalInvoke(Event e);
7
    }
8
}
rsb-cil/Rsb/Patterns/LocalServer.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Text;
5
using System.Threading.Tasks;
6

  
7
namespace Rsb.Patterns
8
{
9
    public class LocalServer
10
    {
11
        private Scope scope;
12
        private Dictionary<string, Method> registeredMethods = new Dictionary<string, Method>();
13

  
14
        private bool activated = false;
15
        
16
        // forced to be created with the Factory
17
        public LocalServer(Scope scope) {
18
            this.scope = scope;
19
        }
20

  
21

  
22
        public void AddMethod(string name, ICallback callback) {
23
            if (registeredMethods.ContainsKey(name)) {
24
                throw new ArgumentException("method already registered");
25
            }
26
            Method method = new Method(scope, name, callback);
27
            registeredMethods.Add(name, method);
28
            if (activated) {
29
                method.Activate();
30
            }
31
        }
32

  
33
        public void Activate() {
34
            foreach (Method method in registeredMethods.Values) {
35
                method.Activate();
36
            }
37
            activated = true;
38
        }
39

  
40
        public void Deactivate() {
41
            foreach (Method method in registeredMethods.Values) {
42
                method.Deactivate();
43
            }
44
            activated = false;
45
        }
46
    }
47
}
rsb-cil/Rsb/Patterns/Method.cs
1

2
using System;
3

  
4
namespace Rsb.Patterns
5
{
6
    public class Method
7
    {
8
        private Informer informer;
9
        private Listener listener;
10
        private string name;
11
        public string Name { get => name; }
12

  
13
        private ICallback callback;
14

  
15
        public Method(Scope scope, string name, ICallback callback) {
16
            this.name = name;
17
            this.callback = callback;
18
            Scope work = scope.Concat(new Scope("/" + name));
19

  
20
            informer = Factory.Instance.createInformer(work);
21
            listener = Factory.Instance.createListener(work);
22

  
23
            listener.EventReceived += proceedCall;
24
        }
25

  
26
        private void proceedCall(Event theEvent)
27
        {
28
            // this is with the highest probability our own reply...
29
            if (theEvent.Method == "REPLY") {
30
                return;
31
            }
32
            // actually this would be handled in the ICallback too, but to make it more consistent...
33
            if (theEvent.Method != "REQUEST")
34
            {
35
                throw new RSBException("Request expected!");
36
            }
37

  
38
            informer.Send(callback.InternalInvoke(theEvent));
39
        }
40

  
41
        public void Activate() {
42
            informer.Activate();
43
            listener.Activate();
44
        }
45

  
46
        public void Deactivate()
47
        {
48
            informer.Deactivate();
49
            listener.Deactivate();
50
        }
51

  
52
    }
53
}
rsb-cil/Rsb/Patterns/RemoteServer.cs
1
using System;
2
using System.Threading;
3

  
4
namespace Rsb.Patterns
5
{
6
    public class RemoteServer
7
    {
8
        public int timeout = 3000; // milliseconds
9
        private Scope scope;
10

  
11
        public RemoteServer(Scope scope) {
12
            this.scope = scope;
13
        }
14

  
15
        public Event Call(string name, Event request) {
16
            Scope methodScope = scope.Concat(new Scope("/" + name));
17

  
18
            // checks if the request event is well-formed instead of well-forming the event
19
            if (!request.Method.Equals("REQUEST")) {
20
                throw new RSBException(String.Format("Request event expected! (Received: {0})", request.Method));
21
            }
22

  
23
            if (!request.Scope.Equals(methodScope)) {
24
                throw new RSBException(String.Format("Event is not the expected scope. (Expected: {0} | Got: {1})", methodScope, request.Scope));
25
            }
26

  
27
            // creating the callback for the reply
28
            Event result = new Event();
29
            object syncPrimitive = new object();
30
            NewEventHandler handler = (e) => {
31
                if (e.Method == "REPLY" && e.IsCause(request.EventId))
32
                {
33
                    result = e;
34
                    // unlocking the SYNCHRONOUS call
35
                    lock (syncPrimitive)
36
                    {
37
                        Monitor.Pulse(syncPrimitive);
38
                    }
39
                }
40
            };
41

  
42
            Informer requestInformer = Factory.Instance.createInformer(methodScope);
43
            requestInformer.Activate();
44

  
45
            Listener replyListener = Factory.Instance.createListener(methodScope);
46
            replyListener.EventReceived += handler;
47
            replyListener.Activate();
48

  
49

  
50
            bool timeout;
51
            // Locking until reply arrives (see handler above)
52
            lock (syncPrimitive)
53
            {
54
                requestInformer.Send(request);
55
                requestInformer.Deactivate();
56
                timeout = !Monitor.Wait(syncPrimitive, this.timeout);
57
            }
58
            replyListener.Deactivate();
59
            if (timeout) {
60
                throw new RSBException(String.Format("Timeout after {0} milliseconds", this.timeout));
61
            }
62
            return result;
63
        }
64

  
65
        public ReplyType Call<ReplyType, RequestType>(string name, RequestType requestData) {
66
            Event request = new Event();
67
            request.Data = requestData;
68
            request.Scope = scope.Concat(new Scope("/" + name));
69
            request.Method = "REQUEST";
70

  
71
            Event reply = this.Call(name, request);
72
            if (reply.Data.GetType().Equals(typeof(ReplyType)))
73
            {
74
                return (ReplyType)reply.Data;
75
            }
76
            else {
77
                throw new RSBException(String.Format("Unexpected Datatype! (Expected: {0} | Got: {1})", typeof(ReplyType).ToString(), reply.Data.GetType().ToString()));
78
            }   
79
        }
80
    }
81
}
rsb-cil/rsb-cil.csproj
51 51
    <Compile Include="Rsb\Converter\UInt32Converter.cs" />
52 52
    <Compile Include="Rsb\Converter\UInt64Converter.cs" />
53 53
    <Compile Include="Rsb\Factory.cs" />
54
    <Compile Include="Rsb\Patterns\DataCallback.cs" />
55
    <Compile Include="Rsb\Patterns\ICallback.cs" />
56
    <Compile Include="Rsb\Patterns\LocalServer.cs" />
57
    <Compile Include="Rsb\Patterns\Method.cs" />
58
    <Compile Include="Rsb\Patterns\RemoteServer.cs" />
54 59
    <Compile Include="Rsb\Protocol\EventId.cs" />
55 60
    <Compile Include="Rsb\Protocol\EventMetaData.cs" />
56 61
    <Compile Include="Rsb\Protocol\Notification.cs" />
57
-