Synchronous Request-Response using Apache Kafka

Event-driven architecture in general, and Apache Kafka in particular, have attracted a lot of attention lately. To take full advantage of an event-driven architecture, the event delivery mechanism must be fundamentally asynchronous. However, there may be some usage scenarios or flows that require the semantics of a synchronous Request-Response. This article shows how to implement Request-Response on top of Apache Kafka.



Translated by @middle_java



Original article date: 26 October 2018



Apache Kafka is inherently asynchronous, so Request-Response semantics are not natural to it. This challenge, however, is not new. The Enterprise Integration Pattern Request-Reply provides a proven mechanism for synchronous messaging over asynchronous channels:







The Return Address pattern complements Request-Reply with a mechanism for the requester to indicate the address to which the response should be sent:
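Before looking at the Spring support, it may help to see what these two patterns amount to at the level of a raw Kafka record: the requester attaches a correlation identifier and a return address as headers. A minimal sketch using the plain Kafka client API; the header names and topic names here are illustrative, not part of any API:

import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

public class RequestReplySketch {

    public static ProducerRecord<String, String> buildRequest(String payload) {
        // Request-Reply: a correlation ID lets the requester match an incoming
        // reply to the request it sent over the asynchronous channel.
        String correlationId = UUID.randomUUID().toString();
        ProducerRecord<String, String> record = new ProducerRecord<>("car.request", payload);
        record.headers().add(new RecordHeader("correlation-id",
                correlationId.getBytes(StandardCharsets.UTF_8)));
        // Return Address: the requester names the topic where it expects the reply.
        record.headers().add(new RecordHeader("reply-to",
                "car.reply".getBytes(StandardCharsets.UTF_8)));
        return record;
    }
}

The responder is expected to copy the correlation ID into the reply record and publish it to the topic named in the return-address header; as we will see below, Spring Kafka automates exactly these two steps.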







Recently, in Spring Kafka 2.1.3, out-of-the-box support for the Request-Reply pattern was added, and in version 2.2 some of its rough edges were polished. Let's see how this support works:



Client Side: ReplyingKafkaTemplate



The well-known Template abstraction forms the basis for the client side of the Request-Reply mechanism in Spring.



@Bean
public ReplyingKafkaTemplate<String, Request, Reply> replyKafkaTemplate(
        ProducerFactory<String, Request> pf,
        KafkaMessageListenerContainer<String, Reply> lc) {
    return new ReplyingKafkaTemplate<>(pf, lc);
}
      
      





Everything is pretty straightforward here: we set up a ReplyingKafkaTemplate that sends request messages of type Request with String keys and receives reply messages of type Reply with String keys. However, the ReplyingKafkaTemplate must be built from a ProducerFactory for the request, a ConsumerFactory for the reply, and a MessageListenerContainer with the appropriate consumer and producer configurations. Therefore, the required configuration is pretty weighty:



  @Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
      
      
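For reference, the ${...} placeholders in the @Value annotations map to ordinary Spring properties. A minimal application.properties sketch with illustrative values (the topic property names come from the listings; the bootstrap and group entries back the bootstrapServers and groupId fields and are assumptions):

kafka.topic.car.request=car.request
kafka.topic.car.reply=car.reply
kafka.topic.car.reply.partition=0
kafka.bootstrap-servers=localhost:9092
kafka.group-id=car-reply-client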





Given this, using replyKafkaTemplate to send a synchronous request and receive the response looks as follows:



 @Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...); // producer record ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request); //       record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); //     Kafka          RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () { @Override public void onSuccess(ConsumerRecord < String, Reply > result) { //   consumer record Reply reply = result.value(); System.out.println("Reply: " + reply.toString()); } });
      
      





Here, too, there is a lot of boilerplate and low-level API, and on top of that the obsolete ListenableFuture API instead of the modern CompletableFuture.



requestReplyKafkaTemplate takes care of generating and setting the KafkaHeaders.CORRELATION_ID header, but we must set the KafkaHeaders.REPLY_TOPIC header on the request explicitly. Note also that the reply topic had to be specified twice: once in replyListenerContainer above and once more here. Somewhat messy, and not quite what one would expect from a Spring abstraction.



Server Side: @SendTo



On the server side, a regular @KafkaListener listening on the request topic is additionally decorated with the @SendTo annotation to provide the reply message. The object returned by the listener method is automatically wrapped into a reply message, the CORRELATION_ID is added, and the reply is published to the topic specified in the REPLY_TOPIC header.



@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    return props;
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

@Bean
public ConsumerFactory<String, Request> requestConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Request.class));
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Request>> requestListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Request> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(requestConsumerFactory());
    factory.setReplyTemplate(replyTemplate());
    return factory;
}

@Bean
public ProducerFactory<String, Reply> replyProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Reply> replyTemplate() {
    return new KafkaTemplate<>(replyProducerFactory());
}
      
      





Some configuration is also required here, but the listener configuration is simpler:



@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory")
@SendTo
public Reply receive(Request request) {
    Reply reply = ...;
    return reply;
}
      
      





But what about multiple instances of the consumer?



Everything seems to work until we use several client instances. With multiple client instances, we need to make sure that the reply is sent to the correct one. The Spring Kafka documentation suggests that each consumer use a unique topic, or that an additional KafkaHeaders.REPLY_PARTITION header value be sent with the request. REPLY_PARTITION is a four-byte field containing a big-endian representation of the integer partition number. Using separate topics for different clients is clearly not very flexible, so we choose to set REPLY_PARTITION explicitly. The client then must know which partition it is assigned to. The documentation suggests using explicit configuration to select a particular partition. Let's add this to our example:



  @Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
      
      
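As an aside, the hand-rolled intToBytesBigEndian is equivalent to what java.nio.ByteBuffer produces, since ByteBuffer encodes integers in big-endian byte order by default:

import java.nio.ByteBuffer;

// Four-byte big-endian encoding of the partition number,
// equivalent to intToBytesBigEndian(replyPartition).
byte[] replyPartitionBytes = ByteBuffer.allocate(4).putInt(replyPartition).array();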





Not very pretty, but it works. The required configuration is extensive, and the API looks rather low-level. The need to explicitly configure partitions complicates dynamically scaling the number of clients. Clearly, we can do better.



Encapsulating the handling of the reply topic and partition



Let's start by encapsulating the Return Address pattern, i.e. the passing of the reply topic and partition. The reply topic should be configured into the ReplyingKafkaTemplate and therefore should not appear in the API at all. For the reply partition, we do the opposite: we extract the partition(s) assigned to the reply topic listener and pass that partition along automatically. This eliminates the need for the client to care about these headers.

At the same time, let's make the API resemble the standard KafkaTemplate by overloading sendAndReceive() with simplified parameters and adding corresponding overloads that use a default topic:



public class PartitionAwareReplyingKafkaTemplate<K, V, R> extends ReplyingKafkaTemplate<K, V, R> {

    public PartitionAwareReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
            GenericMessageListenerContainer<K, R> replyContainer) {
        super(producerFactory, replyContainer);
    }

    private TopicPartition getFirstAssignedReplyTopicPartition() {
        if (getAssignedReplyTopicPartitions() != null
                && getAssignedReplyTopicPartitions().iterator().hasNext()) {
            TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Using partition " + replyPartition.partition());
            }
            return replyPartition;
        } else {
            throw new KafkaException("Illegal state: No reply partition is assigned to this instance");
        }
    }

    private static byte[] intToBytesBigEndian(final int data) {
        return new byte[] {
            (byte) ((data >> 24) & 0xff),
            (byte) ((data >> 16) & 0xff),
            (byte) ((data >> 8) & 0xff),
            (byte) ((data >> 0) & 0xff),
        };
    }

    public RequestReplyFuture<K, V, R> sendAndReceiveDefault(@Nullable V data) {
        return sendAndReceive(getDefaultTopic(), data);
    }

    public RequestReplyFuture<K, V, R> sendAndReceiveDefault(K key, @Nullable V data) {
        return sendAndReceive(getDefaultTopic(), key, data);
    }

    ...

    public RequestReplyFuture<K, V, R> sendAndReceive(String topic, @Nullable V data) {
        ProducerRecord<K, V> record = new ProducerRecord<>(topic, data);
        return doSendAndReceive(record);
    }

    public RequestReplyFuture<K, V, R> sendAndReceive(String topic, K key, @Nullable V data) {
        ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, data);
        return doSendAndReceive(record);
    }

    ...

    @Override
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
        return doSendAndReceive(record);
    }

    protected RequestReplyFuture<K, V, R> doSendAndReceive(ProducerRecord<K, V> record) {
        TopicPartition replyPartition = getFirstAssignedReplyTopicPartition();
        record.headers()
                .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes()))
                .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,
                        intToBytesBigEndian(replyPartition.partition())));
        return super.sendAndReceive(record);
    }
}
      
      





The next step: adapting the ListenableFuture to the more modern CompletableFuture.



public class CompletableFutureReplyingKafkaTemplate<K, V, R> extends PartitionAwareReplyingKafkaTemplate<K, V, R> {

    public CompletableFutureReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
            GenericMessageListenerContainer<K, R> replyContainer) {
        super(producerFactory, replyContainer);
    }

    public CompletableFuture<R> requestReplyDefault(V value) {
        return adapt(sendAndReceiveDefault(value));
    }

    public CompletableFuture<R> requestReplyDefault(K key, V value) {
        return adapt(sendAndReceiveDefault(key, value));
    }

    ...

    public CompletableFuture<R> requestReply(String topic, V value) {
        return adapt(sendAndReceive(topic, value));
    }

    public CompletableFuture<R> requestReply(String topic, K key, V value) {
        return adapt(sendAndReceive(topic, key, value));
    }

    ...

    private CompletableFuture<R> adapt(RequestReplyFuture<K, V, R> requestReplyFuture) {
        CompletableFuture<R> completableResult = new CompletableFuture<R>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean result = requestReplyFuture.cancel(mayInterruptIfRunning);
                super.cancel(mayInterruptIfRunning);
                return result;
            }
        };
        // Propagate a failure of the send to the caller
        requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
            @Override
            public void onSuccess(SendResult<K, V> sendResult) {
                // NOOP
            }
            @Override
            public void onFailure(Throwable t) {
                completableResult.completeExceptionally(t);
            }
        });
        // Complete the future with the reply value, or exceptionally on failure
        requestReplyFuture.addCallback(new ListenableFutureCallback<ConsumerRecord<K, R>>() {
            @Override
            public void onSuccess(ConsumerRecord<K, R> result) {
                completableResult.complete(result.value());
            }
            @Override
            public void onFailure(Throwable t) {
                completableResult.completeExceptionally(t);
            }
        });
        return completableResult;
    }
}
      
      





We pack this into a utility library, and now we have an API that is much more in line with Spring's basic design philosophy of Convention over Configuration. Here is the final client code:



@Autowired
private CompletableFutureReplyingKafkaTemplate<String, Request, Reply> requestReplyKafkaTemplate;

...

requestReplyKafkaTemplate.requestReply(request).thenAccept(reply ->
        System.out.println("Reply: " + reply.toString()));
      
      
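For completeness, wiring up the new template looks just like the replyKafkaTemplate bean shown earlier. A minimal sketch, assuming the producer factory and reply listener container defined above; the default-topic line backs the requestReplyDefault(...) convenience methods and is an assumption of this sketch:

@Bean
public CompletableFutureReplyingKafkaTemplate<String, Request, Reply> requestReplyKafkaTemplate(
        ProducerFactory<String, Request> pf,
        KafkaMessageListenerContainer<String, Reply> replyContainer) {
    CompletableFutureReplyingKafkaTemplate<String, Request, Reply> template =
            new CompletableFutureReplyingKafkaTemplate<>(pf, replyContainer);
    // Default topic used by the requestReplyDefault(...) methods.
    template.setDefaultTopic(requestTopic);
    return template;
}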





To summarize



To summarize: Spring Kafka 2.2 provides a fully functional implementation of the Request-Reply pattern on top of Apache Kafka, but the API still has some rough edges. In this article, we saw how some additional abstractions and API adaptations can give a more logical, higher-level API.



Warning 1:

One of the main advantages of an event-driven architecture is the decoupling of event producers and consumers, which makes it possible to build much more flexible and evolvable systems. Using synchronous Request-Response semantics is the exact opposite: it strongly couples the requesting and responding parties. Therefore, it should be used only when necessary.



Warning 2:

If synchronous Request-Response is required, an HTTP-based protocol is much simpler and more efficient than using an asynchronous channel like Apache Kafka.

However, there may be scenarios where synchronous Request-Response over Kafka makes sense. Choose the best tool for the job wisely.



A fully working example can be found at github.com/callistaenterprise/blog-synchronous-kafka .



Comments



Federico • 7 months ago

And what about hybrid needs: for example, when 50% of cases call for Request-Response and 50% for event handling? How do we do this? The configuration required by Spring Kafka looks pretty awful.



Jehanzeb Qayyum • 6 months ago

Spring now has out-of-the-box support for using partitions of one common topic for the reply.



Starting with version 2.2, the template tries to determine the reply topic or partition from the configured reply container.



https://docs.spring.io/spring-kafka/reference/html/#replying-template
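In practice this means that, on 2.2+, the explicit REPLY_TOPIC header from the listings above can be omitted when the reply container listens on a single topic or partition. A minimal sketch, assuming the same beans as in the article:

// The template falls back to the reply container's single topic/partition,
// so no REPLY_TOPIC or REPLY_PARTITION header needs to be set by hand.
ProducerRecord<String, Request> record = new ProducerRecord<>(requestTopic, request);
RequestReplyFuture<String, Request, Reply> future = requestReplyKafkaTemplate.sendAndReceive(record);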



nir rozenberg • 8 months ago

Hi,

In the last paragraph you wrote that there may be scenarios where synchronous Request-Response over Kafka makes sense compared to HTTP.

Can you give examples of such scenarios?

Thanks,

Nir



Janne Keskitalo nir rozenberg • 8 months ago

We are going to split a large-volume transaction processing system into several microservices, and I have the idea of using Kafka's Request-Response messaging to achieve similar processing affinity. Basically, Kafka is used to route all calls from one client to the same transaction processor process, which then executes them sequentially, one at a time. This kind of processing guarantees linearizability ( https://stackoverflow.com/a/19515375/7430325 ) and causal consistency, and also allows efficient caching. Essentially, the coordination effort would be moved from the database to Kafka, and we could run the database at the strict Serializable isolation level.

I still have to delve into the details of our transaction semantics to see where it falls short, so this is just an idea.





