如何解决为什么此线程池死锁或运行太多次?
我正在尝试在c++
中编写一个满足以下条件的线程池:
- 单个编写器偶尔会写一个新的输入值,一旦写入,许多线程会同时访问该相同的值,并且每个线程都会吐出一个随机的浮点数。
- 每个工作线程都使用相同的函数,因此没有理由为所有不同的函数建立线程安全队列。我将公用函数存储在
thread_pool
类中。
到目前为止,这些功能是程序中计算量最大的方面。我试图避免的主要事情是阻止这些功能正常工作的所有锁。
- 所有这些函数的浮点输出都将被简单地平均。
- 用户只有一个名为
thread_pool::start_work
的功能,可以更改此共享输入,并告诉所有工作人员执行固定数量的任务。 -
thread_pool::start_work
返回std::future
以下是我到目前为止的内容。可以使用g++ test_tp.cpp -std=c++17 -lpthread; ./a.out
来构建和运行它,但是不幸的是,它要么死锁,要么工作次数过多(有时很少)。我认为这是因为m_num_comps_done
不是线程安全的。有可能所有线程都跳过最后一个计数,然后它们全部结束yield
。但这不是原子变量吗?
#include <vector>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <queue>
#include <atomic>
#include <future>
#include <iostream>
#include <numeric>
/**
* @class join_threads
* @brief RAII thread killer
*/
class join_threads
{
std::vector<std::thread>& m_threads;
public:
explicit join_threads(std::vector<std::thread>& threads_)
: m_threads(threads_) {}
~join_threads() {
for(unsigned long i=0; i < m_threads.size(); ++i) {
if(m_threads[i].joinable())
m_threads[i].join();
}
}
};
// how remove the first two template parameters ?
template<typename func_input_t,typename F>
class thread_pool
{
using func_output_t = typename std::result_of<F(func_input_t)>::type;
static_assert( std::is_floating_point<func_output_t>::value,"function output type must be floating point");
unsigned m_num_comps;
std::atomic_bool m_done;
std::atomic_bool m_has_an_input;
std::atomic<int> m_num_comps_done; // need to be atomic? why?
F m_f; // same function always used
func_input_t m_param; // changed occasionally by a single writer
func_output_t m_working_output; // many reader threads average all their output to get this
std::promise<func_output_t> m_out;
mutable std::shared_mutex m_mut;
mutable std::mutex m_output_mut;
std::vector<std::thread> m_threads;
join_threads m_joiner;
void worker_thread() {
while(!m_done)
{
if(m_has_an_input){
if( m_num_comps_done.load() < m_num_comps - 1 ) {
std::shared_lock<std::shared_mutex> lk(m_mut);
func_output_t tmp = m_f(m_param); // long time
m_num_comps_done++;
// quick
std::lock_guard<std::mutex> lk2(m_output_mut);
m_working_output += tmp / m_num_comps;
}else if(m_num_comps_done.load() == m_num_comps - 1){
std::shared_lock<std::shared_mutex> lk(m_mut);
func_output_t tmp = m_f(m_param); // long time
m_num_comps_done++;
std::lock_guard<std::mutex> lk2(m_output_mut);
m_working_output += tmp / m_num_comps;
m_num_comps_done++;
try{
m_out.set_value(m_working_output);
}catch(std::future_error& e){
std::cout << "future_error caught: " << e.what() << "\n";
}
}else{
std::this_thread::yield();
}
}else{
std::this_thread::yield();
}
}
}
public:
/**
* @brief ctor spawns working threads
*/
thread_pool(F f,unsigned num_comps)
: m_num_comps(num_comps),m_done(false),m_has_an_input(false),m_joiner(m_threads),m_f(f)
{
unsigned const thread_count=std::thread::hardware_concurrency(); // should I subtract one?
try {
for(unsigned i=0; i<thread_count; ++i) {
m_threads.push_back( std::thread(&thread_pool::worker_thread,this));
}
} catch(...) {
m_done=true;
throw;
}
}
~thread_pool() {
m_done=true;
}
/**
* @brief changes the shared data member,* resets the num_comps_left variable,* resets the accumulator thing to 0,and
* resets the promise object
*/
std::future<func_output_t> start_work(func_input_t new_param) {
std::unique_lock<std::shared_mutex> lk(m_mut);
m_param = new_param;
m_num_comps_done = 0;
m_working_output = 0.0;
m_out = std::promise<func_output_t>();
m_has_an_input = true; // only really matters just after initialization
return m_out.get_future();
}
};
double slowSum(std::vector<double> nums) {
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
return std::accumulate(nums.begin(),nums.end(),0.0);
}
int main(){
// construct
thread_pool<std::vector<double>,std::function<double(std::vector<double>)>>
le_pool(slowSum,1000);
// add work
auto ans = le_pool.start_work(std::vector<double>{1.2,3.2,4213.1});
std::cout << "final answer is: " << ans.get() << "\n";
std::cout << "it should be 4217.5\n";
return 1;
}
解决方法
您检查“完成”计数,然后获取锁。这允许多个线程等待锁。特别是,可能没有线程进入第二个if
正文。
另一方面是因为您所有的线程都一直在运行,因此“最后一个”线程可能无法提早访问其独占部分(在运行足够的线程之前),甚至可能访问得较晚(因为其他线程正在等待)第一个循环中的互斥体。
要解决第一个问题,由于第二个if
块具有与第一个if
块相同的代码,因此只有一个块可以检查计数以查看是否已经到达终点,应该设置输出值。
第二个问题要求您在获取互斥锁之后再次检查m_num_comps_done
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。