如何使用JUnit测试触发异步进程的方法?

我不知道如何让我的测试等待流程结束(它不是一个确切的单元测试,它更像一个集成测试,因为它涉及到几个类,而不仅仅是一个)。


当前回答

对于所有Spring用户来说,这是我现在通常做集成测试的方式,其中涉及到异步行为:

当异步任务(例如I/O调用)完成时,在生产代码中触发应用程序事件。大多数情况下,这个事件对于处理生产中异步操作的响应是必要的。

有了这个事件,您就可以在测试用例中使用以下策略:

执行测试中的系统 监听事件并确保事件已经触发 做你的断言

要解决这个问题,首先需要触发某种域事件。我在这里使用UUID来标识已完成的任务,但您当然可以使用其他东西,只要它是唯一的。

(注意,下面的代码片段也使用Lombok注释来摆脱锅炉板代码)

@RequiredArgsConstructor
class TaskCompletedEvent() {
  private final UUID taskId;
  // add more fields containing the result of the task if required
}

产品代码本身通常是这样的:

@Component
@RequiredArgsConstructor
class Production {

  private final ApplicationEventPublisher eventPublisher;

  void doSomeTask(UUID taskId) {
    // do something like calling a REST endpoint asynchronously
    eventPublisher.publishEvent(new TaskCompletedEvent(taskId));
  }

}

然后我可以使用Spring @EventListener在测试代码中捕获已发布的事件。事件监听器稍微复杂一点,因为它必须以线程安全的方式处理两种情况:

生产代码比测试用例快,并且在测试用例检查事件之前事件已经触发,或者 测试用例比生产代码快,并且测试用例必须等待事件。

在这里的其他答案中提到的第二种情况使用了CountDownLatch。还要注意,事件处理程序方法上的@Order注释确保在生产中使用的任何其他事件侦听器之后调用该事件处理程序方法。

@Component
class TaskCompletionEventListener {

  private Map<UUID, CountDownLatch> waitLatches = new ConcurrentHashMap<>();
  private List<UUID> eventsReceived = new ArrayList<>();

  void waitForCompletion(UUID taskId) {
    synchronized (this) {
      if (eventAlreadyReceived(taskId)) {
        return;
      }
      checkNobodyIsWaiting(taskId);
      createLatch(taskId);
    }
    waitForEvent(taskId);
  }

  private void checkNobodyIsWaiting(UUID taskId) {
    if (waitLatches.containsKey(taskId)) {
      throw new IllegalArgumentException("Only one waiting test per task ID supported, but another test is already waiting for " + taskId + " to complete.");
    }
  }

  private boolean eventAlreadyReceived(UUID taskId) {
    return eventsReceived.remove(taskId);
  }

  private void createLatch(UUID taskId) {
    waitLatches.put(taskId, new CountDownLatch(1));
  }

  @SneakyThrows
  private void waitForEvent(UUID taskId) {
    var latch = waitLatches.get(taskId);
    latch.await();
  }

  @EventListener
  @Order
  void eventReceived(TaskCompletedEvent event) {
    var taskId = event.getTaskId();
    synchronized (this) {
      if (isSomebodyWaiting(taskId)) {
        notifyWaitingTest(taskId);
      } else {
        eventsReceived.add(taskId);
      }
    }
  }

  private boolean isSomebodyWaiting(UUID taskId) {
    return waitLatches.containsKey(taskId);
  }

  private void notifyWaitingTest(UUID taskId) {
    var latch = waitLatches.remove(taskId);
    latch.countDown();
  }

}

最后一步是在测试用例中执行被测试的系统。我在这里使用JUnit 5进行SpringBoot测试,但这对于使用Spring上下文的所有测试都应该是一样的。

@SpringBootTest
class ProductionIntegrationTest {

  @Autowired
  private Production sut;

  @Autowired
  private TaskCompletionEventListener listener;

  @Test
  void thatTaskCompletesSuccessfully() {
    var taskId = UUID.randomUUID();
    sut.doSomeTask(taskId);
    listener.waitForCompletion(taskId);
    // do some assertions like looking into the DB if value was stored successfully
  }

}

