到目前为止,我已经避免了测试多线程代码的噩梦,因为它似乎是一个太大的雷区。我想知道人们是如何测试依赖于线程的代码以获得成功执行的,或者人们是如何测试那些仅在两个线程以给定方式交互时才会出现的问题的?

对于今天的程序员来说,这似乎是一个非常关键的问题,恕我直言,将我们的知识集中在这个问题上是很有用的。


当前回答

它并不完美,但我用c#写了这个帮助程序:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Proto.Promises.Tests.Threading
{
    public class ThreadHelper
    {
        public static readonly int multiThreadCount = Environment.ProcessorCount * 100;
        private static readonly int[] offsets = new int[] { 0, 10, 100, 1000 };

        private readonly Stack<Task> _executingTasks = new Stack<Task>(multiThreadCount);
        private readonly Barrier _barrier = new Barrier(1);
        private int _currentParticipants = 0;
        private readonly TimeSpan _timeout;

        public ThreadHelper() : this(TimeSpan.FromSeconds(10)) { } // 10 second timeout should be enough for most cases.

        public ThreadHelper(TimeSpan timeout)
        {
            _timeout = timeout;
        }

        /// <summary>
        /// Execute the action multiple times in parallel threads.
        /// </summary>
        public void ExecuteMultiActionParallel(Action action)
        {
            for (int i = 0; i < multiThreadCount; ++i)
            {
                AddParallelAction(action);
            }
            ExecutePendingParallelActions();
        }

        /// <summary>
        /// Execute the action once in a separate thread.
        /// </summary>
        public void ExecuteSingleAction(Action action)
        {
            AddParallelAction(action);
            ExecutePendingParallelActions();
        }

        /// <summary>
        /// Add an action to be run in parallel.
        /// </summary>
        public void AddParallelAction(Action action)
        {
            var taskSource = new TaskCompletionSource<bool>();
            lock (_executingTasks)
            {
                ++_currentParticipants;
                _barrier.AddParticipant();
                _executingTasks.Push(taskSource.Task);
            }
            new Thread(() =>
            {
                try
                {
                    _barrier.SignalAndWait(); // Try to make actions run in lock-step to increase likelihood of breaking race conditions.
                    action.Invoke();
                    taskSource.SetResult(true);
                }
                catch (Exception e)
                {
                    taskSource.SetException(e);
                }
            }).Start();
        }

        /// <summary>
        /// Runs the pending actions in parallel, attempting to run them in lock-step.
        /// </summary>
        public void ExecutePendingParallelActions()
        {
            Task[] tasks;
            lock (_executingTasks)
            {
                _barrier.SignalAndWait();
                _barrier.RemoveParticipants(_currentParticipants);
                _currentParticipants = 0;
                tasks = _executingTasks.ToArray();
                _executingTasks.Clear();
            }
            try
            {
                if (!Task.WaitAll(tasks, _timeout))
                {
                    throw new TimeoutException($"Action(s) timed out after {_timeout}, there may be a deadlock.");
                }
            }
            catch (AggregateException e)
            {
                // Only throw one exception instead of aggregate to try to avoid overloading the test error output.
                throw e.Flatten().InnerException;
            }
        }

        /// <summary>
        /// Run each action in parallel multiple times with differing offsets for each run.
        /// <para/>The number of runs is 4^actions.Length, so be careful if you don't want the test to run too long.
        /// </summary>
        /// <param name="expandToProcessorCount">If true, copies each action on additional threads up to the processor count. This can help test more without increasing the time it takes to complete.
        /// <para/>Example: 2 actions with 6 processors, runs each action 3 times in parallel.</param>
        /// <param name="setup">The action to run before each parallel run.</param>
        /// <param name="teardown">The action to run after each parallel run.</param>
        /// <param name="actions">The actions to run in parallel.</param>
        public void ExecuteParallelActionsWithOffsets(bool expandToProcessorCount, Action setup, Action teardown, params Action[] actions)
        {
            setup += () => { };
            teardown += () => { };
            int actionCount = actions.Length;
            int expandCount = expandToProcessorCount ? Math.Max(Environment.ProcessorCount / actionCount, 1) : 1;
            foreach (var combo in GenerateCombinations(offsets, actionCount))
            {
                setup.Invoke();
                for (int k = 0; k < expandCount; ++k)
                {
                    for (int i = 0; i < actionCount; ++i)
                    {
                        int offset = combo[i];
                        Action action = actions[i];
                        AddParallelAction(() =>
                        {
                            for (int j = offset; j > 0; --j) { } // Just spin in a loop for the offset.
                            action.Invoke();
                        });
                    }
                }
                ExecutePendingParallelActions();
                teardown.Invoke();
            }
        }

        // Input: [1, 2, 3], 3
        // Ouput: [
        //          [1, 1, 1],
        //          [2, 1, 1],
        //          [3, 1, 1],
        //          [1, 2, 1],
        //          [2, 2, 1],
        //          [3, 2, 1],
        //          [1, 3, 1],
        //          [2, 3, 1],
        //          [3, 3, 1],
        //          [1, 1, 2],
        //          [2, 1, 2],
        //          [3, 1, 2],
        //          [1, 2, 2],
        //          [2, 2, 2],
        //          [3, 2, 2],
        //          [1, 3, 2],
        //          [2, 3, 2],
        //          [3, 3, 2],
        //          [1, 1, 3],
        //          [2, 1, 3],
        //          [3, 1, 3],
        //          [1, 2, 3],
        //          [2, 2, 3],
        //          [3, 2, 3],
        //          [1, 3, 3],
        //          [2, 3, 3],
        //          [3, 3, 3]
        //        ]
        private static IEnumerable<int[]> GenerateCombinations(int[] options, int count)
        {
            int[] indexTracker = new int[count];
            int[] combo = new int[count];
            for (int i = 0; i < count; ++i)
            {
                combo[i] = options[0];
            }
            // Same algorithm as picking a combination lock.
            int rollovers = 0;
            while (rollovers < count)
            {
                yield return combo; // No need to duplicate the array since we're just reading it.
                for (int i = 0; i < count; ++i)
                {
                    int index = ++indexTracker[i];
                    if (index == options.Length)
                    {
                        indexTracker[i] = 0;
                        combo[i] = options[0];
                        if (i == rollovers)
                        {
                            ++rollovers;
                        }
                    }
                    else
                    {
                        combo[i] = options[index];
                        break;
                    }
                }
            }
        }
    }
}

