概述
C++通过<future>头文件提供了一组支持异步编程的工具,使用这些工具比直接进行多线程操作更加高级、更加简便。
主要包括如下的类型:
std::future:表示异步操作的结果,这个结果在未来可能可用,支持查询操作的状态,等待操作完成和获取结果。注意用于获取结果的get()方法调用会阻塞当前执行流,直到结果准备就绪。
std::promise:承诺在未来提供一个可用的值,通常与 std::future 配对使用,set_result()可以设置异步操作的结果。可用get_future()提取获得一个关联的std::future对象。
std::packaged_task:封装一个函数或可调用对象,使其可以作为异步任务执行。可用get_future()获得一个关联的std::future对象。
还包括如下的函数:
std::async:用于启动异步任务,返回一个std::future对象代表任务的结果,注意我们必须要用变量接收这个返回值,否则当前语句会阻塞式的等待任务结束,因为只有异步任务结束才会销毁返回的临时变量!
这里std::future和std::promise是成对使用的,std::packaged_task类型和std::async函数则是对异步编程的进一步封装和简化,可以避免显式处理std::promise对象。
实例
我们直接用几个例子来解释异步编程的基本用法,从最简单的例子开始
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| #include <cmath> #include <future> #include <iostream>
int main() { std::promise<double> prom; std::future<double> fut = prom.get_future();
std::thread t([&prom](double x) { prom.set_value(sqrt(x)); }, 2.0);
std::cout << "Result: " << fut.get() << '\n';
t.join(); return 0; }
|
代码解释如下:
std::promise代表一个承诺:
- 可以以引用传递方式传递到子线程中,并通过
set_value()方法设置值
- 可以使用
get_future()方法获取std::future对象
std::future代表一个异步编程的结果:可以通过get()方法获取结果
第二个例子是std::packaged_task的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| #include <cmath> #include <future> #include <iostream>
int main() { std::packaged_task<double(double)> task([](double x) { return sqrt(x); }); std::future<double> result = task.get_future();
std::thread th(std::move(task), 2.0);
std::cout << "Result: " << result.get() << '\n';
th.join(); return 0; }
|
std::packaged_task的使用比第一个例子更加简单,省略了std::promise的定义和使用,
自动包装一个可调用对象,并把执行结果传递给std::future对象。
第三个例子是std::async函数的使用,仍然需要提供一个可调用对象(以及需要传入的参数)
1 2 3 4 5 6 7 8 9 10 11 12 13
| #include <cmath> #include <future> #include <iostream>
int main() { std::future<double> result = std::async(std::launch::async, [](double x) { return sqrt(x); }, 2.0);
std::cout << "Result: " << result.get() << '\n';
return 0; }
|
std::async的使用更加高级和简洁,完全不需要手动创建和管理线程,返回的std::future对象可以获取可调用对象的结果。
std::future
std::future对象通常不会直接创建,而是通过如下几种方式获得:
std::promise对象的get_future()方法
std::packaged_task对象的get_future()方法
std::async函数的返回值
这些方式获得的对象自动与对应的异步操作相关联。
std::future对象支持如下方法:
get():获取对应异步操作的结果。如果结果尚未准备好,此调用将阻塞,直到结果可用。(暂不讨论异步操作中的异常问题)
wait():阻塞当前线程,进入无限等待状态,直到对应的异步操作完成,无返回值。
wait_for:等待指定的时间段,在这段时间内异步操作完成或超时都将结束等待。
wait_until:等待直到指定的时间点,在时间点之间异步操作完成或超时都将结束等待。
valid():检查std::future对象是否有效,即是否关联了一个异步操作,返回布尔值。
其中get()和wait()都会阻塞当前线程直到任务完成,但是wait()可以多次调用,而get()只允许调用一次。
可以使用wait_for(0)实现非阻塞式的检查
1 2 3 4 5 6
| if(fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { } else { }
|
std::promise
std::promise通常和std::future成对出现:(使用相同的类型模板参数)
std::promise用于在某一线程中通过set_future()设置某个值
std::future则用于在另一线程中通过get()获取这个值。
通常先创建std::promise对象,然后使用get_future()创建与之关联的std::future对象。
由于std::promise对象不支持,我们必须通过移动或者引用传递的方式提供给子线程。
对于更复杂的情况,则需要使用共享的std::shared_future类型,它相比于std::future有更弱的所有权,
允许多个线程都通过get()获取结果。(std::future只能调用一次get())
使用std::shared_future的示例如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| #include <future> #include <iostream> #include <thread>
int main() { std::promise<int> prom; std::thread t([&]() { std::this_thread::sleep_for(std::chrono::seconds(2)); prom.set_value(42); }); t.detach();
std::shared_future<int> sharedFuture = prom.get_future().share();
std::cout << "Starting tasks...\n";
std::thread task1([&]() { std::cout << sharedFuture.get() << '\n'; }); std::thread task2([&]() { std::cout << sharedFuture.get() << '\n'; });
task1.join(); task2.join();
std::cout << "Tasks completed.\n"; return 0; }
|
虽然多个线程都可以用get()获取结果,但是显然在结果尚未就绪时,对应的线程仍然需要陷入阻塞式的等待中。
std::packaged_task
std::packaged_task只是对可调用对象的一次封装,省略了std::promise的角色,
并且显然和std::promise一样不支持拷贝,只能使用移动的方式传递给子线程,其它没什么好说的。
不同编译器对于std::packaged_task的实现还不一样,例如gcc允许对其进行移动,但是MSVC似乎不允许。
std::async
std::async的调用方式有两种:
- 第一种方式需要依次传入启动策略、可调用对象、可调用对象需要的参数;
- 第一种方式只需要传入可调用对象、可调用对象需要的参数,使用默认的启动策略。
两种用法示例如下
1 2 3 4 5
| std::future<double> result1 = std::async(std::launch::async, [](double x) { return sqrt(x); }, 2.0);
std::future<double> result2 = std::async([](double x) { return sqrt(x); }, 2.0);
|
std::async接受的启动策略通过std::launch枚举类提供:
std::launch::async:表示任务将立刻在另一个新线程中异步执行
std::launch::deferred:表示任务会被延迟执行,直到需要提供结果时才会在当前线程中同步执行,例如用户调用std::future::get()或std::future::wait()函数时。
std::launch::async | std::launch::deferred:这是上面两个策略的组合,任务既可以在一个单独的线程上异步执行,也可以选择延迟执行,取决于具体实现,不同的编译器和操作系统可能会有不同的默认行为。
在启动策略缺省时,std::async会使用std::launch::async | std::launch::deferred策略。
需要强调的是,我们必须使用std::future对象来接收std::async函数的返回值,否则产生的临时对象会直到异步操作完成才会析构,这会对主线程产生阻塞。
C++ 实现简易线程池
下面提供一个线程池的简易实现(参考 C++ 并发三剑客future, promise和async)
simple_thread_pool.hpp1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
| #pragma once
#include <atomic> #include <condition_variable> #include <future> #include <mutex> #include <queue> #include <thread> #include <vector>
class SimpleThreadPool { public: explicit SimpleThreadPool(uint32_t thread_num) : m_thread_num(thread_num > 0 ? thread_num : 1) { start(); }
~SimpleThreadPool() { stop(); }
SimpleThreadPool(const SimpleThreadPool &) = delete; SimpleThreadPool &operator=(const SimpleThreadPool &) = delete;
template <class F, class... Args> auto commit(F &&f, Args &&...args) { using RetType = std::invoke_result_t<F, Args...>;
if (!m_running.load()) throw std::runtime_error("ThreadPool is stopped.");
auto new_task_ptr = std::make_shared<std::packaged_task<RetType()>>( [func = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable { return func(std::forward<Args>(args)...); });
std::future<RetType> result = new_task_ptr->get_future(); { std::lock_guard<std::mutex> mtx_guard(m_mtx); m_tasks.emplace([new_task_ptr] { (*new_task_ptr)(); }); }
m_cv.notify_one();
return result; }
uint32_t get_idle_thread_num() const { return m_idle_thread_num; }
uint32_t get_thread_num() const { return m_thread_num; }
private: void start() { std::unique_lock<std::mutex> mtx_guard(m_mtx);
m_running.store(true); m_idle_thread_num.store(m_thread_num);
for (uint32_t i = 0; i < m_thread_num; ++i) { m_pool.emplace_back([this]() { while (this->m_running.load()) { std::packaged_task<void()> task;
{ std::unique_lock<std::mutex> mtx_guard2(m_mtx);
this->m_cv.wait(mtx_guard2, [this] { return !this->m_running.load() || !this->m_tasks.empty(); });
if (this->m_tasks.empty()) { return; }
task = std::move(this->m_tasks.front()); this->m_tasks.pop(); }
this->m_idle_thread_num--; task(); this->m_idle_thread_num++; } }); } }
void stop() { m_running.store(false); m_cv.notify_all();
for (auto &td : m_pool) { if (td.joinable()) { td.join(); } } }
std::mutex m_mtx; std::condition_variable m_cv; std::atomic_bool m_running; std::atomic_uint32_t m_thread_num; std::atomic_uint32_t m_idle_thread_num;
std::queue<std::packaged_task<void()>> m_tasks; std::vector<std::thread> m_pool; };
|
测试代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| #include "simple_thread_pool.hpp"
#include <iostream> #include <stdexcept> #include <string> #include <thread> #include <vector>
int main() { try { auto pool = SimpleThreadPool{5};
std::vector<std::future<int>> results; results.reserve(10);
for (int i = 0; i < 10; ++i) { std::string msg = "current available threads: " + std::to_string(pool.get_idle_thread_num()) + '\n'; std::cout << msg;
if (i % 3 == 0) { auto func = [i]() -> int { std::string msg2 = "Task " + std::to_string(i) + " executed\n"; std::cout << msg2; std::this_thread::sleep_for(std::chrono::seconds(3 + i));
throw std::runtime_error("Exception in task " + std::to_string(i)); }; results.emplace_back(pool.commit(func)); } else { auto func = [i](int j) -> int { std::string msg2 = "Task " + std::to_string(i) + " executed\n"; std::cout << msg2; std::this_thread::sleep_for(std::chrono::seconds(3 + i)); return i * j; }; results.emplace_back(pool.commit(func, i + 1)); } }
for (size_t i = 0; i < results.size(); ++i) { try { std::string msg = "Result of task " + std::to_string(i) + ": " + std::to_string(results[i].get()) + '\n'; std::cout << msg; } catch (const std::exception &e) { std::cerr << "Exception caught: " << e.what() << '\n'; } catch (...) { std::cerr << "Unknown exception caught\n"; } } } catch (const std::exception &e) { std::cerr << "Exception caught: " << e.what() << '\n'; }
return 0; }
|