Working with a shared work queue using CUDA atomic operations and grid synchronization

I am trying to write a kernel whose threads iteratively process items in a work queue. My understanding is that I should be able to do this by using atomic operations to manipulate the work queue (i.e., to grab work items from the queue and to insert new work items into the queue), and by using grid synchronization via cooperative groups to ensure that all threads are at the same iteration (I make sure the number of thread blocks does not exceed the device capacity for the kernel). However, I sometimes observe that work items are skipped or processed more than once during an iteration.

The code below is a working example to illustrate this. In this example, an array of size input_len is created which holds work items 0 to input_len - 1. The processWorkItems kernel processes these items for max_iter iterations. Each work item can put itself and its previous and next work items into the work queue, but the marked array is used to guarantee that during an iteration each work item is added to the work queue at most once. What should happen in the end is that the sum of the values in histogram equals input_len * max_iter, and no value in histogram is greater than 1. But I observe that occasionally both of these conditions are violated in the output, which means I am not getting the atomic operations and/or the synchronization right. I would appreciate it if someone could point out the flaws in my reasoning and/or implementation. My OS is Ubuntu 18.04, the CUDA version is 10.1, and I have run experiments on P100, V100, and RTX 2080 Ti GPUs and observed similar behavior.

The command I use to compile for the RTX 2080 Ti:

nvcc -O3 -o atomicsync atomicsync.cu --gpu-architecture=compute_75 -rdc=true

Some inputs and outputs of runs on the RTX 2080 Ti:

./atomicsync 50 1000 1000
Skipped 0.01% of items. 5 extra item processing.
./atomicsync 500 1000 1000
Skipped 0.00% of items. 6 extra item processing.
./atomicsync 5000 1000 1000
Skipped 0.00% of items. 14 extra item processing.

atomicsync.cu:

#include <stdio.h>
#include <cooperative_groups.h>

#define checkCudaErrors(val) check ( (val),#val,__FILE__,__LINE__ )
template< typename T >
void check(T result,char const *const func,const char *const file,int const line)
{
    if (result)
    {
        fprintf(stderr,"CUDA error at %s:%d code=%d(%s) \"%s\" \n",file,line,static_cast<unsigned int>(result),cudaGetErrorString(result),func);
        cudaDeviceReset();
        exit(EXIT_FAILURE);
    }
}

__device__ inline void addWorkItem(int input_len,int item,int item_adder,int iter,int *queue,int *queue_size,int *marked) {
    int already_marked = atomicExch(&marked[item],1);
    if(already_marked == 0) {
        int idx = atomicAdd(&queue_size[iter + 1],1);
        queue[(iter + 1) * input_len + idx] = item;
    }
}

__global__ void processWorkItems(int input_len,int max_iter,int *histogram,int *queue,int *queue_size,int *marked) {
    auto grid = cooperative_groups::this_grid();

    const int items_per_block = (input_len + gridDim.x - 1) / gridDim.x;

    for(int iter = 0; iter < max_iter; ++iter) {
        while(true) {
            // Grab work item to process
            int idx = atomicSub(&queue_size[iter],1);
            --idx;
            if(idx < 0) {
                break;
            }
            int item = queue[iter * input_len + idx];

            // Keep track of processed work items
             ++histogram[iter * input_len + item];

            // Add previous,self,and next work items to work queue
            if(item > 0) {
                addWorkItem(input_len,item - 1,item,iter,queue,queue_size,marked);
            }
            addWorkItem(input_len,item,item,iter,queue,queue_size,marked);
            if(item + 1 < input_len) {
                addWorkItem(input_len,item + 1,item,iter,queue,queue_size,marked);
            }
        }
        __threadfence_system();
        grid.sync();

        // Reset marked array for next iteration
        for(int i = 0; i < items_per_block; ++i) {
            if(blockIdx.x * items_per_block + i < input_len) {
                marked[blockIdx.x * items_per_block + i] = 0;
            }
        }
        __threadfence_system();
        grid.sync();
    }
}

int main(int argc,char* argv[])
{
    int input_len = atoi(argv[1]);
    int max_iter = atoi(argv[2]);
    int num_blocks = atoi(argv[3]);

    // A histogram to keep track of work items that have been processed in each iteration
    int histogram_host[input_len * max_iter];
    memset(histogram_host,0,sizeof(int) * input_len * max_iter);
    int *histogram_device;
    checkCudaErrors(cudaMalloc(&histogram_device,sizeof(int) * input_len * max_iter));
    checkCudaErrors(cudaMemcpy(histogram_device,histogram_host,sizeof(int) * input_len * max_iter,cudaMemcpyHostToDevice));

    // Size of the work queue for each iteration
    int queue_size_host[max_iter + 1];
    queue_size_host[0] = input_len;
    memset(&queue_size_host[1],0,sizeof(int) * max_iter);
    int *queue_size_device;
    checkCudaErrors(cudaMalloc(&queue_size_device,sizeof(int) * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_size_device,queue_size_host,sizeof(int) * (max_iter + 1),cudaMemcpyHostToDevice));

    // Work queue
    int queue_host[input_len * (max_iter + 1)];
    for(int i = 0; i < input_len; ++i) {
        queue_host[i] = i;
    }
    memset(&queue_host[input_len],0,sizeof(int) * input_len * max_iter);
    int *queue_device;
    checkCudaErrors(cudaMalloc(&queue_device,sizeof(int) * input_len * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_device,queue_host,sizeof(int) * input_len * (max_iter + 1),cudaMemcpyHostToDevice));

    // An array used to keep track of work items already added to the work queue to
    // avoid multiple additions of a work item in the same iteration
    int marked_host[input_len];
    memset(marked_host,0,sizeof(int) * input_len);
    int *marked_device;
    checkCudaErrors(cudaMalloc(&marked_device,sizeof(int) * input_len));
    checkCudaErrors(cudaMemcpy(marked_device,marked_host,sizeof(int) * input_len,cudaMemcpyHostToDevice));

    const dim3 threads(1,1,1);
    const dim3 blocks(num_blocks,1);

    processWorkItems<<<blocks,threads>>>(input_len,max_iter,histogram_device,queue_device,queue_size_device,marked_device);
    checkCudaErrors(cudaDeviceSynchronize());

    checkCudaErrors(cudaMemcpy(histogram_host,histogram_device,sizeof(int) * input_len * max_iter,cudaMemcpyDeviceToHost));

    int extra = 0;
    double deficit = 0;
    for(int i = 0; i < input_len; ++i) {
        int cnt = 0;
        for(int iter = 0; iter < max_iter; ++iter) {
            if(histogram_host[iter * input_len + i] > 1) {
                ++extra;
            }
            cnt += histogram_host[iter * input_len + i];
        }
        deficit += max_iter - cnt;
    }
    printf("Skipped %.2f%% of items. %d extra item processing.\n",deficit / (input_len * max_iter) * 100,extra);

    checkCudaErrors(cudaFree(histogram_device));
    checkCudaErrors(cudaFree(queue_device));
    checkCudaErrors(cudaFree(queue_size_device));
    checkCudaErrors(cudaFree(marked_device));

    return 0;
}

Solution

You may wish to read how to properly launch a cooperative grid kernel in the programming guide, or study any of the CUDA sample codes that use grid sync (such as reductionMultiBlockCG; there are others).

You are doing it wrong. You cannot launch a cooperative grid with the ordinary <<<...>>> launch syntax. Because of that, there is no reason to assume that the grid.sync() in your kernel is working correctly.

It's easy to see that grid sync is not working in your code by running it under cuda-memcheck. When you do that, the results get dramatically worse.
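
For illustration, here is a minimal sketch of just the launch-path change, reusing the kernel parameters and the checkCudaErrors helper from the code above (the device-support and grid-sizing checks are omitted here; the full listing below includes them):

// Pack the kernel parameters into an array of pointers to the arguments
void *kernelArgs[] = { &input_len, &max_iter, &histogram_device,
                       &queue_device, &queue_size_device, &marked_device };
dim3 dimGrid(num_blocks, 1, 1);
dim3 dimBlock(1, 1, 1);
// Instead of: processWorkItems<<<dimGrid, dimBlock>>>(input_len, ...);
checkCudaErrors(cudaLaunchCooperativeKernel((void*)processWorkItems, dimGrid, dimBlock, kernelArgs));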

When I modify your code to do a proper cooperative launch, I have no issues on a Tesla V100:

$ cat t1811.cu
#include <stdio.h>
#include <cooperative_groups.h>

#define checkCudaErrors(val) check ( (val),#val,__FILE__,__LINE__ )
template< typename T >
void check(T result,char const *const func,const char *const file,int const line)
{
    if (result)
    {
        fprintf(stderr,"CUDA error at %s:%d code=%d(%s) \"%s\" \n",file,line,static_cast<unsigned int>(result),cudaGetErrorString(result),func);
        cudaDeviceReset();
        exit(EXIT_FAILURE);
    }
}

__device__ inline void addWorkItem(int input_len,int item,int item_adder,int iter,int *queue,int *queue_size,int *marked) {
    int already_marked = atomicExch(&marked[item],1);
    if(already_marked == 0) {
        int idx = atomicAdd(&queue_size[iter + 1],1);
        queue[(iter + 1) * input_len + idx] = item;
    }
}

__global__ void processWorkItems(int input_len,int max_iter,int *histogram,int *queue,int *queue_size,int *marked) {
    auto grid = cooperative_groups::this_grid();

    const int items_per_block = (input_len + gridDim.x - 1) / gridDim.x;

    for(int iter = 0; iter < max_iter; ++iter) {
        while(true) {
            // Grab work item to process
            int idx = atomicSub(&queue_size[iter],1);
            --idx;
            if(idx < 0) {
                break;
            }
            int item = queue[iter * input_len + idx];

            // Keep track of processed work items
             ++histogram[iter * input_len + item];

            // Add previous,self,and next work items to work queue
            if(item > 0) {
                addWorkItem(input_len,item - 1,item,iter,queue,queue_size,marked);
            }
            addWorkItem(input_len,item,item,iter,queue,queue_size,marked);
            if(item + 1 < input_len) {
                addWorkItem(input_len,item + 1,item,iter,queue,queue_size,marked);
            }
        }
        __threadfence_system();
        grid.sync();

        // Reset marked array for next iteration
        for(int i = 0; i < items_per_block; ++i) {
            if(blockIdx.x * items_per_block + i < input_len) {
                marked[blockIdx.x * items_per_block + i] = 0;
            }
        }
        __threadfence_system();
        grid.sync();
    }
}

int main(int argc,char* argv[])
{
    int input_len = atoi(argv[1]);
    int max_iter = atoi(argv[2]);
    int num_blocks = atoi(argv[3]);

    // A histogram to keep track of work items that have been processed in each iteration
    int *histogram_host = new int[input_len * max_iter];
    memset(histogram_host,0,sizeof(int) * input_len * max_iter);
    int *histogram_device;
    checkCudaErrors(cudaMalloc(&histogram_device,sizeof(int) * input_len * max_iter));
    checkCudaErrors(cudaMemcpy(histogram_device,histogram_host,sizeof(int) * input_len * max_iter,cudaMemcpyHostToDevice));

    // Size of the work queue for each iteration
    int queue_size_host[max_iter + 1];
    queue_size_host[0] = input_len;
    memset(&queue_size_host[1],0,sizeof(int) * max_iter);
    int *queue_size_device;
    checkCudaErrors(cudaMalloc(&queue_size_device,sizeof(int) * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_size_device,queue_size_host,sizeof(int) * (max_iter + 1),cudaMemcpyHostToDevice));

    // Work queue
    int *queue_host = new int[input_len * (max_iter + 1)];
    for(int i = 0; i < input_len; ++i) {
        queue_host[i] = i;
    }
    memset(&queue_host[input_len],0,sizeof(int) * input_len * max_iter);
    int *queue_device;
    checkCudaErrors(cudaMalloc(&queue_device,sizeof(int) * input_len * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_device,queue_host,sizeof(int) * input_len * (max_iter + 1),cudaMemcpyHostToDevice));

    // An array used to keep track of work items already added to the work queue to
    // avoid multiple additions of a work item in the same iteration
    int marked_host[input_len];
    memset(marked_host,0,sizeof(int) * input_len);
    int *marked_device;
    checkCudaErrors(cudaMalloc(&marked_device,sizeof(int) * input_len));
    checkCudaErrors(cudaMemcpy(marked_device,marked_host,sizeof(int) * input_len,cudaMemcpyHostToDevice));

    const dim3 threads(1,1,1);
    const dim3 blocks(num_blocks,1);
    int dev = 0;
    int supportsCoopLaunch = 0;
    checkCudaErrors(cudaDeviceGetAttribute(&supportsCoopLaunch,cudaDevAttrCooperativeLaunch,dev));
    if (!supportsCoopLaunch) {printf("Cooperative Launch is not supported on this machine configuration.  Exiting."); return 0;}
    /// This will launch a grid that can maximally fill the GPU,on the default stream with kernel arguments
    int numBlocksPerSm = 0;
    // Number of threads my_kernel will be launched with
    int numThreads = threads.x;
    cudaDeviceProp deviceProp;
    checkCudaErrors(cudaGetDeviceProperties(&deviceProp,dev));
    checkCudaErrors(cudaOccupancyMaxActiveBlocksPerMultiprocessor(&numBlocksPerSm,processWorkItems,numThreads,0));
    // launch
    void *kernelArgs[] = { &input_len,&max_iter,&histogram_device,&queue_device,&queue_size_device,&marked_device};
    dim3 dimBlock = dim3(numThreads,1);
    num_blocks = min(num_blocks,deviceProp.multiProcessorCount*numBlocksPerSm);
    dim3 dimGrid(num_blocks,1);
    printf("launching %d blocks\n",dimGrid.x);
    checkCudaErrors(cudaLaunchCooperativeKernel((void*)processWorkItems,dimGrid,dimBlock,kernelArgs));

    // processWorkItems<<<blocks,threads>>>(input_len,max_iter,histogram_device,queue_device,queue_size_device,marked_device);
    checkCudaErrors(cudaDeviceSynchronize());

    checkCudaErrors(cudaMemcpy(histogram_host,histogram_device,sizeof(int) * input_len * max_iter,cudaMemcpyDeviceToHost));

    int extra = 0;
    double deficit = 0;
    for(int i = 0; i < input_len; ++i) {
        int cnt = 0;
        for(int iter = 0; iter < max_iter; ++iter) {
            if(histogram_host[iter * input_len + i] > 1) {
                ++extra;
            }
            cnt += histogram_host[iter * input_len + i];
        }
        deficit += max_iter - cnt;
    }
    printf("Skipped %.2f%% of items. %d extra item processing.\n",deficit / (input_len * max_iter) * 100,extra);

    checkCudaErrors(cudaFree(histogram_device));
    checkCudaErrors(cudaFree(queue_device));
    checkCudaErrors(cudaFree(queue_size_device));
    checkCudaErrors(cudaFree(marked_device));

    return 0;
}
$ nvcc -o t1811 t1811.cu -arch=sm_70 -std=c++11 -rdc=true
$ cuda-memcheck ./t1811 50 1000 5000
========= CUDA-MEMCHECK
launching 2560 blocks
Skipped 0.00% of items. 0 extra item processing.
========= ERROR SUMMARY: 0 errors
$ cuda-memcheck ./t1811 50 1000 1000
========= CUDA-MEMCHECK
launching 1000 blocks
Skipped 0.00% of items. 0 extra item processing.
========= ERROR SUMMARY: 0 errors
$ ./t1811 50 1000 5000
launching 2560 blocks
Skipped 0.00% of items. 0 extra item processing.
$ ./t1811 50 1000 1000
launching 1000 blocks
Skipped 0.00% of items. 0 extra item processing.
$ ./t1811 50 1000 1000
launching 1000 blocks
Skipped 0.00% of items. 0 extra item processing.
$

I am not suggesting the code above is free of defects or suitable for any particular purpose. It is mostly your code; I have modified it to demonstrate the concepts mentioned.

As an aside, I changed some of your large stack-based memory allocations to heap-based allocations. I don't recommend trying to create large stack-based arrays such as:

int histogram_host[input_len * max_iter];

In my opinion it is better to do this:

int *histogram_host = new int[input_len * max_iter];

Depending on machine characteristics, this may become an issue as the command-line input values get larger. It doesn't have much to do with CUDA, however. I have not tried to address every instance of this pattern in your code.

Although not related to this particular issue, grid sync also has other requirements for successful use. These are covered in the programming guide and may include but are not limited to:

  • platform support (e.g. OS, GPU, etc.)
  • kernel sizing requirements (total number of threads or threadblocks launched)

The programming guide contains convenient boilerplate code that can be used to satisfy these requirements.
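
As a rough sketch of that boilerplate (variable names here are illustrative; the modified code above follows the same pattern): query the cooperative-launch device attribute, then cap the grid at the number of blocks that can be simultaneously resident:

int dev = 0, supportsCoopLaunch = 0, numBlocksPerSm = 0;
checkCudaErrors(cudaDeviceGetAttribute(&supportsCoopLaunch, cudaDevAttrCooperativeLaunch, dev));
cudaDeviceProp prop;
checkCudaErrors(cudaGetDeviceProperties(&prop, dev));
// numThreads is the block size the kernel will actually be launched with
checkCudaErrors(cudaOccupancyMaxActiveBlocksPerMultiprocessor(&numBlocksPerSm, processWorkItems, numThreads, 0));
int max_blocks = prop.multiProcessorCount * numBlocksPerSm;
// Requesting more than max_blocks total blocks makes the cooperative launch fail with an error
// rather than queueing the extra blocks.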
