0004-Add-rsb-rpc-pattern_d138e5f.patch

S. Barut, 10/09/2018 03:24 PM

Download (24.3 KB)

View differences:

rsb-cil-test/Program.cs
4 4
using Rsb;
5 5
using Rsb.Converter;
6 6
using Rsb.Transport.Socket;
7
using Rsb.Patterns;
7 8

  
8 9
using Rst.Kinematics;
9 10
using log4net.Config;
......
36 37
        static void Main(String[] args)
37 38
        {
38 39

  
39
            BasicConfigurator.Configure();
40
            //BasicConfigurator.Configure();
40 41

  
41 42
            var inConverters = buildConverterSelection(new List<Tuple<String, IConverter>> {
42 43
                new Tuple<string, IConverter>("bool",                                new BoolConverter()                                ),
......
123 124
            repoInformer.Deactivate();
124 125
            Console.ReadKey();
125 126
            repoListener.Deactivate();
127

  
128
            Console.WriteLine("\nRPC pattern test");
129

  
130
            RemoteServer remote = Factory.Instance.CreateRemoteServer("/test/methods", 1000);
131
            remote.Activate();
132

  
133
            LocalServer server = Factory.Instance.CreateLocalServer("/test/methods");
134
            server.AddMethod("echo", new EchoCall());
135

  
136
            //should have a timeout, since the LocalServer is not activated
137
            try
138
            {
139
                remote.Call<string, string>("echo", "hallo");
140
            }
141
            catch (RSBException e)
142
            {
143
                Console.WriteLine(e.Message);
144
            }
145
            
146
            server.Activate();
147
            // should work even though it is added after activation
148
            server.AddMethod("echo2", new EchoCall());
149

  
150
            Console.WriteLine("Called by Echo - {0}", remote.Call<string, string>("echo", "hallo"));
151
            Console.WriteLine("Called by Echo2- {0}", remote.Call<string, string>("echo2", "hallo"));
152

  
153
            server.AddMethod("voidvoid", new VoidVoidCall());
154
            server.AddMethod("voidtype", new VoidTypeCall());
155
            server.AddMethod("typevoid", new TypeVoidCall());
156

  
157
            Console.WriteLine("Called by typevoid- {0}", remote.Call<string>("typevoid"));
158
            remote.Call<string>("voidtype", "typevoid-echo");
159
            remote.Call("voidvoid");
160

  
161
            // do some asynchronous calls
162
            var futureEcho = remote.CallAsync<string,string>("echo", "Async");
163
            remote.CallAsync("voidvoid");
164
            var futureType = remote.CallAsync<string>("typevoid");
165

  
166
            Console.WriteLine("Press any Key to get results...");
167
            Console.ReadKey();
168
            Console.WriteLine();
169

  
170
            Console.WriteLine("Answer from typevoid: {0}", futureType.Get());
171
            Console.WriteLine("Answer from echo    : {0}", futureEcho.Get());
172

  
173
            Console.ReadKey();
174
            Console.WriteLine();
175

  
176
            server.AddMethod("exception", new ExceptionCall());
177

  
178
            // Should not throw an exception yet
179
            var futureInt = remote.CallAsync<int>("exception");
180
            try
181
            {
182
                int output = remote.Call<int>("exception");
183
            }
184
            catch (RSBErrorReplyException e)
185
            {
186
                Console.WriteLine("RSB Reply: " + e.Message);
187
            }
188
            try
189
            {
190
                futureInt.Get();
191
            }
192
            catch (RSBErrorReplyException e)
193
            {
194
                Console.WriteLine("RSB Reply: " + e.Message);
195
            }
196
            Console.ReadKey();
197

  
198
            server.Deactivate();
199
            remote.Deactivate();
200
        }
201

  
202
	}
203

  
204
    public class ExceptionCall : DataCallback<int, Null>
205
    {
206
        public override int Invoke(Null param)
207
        {
208
            throw new NotImplementedException("Won't be implemented!");
209
        }
210
    }
211

  
212
    public class VoidVoidCall : DataCallback<Null, Null>
213
    {
214
        public override Null Invoke(Null param)
215
        {
216
            Console.WriteLine("Invoked VoidVoid Call");
217
            return Null.Instance;
218
        }
219
    }
220

  
221
    public class TypeVoidCall : DataCallback<string, Null>
222
    {
223
        public override string Invoke(Null param)
224
        {
225
            return "TypeVoid Call Invoked!";
126 226
        }
227
    }
127 228

  
229
    public class VoidTypeCall : DataCallback<Null, string>
230
    {
231
        public override Null Invoke(string param)
232
        {
233
            Console.WriteLine("Invoked with: {0}", param);
234
            return Null.Instance;
235
        }
236
    }
237

  
238
    public class EchoCall : DataCallback<string, string>
239
    {
240
        public override string Invoke(string data)
241
        {
242
            return "Echo: " + data;
243
        }
128 244
    }
129 245
}
rsb-cil/RSBErrorReplyException.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Text;
5
using System.Threading.Tasks;
6

  
7
namespace Rsb
8
{
9
    public class RSBErrorReplyException : Exception
10
    {
11
        public RSBErrorReplyException()
12
        {
13
        }
14

  
15
        public RSBErrorReplyException(string message)
16
        : base(message)
17
        {
18
        }
19

  
20
        public RSBErrorReplyException(string message, Exception inner)
21
        : base(message, inner)
22
        {
23
        }
24
    }
25
}
rsb-cil/Rsb/Factory.cs
1 1
using Rsb.Transport.Socket;
2 2
using Rsb.Converter;
3
using Rsb.Patterns;
3 4

  
4 5
namespace Rsb
5 6
{
......
55 56
            return CreateListener(new Scope(scope));
56 57
        }
