CQRS +むベント゜ヌシングの抂芁パヌト2

前回の蚘事では、CQRS +むベント゜ヌシングの基本から始めたした。 今回は、ESをさらに詳しく芋おいくこずを提案したす。



前の蚘事で投皿した䟋では、むベント゜ヌシングの魔法がIRepository抜象化ず2぀のIRepository.SaveおよびIRepository.GetById <>メ゜ッドの背埌に隠されおいたした。

䜕が起こっおいるかをより詳现に理解するために、Rinat AbdulinのLokad IDDDサンプルプロゞェクトの䟋を䜿甚しお、むベントストアからナニットを保存およびロヌドするプロセスに぀いお話すこずにしたした。 圌のアプリケヌションサヌビスでは、远加の抜象化なしでむベントストアに盎接蚎えるので、すべおが非垞に明確に芋えたす。 Application ServiceはCommandHandlerに類䌌しおいたすが、1぀のナニットのすべおのコマンドを凊理したす。 このアプロヌチは非垞に䟿利で、1぀のプロゞェクトで切り替えたした。



ApplicationService



IApplicationServiceむンタヌフェむスは非垞にシンプルです。

public interface IApplicationService { void Execute(ICommand cmd); }
      
      





Executeメ゜ッドでは、コマンドを枡し、サヌビスがそれらを目的のハンドラヌにリダむレクトするこずを期埅したす。



この䟋のRinatにはCustomer集蚈が1぀しかないため、サヌビスにはCustomerApplicationServiceも1぀しかありたせん。 したがっお、実際には、ロゞックを基本クラスに転送する必芁はありたせん。 私の意芋では䟋のための玠晎らしい゜リュヌション。



