[The Methanumプロジェクト]スタートポロジを使用して分散システムを構築するためのツールを作成する









星は、コンピュータネットワークの最も一般的なトポロジです。 この構造には、スケーリングの容易さ、信頼性(1台のマシンの障害が他のマシンに影響を与えない)、および管理の容易さといういくつかの利点があります。 もちろん、物理レベルからのこのソリューションは、ソフトウェアレベルで長い間実装されてきました。 それにもかかわらず、スタートポロジで分散システムを構築するための.Netツールのバージョンを読者に提示します。



このようなトポロジに基づいて構築されたシステムは、たとえば次の図のように、構造的に編成できます。











この記事では、提示されたアーキテクチャーに基づいた開発で一度使用した多くの便利な機能を持たない最小限のツールの作成について説明します。 しかし、これは本当に便利なシステムを構築するのに十分です。



メタン













プロジェクトは、メタン分子とトポロジーの構造的な類似性だけのために、コード名Methanumを受け取りました:)。 コミュニケーターとして機能する中央ノードは「コア」と呼ばれます。 残りのネットワークアプリケーションはカーネルに接続され、イベントをサブスクライブします。 各ネットワークアプリケーションはイベントを発行することもできます。 したがって、イベントを通じて、ネットワーク上でデータが交換されます。 イベントは、任意のデータを含むことができるシリアル化可能なイベントクラスです。 イベントには、イベントを分類するDestination stringフィールドと、キー値辞書を含むDataフィールドの2つのフィールドが最低限含まれています。 キーは文字列、引数の名前、値はオブジェクト型であり、プリミティブ(int、double、bool ...)を含むことができます。 構造体については、システムがそれらを直列化するのを多少助けなければなりません。



最初に、C#クラスライブラリの「methanum」プロジェクトを作成し、テキストの過程でファイルを追加します。



イベント













すでに述べたように、データはイベントを通じて送信されます。 イベントは、データフィールドとDestinationイベントを識別するフィールドを含むクラスです。 さらに2つのフィールドを残しました。Id-一意の識別子と、イベントが作成された時刻を含むDataTimeです。 これらの追加フィールドは、たとえばログを解析する場合など、便宜上のみ必要です。 イベントクラスには、プログラマの生活を簡素化するために設計された多くのメソッドも含まれています。それらの目的はコードから明確になり、追加の説明は不要だと思います。



