Shedding with Quartz at Masstransit

This article will explain how to use Masstransit and Quartz to configure a sheduler for sending messages to the bus. A bus is an abstraction of a weak connection between those who send messages and those who subscribe to them through the bus. The sender and receiver of messages know only the type of message (this is the interface), but they do not know anything about each other.



Quartz control in Masstransit takes place through a queue, we allocate a queue for this:



Startup.cs



cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue);
      
      





View all code



At the point of access to the sheduler (in my example, this is the controller method), we get the ISendEndpoint object for the given queue:



 ISendEndpoint _sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}"));
      
      





And we send a message with the settings for periodic message sheduling to a certain queue:



 _sheduler.ScheduledRecurringMessage = await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>( new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"), new PollExternalSystemSchedule(), new { Message = $"1 sec Quartz {User.Identity.Name}" });
      
      





PollExternalSystemSchedule frequency settings



  public class PollExternalSystemSchedule : DefaultRecurringSchedule { public PollExternalSystemSchedule() { StartTime = DateTime.UtcNow; CronExpression = "* * * * * ? *";//1 sec } }
      
      





The line for setting the frequency can be generated here .



You can already bind the consumers to the bus to which the sheduler will send messages (those who receive the message):



 cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e => { e.ConfigureConsumer<ShedulerCommandConsumer>(provider); });
      
      





The consumer is tied to the address (queue address above) and the interface:



  public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand> { private ILogger<ShedulerCommandConsumer> _logger; public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger) { _logger = loger; } public Task Consume(ConsumeContext<IRepeatingCommand> context) { _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} "); return Task.CompletedTask; } }
      
      





To stop the sheduler, call:



 await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage);
      
      





Source



I can also tell you about the work of the Saga state machine (in a real project, messages can also go through it).



All Articles