Java Spring FrameworkのRabbitMQ遅延メッセージ交換の例

画像 この投稿では、RabbitMQで保留中のメッセージを使用する方法を示します。 遅延キューを使用すると便利なタスクの例として、ポストバックメカニズム( s2s ping backs2s pixel )を取り上げます



ポストバックメカニズムの概要:



1.イベントがあります

2.アプリケーションは、このイベントについてサードパーティのサービスに通知する必要があります。

3.サードパーティのサービスが利用できなかった場合は、数分後に再度通知を繰り返す必要があります



再通知するために、遅延キューを使用します。



デフォルトでは、RabbitMQはメッセージの遅延方法を認識せず、発行後すぐに配信されます。 遅延配信機能は、 rabbitmq-delayed-message-exchangeプラグインとして利用できます。



プラグインは実験的なものです。 一般的に非常に安定しているという事実にもかかわらず、それはあなた自身の危険とリスクで本番で使用されるべきです。



RMQとプラグインを使用してDockerコンテナーを構築する



基礎として、テストに役立つ管理プラグインを使用して公式イメージを取得します。



Dockerfile:



FROM rabbitmq:3.6-management RUN apt-get update && apt-get install -y curl RUN curl http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange
      
      





組立
 docker build --tag=x25/rmq-delayed-message-exchange .
      
      





打ち上げ
 docker run -d --name rmq -p 5672:5672 -p 15672:15672 x25/rmq-delayed-message-exchange
      
      





春のAMQP



Spring Frameworkは、 pring-rabbit



プロジェクトのプラグインを完全にサポートしています。 バージョン1.6.4以降では、xml / bean構成と注釈の両方を使用できます。



Spring Boot Amqp Starterを使用します。



Mavenの依存関係
 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
      
      





gradleの依存関係
 compile "org.springframework.boot:spring-boot-starter-amqp"
      
      





注釈による構成



ブートストラップと注釈により、Springは作業の大部分を担います。 書くだけ:



 @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME), exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME)) public void onMessage(Message<?> message) { //... }
      
      





また、アプリケーションが起動すると、RabbitAdminはdelayed exchange



queue



、イベントハンドラーの作成を自動的に宣言し、アノテーション付きメソッドにアタッチします。



メッセージを処理するためにより多くのスレッドが必要ですか? これは、外部構成ファイル( spring.rabbitmq.listener.concurrencyプロパティー)またはアノテーションのcontainerFactoryパラメーターを介して構成さます。



 // : @Configuration public class RabbitConfiguration { @Bean(name = "containerFactory") @Autowired public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(10); factory.setPrefetchCount(30); return factory; } } // : @RabbitListener(containerFactory = "containerFactory", ...)
      
      





RabbitTemplateを使用して保留中のメッセージを送信すると便利です。



 rabbitTemplate.send( DELAY_EXCHANGE_NAME, DELAY_QUEUE_NAME, MessageBuilder .withBody(data) .setHeader("x-delay", MESSAGE_DELAY_MS).build() );
      
      





すぐに送信されますが、 x-delay



ヘッダーで指定された遅延で配信されます。 最大許容遅延時間(2 ^ 32-1)ms。



サンプルアプリケーションの準備:



 @SpringBootApplication public class Application { private final Logger log = LoggerFactory.getLogger(Application.class); private final static String DELAY_QUEUE_NAME = "delayed.queue"; private final static String DELAY_EXCHANGE_NAME = "delayed.exchange"; private final static String DELAY_HEADER = "x-delay"; private final static String NUM_ATTEMPT_HEADER = "x-num-attempt"; private final static long RETRY_BACKOFF = 5000; @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME), exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME)) public void onMessage(Message<byte[]> message) { String endpointUrl = new String(message.getPayload()); Long numAttempt = (Long) message.getHeaders().getOrDefault(NUM_ATTEMPT_HEADER, 1L); log.info("Message received, url={}, attempt={}", endpointUrl, numAttempt); if (!doNotifyEndpoint(endpointUrl) && numAttempt < 3) { rabbitTemplate.send( DELAY_EXCHANGE_NAME, DELAY_QUEUE_NAME, MessageBuilder .withBody(message.getPayload()) .setHeader(DELAY_HEADER, numAttempt * RETRY_BACKOFF) .setHeader(NUM_ATTEMPT_HEADER, numAttempt + 1) .build() ); } } private boolean doNotifyEndpoint(String url) { try { HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(); /* @todo: set up connection timeouts */ return (connection.getResponseCode() == 200); } catch (Exception e) { log.error(e.getMessage()); return false; } } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
      
      





application.yml
 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: prefetch: 10 concurrency: 50
      
      





build.gradle
 buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' jar { baseName = 'rmq-delayed-demo' version = '0.1.0' } repositories { mavenCentral() } sourceCompatibility = 1.8 targetCompatibility = 1.8 dependencies { compile("org.springframework.boot:spring-boot-starter-amqp") testCompile("org.springframework.boot:spring-boot-starter-test") }
      
      





遅延配信はコントロールパネル(rmq-management)で確認します。ポート15672で利用可能です:



画像



ログ:



 2016-12-21 14:27:25.927: Message received, url=http://localhost:1234, attempt=1 2016-12-21 14:27:25.941: Connection refused (Connection refused) 2016-12-21 14:27:30.946: Message received, url=http://localhost:1234, attempt=2 2016-12-21 14:27:30.951: Connection refused (Connection refused) 2016-12-21 14:27:40.954: Message received, url=http://localhost:1234, attempt=3
      
      





XML設定



XML構成を使用するtrue



、エクスチェンジBeanでdelayed



プロパティをtrue



に設定するだけで、RabbitAdminが残りを自動的に実行します。



Integration Frameworkと組み合わせたxml構成の例
 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int:channel id="to-delayed-rmq" /> <int-amqp:outbound-channel-adapter channel="to-delayed-rmq" amqp-template="rabbitTemplate" exchange-name="delayed.exchange" routing-key="delayed.binding" mapped-request-headers="x-delay" /> <int-amqp:inbound-channel-adapter channel="from-delayed-rmq-queue" queue-names="delayed.queue" message-converter="amqpMessageConverter" connection-factory="rabbitConnectionFactory" concurrent-consumers="10" prefetch-count="50" /> <int:service-activator input-channel="from-delayed-rmq-queue" method="onMessage"> <bean id="postbackServiceActivator" class="PostbackServiceActivator" /> </int:service-activator> <rabbit:queue name="delayed.queue" /> <rabbit:direct-exchange name="delayed.exchange" delayed="true"> <rabbit:bindings> <rabbit:binding queue="delayed.queue" key="delayed.binding" /> </rabbit:bindings> </rabbit:direct-exchange> </beans>
      
      





便利なリンク






All Articles