如何解决使用共享通道调度对多个设备的定期请求
我需要以可配置的间隔(每个设备)定期从可配置的设备中请求数据。所有设备都连接到共享数据总线,因此只有一个设备可以同时发送数据。
设备的内存很少,因此每个设备只能将数据保留一定的时间,然后再被下一个块覆盖。这意味着我需要确保在任何给定设备仍可用时向其请求数据,否则它将丢失。
我正在寻找一种算法,该算法在给定一系列设备及其各自的时序属性的情况下,找到可行的时间表以实现最小的数据丢失。
我想可以使用以下属性来正式描述每个设备:
data_interval
:下一个数据块变为可用所需的时间
max_request_interval
:两次请求之间不会造成数据丢失的最长时间
processing_time
:发送请求并完全接收包含所请求数据的相应响应所花费的时间
基本上,我需要确保在每台设备的数据准备就绪且尚未过期时向其请求数据,同时要牢记所有其他设备的截止日期。
是否存在某种针对此类问题的算法?我非常怀疑我是否是遇到这种情况的第一个人。在线搜索现有解决方案并没有产生很多有用的结果,主要是因为调度算法主要用于操作系统等,可以随意暂停和恢复调度的进程。但是,对于我来说,我无法做到这一点,因为请求和接收大块数据的过程是原子的,即只能完全执行或完全不能执行。
解决方法
我使用非抢占式截止日期单调调度解决了这个问题。
以下是一些有兴趣的人的python代码:
"""This module implements non-preemptive deadline monotonic scheduling (NPDMS) to compute a schedule of periodic,non-preemptable requests to slave devices connected to a shared data bus"""
from math import gcd
from functools import reduce
from typing import List
class Slave:
def __init__(self,name: str,period: int,processing_time: int,offset=0,deadline=None):
self.name = name
self.period = int(period)
self.processing_time = int(processing_time)
self.offset = int(offset)
if self.offset >= self.period:
raise ValueError("Slave %s: offset must be < period" % name)
self.deadline = int(deadline) if deadline else self.period
if self.deadline > self.period:
raise ValueError("Slave %s: deadline must be <= period" % name)
class Request:
def __init__(self,slave: Slave,start_time: int):
self.slave = slave
self.start_time = start_time
self.end_time = start_time + slave.processing_time
self.duration = self.end_time - self.start_time
def overlaps_with(self,other: 'Request'):
min_duration = self.duration + other.duration
start = min(other.start_time,self.start_time)
end = max(other.end_time,self.end_time)
effective_duration = end - start
return effective_duration < min_duration
class Scenario:
def __init__(self,*slaves: Slave):
self.slaves = list(slaves)
self.slaves.sort(key=lambda slave: slave.deadline)
# LCM of all slave periods
self.cycle_period = reduce(lambda a,b: a * b // gcd(a,b),[slave.period for slave in slaves])
def compute_schedule(self,resolution=1) -> 'Schedule':
request_pool = []
for t in range(0,self.cycle_period,resolution):
for slave in self.slaves:
if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
request_pool.append(Request(slave,t))
request_pool.reverse()
scheduled_requests = []
current_request = request_pool.pop()
t = current_request.start_time
while t < self.cycle_period:
ongoing_request = Request(current_request.slave,t)
while ongoing_request.start_time <= t < ongoing_request.end_time:
t += resolution
scheduled_requests.append(ongoing_request)
if len(request_pool):
current_request = request_pool.pop()
t = max(current_request.start_time,t)
else:
current_request = None
break
if current_request:
request_pool.append(current_request)
return Schedule(self,scheduled_requests,request_pool)
class Schedule:
def __init__(self,scenario: Scenario,requests: List[Request],unscheduled: List[Request] = None):
self.scenario = scenario
self.requests = requests
self.unscheduled_requests = unscheduled if unscheduled else []
self._utilization = 0
for slave in self.scenario.slaves:
self._utilization += float(slave.processing_time) / float(slave.period)
self._missed_deadlines_dict = {}
for slave in self.scenario.slaves:
periods = scenario.cycle_period // slave.period
missed_deadlines = []
for period in range(periods):
start = period * slave.period
end = start + slave.period
request = self._find_request(slave,start,end)
if request:
if request.start_time < (start + slave.offset) or request.end_time > start + slave.deadline:
missed_deadlines.append(request)
if missed_deadlines:
self._missed_deadlines_dict[slave] = missed_deadlines
self._overlapping_requests = []
for i in range(0,len(requests)):
if i == 0:
continue
previous_request = requests[i - 1]
current_request = requests[i]
if current_request.overlaps_with(previous_request):
self._overlapping_requests.append((current_request,previous_request))
self._incomplete_requests = []
for request in self.requests:
if request.duration < request.slave.processing_time:
self._incomplete_requests.append(request)
@property
def is_feasible(self) -> bool:
return self.utilization <= 1 \
and not self.has_missed_deadlines \
and not self.has_overlapping_requests \
and not self.has_unscheduled_requests \
and not self.has_incomplete_requests
@property
def utilization(self) -> float:
return self._utilization
@property
def has_missed_deadlines(self) -> bool:
return len(self._missed_deadlines_dict) > 0
@property
def has_overlapping_requests(self) -> bool:
return len(self._overlapping_requests) > 0
@property
def has_unscheduled_requests(self) -> bool:
return len(self.unscheduled_requests) > 0
@property
def has_incomplete_requests(self) -> bool:
return len(self._incomplete_requests) > 0
def _find_request(self,slave,end) -> [Request,None]:
for r in self.requests:
if r.slave == slave and r.start_time >= start and r.end_time < end:
return r
return None
def read_scenario(file) -> Scenario:
from csv import DictReader
return Scenario(*[Slave(**row) for row in DictReader(file)])
def write_schedule(schedule: Schedule,file):
from csv import DictWriter
writer = DictWriter(file,fieldnames=["name","start","end"])
writer.writeheader()
for request in schedule.requests:
writer.writerow({"name": request.slave.name,"start": request.start_time,"end": request.end_time})
if __name__ == '__main__':
import argparse
import sys
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,description='Use non-preemptive deadline monotonic scheduling (NPDMS) to\n'
'compute a schedule of periodic,non-preemptable requests to\n'
'slave devices connected to a shared data bus.\n\n'
'Prints the computed schedule to stdout as CSV. Returns with\n'
'exit code 0 if the schedule is feasible,else 1.')
parser.add_argument("csv_file",metavar="SCENARIO",type=str,help="A csv file describing the scenario,i.e. a list\n"
"of slave devices with the following properties:\n"
"* name: name/id of the slave device\n\n"
"* period: duration of the period of time during\n"
" which requests must be dispatched\n\n"
"* processing_time: amount of time it takes to\n"
" fully process a request (worst-case)\n\n"
"* offset: offset for initial phase-shifting\n"
" (default: 0)\n\n"
"* deadline: amount of time during which data is\n"
" available after the start of each period\n"
" (default: <period>)")
parser.add_argument("-r","--resolution",type=int,default=1,help="The resolution used to simulate the passage of time (default: 1)")
args = parser.parse_args()
with open(args.csv_file,'r') as f:
schedule = read_scenario(f).compute_schedule(args.resolution)
write_schedule(schedule,sys.stdout)
exit(0 if schedule.is_feasible else 1)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。