如何使用JUnit测试触发异步进程的方法?

我不知道如何让我的测试等待流程结束(它不是一个确切的单元测试,它更像一个集成测试,因为它涉及到几个类,而不仅仅是一个)。


启动进程并使用Future等待结果。


恕我直言,让单元测试创建或等待线程是一种糟糕的实践。您希望这些测试在瞬间运行。这就是为什么我想提出一个测试异步进程的两步方法。

测试您的异步进程是否正确提交。您可以模拟接受异步请求的对象,并确保提交的作业具有正确的属性,等等。 测试你的异步回调是否在做正确的事情。在这里,您可以模拟最初提交的作业,并假设它已正确初始化,并验证您的回调是否正确。


TL,博士;不幸的是,目前还没有内置的解决方案(在撰写本文时,2022年),因此您可以自由使用和/或实现任何适合您的情况。

例子

另一种方法是使用CountDownLatch类。

public class DatabaseTest {

    /**
     * Data limit
     */
    private static final int DATA_LIMIT = 5;

    /**
     * Countdown latch
     */
    private CountDownLatch lock = new CountDownLatch(1);

    /**
     * Received data
     */
    private List<Data> receiveddata;

    @Test
    public void testDataRetrieval() throws Exception {
        Database db = new MockDatabaseImpl();
        db.getData(DATA_LIMIT, new DataCallback() {
            @Override
            public void onSuccess(List<Data> data) {
                receiveddata = data;
                lock.countDown();
            }
        });

        lock.await(2000, TimeUnit.MILLISECONDS);

        assertNotNull(receiveddata);
        assertEquals(DATA_LIMIT, receiveddata.size());
    }
}

注意:不能只使用与常规对象同步的对象作为锁,因为快速回调可以在锁的wait方法被调用之前释放锁。请参阅Joe Walnes的博客文章。

由于@jtahlborn和@Ring的评论,删除了CountDownLatch周围的同步块


您可以尝试使用await库。这使得测试您所谈论的系统变得很容易。


调用SomeObject怎么样?或者使用Robotiums Solo.waitForCondition(…)方法,或者使用我写的一个类来做到这一点(如何使用,请参阅注释和测试类)


对于测试异步方法,我发现一个非常有用的方法是在测试对象的构造函数中注入Executor实例。在生产中,执行器实例被配置为异步运行,而在测试中,它可以被模拟为同步运行。

所以假设我试图测试异步方法Foo#doAsync(Callback c),

class Foo {
  private final Executor executor;
  public Foo(Executor executor) {
    this.executor = executor;
  }

  public void doAsync(Callback c) {
    executor.execute(new Runnable() {
      @Override public void run() {
        // Do stuff here
        c.onComplete(data);
      }
    });
  }
}

在生产中,我会用Executors.newSingleThreadExecutor() Executor实例构造Foo,而在测试中,我可能会用执行以下操作的同步执行器构造它

class SynchronousExecutor implements Executor {
  @Override public void execute(Runnable r) {
    r.run();
  }
}

现在异步方法的JUnit测试非常干净

@Test public void testDoAsync() {
  Executor executor = new SynchronousExecutor();
  Foo objectToTest = new Foo(executor);

  Callback callback = mock(Callback.class);
  objectToTest.doAsync(callback);

  // Verify that Callback#onComplete was called using Mockito.
  verify(callback).onComplete(any(Data.class));

  // Assert that we got back the data that we expected.
  assertEquals(expectedData, callback.getData());
}

如果您使用CompletableFuture(在Java 8中引入)或SettableFuture(来自谷歌Guava),您可以在测试完成后立即完成测试,而不是等待预先设置的时间。你的测试应该是这样的:

CompletableFuture<String> future = new CompletableFuture<>();
executorService.submit(new Runnable() {         
    @Override
    public void run() {
        future.complete("Hello World!");                
    }
});
assertEquals("Hello World!", future.get());

注意,有一个库为pre Java-8提供了CompletableFuture,它甚至使用了相同的名称(并提供了所有相关的Java-8类),例如: net.sourceforge.streamsupport: streamsupport-minifuture: 1.7.4 这对于Android开发很有用,即使我们使用JDK-v11构建,我们也希望保持代码与前Android-7设备兼容。