请注意,与这里的其他答案相比,如果您并行执行测试,并且多个线程同时执行异步代码,则此解决方案也可以工作。

其他回答

如果您使用CompletableFuture(在Java 8中引入)或SettableFuture(来自谷歌Guava),您可以在测试完成后立即完成测试,而不是等待预先设置的时间。你的测试应该是这样的:

CompletableFuture<String> future = new CompletableFuture<>();
executorService.submit(new Runnable() {         
    @Override
    public void run() {
        future.complete("Hello World!");                
    }
});
assertEquals("Hello World!", future.get());

注意,有一个库为pre Java-8提供了CompletableFuture,它甚至使用了相同的名称(并提供了所有相关的Java-8类),例如: net.sourceforge.streamsupport: streamsupport-minifuture: 1.7.4 这对于Android开发很有用,即使我们使用JDK-v11构建,我们也希望保持代码与前Android-7设备兼容。

我更喜欢使用等待和通知。它简单明了。

@Test
public void test() throws Throwable {
    final boolean[] asyncExecuted = {false};
    final Throwable[] asyncThrowable= {null};

    // do anything async
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // Put your test here.
                fail(); 
            }
            // lets inform the test thread that there is an error.
            catch (Throwable throwable){
                asyncThrowable[0] = throwable;
            }
            // ensure to release asyncExecuted in case of error.
            finally {
                synchronized (asyncExecuted){
                    asyncExecuted[0] = true;
                    asyncExecuted.notify();
                }
            }
        }
    }).start();

    // Waiting for the test is complete
    synchronized (asyncExecuted){
        while(!asyncExecuted[0]){
            asyncExecuted.wait();
        }
    }

    // get any async error, including exceptions and assertationErrors
    if(asyncThrowable[0] != null){
        throw asyncThrowable[0];
    }
}

基本上,我们需要创建一个最终的数组引用,在匿名内部类中使用。我宁愿创建一个布尔[],因为如果我们需要wait(),我可以设置一个值来控制。当一切都完成后,我们只需释放asyncExecuted。

您可以尝试使用await库。这使得测试您所谈论的系统变得很容易。

对于所有Spring用户来说,这是我现在通常做集成测试的方式,其中涉及到异步行为:

当异步任务(例如I/O调用)完成时,在生产代码中触发应用程序事件。大多数情况下,这个事件对于处理生产中异步操作的响应是必要的。

有了这个事件,您就可以在测试用例中使用以下策略:

执行测试中的系统 监听事件并确保事件已经触发 做你的断言

要解决这个问题,首先需要触发某种域事件。我在这里使用UUID来标识已完成的任务,但您当然可以使用其他东西,只要它是唯一的。

(注意,下面的代码片段也使用Lombok注释来摆脱锅炉板代码)

@RequiredArgsConstructor
class TaskCompletedEvent() {
  private final UUID taskId;
  // add more fields containing the result of the task if required
}

产品代码本身通常是这样的:

@Component
@RequiredArgsConstructor
class Production {

  private final ApplicationEventPublisher eventPublisher;

  void doSomeTask(UUID taskId) {
    // do something like calling a REST endpoint asynchronously
    eventPublisher.publishEvent(new TaskCompletedEvent(taskId));
  }

}

然后我可以使用Spring @EventListener在测试代码中捕获已发布的事件。事件监听器稍微复杂一点,因为它必须以线程安全的方式处理两种情况:

生产代码比测试用例快,并且在测试用例检查事件之前事件已经触发,或者 测试用例比生产代码快,并且测试用例必须等待事件。

在这里的其他答案中提到的第二种情况使用了CountDownLatch。还要注意,事件处理程序方法上的@Order注释确保在生产中使用的任何其他事件侦听器之后调用该事件处理程序方法。

@Component
class TaskCompletionEventListener {

  private Map<UUID, CountDownLatch> waitLatches = new ConcurrentHashMap<>();
  private List<UUID> eventsReceived = new ArrayList<>();

