如何使用JUnit测试触发异步进程的方法?

我不知道如何让我的测试等待流程结束(它不是一个确切的单元测试,它更像一个集成测试,因为它涉及到几个类,而不仅仅是一个)。


当前回答

对于所有Spring用户来说,这是我现在通常做集成测试的方式,其中涉及到异步行为:

当异步任务(例如I/O调用)完成时,在生产代码中触发应用程序事件。大多数情况下,这个事件对于处理生产中异步操作的响应是必要的。

有了这个事件,您就可以在测试用例中使用以下策略:

执行测试中的系统 监听事件并确保事件已经触发 做你的断言

要解决这个问题,首先需要触发某种域事件。我在这里使用UUID来标识已完成的任务,但您当然可以使用其他东西,只要它是唯一的。

(注意,下面的代码片段也使用Lombok注释来摆脱锅炉板代码)

@RequiredArgsConstructor
class TaskCompletedEvent() {
  private final UUID taskId;
  // add more fields containing the result of the task if required
}

产品代码本身通常是这样的:

@Component
@RequiredArgsConstructor
class Production {

  private final ApplicationEventPublisher eventPublisher;

  void doSomeTask(UUID taskId) {
    // do something like calling a REST endpoint asynchronously
    eventPublisher.publishEvent(new TaskCompletedEvent(taskId));
  }

}

然后我可以使用Spring @EventListener在测试代码中捕获已发布的事件。事件监听器稍微复杂一点,因为它必须以线程安全的方式处理两种情况:

生产代码比测试用例快,并且在测试用例检查事件之前事件已经触发,或者 测试用例比生产代码快,并且测试用例必须等待事件。

在这里的其他答案中提到的第二种情况使用了CountDownLatch。还要注意,事件处理程序方法上的@Order注释确保在生产中使用的任何其他事件侦听器之后调用该事件处理程序方法。

@Component
class TaskCompletionEventListener {

  private Map<UUID, CountDownLatch> waitLatches = new ConcurrentHashMap<>();
  private List<UUID> eventsReceived = new ArrayList<>();

  void waitForCompletion(UUID taskId) {
    synchronized (this) {
      if (eventAlreadyReceived(taskId)) {
        return;
      }
      checkNobodyIsWaiting(taskId);
      createLatch(taskId);
    }
    waitForEvent(taskId);
  }

  private void checkNobodyIsWaiting(UUID taskId) {
    if (waitLatches.containsKey(taskId)) {
      throw new IllegalArgumentException("Only one waiting test per task ID supported, but another test is already waiting for " + taskId + " to complete.");
    }
  }

  private boolean eventAlreadyReceived(UUID taskId) {
    return eventsReceived.remove(taskId);
  }

  private void createLatch(UUID taskId) {
    waitLatches.put(taskId, new CountDownLatch(1));
  }

  @SneakyThrows
  private void waitForEvent(UUID taskId) {
    var latch = waitLatches.get(taskId);
    latch.await();
  }

  @EventListener
  @Order
  void eventReceived(TaskCompletedEvent event) {
    var taskId = event.getTaskId();
    synchronized (this) {
      if (isSomebodyWaiting(taskId)) {
        notifyWaitingTest(taskId);
      } else {
        eventsReceived.add(taskId);
      }
    }
  }

  private boolean isSomebodyWaiting(UUID taskId) {
    return waitLatches.containsKey(taskId);
  }

  private void notifyWaitingTest(UUID taskId) {
    var latch = waitLatches.remove(taskId);
    latch.countDown();
  }

}

最后一步是在测试用例中执行被测试的系统。我在这里使用JUnit 5进行SpringBoot测试,但这对于使用Spring上下文的所有测试都应该是一样的。

@SpringBootTest
class ProductionIntegrationTest {

  @Autowired
  private Production sut;

  @Autowired
  private TaskCompletionEventListener listener;

  @Test
  void thatTaskCompletesSuccessfully() {
    var taskId = UUID.randomUUID();
    sut.doSomeTask(taskId);
    listener.waitForCompletion(taskId);
    // do some assertions like looking into the DB if value was stored successfully
  }

}

请注意,与这里的其他答案相比,如果您并行执行测试,并且多个线程同时执行异步代码,则此解决方案也可以工作。

其他回答

对于测试异步方法,我发现一个非常有用的方法是在测试对象的构造函数中注入Executor实例。在生产中,执行器实例被配置为异步运行,而在测试中,它可以被模拟为同步运行。

所以假设我试图测试异步方法Foo#doAsync(Callback c),

class Foo {
  private final Executor executor;
  public Foo(Executor executor) {
    this.executor = executor;
  }

  public void doAsync(Callback c) {
    executor.execute(new Runnable() {
      @Override public void run() {
        // Do stuff here
        c.onComplete(data);
      }
    });
  }
}

在生产中,我会用Executors.newSingleThreadExecutor() Executor实例构造Foo,而在测试中,我可能会用执行以下操作的同步执行器构造它

class SynchronousExecutor implements Executor {
  @Override public void execute(Runnable r) {
    r.run();
  }
}

现在异步方法的JUnit测试非常干净

@Test public void testDoAsync() {
  Executor executor = new SynchronousExecutor();
  Foo objectToTest = new Foo(executor);

  Callback callback = mock(Callback.class);
  objectToTest.doAsync(callback);

  // Verify that Callback#onComplete was called using Mockito.
  verify(callback).onComplete(any(Data.class));

  // Assert that we got back the data that we expected.
  assertEquals(expectedData, callback.getData());
}

恕我直言,让单元测试创建或等待线程是一种糟糕的实践。您希望这些测试在瞬间运行。这就是为什么我想提出一个测试异步进程的两步方法。

