我相当熟悉c++ 11的std::thread, std::async和std::future组件(例如,请看这个答案),这些都很简单。
然而,我不能完全理解std::promise是什么,它做什么以及在什么情况下最好使用它。标准文档本身除了类概要之外不包含大量信息,std::thread也不包含。
谁能给一个简要的例子,说明需要std::promise的情况,以及它是最常用的解决方案的情况?
我相当熟悉c++ 11的std::thread, std::async和std::future组件(例如,请看这个答案),这些都很简单。
然而,我不能完全理解std::promise是什么,它做什么以及在什么情况下最好使用它。标准文档本身除了类概要之外不包含大量信息,std::thread也不包含。
谁能给一个简要的例子,说明需要std::promise的情况,以及它是最常用的解决方案的情况?
当前回答
在一个粗略的近似中,你可以把std::promise看作std::future的另一端(这是错误的,但是为了说明问题,你可以把它看作是)。通信通道的消费者端将使用std::future来消费来自共享状态的数据,而生产者线程将使用std::promise来写入共享状态。
其他回答
现在我对这种情况有了更好的理解(由于这里的答案,这不是一小部分!),所以我想我添加了一些我自己的文章。
c++ 11中有两个截然不同但又相关的概念:异步计算(在其他地方调用的函数)和并发执行(线程,并行工作的东西)。这两个概念在某种程度上是正交的。异步计算只是一种不同风格的函数调用,而线程是一个执行上下文。线程本身是有用的,但是为了讨论的目的,我将把它们作为一个实现细节。
异步计算有一个抽象层次。举个例子,假设我们有一个带参数的函数:
int foo(double, char, bool);
首先,我们有模板std::future<T>,它表示类型为T的未来值。该值可以通过成员函数get()检索,该函数通过等待结果有效地同步程序。或者,future支持wait_for(),它可用于探测结果是否已经可用。期货应该被认为是普通返回类型的异步插入式替换。对于我们的示例函数,我们期望std::future<int>。
现在,在层次结构上,从最高到最低的层次:
std::async: The most convenient and straight-forward way to perform an asynchronous computation is via the async function template, which returns the matching future immediately: auto fut = std::async(foo, 1.5, 'x', false); // is a std::future<int> We have very little control over the details. In particular, we don't even know if the function is executed concurrently, serially upon get(), or by some other black magic. However, the result is easily obtained when needed: auto res = fut.get(); // is an int We can now consider how to implement something like async, but in a fashion that we control. For example, we may insist that the function be executed in a separate thread. We already know that we can provide a separate thread by means of the std::thread class. The next lower level of abstraction does exactly that: std::packaged_task. This is a template that wraps a function and provides a future for the functions return value, but the object itself is callable, and calling it is at the user's discretion. We can set it up like this: std::packaged_task<int(double, char, bool)> tsk(foo); auto fut = tsk.get_future(); // is a std::future<int> The future becomes ready once we call the task and the call completes. This is the ideal job for a separate thread. We just have to make sure to move the task into the thread: std::thread thr(std::move(tsk), 1.5, 'x', false); The thread starts running immediately. We can either detach it, or have join it at the end of the scope, or whenever (e.g. using Anthony Williams's scoped_thread wrapper, which really should be in the standard library). The details of using std::thread don't concern us here, though; just be sure to join or detach thr eventually. What matters is that whenever the function call finishes, our result is ready: auto res = fut.get(); // as before Now we're down to the lowest level: How would we implement the packaged task? This is where the std::promise comes in. The promise is the building block for communicating with a future. The principal steps are these: The calling thread makes a promise. The calling thread obtains a future from the promise. The promise, along with function arguments, are moved into a separate thread. The new thread executes the function and fulfills the promise. The original thread retrieves the result. As an example, here's our very own "packaged task": template <typename> class my_task; template <typename R, typename ...Args> class my_task<R(Args...)> { std::function<R(Args...)> fn; std::promise<R> pr; // the promise of the result public: template <typename ...Ts> explicit my_task(Ts &&... ts) : fn(std::forward<Ts>(ts)...) { } template <typename ...Ts> void operator()(Ts &&... ts) { pr.set_value(fn(std::forward<Ts>(ts)...)); // fulfill the promise } std::future<R> get_future() { return pr.get_future(); } // disable copy, default move }; Usage of this template is essentially the same as that of std::packaged_task. Note that moving the entire task subsumes moving the promise. In more ad-hoc situations, one could also move a promise object explicitly into the new thread and make it a function argument of the thread function, but a task wrapper like the one above seems like a more flexible and less intrusive solution.
生产异常
承诺与异常密切相关。promise的接口本身不足以完全传达它的状态,因此每当promise上的操作没有意义时,就会抛出异常。所有异常都是std::future_error类型,它起源于std::logic_error。首先,一些约束条件的描述:
A default-constructed promise is inactive. Inactive promises can die without consequence. A promise becomes active when a future is obtained via get_future(). However, only one future may be obtained! A promise must either be satisfied via set_value() or have an exception set via set_exception() before its lifetime ends if its future is to be consumed. A satisfied promise can die without consequence, and get() becomes available on the future. A promise with an exception will raise the stored exception upon call of get() on the future. If the promise dies with neither value nor exception, calling get() on the future will raise a "broken promise" exception.
下面是一个小测试系列来演示这些不同的异常行为。首先,挽具:
#include <iostream>
#include <future>
#include <exception>
#include <stdexcept>
int test();
int main()
{
try
{
return test();
}
catch (std::future_error const & e)
{
std::cout << "Future error: " << e.what() << " / " << e.code() << std::endl;
}
catch (std::exception const & e)
{
std::cout << "Standard exception: " << e.what() << std::endl;
}
catch (...)
{
std::cout << "Unknown exception." << std::endl;
}
}
现在来看看测试。
案例1:非活动承诺
int test()
{
std::promise<int> pr;
return 0;
}
// fine, no problems
案例2:主动承诺,未使用
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
return 0;
}
// fine, no problems; fut.get() would block indefinitely
案例3:太多的未来
int test()
{
std::promise<int> pr;
auto fut1 = pr.get_future();
auto fut2 = pr.get_future(); // Error: "Future already retrieved"
return 0;
}
案例4:兑现承诺
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
{
std::promise<int> pr2(std::move(pr));
pr2.set_value(10);
}
return fut.get();
}
// Fine, returns "10".
案例5:过于满足
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
{
std::promise<int> pr2(std::move(pr));
pr2.set_value(10);
pr2.set_value(10); // Error: "Promise already satisfied"
}
return fut.get();
}
如果set_value或set_exception的值多于一个,则会抛出相同的异常。
案例6:例外
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
{
std::promise<int> pr2(std::move(pr));
pr2.set_exception(std::make_exception_ptr(std::runtime_error("Booboo")));
}
return fut.get();
}
// throws the runtime_error exception
案例7:失信
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
{
std::promise<int> pr2(std::move(pr));
} // Error: "broken promise"
return fut.get();
}
承诺是电线的另一端。
假设您需要检索由异步程序计算的future的值。然而,你不希望它在同一个线程中计算,你甚至不“现在”生成一个线程——也许你的软件被设计为从线程池中选择一个线程,所以你不知道谁将在最后执行这些计算。
现在,你传递什么给这个(未知的)线程/类/实体?你不通过将来,因为这是结果。你想要传递一些连接到未来的东西,它代表导线的另一端,所以你只会查询未来,而不知道谁会实际计算/写入一些东西。
这就是承诺。它是连接你未来的手柄。如果future是一个演讲者,并且使用get()开始听直到一些声音出来,那么promise就是一个麦克风;但不是普通的麦克风,它是通过一根电线连接到你手中的扬声器的麦克风。你可能知道电话的另一端是谁,但你不需要知道——你只需要把电话给对方,然后等对方开口。
std::promise被创建为promise/future对的端点,std::future(使用get_future()方法从std::promise创建)是另一个端点。这是一种简单的一次性方法,当一个线程通过消息向另一个线程提供数据时,它为两个线程提供了一种同步方式。
您可以将其视为一个线程创建一个承诺来提供数据,而另一个线程在未来收集承诺。此机制只能使用一次。
promise/future机制只是一个方向,从使用std::promise的set_value()方法的线程到使用std::future的get()方法来接收数据的线程。如果调用future的get()方法超过一次,则会生成异常。
如果带有std::promise的线程没有使用set_value()来履行其承诺,那么当第二个线程调用std::future的get()来收集承诺时,第二个线程将进入等待状态,直到带有std::promise的第一个线程在使用set_value()方法发送数据时完成承诺。
随着技术规范N4663编程语言- c++扩展协程的提议和Visual Studio 2017 c++编译器对co_await的支持,也可以使用std::future和std::async来编写协程功能。请参阅https://stackoverflow.com/a/50753040/1466970中的讨论和示例,其中有一节讨论了std::future与co_await的使用。
下面的示例代码是一个简单的Visual Studio 2013 Windows控制台应用程序,展示了使用一些c++ 11并发类/模板和其他功能。它说明了promise/future的使用情况,自治线程将完成一些任务并停止,以及需要更多同步行为的使用情况,由于需要多个通知,promise/future对无法工作。
关于这个例子需要注意的一点是在不同的地方添加了延迟。添加这些延迟只是为了确保使用std::cout打印到控制台的各种消息是清晰的,并且来自几个线程的文本不会混合在一起。
main()的第一部分是创建另外三个线程,并使用std::promise和std::future在线程之间发送数据。一个有趣的点是主线程启动一个线程T2,它将等待主线程的数据,执行一些操作,然后将数据发送给第三个线程T3, T3将执行一些操作并将数据发送回主线程。
main()的第二部分创建两个线程和一组队列,以允许多条消息从主线程发送到创建的两个线程。我们不能为此使用std::promise和std::future,因为promise/future是一次性的,不能重复使用。
类Sync_queue的源代码来自Stroustrup的《c++ Programming Language: 4th Edition》。
// cpp_threads.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <iostream>
#include <thread> // std::thread is defined here
#include <future> // std::future and std::promise defined here
#include <list> // std::list which we use to build a message queue on.
static std::atomic<int> kount(1); // this variable is used to provide an identifier for each thread started.
//------------------------------------------------
// create a simple queue to let us send notifications to some of our threads.
// a future and promise are one shot type of notifications.
// we use Sync_queue<> to have a queue between a producer thread and a consumer thread.
// this code taken from chapter 42 section 42.3.4
// The C++ Programming Language, 4th Edition by Bjarne Stroustrup
// copyright 2014 by Pearson Education, Inc.
template<typename Ttype>
class Sync_queue {
public:
void put(const Ttype &val);
void get(Ttype &val);
private:
std::mutex mtx; // mutex used to synchronize queue access
std::condition_variable cond; // used for notifications when things are added to queue
std::list <Ttype> q; // list that is used as a message queue
};
template<typename Ttype>
void Sync_queue<Ttype>::put(const Ttype &val) {
std::lock_guard <std::mutex> lck(mtx);
q.push_back(val);
cond.notify_one();
}
template<typename Ttype>
void Sync_queue<Ttype>::get(Ttype &val) {
std::unique_lock<std::mutex> lck(mtx);
cond.wait(lck, [this]{return !q.empty(); });
val = q.front();
q.pop_front();
}
//------------------------------------------------
// thread function that starts up and gets its identifier and then
// waits for a promise to be filled by some other thread.
void func(std::promise<int> &jj) {
int myId = std::atomic_fetch_add(&kount, 1); // get my identifier
std::future<int> intFuture(jj.get_future());
auto ll = intFuture.get(); // wait for the promise attached to the future
std::cout << " func " << myId << " future " << ll << std::endl;
}
// function takes a promise from one thread and creates a value to provide as a promise to another thread.
void func2(std::promise<int> &jj, std::promise<int>&pp) {
int myId = std::atomic_fetch_add(&kount, 1); // get my identifier
std::future<int> intFuture(jj.get_future());
auto ll = intFuture.get(); // wait for the promise attached to the future
auto promiseValue = ll * 100; // create the value to provide as promised to the next thread in the chain
pp.set_value(promiseValue);
std::cout << " func2 " << myId << " promised " << promiseValue << " ll was " << ll << std::endl;
}
// thread function that starts up and waits for a series of notifications for work to do.
void func3(Sync_queue<int> &q, int iBegin, int iEnd, int *pInts) {
int myId = std::atomic_fetch_add(&kount, 1);
int ll;
q.get(ll); // wait on a notification and when we get it, processes it.
while (ll > 0) {
std::cout << " func3 " << myId << " start loop base " << ll << " " << iBegin << " to " << iEnd << std::endl;
for (int i = iBegin; i < iEnd; i++) {
pInts[i] = ll + i;
}
q.get(ll); // we finished this job so now wait for the next one.
}
}
int _tmain(int argc, _TCHAR* argv[])
{
std::chrono::milliseconds myDur(1000);
// create our various promise and future objects which we are going to use to synchronise our threads
// create our three threads which are going to do some simple things.
std::cout << "MAIN #1 - create our threads." << std::endl;
// thread T1 is going to wait on a promised int
std::promise<int> intPromiseT1;
std::thread t1(func, std::ref(intPromiseT1));
// thread T2 is going to wait on a promised int and then provide a promised int to thread T3
std::promise<int> intPromiseT2;
std::promise<int> intPromiseT3;
std::thread t2(func2, std::ref(intPromiseT2), std::ref(intPromiseT3));
// thread T3 is going to wait on a promised int and then provide a promised int to thread Main
std::promise<int> intPromiseMain;
std::thread t3(func2, std::ref(intPromiseT3), std::ref(intPromiseMain));
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2 - provide the value for promise #1" << std::endl;
intPromiseT1.set_value(22);
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2.2 - provide the value for promise #2" << std::endl;
std::this_thread::sleep_for(myDur);
intPromiseT2.set_value(1001);
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2.4 - set_value 1001 completed." << std::endl;
std::future<int> intFutureMain(intPromiseMain.get_future());
auto t3Promised = intFutureMain.get();
std::cout << "MAIN #2.3 - intFutureMain.get() from T3. " << t3Promised << std::endl;
t1.join();
t2.join();
t3.join();
int iArray[100];
Sync_queue<int> q1; // notification queue for messages to thread t11
Sync_queue<int> q2; // notification queue for messages to thread t12
std::thread t11(func3, std::ref(q1), 0, 5, iArray); // start thread t11 with its queue and section of the array
std::this_thread::sleep_for(myDur);
std::thread t12(func3, std::ref(q2), 10, 15, iArray); // start thread t12 with its queue and section of the array
std::this_thread::sleep_for(myDur);
// send a series of jobs to our threads by sending notification to each thread's queue.
for (int i = 0; i < 5; i++) {
std::cout << "MAIN #11 Loop to do array " << i << std::endl;
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
q1.put(i + 100);
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
q2.put(i + 1000);
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
}
// close down the job threads so that we can quit.
q1.put(-1); // indicate we are done with agreed upon out of range data value
q2.put(-1); // indicate we are done with agreed upon out of range data value
t11.join();
t12.join();
return 0;
}
这个简单的应用程序创建以下输出。
MAIN #1 - create our threads.
MAIN #2 - provide the value for promise #1
func 1 future 22
MAIN #2.2 - provide the value for promise #2
func2 2 promised 100100 ll was 1001
func2 3 promised 10010000 ll was 100100
MAIN #2.4 - set_value 1001 completed.
MAIN #2.3 - intFutureMain.get() from T3. 10010000
MAIN #11 Loop to do array 0
func3 4 start loop base 100 0 to 5
func3 5 start loop base 1000 10 to 15
MAIN #11 Loop to do array 1
func3 4 start loop base 101 0 to 5
func3 5 start loop base 1001 10 to 15
MAIN #11 Loop to do array 2
func3 4 start loop base 102 0 to 5
func3 5 start loop base 1002 10 to 15
MAIN #11 Loop to do array 3
func3 4 start loop base 103 0 to 5
func3 5 start loop base 1003 10 to 15
MAIN #11 Loop to do array 4
func3 4 start loop base 104 0 to 5
func3 5 start loop base 1004 10 to 15
在一个粗略的近似中,你可以把std::promise看作std::future的另一端(这是错误的,但是为了说明问题,你可以把它看作是)。通信通道的消费者端将使用std::future来消费来自共享状态的数据,而生产者线程将使用std::promise来写入共享状态。
在异步处理中有3个核心实体。c++ 11目前专注于其中的两个。
异步运行逻辑所需要的核心内容是:
任务(逻辑打包为某个函子对象)将运行“某处”。 实际的处理节点——线程、进程等,当这些函子被提供给它时运行它们。查看“Command”设计模式,了解基本工作线程池是如何做到这一点的。 结果句柄:有人需要这个结果,需要一个对象来为他们获取结果。出于OOP和其他原因,任何等待或同步都应该在这个句柄的api中完成。
c++ 11调用了我在(1)std::promise和(3)std::future中提到的东西。 std::thread是(2)唯一公开提供的东西。这是不幸的,因为真正的程序需要管理线程和内存资源,大多数程序都希望任务运行在线程池上,而不是为每个小任务创建和销毁一个线程(这几乎总是会导致不必要的性能损失,并且很容易造成更糟糕的资源短缺)。
根据Herb Sutter和c++ 11智囊团的其他人的说法,他们暂定计划添加std::executor,这很像Java,将是线程池的基础,逻辑上类似于(2)的设置。也许我们会在c++ 2014中看到它,但我打赌更像c++ 17(如果他们搞砸了这些标准,上帝会帮助我们)。