如何使用JUnit测试触发异步进程的方法?

我不知道如何让我的测试等待流程结束(它不是一个确切的单元测试,它更像一个集成测试,因为它涉及到几个类,而不仅仅是一个)。


当前回答

我找到一个库套接字。IO来测试异步逻辑。使用LinkedBlockingQueue看起来简单。这里有一个例子:

    @Test(timeout = TIMEOUT)
public void message() throws URISyntaxException, InterruptedException {
    final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();

    socket = client();
    socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
        @Override
        public void call(Object... objects) {
            socket.send("foo", "bar");
        }
    }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
        @Override
        public void call(Object... args) {
            values.offer(args);
        }
    });
    socket.connect();

    assertThat((Object[])values.take(), is(new Object[] {"hello client"}));
    assertThat((Object[])values.take(), is(new Object[] {"foo", "bar"}));
    socket.disconnect();
}

使用LinkedBlockingQueue使用API来阻塞直到得到结果,就像同步方式一样。并设置超时,以避免假设有太多时间等待结果。

其他回答

对于所有Spring用户来说,这是我现在通常做集成测试的方式,其中涉及到异步行为:

当异步任务(例如I/O调用)完成时,在生产代码中触发应用程序事件。大多数情况下,这个事件对于处理生产中异步操作的响应是必要的。

有了这个事件,您就可以在测试用例中使用以下策略:

执行测试中的系统 监听事件并确保事件已经触发 做你的断言

要解决这个问题,首先需要触发某种域事件。我在这里使用UUID来标识已完成的任务,但您当然可以使用其他东西,只要它是唯一的。

(注意,下面的代码片段也使用Lombok注释来摆脱锅炉板代码)

@RequiredArgsConstructor
class TaskCompletedEvent() {
  private final UUID taskId;
  // add more fields containing the result of the task if required
}

产品代码本身通常是这样的:

@Component
@RequiredArgsConstructor
class Production {

  private final ApplicationEventPublisher eventPublisher;

  void doSomeTask(UUID taskId) {
    // do something like calling a REST endpoint asynchronously
    eventPublisher.publishEvent(new TaskCompletedEvent(taskId));
  }

}

然后我可以使用Spring @EventListener在测试代码中捕获已发布的事件。事件监听器稍微复杂一点,因为它必须以线程安全的方式处理两种情况:

生产代码比测试用例快,并且在测试用例检查事件之前事件已经触发,或者 测试用例比生产代码快,并且测试用例必须等待事件。

在这里的其他答案中提到的第二种情况使用了CountDownLatch。还要注意,事件处理程序方法上的@Order注释确保在生产中使用的任何其他事件侦听器之后调用该事件处理程序方法。

@Component
class TaskCompletionEventListener {

  private Map<UUID, CountDownLatch> waitLatches = new ConcurrentHashMap<>();
  private List<UUID> eventsReceived = new ArrayList<>();

  void waitForCompletion(UUID taskId) {
    synchronized (this) {
      if (eventAlreadyReceived(taskId)) {
        return;
      }
      checkNobodyIsWaiting(taskId);
      createLatch(taskId);
    }
    waitForEvent(taskId);
  }

  private void checkNobodyIsWaiting(UUID taskId) {
    if (waitLatches.containsKey(taskId)) {
      throw new IllegalArgumentException("Only one waiting test per task ID supported, but another test is already waiting for " + taskId + " to complete.");
    }
  }

  private boolean eventAlreadyReceived(UUID taskId) {
    return eventsReceived.remove(taskId);
  }

  private void createLatch(UUID taskId) {
    waitLatches.put(taskId, new CountDownLatch(1));
  }

  @SneakyThrows
  private void waitForEvent(UUID taskId) {
    var latch = waitLatches.get(taskId);
    latch.await();
  }

  @EventListener
  @Order
  void eventReceived(TaskCompletedEvent event) {
    var taskId = event.getTaskId();
    synchronized (this) {
      if (isSomebodyWaiting(taskId)) {
        notifyWaitingTest(taskId);
      } else {
        eventsReceived.add(taskId);
      }
    }
  }

