前の記事で紹介した例では、イベントソーシングの魔法はIRepository抽象化と、その2つのメソッドIRepository.Save()およびIRepository.GetById<>()の背後に隠されていました。
何が起こっているかをより詳細に理解するために、Rinat AbdulinのLokad IDDDサンプルプロジェクトを例として、イベントストアから集約を保存およびロードするプロセスについて説明することにしました。 彼のアプリケーションサービスは、追加の抽象化なしでイベントストアに直接アクセスするので、すべてが非常に明確に見えます。 Application ServiceはCommandHandlerに似ていますが、1つの集約のすべてのコマンドを処理します。 このアプローチは非常に便利で、私もあるプロジェクトでこの方式に切り替えました。
ApplicationService
IApplicationServiceインターフェイスは非常にシンプルです。
/// <summary>
/// Contract of an application service: a single entry point that accepts a command.
/// </summary>
public interface IApplicationService
{
    /// <summary>Executes the supplied command.</summary>
    /// <param name="cmd">The command to execute.</param>
    void Execute(ICommand cmd);
}
Executeメソッドにコマンドを渡すと、サービスがそれを目的のハンドラーにリダイレクトすることを期待します。
Rinatのこの例にはCustomer集約が1つしかないため、サービスもCustomerApplicationServiceの1つしかありません。 したがって、実際には、ロジックを基本クラスに移す必要はありません。 私の意見では、サンプルとしては素晴らしいソリューションです。
Executeメソッドは、シグネチャが一致するWhenメソッドのオーバーロードの1つにコマンドの処理を渡します。 Executeメソッドの実装は、dynamicを使用すると非常に簡単であり、すべてのメソッドをリフレクションで走査する必要はありません。
/// <summary>
/// Passes the command to the <c>When</c> overload that can handle it.
/// </summary>
/// <param name="cmd">The command to dispatch.</param>
public void Execute(ICommand cmd)
{
    // Both the receiver and the argument are typed as dynamic, so the
    // When overload is chosen at run time from the command's actual type.
    dynamic self = this;
    dynamic command = cmd;
    self.When(command);
}
作成コマンドであるCreateCustomerから始めましょう。
/// <summary>
/// Command carrying the identifier, name and currency for a customer to be created.
/// </summary>
[Serializable]
public sealed class CreateCustomer : ICommand
{
    public CustomerId Id { get; set; }

    public string Name { get; set; }

    public Currency Currency { get; set; }

    /// <summary>Returns a human-readable description of the command.</summary>
    public override string ToString()
    {
        object[] formatArgs = { Id, Name, Currency };
        return string.Format("Create {0} named '{1}' with {2}", formatArgs);
    }
}
実際のプロジェクトでは、ほとんどの場合UIとApplicationServiceの間にメッセージキューがありますが、Rinatはサンプルとして、アプリケーションサービスのオブジェクトに直接コマンドを送信するだけにとどめています（ApplicationServerクラスを参照）。
CreateCustomerコマンドがExecuteメソッドに入ると、Whenメソッドにリダイレクトされます。
/// <summary>
/// Handles <c>CreateCustomer</c> by running the aggregate's <c>Create</c> method through <c>Update</c>.
/// </summary>
/// <param name="c">The command describing the customer to create.</param>
public void When(CreateCustomer c)
{
    Update(c.Id, customer =>
    {
        // The timestamp is read inside the callback, i.e. at the moment Update invokes it.
        customer.Create(c.Id, c.Name, c.Currency, _pricingService, DateTime.UtcNow);
    });
}
Updateメソッドには、集約の識別子と、集約の状態を変更するメソッドを呼び出すアクションを渡します。 一般的に、私の意見では、このコンテキストでCreateメソッドを呼び出すのは少し奇妙に見えるので、集約にCreateメソッドを作成するのではなく、別のコンストラクターを作成する方が良いと思います。 集約を作成しているはずなのに、なぜか状態を変更するメソッドとしてCreateメソッドを渡しています。 コンストラクターであれば、このようなことは起こらなかったでしょう。
Updateメソッドに戻りましょう。このメソッドは次のタスクを実行します。1）現在の集約のすべてのイベントをロードします。2）ロードされたイベントを使用して集約を作成し、その状態を復元します。3）渡されたActionを集約に対して実行します。4）新しいイベントがあれば保存します。
/// <summary>
/// Loads the customer's event stream, rebuilds the aggregate from it, runs the
/// supplied action and appends the resulting changes to the stream.
/// </summary>
/// <param name="id">Identifier of the customer whose stream is loaded and appended to.</param>
/// <param name="execute">Action invoked on the rebuilt aggregate.</param>
void Update(CustomerId id, Action<Customer> execute)
{
    // 1. Read the stored history for this identifier.
    EventStream stream = _eventStore.LoadEventStream(id);

    // 2. Rebuild the aggregate from that history.
    var customer = new Customer(stream.Events);

    // 3. Let the caller act on the aggregate.
    execute.Invoke(customer);

    // 4. Append whatever the aggregate recorded, passing the version that was loaded.
    _eventStore.AppendToStream(id, stream.Version, customer.Changes);
}
状態の復元
前回の記事で示した例では、集約の状態は集約クラスのプライベートフィールドとして保持されていました。 スナップショットを追加したい場合、これはあまり便利ではありません。 毎回何らかの形で状態を吸い出すか、リフレクションを使用する必要があります。 Rinatのアプローチははるかに便利です。状態のためにCustomerStateという別のクラスがあり、これにより投影メソッドを集約から切り出すことができます。この例にスナップショットはありませんが、状態の保存とロードもはるかに簡単になります。
ご覧のとおり、同じ集約のイベントのリストがコンストラクターに渡されます。お察しのとおり、これは状態を復元するためです。
集約は、状態の復元をCustomerStateクラスに委任します。
/// <summary>
/// State of the aggregate. It lives in a separate class to ensure that
/// it is modified ONLY by passing events to it.
/// </summary>
readonly CustomerState _state;