57 58

  
59
        public RemoteServer CreateRemoteServer(Scope scope, int timeout = 3000)
60
        {
61
            var server = new RemoteServer(scope)
62
            {
63
                Timeout = timeout
64
            };
65
            return server;
66
        }
67

  
68
        public RemoteServer CreateRemoteServer(string scope, int timeout = 3000)
69
        {
70
            return CreateRemoteServer(new Scope(scope), timeout);
71
        }
72

  
73
        public LocalServer CreateLocalServer(Scope scope)
74
        {
75
            var server = new LocalServer(scope);
76
            return server;
77
        }
78

  
79
        public LocalServer CreateLocalServer(string scope)
80
        {
81
            return CreateLocalServer(new Scope(scope));
82
        }
83

  
58 84
    }
59 85
}
rsb-cil/Rsb/Patterns/DataCallback.cs
1

2
using System;
3

  
4
namespace Rsb.Patterns
5
{
6
    public abstract class DataCallback<ReplyType, RequestType> : ICallback
7
    {
8
        public Event InternalInvoke(Event e) {
9

  
10
            // reply preperation
11
            var reply = new Rsb.Event();
12
            reply.Method = "REPLY";
13
            reply.Scope = e.Scope;
14
            reply.AddCause(e.EventId);
15

  
16
            try
17
            {
18
                ReplyType result = this.Invoke((RequestType)e.Data);
19
                reply.Data = result;
20
            }
21
            catch (Exception exception) {
22
                reply.Data = exception.GetType().ToString() + ": " + exception.Message;
23
                reply.MetaData.SetUserInfo("rsb:error?", "rsb:error?");
24
            }            
25

  
26
            return reply;
27
        }
28

  
29
        public abstract ReplyType Invoke(RequestType param);
30
    }
31
    
32
}
rsb-cil/Rsb/Patterns/ICallback.cs
1

2
namespace Rsb.Patterns
3
{
4
    public interface ICallback
5
    {
6
        Event InternalInvoke(Event e);
7
    }
8
}
rsb-cil/Rsb/Patterns/LocalMethod.cs
1