我更喜欢使用等待和通知。它简单明了。

@Test
public void test() throws Throwable {
    final boolean[] asyncExecuted = {false};
    final Throwable[] asyncThrowable= {null};

    // do anything async
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // Put your test here.
                fail(); 
            }
            // lets inform the test thread that there is an error.
            catch (Throwable throwable){
                asyncThrowable[0] = throwable;
            }
            // ensure to release asyncExecuted in case of error.
            finally {
                synchronized (asyncExecuted){
                    asyncExecuted[0] = true;
                    asyncExecuted.notify();
                }
            }
        }
    }).start();

    // Waiting for the test is complete
    synchronized (asyncExecuted){
        while(!asyncExecuted[0]){
            asyncExecuted.wait();
        }
    }

    // get any async error, including exceptions and assertationErrors
    if(asyncThrowable[0] != null){
        throw asyncThrowable[0];
    }
}

基本上,我们需要创建一个最终的数组引用,在匿名内部类中使用。我宁愿创建一个布尔[],因为如果我们需要wait(),我可以设置一个值来控制。当一切都完成后,我们只需释放asyncExecuted。


如果你想测试逻辑,不要异步测试。

例如,测试这段代码,它对异步方法的结果起作用。

public class Example {
    private Dependency dependency;

    public Example(Dependency dependency) {
        this.dependency = dependency;            
    }

    public CompletableFuture<String> someAsyncMethod(){
        return dependency.asyncMethod()
                .handle((r,ex) -> {
                    if(ex != null) {
                        return "got exception";
                    } else {
                        return r.toString();
                    }
                });
    }
}

public class Dependency {
    public CompletableFuture<Integer> asyncMethod() {
        // do some async stuff       
    }
}

在测试中模拟使用同步实现的依赖关系。单元测试是完全同步的,运行时间为150毫秒。

public class DependencyTest {
    private Example sut;
    private Dependency dependency;

    public void setup() {
        dependency = Mockito.mock(Dependency.class);;
        sut = new Example(dependency);
    }

    @Test public void success() throws InterruptedException, ExecutionException {
        when(dependency.asyncMethod()).thenReturn(CompletableFuture.completedFuture(5));

        // When
        CompletableFuture<String> result = sut.someAsyncMethod();

        // Then
        assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
        String value = result.get();
        assertThat(value, is(equalTo("5")));
    }

    @Test public void failed() throws InterruptedException, ExecutionException {
        // Given
        CompletableFuture<Integer> c = new CompletableFuture<Integer>();
        c.completeExceptionally(new RuntimeException("failed"));
        when(dependency.asyncMethod()).thenReturn(c);

        // When
        CompletableFuture<String> result = sut.someAsyncMethod();

        // Then
        assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
        String value = result.get();
        assertThat(value, is(equalTo("got exception")));
    }
}

你不测试异步行为,但你可以测试逻辑是否正确。


如果测试结果是异步生成的,这就是我现在使用的方法。

public class TestUtil {

    public static <R> R await(Consumer<CompletableFuture<R>> completer) {
        return await(20, TimeUnit.SECONDS, completer);
    }

    public static <R> R await(int time, TimeUnit unit, Consumer<CompletableFuture<R>> completer) {
        CompletableFuture<R> f = new CompletableFuture<>();
        completer.accept(f);
        try {
            return f.get(time, unit);
        } catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException("Future timed out", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Future failed", e.getCause());
        }
    }
}

使用静态导入,测试读起来还不错。 (注意,在这个例子中,我开始一个线程来说明这个想法)

    @Test
    public void testAsync() {
        String result = await(f -> {
            new Thread(() -> f.complete("My Result")).start();
        });
        assertEquals("My Result", result);
    }

如果未调用f.f complete,测试将在超时后失败。你也可以使用f.c earteexceptions来提前失败。


这里有很多答案,但一个简单的答案是创建一个完整的CompletableFuture并使用它:

CompletableFuture.completedFuture("donzo")

所以在我的测试中:

this.exactly(2).of(mockEventHubClientWrapper).sendASync(with(any(LinkedList.class)));
this.will(returnValue(new CompletableFuture<>().completedFuture("donzo")));