Event.cs
using System; using System.Collections.Generic; using System.Linq; using System.Runtime.Serialization; using System.Text; using System.Web.Script.Serialization; namespace methanum { [DataContract] [KnownType(typeof(List<DateTime>))] public class Event { /// <summary> /// A unique id of the event /// </summary> [DataMember] public Guid Id { set; get; } /// <summary> /// DateTime of event creation /// </summary> [DataMember] public DateTime DataTime { get; set; } /// <summary> /// Target /// </summary> [DataMember] public string Destination { get; set; } /// <summary> /// Data container /// </summary> [DataMember] public Dictionary<string, object> Data { get; set; } public Event() { Init(); } public Event(string destination) { Init(); Destination = destination; } private void Init() { Data = new Dictionary<string, object>(); Id = Guid.NewGuid(); DataTime = DateTime.Now; } public override string ToString() { var properties = GetType().GetProperties(); var sb = new StringBuilder(); sb.AppendFormat("[{0}]", GetType().Name); foreach (var property in properties) { if (property.Name == "Data") { sb.Append("\nData = "); string s = string.Format(" {0}", '{'); s = Data.Keys.Aggregate(s, (current, key) => current + String.Format("\n {0}\t:{1}", key, Data[key])); sb.AppendFormat("{0}\n{1}", s, '}'); } else sb.AppendFormat("\n{0} = {1};", property.Name, property.GetValue(this, null)); } return sb.ToString(); } public void SetData(string key, object obj) { Data[key] = obj; } public object GetObj(string key) { return !Data.ContainsKey(key) ? null : Data[key]; } public double GetDbl(string key) { return !Data.ContainsKey(key) ? Double.NaN : Convert.ToDouble(Data[key]); } public int GetInt(string key) { return !Data.ContainsKey(key) ? Int32.MinValue : Convert.ToInt32(Data[key]); } public bool GetBool(string key) { return Data.ContainsKey(key) && Convert.ToBoolean(Data[key]); } public string GetStr(string key) { return !Data.ContainsKey(key) ? null : Convert.ToString(Data[key]); } public void SetCustomData(string key, object value) { var serializer = new JavaScriptSerializer(); var str = serializer.Serialize(value); SetData(key, str); } public object GetCustom(string key, Type valueType) { if (!Data.ContainsKey(key)) return null; if (Data[key].GetType() != typeof(string)) return null; var serializer = new JavaScriptSerializer(); var str = (string) Data[key]; var obj = serializer.Deserialize(str, valueType); return obj; } } }
      
      









ゲート













カーネルの中核はインターフェースを実装することです。それを「ゲートインターフェース」と呼びましょう。 ゲートの主な目標は、クライアントを登録し、双方向(アプリケーションからカーネルへ、またはその逆)で非同期的にイベントを送信する機能を提供することです。



IGate.cs
 using System.ServiceModel; namespace methanum { [ServiceContract(CallbackContract = typeof(IListener))] public interface IGate { [OperationContract] void Subscribe(); [OperationContract] void KillConnection(); [OperationContract] void Fire(Event evt); } }
      
      









データコントラクトは二重化されており、アプリケーションからカーネルへの順方向で、void Fire(Event evt)メソッドを呼び出してIGateを介してイベントを撮影します。 カーネルからアプリケーションへのコールバックは、後で説明するIListenerインターフェイスを介して発生します。

ゲートは次の原則に従って機能します。 カーネルが起動すると、IGateインターフェイスから継承されたGateクラスのオブジェクトが作成されます。 Gateには静的な_subscribersフィールドがあり、カーネルへのすべてのアクティブな接続を保存します。 Subscribe()メソッドを呼び出すときに、現在の接続がまだ追加されていない場合は追加します。 KillConnection()メソッドは、現在の接続を削除するために使用されます。 最も興味深いのはFire(Event evt)メソッドですが、複雑なものもありません。 メソッドの半分をIPアドレスとポートに取得し、コンソールに情報を表示するだけです。 コードのこの部分は、接続アドレスにアクセスする方法、たとえば、許可されたアドレスにイベントをフィルターまたはログする方法を示すためだけに残しました。 このメソッドの主な作業は、既存の接続をすべてバイパスし、IListenerリスナーでReceiveメソッドを非同期的に呼び出すことです。 閉じられた接続が見つかった場合、アクティブな接続のリストからすぐに削除します。



Gate.cs
 using System; using System.Collections.Generic; using System.ServiceModel; using System.ServiceModel.Channels; namespace methanum { public class Gate : IGate { private static List<OperationContext> _subscribers; public Gate() { if (_subscribers == null) _subscribers = new List<OperationContext>(); } public void Subscribe() { var oc = OperationContext.Current; if (!_subscribers.Exists(c => c.SessionId == oc.SessionId)) { _subscribers.Add(oc); Console.WriteLine("(subscribe \"{0}\")", oc.SessionId); } } public void KillConnection() { var oc = OperationContext.Current; _subscribers.RemoveAll(c => c.SessionId == oc.SessionId); Console.WriteLine("(kill \"{0}\")", oc.SessionId); } public void Fire(Event evt) { var currentOperationContext = OperationContext.Current; var remoteEndpointMessageProperty = currentOperationContext.IncomingMessageProperties[RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty; var ip = ""; var port = 0; if (remoteEndpointMessageProperty != null) { ip = remoteEndpointMessageProperty.Address; port = remoteEndpointMessageProperty.Port; } Console.WriteLine("(Fire (event . \"{0}\") (from . \"{1}:{2}\") (subscribers . {3}))", evt.Id, ip, port, _subscribers.Count); for (var i = _subscribers.Count - 1; i >= 0; i--) { var oc = _subscribers[i]; if (oc.Channel.State == CommunicationState.Opened) { var channel = oc.GetCallbackChannel<IListener>(); try { ((DelegateReceive) (channel.Receive)).BeginInvoke(evt, null, null); } catch (Exception e) { Console.WriteLine(e.Message); } } else { _subscribers.RemoveAt(i); Console.WriteLine("(dead . \"{0}\")", oc.SessionId); } } } } }
      
      









リスナー













カーネルからクライアントにメッセージを送信するには、IListenerインターフェイスで定義されている1つのReceiveメソッドで十分です。



IListener.cs
 using System.ServiceModel; namespace methanum { public delegate void DelegateReceive(Event evt); interface IListener { [OperationContract(IsOneWay = true)] void Receive(Event evt); } }
      
      









Connectorクラスは、クライアントアプリケーションとカーネル間の相互作用のすべてのロジックを実装するIListenerインターフェイスから継承されます。 クラスのインスタンスが作成されると、メッセージの送受信に使用されるカーネルへの接続が作成されます。 メッセージは、アプリケーションとカーネルのブロックを防ぐために、個別のスレッドで送受信されます。 イベントを区別するために、宛先フィールドがあります。 if-then-elseまたはswitch-case構造を使用したイベントのフィルタリングは不便であるため、対象の各イベントをハンドラーと一致させるメカニズムが実装されました。 このようなマッピングは、辞書<string、CbHandler> _handlers;。に保存されます。 イベントが受け入れられると、辞書で検索が実行され、キーが見つかった場合、対応するハンドラーが呼び出されます。



Connector.cs
 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.ServiceModel; using System.Threading; namespace methanum { public delegate void CbHandler(Event evt); public class Connector : IListener { private Dictionary<string, CbHandler> _handlers; private NetTcpBinding _binding; private EndpointAddress _endpointToAddress; private InstanceContext _instance; private DuplexChannelFactory<IGate> _channelFactory; private IGate _channel; private Thread _fireThread; private List<Event> _eventQueue; public event CbHandler ReceiveEvent; private bool _isSubscribed; private object _channelSync = new object(); protected virtual void OnReceive(Event evt) { CbHandler handler = ReceiveEvent; if (handler != null) handler.BeginInvoke(evt, null, null); } //localhost:2255 public Connector(string ipAddress) { init(ipAddress); } private void init(string ipAddress) { _handlers = new Dictionary<string, CbHandler>(); _binding = new NetTcpBinding(); _endpointToAddress = new EndpointAddress(string.Format("net.tcp://{0}", ipAddress)); _instance = new InstanceContext(this); Conect(); _eventQueue = new List<Event>(); _fireThread = new Thread(FireProc); _fireThread.IsBackground = true; _fireThread.Start(); } private void Conect() { _isSubscribed = false; while (!_isSubscribed) { try { _channelFactory = new DuplexChannelFactory<IGate>(_instance, _binding, _endpointToAddress); _channel = _channelFactory.CreateChannel(); _channel.Subscribe(); _isSubscribed = true; } catch (Exception e) { if (!(e is EndpointNotFoundException)) throw e; Thread.Sleep(1000); } } } private void ReConect() { lock (_channelSync) { try { _channel.KillConnection(); } catch (Exception e) { Console.WriteLine("(ReConect-exception \"{0}\"", e.Message); } Conect(); } } public void Fire(Event evt) { lock (_eventQueue) { _eventQueue.Add(evt); } } private void FireProc() { while (true) { var isHasEventsToFire = false; lock (_eventQueue) { isHasEventsToFire = _eventQueue.Any(); } if (_isSubscribed && isHasEventsToFire) { Event evt; lock (_eventQueue) { evt = _eventQueue.First(); } try { lock (_eventQueue) { _eventQueue.Remove(evt); } _channel.Fire(evt); } catch (Exception) { if (_isSubscribed) _isSubscribed = false; ReConect(); } } else Thread.Sleep(10); } } public void SetHandler(string destination, CbHandler handler) { _handlers[destination] = handler; } public void DeleteHandler(string destination) { if(_handlers.ContainsKey(destination)) _handlers.Remove(destination); } public void Receive(Event evt) { if (_handlers.ContainsKey(evt.Destination)) { _handlers[evt.Destination].BeginInvoke(evt, null, null); } OnReceive(evt); } static public void HoldProcess() { var processName = Process.GetCurrentProcess().ProcessName; var defColor = Console.ForegroundColor; Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine("The {0} is ready", processName); Console.WriteLine("Press <Enter> to terminate {0}", processName); Console.ForegroundColor = defColor; Console.ReadLine(); } } }
      
      









便宜上、サービスを開始する別の小さなクラスを作成します。



Srvrunner.cs
 using System; using System.ServiceModel; namespace methanum { public class SrvRunner { private ServiceHost _sHost; public void Start(int port) { var uris = new[] { new Uri(string.Format("net.tcp://0.0.0.0:{0}", port)) }; _sHost = new ServiceHost(typeof (Gate), uris); _sHost.Open(); foreach (var uri2 in _sHost.BaseAddresses) { Console.WriteLine("Start on: {0}", uri2.ToString()); } } public void Stop() { _sHost.Close(); } } }
      
      









コア













アプリケーションの通信に必要なすべてのクラスを実装しました。 アプリケーションが接続するカーネルを作成します。 これを行うには、ソリューションでコンソールアプリケーションの「コア」プロジェクトを作成し、それにメタンアセンブリを接続します。 一般に、私たちはすでにすべてを書きました、それは実行するためだけに残ります。



Coremain.cs
 using System; using System.Linq; using methanum; namespace Core { internal class CoreMain { private static void Main(string[] args) { int port = 0; if ((!args.Any()) || (!int.TryParse(args[0], out port))) { Console.WriteLine("Usage:"); Console.WriteLine("Core.exe port"); Environment.Exit(1); } try { var coreSrv = new SrvRunner(); coreSrv.Start(port); Console.WriteLine("The Core is ready."); Console.WriteLine("Press <ENTER> to terminate Core."); Console.ReadLine(); coreSrv.Stop(); } catch (Exception e) { Console.WriteLine(e.Message); } } } }
      
      









使用例











デモンストレーションのために、プリミティブメッセンジャーを作成します。別のコンソールアプリケーションを作成し、メタンアセンブリへのリンクを追加し、Program.csファイルの内容を挿入します。



Program.cs
 using System; using System.Linq; using methanum; namespace ClentExamle { class Program { static void Main(string[] args) { if ((!args.Any())) { Console.WriteLine("Usage:"); Console.WriteLine("ClentExample.exe coreAddress:port"); Environment.Exit(1); } var userName = ""; while (String.IsNullOrWhiteSpace(userName)) { Console.WriteLine("Please write user name:"); userName = Console.ReadLine(); } try { var maingate = new Connector(args[0]); maingate.SetHandler("message", MsgHandler); Console.WriteLine("Hello {0}, now you can send messages", userName); while (true) { var msg = Console.ReadLine(); var evt = new Event("message"); evt.SetData("name", userName); evt.SetData("text", msg); maingate.Fire(evt); } } catch (Exception e) { Console.WriteLine(e.Message); } } static private void MsgHandler(Event evt) { Console.WriteLine("[{0}] >> {1}", evt.GetStr("name"), evt.GetStr("text")); } } }
      
      









ここで、コマンドラインでポートを指定してCore.exeアプリケーションを実行します(たとえば、「Core 2255」)。 次に、コマンド「ClentExample localhost:2255」でClentExample.exeのいくつかのインスタンスを開始します。 アプリケーションは、ユーザー名を入力して、カーネルに接続することを提案します。 その結果、ブロードキャストプリミティブチャットを取得します。新しいメッセージはそれぞれ、maingate.Fire(evt)を呼び出して送信され、MsgHandler(Event evt)ハンドラーで受信されます。











完全なソースはmethanum gihabaで入手できます。



All Articles