How to use a hyper crate Body as a futures crate Stream parameter
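Using the hyper crate, I make an HTTP request to an endpoint and then try to pass the response body to a third-party library that expects its argument to be a futures crate Stream.
This results in a type error.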
Cargo.toml
[dependencies]
bytes = "1.0.1"
http = "0.2.3"
tokio = { version = "1.1.0", features = ["full"] }
hyper = { version = "0.14.2", features = ["full"] }
hyper-tls = "0.5.0"
futures = "0.3.12"
Example
use std::io;
use bytes::Bytes;
use hyper::{Client,Body};
use hyper_tls::HttpsConnector;
use http::Request;
use futures::stream::Stream;
// ---- begin third-party library
type ConsumableStream = dyn Stream<Item = Result<Bytes,io::Error>> + Send + Sync + 'static;
async fn stream_consumer(_: &mut ConsumableStream) {
    // consume stream...
}
// ---- end third-party library
#[tokio::main]
async fn main() {
    let uri = "https://jsonplaceholder.typicode.com/todos/1";
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);
    let request = Request::get(uri).body(Body::empty()).unwrap();
    let response = client.request(request).await.unwrap();
    let mut body = Box::new(response.into_body());
    stream_consumer(&mut body).await;
}
cargo run output
error[E0271]: type mismatch resolving `<std::boxed::Box<hyper::body::body::Body> as futures_core::stream::Stream>::Item == std::result::Result<bytes::bytes::Bytes,std::io::Error>`
--> src/bin/future_streams.rs:24:21
|
24 | stream_consumer(&mut body).await;
| ^^^^^^^^^ expected struct `std::io::Error`,found struct `hyper::error::Error`
|
= note: expected enum `std::result::Result<_,std::io::Error>`
found enum `std::result::Result<_,hyper::error::Error>`
= note: required for the cast to the object type `(dyn futures_core::stream::Stream<Item = std::result::Result<bytes::bytes::Bytes,std::io::Error>> + std::marker::Send + std::marker::Sync + 'static)`
error: aborting due to previous error; 1 warning emitted
For more information about this error,try `rustc --explain E0271`.
error: could not compile `rustest`.
To learn more,run the command again with --verbose.
Question
What is the most efficient way to use a hyper Body as a function argument where a futures Stream type is expected?
Solution
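ConsumableStream expects items of type Result<Bytes, io::Error>, but the Body of the response returned by client.request yields items of type Result<Bytes, hyper::Error>. If ConsumableStream comes from a third-party library and you cannot change the type definition, you can map the error type of the stream: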
use futures::TryStreamExt;
use std::io;
#[tokio::main]
async fn main() {
    // ...
    let body = response
        .into_body()
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
    stream_consumer(&mut Box::new(body)).await;
}
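To see what the map_err closure does to each failed item, here is a minimal std-only sketch. UpstreamError and to_io_error are hypothetical names introduced only for illustration: UpstreamError stands in for hyper::Error, and to_io_error performs the same conversion as the closure above. It assumes nothing beyond the standard library and is not part of hyper or futures.

```rust
use std::error::Error;
use std::fmt;
use std::io;

// Hypothetical stand-in for an upstream error type such as `hyper::Error`.
#[derive(Debug)]
struct UpstreamError(&'static str);

impl fmt::Display for UpstreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "upstream error: {}", self.0)
    }
}

impl Error for UpstreamError {}

// Hypothetical helper: wraps any boxable error in an `io::Error`,
// the same conversion the `map_err` closure applies to each stream item.
fn to_io_error<E>(e: E) -> io::Error
where
    E: Into<Box<dyn Error + Send + Sync>>,
{
    io::Error::new(io::ErrorKind::Other, e)
}

fn main() {
    // A single failed item, as the stream would yield it before mapping.
    let item: Result<Vec<u8>, UpstreamError> = Err(UpstreamError("connection reset"));

    // After mapping, the item has the error type the consumer expects.
    let converted: Result<Vec<u8>, io::Error> = item.map_err(to_io_error);
    let err = converted.unwrap_err();
    println!("{}", err);

    // The original error is kept inside the wrapper and can be recovered.
    let inner = err.get_ref().and_then(|e| e.downcast_ref::<UpstreamError>());
    println!("{:?}", inner);
}
```

Because the original error is stored inside the io::Error rather than discarded, code downstream of the consumer can still inspect it through get_ref and downcast_ref if it needs the underlying cause.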