我只是确保所有这些东西都会被调用。如果你使用下面的代码,这个技巧是有效的:

CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();

它将压缩通过它,因为所有的CompletableFutures都完成了!


我找到一个库套接字。IO来测试异步逻辑。使用LinkedBlockingQueue看起来简单。这里有一个例子:

    @Test(timeout = TIMEOUT)
public void message() throws URISyntaxException, InterruptedException {
    final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();

    socket = client();
    socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
        @Override
        public void call(Object... objects) {
            socket.send("foo", "bar");
        }
    }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
        @Override
        public void call(Object... args) {
            values.offer(args);
        }
    });
    socket.connect();

    assertThat((Object[])values.take(), is(new Object[] {"hello client"}));
    assertThat((Object[])values.take(), is(new Object[] {"foo", "bar"}));
    socket.disconnect();
}

使用LinkedBlockingQueue使用API来阻塞直到得到结果,就像同步方式一样。并设置超时,以避免假设有太多时间等待结果。


值得一提的是,《并发实践》中有一章非常有用,它描述了一些单元测试方法,并给出了解决问题的方案。


尽可能避免使用并行线程进行测试(大多数时候都是这样)。这只会使您的测试不可靠(有时通过,有时失败)。

只有当你需要调用其他库/系统时,你可能不得不等待其他线程,在这种情况下,总是使用await库而不是Thread.sleep()。

永远不要在测试中只调用get()或join(),否则您的测试可能会在CI服务器上一直运行,以防将来永远无法完成。在调用get()之前,始终在测试中首先断言isDone()。对于CompletionStage,就是. tocompletablefuture (). isdone()。

当你像这样测试一个非阻塞方法时:

public static CompletionStage<String> createGreeting(CompletableFuture<String> future) {
    return future.thenApply(result -> "Hello " + result);
}

那么你不应该仅仅通过在测试中传递一个完整的Future来测试结果,你还应该确保你的方法doSomething()不会通过调用join()或get()来阻塞。如果使用非阻塞框架,这一点尤其重要。

要做到这一点,测试一个未完成的未来,你手动设置为完成:

@Test
public void testDoSomething() throws Exception {
    CompletableFuture<String> innerFuture = new CompletableFuture<>();
    CompletableFuture<String> futureResult = createGreeting(innerFuture).toCompletableFuture();
    assertFalse(futureResult.isDone());

    // this triggers the future to complete
    innerFuture.complete("world");
    assertTrue(futureResult.isDone());

    // futher asserts about fooResult here
    assertEquals(futureResult.get(), "Hello world");
}

这样,如果您将future.join()添加到doSomething(),测试将失败。

如果你的服务使用ExecutorService,比如applyasync(…, executorService),然后在你的测试中注入一个单线程的executorService,比如来自guava的:

ExecutorService executorService = Executors.newSingleThreadExecutor();

如果你的代码使用forkJoinPool,比如applyasync(…),重写代码使用ExecutorService(有很多好的理由),或者使用await。

为了缩短示例,我将BarService设置为测试中作为Java8 lambda实现的方法参数,通常它将是您将模拟的注入引用。


对于所有Spring用户来说,这是我现在通常做集成测试的方式,其中涉及到异步行为:

当异步任务(例如I/O调用)完成时,在生产代码中触发应用程序事件。大多数情况下,这个事件对于处理生产中异步操作的响应是必要的。

有了这个事件,您就可以在测试用例中使用以下策略:

执行测试中的系统 监听事件并确保事件已经触发 做你的断言

要解决这个问题,首先需要触发某种域事件。我在这里使用UUID来标识已完成的任务,但您当然可以使用其他东西,只要它是唯一的。

(注意,下面的代码片段也使用Lombok注释来摆脱锅炉板代码)

@RequiredArgsConstructor
class TaskCompletedEvent() {
  private final UUID taskId;
  // add more fields containing the result of the task if required
}

产品代码本身通常是这样的:

@Component
@RequiredArgsConstructor
class Production {

  private final ApplicationEventPublisher eventPublisher;

