使用 hyper crate Body 作为 Future crate Stream 参数

如何解决使用 hyper crate Body 作为 Future crate Stream 参数

使用 hyper crate,我向端点发出 HTTP 请求,然后尝试将响应正文传递给期望参数为 Futures crate Stream 的第三方库。

>

这会导致类型错误。

Cargo.toml

[dependencies]
bytes = "1.0.1"
http = "0.2.3"
tokio = { version = "1.1.0",features = ["full"] }
hyper = { version = "0.14.2",features = ["full"] }
hyper-tls = "0.5.0"
futures = "0.3.12"

示例

use std::io;
use bytes::Bytes;
use hyper::{Client,Body};
use hyper_tls::HttpsConnector;
use http::Request;
use futures::stream::Stream;

// ---- begin third-party library
type ConsumableStream = dyn Stream<Item = Result<Bytes,io::Error>> + Send + Sync + 'static;
async fn stream_consumer(_: &mut ConsumableStream) {
    // consume stream...
}
// ---- end third-party library

#[tokio::main]
async fn main() {
    let uri = "https://jsonplaceholder.typicode.com/todos/1";
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_,Body>(https);
    let request = Request::get(uri).body(Body::empty()).unwrap();
    let response = client.request(request).await.unwrap();
    let mut body = Box::new(response.into_body());
    stream_consumer(&mut body).await;
}

货物运行输出

error[E0271]: type mismatch resolving `<std::boxed::Box<hyper::body::body::Body> as futures_core::stream::Stream>::Item == std::result::Result<bytes::bytes::Bytes,std::io::Error>`
  --> src/bin/future_streams.rs:24:21
   |
24 |     stream_consumer(&mut body).await;
   |                     ^^^^^^^^^ expected struct `std::io::Error`,found struct `hyper::error::Error`
   |
   = note: expected enum `std::result::Result<_,std::io::Error>`
              found enum `std::result::Result<_,hyper::error::Error>`
   = note: required for the cast to the object type `(dyn futures_core::stream::Stream<Item = std::result::Result<bytes::bytes::Bytes,std::io::Error>> + std::marker::Send + std::marker::Sync + 'static)`

error: aborting due to previous error; 1 warning emitted

For more information about this error,try `rustc --explain E0271`.
error: could not compile `rustest`.

To learn more,run the command again with --verbose.

问题

将 hyper Body 用作预期 Future Stream 类型的函数参数的最有效方法是什么?

解决方法

ConsumableStream 期待一个 Result<Bytes,io::Error>,但 client.request 返回一个 Result<Bytes,hyper::Error>。如果 ConsumableStream 来自第三方库并且您无法更改类型定义,则可以映射结果流:

use futures::TryStreamExt;
use std::io;

#[tokio::main]
async fn main() {
    // ...
    let body = response
        .into_body()
        .map_err(|e| io::Error::new(io::ErrorKind::Other,e));
    stream_consumer(&mut Box::new(body)).await;
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?