Executeメ゜ッドは、眲名ず䞀臎するWhenメ゜ッドオヌバヌロヌドの1぀にコマンドの凊理を枡したす。 Executeメ゜ッドの実装は、スピヌカヌを䜿甚するず非垞に簡単であり、すべおのメ゜ッドでリフレクションを実行する必芁はありたせん。

 public void Execute(ICommand cmd) { // pass command to a specific method named when // that can handle the command ((dynamic)this).When((dynamic)cmd); }
      
      





createコマンド-CreateCustomerから始めたしょう。

 [Serializable] public sealed class CreateCustomer : ICommand { public CustomerId Id { get; set; } public string Name { get; set; } public Currency Currency { get; set; } public override string ToString() { return string.Format("Create {0} named '{1}' with {2}", Id, Name, Currency); } }
      
      





実際のプロゞェクトでは、ほずんどの堎合UIずApplicationServiceの間にメッセヌゞキュヌがありたすが、たずえば、Rinatはサヌビスアプリケヌションオブゞェクトに盎接コマンドを送信するように制限したしたクラスApplicationServerを参照。

CreateCustomerコマンドがExecuteメ゜ッドに入るず、Whenメ゜ッドにリダむレクトされたす。

 public void When(CreateCustomer c) { Update(c.Id, a => a.Create(c.Id,c.Name, c.Currency, _pricingService, DateTime.UtcNow)); }
      
      





Updateメ゜ッドでは、集蚈の識別子ず、集蚈の状態を倉曎するメ゜ッドを呌び出すアクションを枡したす。 䞀般的に、私の意芋では、このコンテキストでCreateメ゜ッドを呌び出すこずは少し奇劙に芋えるので、集玄でCreateメ゜ッドを䜜成するのではなく、別のコンストラクタヌを䜜成する方が良いです。 集蚈を䜜成しおいるようですが、䜕らかの理由で、状態を倉曎するメ゜ッドずしおCreateメ゜ッドを枡したす。 デザむナヌにずっお、これは起こらなかったでしょう。



Updateメ゜ッドに戻り、次のタスクを実行したす。1珟圚の集玄のすべおのむベントをロヌドしたす。2ロヌドされたむベントを䜿甚しお集玄を䜜成し、その状態を埩元したす。3集玄で枡されたAction実行アクションを実行し、4新しいむベントがあれば保存したす。

 void Update(CustomerId id, Action<Customer> execute) { // Load event stream from the store EventStream stream = _eventStore.LoadEventStream(id); // create new Customer aggregate from the history Customer customer = new Customer(stream.Events); // execute delegated action execute(customer); // append resulting changes to the stream _eventStore.AppendToStream(id, stream.Version, customer.Changes); }
      
      







状態回埩



前回の蚘事で瀺した䟋では、集玄の状態は集玄クラスのプラむベヌトフィヌルドずしお保存されおいたした。 スナップショットを远加する堎合、これはあたり䟿利ではありたせん。 毎回䜕らかの圢で状態を吞い出すか、リフレクションを䜿甚する必芁がありたす。 Rinatにははるかに䟿利なアプロヌチがありたす-状態にはCustomerStateず呌ばれる別のクラスがあり、集玄から投圱メ゜ッドを抜出するこずができ、スナップショットは䟋にありたせんが保存ずロヌドがはるかに簡単です。

ご芧のずおり、同じ集玄のむベントのリストはコンストラクタヌに転送されたす。これは、状態を埩元するために掚枬するのが難しくないためです。

ナニットは、状態の回埩をCustomerStateクラスに委任したす。

 /// <summary> /// Aggregate state, which is separate from this class in order to ensure, /// that we modify it ONLY by passing events. /// </summary> readonly CustomerState _state; public Customer(IEnumerable<IEvent> events) { _state = new CustomerState(events); }
      
      





CustomerStateクラス党䜓のコヌドを提䟛し、いく぀かのWhen投圱メ゜ッドを削陀したす。

 /// <summary> /// This is the state of the customer aggregate. /// It can be mutated only by passing events to it. /// </summary> public class CustomerState { public string Name { get; private set; } public bool Created { get; private set; } public CustomerId Id { get; private set; } public bool ConsumptionLocked { get; private set; } public bool ManualBilling { get; private set; } public Currency Currency { get; private set; } public CurrencyAmount Balance { get; private set; } public int MaxTransactionId { get; private set; } public CustomerState(IEnumerable<IEvent> events) { foreach (var e in events) { Mutate(e); } } ... public void When(CustomerCreated e) { Created = true; Name = e.Name; Id = e.Id; Currency = e.Currency; Balance = new CurrencyAmount(0, e.Currency); } public void When(CustomerRenamed e) { Name = e.Name; } public void Mutate(IEvent e) { // .NET magic to call one of the 'When' handlers with // matching signature ((dynamic) this).When((dynamic)e); } }
      
      





コンストラクタヌで確認できるように、枡されたむベントを実行し、それらをMutateメ゜ッドに枡したす。Mutateメ゜ッドは、それらをさらに適切な投圱メ゜ッドに匕き蟌みたす。

すべおのプロパティにはプラむベヌトセッタヌメ゜ッドがあり、察応するむベントを枡すこずによっおのみ倖郚から状態を倉曎できるこずが保蚌されおいたす。



状態が埩元されたので、倉曎するこずができたす。 状態倉曎メ゜ッドに少し戻りたしょう。 CreateCustomerコマンドを理解し始めたので、ナニットのCreateメ゜ッドも芋おいきたす。

 public void Create(CustomerId id, string name, Currency currency, IPricingService service, DateTime utc) { if (_state.Created) throw new InvalidOperationException("Customer was already created"); Apply(new CustomerCreated { Created = utc, Name = name, Id = id, Currency = currency }); var bonus = service.GetWelcomeBonus(currency); if (bonus.Amount > 0) AddPayment("Welcome bonus", bonus, utc); }
      
      





これは、ナニットの珟圚の状態にアクセスできるため、ビゞネスルヌルを確認する堎所です。 Customerは1回しか䜜成できないずいうビゞネスルヌルがありたすただし、技術的な制限もありたすので、䜜成枈みのナニットでCreateを呌び出そうずするず、アクションが削陀されたす。

すべおのビゞネスルヌルが満たされおいる堎合、CustomerCreatedむベントをApplyメ゜ッドに枡したす。 Applyメ゜ッドは非垞に単玔で、むベントを状態オブゞェクトに枡し、珟圚の倉曎のリストに远加するだけです。

 public readonly IList<IEvent> Changes = new List<IEvent>(); readonly CustomerState _state; void Apply(IEvent e) { // pass each event to modify current in-memory state _state.Mutate(e); // append event to change list for further persistence Changes.Add(e); }
      
      





ほずんどの堎合、集玄を䜿甚する操䜜ごずに1぀のむベントが発生したす。 ただし、Createメ゜ッドの堎合は、2぀のむベントが発生する可胜性がありたす。

CustomerCreateむベントをApplyメ゜ッドに枡した埌、ビゞネスルヌルでボヌナス額をれロより倧きくする必芁がある堎合、珟圚の顧客にりェルカムボヌナスを远加できたす。 この堎合、AddPayment集蚈メ゜ッドが呌び出されたす。これは、チェックを集蚈せず、CustomerPaymentAddedむベントをトリガヌするだけです。

 public void AddPayment(string name, CurrencyAmount amount, DateTime utc) { Apply(new CustomerPaymentAdded() { Id = _state.Id, Payment = amount, NewBalance = _state.Balance + amount, PaymentName = name, Transaction = _state.MaxTransactionId + 1, TimeUtc = utc }); }
      
      





次に、新しいむベントを保存し、読み取りモデルで公開する必芁がありたす。 これが次の行だず思いたす。

 // append resulting changes to the stream _eventStore.AppendToStream(id, stream.Version, customer.Changes);
      
      





確認しおください...

 public void AppendToStream(IIdentity id, long originalVersion, ICollection<IEvent> events) { if (events.Count == 0) return; var name = IdentityToString(id); var data = SerializeEvent(events.ToArray()); try { _appendOnlyStore.Append(name, data, originalVersion); } catch(AppendOnlyStoreConcurrencyException e) { // load server events var server = LoadEventStream(id, 0, int.MaxValue); // throw a real problem throw OptimisticConcurrencyException.Create(server.Version, e.ExpectedStreamVersion, id, server.Events); } // technically there should be a parallel process that queries new changes // from the event store and sends them via messages (avoiding 2PC problem). // however, for demo purposes, we'll just send them to the console from here Console.ForegroundColor = ConsoleColor.DarkGreen; foreach (var @event in events) { Console.WriteLine(" {0} r{1} Event: {2}", id,originalVersion, @event); } Console.ForegroundColor = ConsoleColor.DarkGray; }
      
      





たあ、ほずんど満足しおいたす。 むベントはシリアル化され、远加専甚ストアに保存されたす削陀および倉曎は行いたせん。 しかし、Rinatはプレれンテヌションレベルで読み取りモデルに送信する代わりに、単にコン゜ヌルに衚瀺したす。

ただし、たずえば、これで十分です。

これがメッセヌゞキュヌでどのように機胜するかを確認したい堎合は、以前の蚘事のgithubで䟋を芋るこずができたす。 それを少し倉曎し、Event Sourcingむンフラストラクチャの䞀郚を゜リュヌションに導入したした。 この䟋では、スナップショットを远加する方法を瀺したす。



スナップショット



スナップショットは、ナニットの状態の䞭間スナップショットを取埗するために必芁です。 これにより、ナニットのむベントのフロヌ党䜓をダりンロヌドするのではなく、最埌のスナップショットを䜜成した埌に発生したむベントのみをダりンロヌドできたす。

それでは、実装を芋おみたしょう。

UserCommandHandlerクラスには、Rinatのメ゜ッドず非垞によく䌌たUpdateメ゜ッドがありたすが、私にずっおは、スナップショットを保存およびロヌドしたす。 100バヌゞョンごずにスナップショットを䜜成したす。

 private const int SnapshotInterval = 100;
      
      





たず、リポゞトリからスナップショットを䜜成し、このスナップショットを䜜成した埌に発生したむベントのストリヌムを読み蟌みたす。 次に、これらすべおをナニットの蚭蚈者に枡したす。

 private void Update(string userId, Action<UserAR> updateAction) { var snapshot = _snapshotRepository.Load(userId); var startVersion = snapshot == null ? 0 : snapshot.StreamVersion + 1; var stream = _eventStore.OpenStream(userId, startVersion, int.MaxValue); var user = new UserAR(snapshot, stream); updateAction(user); var originalVersion = stream.GetVersion(); _eventStore.AppendToStream(userId, originalVersion, user.Changes); var newVersion = originalVersion + 1; if (newVersion % SnapshotInterval == 0) { _snapshotRepository.Save(new Snapshot(userId, newVersion,user.State)); } }
      
      





コンストラクタヌでは、スナップショットから状態を取埗するか、スナップショットがない堎合は新しい状態を䜜成しようずしたす。 受信状態では、欠萜しおいるむベントがすべお倱われ、その結果、ナニットの珟圚のバヌゞョンが取埗されたす。

 public UserAR(Snapshot snapshot, TransitionStream stream) { _state = snapshot != null ? (UserState) snapshot.Payload : new UserState(); foreach (var transition in stream.Read()) { foreach (var @event in transition.Events) { _state.Mutate((IEvent) @event.Data); } } }
      
      





アグリゲヌトを操䜜した埌、アグリゲヌトバヌゞョンがスナップショットを取埗する間隔の倍数であるかどうかを確認し、そうであれば、新しいスナップショットをリポゞトリに保存したす。 UserCommandHandlerから集蚈の状態を取埗するには、そのStateプロパティを内郚ゲッタヌにする必芁がありたした。



以䞊で、スナップショットができたした。これにより、倚くのむベントがある堎合に、ナニットの状態をはるかに高速に埩元できたす。



フィヌドバック



CQRS + ESのトピックに興味がある堎合は、コメントで質問しおください。 ハブに別名がない堎合は、Skypeで私に手玙を曞くこずもできたす 最近、チェリャビンスクの友人が私のSkypeをノックしたした。圌の質問のおかげで、次の蚘事のアむデアがたくさんありたした。 自由に䜿えるようになったので、もっず頻繁に曞く぀もりです。



All Articles