2
using System;
3

  
4
namespace Rsb.Patterns
5
{
6
    public class LocalMethod
7
    {
8
        private Informer informer;
9
        private Listener listener;
10
        private string name;
11
        public string Name { get => name; }
12

  
13
        private ICallback callback;
14

  
15
        public LocalMethod(Scope scope, string name, ICallback callback) {
16
            this.name = name;
17
            this.callback = callback;
18
            Scope work = scope.Concat(new Scope("/" + name));
19

  
20
            informer = Factory.Instance.CreateInformer(work);
21
            listener = Factory.Instance.CreateListener(work);
22

  
23
            listener.EventReceived += RemoteCallHandler;
24
        }
25

  
26
        private void RemoteCallHandler(Event theEvent)
27
        {
28
            // this does not interest us. possible our own reply...
29
            if (theEvent.Method == "REPLY") {
30
                return;
31
            }
32

  
33
            // usual informer publish (or something else not specified at the moment) -> ignore
34
            if (theEvent.Method != "REQUEST")
35
            {
36
                return;
37
            }
38

  
39
            informer.Send(callback.InternalInvoke(theEvent));
40
        }
41

  
42
        public void Activate() {
43
            informer.Activate();
44
            listener.Activate();
45
        }
46

  
47
        public void Deactivate()
48
        {
49
            informer.Deactivate();
50
            listener.Deactivate();
51
        }
52

  
53
    }
54
}
rsb-cil/Rsb/Patterns/LocalServer.cs
1
using System;
2
using System.Collections.Generic;
3

  
4
namespace Rsb.Patterns
5
{
6
    public class LocalServer
7
    {
8
        private Scope scope;
9
        private Dictionary<string, LocalMethod> registeredMethods = new Dictionary<string, LocalMethod>();
10

  
11
        public bool IsActive {
12
            get; private set;
13
        }
14
        
15
        public LocalServer(Scope scope) {
16
            this.scope = scope;
17
            IsActive = false;
18
        }
19

  
20

  
21
        public void AddMethod(string name, ICallback callback) {
22
            if (registeredMethods.ContainsKey(name)) {
23
                throw new ArgumentException("method already registered");
24
            }
25
            LocalMethod method = new LocalMethod(scope, name, callback);
26
            registeredMethods.Add(name, method);
27
            if (IsActive) {
28
                method.Activate();
29
            }
30
        }
31

  
32
        public void Activate() {
33
            foreach (LocalMethod method in registeredMethods.Values) {
34
                method.Activate();
35
            }
36
            IsActive = true;
37
        }
38

  
39
        public void Deactivate() {
40
            foreach (LocalMethod method in registeredMethods.Values) {
41
                method.Deactivate();
42
            }
43
            IsActive = false;
44
        }
45
    }
46
}
rsb-cil/Rsb/Patterns/RemoteMethod.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Threading;
4
using System.Threading.Tasks;
5
using System.Linq;
6
using Rsb;
7
using System.Reflection;
8

  
9
namespace Rsb.Patterns
10
{
11
    public class RemoteMethod
12
    {
13
        Informer requestInformer;
14
        Listener replyListener;
15

  
16
        object sync = new object();
17

  
18
        public bool IsActive { get; private set; }
19

  
20
        //requesting ID and reply event
21
        List<Tuple<EventId, TaskCompletionSource<Event>, bool>> procedureCalls = new List<Tuple<EventId, TaskCompletionSource<Event>, bool>>();
22

  
23
        public RemoteMethod(Scope scope, int timeout) {
24
            requestInformer = Factory.Instance.CreateInformer(scope);
25
            replyListener = Factory.Instance.CreateListener(scope);
26
            replyListener.EventReceived += ListenerHandler;
27

  
28
            IsActive = false;
29
        }
30

  
31
        private void ListenerHandler(Event e) {
32
            if (e.Method == "REPLY")
33
            {
34
                EventId cause = null;
35
                TaskCompletionSource<Event> tcs = null;
36
                bool sendData = false;
37

  
38
                // find causing call
39
                // TODO sbarut: clean zombie calls
40
                foreach (var call in procedureCalls) {
41
                    if (e.IsCause(call.Item1)) {
42
                        cause = call.Item1;
43
                        tcs = call.Item2;
44
                        sendData = call.Item3;
45
                        break;
46
                    }
47
                }
48

  
49
                if (cause == null) {
50
                    throw new ArgumentException("Got Reply for a non-existing Cause");
51
                }
52

  
53
                tcs.SetResult(e);
54

  
55
                // TODO sbarut: Maybe use less Reflection?
56
                /*
57
                Type tcsType = tcs.GetType();
58
                var method = tcsType.GetMethod("SetResult");
59
                if (sendData)
60
                {
61
                    method.Invoke(tcs, new object[] { e.Data });
62
                }
63
                else
64
                {
65
                    method.Invoke(tcs, new object[] { e });
66
                }*/
67
            }
68
        }
69

  
70
        public Task<Event> Call(Event request)
71
        {
72
            if (!IsActive)
73
            {
74
                throw new InvalidOperationException("Participant not active");
75
            }
76

  
77
            TaskCompletionSource<Event> taskController = new TaskCompletionSource<Event>();
78
            procedureCalls.Add(new Tuple<EventId, TaskCompletionSource<Event>, bool>(request.EventId, taskController, true));
79

  
80
            requestInformer.Send(request);            
81

  
82
            return taskController.Task;
83
        }
84
        
85
        public void Activate() {
86
            requestInformer.Activate();
87
            replyListener.Activate();
88
            IsActive = true;
89
        }
90

  
91
        public void Deactivate() {
92
            requestInformer.Deactivate();
93
            replyListener.Deactivate();
94
            IsActive = false;
95
        }
96
    }
97

  
98
}
rsb-cil/Rsb/Patterns/RemoteServer.cs
1
using Rsb.Converter;
2
using System;
3
using System.Collections.Generic;
4
using System.Threading.Tasks;
5

  
6
namespace Rsb.Patterns
7
{
8
    public class RemoteServer
9
    {
10
        public int Timeout = 3000; // milliseconds
11
        private Scope scope;
12

  
13
        public bool IsActive { get; private set; }
14

  
15
        Dictionary<string, RemoteMethod> connections = new Dictionary<string, RemoteMethod>();
16

  
17
        public RemoteServer(Scope scope) {
18
            this.scope = scope;
19
            IsActive = false;
20
        }
21

  
22
        // Synchronous void,void Call
23
        public void Call(string name) {
24
            Call<Null, Null>(name, Null.Instance);
25
        }
26

  
27
        // Synchronous Type,void Call
28
        public ReplyType Call<ReplyType>(string name) {
29
            return Call<ReplyType, Null>(name, Null.Instance);
30
        }
31

  
32
        // Synchronous void,Type Call
33
        public void Call<RequestType>(string name, RequestType requestData) {
34
            Call<Null, RequestType>(name, requestData);
35
        }
36

  
37
        // Synchronous Type,Type Call
38
        public ReplyType Call<ReplyType, RequestType>(string name, RequestType requestData) {
39
            try
40
            {
41
                ReplyType reply = this.CallAsync<ReplyType,RequestType>(name, requestData).Get(Timeout);
42
                return reply;
43
            }
44
            catch (AggregateException ae) {
45
                ae.Handle((x) =>
46
                {
47
                    if (x is RSBErrorReplyException)
48
                    {
49
                        throw x;
50
                    }
51
                    return false;
52
                });
53
            }
54
            return default(ReplyType);
55
        }
56

  
57
        // Generic Synchronous Event Call - Beware: no Exception throwing (the user should do it on his own as he/she has access to the Event Metadata)
58
        public Event Call(string name, Event request)
59
        {
60
            return CallAsync(name, request).Get(Timeout);
61
        }
62

  
63
        // Asynchronous void,void Call
64
        public Task CallAsync(string name) {
65
            // the void,Type call not the Type,Type call! 
66
            // the difference is in the returned non-returning Task!
67
            return CallAsync(name, Null.Instance); 
68
        }
69

  
70
        // Asynchronous Type,void Call
71
        public Task<ReplyType> CallAsync<ReplyType>(string name) {
72
            return CallAsync<ReplyType, Null>(name, Null.Instance);
73
        }
74

  
75
        // Asynchronous void,Type Call
76
        public Task CallAsync<RequestType>(string name, RequestType requestData) {
77
            var callTask = CallAsync<Null, RequestType>(name, requestData);
78
            Task waiter = Task.Run(() =>
79
            {
80
                callTask.Wait();
81
            });
82
            return waiter;
83
        }
84

  
85
        // Asynchronous Type,Type Call
86
        public Task<ReplyType> CallAsync<ReplyType, RequestType>(string name, RequestType requestData)
87
        {
88
            Event request = new Event();
89
            request.Data = requestData;
90
            request.Scope = scope.Concat(new Scope("/" + name));
91
            request.Method = "REQUEST";
92

  
93
            Task<Event> eventTask = CallAsync(name, request);
94

  
95
            Task<ReplyType> castTask = Task.Run(() =>
96
            {
97
                eventTask.Wait();
98
                Event result = eventTask.Result;
99

  
100
                if (result.MetaData.HasUserInfo("rsb:error?")) {
101
                    throw new RSBErrorReplyException((String)result.Data);
102
                }
103

  
104
                if (result.Data.GetType() != typeof(ReplyType)) {
105
                    throw new RSBException(String.Format("Unexpected Datatype! (Expected: {0} | Got: {1})", typeof(ReplyType).ToString(), result.Data.GetType().ToString()));
106
                }
107
                return (ReplyType)result.Data;
108
            });
109

  
110
            return castTask;
111
        }
112

  
113
        // Generic Asynchronous Event Call - Beware: no Exception throwing (the user should do it on his own as he/she has access to the Event Metadata)
114
        public Task<Event> CallAsync(string name, Event request) {
115
            Scope methodScope = scope.Concat(new Scope("/" + name));
116

  
117
            // checks if the request event is well-formed instead of well-forming the event
118
            if (!request.Method.Equals("REQUEST"))
119
            {
120
                throw new RSBException(String.Format("Request event expected! (Received: {0})", request.Method));
121
            }
122

  
123
            if (!request.Scope.Equals(methodScope))
124
            {
125
                throw new RSBException(String.Format("Event has not the expected scope. (Expected: {0} | Got: {1})", methodScope, request.Scope));
126
            }
127

  
128
            RemoteMethod method;
129

  
130
            if (connections.ContainsKey(name))
131
            {
132
                method = connections[name];
133
            }
134
            else
135
            {
136
                method = new RemoteMethod(methodScope, Timeout);
137
                connections.Add(name, method);
138
                if (IsActive)
139
                {
140
                    method.Activate();
141
                }
142
            }
143

  
144
            return method.Call(request);
145
        }
146

  
147
        public void Activate() {
148
            foreach (var remoteMethod in connections.Values) {
149
                remoteMethod.Activate();
150
            }
151
            IsActive = true;
152
        }
153

  
154
        public void Deactivate() {
155
            foreach (var remoteMethod in connections.Values) {
156
                remoteMethod.Deactivate();
157
            }
158
            IsActive = false;
159
        }
160
    }
161
}
rsb-cil/Rsb/Patterns/TaskExtension.cs
1
using System;
2
using System.Threading.Tasks;
3

  
4
namespace Rsb.Patterns
5
{
6
    // Used to make Java Future like Get methods for Task<> types. Nice C# feature btw. 
7
    public static class TaskExtension
8
    {
9
        public static ReplyType Get<ReplyType>(this Task<ReplyType> task, int timeout)
10
        {
11
            bool timeoutFlag = false;
12
            try
13
            {
14
                timeoutFlag = !task.Wait(timeout);
15
            }
16
            catch (AggregateException ae)
17
            {
18
                ae.Handle((x) =>
19
                {
20
                    if (x is RSBErrorReplyException)
21
                    {
22
                        throw x;
23
                    }
24
                    return false;
25
                });
26
            }
27

  
28
            if (timeoutFlag)
29
            {
30
                throw new RSBException("Timeout while waiting for a RPC reply.");
31
            }
32
            return task.Result;
33
        }
34

  
35
        public static ReplyType Get<ReplyType>(this Task<ReplyType> task)
36
        {
37
            try
38
            {
39
                task.Wait();
40
            }
41
            catch (AggregateException ae)
42
            {
43
                ae.Handle((x) =>
44
                {
45
                    if (x is RSBErrorReplyException)
46
                    {
47
                        throw x;
48
                    }
49
                    return false;
50
                });
51
            }
52
            return task.Result;
53
        }
54
    }
55
}
rsb-cil/rsb-cil.csproj
40 40
  </ItemGroup>