使用示例:

[Test]
public void DeferredMayBeBeResolvedAndPromiseAwaitedConcurrently_void0()
{
    Promise.Deferred deferred = default(Promise.Deferred);
    Promise promise = default(Promise);

    int invokedCount = 0;

    var threadHelper = new ThreadHelper();
    threadHelper.ExecuteParallelActionsWithOffsets(false,
        // Setup
        () =>
        {
            invokedCount = 0;
            deferred = Promise.NewDeferred();
            promise = deferred.Promise;
        },
        // Teardown
        () => Assert.AreEqual(1, invokedCount),
        // Parallel Actions
        () => deferred.Resolve(),
        () => promise.Then(() => { Interlocked.Increment(ref invokedCount); }).Forget()
    );
}

其他回答

听着,要做到这一点并不容易。我正在做一个本来就是多线程的项目。事件来自操作系统,我必须并发地处理它们。

处理测试复杂的多线程应用程序代码的最简单方法是:如果它太复杂而无法测试,那么您做错了。如果您有一个单独的实例,其中有多个线程作用于它,并且您无法测试这些线程相互踩在一起的情况,那么您的设计需要重做。它既简单又复杂。

有许多方法可以为多线程编程,以避免线程同时通过实例运行。最简单的方法是使所有对象都是不可变的。当然,这通常是不可能的。因此,您必须在设计中确定线程与同一实例交互的地方,并减少这些地方的数量。通过这样做,您可以隔离多线程实际发生的几个类,从而降低测试系统的总体复杂性。

但是您必须意识到,即使这样做,您仍然不能测试两个线程相互践踏的每一种情况。要做到这一点,您必须在同一个测试中并发地运行两个线程,然后准确地控制它们在任何给定时刻执行的行。你能做的就是模拟这种情况。但这可能需要您专门为测试编写代码,这充其量是迈向真正解决方案的半步。

测试代码是否存在线程问题的最好方法可能是对代码进行静态分析。如果您的线程代码没有遵循有限的线程安全模式集,那么您可能会遇到问题。我相信VS中的代码分析确实包含了一些线程的知识,但可能不多。

看,就目前的情况来看(可能还会持续很长一段时间),测试多线程应用程序的最佳方法是尽可能降低线程代码的复杂性。最小化线程交互的区域,尽可能地进行测试,并使用代码分析来识别危险区域。

确实很难!在我的(c++)单元测试中,我按照使用的并发模式将其分解为几个类别:

