How to get ordered results from asynchronous multithreading


I'm trying to fetch a paginated list from a web API using multiple threads. The order of the pages matters, and I must preserve it.

Here is my sequential code:

def get_all_items(max_count):
    res = []
    curr_page = 1
    per_page = 100
    while True:
        try:
            pagination_list = get_pagination_list(page=curr_page, per_page=per_page)  # Assume this is a 3rd-party API with slow network I/O
        except Exception:
            break
        if not pagination_list:
            break
        if (curr_page - 1) * per_page > max_count:
            break
        # ... and other conditions to break; let's just keep it simple

        res.extend(pagination_list)
        curr_page += 1

    return res

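My current idea is to use a dictionary {curr_page: pagination_list} to keep the pages in order, but I don't know how to replace the break logic when using ThreadPoolExecutor or threading. Also, the threads that are created need to finish.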
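The dictionary idea can be sketched as follows. This is a minimal illustration, assuming each worker stores its page in a dict keyed by page number (the helper flatten_in_order and the sample data are hypothetical). Because pages may finish in any order, the list is rebuilt by walking page numbers consecutively from the first page, and the walk stops at the first missing or empty page, which plays the role of the break in the sequential loop.

```python
def flatten_in_order(pages_by_number, first_page=1):
    """Rebuild one ordered list from pages that may have finished in any order.

    pages_by_number: hypothetical dict {page_number: list_of_items}.
    Stops at the first page that is missing or empty, like the `break`
    in the sequential loop.
    """
    result = []
    page = first_page
    # .get() returns None for a missing page; None and [] both end the walk
    while pages_by_number.get(page):
        result.extend(pages_by_number[page])
        page += 1
    return result


# Pages arrived out of order; page 4 came back empty, page 5 was fetched speculatively.
pages = {3: ["e", "f"], 1: ["a", "b"], 4: [], 2: ["c", "d"], 5: ["x"]}
print(flatten_in_order(pages))  # ['a', 'b', 'c', 'd', 'e', 'f']
```

Pages fetched beyond the first empty one (page 5 here) are simply ignored, so it is safe to fetch a few pages speculatively.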

Any ideas? Thanks in advance.

Solution

import concurrent.futures
import time
import math

def get_pagination_list(page,per_page):
    time.sleep(0.5)
    if page >= 4: # Suppose the server only ever returns 4 pages of results
        return []
    return [f"Result {i} in page {page}." for i in range(per_page)]


def get_all_items(max_count=999):
    per_page = 6
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = {}
        for i in range(math.ceil(max_count/per_page)):
            future = executor.submit(get_pagination_list,page=i,per_page=per_page)
            futures[future] = i
            
        results = {}
        count = 0
        for completed_future in concurrent.futures.as_completed(futures):
            try:
                result = completed_future.result()
            except Exception as exc:
                print(exc) # Any exceptions raised in get_pagination_list are raised again when completed_future.result() is called
            else:
                count += len(result)
                print("Count:",count)
                # If the page is empty or we have reached the max number of results we should cancel all the other futures
                if not result or count >= max_count:
                    for future in futures:
                        cancelled = future.cancel()
                        # "If the call is currently being executed or finished running and cannot be cancelled then the method will 
                        # return False,otherwise the call will be cancelled and the method will return True."
                        # These 3 lines below can be deleted.
                        if cancelled:
                            page_number = futures[future]
                            print(f"Successfully cancelled future for page {page_number}",future)
                page_number = futures[completed_future]
                results[page_number] = result
    
    res = []
    print("These are the results of the completed futures:")
    for key,value in sorted(results.items()):
        print(f"Page number: {key},result: {value}")
        res.extend(value)
    
    return res[:max_count] # Only return at most max_count items

max_items = 10
items = get_all_items(max_items)
print(f"get_all_items({max_items}) returned {len(items)} items:")
for item in items:
    print(item)

max_items = 100
items = get_all_items(max_items)
print(f"get_all_items({max_items}) returned {len(items)} items:")
for item in items:
    print(item)

Output:

Count: 6
Count: 12
These are the results of the completed futures:
Page number: 0,result: ['Result 0 in page 0.','Result 1 in page 0.','Result 2 in page 0.','Result 3 in page 0.','Result 4 in page 0.','Result 5 in page 0.']
Page number: 1,result: ['Result 0 in page 1.','Result 1 in page 1.','Result 2 in page 1.','Result 3 in page 1.','Result 4 in page 1.','Result 5 in page 1.']
get_all_items(10) returned 10 items:
Result 0 in page 0.
Result 1 in page 0.
Result 2 in page 0.
Result 3 in page 0.
Result 4 in page 0.
Result 5 in page 0.
Result 0 in page 1.
Result 1 in page 1.
Result 2 in page 1.
Result 3 in page 1.
Count: 6
Count: 12
Count: 18
Count: 24
Count: 24
Successfully cancelled future for page 7 <Future at 0x199ddb2ec50 state=cancelled>
Successfully cancelled future for page 8 <Future at 0x199ddb2e7f0 state=cancelled>
Successfully cancelled future for page 9 <Future at 0x199ddb2e860 state=cancelled>
Successfully cancelled future for page 11 <Future at 0x199ddb2e940 state=cancelled>
Successfully cancelled future for page 12 <Future at 0x199ddb2e198 state=cancelled>
Successfully cancelled future for page 13 <Future at 0x199ddb2e390 state=cancelled>
Successfully cancelled future for page 14 <Future at 0x199ddb2e9e8 state=cancelled>
Successfully cancelled future for page 15 <Future at 0x199ddb2e6d8 state=cancelled>
Successfully cancelled future for page 16 <Future at 0x199ddb2e630 state=cancelled>
Count: 24
Successfully cancelled future for page 7 <Future at 0x199ddb2ec50 state=cancelled>
Successfully cancelled future for page 8 <Future at 0x199ddb2e7f0 state=cancelled>
Successfully cancelled future for page 9 <Future at 0x199ddb2e860 state=cancelled>
Successfully cancelled future for page 11 <Future at 0x199ddb2e940 state=cancelled>
Successfully cancelled future for page 12 <Future at 0x199ddb2e198 state=cancelled>
Successfully cancelled future for page 13 <Future at 0x199ddb2e390 state=cancelled>
Successfully cancelled future for page 14 <Future at 0x199ddb2e9e8 state=cancelled>
Successfully cancelled future for page 15 <Future at 0x199ddb2e6d8 state=cancelled>
Successfully cancelled future for page 16 <Future at 0x199ddb2e630 state=cancelled>
Count: 24
Successfully cancelled future for page 7 <Future at 0x199ddb2ec50 state=cancelled>
Successfully cancelled future for page 8 <Future at 0x199ddb2e7f0 state=cancelled>
Successfully cancelled future for page 9 <Future at 0x199ddb2e860 state=cancelled>
Successfully cancelled future for page 11 <Future at 0x199ddb2e940 state=cancelled>
Successfully cancelled future for page 12 <Future at 0x199ddb2e198 state=cancelled>
Successfully cancelled future for page 13 <Future at 0x199ddb2e390 state=cancelled>
Successfully cancelled future for page 14 <Future at 0x199ddb2e9e8 state=cancelled>
Successfully cancelled future for page 15 <Future at 0x199ddb2e6d8 state=cancelled>
Successfully cancelled future for page 16 <Future at 0x199ddb2e630 state=cancelled>
Count: 24
Successfully cancelled future for page 7 <Future at 0x199ddb2ec50 state=cancelled>
Successfully cancelled future for page 8 <Future at 0x199ddb2e7f0 state=cancelled>
Successfully cancelled future for page 9 <Future at 0x199ddb2e860 state=cancelled>
Successfully cancelled future for page 11 <Future at 0x199ddb2e940 state=cancelled>
Successfully cancelled future for page 12 <Future at 0x199ddb2e198 state=cancelled>
Successfully cancelled future for page 13 <Future at 0x199ddb2e390 state=cancelled>
Successfully cancelled future for page 14 <Future at 0x199ddb2e9e8 state=cancelled>
Successfully cancelled future for page 15 <Future at 0x199ddb2e6d8 state=cancelled>
Successfully cancelled future for page 16 <Future at 0x199ddb2e630 state=cancelled>
These are the results of the completed futures:
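Page number: 0,result: ['Result 0 in page 0.','Result 1 in page 0.','Result 2 in page 0.','Result 3 in page 0.','Result 4 in page 0.','Result 5 in page 0.']
Page number: 1,result: ['Result 0 in page 1.','Result 1 in page 1.','Result 2 in page 1.','Result 3 in page 1.','Result 4 in page 1.','Result 5 in page 1.']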
Page number: 2,result: ['Result 0 in page 2.','Result 1 in page 2.','Result 2 in page 2.','Result 3 in page 2.','Result 4 in page 2.','Result 5 in page 2.']
Page number: 3,result: ['Result 0 in page 3.','Result 1 in page 3.','Result 2 in page 3.','Result 3 in page 3.','Result 4 in page 3.','Result 5 in page 3.']
Page number: 4,result: []
Page number: 5,result: []
Page number: 6,result: []
Page number: 10,result: []
get_all_items(100) returned 24 items:
Result 0 in page 0.
Result 1 in page 0.
Result 2 in page 0.
Result 3 in page 0.
Result 4 in page 0.
Result 5 in page 0.
Result 0 in page 1.
Result 1 in page 1.
Result 2 in page 1.
Result 3 in page 1.
Result 4 in page 1.
Result 5 in page 1.
Result 0 in page 2.
Result 1 in page 2.
Result 2 in page 2.
Result 3 in page 2.
Result 4 in page 2.
Result 5 in page 2.
Result 0 in page 3.
Result 1 in page 3.
Result 2 in page 3.
Result 3 in page 3.
Result 4 in page 3.
Result 5 in page 3.

You can use concurrent.futures.ThreadPoolExecutor.

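Its submit method schedules a function to run in the thread pool and returns a Future object. You can store these futures in a list and then iterate over them in order. You can also use the executor as a with context manager: on exit it waits for all submitted futures to finish and then shuts down the thread pool automatically.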

Here is an example illustrating this approach:

import concurrent.futures
import time
import random


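def process_page(index):
    time.sleep(random.randint(0, 5))  # emulate slow network I/O
    if index % 5 == 0:
        raise ValueError()  # emulate a failed request
    return f"content of page {index}"  # emulate the page data


def process_all(max_count):
    future_list = []
    result = []
    need_break = False
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Schedule every page up front; the list keeps submission order
        for index in range(max_count):
            future = executor.submit(process_page, index)
            future_list.append(future)

        # Iterate in submission order, so `result` stays ordered by page
        for index, future in enumerate(future_list):
            try:
                if need_break:
                    # Only futures that have not started yet can be cancelled;
                    # running ones still finish before the with block exits
                    future.cancel()
                else:
                    result.append(future.result())
                    print(f"Page {index} processed")

                    if index >= 22:
                        need_break = True  # emulate some end condition
            except Exception:
                print(f"Page {index} failed to process")
    return result


process_all(100)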

Output:

Page 0 failed to process
Page 1 processed
Page 2 processed
Page 3 processed
Page 4 processed
Page 5 failed to process
Page 6 processed
Page 7 processed
...
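The ordering in the output above comes from iterating future_list in submission order: result() blocks on each future in turn, regardless of which page finished first. Below is a smaller, deterministic sketch of the same submit-then-iterate pattern. The names fake_fetch and fetch_pages_in_order are hypothetical, and fake_fetch deliberately makes earlier pages slower so that they finish last.

```python
import concurrent.futures
import time


def fake_fetch(page):
    """Hypothetical stand-in for a slow API call: earlier pages take longer."""
    time.sleep(0.05 * (5 - page))
    return [f"item {page}-{i}" for i in range(2)]


def fetch_pages_in_order(page_numbers):
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # submit() returns a Future immediately; the list keeps submission order
        futures = [executor.submit(fake_fetch, page) for page in page_numbers]
        for future in futures:
            # result() waits for *this* page, so pages are collected in
            # submission order even though later pages finished first
            results.extend(future.result())
    # leaving the with block calls executor.shutdown(wait=True)
    return results


print(fetch_pages_in_order(range(5)))
```

Executor.map gives the same ordering with less code; the explicit list of futures is useful when you want to cancel the remaining pages, as the answer does.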

I think you can create a simple array sized to the number of pages. Here is the idea:

import threading

def fetch_all_pages(total_pages):
    pages = [None] * total_pages  # one slot per page, filled in by index
    def worker(page_index):
        # Each thread writes only its own slot, so the order is preserved
        pages[page_index] = get_pagination_list(page=page_index + 1, per_page=100)
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(total_pages)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait until every page has been fetched
    return pages

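Once all the threads have finished, your pages are already in order, so you don't need to worry about sorting. Hope this idea helps. You can also use a list instead of an array, as long as each page is written to its own index.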
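The array approach assumes the total number of pages is known in advance. When it is not, as in the question, one option (not taken from the answers above) is to fetch a fixed-size batch of pages concurrently, check them in page order, and only start the next batch if no stop condition was hit. This is a sketch under assumptions: fake_get_pagination_list stands in for the real API, and batch_size is a hypothetical tuning knob; at most batch_size - 1 requests past the last page are wasted.

```python
import concurrent.futures
import itertools


def fake_get_pagination_list(page, per_page):
    """Hypothetical stand-in for the asker's API: only pages 1-4 have data."""
    if page > 4:
        return []
    return [f"page {page} item {i}" for i in range(per_page)]


def get_all_items(max_count, per_page=3, batch_size=4):
    def safe_fetch(page):
        try:
            return fake_get_pagination_list(page=page, per_page=per_page)
        except Exception:
            return []  # a failed page ends the scan, like `except Exception: break`

    res = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
        # Pages 1..4, then 5..8, and so on; the loop only ends through a return
        for first in itertools.count(1, batch_size):
            batch = range(first, first + batch_size)
            # map() fetches the batch concurrently but yields results in page order
            for page_items in executor.map(safe_fetch, batch):
                if not page_items:  # empty page: no more data
                    return res
                res.extend(page_items)
                if len(res) >= max_count:
                    return res[:max_count]


print(len(get_all_items(100)))  # 12
print(get_all_items(5))
```

Returning from inside the with block is safe here: the executor's shutdown waits for the rest of the current batch before the function actually returns.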
