From c9746564cfe6e17ef90c99e67223ceebae89af8f Mon Sep 17 00:00:00 2001 From: Sinan Date: Mon, 24 Sep 2018 12:48:07 +0200 Subject: [PATCH 4/4] Add rsb rpc pattern Asynchronous calls are not implemented! --- rsb-cil-test/Program.cs | 44 +++++++++++++++++++ rsb-cil/Rsb/Factory.cs | 25 +++++++++++ rsb-cil/Rsb/Patterns/DataCallback.cs | 25 +++++++++++ rsb-cil/Rsb/Patterns/ICallback.cs | 8 ++++ rsb-cil/Rsb/Patterns/LocalServer.cs | 49 +++++++++++++++++++++ rsb-cil/Rsb/Patterns/Method.cs | 53 +++++++++++++++++++++++ rsb-cil/Rsb/Patterns/RemoteMethod.cs | 82 ++++++++++++++++++++++++++++++++++++ rsb-cil/Rsb/Patterns/RemoteServer.cs | 80 +++++++++++++++++++++++++++++++++++ rsb-cil/rsb-cil.csproj | 6 +++ 9 files changed, 372 insertions(+) create mode 100644 rsb-cil/Rsb/Patterns/DataCallback.cs create mode 100644 rsb-cil/Rsb/Patterns/ICallback.cs create mode 100644 rsb-cil/Rsb/Patterns/LocalServer.cs create mode 100644 rsb-cil/Rsb/Patterns/Method.cs create mode 100644 rsb-cil/Rsb/Patterns/RemoteMethod.cs create mode 100644 rsb-cil/Rsb/Patterns/RemoteServer.cs diff --git a/rsb-cil-test/Program.cs b/rsb-cil-test/Program.cs index 6d0f0a6..091da66 100644 --- a/rsb-cil-test/Program.cs +++ b/rsb-cil-test/Program.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using Rsb; using Rsb.Converter; using Rsb.Transport.Socket; +using Rsb.Patterns; using Rst.Kinematics; using log4net.Config; @@ -122,7 +123,50 @@ namespace rsbciltest repoInformer.Deactivate(); Console.ReadKey(); repoListener.Deactivate(); + + Console.WriteLine("\nRPC pattern test"); + + RemoteServer remote = new RemoteServer(new Scope("/test/methods")); + remote.Activate(); + remote.timeout = 1000; + + LocalServer server = new LocalServer(new Scope("/test/methods")); + server.AddMethod("echo", new Caller()); + + //should have a timeout, since the LocalServer is not activated + try + { + remote.Call("echo", "hallo"); + } + catch (RSBException e) + { + Console.WriteLine(e.Message); + } + + server.Activate(); + // should work even though it is added after activation + server.AddMethod("echo2", new Caller()); + + Console.WriteLine("Echo : {0}", remote.Call("echo", "hallo")); + Console.WriteLine("Echo2: {0}", remote.Call("echo2", "hallo")); + + Console.ReadKey(); + for (int i = 0; i < 20; i++) { + Console.WriteLine("{0}: {1}", i, remote.Call("echo", "hallo")); + } + + Console.ReadKey(); + server.Deactivate(); + remote.Deactivate(); } + } + + public class Caller : DataCallback + { + public override string Invoke(string data) + { + return data + data; + } } } diff --git a/rsb-cil/Rsb/Factory.cs b/rsb-cil/Rsb/Factory.cs index 43edae6..64a8557 100644 --- a/rsb-cil/Rsb/Factory.cs +++ b/rsb-cil/Rsb/Factory.cs @@ -113,5 +113,30 @@ namespace Rsb return CreateListener(new Scope(scope)); } + public RemoteServer CreateRemoteServer(Scope scope, int timeout = 3000) + { + var server = new RemoteServer(scope) + { + Timeout = timeout + }; + return server; + } + + public RemoteServer CreateRemoteServer(string scope, int timeout = 3000) + { + return CreateRemoteServer(new Scope(scope), timeout); + } + + public LocalServer CreateLocalServer(Scope scope) + { + var server = new LocalServer(scope); + return server; + } + + public LocalServer CreateLocalServer(string scope) + { + return CreateLocalServer(new Scope(scope)); + } + } } diff --git a/rsb-cil/Rsb/Patterns/DataCallback.cs b/rsb-cil/Rsb/Patterns/DataCallback.cs new file mode 100644 index 0000000..31cc3be --- /dev/null +++ b/rsb-cil/Rsb/Patterns/DataCallback.cs @@ -0,0 +1,25 @@ + +namespace Rsb.Patterns +{ + public abstract class DataCallback : ICallback + { + public Event InternalInvoke(Event e) { + + if (e.Method != "REQUEST") { + throw new RSBException("Request expected!"); + } + ReplyType result = this.Invoke((RequestType)e.Data); + + var reply = new Rsb.Event(); + reply.Method = "REPLY"; + reply.Scope = e.Scope; + reply.AddCause(e.EventId); + reply.Data = result; + + return reply; + } + + public abstract ReplyType Invoke(RequestType param); + } + +} diff --git a/rsb-cil/Rsb/Patterns/ICallback.cs b/rsb-cil/Rsb/Patterns/ICallback.cs new file mode 100644 index 0000000..1d10228 --- /dev/null +++ b/rsb-cil/Rsb/Patterns/ICallback.cs @@ -0,0 +1,8 @@ + +namespace Rsb.Patterns +{ + public interface ICallback + { + Event InternalInvoke(Event e); + } +} diff --git a/rsb-cil/Rsb/Patterns/LocalServer.cs b/rsb-cil/Rsb/Patterns/LocalServer.cs new file mode 100644 index 0000000..4e8bc12 --- /dev/null +++ b/rsb-cil/Rsb/Patterns/LocalServer.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Rsb.Patterns +{ + public class LocalServer + { + private Scope scope; + private Dictionary registeredMethods = new Dictionary(); + + public bool IsActive { + get; private set; + } + + public LocalServer(Scope scope) { + this.scope = scope; + IsActive = false; + } + + + public void AddMethod(string name, ICallback callback) { + if (registeredMethods.ContainsKey(name)) { + throw new ArgumentException("method already registered"); + } + Method method = new Method(scope, name, callback); + registeredMethods.Add(name, method); + if (IsActive) { + method.Activate(); + } + } + + public void Activate() { + foreach (Method method in registeredMethods.Values) { + method.Activate(); + } + IsActive = true; + } + + public void Deactivate() { + foreach (Method method in registeredMethods.Values) { + method.Deactivate(); + } + IsActive = false; + } + } +} diff --git a/rsb-cil/Rsb/Patterns/Method.cs b/rsb-cil/Rsb/Patterns/Method.cs new file mode 100644 index 0000000..befcedf --- /dev/null +++ b/rsb-cil/Rsb/Patterns/Method.cs @@ -0,0 +1,53 @@ + +using System; + +namespace Rsb.Patterns +{ + public class Method + { + private Informer informer; + private Listener listener; + private string name; + public string Name { get => name; } + + private ICallback callback; + + public Method(Scope scope, string name, ICallback callback) { + this.name = name; + this.callback = callback; + Scope work = scope.Concat(new Scope("/" + name)); + + informer = Factory.Instance.createInformer(work); + listener = Factory.Instance.createListener(work); + + listener.EventReceived += proceedCall; + } + + private void proceedCall(Event theEvent) + { + // this is with the highest probability our own reply... + if (theEvent.Method == "REPLY") { + return; + } + // actually this would be handled in the ICallback too, but to make it more consistent... + if (theEvent.Method != "REQUEST") + { + throw new RSBException("Request expected!"); + } + + informer.Send(callback.InternalInvoke(theEvent)); + } + + public void Activate() { + informer.Activate(); + listener.Activate(); + } + + public void Deactivate() + { + informer.Deactivate(); + listener.Deactivate(); + } + + } +} diff --git a/rsb-cil/Rsb/Patterns/RemoteMethod.cs b/rsb-cil/Rsb/Patterns/RemoteMethod.cs new file mode 100644 index 0000000..677a182 --- /dev/null +++ b/rsb-cil/Rsb/Patterns/RemoteMethod.cs @@ -0,0 +1,82 @@ +using System; +using System.Threading; +using Rsb; + +namespace Rsb.Patterns +{ + public class RemoteMethod + { + Informer requestInformer; + Listener replyListener; + int timeout; + + object syncPrimitive; + EventId requestId; + Event result; + + public bool IsActive { get; private set; } + + public RemoteMethod(Scope scope, int timeout) { + requestInformer = Factory.Instance.createInformer(scope); + replyListener = Factory.Instance.createListener(scope); + replyListener.EventReceived += handler; + + this.timeout = timeout; + + syncPrimitive = new object(); + IsActive = false; + } + + private void handler(Event e) { + if (requestId == null) { + //maybe exception but actually we can just ignore... + return; + } + + if (e.Method == "REPLY" && e.IsCause(requestId)) + { + + // unlocking the SYNCHRONOUS call + lock (syncPrimitive) + { + result = e; + requestId = null; + Monitor.Pulse(syncPrimitive); + } + } + } + + public Event Call(Event request) { + if (!IsActive) { + throw new InvalidOperationException("Participant not active"); + } + + bool timeout; + // Locking until reply arrives (see handler above) + lock (syncPrimitive) + { + requestId = request.EventId; + requestInformer.Send(request); + timeout = !Monitor.Wait(syncPrimitive, this.timeout); + } + + if (timeout) + { + throw new RSBException(String.Format("Timeout after {0} milliseconds", this.timeout)); + } + return result; + } + + public void Activate() { + requestInformer.Activate(); + replyListener.Activate(); + IsActive = true; + } + + public void Deactivate() { + requestInformer.Deactivate(); + replyListener.Deactivate(); + IsActive = false; + } + } +} diff --git a/rsb-cil/Rsb/Patterns/RemoteServer.cs b/rsb-cil/Rsb/Patterns/RemoteServer.cs new file mode 100644 index 0000000..c00f69b --- /dev/null +++ b/rsb-cil/Rsb/Patterns/RemoteServer.cs @@ -0,0 +1,80 @@ +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Rsb.Patterns +{ + public class RemoteServer + { + public int Timeout = 3000; // milliseconds + private Scope scope; + + public bool IsActive { get; private set; } + + Dictionary connections = new Dictionary(); + + public RemoteServer(Scope scope) { + this.scope = scope; + IsActive = false; + } + + public Event Call(string name, Event request) { + Scope methodScope = scope.Concat(new Scope("/" + name)); + + // checks if the request event is well-formed instead of well-forming the event + if (!request.Method.Equals("REQUEST")) { + throw new RSBException(String.Format("Request event expected! (Received: {0})", request.Method)); + } + + if (!request.Scope.Equals(methodScope)) { + throw new RSBException(String.Format("Event is not the expected scope. (Expected: {0} | Got: {1})", methodScope, request.Scope)); + } + + RemoteMethod method; + + if (connections.ContainsKey(name)) { + method = connections[name]; + } + else + { + method = new RemoteMethod(methodScope, Timeout); + connections.Add(name, method); + if (IsActive) { + method.Activate(); + } + } + + return method.Call(request); + } + + public ReplyType Call(string name, RequestType requestData) { + Event request = new Event(); + request.Data = requestData; + request.Scope = scope.Concat(new Scope("/" + name)); + request.Method = "REQUEST"; + + Event reply = this.Call(name, request); + if (reply.Data.GetType().Equals(typeof(ReplyType))) + { + return (ReplyType)reply.Data; + } + else { + throw new RSBException(String.Format("Unexpected Datatype! (Expected: {0} | Got: {1})", typeof(ReplyType).ToString(), reply.Data.GetType().ToString())); + } + } + + public void Activate() { + foreach (var remoteMethod in connections.Values) { + remoteMethod.Activate(); + } + IsActive = true; + } + + public void Deactivate() { + foreach (var remoteMethod in connections.Values) { + remoteMethod.Deactivate(); + } + IsActive = false; + } + } +} diff --git a/rsb-cil/rsb-cil.csproj b/rsb-cil/rsb-cil.csproj index 618a864..410a8db 100644 --- a/rsb-cil/rsb-cil.csproj +++ b/rsb-cil/rsb-cil.csproj @@ -52,6 +52,12 @@ + + + + + + -- 2.14.2.windows.1