Granular locking of a shared memory buffer


I need to create a shared memory region that one process pushes to and another process samples from. To minimize the amount of time the shared memory stays locked, I tried creating one lock per index of the shared-memory buffer.

When I run this, I get the error OSError: [Errno 23] Too many open files in system (an abbreviated stack trace is shown below).

How can I achieve granular control over a shared-memory buffer of length greater than 100,000, so that some segments can be read while others are being written? Is there another construct I could use?

In [1]: import multiprocessing as mp
   ...: from multiprocessing.shared_memory import SharedMemory
   ...:
   ...: class Memory:
   ...:     def __init__(self, length):
   ...:         self.shm = SharedMemory(create=True, size=length)
   ...:         self.locks = [mp.Lock() for _ in range(length)]
   ...:
   ...:     def __getitem__(self, item):
   ...:         return int.from_bytes(self.shm.buf[item], 'big')
   ...:
   ...:     def __setitem__(self, key, value):
   ...:         assert isinstance(value, int)
   ...:         self.shm.buf[key] = value.to_bytes(1, 'big')

In [2]: m = Memory(100_000)
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-2-2c6433483d72> in <module>
----> 1 m = Memory(100_000)
...
OSError: [Errno 23] Too many open files in system

I am running this in the docker image gitlab-registry.nautilus.optiputer.net/ian/torch:latest on Kubernetes. ulimit returns unlimited; in any case, raising the limit on Kubernetes is apparently an open issue.
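As a side note on diagnosing this: the per-process limit that `ulimit -n` reports can be inspected from Python with the standard `resource` module (Unix-only). Errno 23 (ENFILE) refers to the system-wide file table, so it can occur even when the per-process limit looks generous:

```python
import resource

# Per-process open-file limits (what `ulimit -n` reports).
# Errno 23 (ENFILE) means the *system-wide* file table is full,
# so it can occur even when these per-process limits are high.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(soft, hard)
```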

More details:

  • In my actual implementation, I set a stride so that each key indexes a fixed number of bytes composing six different objects, which are a mixture of ints and numpy arrays.
  • The input is fed by a queue that I try to keep empty. This part works fine even when I put a single lock over the entire buffer.
  • With a single lock over the entire buffer, the output queue (filled by the sample worker) oscillates between full and empty, even when I make it larger than it should need to be; I am trying to keep the output queue full.

Push worker:

def _push_worker(self) -> None:
    buffer_len = 100
    while True:
        sample = self.replay_in_queue.get()
        self.buffer_in.append(sample)
        if len(self.buffer_in) >= self.initial_memory // buffer_len:
            index = self.sample_count % self.memory_maxlen
            self.memory[index: index + buffer_len] = self.buffer_in
            self.sample_count += buffer_len
            self.buffer_in = []
  • I tried locking small chunks of the buffer instead, which works well, but I would like to know whether there is a cleaner solution to this problem.
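The batching in `_push_worker` can be sketched standalone (the queue and list here are plain stand-ins for the real queue and shared buffer): samples accumulate locally until `buffer_len` of them can be written in one slice, so a lock would be taken once per batch rather than once per sample.

```python
import queue

replay_in_queue = queue.Queue()
memory = [None] * 1_000          # stand-in for the shared buffer
buffer_len = 100
buffer_in, sample_count = [], 0

for i in range(250):
    replay_in_queue.put(i)

while not replay_in_queue.empty():
    buffer_in.append(replay_in_queue.get())
    if len(buffer_in) >= buffer_len:
        index = sample_count % len(memory)
        # one slice assignment == one lock acquisition in the real class
        memory[index:index + buffer_len] = buffer_in
        sample_count += buffer_len
        buffer_in = []
```

After the loop, two full batches (200 samples) have landed in `memory` and the 50 leftovers are still waiting in `buffer_in` for the next batch.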

Solution

My solution was to lock blocks of memory rather than individual indices. The number of blocks then needs tuning for optimal performance.
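The idea in miniature is lock striping: guard fixed-size blocks of indices with a bounded pool of locks, so the lock count no longer grows with the buffer length. (`StripedLocks` and `lock_for` are illustrative names of mine, not part of the class below.)

```python
import multiprocessing as mp

class StripedLocks:
    """Guard fixed-size blocks of indices with a bounded pool of locks."""

    def __init__(self, length: int, n_locks: int = 1_000):
        assert length % n_locks == 0, "length must be divisible by n_locks"
        self.block = length // n_locks  # indices guarded by each lock
        self.locks = [mp.Lock() for _ in range(n_locks)]

    def lock_for(self, index: int):
        # Neighbouring indices share a lock; distant indices never contend.
        return self.locks[index // self.block]

striped = StripedLocks(100_000)
with striped.lock_for(42):
    pass  # read or write slot 42 here
```

With 1,000 locks over 100,000 slots, indices 0–99 share one lock while index 100 uses the next, trading some neighbour contention for a thousand-fold drop in semaphore count.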


import multiprocessing as mp
from collections import namedtuple
from multiprocessing.shared_memory import SharedMemory
from typing import List, Union

import numpy as np

# Field order matches the byte layout written in `_set_item` below
Transition = namedtuple(
    'Transition',
    ('actor_id', 'step_number', 'state', 'action', 'next_state', 'reward', 'done')
)


class Memory:
    """
    A shared memory utility class
    """
    # Each record consists of:
    #   2 4x84x84 uint8 arrays
    #   4 32-bit (4-byte) ints
    #   1 bool (1 byte)
    int_size = 4
    array_dtype = 'uint8'
    array_bytes = 4 * 84 * 84
    array_shape = (4, 84, 84)
    stride = 2 * array_bytes + 4 * int_size + 1
    _offset = 0

    def __init__(self, length: int):
        self._length = length
        self._shared_memory = SharedMemory(create=True, size=self.stride * length)

        _n_locks = 1_000
        self._locks = [mp.Lock() for _ in range(_n_locks)]
        self._lock_length = length // _n_locks
        assert self._lock_length == length / _n_locks, "length must be divisible by _n_locks"

    def __del__(self):
        self._shared_memory.unlink()

    @property
    def _buf(self):
        return self._shared_memory.buf

    def __len__(self):
        return self._length

    def __getitem__(self, index: Union[slice, int]):
        if isinstance(index, int):
            return self._get_item(index)
        elif isinstance(index, slice):
            return self._get_slice(index)
        else:
            raise IndexError

    def _get_slice(self, slice_: slice):
        start = slice_.start if slice_.start is not None else 0
        step = slice_.step if slice_.step is not None else 1
        stop = slice_.stop if slice_.stop is not None else self._length
        if stop > self._length:
            raise IndexError
        return [self._get_item(i % self._length) for i in range(start, stop, step)]

    # todo: use __get_slice__ and __set_slice__
    def _get_item(self, index):
        if index < 0 or index >= self._length:
            raise IndexError(f"index {index} out of bounds")

        with self._locks[index // self._lock_length]:
            self._offset = index * self.stride

            actor_id = int.from_bytes(self._get(self.int_size), 'big')
            step_number = int.from_bytes(self._get(self.int_size), 'big')
            state = np.frombuffer(self._get(self.array_bytes), dtype=self.array_dtype).reshape(self.array_shape)
            action = int.from_bytes(self._get(self.int_size), 'big')
            next_state = np.frombuffer(self._get(self.array_bytes), dtype=self.array_dtype).reshape(self.array_shape)
            reward = int.from_bytes(self._get(self.int_size), 'big', signed=True)
            done = int.from_bytes(self._get(1), 'big')
            if done:
                next_state = None
            return Transition(actor_id, step_number, state, action, next_state, reward, done)

    def _get(self, n_bytes: int) -> bytes:
        """
        Get item at `_offset` and move forward `n_bytes`

        :param n_bytes: Number of bytes to retrieve from memory
        :return: bytes copied from memory
        """
        item = self._buf[self._offset: self._offset + n_bytes]
        self._offset += n_bytes
        return item.tobytes()

    def __setitem__(self, index: Union[int, slice], transition: Union[List[Transition], Transition]):
        """
        Store `transition` in shared memory

        :param index: Index of the memory location to store
        :param transition: a `Transition`, or a list of them when `index` is a slice
        """
        if isinstance(index, int):
            assert isinstance(transition, Transition)
            self._set_item(index, transition)
        elif isinstance(index, slice):
            assert isinstance(transition, list)
            self._set_slice(index, transition)
        else:
            raise IndexError

    def _set_slice(self, slice_: slice, transitions: List[Transition]):
        start = slice_.start if slice_.start is not None else 0
        step = slice_.step if slice_.step is not None else 1
        stop = slice_.stop if slice_.stop is not None else self._length
        for i, t in zip(range(start, stop, step), transitions):
            self._set_item(i % self._length, t)

    def _set_item(self, index, transition):
        if index < 0 or index >= self._length:
            raise IndexError(f"index {index} out of bounds")

        with self._locks[index // self._lock_length]:
            self._offset = index * self.stride

            # 'actor_id', 'step_number', 'state', 'action', 'next_state', 'reward', 'done'
            self._set(transition.actor_id.to_bytes(self.int_size, 'big'))
            self._set(transition.step_number.to_bytes(self.int_size, 'big'))
            self._set(transition.state.tobytes())
            self._set(transition.action.to_bytes(self.int_size, 'big'))
            if transition.next_state is not None:
                self._set(transition.next_state.tobytes())
            else:
                # leave stale bytes in place; `done` makes the reader ignore them
                self._offset += self.array_bytes
            self._set(int(transition.reward).to_bytes(self.int_size, 'big', signed=True))
            self._set(transition.done.to_bytes(1, 'big'))

    def _set(self, bytearray_: Union[bytearray, bytes]):
        """
        Update `_buf` and move `_offset`

        :param bytearray_: the bytes to write at the current offset
        """
        len_ = len(bytearray_)
        self._buf[self._offset: self._offset + len_] = bytearray_
        self._offset += len_

    def __iter__(self):
        for i in range(self._length):
            yield self[i]
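The fixed-stride byte layout above can be sanity-checked in isolation. This standalone sketch (the uppercase constants mirror the class attributes, but nothing here uses the class) packs one record with the same `to_bytes`/`tobytes` calls and decodes a field back out:

```python
import numpy as np

INT_SIZE = 4
ARRAY_SHAPE = (4, 84, 84)
ARRAY_BYTES = 4 * 84 * 84
STRIDE = 2 * ARRAY_BYTES + 4 * INT_SIZE + 1  # two arrays, four ints, one bool

state = np.full(ARRAY_SHAPE, 7, dtype='uint8')
record = (
    (7).to_bytes(INT_SIZE, 'big')                   # actor_id
    + (123).to_bytes(INT_SIZE, 'big')               # step_number
    + state.tobytes()                               # state
    + (2).to_bytes(INT_SIZE, 'big')                 # action
    + state.tobytes()                               # next_state
    + (-1).to_bytes(INT_SIZE, 'big', signed=True)   # reward (signed!)
    + (0).to_bytes(1, 'big')                        # done
)
assert len(record) == STRIDE

# Decode the reward field back out of the flat byte string
reward_off = 2 * INT_SIZE + 2 * ARRAY_BYTES + INT_SIZE
reward = int.from_bytes(record[reward_off:reward_off + INT_SIZE], 'big', signed=True)
print(reward)  # -1
```

Note that the reward must be written and read with `signed=True` on both sides, or negative rewards silently come back as large positive ints.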
