到目前为止,我已经避免了测试多线程代码的噩梦,因为它似乎是一个太大的雷区。我想知道人们是如何测试依赖于线程的代码以获得成功执行的,或者人们是如何测试那些仅在两个线程以给定方式交互时才会出现的问题的?
对于今天的程序员来说,这似乎是一个非常关键的问题,恕我直言,将我们的知识集中在这个问题上是很有用的。
到目前为止,我已经避免了测试多线程代码的噩梦,因为它似乎是一个太大的雷区。我想知道人们是如何测试依赖于线程的代码以获得成功执行的,或者人们是如何测试那些仅在两个线程以给定方式交互时才会出现的问题的?
对于今天的程序员来说,这似乎是一个非常关键的问题,恕我直言,将我们的知识集中在这个问题上是很有用的。
当前回答
下面的文章提出了两种解决方案。包装一个信号量(CountDownLatch),并添加诸如从内部线程外部化数据之类的功能。实现此目的的另一种方法是使用线程池(请参阅兴趣点)。
喷头-高级同步对象
其他回答
看看我的相关答案在
为自定义Barrier设计一个Test类
它偏向于Java,但对选项进行了合理的总结。
总而言之(我认为),它不是使用一些花哨的框架来确保正确性,而是如何设计你的多线程代码。拆分关注点(并发性和功能性)有助于提高信心。测试引导的面向对象软件的发展比我能更好地解释一些选项。
静态分析和形式化方法(参见并发性:状态模型和Java程序)是一种选择,但我发现它们在商业开发中用处有限。
不要忘记,任何加载/浸泡风格的测试都很少能保证突出问题。
好运!
它并不完美,但我用c#写了这个帮助程序:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Proto.Promises.Tests.Threading
{
public class ThreadHelper
{
public static readonly int multiThreadCount = Environment.ProcessorCount * 100;
private static readonly int[] offsets = new int[] { 0, 10, 100, 1000 };
private readonly Stack<Task> _executingTasks = new Stack<Task>(multiThreadCount);
private readonly Barrier _barrier = new Barrier(1);
private int _currentParticipants = 0;
private readonly TimeSpan _timeout;
public ThreadHelper() : this(TimeSpan.FromSeconds(10)) { } // 10 second timeout should be enough for most cases.
public ThreadHelper(TimeSpan timeout)
{
_timeout = timeout;
}
/// <summary>
/// Execute the action multiple times in parallel threads.
/// </summary>
public void ExecuteMultiActionParallel(Action action)
{
for (int i = 0; i < multiThreadCount; ++i)
{
AddParallelAction(action);
}
ExecutePendingParallelActions();
}
/// <summary>
/// Execute the action once in a separate thread.
/// </summary>
public void ExecuteSingleAction(Action action)
{
AddParallelAction(action);
ExecutePendingParallelActions();
}
/// <summary>
/// Add an action to be run in parallel.
/// </summary>
public void AddParallelAction(Action action)
{
var taskSource = new TaskCompletionSource<bool>();
lock (_executingTasks)
{
++_currentParticipants;
_barrier.AddParticipant();
_executingTasks.Push(taskSource.Task);
}
new Thread(() =>
{
try
{
_barrier.SignalAndWait(); // Try to make actions run in lock-step to increase likelihood of breaking race conditions.
action.Invoke();
taskSource.SetResult(true);
}
catch (Exception e)
{
taskSource.SetException(e);
}
}).Start();
}
/// <summary>
/// Runs the pending actions in parallel, attempting to run them in lock-step.
/// </summary>
public void ExecutePendingParallelActions()
{
Task[] tasks;
lock (_executingTasks)
{
_barrier.SignalAndWait();
_barrier.RemoveParticipants(_currentParticipants);
_currentParticipants = 0;
tasks = _executingTasks.ToArray();
_executingTasks.Clear();
}
try
{
if (!Task.WaitAll(tasks, _timeout))
{
throw new TimeoutException($"Action(s) timed out after {_timeout}, there may be a deadlock.");
}
}
catch (AggregateException e)
{
// Only throw one exception instead of aggregate to try to avoid overloading the test error output.
throw e.Flatten().InnerException;
}
}
/// <summary>
/// Run each action in parallel multiple times with differing offsets for each run.
/// <para/>The number of runs is 4^actions.Length, so be careful if you don't want the test to run too long.
/// </summary>
/// <param name="expandToProcessorCount">If true, copies each action on additional threads up to the processor count. This can help test more without increasing the time it takes to complete.
/// <para/>Example: 2 actions with 6 processors, runs each action 3 times in parallel.</param>
/// <param name="setup">The action to run before each parallel run.</param>
/// <param name="teardown">The action to run after each parallel run.</param>
/// <param name="actions">The actions to run in parallel.</param>
public void ExecuteParallelActionsWithOffsets(bool expandToProcessorCount, Action setup, Action teardown, params Action[] actions)
{
setup += () => { };
teardown += () => { };
int actionCount = actions.Length;
int expandCount = expandToProcessorCount ? Math.Max(Environment.ProcessorCount / actionCount, 1) : 1;
foreach (var combo in GenerateCombinations(offsets, actionCount))
{
setup.Invoke();
for (int k = 0; k < expandCount; ++k)
{
for (int i = 0; i < actionCount; ++i)
{
int offset = combo[i];
Action action = actions[i];
AddParallelAction(() =>
{
for (int j = offset; j > 0; --j) { } // Just spin in a loop for the offset.
action.Invoke();
});
}
}
ExecutePendingParallelActions();
teardown.Invoke();
}
}
// Input: [1, 2, 3], 3
// Ouput: [
// [1, 1, 1],
// [2, 1, 1],
// [3, 1, 1],
// [1, 2, 1],
// [2, 2, 1],
// [3, 2, 1],
// [1, 3, 1],
// [2, 3, 1],
// [3, 3, 1],
// [1, 1, 2],
// [2, 1, 2],
// [3, 1, 2],
// [1, 2, 2],
// [2, 2, 2],
// [3, 2, 2],
// [1, 3, 2],
// [2, 3, 2],
// [3, 3, 2],
// [1, 1, 3],
// [2, 1, 3],
// [3, 1, 3],
// [1, 2, 3],
// [2, 2, 3],
// [3, 2, 3],
// [1, 3, 3],
// [2, 3, 3],
// [3, 3, 3]
// ]
private static IEnumerable<int[]> GenerateCombinations(int[] options, int count)
{
int[] indexTracker = new int[count];
int[] combo = new int[count];
for (int i = 0; i < count; ++i)
{
combo[i] = options[0];
}
// Same algorithm as picking a combination lock.
int rollovers = 0;
while (rollovers < count)
{
yield return combo; // No need to duplicate the array since we're just reading it.
for (int i = 0; i < count; ++i)
{
int index = ++indexTracker[i];
if (index == options.Length)
{
indexTracker[i] = 0;
combo[i] = options[0];
if (i == rollovers)
{
++rollovers;
}
}
else
{
combo[i] = options[index];
break;
}
}
}
}
}
}
使用示例:
[Test]
public void DeferredMayBeBeResolvedAndPromiseAwaitedConcurrently_void0()
{
Promise.Deferred deferred = default(Promise.Deferred);
Promise promise = default(Promise);
int invokedCount = 0;
var threadHelper = new ThreadHelper();
threadHelper.ExecuteParallelActionsWithOffsets(false,
// Setup
() =>
{
invokedCount = 0;
deferred = Promise.NewDeferred();
promise = deferred.Promise;
},
// Teardown
() => Assert.AreEqual(1, invokedCount),
// Parallel Actions
() => deferred.Resolve(),
() => promise.Then(() => { Interlocked.Increment(ref invokedCount); }).Forget()
);
}
一个简单的测试模式可以用于一些(不是所有!)用例,就是多次重复相同的测试。例如,假设你有一个方法:
def process(input):
# Spawns several threads to do the job
# ...
return output
创建一堆测试:
process(input1) -> expect to return output1
process(input2) -> expect to return output2
...
现在将每个测试运行多次。
如果流程的实现包含一个微小的错误(例如死锁、竞态条件等),出现的概率为0.1%,那么运行1000次测试,则该错误至少出现一次的概率为64%。运行测试10000次,得到>99%的概率。
有一些很好的工具。下面是一些Java的摘要。
一些好的静态分析工具包括FindBugs(提供了一些有用的提示)、JLint、Java Pathfinder (JPF & JPF2)和Bogor。
multithreaddtc是一个非常好的动态分析工具(集成到JUnit中),您必须在其中设置自己的测试用例。
IBM研究院的竞赛很有趣。它通过插入各种线程修改行为(例如sleep & yield)来检测你的代码,试图随机发现错误。
SPIN是对Java(和其他)组件建模的一个非常酷的工具,但是您需要一些有用的框架。它很难使用,但如果你知道如何使用它,它是非常强大的。相当多的工具在底层使用SPIN。
multithreaddtc可能是最主流的,但是上面列出的一些静态分析工具绝对值得一看。
运行多个线程并不困难;这是小菜一碟。不幸的是,线程通常需要彼此通信;这就是困难所在。
最初发明的允许模块之间通信的机制是函数调用;当模块A想要与模块B通信时,它只调用模块B中的一个函数。不幸的是,这对线程不起作用,因为当你调用一个函数时,该函数仍然运行在当前线程中。
为了克服这个问题,人们决定采用一种更原始的通信机制:只声明一个特定的变量,并让两个线程都可以访问该变量。换句话说,允许线程共享数据。分享数据是人们自然而然想到的第一件事,这似乎是一个不错的选择,因为它看起来非常简单。我是说,能有多难,对吧?会出什么问题呢?
竞态条件。这就是可能、也将会出错的地方。
当人们意识到他们的软件由于竞争条件而遭受随机的、不可复制的灾难性失败时,他们开始发明复杂的机制,如锁和比较-交换,旨在防止此类事情的发生。这些机制属于广义的“同步”范畴。不幸的是,同步有两个问题:
这是很难做到的,所以很容易出现bug。 它是完全不可测试的,因为您无法测试竞态条件。
精明的读者可能会注意到“非常容易出现bug”和“完全不可测试”是一个致命的组合。
现在,在自动化软件测试的概念变得流行之前,我上面提到的机制已经被行业的大部分人发明和采用了;所以,没有人知道这个问题有多致命;他们只是认为这是一个很难的主题,需要高手程序员,每个人都能接受。
如今,无论我们做什么,我们都把测试放在第一位。所以,如果某些机制是不可测试的,那么使用该机制就是不可能的。因此,同步已经失宠;现在还在练的人已经很少了,而且练的人一天比一天少。
没有同步线程就不能共享数据;然而,最初的要求不是共享数据;它允许线程之间进行通信。除了共享数据之外,还存在其他更优雅的线程间通信机制。
其中一种机制是消息传递,也称为事件。
对于消息传递,整个软件系统中只有一个地方利用了同步,那就是我们用来存储消息的并发阻塞队列收集类。(我们的想法是,我们应该至少能把那一小部分做对。)
消息传递的优点是它不受竞态条件的影响,并且是完全可测试的。