测试您的异步进程是否正确提交。您可以模拟接受异步请求的对象,并确保提交的作业具有正确的属性,等等。 测试你的异步回调是否在做正确的事情。在这里,您可以模拟最初提交的作业,并假设它已正确初始化,并验证您的回调是否正确。

对于所有Spring用户来说,这是我现在通常做集成测试的方式,其中涉及到异步行为:

当异步任务(例如I/O调用)完成时,在生产代码中触发应用程序事件。大多数情况下,这个事件对于处理生产中异步操作的响应是必要的。

有了这个事件,您就可以在测试用例中使用以下策略:

执行测试中的系统 监听事件并确保事件已经触发 做你的断言

要解决这个问题,首先需要触发某种域事件。我在这里使用UUID来标识已完成的任务,但您当然可以使用其他东西,只要它是唯一的。

(注意,下面的代码片段也使用Lombok注释来摆脱锅炉板代码)

@RequiredArgsConstructor
class TaskCompletedEvent() {
  private final UUID taskId;
  // add more fields containing the result of the task if required
}

产品代码本身通常是这样的:

@Component
@RequiredArgsConstructor
class Production {

  private final ApplicationEventPublisher eventPublisher;

  void doSomeTask(UUID taskId) {
    // do something like calling a REST endpoint asynchronously
    eventPublisher.publishEvent(new TaskCompletedEvent(taskId));
  }

}

然后我可以使用Spring @EventListener在测试代码中捕获已发布的事件。事件监听器稍微复杂一点,因为它必须以线程安全的方式处理两种情况:

生产代码比测试用例快,并且在测试用例检查事件之前事件已经触发,或者 测试用例比生产代码快,并且测试用例必须等待事件。

在这里的其他答案中提到的第二种情况使用了CountDownLatch。还要注意,事件处理程序方法上的@Order注释确保在生产中使用的任何其他事件侦听器之后调用该事件处理程序方法。

@Component
class TaskCompletionEventListener {

  private Map<UUID, CountDownLatch> waitLatches = new ConcurrentHashMap<>();
  private List<UUID> eventsReceived = new ArrayList<>();

  void waitForCompletion(UUID taskId) {
    synchronized (this) {
      if (eventAlreadyReceived(taskId)) {
        return;
      }
      checkNobodyIsWaiting(taskId);
      createLatch(taskId);
    }
    waitForEvent(taskId);
  }

  private void checkNobodyIsWaiting(UUID taskId) {
    if (waitLatches.containsKey(taskId)) {
      throw new IllegalArgumentException("Only one waiting test per task ID supported, but another test is already waiting for " + taskId + " to complete.");
    }
  }

  private boolean eventAlreadyReceived(UUID taskId) {
    return eventsReceived.remove(taskId);
  }

  private void createLatch(UUID taskId) {
    waitLatches.put(taskId, new CountDownLatch(1));
  }

  @SneakyThrows
  private void waitForEvent(UUID taskId) {
    var latch = waitLatches.get(taskId);
    latch.await();
  }

  @EventListener
  @Order
  void eventReceived(TaskCompletedEvent event) {
    var taskId = event.getTaskId();
    synchronized (this) {
      if (isSomebodyWaiting(taskId)) {
        notifyWaitingTest(taskId);
      } else {
        eventsReceived.add(taskId);
      }
    }
  }

  private boolean isSomebodyWaiting(UUID taskId) {
    return waitLatches.containsKey(taskId);
  }

  private void notifyWaitingTest(UUID taskId) {
    var latch = waitLatches.remove(taskId);
    latch.countDown();
  }

}

最后一步是在测试用例中执行被测试的系统。我在这里使用JUnit 5进行SpringBoot测试,但这对于使用Spring上下文的所有测试都应该是一样的。

@SpringBootTest
class ProductionIntegrationTest {

  @Autowired
  private Production sut;

  @Autowired
  private TaskCompletionEventListener listener;

  @Test
  void thatTaskCompletesSuccessfully() {
    var taskId = UUID.randomUUID();
    sut.doSomeTask(taskId);
    listener.waitForCompletion(taskId);
    // do some assertions like looking into the DB if value was stored successfully
  }

}

请注意,与这里的其他答案相比,如果您并行执行测试,并且多个线程同时执行异步代码,则此解决方案也可以工作。

这里有很多答案,但一个简单的答案是创建一个完整的CompletableFuture并使用它:

CompletableFuture.completedFuture("donzo")

所以在我的测试中:

this.exactly(2).of(mockEventHubClientWrapper).sendASync(with(any(LinkedList.class)));
this.will(returnValue(new CompletableFuture<>().completedFuture("donzo")));

我只是确保所有这些东西都会被调用。如果你使用下面的代码,这个技巧是有效的:

CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();

它将压缩通过它,因为所有的CompletableFutures都完成了!

我找到一个库套接字。IO来测试异步逻辑。使用LinkedBlockingQueue看起来简单。这里有一个例子:

    @Test(timeout = TIMEOUT)
public void message() throws URISyntaxException, InterruptedException {
    final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();

    socket = client();
    socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
        @Override
        public void call(Object... objects) {
            socket.send("foo", "bar");
        }
    }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
        @Override
        public void call(Object... args) {
            values.offer(args);
        }
    });
    socket.connect();

    assertThat((Object[])values.take(), is(new Object[] {"hello client"}));
    assertThat((Object[])values.take(), is(new Object[] {"foo", "bar"}));
    socket.disconnect();
}

使用LinkedBlockingQueue使用API来阻塞直到得到结果,就像同步方式一样。并设置超时,以避免假设有太多时间等待结果。