  void waitForCompletion(UUID taskId) {
    synchronized (this) {
      if (eventAlreadyReceived(taskId)) {
        return;
      }
      checkNobodyIsWaiting(taskId);
      createLatch(taskId);
    }
    waitForEvent(taskId);
  }

  private void checkNobodyIsWaiting(UUID taskId) {
    if (waitLatches.containsKey(taskId)) {
      throw new IllegalArgumentException("Only one waiting test per task ID supported, but another test is already waiting for " + taskId + " to complete.");
    }
  }

  private boolean eventAlreadyReceived(UUID taskId) {
    return eventsReceived.remove(taskId);
  }

  private void createLatch(UUID taskId) {
    waitLatches.put(taskId, new CountDownLatch(1));
  }

  @SneakyThrows
  private void waitForEvent(UUID taskId) {
    var latch = waitLatches.get(taskId);
    latch.await();
  }

  @EventListener
  @Order
  void eventReceived(TaskCompletedEvent event) {
    var taskId = event.getTaskId();
    synchronized (this) {
      if (isSomebodyWaiting(taskId)) {
        notifyWaitingTest(taskId);
      } else {
        eventsReceived.add(taskId);
      }
    }
  }

  private boolean isSomebodyWaiting(UUID taskId) {
    return waitLatches.containsKey(taskId);
  }

  private void notifyWaitingTest(UUID taskId) {
    var latch = waitLatches.remove(taskId);
    latch.countDown();
  }

}

最后一步是在测试用例中执行被测试的系统。我在这里使用JUnit 5进行SpringBoot测试,但这对于使用Spring上下文的所有测试都应该是一样的。

@SpringBootTest
class ProductionIntegrationTest {

  @Autowired
  private Production sut;

  @Autowired
  private TaskCompletionEventListener listener;

  @Test
  void thatTaskCompletesSuccessfully() {
    var taskId = UUID.randomUUID();
    sut.doSomeTask(taskId);
    listener.waitForCompletion(taskId);
    // do some assertions like looking into the DB if value was stored successfully
  }

}

请注意,与这里的其他答案相比,如果您并行执行测试,并且多个线程同时执行异步代码,则此解决方案也可以工作。

如果你想测试逻辑,不要异步测试。

例如,测试这段代码,它对异步方法的结果起作用。

public class Example {
    private Dependency dependency;

    public Example(Dependency dependency) {
        this.dependency = dependency;            
    }

    public CompletableFuture<String> someAsyncMethod(){
        return dependency.asyncMethod()
                .handle((r,ex) -> {
                    if(ex != null) {
                        return "got exception";
                    } else {
                        return r.toString();
                    }
                });
    }
}

public class Dependency {
    public CompletableFuture<Integer> asyncMethod() {
        // do some async stuff       
    }
}

在测试中模拟使用同步实现的依赖关系。单元测试是完全同步的,运行时间为150毫秒。

public class DependencyTest {
    private Example sut;
    private Dependency dependency;

    public void setup() {
        dependency = Mockito.mock(Dependency.class);;
        sut = new Example(dependency);
    }

    @Test public void success() throws InterruptedException, ExecutionException {
        when(dependency.asyncMethod()).thenReturn(CompletableFuture.completedFuture(5));

        // When
        CompletableFuture<String> result = sut.someAsyncMethod();

        // Then
        assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
        String value = result.get();
        assertThat(value, is(equalTo("5")));
    }

    @Test public void failed() throws InterruptedException, ExecutionException {
        // Given
        CompletableFuture<Integer> c = new CompletableFuture<Integer>();
        c.completeExceptionally(new RuntimeException("failed"));
        when(dependency.asyncMethod()).thenReturn(c);

        // When
        CompletableFuture<String> result = sut.someAsyncMethod();

        // Then
        assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
        String value = result.get();
        assertThat(value, is(equalTo("got exception")));
    }
}

你不测试异步行为,但你可以测试逻辑是否正确。