如何解决粒度锁定共享内存缓冲区
我需要创建一个进程要推送到的共享内存,另一个进程要从中采样。为了最大程度地减少共享内存被锁定的时间,我尝试为共享内存缓冲区中的每个索引创建一个锁。
运行此命令时,出现错误:OSError: [Errno 23] Too many open files in system
(以下是堆栈跟踪的缩写)
如何对长度大于100,000的共享内存缓冲区实现粒度控制,以便读取某些段而写入其他段?我还可以使用其他构造吗?
OSError: [Errno 23] Too many open files in system
我正在使用docker镜像(详见下文)。以下是重现问题的IPython会话:

In [1]: import multiprocessing as mp
...: from multiprocessing.shared_memory import SharedMemory
...:
...: class Memory:
...: def __init__(self,length):
...: self.shm = SharedMemory(create=True,size=length)
...: self.locks = [mp.Lock() for _ in range(length)]
...:
...: def __getitem__(self,item):
...: return int.from_bytes(self.shm.buf[item],'big')
...:
...: def __setitem__(self,key,value):
...: assert isinstance(value,int)
...: self.shm.buf[key] = value.to_bytes(1,'big')
...:
In [2]: m = Memory(100_000)
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-2-2c6433483d72> in <module>
----> 1 m = Memory(100_000)
...
OSError: [Errno 23] Too many open files in system
在Kubernetes上运行它,镜像为:
gitlab-registry.nautilus.optiputer.net/ian/torch:latest
ulimit 返回 unlimited。
无论如何,在Kubernetes上设置 ulimit 显然是一个 open issue(尚未解决的问题)。
更多详细信息:
- 在实际的实现中,我设置了跨步(stride),使每个键索引 28240 个字节,由六个不同的对象组成,这些对象是 int 和 numpy 数组的混合。
- 输入由我试图保持为空的队列提供。即使我在整个缓冲区上设置了锁定,这方面的工作也很好。
- 如果我在整个缓冲区上设置了锁定,即使我将输出队列设置得比应有的大小更大,它也会在满和空之间振荡。我试图保持输出队列满。
推送工人(push worker):
def _push_worker(self) -> None:
    """Forever: pull samples off ``replay_in_queue``, batch them locally,
    and flush each full batch into the shared replay memory.

    NOTE(review): the flush trigger compares against
    ``initial_memory // 100`` while the written slice is 100 wide; these
    only agree when ``initial_memory == 10_000`` — confirm intended.
    """
    batch_size = 100
    while True:
        item = self.replay_in_queue.get()
        self.buffer_in.append(item)
        if len(self.buffer_in) < self.initial_memory // batch_size:
            continue
        start = self.sample_count % self.memory_maxlen
        self.memory[start: start + batch_size] = self.buffer_in
        self.sample_count += batch_size
        self.buffer_in = []
- 我试图锁定缓冲区的小块,效果很好,但是我想知道是否有完美的解决方案来解决我的问题。
解决方法
我的解决方案是锁定内存块。这需要微调块的数量以获得最佳性能。
class Memory:
"""
A shared memory utility class
"""
# n_bytes:
# 2 4x84x84 uint8 arrays
# 4 32-bit (4-byte) numbers
# 1 bool (1-byte)
int_size = 4
array_dtype = 'uint8'
array_bytes = 4 * 84 * 84
array_shape = (4,84,84)
stride = 2 * array_bytes + 4 * int_size + 1
_offset = 0
def __init__(self,length: int):
self._length = length
self._shared_memory = SharedMemory(create=True,size=self.stride * length)
_n_locks = 1_000
self._locks = [mp.Lock() for _ in range(_n_locks)]
self._lock_length = length // _n_locks
assert self._lock_length == length / _n_locks,"length must be divisible by _n_locks"
def __del__(self):
self._shared_memory.unlink()
@property
def _buf(self):
return self._shared_memory.buf
def __len__(self):
return self._length
def __getitem__(self,index: Union[slice,int]):
if isinstance(index,int):
return self._get_item(index)
elif isinstance(index,slice):
return self._get_slice(index)
else:
raise IndexError
def _get_slice(self,slice_: slice):
start = slice_.start if slice_.start is not None else 0
step = slice_.step if slice_.step is not None else 1
stop = slice_.stop if slice_.stop is not None else self._length
if slice_.stop > self._length:
raise IndexError
return [self._get_item(i % self._length) for i in range(start,stop,step)]
# todo: use __get_slice__ and __set_slice__
def _get_item(self,index):
if index < 0 or index > self._length:
raise IndexError(f"index {index} out of bounds")
with self._locks[index // self._lock_length]:
self._offset = index * self.stride
actor_id = int.from_bytes(self._get(self.int_size),'big')
step_number = int.from_bytes(self._get(self.int_size),'big')
state = np.frombuffer(self._get(self.array_bytes),dtype='uint8').reshape(self.array_shape)
action = int.from_bytes(self._get(self.int_size),'big')
next_state = np.frombuffer(self._get(self.array_bytes),dtype='uint8').reshape(self.array_shape)
reward = int.from_bytes(self._get(self.int_size),'big',signed=True)
done = int.from_bytes(self._get(1),'big')
if done:
next_state = None
return Transition(actor_id,step_number,state,action,next_state,reward,done)
def _get(self,n_bytes: int) -> bytes:
"""
Get item at `_offset` and move forward `n_bytes`
:param n_bytes: Number of bytes to retrieve from memory
:return: bytes copied from memory
"""
item = self._buf[self._offset: self._offset + n_bytes]
self._offset += n_bytes
return item.tobytes()
def __setitem__(self,index: Union[int,slice],transition: Union[List[Transition],Transition]):
"""
Store `transition` in shared memory
:param index: Index of the memory location to store
:param transition: a `Transition`
"""
if isinstance(index,int):
assert isinstance(transition,Transition)
self._set_item(index,transition)
elif isinstance(index,slice):
assert isinstance(transition,List)
self._set_slice(index,transition)
else:
raise IndexError
def _set_slice(self,slice_: slice,transitions: List[Transition]):
start = slice_.start if slice_.start is not None else 0
step = slice_.step if slice_.step is not None else 1
stop = slice_.stop if slice_.stop is not None else self._length
for i,t in zip(range(start,step),transitions):
self._set_item(i % self._length,t)
def _set_item(self,index,transition):
if index < 0 or index > self._length:
raise IndexError(f"index {index} out of bounds")
with self._locks[index // self._lock_length]:
self._offset = index * self.stride
# 'actor_id','step_number','state','action','next_state','reward','done'
self._set(transition.actor_id.to_bytes(self.int_size,'big'))
self._set(transition.step_number.to_bytes(self.int_size,'big'))
self._set(transition.state.tobytes())
self._set(transition.action.to_bytes(self.int_size,'big'))
if transition.next_state is not None:
self._set(transition.next_state.tobytes())
else:
self._offset += self.array_bytes
self._set(int(transition.reward).to_bytes(self.int_size,signed=True))
self._set(transition.done.to_bytes(1,'big'))
def _set(self,bytearray_: Union[bytearray,bytes]):
"""
update `_buf` and move `_offset`
:param bytearray_: a bytearray
"""
len_ = len(bytearray_)
self._buf[self._offset: self._offset + len_] = bytearray_
self._offset = self._offset + len_
def __iter__(self):
for i in range(self._length):
yield self[i]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。