Unit tests for classes that operate in a single thread and aren't thread aware -- easy, test as usual. Unit tests for Monitor objects (those that execute synchronized methods in the callers' thread of control) that expose a synchronized public API -- instantiate multiple mock threads that exercise the API. Construct scenarios that exercise internal conditions of the passive object. Include one longer running test that basically beats the heck out of it from multiple threads for a long period of time. This is unscientific I know but it does build confidence. Unit tests for Active objects (those that encapsulate their own thread or threads of control) -- similar to #2 above with variations depending on the class design. Public API may be blocking or non-blocking, callers may obtain futures, data may arrive at queues or need to be dequeued. There are many combinations possible here; white box away. Still requires multiple mock threads to make calls to the object under test.

题外话:

在我所做的内部开发人员培训中,我教授了并发的支柱和这两种模式,作为思考和分解并发问题的主要框架。显然还有更先进的概念,但我发现这组基础知识可以帮助工程师摆脱困境。正如上面所描述的,它还会导致代码更具单元可测试性。

运行多个线程并不困难;这是小菜一碟。不幸的是,线程通常需要彼此通信;这就是困难所在。

最初发明的允许模块之间通信的机制是函数调用;当模块A想要与模块B通信时,它只调用模块B中的一个函数。不幸的是,这对线程不起作用,因为当你调用一个函数时,该函数仍然运行在当前线程中。

为了克服这个问题,人们决定采用一种更原始的通信机制:只声明一个特定的变量,并让两个线程都可以访问该变量。换句话说,允许线程共享数据。分享数据是人们自然而然想到的第一件事,这似乎是一个不错的选择,因为它看起来非常简单。我是说,能有多难,对吧?会出什么问题呢?

竞态条件。这就是可能、也将会出错的地方。

当人们意识到他们的软件由于竞争条件而遭受随机的、不可复制的灾难性失败时,他们开始发明复杂的机制,如锁和比较-交换,旨在防止此类事情的发生。这些机制属于广义的“同步”范畴。不幸的是,同步有两个问题:

这是很难做到的,所以很容易出现bug。 它是完全不可测试的,因为您无法测试竞态条件。

精明的读者可能会注意到“非常容易出现bug”和“完全不可测试”是一个致命的组合。

现在,在自动化软件测试的概念变得流行之前,我上面提到的机制已经被行业的大部分人发明和采用了;所以,没有人知道这个问题有多致命;他们只是认为这是一个很难的主题,需要高手程序员,每个人都能接受。

如今,无论我们做什么,我们都把测试放在第一位。所以,如果某些机制是不可测试的,那么使用该机制就是不可能的。因此,同步已经失宠;现在还在练的人已经很少了,而且练的人一天比一天少。

没有同步线程就不能共享数据;然而,最初的要求不是共享数据;它允许线程之间进行通信。除了共享数据之外,还存在其他更优雅的线程间通信机制。

其中一种机制是消息传递,也称为事件。

对于消息传递,整个软件系统中只有一个地方利用了同步,那就是我们用来存储消息的并发阻塞队列收集类。(我们的想法是,我们应该至少能把那一小部分做对。)

消息传递的优点是它不受竞态条件的影响,并且是完全可测试的。

我用与处理任何单元测试相同的方式处理线程组件的单元测试,即使用反转控制和隔离框架。我在. net领域进行开发,开箱即用的线程(以及其他东西)很难(我可以说几乎不可能)完全隔离。

因此,我写的包装器看起来像这样(简化):

public interface IThread
{
    void Start();
    ...
}

public class ThreadWrapper : IThread
{
    private readonly Thread _thread;
     
    public ThreadWrapper(ThreadStart threadStart)
    {
        _thread = new Thread(threadStart);
    }

    public Start()
    {
        _thread.Start();
    }
}
    
public interface IThreadingManager
{
    IThread CreateThread(ThreadStart threadStart);
}

public class ThreadingManager : IThreadingManager
{
    public IThread CreateThread(ThreadStart threadStart)
    {
         return new ThreadWrapper(threadStart)
    }
}

从那里,我可以很容易地将IThreadingManager注入到组件中,并使用所选的隔离框架使线程在测试期间的行为符合我的预期。

到目前为止,这对我来说工作得很好,我对线程池,系统中的东西使用相同的方法。环境,睡眠等等。

等待在帮助您编写确定性单元测试时也很有用。它允许您等待系统中的某个状态更新。例如:

await().untilCall( to(myService).myMethod(), greaterThan(3) );

or

await().atMost(5,SECONDS).until(fieldIn(myObject).ofType(int.class), equalTo(1));

它还支持Scala和Groovy。

await until { something() > 4 } // Scala example