/// <summary>
/// Creates the customer aggregate, building its state from the given events.
/// </summary>
/// <param name="events">Events handed to the new <c>CustomerState</c>.</param>
public Customer(IEnumerable<IEvent> events)
{
    _state = new CustomerState(events);
}
CustomerStateクラス全体のコードを示します（いくつかのWhen投影メソッドは省略しています）。
/// <summary>
/// State of the customer aggregate. All setters are private; the values are
/// changed only inside the <c>When</c> handlers, which are reached by
/// passing an event to <see cref="Mutate"/>.
/// </summary>
public class CustomerState
{
    public string Name { get; private set; }

    public bool Created { get; private set; }

    public CustomerId Id { get; private set; }

    public bool ConsumptionLocked { get; private set; }

    public bool ManualBilling { get; private set; }

    public Currency Currency { get; private set; }

    public CurrencyAmount Balance { get; private set; }

    public int MaxTransactionId { get; private set; }

    /// <summary>Builds the state by passing every supplied event, in order, to <see cref="Mutate"/>.</summary>
    /// <param name="events">Events to apply.</param>
    public CustomerState(IEnumerable<IEvent> events)
    {
        foreach (var recorded in events)
        {
            Mutate(recorded);
        }
    }

    // ... (further When handlers are omitted from this excerpt)

    /// <summary>Marks the customer as created and initialises name, id, currency and a zero balance.</summary>
    public void When(CustomerCreated e)
    {
        Created = true;
        Name = e.Name;
        Id = e.Id;
        Currency = e.Currency;
        Balance = new CurrencyAmount(0, e.Currency);
    }

    /// <summary>Takes over the name carried by the event.</summary>
    public void When(CustomerRenamed e)
    {
        Name = e.Name;
    }

    /// <summary>Routes the event to the <c>When</c> handler with a matching signature.</summary>
    /// <param name="e">The event to apply.</param>
    public void Mutate(IEvent e)
    {
        // Dynamic typing on both sides lets the runtime choose the When
        // overload from the event's actual type.
        dynamic self = this;
        dynamic occurred = e;
        self.When(occurred);
    }
}
コンストラクターを見るとわかるように、渡されたイベントを順に走査し、それぞれをMutateメソッドに渡します。Mutateメソッドは、それらをすぐに適切な投影メソッドへ振り分けます。
すべてのプロパティにはプライベートセッターがあり、対応するイベントを渡すことによってのみ外部から状態を変更できることが保証されています。
状態が復元されたので、次は状態を変更することができます。 状態変更メソッドに少し戻りましょう。 CreateCustomerコマンドから見始めたので、集約のCreateメソッドを見ていきます。
/// <summary>
/// Creates the customer: records a <c>CustomerCreated</c> event and, when the
/// pricing service returns a positive welcome bonus, adds it as a payment.
/// </summary>
/// <param name="id">Identifier stored in the created event.</param>
/// <param name="name">Customer name stored in the created event.</param>
/// <param name="currency">Currency stored in the event and used to look up the bonus.</param>
/// <param name="service">Pricing service queried for the welcome bonus.</param>
/// <param name="utc">Timestamp stored in the created event and passed on to the bonus payment.</param>
/// <exception cref="InvalidOperationException">The state reports the customer as already created.</exception>
public void Create(CustomerId id, string name, Currency currency, IPricingService service, DateTime utc)
{
    // Guard: creation is rejected once the state says the customer exists.
    if (_state.Created)
    {
        throw new InvalidOperationException("Customer was already created");
    }

    var created = new CustomerCreated
    {
        Created = utc,
        Name = name,
        Id = id,
        Currency = currency
    };
    Apply(created);

    // The bonus is looked up only after the creation event has been applied.
    var bonus = service.GetWelcomeBonus(currency);
    if (bonus.Amount > 0)
    {
        AddPayment("Welcome bonus", bonus, utc);
    }
}
ここは、集約の現在の状態にアクセスできるため、ビジネスルールを確認する場所です。 Customerは1回しか作成できないというビジネスルールがあります（これは技術的な制約でもあります）。そのため、作成済みの集約でCreateを呼び出そうとすると、例外がスローされます。
すべてのビジネスルールが満たされている場合、CustomerCreatedイベントをApplyメソッドに渡します。 Applyメソッドは非常に単純で、イベントを状態オブジェクトに渡し、現在の変更のリストに追加するだけです。
/// <summary>Events recorded by <c>Apply</c>, kept for later persistence.</summary>
public readonly IList<IEvent> Changes = new List<IEvent>();

