Scheduling periodic requests to multiple devices over a shared channel

I need to periodically request data from a configurable set of devices, at a configurable per-device interval. All devices are connected to a shared data bus, so only one device can transmit at a time.

The devices have very little memory, so each one can only retain its data for a limited time before it is overwritten by the next block. This means I have to make sure to request data from any given device while it is still available, or it will be lost.

I am looking for an algorithm that, given a list of devices and their respective timing properties, finds a feasible schedule that results in minimal data loss.

I think each device can be formally described by the following properties:

data_interval: the time it takes for the next data block to become available

max_request_interval: the maximum time between two consecutive requests that does not cause data loss

processing_time: the time it takes to send a request and fully receive the corresponding response containing the requested data

Basically, I need to make sure to request data from every device once it is ready and before it expires, while keeping the deadlines of all other devices in mind.
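As a minimal sketch, the three properties could be modeled like this (the `Device` class and the `bus_utilization` helper are hypothetical names used only to make the problem statement concrete):

```python
from dataclasses import dataclass


@dataclass
class Device:
    """Timing properties of one device on the shared bus (all in the same time unit)."""
    name: str
    data_interval: int          # time until the next data block becomes available
    max_request_interval: int   # longest gap between requests without data loss
    processing_time: int        # time to send a request and fully receive the response

    def bus_utilization(self) -> float:
        # Fraction of bus time this device alone consumes; a schedule can
        # only exist if these fractions sum to at most 1 across all devices.
        return self.processing_time / self.data_interval


sensor = Device("sensor-a", data_interval=100, max_request_interval=150, processing_time=20)
print(sensor.bus_utilization())  # 0.2
```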

Is there an algorithm for this kind of problem? I strongly doubt I'm the first person to run into this situation. Searching online for existing solutions didn't turn up many useful results, mainly because scheduling algorithms are mostly used in operating systems and the like, where scheduled processes can be paused and resumed at will. In my case, however, that isn't possible: requesting and receiving a block of data is atomic, i.e. it either runs to completion or not at all.

Solution

I solved this problem using non-preemptive deadline-monotonic scheduling.

Here is the Python code, for anyone interested:

"""This module implements non-preemptive deadline-monotonic scheduling (NPDMS) to compute a schedule of
periodic, non-preemptable requests to slave devices connected to a shared data bus."""

from math import gcd
from functools import reduce
from typing import List, Optional


class Slave:

    def __init__(self, name: str, period: int, processing_time: int, offset=0, deadline=None):
        self.name = name
        self.period = int(period)
        self.processing_time = int(processing_time)
        self.offset = int(offset)
        if self.offset >= self.period:
            raise ValueError("Slave %s: offset must be < period" % name)
        # The deadline defaults to the period; the falsy check also covers the
        # empty string produced by a blank CSV field.
        self.deadline = int(deadline) if deadline else self.period
        if self.deadline > self.period:
            raise ValueError("Slave %s: deadline must be <= period" % name)


class Request:

    def __init__(self, slave: Slave, start_time: int):
        self.slave = slave
        self.start_time = start_time
        self.end_time = start_time + slave.processing_time
        self.duration = self.end_time - self.start_time

    def overlaps_with(self, other: 'Request'):
        min_duration = self.duration + other.duration
        start = min(other.start_time, self.start_time)
        end = max(other.end_time, self.end_time)
        effective_duration = end - start
        return effective_duration < min_duration


class Scenario:

    def __init__(self, *slaves: Slave):
        self.slaves = list(slaves)
        # Deadline-monotonic priority: the earliest deadline gets the highest priority
        self.slaves.sort(key=lambda slave: slave.deadline)
        # Hyperperiod: LCM of all slave periods
        self.cycle_period = reduce(lambda a, b: a * b // gcd(a, b), [slave.period for slave in slaves])

    def compute_schedule(self, resolution=1) -> 'Schedule':
        # Collect every request released during one hyperperiod
        request_pool = []
        for t in range(0, self.cycle_period, resolution):
            for slave in self.slaves:
                if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
                    request_pool.append(Request(slave, t))
        request_pool.reverse()

        scheduled_requests = []
        current_request = request_pool.pop()
        t = current_request.start_time
        while t < self.cycle_period:
            # Requests are non-preemptable: once started, they run to completion
            ongoing_request = Request(current_request.slave, t)
            while ongoing_request.start_time <= t < ongoing_request.end_time:
                t += resolution
            scheduled_requests.append(ongoing_request)
            if len(request_pool):
                current_request = request_pool.pop()
                t = max(current_request.start_time, t)
            else:
                current_request = None
                break

        if current_request:
            request_pool.append(current_request)

        return Schedule(self, scheduled_requests, request_pool)


class Schedule:

    def __init__(self, scenario: Scenario, requests: List[Request], unscheduled: List[Request] = None):
        self.scenario = scenario
        self.requests = requests
        self.unscheduled_requests = unscheduled if unscheduled else []

        self._utilization = 0
        for slave in self.scenario.slaves:
            self._utilization += float(slave.processing_time) / float(slave.period)

        self._missed_deadlines_dict = {}
        for slave in self.scenario.slaves:
            periods = scenario.cycle_period // slave.period
            missed_deadlines = []
            for period in range(periods):
                start = period * slave.period
                end = start + slave.period
                request = self._find_request(slave, start, end)
                if request:
                    if request.start_time < (start + slave.offset) or request.end_time > start + slave.deadline:
                        missed_deadlines.append(request)
            if missed_deadlines:
                self._missed_deadlines_dict[slave] = missed_deadlines

        self._overlapping_requests = []
        for i in range(1, len(requests)):
            previous_request = requests[i - 1]
            current_request = requests[i]
            if current_request.overlaps_with(previous_request):
                self._overlapping_requests.append((current_request, previous_request))

        self._incomplete_requests = []
        for request in self.requests:
            if request.duration < request.slave.processing_time:
                self._incomplete_requests.append(request)

    @property
    def is_feasible(self) -> bool:
        return self.utilization <= 1 \
               and not self.has_missed_deadlines \
               and not self.has_overlapping_requests \
               and not self.has_unscheduled_requests \
               and not self.has_incomplete_requests

    @property
    def utilization(self) -> float:
        return self._utilization

    @property
    def has_missed_deadlines(self) -> bool:
        return len(self._missed_deadlines_dict) > 0

    @property
    def has_overlapping_requests(self) -> bool:
        return len(self._overlapping_requests) > 0

    @property
    def has_unscheduled_requests(self) -> bool:
        return len(self.unscheduled_requests) > 0

    @property
    def has_incomplete_requests(self) -> bool:
        return len(self._incomplete_requests) > 0

    def _find_request(self, slave, start, end) -> "Optional[Request]":
        # Return the scheduled request for the given slave that falls within
        # the time window [start, end), or None if there is none.
        for r in self.requests:
            if r.slave == slave and r.start_time >= start and r.end_time < end:
                return r
        return None


def read_scenario(file) -> Scenario:
    from csv import DictReader
    return Scenario(*[Slave(**row) for row in DictReader(file)])


def write_schedule(schedule: Schedule, file):
    from csv import DictWriter
    writer = DictWriter(file, fieldnames=["name", "start", "end"])
    writer.writeheader()
    for request in schedule.requests:
        writer.writerow({"name": request.slave.name, "start": request.start_time, "end": request.end_time})


if __name__ == '__main__':
    import argparse
    import sys

    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
                                     description='Use non-preemptive deadline-monotonic scheduling (NPDMS) to\n'
                                                 'compute a schedule of periodic, non-preemptable requests to\n'
                                                 'slave devices connected to a shared data bus.\n\n'
                                                 'Prints the computed schedule to stdout as CSV. Returns with\n'
                                                 'exit code 0 if the schedule is feasible, else 1.')
    parser.add_argument("csv_file", metavar="SCENARIO", type=str,
                        help="A csv file describing the scenario, i.e. a list\n"
                             "of slave devices with the following properties:\n"
                             "* name:            name/id of the slave device\n\n"
                             "* period:          duration of the period of time during\n"
                             "                   which requests must be dispatched\n\n"
                             "* processing_time: amount of time it takes to\n"
                             "                   fully process a request (worst-case)\n\n"
                             "* offset:          offset for initial phase-shifting\n"
                             "                   (default: 0)\n\n"
                             "* deadline:        amount of time during which data is\n"
                             "                   available after the start of each period\n"
                             "                   (default: <period>)")

    parser.add_argument("-r", "--resolution", type=int, default=1,
                        help="The resolution used to simulate the passage of time (default: 1)")

    args = parser.parse_args()

    with open(args.csv_file, 'r') as f:
        schedule = read_scenario(f).compute_schedule(args.resolution)
        write_schedule(schedule, sys.stdout)
        exit(0 if schedule.is_feasible else 1)
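Two quantities in the code above lend themselves to a quick worked example: the hyperperiod (computed as `cycle_period` in `Scenario.__init__`) and the utilization bound checked by `Schedule.is_feasible`. The timing values below are made up for illustration:

```python
from math import gcd
from functools import reduce

# Hypothetical slave timings (arbitrary ticks)
periods = [4, 8, 16]
processing_times = [1, 2, 4]

# The schedule repeats after one hyperperiod: the LCM of all periods.
hyperperiod = reduce(lambda a, b: a * b // gcd(a, b), periods)
print(hyperperiod)  # 16

# Necessary (but not sufficient) feasibility condition: the total bus
# utilization must not exceed 1.
utilization = sum(c / p for c, p in zip(processing_times, periods))
print(utilization)  # 0.75
```

If the utilizations already sum to more than 1, no schedule can exist regardless of offsets or deadlines, so this is a cheap check to run before simulating a full hyperperiod.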
