为什么此线程池死锁或运行太多次?

如何解决为什么此线程池死锁或运行太多次?

我正在尝试在c++中编写一个满足以下条件的线程池:

  • 单个编写器偶尔会写一个新的输入值,一旦写入,许多线程会同时访问该相同的值,并且每个线程都会吐出一个随机的浮点数。
  • 每个工作线程都使用相同的函数,因此没有理由为所有不同的函数建立线程安全队列。我将公用函数存储在thread_pool类中。
  • 到目前为止,这些功能是程序中计算量最大的方面。我试图避免的主要事情是阻止这些功能正常工作的所有锁。
  • 所有这些函数的浮点输出都将被简单地平均。
  • 用户只有一个名为thread_pool::start_work的功能,可以更改此共享输入,并告诉所有工作人员执行固定数量的任务。
  • thread_pool::start_work返回std::future

以下是我到目前为止的内容。可以使用g++ test_tp.cpp -std=c++17 -lpthread; ./a.out来构建和运行它,但是不幸的是,它要么死锁,要么工作次数过多(有时很少)。我认为这是因为m_num_comps_done不是线程安全的。有可能所有线程都跳过最后一个计数,然后它们全部结束yield。但这不是原子变量吗?

#include <vector>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <queue>
#include <atomic>
#include <future>

#include <iostream>
#include <numeric>

/**
 * @class join_threads
 * @brief RAII thread killer
 */
class join_threads
{
    std::vector<std::thread>& m_threads;
public:

    explicit join_threads(std::vector<std::thread>& threads_)
        : m_threads(threads_) {}

    ~join_threads() {
        for(unsigned long i=0; i < m_threads.size(); ++i) {
            if(m_threads[i].joinable())
                m_threads[i].join();
        }
    }
};


// how remove the first two template parameters ?
template<typename func_input_t,typename F>
class thread_pool
{

    using func_output_t = typename std::result_of<F(func_input_t)>::type;

    static_assert( std::is_floating_point<func_output_t>::value,"function output type must be floating point");

    unsigned m_num_comps;
    std::atomic_bool m_done;
    std::atomic_bool m_has_an_input;
    std::atomic<int> m_num_comps_done; // need to be atomic? why?
    F m_f; // same function always used
    func_input_t m_param; // changed occasionally by a single writer
    func_output_t m_working_output; // many reader threads average all their output to get this
    std::promise<func_output_t> m_out;
    mutable std::shared_mutex m_mut;
    mutable std::mutex m_output_mut;
    std::vector<std::thread> m_threads;
    join_threads m_joiner;
    
    void worker_thread() {

        while(!m_done)
        {
            if(m_has_an_input){
                if( m_num_comps_done.load() < m_num_comps - 1 ) {
                    
                    std::shared_lock<std::shared_mutex> lk(m_mut);
                    func_output_t tmp = m_f(m_param); // long time
                    m_num_comps_done++;

                    // quick
                    std::lock_guard<std::mutex> lk2(m_output_mut);
                    m_working_output += tmp / m_num_comps;
                
                }else if(m_num_comps_done.load() == m_num_comps - 1){
                    
                    std::shared_lock<std::shared_mutex> lk(m_mut);
                    func_output_t tmp = m_f(m_param); // long time
                    m_num_comps_done++;

                    std::lock_guard<std::mutex> lk2(m_output_mut);
                    m_working_output += tmp / m_num_comps;
                    m_num_comps_done++;

                    try{                    
                        m_out.set_value(m_working_output);
                    }catch(std::future_error& e){
                        std::cout << "future_error caught: " << e.what() << "\n";
                    }

                }else{
                    std::this_thread::yield();
                }

            }else{
                  std::this_thread::yield();
            }
        }
    }

public:
   
    /**
     * @brief ctor spawns working threads
     */  
    thread_pool(F f,unsigned num_comps) 
        : m_num_comps(num_comps),m_done(false),m_has_an_input(false),m_joiner(m_threads),m_f(f) 
    {

        unsigned const thread_count=std::thread::hardware_concurrency(); // should I subtract one?

        try {
            for(unsigned i=0; i<thread_count; ++i) {
                m_threads.push_back( std::thread(&thread_pool::worker_thread,this));
            }
        } catch(...) {
            m_done=true;
            throw;
        }
    }

    ~thread_pool() {
        m_done=true;
    }


    /**
     * @brief changes the shared data member,* resets the num_comps_left variable,* resets the accumulator thing to 0,and
     * resets the promise object
     */
    std::future<func_output_t> start_work(func_input_t new_param) {
        std::unique_lock<std::shared_mutex> lk(m_mut);
        m_param = new_param;
        m_num_comps_done = 0;
        m_working_output = 0.0;
        m_out = std::promise<func_output_t>();
        m_has_an_input = true; // only really matters just after initialization
        return m_out.get_future();
    }
};


double slowSum(std::vector<double> nums) {
//    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return std::accumulate(nums.begin(),nums.end(),0.0);
}

int main(){

    // construct
    thread_pool<std::vector<double>,std::function<double(std::vector<double>)>> 
        le_pool(slowSum,1000);
    
    // add work
    auto ans = le_pool.start_work(std::vector<double>{1.2,3.2,4213.1}); 
    std::cout << "final answer is: " << ans.get() << "\n";
    std::cout << "it should be 4217.5\n";

    return 1;
}

解决方法

您检查“完成”计数,然后获取锁。这允许多个线程等待锁。特别是,可能没有线程进入第二个if正文。

另一方面是因为您所有的线程都一直在运行,因此“最后一个”线程可能无法提早访问其独占部分(在运行足够的线程之前),甚至可能访问得较晚(因为其他线程正在等待)第一个循环中的互斥体。

要解决第一个问题,由于第二个if块具有与第一个if块相同的代码,因此只有一个块可以检查计数以查看是否已经到达终点,应该设置输出值。

第二个问题要求您在获取互斥锁之后再次检查m_num_comps_done

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-