/// <summary>State object that receives every applied event.</summary>
readonly CustomerState _state;

/// <summary>
/// Passes the event to the state object and then adds it to <see cref="Changes"/>.
/// </summary>
/// <param name="e">The event to apply and record.</param>
void Apply(IEvent e)
{
    // The state is mutated first; the event is recorded only after that call returns.
    _state.Mutate(e);
    Changes.Add(e);
}
ほとんどの場合、集約に対する操作ごとに1つのイベントが発生します。 ただし、Createメソッドの場合は、2つのイベントが発生する可能性があります。
CustomerCreatedイベントをApplyメソッドに渡した後、ビジネスルール上ボーナス額がゼロより大きい場合は、現在の顧客にウェルカムボーナスを追加します。 この場合、集約のAddPaymentメソッドが呼び出されます。このメソッドはチェックを一切含まず、CustomerPaymentAddedイベントを発生させるだけです。
/// <summary>
/// Records a <c>CustomerPaymentAdded</c> event for the given payment.
/// </summary>
/// <param name="name">Payment name stored in the event.</param>
/// <param name="amount">Payment amount; also added to the current balance to form the new balance.</param>
/// <param name="utc">Timestamp stored in the event.</param>
public void AddPayment(string name, CurrencyAmount amount, DateTime utc)
{
    // Both values are derived from the state as it is before the event is applied.
    var balanceAfterPayment = _state.Balance + amount;
    var transactionId = _state.MaxTransactionId + 1;

    var paymentAdded = new CustomerPaymentAdded
    {
        Id = _state.Id,
        Payment = amount,
        NewBalance = balanceAfterPayment,
        PaymentName = name,
        Transaction = transactionId,
        TimeUtc = utc
    };
    Apply(paymentAdded);
}
次に、新しいイベントを保存し、読み取りモデルに公開する必要があります。 これを行うのが次の行です。
// append resulting changes to the stream
_eventStore.AppendToStream(id, stream.Version, customer.Changes);
確認してみましょう...
/// <summary>
/// Serializes the given events and appends them to the stream identified by
/// <paramref name="id"/>, then prints each event to the console.
/// </summary>
/// <param name="id">Identity of the stream to append to.</param>
/// <param name="originalVersion">Stream version passed to the append-only store together with the data.</param>
/// <param name="events">Events to append; an empty collection is a no-op.</param>
/// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
/// <exception cref="OptimisticConcurrencyException">
/// The append-only store raised <c>AppendOnlyStoreConcurrencyException</c>; the
/// exception is built from the freshly reloaded stream.
/// </exception>
public void AppendToStream(IIdentity id, long originalVersion, ICollection<IEvent> events)
{
    // Fail with a descriptive exception instead of a NullReferenceException on events.Count.
    if (events == null)
    {
        throw new ArgumentNullException("events");
    }

    if (events.Count == 0)
    {
        return;
    }

    var name = IdentityToString(id);
    var data = SerializeEvent(events.ToArray());
    try
    {
        _appendOnlyStore.Append(name, data, originalVersion);
    }
    catch (AppendOnlyStoreConcurrencyException e)
    {
        // load server events
        var server = LoadEventStream(id, 0, int.MaxValue);
        // throw a real problem
        throw OptimisticConcurrencyException.Create(server.Version, e.ExpectedStreamVersion, id, server.Events);
    }

    // technically there should be a parallel process that queries new changes
    // from the event store and sends them via messages (avoiding 2PC problem).
    // however, for demo purposes, we'll just send them to the console from here
    Console.ForegroundColor = ConsoleColor.DarkGreen;
    try
    {
        foreach (var @event in events)
        {
            Console.WriteLine(" {0} r{1} Event: {2}", id, originalVersion, @event);
        }
    }
    finally
    {
        // Reset the colour even when formatting or writing an event throws,
        // so later console output is not left green.
        Console.ForegroundColor = ConsoleColor.DarkGray;
    }
}
まあ、ほぼ予想どおりです。 イベントはシリアル化され、追記専用ストアに保存されます（削除や変更は行いません）。 ただし、Rinatは（プレゼンテーション層の）読み取りモデルに送信する代わりに、単にコンソールに表示しています。
ただし、サンプルとしてはこれで十分です。
これがメッセージキューでどのように機能するかを確認したい場合は、以前の記事のGitHubにある例を見ることができます。 それを少し変更し、Lokad IDDDのEvent Sourcingインフラストラクチャの一部をソリューションに取り込みました。 この例で、スナップショットを追加する方法を示します。
スナップショット
スナップショットは、集約の状態を途中の時点で保存しておくために必要です。 これにより、集約のイベントストリーム全体をロードするのではなく、最後のスナップショットを作成した後に発生したイベントのみをロードできます。
それでは、実装を見てみましょう。
UserCommandHandlerクラスには、Rinatのものと非常によく似たUpdateメソッドがありますが、私の実装ではスナップショットの保存とロードも行います。 100バージョンごとにスナップショットを作成します。
// Snapshot cadence: Update saves a snapshot when the new stream version is a multiple of this value.
private const int SnapshotInterval = 100;
まず、リポジトリからスナップショットをロードし、次にそのスナップショットを作成した後に発生したイベントのストリームを読み込みます。 そして、これらすべてを集約のコンストラクターに渡します。
/// <summary>
/// Rebuilds the user aggregate from its latest snapshot plus the events that
/// follow it, runs the supplied action, appends the resulting changes and
/// saves a new snapshot when the new version is a multiple of
/// <c>SnapshotInterval</c>.
/// </summary>
/// <param name="userId">Identifier used for the snapshot lookup and the event stream.</param>
/// <param name="updateAction">Action invoked on the rebuilt aggregate.</param>
private void Update(string userId, Action<UserAR> updateAction)
{
    var snapshot = _snapshotRepository.Load(userId);

    // With a snapshot, reading starts right after the version it covers; otherwise from 0.
    var startVersion = snapshot != null ? snapshot.StreamVersion + 1 : 0;
    var stream = _eventStore.OpenStream(userId, startVersion, int.MaxValue);

    var user = new UserAR(snapshot, stream);
    updateAction.Invoke(user);

    // The version is read only after the aggregate has been built and the action has run.
    var originalVersion = stream.GetVersion();
    _eventStore.AppendToStream(userId, originalVersion, user.Changes);

    // NOTE(review): this assumes the append advanced the stream by exactly one
    // version, including when user.Changes is empty — confirm against the event store.
    var nextVersion = originalVersion + 1;
    if (nextVersion % SnapshotInterval != 0)
    {
        return;
    }

    _snapshotRepository.Save(new Snapshot(userId, nextVersion, user.State));
}
コンストラクターでは、スナップショットから状態を取得するか、スナップショットがない場合は新しい状態を作成します。 取得した状態に対して、不足しているイベントをすべて再生（適用）し、その結果、集約の現在のバージョンが得られます。
/// <summary>
/// Builds the aggregate from an optional snapshot and the transitions read from the stream.
/// </summary>
/// <param name="snapshot">Snapshot whose payload becomes the starting state; may be null.</param>
/// <param name="stream">Stream whose transitions are applied on top of the starting state.</param>
public UserAR(Snapshot snapshot, TransitionStream stream)
{
    // Start from the snapshot payload when there is one, otherwise from a fresh state.
    if (snapshot != null)
    {
        _state = (UserState) snapshot.Payload;
    }
    else
    {
        _state = new UserState();
    }

    // Apply every event of every transition, in the order the stream yields them.
    foreach (var transition in stream.Read())
    {
        foreach (var recorded in transition.Events)
        {
            var domainEvent = (IEvent) recorded.Data;
            _state.Mutate(domainEvent);
        }
    }
}
集約に対する操作を実行した後、集約のバージョンがスナップショットを取得する間隔の倍数であるかどうかを確認し、そうであれば、新しいスナップショットをリポジトリに保存します。 UserCommandHandlerから集約の状態を取得するために、Stateプロパティにinternalなゲッターを用意する必要がありました。
以上で、スナップショットが使えるようになりました。これにより、イベントが非常に多い場合でも、集約の状態をはるかに高速に復元できます。
フィードバック
CQRS + ESのトピックに興味がある場合は、コメントで質問してください。 ハブ（Habr）にアカウントがない場合は、Skypeで私に連絡することもできます。 最近、チェリャビンスクの友人がSkypeで連絡をくれました。彼の質問のおかげで、次の記事のアイデアがたくさん生まれました。 時間に余裕ができたので、もっと頻繁に書くつもりです。