From 405ad9750be05926196ef298f59818a980999b7d Mon Sep 17 00:00:00 2001 From: Sinan Date: Mon, 24 Sep 2018 12:48:07 +0200 Subject: [PATCH 2/3] Add rsb rpc pattern This includes LocalServer, RemoteServer with LocalMethods and RemoteMethods. As well synchronous as asynchronous calls are implemented. Exception on errors are also transmitted. --- rsb-cil-test/Program.cs | 118 ++++++++++++++++++++++++- rsb-cil/RSBErrorReplyException.cs | 25 ++++++ rsb-cil/Rsb/Factory.cs | 26 ++++++ rsb-cil/Rsb/Patterns/DataCallback.cs | 32 +++++++ rsb-cil/Rsb/Patterns/ICallback.cs | 8 ++ rsb-cil/Rsb/Patterns/LocalMethod.cs | 54 ++++++++++++ rsb-cil/Rsb/Patterns/LocalServer.cs | 46 ++++++++++ rsb-cil/Rsb/Patterns/RemoteMethod.cs | 98 +++++++++++++++++++++ rsb-cil/Rsb/Patterns/RemoteServer.cs | 161 ++++++++++++++++++++++++++++++++++ rsb-cil/Rsb/Patterns/TaskExtension.cs | 55 ++++++++++++ rsb-cil/rsb-cil.csproj | 8 ++ 11 files changed, 630 insertions(+), 1 deletion(-) create mode 100644 rsb-cil/RSBErrorReplyException.cs create mode 100644 rsb-cil/Rsb/Patterns/DataCallback.cs create mode 100644 rsb-cil/Rsb/Patterns/ICallback.cs create mode 100644 rsb-cil/Rsb/Patterns/LocalMethod.cs create mode 100644 rsb-cil/Rsb/Patterns/LocalServer.cs create mode 100644 rsb-cil/Rsb/Patterns/RemoteMethod.cs create mode 100644 rsb-cil/Rsb/Patterns/RemoteServer.cs create mode 100644 rsb-cil/Rsb/Patterns/TaskExtension.cs diff --git a/rsb-cil-test/Program.cs b/rsb-cil-test/Program.cs index 6cff907..cc6273c 100644 --- a/rsb-cil-test/Program.cs +++ b/rsb-cil-test/Program.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using Rsb; using Rsb.Converter; using Rsb.Transport.Socket; +using Rsb.Patterns; using Rst.Kinematics; using log4net.Config; @@ -36,7 +37,7 @@ namespace rsbciltest static void Main(String[] args) { - BasicConfigurator.Configure(); + //BasicConfigurator.Configure(); var inConverters = buildConverterSelection(new List> { new Tuple("bool", new BoolConverter() ), @@ -123,7 +124,122 @@ namespace rsbciltest repoInformer.Deactivate(); Console.ReadKey(); repoListener.Deactivate(); + + Console.WriteLine("\nRPC pattern test"); + + RemoteServer remote = Factory.Instance.CreateRemoteServer("/test/methods", 1000); + remote.Activate(); + + LocalServer server = Factory.Instance.CreateLocalServer("/test/methods"); + server.AddMethod("echo", new EchoCall()); + + //should have a timeout, since the LocalServer is not activated + try + { + remote.Call("echo", "hallo"); + } + catch (RSBException e) + { + Console.WriteLine(e.Message); + } + + server.Activate(); + // should work even though it is added after activation + server.AddMethod("echo2", new EchoCall()); + + Console.WriteLine("Called by Echo - {0}", remote.Call("echo", "hallo")); + Console.WriteLine("Called by Echo2- {0}", remote.Call("echo2", "hallo")); + + server.AddMethod("voidvoid", new VoidVoidCall()); + server.AddMethod("voidtype", new VoidTypeCall()); + server.AddMethod("typevoid", new TypeVoidCall()); + + Console.WriteLine("Called by typevoid- {0}", remote.Call("typevoid")); + remote.Call("voidtype", "typevoid-echo"); + remote.Call("voidvoid"); + + // do some asynchronous calls + var futureEcho = remote.CallAsync("echo", "Async"); + remote.CallAsync("voidvoid"); + var futureType = remote.CallAsync("typevoid"); + + Console.WriteLine("Press any Key to get results..."); + Console.ReadKey(); + Console.WriteLine(); + + Console.WriteLine("Answer from typevoid: {0}", futureType.Get()); + Console.WriteLine("Answer from echo : {0}", futureEcho.Get()); + + Console.ReadKey(); + Console.WriteLine(); + + server.AddMethod("exception", new ExceptionCall()); + + // Should not throw an exception yet + var futureInt = remote.CallAsync("exception"); + try + { + int output = remote.Call("exception"); + } + catch (RSBErrorReplyException e) + { + Console.WriteLine("RSB Reply: " + e.Message); + } + try + { + futureInt.Get(); + } + catch (RSBErrorReplyException e) + { + Console.WriteLine("RSB Reply: " + e.Message); + } + Console.ReadKey(); + + server.Deactivate(); + remote.Deactivate(); + } + + } + + public class ExceptionCall : DataCallback + { + public override int Invoke(Null param) + { + throw new NotImplementedException("Won't be implemented!"); + } + } + + public class VoidVoidCall : DataCallback + { + public override Null Invoke(Null param) + { + Console.WriteLine("Invoked VoidVoid Call"); + return Null.Instance; + } + } + + public class TypeVoidCall : DataCallback + { + public override string Invoke(Null param) + { + return "TypeVoid Call Invoked!"; } + } + public class VoidTypeCall : DataCallback + { + public override Null Invoke(string param) + { + Console.WriteLine("Invoked with: {0}", param); + return Null.Instance; + } + } + + public class EchoCall : DataCallback + { + public override string Invoke(string data) + { + return "Echo: " + data; + } } } diff --git a/rsb-cil/RSBErrorReplyException.cs b/rsb-cil/RSBErrorReplyException.cs new file mode 100644 index 0000000..c0f0365 --- /dev/null +++ b/rsb-cil/RSBErrorReplyException.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Rsb +{ + public class RSBErrorReplyException : Exception + { + public RSBErrorReplyException() + { + } + + public RSBErrorReplyException(string message) + : base(message) + { + } + + public RSBErrorReplyException(string message, Exception inner) + : base(message, inner) + { + } + } +} diff --git a/rsb-cil/Rsb/Factory.cs b/rsb-cil/Rsb/Factory.cs index 08c431e..211e96b 100644 --- a/rsb-cil/Rsb/Factory.cs +++ b/rsb-cil/Rsb/Factory.cs @@ -1,5 +1,6 @@ using Rsb.Transport.Socket; using Rsb.Converter; +using Rsb.Patterns; namespace Rsb { @@ -52,5 +53,30 @@ namespace Rsb return CreateListener(new Scope(scope)); } + public RemoteServer CreateRemoteServer(Scope scope, int timeout = 3000) + { + var server = new RemoteServer(scope) + { + Timeout = timeout + }; + return server; + } + + public RemoteServer CreateRemoteServer(string scope, int timeout = 3000) + { + return CreateRemoteServer(new Scope(scope), timeout); + } + + public LocalServer CreateLocalServer(Scope scope) + { + var server = new LocalServer(scope); + return server; + } + + public LocalServer CreateLocalServer(string scope) + { + return CreateLocalServer(new Scope(scope)); + } + } } diff --git a/rsb-cil/Rsb/Patterns/DataCallback.cs b/rsb-cil/Rsb/Patterns/DataCallback.cs new file mode 100644 index 0000000..505b95a --- /dev/null +++ b/rsb-cil/Rsb/Patterns/DataCallback.cs @@ -0,0 +1,32 @@ + +using System; + +namespace Rsb.Patterns +{ + public abstract class DataCallback : ICallback + { + public Event InternalInvoke(Event e) { + + // reply preperation + var reply = new Rsb.Event(); + reply.Method = "REPLY"; + reply.Scope = e.Scope; + reply.AddCause(e.EventId); + + try + { + ReplyType result = this.Invoke((RequestType)e.Data); + reply.Data = result; + } + catch (Exception exception) { + reply.Data = exception.GetType().ToString() + ": " + exception.Message; + reply.MetaData.SetUserInfo("rsb:error?", "rsb:error?"); + } + + return reply; + } + + public abstract ReplyType Invoke(RequestType param); + } + +} diff --git a/rsb-cil/Rsb/Patterns/ICallback.cs b/rsb-cil/Rsb/Patterns/ICallback.cs new file mode 100644 index 0000000..1d10228 --- /dev/null +++ b/rsb-cil/Rsb/Patterns/ICallback.cs @@ -0,0 +1,8 @@ + +namespace Rsb.Patterns +{ + public interface ICallback + { + Event InternalInvoke(Event e); + } +} diff --git a/rsb-cil/Rsb/Patterns/LocalMethod.cs b/rsb-cil/Rsb/Patterns/LocalMethod.cs new file mode 100644 index 0000000..c5272cf --- /dev/null +++ b/rsb-cil/Rsb/Patterns/LocalMethod.cs @@ -0,0 +1,54 @@ + +using System; + +namespace Rsb.Patterns +{ + public class LocalMethod + { + private Informer informer; + private Listener listener; + private string name; + public string Name { get => name; } + + private ICallback callback; + + public LocalMethod(Scope scope, string name, ICallback callback) { + this.name = name; + this.callback = callback; + Scope work = scope.Concat(new Scope("/" + name)); + + informer = Factory.Instance.CreateInformer(work); + listener = Factory.Instance.CreateListener(work); + + listener.EventReceived += RemoteCallHandler; + } + + private void RemoteCallHandler(Event theEvent) + { + // this does not interest us. possible our own reply... + if (theEvent.Method == "REPLY") { + return; + } + + // usual informer publish (or something else not specified at the moment) -> ignore + if (theEvent.Method != "REQUEST") + { + return; + } + + informer.Send(callback.InternalInvoke(theEvent)); + } + + public void Activate() { + informer.Activate(); + listener.Activate(); + } + + public void Deactivate() + { + informer.Deactivate(); + listener.Deactivate(); + } + + } +} diff --git a/rsb-cil/Rsb/Patterns/LocalServer.cs b/rsb-cil/Rsb/Patterns/LocalServer.cs new file mode 100644 index 0000000..5825ca3 --- /dev/null +++ b/rsb-cil/Rsb/Patterns/LocalServer.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; + +namespace Rsb.Patterns +{ + public class LocalServer + { + private Scope scope; + private Dictionary registeredMethods = new Dictionary(); + + public bool IsActive { + get; private set; + } + + public LocalServer(Scope scope) { + this.scope = scope; + IsActive = false; + } + + + public void AddMethod(string name, ICallback callback) { + if (registeredMethods.ContainsKey(name)) { + throw new ArgumentException("method already registered"); + } + LocalMethod method = new LocalMethod(scope, name, callback); + registeredMethods.Add(name, method); + if (IsActive) { + method.Activate(); + } + } + + public void Activate() { + foreach (LocalMethod method in registeredMethods.Values) { + method.Activate(); + } + IsActive = true; + } + + public void Deactivate() { + foreach (LocalMethod method in registeredMethods.Values) { + method.Deactivate(); + } + IsActive = false; + } + } +} diff --git a/rsb-cil/Rsb/Patterns/RemoteMethod.cs b/rsb-cil/Rsb/Patterns/RemoteMethod.cs new file mode 100644 index 0000000..c08c0f1 --- /dev/null +++ b/rsb-cil/Rsb/Patterns/RemoteMethod.cs @@ -0,0 +1,98 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using System.Linq; +using Rsb; +using System.Reflection; + +namespace Rsb.Patterns +{ + public class RemoteMethod + { + Informer requestInformer; + Listener replyListener; + + object sync = new object(); + + public bool IsActive { get; private set; } + + //requesting ID and reply event + List, bool>> procedureCalls = new List, bool>>(); + + public RemoteMethod(Scope scope, int timeout) { + requestInformer = Factory.Instance.CreateInformer(scope); + replyListener = Factory.Instance.CreateListener(scope); + replyListener.EventReceived += ListenerHandler; + + IsActive = false; + } + + private void ListenerHandler(Event e) { + if (e.Method == "REPLY") + { + EventId cause = null; + TaskCompletionSource tcs = null; + bool sendData = false; + + // find causing call + // TODO sbarut: clean zombie calls + foreach (var call in procedureCalls) { + if (e.IsCause(call.Item1)) { + cause = call.Item1; + tcs = call.Item2; + sendData = call.Item3; + break; + } + } + + if (cause == null) { + throw new ArgumentException("Got Reply for a non-existing Cause"); + } + + tcs.SetResult(e); + + // TODO sbarut: Maybe use less Reflection? + /* + Type tcsType = tcs.GetType(); + var method = tcsType.GetMethod("SetResult"); + if (sendData) + { + method.Invoke(tcs, new object[] { e.Data }); + } + else + { + method.Invoke(tcs, new object[] { e }); + }*/ + } + } + + public Task Call(Event request) + { + if (!IsActive) + { + throw new InvalidOperationException("Participant not active"); + } + + TaskCompletionSource taskController = new TaskCompletionSource(); + procedureCalls.Add(new Tuple, bool>(request.EventId, taskController, true)); + + requestInformer.Send(request); + + return taskController.Task; + } + + public void Activate() { + requestInformer.Activate(); + replyListener.Activate(); + IsActive = true; + } + + public void Deactivate() { + requestInformer.Deactivate(); + replyListener.Deactivate(); + IsActive = false; + } + } + +} diff --git a/rsb-cil/Rsb/Patterns/RemoteServer.cs b/rsb-cil/Rsb/Patterns/RemoteServer.cs new file mode 100644 index 0000000..69bd19f --- /dev/null +++ b/rsb-cil/Rsb/Patterns/RemoteServer.cs @@ -0,0 +1,161 @@ +using Rsb.Converter; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Rsb.Patterns +{ + public class RemoteServer + { + public int Timeout = 3000; // milliseconds + private Scope scope; + + public bool IsActive { get; private set; } + + Dictionary connections = new Dictionary(); + + public RemoteServer(Scope scope) { + this.scope = scope; + IsActive = false; + } + + // Synchronous void,void Call + public void Call(string name) { + Call(name, Null.Instance); + } + + // Synchronous Type,void Call + public ReplyType Call(string name) { + return Call(name, Null.Instance); + } + + // Synchronous void,Type Call + public void Call(string name, RequestType requestData) { + Call(name, requestData); + } + + // Synchronous Type,Type Call + public ReplyType Call(string name, RequestType requestData) { + try + { + ReplyType reply = this.CallAsync(name, requestData).Get(Timeout); + return reply; + } + catch (AggregateException ae) { + ae.Handle((x) => + { + if (x is RSBErrorReplyException) + { + throw x; + } + return false; + }); + } + return default(ReplyType); + } + + // Generic Synchronous Event Call - Beware: no Exception throwing (the user should do it on his own as he/she has access to the Event Metadata) + public Event Call(string name, Event request) + { + return CallAsync(name, request).Get(Timeout); + } + + // Asynchronous void,void Call + public Task CallAsync(string name) { + // the void,Type call not the Type,Type call! + // the difference is in the returned non-returning Task! + return CallAsync(name, Null.Instance); + } + + // Asynchronous Type,void Call + public Task CallAsync(string name) { + return CallAsync(name, Null.Instance); + } + + // Asynchronous void,Type Call + public Task CallAsync(string name, RequestType requestData) { + var callTask = CallAsync(name, requestData); + Task waiter = Task.Run(() => + { + callTask.Wait(); + }); + return waiter; + } + + // Asynchronous Type,Type Call + public Task CallAsync(string name, RequestType requestData) + { + Event request = new Event(); + request.Data = requestData; + request.Scope = scope.Concat(new Scope("/" + name)); + request.Method = "REQUEST"; + + Task eventTask = CallAsync(name, request); + + Task castTask = Task.Run(() => + { + eventTask.Wait(); + Event result = eventTask.Result; + + if (result.MetaData.HasUserInfo("rsb:error?")) { + throw new RSBErrorReplyException((String)result.Data); + } + + if (result.Data.GetType() != typeof(ReplyType)) { + throw new RSBException(String.Format("Unexpected Datatype! (Expected: {0} | Got: {1})", typeof(ReplyType).ToString(), result.Data.GetType().ToString())); + } + return (ReplyType)result.Data; + }); + + return castTask; + } + + // Generic Asynchronous Event Call - Beware: no Exception throwing (the user should do it on his own as he/she has access to the Event Metadata) + public Task CallAsync(string name, Event request) { + Scope methodScope = scope.Concat(new Scope("/" + name)); + + // checks if the request event is well-formed instead of well-forming the event + if (!request.Method.Equals("REQUEST")) + { + throw new RSBException(String.Format("Request event expected! (Received: {0})", request.Method)); + } + + if (!request.Scope.Equals(methodScope)) + { + throw new RSBException(String.Format("Event has not the expected scope. (Expected: {0} | Got: {1})", methodScope, request.Scope)); + } + + RemoteMethod method; + + if (connections.ContainsKey(name)) + { + method = connections[name]; + } + else + { + method = new RemoteMethod(methodScope, Timeout); + connections.Add(name, method); + if (IsActive) + { + method.Activate(); + } + } + + return method.Call(request); + } + + public void Activate() { + foreach (var remoteMethod in connections.Values) { + remoteMethod.Activate(); + } + IsActive = true; + } + + public void Deactivate() { + foreach (var remoteMethod in connections.Values) { + remoteMethod.Deactivate(); + } + IsActive = false; + } + } +} diff --git a/rsb-cil/Rsb/Patterns/TaskExtension.cs b/rsb-cil/Rsb/Patterns/TaskExtension.cs new file mode 100644 index 0000000..0d9cf57 --- /dev/null +++ b/rsb-cil/Rsb/Patterns/TaskExtension.cs @@ -0,0 +1,55 @@ +using System; +using System.Threading.Tasks; + +namespace Rsb.Patterns +{ + // Used to make Java Future like Get methods for Task<> types. Nice C# feature btw. + public static class TaskExtension + { + public static ReplyType Get(this Task task, int timeout) + { + bool timeoutFlag = false; + try + { + timeoutFlag = !task.Wait(timeout); + } + catch (AggregateException ae) + { + ae.Handle((x) => + { + if (x is RSBErrorReplyException) + { + throw x; + } + return false; + }); + } + + if (timeoutFlag) + { + throw new RSBException("Timeout while waiting for a RPC reply."); + } + return task.Result; + } + + public static ReplyType Get(this Task task) + { + try + { + task.Wait(); + } + catch (AggregateException ae) + { + ae.Handle((x) => + { + if (x is RSBErrorReplyException) + { + throw x; + } + return false; + }); + } + return task.Result; + } + } +} diff --git a/rsb-cil/rsb-cil.csproj b/rsb-cil/rsb-cil.csproj index 347bc05..fa8e7d7 100644 --- a/rsb-cil/rsb-cil.csproj +++ b/rsb-cil/rsb-cil.csproj @@ -40,6 +40,7 @@ + @@ -52,6 +53,13 @@ + + + + + + + -- 2.14.2.windows.1