如何使用 FastAPI 作为 RabbitMQ (RPC) 的消费者

如何解决如何使用 FastAPI 作为 RabbitMQ (RPC) 的消费者

示例 here 展示了如何使用远程过程调用 (RPC) 在 python 中创建客户端和服务器。

但我无法想象 FastAPI 服务如何成为使用 pika for RabbitMQ 来处理来自 RCP 客户端的请求的服务器。

通过显式调用它们将请求任何 Web 服务,但是,我无法想象如何将 RabbitMQ 使用者集成到 Web 服务中。

另一方面,对于客户端来说,这样做很容易,通过显式调用 Web 服务,您可以发布对队列的请求,see this example

请问有什么帮助吗?还是一个好的开始?

解决方法

您可以将 aio_pikaRPC 模式一起使用并执行以下操作:

服务 1(消耗)

循环消费:

# app/__init__.py

from fastapi import FastAPI
from app.rpc import consume

app = FastAPI()

...

@app.on_event('startup')
def startup():
    loop = asyncio.get_event_loop()
    # use the same loop to consume
    asyncio.ensure_future(consume(loop))

...

创建连接、通道并注册要从另一个服务调用的远程方法:

# app/rpc.py

from aio_pika import connect_robust
from aio_pika.patterns import RPC

from app.config import config

__all__ = [
    'consume'
]


def remote_method():
    # DO SOMETHING
    # Move this method along with others to another place e.g. app/rpc_methods
    # I put it here for simplicity
    return 'It works!'

async def consume(loop):
    connection = await connect_robust(config.AMQP_URI,loop=loop)
    channel = await connection.channel()
    rpc = await RPC.create(channel)

    # Register your remote method
    await rpc.register('remote_method',remote_method,auto_delete=True)
    return connection

这就是您需要使用和响应的全部内容,现在让我们看看调用此远程方法的第二个服务。

服务 2(调用远程方法)

让我们先创建 RPC 中间件,以便轻松管理和访问 RPC 对象,以便从 API 函数调用我们的远程方法:

# app/utils/rpc_middleware.py

import asyncio

from fastapi import Request,Response

from aio_pika import connect_robust
from aio_pika.patterns import RPC

from app.config import config

__all__ = [
    'get_rpc','rpc_middleware'
]


async def rpc_middleware(request: Request,call_next):
    response = Response("Internal server error",status_code=500)
    try:
        # You can also pass a loop as an argument. Keep it here now for simplicity
        loop = asyncio.get_event_loop()
        connection = await connect_robust(config.AMQP_URI,loop=loop)
        channel = await connection.channel()
        request.state.rpc = await RPC.create(channel)
        response = await call_next(request)
    finally:

        # UPD: just thought that we probably want to keep queue and don't
        # recreate it for each request so we can remove this line and move
        # connection,channel and rpc initialisation out from middleware 
        # and do it once on app start

        # Also based of this: https://github.com/encode/starlette/issues/1029
        # it's better to create ASGI middleware instead of HTTP
        await request.state.rpc.close()
    return response


# Dependency to use rpc inside routes functions
def get_rpc(request: Request):
    rpc = request.state.rpc
    return rpc

应用 RPC 中间件:

# app/__init__.py

from app.utils import rpc_middleware

...

app.middleware('http')(rpc_middleware)

...

通过 API 函数中的依赖项使用 RPC 对象:

# app/api/whatever.py

from aio_pika.patterns import RPC

from app.utils import get_rpc

...

@router.get('/rpc')
async def rpc_test(rpc: RPC = Depends(get_rpc)):
    response = await rpc.proxy.remote_method()
    ...

添加一些日志记录以跟踪两个服务中发生的情况。此外,您还可以将两个服务的 RPC 逻辑合二为一,以便能够从同一服务中使用和调用远程方法。

希望它有助于获得基本的想法。

,

今天(4 月 28 日)由安德烈·巴拉诺夫斯基 (Andrej Baranovskij) 发布的 youtube 上有一个很棒的教程,讨论了它。

我将在下面提供链接。你也可以查看github源代码。

Video - Fastapi and RabbitMQ

Source code

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-