  void doSomeTask(UUID taskId) {
    // do something like calling a REST endpoint asynchronously
    eventPublisher.publishEvent(new TaskCompletedEvent(taskId));
  }

}

然后我可以使用Spring @EventListener在测试代码中捕获已发布的事件。事件监听器稍微复杂一点,因为它必须以线程安全的方式处理两种情况:

生产代码比测试用例快,并且在测试用例检查事件之前事件已经触发,或者 测试用例比生产代码快,并且测试用例必须等待事件。

在这里的其他答案中提到的第二种情况使用了CountDownLatch。还要注意,事件处理程序方法上的@Order注释确保在生产中使用的任何其他事件侦听器之后调用该事件处理程序方法。

@Component
class TaskCompletionEventListener {

  private Map<UUID, CountDownLatch> waitLatches = new ConcurrentHashMap<>();
  private List<UUID> eventsReceived = new ArrayList<>();

  void waitForCompletion(UUID taskId) {
    synchronized (this) {
      if (eventAlreadyReceived(taskId)) {
        return;
      }
      checkNobodyIsWaiting(taskId);
      createLatch(taskId);
    }
    waitForEvent(taskId);
  }

  private void checkNobodyIsWaiting(UUID taskId) {
    if (waitLatches.containsKey(taskId)) {
      throw new IllegalArgumentException("Only one waiting test per task ID supported, but another test is already waiting for " + taskId + " to complete.");
    }
  }

  private boolean eventAlreadyReceived(UUID taskId) {
    return eventsReceived.remove(taskId);
  }

  private void createLatch(UUID taskId) {
    waitLatches.put(taskId, new CountDownLatch(1));
  }

  @SneakyThrows
  private void waitForEvent(UUID taskId) {
    var latch = waitLatches.get(taskId);
    latch.await();
  }

  @EventListener
  @Order
  void eventReceived(TaskCompletedEvent event) {
    var taskId = event.getTaskId();
    synchronized (this) {
      if (isSomebodyWaiting(taskId)) {
        notifyWaitingTest(taskId);
      } else {
        eventsReceived.add(taskId);
      }
    }
  }

  private boolean isSomebodyWaiting(UUID taskId) {
    return waitLatches.containsKey(taskId);
  }

  private void notifyWaitingTest(UUID taskId) {
    var latch = waitLatches.remove(taskId);
    latch.countDown();
  }

}

最后一步是在测试用例中执行被测试的系统。我在这里使用JUnit 5进行SpringBoot测试,但这对于使用Spring上下文的所有测试都应该是一样的。

@SpringBootTest
class ProductionIntegrationTest {

  @Autowired
  private Production sut;

  @Autowired
  private TaskCompletionEventListener listener;

  @Test
  void thatTaskCompletesSuccessfully() {
    var taskId = UUID.randomUUID();
    sut.doSomeTask(taskId);
    listener.waitForCompletion(taskId);
    // do some assertions like looking into the DB if value was stored successfully
  }

}

请注意,与这里的其他答案相比,如果您并行执行测试,并且多个线程同时执行异步代码,则此解决方案也可以工作。


假设你有这样的代码:

public void method() {
        CompletableFuture.runAsync(() -> {
            //logic
            //logic
            //logic
            //logic
        });
    }

试着把它重构成这样:

public void refactoredMethod() {
    CompletableFuture.runAsync(this::subMethod);
}

private void subMethod() {
    //logic
    //logic
    //logic
    //logic
}

之后,用下面的方法测试subMethod:

org.powermock.reflect.Whitebox.invokeMethod(classInstance, "subMethod"); 

这不是一个完美的解决方案,但它测试了异步执行中的所有逻辑。


JUnit 5有断言。assertTimeout(Duration, Executable)/ asserttimeoutpreempemptive()(请阅读每个的Javadoc以了解差异)和Mockito有verify(mock, timeout(毫秒).times(x))。

Assertions.assertTimeout(Duration.ofMillis(1000), () -> 
    myReactiveService.doSth().subscribe()
);

And:

Mockito.verify(myReactiveService, 
    timeout(1000).times(0)).doSth(); // cannot use never() here

管道中的超时可能是不确定的/脆弱的。所以要小心。