def process(input):
# Spawns several threads to do the job
# ...
return output
process(input1) -> expect to return output1
process(input2) -> expect to return output2
GroboUtils for running multiple test threads alphaWorks ConTest to instrument classes to cause interleavings to vary between iterations Create a throwable field and check it in tearDown (see Listing 1). If you catch a bad exception in another thread, just assign it to throwable. I created the utils class in Listing 2 and have found it invaluable, especially waitForVerify and waitForCondition, which will greatly increase the performance of your tests. Make good use of AtomicBoolean in your tests. It is thread safe, and you'll often need a final reference type to store values from callback classes and suchlike. See example in Listing 3. Make sure to always give your test a timeout (e.g., @Test(timeout=60*1000)), as concurrency tests can sometimes hang forever when they're broken.
public void tearDown() {
if ( throwable != null )
throw throwable;
import static org.junit.Assert.fail;
import java.io.File;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Random;
import org.apache.commons.collections.Closure;
import org.apache.commons.collections.Predicate;
import org.apache.commons.lang.time.StopWatch;
import org.easymock.EasyMock;
import org.easymock.classextension.internal.ClassExtensionHelper;
import static org.easymock.classextension.EasyMock.*;
import ca.digitalrapids.io.DRFileUtils;
* Various utilities for testing
public abstract class DRTestUtils
static private Random random = new Random();
/** Calls {@link #waitForCondition(Integer, Integer, Predicate, String)} with
* default max wait and check period values.
static public void waitForCondition(Predicate predicate, String errorMessage)
throws Throwable
waitForCondition(null, null, predicate, errorMessage);
/** Blocks until a condition is true, throwing an {@link AssertionError} if
* it does not become true during a given max time.
* @param maxWait_ms max time to wait for true condition. Optional; defaults
* to 30 * 1000 ms (30 seconds).
* @param checkPeriod_ms period at which to try the condition. Optional; defaults
* to 100 ms.
* @param predicate the condition
* @param errorMessage message use in the {@link AssertionError}
* @throws Throwable on {@link AssertionError} or any other exception/error
static public void waitForCondition(Integer maxWait_ms, Integer checkPeriod_ms,
Predicate predicate, String errorMessage) throws Throwable
waitForCondition(maxWait_ms, checkPeriod_ms, predicate, new Closure() {
public void execute(Object errorMessage)
}, errorMessage);
/** Blocks until a condition is true, running a closure if
* it does not become true during a given max time.
* @param maxWait_ms max time to wait for true condition. Optional; defaults
* to 30 * 1000 ms (30 seconds).
* @param checkPeriod_ms period at which to try the condition. Optional; defaults
* to 100 ms.
* @param predicate the condition
* @param closure closure to run
* @param argument argument for closure
* @throws Throwable on {@link AssertionError} or any other exception/error
static public void waitForCondition(Integer maxWait_ms, Integer checkPeriod_ms,
Predicate predicate, Closure closure, Object argument) throws Throwable
if ( maxWait_ms == null )
maxWait_ms = 30 * 1000;
if ( checkPeriod_ms == null )
checkPeriod_ms = 100;
StopWatch stopWatch = new StopWatch();
while ( !predicate.evaluate(null) ) {
if ( stopWatch.getTime() > maxWait_ms ) {
/** Calls {@link #waitForVerify(Integer, Object)} with <code>null</code>
* for {@code maxWait_ms}
static public void waitForVerify(Object easyMockProxy)
throws Throwable
waitForVerify(null, easyMockProxy);
/** Repeatedly calls {@link EasyMock#verify(Object[])} until it succeeds, or a
* max wait time has elapsed.
* @param maxWait_ms Max wait time. <code>null</code> defaults to 30s.
* @param easyMockProxy Proxy to call verify on
* @throws Throwable
static public void waitForVerify(Integer maxWait_ms, Object easyMockProxy)
throws Throwable
if ( maxWait_ms == null )
maxWait_ms = 30 * 1000;
StopWatch stopWatch = new StopWatch();
for(;;) {
catch (AssertionError e)
if ( stopWatch.getTime() > maxWait_ms )
throw e;
/** Returns a path to a directory in the temp dir with the name of the given
* class. This is useful for temporary test files.
* @param aClass test class for which to create dir
* @return the path
static public String getTestDirPathForTestClass(Object object)
String filename = object instanceof Class ?
((Class)object).getName() :
return DRFileUtils.getTempDir() + File.separator +
static public byte[] createRandomByteArray(int bytesLength)
byte[] sourceBytes = new byte[bytesLength];
return sourceBytes;
/** Returns <code>true</code> if the given object is an EasyMock mock object
static public boolean isEasyMockMock(Object object) {
try {
InvocationHandler invocationHandler = Proxy
return invocationHandler.getClass().getName().contains("easymock");
} catch (IllegalArgumentException e) {
return false;
public void testSomething() {
final AtomicBoolean called = new AtomicBoolean(false);
subject.setCallback(new SomeCallback() {
public void callback(Object arg) {
// check arg here
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Proto.Promises.Tests.Threading
public class ThreadHelper
public static readonly int multiThreadCount = Environment.ProcessorCount * 100;
private static readonly int[] offsets = new int[] { 0, 10, 100, 1000 };
private readonly Stack<Task> _executingTasks = new Stack<Task>(multiThreadCount);
private readonly Barrier _barrier = new Barrier(1);
private int _currentParticipants = 0;
private readonly TimeSpan _timeout;
public ThreadHelper() : this(TimeSpan.FromSeconds(10)) { } // 10 second timeout should be enough for most cases.
public ThreadHelper(TimeSpan timeout)
_timeout = timeout;
/// <summary>
/// Execute the action multiple times in parallel threads.
/// </summary>
public void ExecuteMultiActionParallel(Action action)
for (int i = 0; i < multiThreadCount; ++i)
/// <summary>
/// Execute the action once in a separate thread.
/// </summary>
public void ExecuteSingleAction(Action action)
/// <summary>
/// Add an action to be run in parallel.
/// </summary>
public void AddParallelAction(Action action)
var taskSource = new TaskCompletionSource<bool>();
lock (_executingTasks)
new Thread(() =>
_barrier.SignalAndWait(); // Try to make actions run in lock-step to increase likelihood of breaking race conditions.
catch (Exception e)
/// <summary>
/// Runs the pending actions in parallel, attempting to run them in lock-step.
/// </summary>
public void ExecutePendingParallelActions()
Task[] tasks;
lock (_executingTasks)
_currentParticipants = 0;
tasks = _executingTasks.ToArray();
if (!Task.WaitAll(tasks, _timeout))
throw new TimeoutException($"Action(s) timed out after {_timeout}, there may be a deadlock.");
catch (AggregateException e)
// Only throw one exception instead of aggregate to try to avoid overloading the test error output.
throw e.Flatten().InnerException;
/// <summary>
/// Run each action in parallel multiple times with differing offsets for each run.
/// <para/>The number of runs is 4^actions.Length, so be careful if you don't want the test to run too long.
/// </summary>
/// <param name="expandToProcessorCount">If true, copies each action on additional threads up to the processor count. This can help test more without increasing the time it takes to complete.
/// <para/>Example: 2 actions with 6 processors, runs each action 3 times in parallel.</param>
/// <param name="setup">The action to run before each parallel run.</param>
/// <param name="teardown">The action to run after each parallel run.</param>
/// <param name="actions">The actions to run in parallel.</param>
public void ExecuteParallelActionsWithOffsets(bool expandToProcessorCount, Action setup, Action teardown, params Action[] actions)
setup += () => { };
teardown += () => { };
int actionCount = actions.Length;
int expandCount = expandToProcessorCount ? Math.Max(Environment.ProcessorCount / actionCount, 1) : 1;
foreach (var combo in GenerateCombinations(offsets, actionCount))
for (int k = 0; k < expandCount; ++k)
for (int i = 0; i < actionCount; ++i)
int offset = combo[i];
Action action = actions[i];
AddParallelAction(() =>
for (int j = offset; j > 0; --j) { } // Just spin in a loop for the offset.
// Input: [1, 2, 3], 3
// Ouput: [
// [1, 1, 1],
// [2, 1, 1],
// [3, 1, 1],
// [1, 2, 1],
// [2, 2, 1],
// [3, 2, 1],
// [1, 3, 1],
// [2, 3, 1],
// [3, 3, 1],
// [1, 1, 2],
// [2, 1, 2],
// [3, 1, 2],
// [1, 2, 2],
// [2, 2, 2],
// [3, 2, 2],
// [1, 3, 2],
// [2, 3, 2],
// [3, 3, 2],
// [1, 1, 3],
// [2, 1, 3],
// [3, 1, 3],
// [1, 2, 3],
// [2, 2, 3],
// [3, 2, 3],
// [1, 3, 3],
// [2, 3, 3],
// [3, 3, 3]
// ]
private static IEnumerable<int[]> GenerateCombinations(int[] options, int count)
int[] indexTracker = new int[count];
int[] combo = new int[count];
for (int i = 0; i < count; ++i)
combo[i] = options[0];
// Same algorithm as picking a combination lock.
int rollovers = 0;
while (rollovers < count)
yield return combo; // No need to duplicate the array since we're just reading it.
for (int i = 0; i < count; ++i)
int index = ++indexTracker[i];
if (index == options.Length)
indexTracker[i] = 0;
combo[i] = options[0];
if (i == rollovers)
combo[i] = options[index];
public void DeferredMayBeBeResolvedAndPromiseAwaitedConcurrently_void0()
Promise.Deferred deferred = default(Promise.Deferred);
Promise promise = default(Promise);
int invokedCount = 0;
var threadHelper = new ThreadHelper();
// Setup
() =>
invokedCount = 0;
deferred = Promise.NewDeferred();
promise = deferred.Promise;
// Teardown
() => Assert.AreEqual(1, invokedCount),
// Parallel Actions
() => deferred.Resolve(),
() => promise.Then(() => { Interlocked.Increment(ref invokedCount); }).Forget()
我在测试多线程代码时也遇到了严重的问题。然后我在Gerard Meszaros的“xUnit测试模式”中找到了一个非常酷的解决方案。他描述的模式被称为Humble object。
Event-trace/replay. This requires an event monitor and then reviewing the events that were sent. In a UT framework, this would involve manually sending the events as part of a test, and then doing post-mortem reviews. Scriptable. This is where you interact with the running code with a set of triggers. "On x > foo, baz()". This could be interpreted into a UT framework where you have a run-time system triggering a given test on a certain condition. Interactive. This obviously won't work in an automatic testing situation. ;)