Testing multithreaded and asynchronous code

Hello! This week my task was to write an integration test for a Spring Boot application that interacts asynchronously with external systems. Along the way I refreshed a lot of material on debugging multithreaded code. The article "Testing Multi-Threaded and Asynchronous Code" by Jonathan Halterman caught my attention, and my translation of it is given below.



If you have been writing code for long enough, or maybe even if you haven't, you have probably come across a scenario in which you need to test multi-threaded code. Conventional wisdom says that threads and tests should not be mixed. That usually works out, because the thing under test merely happens to run inside a multi-threaded system and can be tested on its own without threads. But what if you cannot separate the two, or, more to the point, what if multithreading is the very aspect of the code that you are testing?



I am here to tell you that although threads in tests are not very common, they are perfectly usable. The software police will not arrest you for starting a thread in a unit test, although how to actually test multi-threaded code is another matter. Some excellent asynchronous technologies, such as Akka and Vert.x, provide test kits to ease this burden. But beyond that, testing multi-threaded code usually requires a different approach than a typical synchronous unit test.



Going parallel



The first step is to start whatever multithreaded action you want to check the result of. For example, let's use a hypothetical API to register a message handler on a message bus and publish a message to the bus, which will deliver it to our handler asynchronously in a separate thread:



messageBus.registerHandler(message -> {
    System.out.println("Received " + message);
});
messageBus.publish("test");
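The article never defines the message bus, so for readers who want to run the snippets, here is a minimal sketch of what such a bus might look like. The class name `SimpleMessageBus`, its single worker thread, and the `close()` method are all assumptions made for illustration, not part of any real library.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical in-memory bus; every name here is an assumption.
final class SimpleMessageBus implements AutoCloseable {
    private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();
    // One worker thread delivers messages, so handlers never run on the caller's thread.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void registerHandler(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Returns immediately; delivery happens later on the worker thread.
    void publish(String message) {
        worker.execute(() -> handlers.forEach(h -> h.accept(message)));
    }

    // Stops accepting messages and waits up to one second for pending deliveries.
    @Override
    public void close() {
        worker.shutdown();
        try {
            worker.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With a bus along these lines, `publish` returns before the handler has run, which is exactly the behavior the rest of the article has to deal with.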
      
      





Looks good. When the test starts, the bus should deliver our message to the handler in another thread, but this is not very useful, since we do not check anything. Let's update our test to confirm that the message bus delivers our message as expected:



String msg = "test";
messageBus.registerHandler(message -> {
    System.out.println("Received " + message);
    assertEquals(msg, message);
});
messageBus.publish(msg);
      
      





That looks better. We run our test and it is green. Cool! But the "Received" message was never printed, so something is wrong somewhere.



Wait a second



In the test above, when a message is published on the message bus, the bus delivers it to the handler in another thread. But when a unit testing tool such as JUnit runs a test, it knows nothing about the message bus's threads. JUnit only knows about the main thread in which it runs the test. So while the message bus is still busy trying to deliver the message, the test finishes in the main test thread and JUnit reports success. How do we solve this? We need the main test thread to wait for the message bus to deliver our message. So let's add a sleep statement:



String msg = "test";
messageBus.registerHandler(message -> {
    System.out.println("Received " + message);
    assertEquals(msg, message);
});
messageBus.publish(msg);
Thread.sleep(1000);
      
      





Our test is green and the "Received" message is printed as expected. Cool! But one second of sleep means that our test takes at least one second to run, and there is nothing good about that. We could reduce the sleep time, but then we run the risk of the test ending before the message is received. We need a way to coordinate between the main test thread and the message handler thread. Looking at the java.util.concurrent package, we are sure to find something we can use. What about CountDownLatch?



String msg = "test";
CountDownLatch latch = new CountDownLatch(1);
messageBus.registerHandler(message -> {
    System.out.println("Received " + message);
    assertEquals(msg, message);
    latch.countDown();
});
messageBus.publish(msg);
latch.await();
      
      





In this approach, we share the CountDownLatch between the main test thread and the message handler thread. The main thread is forced to wait on the latch. The message handler thread releases the waiting main thread by calling countDown() on the latch once the message has been received. We no longer need to sleep for one second. Our test takes exactly as much time as it needs.
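The coordination described above can be sketched with only the JDK. In this example a single-thread `ExecutorService` stands in for the hypothetical message bus, and the class and method names (`LatchCoordination`, `deliverAndWait`) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class LatchCoordination {
    // Delivers msg on a worker thread and returns what the "handler" saw.
    // The executor is an assumed stand-in for the article's message bus.
    static String deliverAndWait(String msg) {
        ExecutorService bus = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        try {
            bus.execute(() -> {
                received.set(msg);   // the handler runs on the worker thread
                latch.countDown();   // release the waiting caller
            });
            latch.await();           // blocks only until countDown() is called
            return received.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            bus.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Received " + deliverAndWait("test"));
    }
}
```

Because `countDown()` happens after the handler has stored the message, the caller is guaranteed to see the value once `await()` returns.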



Happy now?



Armed with our new toy, CountDownLatch, we start writing multi-threaded tests like the latest fashionistas. But pretty quickly, we notice that one of our test cases blocks forever and never finishes. What is going on? Consider the message bus scenario: the latch makes the test wait, but it is released only after the message has been received. If the bus does not work and the message is never delivered, the test will never end. So let's add a timeout to the latch:



assertTrue(latch.await(1, TimeUnit.SECONDS));

Note that await with a timeout does not throw when the time runs out; it returns false, so we assert on the result. A test that is blocked now fails after 1 second. In the end, we find the problem and fix the test, but decide to leave the timeouts in place. If this ever happens again, we would prefer our test to block for a second and fail rather than block forever and never complete.
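One detail is easy to miss here: `CountDownLatch.await(long, TimeUnit)` reports a timeout by returning `false` rather than by throwing, so the caller has to check the result. The sketch below illustrates this with a latch that is never released; the helper name `awaitDelivery` is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchTimeout {
    // Waits up to timeoutMillis; true means the latch reached zero in time.
    static boolean awaitDelivery(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Nobody ever calls countDown(), which models a bus that drops the message.
        CountDownLatch neverReleased = new CountDownLatch(1);
        boolean delivered = awaitDelivery(neverReleased, 100);
        // In a JUnit test this check would typically be assertTrue(delivered).
        if (!delivered) {
            System.out.println("timed out: the message was never delivered");
        }
    }
}
```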

Another problem we notice when writing tests is that they all seem to pass even when they probably shouldn't. How is this possible? Consider the message processing test again:



messageBus.registerHandler(message -> {
    assertEquals(msg, message);
    latch.countDown();
});
      
      





We used a CountDownLatch to coordinate the completion of our test with the main test thread, but what about the assertions? If an assertion fails, will JUnit know about it? It turns out that since the assertion does not run in the main test thread, any failed assertion goes completely unnoticed by JUnit. Let's try a little scenario to test this:



CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
    try {
        assertTrue(false);
    } finally {
        latch.countDown(); // release the main thread even though the assertion fails
    }
}).start();
latch.await();
      
      





The test is green! So what do we do now? We need a way to send any test failures from the message handler thread back to the main test thread. If a failure occurs in the message handler thread, we need it to reappear in the main thread so that the test fails, as expected. Let's try to do this:



CountDownLatch latch = new CountDownLatch(1);
AtomicReference<AssertionError> failure = new AtomicReference<>();
new Thread(() -> {
    try {
        assertTrue(false);
    } catch (AssertionError e) {
        failure.set(e);
    }
    latch.countDown();
}).start();
latch.await();
if (failure.get() != null)
    throw failure.get();
      
      





A quick run and yes, the test fails, as it should! Now we can go back and add CountDownLatch, try/catch and AtomicReference blocks to all our test cases. Cool! Actually, not cool: that is a lot of boilerplate.
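To make the pattern concrete, the latch, the try/catch and the AtomicReference can be gathered into one small helper. The sketch below is only an illustration; `CrossThreadFailure.runAndPropagate` is a hypothetical name and is not part of JUnit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CrossThreadFailure {
    // Runs check on a new thread and rethrows, on the calling thread,
    // whatever it threw. Hypothetical helper written for this example.
    static void runAndPropagate(Runnable check, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                check.run();
            } catch (RuntimeException | Error t) {
                failure.set(t);      // remember the failure for the caller
            } finally {
                latch.countDown();   // always release the caller, pass or fail
            }
        });
        worker.start();
        try {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new AssertionError("check did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        Throwable t = failure.get();
        if (t instanceof Error) throw (Error) t;
        if (t instanceof RuntimeException) throw (RuntimeException) t;
    }
}
```

A test would call something like `CrossThreadFailure.runAndPropagate(() -> assertTrue(false), 1000)` and see the AssertionError surface on the main test thread, where JUnit can report it.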



Cutting out the boilerplate



Ideally, we need an API that lets us coordinate waiting, asserting, and resuming execution across threads, so that unit tests pass or fail as expected regardless of where the assertion fails. Fortunately, ConcurrentUnit provides a lightweight construct that does just that: Waiter. Let's adapt the message processing test above one last time and see what Waiter from ConcurrentUnit can do for us:



String msg = "test";
Waiter waiter = new Waiter();
messageBus.registerHandler(message -> {
    waiter.assertEquals(msg, message);
    waiter.resume();
});
messageBus.publish(msg);
waiter.await(1, TimeUnit.SECONDS);
      
      





In this test, we see that Waiter has taken the place of our CountDownLatch and AtomicReference. With Waiter, we block the main test thread, perform the assertion, then resume the main test thread so that the test can complete. If the assertion fails, the waiter.await call is automatically unblocked and the failure is thrown, so the test passes or fails as it should, even though the assertion was performed on another thread.
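To see why this works, it can help to look at how such a construct could be built. The following is a simplified sketch written for this article, assuming a plain monitor with wait/notify; `MiniWaiter` is a hypothetical class and is not how ConcurrentUnit actually implements Waiter.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified stand-in. NOT ConcurrentUnit's implementation.
final class MiniWaiter {
    private final Object lock = new Object();
    private int resumes;             // guarded by lock
    private AssertionError failure;  // guarded by lock; the first failure wins

    // Callable from any thread: a mismatch is recorded and wakes the waiting thread.
    void assertEquals(Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            synchronized (lock) {
                if (failure == null) {
                    failure = new AssertionError(
                            "expected <" + expected + "> but was <" + actual + ">");
                }
                lock.notifyAll();
            }
        }
    }

    // Callable from any thread: signals that one asynchronous step has finished.
    void resume() {
        synchronized (lock) {
            resumes++;
            lock.notifyAll();
        }
    }

    // Blocks until expectedResumes resumes arrive, a failure is recorded, or time runs out.
    void await(long timeout, TimeUnit unit, int expectedResumes)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (failure == null && resumes < expectedResumes) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new TimeoutException(
                            "got " + resumes + " of " + expectedResumes + " resumes");
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remaining);
            }
            if (failure != null) {
                throw failure;       // rethrown on the thread that called await
            }
            resumes -= expectedResumes;
        }
    }

    public static void main(String[] args) throws Exception {
        MiniWaiter waiter = new MiniWaiter();
        new Thread(() -> {
            waiter.assertEquals("test", "test");
            waiter.resume();
        }).start();
        waiter.await(1, TimeUnit.SECONDS, 1);
        System.out.println("check passed on the other thread");
    }
}
```

The key design point is that the failure travels through shared state to the thread that called `await`, which is the thread the test runner is watching.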



Even more parallel



Now that we have become certified multi-threaded testers, we might want to confirm that several asynchronous actions are occurring. ConcurrentUnit's Waiter makes this simple:



Waiter waiter = new Waiter();
messageBus.registerHandler(message -> {
    waiter.resume();
});
messageBus.publish("one");
messageBus.publish("two");
// the third argument is the number of resume() calls to wait for
waiter.await(1, TimeUnit.SECONDS, 2);
      
      





Here we publish two messages on the bus and verify that both are delivered by making Waiter wait until resume() has been called 2 times. If the messages are not delivered and resume() is not called twice within 1 second, the test fails with a TimeoutException.

One general tip with this approach is to make sure your timeouts are long enough to complete any concurrent actions. Under normal conditions, when the system under test works as expected, the timeout does not matter and takes effect only in the event of a system failure for any reason.
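For comparison, the same "wait for N events" idea can be sketched with the JDK alone by initialising a CountDownLatch with the number of expected messages. As before, the executor is an assumed stand-in for the message bus and `MultiDelivery.publishAll` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MultiDelivery {
    // Publishes every message on a worker thread and waits until all of them
    // have been handled or the timeout expires. Returns the messages received.
    static List<String> publishAll(List<String> messages, long timeoutMillis) {
        ExecutorService bus = Executors.newSingleThreadExecutor();   // stand-in for the bus
        CountDownLatch latch = new CountDownLatch(messages.size());  // one count per message
        List<String> received = new CopyOnWriteArrayList<>();
        try {
            for (String m : messages) {
                bus.execute(() -> {
                    received.add(m);
                    latch.countDown();
                });
            }
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("only " + received.size()
                        + " of " + messages.size() + " messages arrived");
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            bus.shutdown();
        }
    }
}
```

The timeout here plays the role described above: it is generous enough never to matter when delivery works, and it only bites when a message goes missing.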



Summary



In this article, we learned that multithreaded unit testing is not evil and is fairly easy to do. We learned the general approach: block the main test thread, perform assertions from other threads, and then resume the main thread. And we learned about ConcurrentUnit, which can make this task easier.

Happy testing!



Translated by @middle_java