41 41
  <ItemGroup>
42 42
    <Compile Include="Properties\AssemblyInfo.cs" />
43
    <Compile Include="RSBErrorReplyException.cs" />
43 44
    <Compile Include="Rsb\Converter\ByteConverter.cs" />
44 45
    <Compile Include="Rsb\Converter\ConverterSignature.cs" />
45 46
    <Compile Include="Rsb\Converter\DefaultConverterRepository.cs" />
......
52 53
    <Compile Include="Rsb\Converter\UInt32Converter.cs" />
53 54
    <Compile Include="Rsb\Converter\UInt64Converter.cs" />
54 55
    <Compile Include="Rsb\Factory.cs" />
56
    <Compile Include="Rsb\Patterns\DataCallback.cs" />
57
    <Compile Include="Rsb\Patterns\ICallback.cs" />
58
    <Compile Include="Rsb\Patterns\LocalServer.cs" />
59
    <Compile Include="Rsb\Patterns\LocalMethod.cs" />
60
    <Compile Include="Rsb\Patterns\RemoteMethod.cs" />
61
    <Compile Include="Rsb\Patterns\RemoteServer.cs" />
62
    <Compile Include="Rsb\Patterns\TaskExtension.cs" />
55 63
    <Compile Include="Rsb\Protocol\EventId.cs" />
56 64
    <Compile Include="Rsb\Protocol\EventMetaData.cs" />
57 65
    <Compile Include="Rsb\Protocol\Notification.cs" />
......
89 97
      </Properties>
90 98
    </MonoDevelop>
91 99
  </ProjectExtensions>
92
</Project>
100
</Project>
93
-