  private boolean isSomebodyWaiting(UUID taskId) {
    return waitLatches.containsKey(taskId);
  }

  private void notifyWaitingTest(UUID taskId) {
    var latch = waitLatches.remove(taskId);
    latch.countDown();
  }

}

最后一步是在测试用例中执行被测试的系统。我在这里使用JUnit 5进行SpringBoot测试,但这对于使用Spring上下文的所有测试都应该是一样的。

@SpringBootTest
class ProductionIntegrationTest {

  @Autowired
  private Production sut;

  @Autowired
  private TaskCompletionEventListener listener;

  @Test
  void thatTaskCompletesSuccessfully() {
    var taskId = UUID.randomUUID();
    sut.doSomeTask(taskId);
    listener.waitForCompletion(taskId);
    // do some assertions like looking into the DB if value was stored successfully
  }

}

请注意,与这里的其他答案相比,如果您并行执行测试,并且多个线程同时执行异步代码,则此解决方案也可以工作。

如果您使用CompletableFuture(在Java 8中引入)或SettableFuture(来自谷歌Guava),您可以在测试完成后立即完成测试,而不是等待预先设置的时间。你的测试应该是这样的:

CompletableFuture<String> future = new CompletableFuture<>();
executorService.submit(new Runnable() {         
    @Override
    public void run() {
        future.complete("Hello World!");                
    }
});
assertEquals("Hello World!", future.get());

注意,有一个库为pre Java-8提供了CompletableFuture,它甚至使用了相同的名称(并提供了所有相关的Java-8类),例如: net.sourceforge.streamsupport: streamsupport-minifuture: 1.7.4 这对于Android开发很有用,即使我们使用JDK-v11构建,我们也希望保持代码与前Android-7设备兼容。

值得一提的是,《并发实践》中有一章非常有用,它描述了一些单元测试方法,并给出了解决问题的方案。

您可以尝试使用await库。这使得测试您所谈论的系统变得很容易。

尽可能避免使用并行线程进行测试(大多数时候都是这样)。这只会使您的测试不可靠(有时通过,有时失败)。

只有当你需要调用其他库/系统时,你可能不得不等待其他线程,在这种情况下,总是使用await库而不是Thread.sleep()。

永远不要在测试中只调用get()或join(),否则您的测试可能会在CI服务器上一直运行,以防将来永远无法完成。在调用get()之前,始终在测试中首先断言isDone()。对于CompletionStage,就是. tocompletablefuture (). isdone()。

当你像这样测试一个非阻塞方法时:

public static CompletionStage<String> createGreeting(CompletableFuture<String> future) {
    return future.thenApply(result -> "Hello " + result);
}

那么你不应该仅仅通过在测试中传递一个完整的Future来测试结果,你还应该确保你的方法doSomething()不会通过调用join()或get()来阻塞。如果使用非阻塞框架,这一点尤其重要。

要做到这一点,测试一个未完成的未来,你手动设置为完成:

@Test
public void testDoSomething() throws Exception {
    CompletableFuture<String> innerFuture = new CompletableFuture<>();
    CompletableFuture<String> futureResult = createGreeting(innerFuture).toCompletableFuture();
    assertFalse(futureResult.isDone());

    // this triggers the future to complete
    innerFuture.complete("world");
    assertTrue(futureResult.isDone());

    // futher asserts about fooResult here
    assertEquals(futureResult.get(), "Hello world");
}

这样,如果您将future.join()添加到doSomething(),测试将失败。

如果你的服务使用ExecutorService,比如applyasync(…, executorService),然后在你的测试中注入一个单线程的executorService,比如来自guava的:

ExecutorService executorService = Executors.newSingleThreadExecutor();

如果你的代码使用forkJoinPool,比如applyasync(…),重写代码使用ExecutorService(有很多好的理由),或者使用await。

为了缩短示例,我将BarService设置为测试中作为Java8 lambda实现的方法参数,通常它将是您将模拟的注入引用。