如何解决如何异步转发锈流?
我正在尝试构建在简单的pub子接口上运行的消息路由器结构。我最终希望从套接字使用流,并根据流标签过滤/分配对象给订阅者。我创建了以下代码,但是我无法弄清楚如何从流中接收数据并进行异步处理,因为来自futures_util::stream::StreamExt
的fold和forward方法在连接流之前需要进行await
调用给订户。代码如下:
use futures_util::core_reexport::cell::RefCell;
use futures::channel::mpsc::{Sender,Receiver};
use std::collections::HashMap;
use std::ops::Deref;
use futures_util::core_reexport::ops::DerefMut;
use futures::{Sink,SinkExt};
use futures::task::{Context,Poll};
use futures_util::core_reexport::pin::Pin;
pub trait Addressable {
fn get_topic(&self) -> &String;
}
pub struct MessageRouter<T: Addressable + Clone + Send> {
subscriptions: RefCell<HashMap<String,Sender<T>>>
}
impl <T: Addressable + Clone + Send> MessageRouter<T> {
pub fn new() -> Self {
MessageRouter {
subscriptions: RefCell::new(HashMap::new())
}
}
}
pub struct Subscription<T>(Receiver<T>);
impl <T> Deref for Subscription<T> {
type Target = Receiver<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl <T> DerefMut for Subscription<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: Addressable + Clone + Send> MessageRouter<T> {
pub async fn publish(&self,item: T) {
let mut subscriptions = self.subscriptions.borrow_mut();
if let Some(subscriber) = subscriptions.get_mut(item.get_topic()) {
match subscriber.send(item.clone()).await {
Err(_) => subscriptions.remove(item.get_topic()),Ok(_) => None
};
}
}
pub fn subscribe(&self,topic: String) -> Subscription<T> {
let (sender,receiver) = futures::channel::mpsc::channel(128);
self.subscriptions.borrow_mut().insert(topic.clone(),sender);
return Subscription(receiver);
}
}
impl <T: Addressable + Clone + Send> Sink<T> for MessageRouter<T> {
type Error = futures::channel::mpsc::SendError;
fn poll_ready(self: Pin<&mut Self>,cx: &mut Context<'_>) -> Poll<Result<(),Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>,item: T) -> Result<(),Self::Error> {
let mut subscriptions = self.subscriptions.borrow_mut();
if let Some(subscriber) = subscriptions.get_mut(item.get_topic()) {
return subscriber.start_send(item);
}
Ok(())
}
fn poll_flush(self: Pin<&mut Self>,Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>,Self::Error>> {
Poll::Ready(Ok(()))
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。