如何在链接的可观察对象之间传递结果 选项0结果选择器已弃用选项1.嵌套管道也称为“使用闭包”选项2自定义运算符此处也有说明

如何解决如何在链接的可观察对象之间传递结果 选项0结果选择器已弃用选项1.嵌套管道也称为“使用闭包”选项2自定义运算符此处也有说明

抽象问题:每次源Observable发出事件时,都需要触发一系列API调用和Angular服务。其中一些调用取决于先前的结果。

在我的示例中,源Observable startUpload$触发了一系列依存调用。

使用解构可以这样写:

this.startUploadEvent$.pipe(
      concatMap(event => this.getAuthenticationHeaders(event)),map(({ event,headers }) => this.generateUploadId(event,headers)),tap(({ event,headers,id }) => this.emitUploadStartEvent(id,event)),concatMap(({ event,id }) => this.createPdfDocument(event,id)),id,pdfId }) => this.uploadBilderForPdf(event,pdfId,mergeMap(({ event,cloudId }) => this.closePdf(cloudId,event,pdfId)),cloudId }) => this.emitUploadDoneEvent(id,cloudId)),).subscribe()

它几乎像是一种命令式方法。但这有一些问题:

  • 解构链在代码上重复,并且变得越来越{ event,cloudId }
  • 需要方法(例如generateUploadId(event,headers))来接收所有先前的值,以便即使方法本身不需要它也可以将它们传递给下一个管道
  • 需要内部可观测对象(在方法内)以映射值,以便其他管道阶段可以破坏它们:

_

private closePdf(cloudId,pdfId) {
    return this.httpClient.post(...,{ headers } )
        .pipe(
             //...,map(() => ({ event,cloudId }))
        )
}

如果编译器能够处理样板(例如使用async await)来编写如下所示的代码(没有上述任何问题),那就太好了:

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id,event)

    const pdfId = this.createPdfDocument(event,id)
    this.uploadBilderForPdf(event,id)

    const cloudId = this.closePdf(headers,pdfId)
    this.emitUploadDoneEvent(id,cloudId)

    return cloudId
  }

如何在链接的可观察对象之间传递结果而又没有我提到的问题?我错过了一个rxjs概念吗?

解决方法

您的方法绝对不应该与上下文耦合,也不要考虑将结果映射到特定形状。

RxJS与功能编程有关。在函数式编程中,有一种类似于使参数适应参数 ref

它使我们能够将方法签名与上下文分离。

为了实现这一点,您可以编写上下文相关的mapcontentMapmergMap运算符版本,以便最终的解决方案如下:

this.startUploadEvent$.pipe(
      map(withKey('event')),concatMap_(({event}) => this.getAuthenticationHeaders(event),'headers'),map_(({ headers }) => this.generateUploadId(headers),'id'),tap(({ event,id }) => this.emitUploadStartEvent(id,event)),concatMap_(({ id }) => this.createPdfDocument(id),'pdfId'),concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId),'cloudId'),mergeMap_(({ cloudId }) => this.closePdf(cloudId)),tap(({id,event,cloudId}) => this.emitUploadDoneEvent(id,cloudId)),).subscribe(console.log);

在这些运算符后注意_

Stackblitz Example

那些自定义运算符的目标是获取参数对象,然后通过投影函数并将投影结果添加到原始参数对象中。

function map_<K extends string,P,V>(project: (params: P) => V): OperatorFunction<P,P>;
function map_<K extends string,V>(project: (params: P) => V,key: K): OperatorFunction<P,P & Record<K,V>>;
function map_<K extends string,key?: K): OperatorFunction<P,P> {
  return map(gatherParams(project,key));
}

function concatMap_<K extends string,V>(projection: (params: P) => Observable<V>): OperatorFunction<P,P>;
function concatMap_<K extends string,V>(projection: (params: P) => Observable<V>,V>>;
function concatMap_<K extends string,P> {
  return concatMap(gatherParamsOperator(projection,key));
}

function mergeMap_<K extends string,P>;
function mergeMap_<K extends string,V>>;
function mergeMap_<K extends string,P> {
  return mergeMap(gatherParamsOperator(projection,key));
}

// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string,V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string,V>(fn: (params: P) => V,key: K): (params: P) => P & Record<K,V>;
function gatherParams<K extends string,key?: K): (params: P) => P {
  return (params: P) => {
    if (typeof key === 'string') {
      return Object.assign({},params,{ [key]: fn(params) } as Record<K,V>);
    }

    return params;
  };
}

function gatherParamsOperator<K extends string,V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string,V>(fn: (params: P) => Observable<V>,key: K): (params: P) => Observable<P & Record<K,V>>;
function gatherParamsOperator<K extends string,key?: K): (params: P) => Observable<P> {
  return (params: P) => {
    return fn(params).pipe(map(value => gatherParams((_: P) => value,key)(params)));
  };
}

function withKey<K extends string,V>(key: K): (value: V) => Record<K,V> {
  return (value: V) => ({ [key]: value } as Record<K,V>);
}

我在这里使用了函数重载,因为某些主题我们不需要在参数中添加其他键。仅在使用this.closePdf(...)方法的情况下,参数才应通过。

因此,您将获得以前具有安全类型的解耦版本:

enter image description here

看起来不是工程过度吗?

在大多数情况下,您应该遵循 YAGNI (不需要)的原则。而且最好不要给现有代码增加更多复杂性。对于这种情况,您应该遵循一些简单的实现,以便在操作员之间共享参数,如下所示:

ngOnInit() {
  const params: Partial<Params> = {};
  this.startUploadEvent$.pipe(
    concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),map(headers => (params.headers = headers) && this.generateUploadId(headers)),tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id,concatMap(id => this.createPdfDocument(id)),concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),tap(() => this.emitUploadDoneEvent(params.pdfId,params.cloudId,params.event)),).subscribe(() => {
    console.log(params)
  });

其中Params类型为:

interface Params {
  event: any;
  headers: any;
  uploadId: any;
  pdfId: any;
  cloudId: any;
}

请注意我在作业(params.cloudId = cloudId)中使用的括号。

Stackblitz Example


还有许多其他方法,但是它们需要更改使用rxjs运算符的流程:

,

您当然不应该让方法接受与它们无关的参数!

您的主要问题:

如何在链接的可观察对象之间传递结果而又没有我提到的问题?

使用单个作用域(嵌套管道)

下面的代码与您的示例代码等效,无需传递不必要的属性。先前返回的值可通过进一步调用该函数的链来访问:

1   startUploadEvent$.pipe(
2     concatMap(event => getAuthenticationHeaders(event).pipe(
3       map(headers => generateUploadId(event,headers).pipe(
4         tap(id => emitUploadStartEvent(id,5         concatMap(id => createPdfDocument(event,headers,id)),6         concatMap(pdfId => uploadBilderForPdf(event,pdfId)),7         tap(cloudId => closePdf(cloudId,event))
8       ))
9     ))
10  ).subscribe();

请注意eventheaders的下游访问方式。不需要将它们传递到不需要它们的函数中。

我错过了一个rxjs概念吗?

也许。?真的不是...:-)

诀窍是使用.pipe来有效地对运算符进行分组,以便他们都可以访问输入参数。

通常,我们尝试将代码保持在.pipe内部:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`)),3     map(response => response.data.userName),4     map(name => `Hello ${name}!`),5     tap(greeting => console.log(greeting))
6   );

但是该代码实际上与以下代码没有什么不同:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`).pipe(
3       map(response => response.data.userName),4       map(name => `Hello ${name}! (aka User #${id})`)
5     )),6     tap(greeting => console.log(greeting))
7   );

但是,在第二种情况下,第4行可以访问nameid,而在第一种情况下,它只能访问name

请注意,第一个签名为userId$.pipe(switchMap(),map(),tap())

第二个是:userId$.pipe(switchMap(),tap())

,

您可以:

  • 将每个操作的结果分配给可观察的

  • 根据早期结果链接后续函数调用

  • 这些结果可以通过withLatestFrom

    在以后的操作调用中重用
  • shareReplay用于防止以后的withLatestFrom订阅导致较早的功能重新执行

    function startUpload(event$: Observable<string>) {
      const headers$ = event$.pipe(
        concatMap(event => getAuthenticationHeaders(event)),shareReplay()
        );
    
      const id$ = headers$.pipe(
        map(() => generateUploadId()),shareReplay()
        );
    
      const emitUploadEvent$ = id$.pipe(
        withLatestFrom(event$),// use earlier result
        map(([id,event]) => emitUploadStartEvent(id,shareReplay()
        );
    
       // etc
    }
    

如上所述,这些函数仅采用其所需的参数,并且没有传递。

演示:https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts

可以使用rxjs自定义运算符简化此模式(请注意,可以进一步完善,包括键入内容):

function call<T,R,TArgs extends any[],OArgs extends Observable<any>[]>(
  operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,action: (...args: any[]) => R,ignoreInput: boolean,...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
  return (input: Observable<T>) => input.pipe(
    withLatestFrom(...observableArgs),operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),shareReplay(1)
  );
}

可以这样使用:

function startUpload(event$: Observable<string>) {
  const headers$ = event$.pipe(
    call(concatMap,getAuthenticationHeaders,true)
  );

  const id$ = headers$.pipe(
    call(map,generateUploadId,false)
  );

  const startEmitted$ = id$.pipe(
    call(map,emitUploadStartEvent,true,event$)
  );

  const pdfId$ = startEmitted$.pipe(
    call(map,createPdfDocument,false,event$,headers$,id$)
  );

  const uploaded$ = pdfId$.pipe(
    call(map,uploadBuilderForPdf,pdfId$,id$)
  );

  const cloudId$ = uploaded$.pipe(
    call(map,closePdf,pdfId$)
  );

  const uploadDone$ = cloudId$.pipe(
    call(map,emitUploadDoneEvent,id$,event$)
  );

  // return cloudId$ instead of uploadDone$ but preserve observable chain
  return uploadDone$.pipe(concatMap(() => cloudId$));    
}

演示:https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts

,

您可以使用一个对象作为数据集吗?像这样:

接口:

export interface Packet {
  event: string;
  headers?: string;
  id?: number;
  pdfId?: number;
  cloudId?: number;
}

然后在代码中,如下所示:

服务:

  this.startUploadEvent$.pipe(
    concatMap(packet => this.doThingOne(packet)),map(packet => this.doThingTwo(packet)),tap(packet => this.doThingThree(packet)),// ...
  );

这样,每种方法都可以使用所需对象的位,并传递其余部分。尽管这确实需要更改每种方法才能采用并使用对象。

,

据我了解,您担心可读性,而不必在方法之间携带有效载荷。

您是否曾经考虑过将Observable转换为Promise?这里重要的是,可观察对象必须完成,以便兑现承诺并得以解决(与完成相同,但仅用于承诺)。

根据您的建议,请参见上文(例如异步等待),我是这个建议。

private async startUpload(event: StartUploadEvent) {
    const headers = await this.getAuthenticationHeaders(event).toPromise();
    const id = await this.generateUploadId().toPromise();
    
    this.emitUploadStartEvent(id,event);
    
    const pdfId = await this.createPdfDocument(event,id).toPromise();
    await this.uploadBilderForPdf(event,pdfId,id).toPromise();
    
    const cloudId = await this.closePdf(headers,pdfId).toPromise();
    this.emitUploadDoneEvent(id,cloudId)
    
    return cloudId
}

信息:您可以在此处阅读如果将一个可观察对象转换为一个承诺但未完成可观察对象将发生的情况:Why converted promise from Subject (Observable) does not work as expected

注意:我正在满足您的期望

也许还有其他方法可以解决问题,而不会违反常见的最佳做法

,

您对这类代码产生的问题是正确的,抽象的解决方案是将合并结果的责任移到每个调用,并将正确的参数从方法传递到管道。

可以很容易地进行一些改进。 tap运算符不会修改该值,因此您可以从解构中删除不需要的属性。 map只是转换结果,所以

map(({ event,headers }) => this.generateUploadId(event,headers)),

我们可以写

map(({ event,headers }) => ({
  event,id: this.generateUploadId(event,headers)
}))

this.generateUploadId不再需要返回对象。

对于高阶映射运算符,我想到了一些选择。 首先,大多数“ xMap”运算符都将结果选择器作为最后一个参数,其目的正是我们所需要的-将源值与结果结合在一起。结果选择器是depricated,所以嵌套管道是当前的处理方式,但让我们看一下使用结果选择器的样子

选项0。结果选择器(已弃用)

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event),(event,headers) => ({ event,headers }) // <-- Result Selector
    )
  );

选项1.嵌套管道(也称为“使用闭包”)

它与选项0非常相似,但是event保留在闭包中,而不是内部可见的

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event)
        .pipe(map(headers => ({ event,headers })))
    )
  );

选项2。自定义运算符(此处也有说明)

可以创建自定义运算符并获得与结果选择器非常相似的语法

function withResultSelector(operator,transformer) {
  let sourceValue;
  return pipe(
    tap(value => (sourceValue = value)),operator,map(value => transformer(sourceValue,value))
  );
}

用法:

this.startUploadEvent$
  .pipe(
    withResultSelector(
      concatMap(event => this.getAuthenticationHeaders(event)),headers })
    )
  );

更进一步,可以提取重复的内容并使所有功能更实用:

const mergeAs = propName => (a,b) => ({ ...a,[propName]: b });
const opAndMergeAs = (operator,propName) => withResultSelector(operator,mergeAs(propName));

this.startUploadEvent$
  .pipe(
    opAndMergeAs(concatMap(event => this.getAuthenticationHeaders(event)),"headers")
  );

为此编写适当的类型可能有点麻烦,但这是一个不同的问题

Playground我曾经写过答案。

,

您提到的这些担忧和问题是正确的,但是我在这里看到的问题正在将您的思维方式从命令式方法转变为反应式/功能性方法,但让我们首先回顾一下命令式代码

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id,event)

    const pdfId = this.createPdfDocument(event,id)
    this.uploadBilderForPdf(event,id)

    const cloudId = this.closePdf(headers,pdfId)
    this.emitUploadDoneEvent(id,cloudId)

    return cloudId
}

在这里,您可以看到event可以传递的内容更加干净,可以只获得想要的内容,然后将其传递给下一个函数,我们希望将此代码移至“响应式/函数式”方法。

从我的角度来看,主要问题是您使函数失去了它们所具有的上下文,例如getAuthenticationHeaders根本不应该返回event,而应该只返回headers和其他功能也一样。

在处理RxJS(又名反应式方法)时,您会很好地处理这些问题,这没关系,因为它保留了功能概念,并使代码更可预测,因为pure运算符应仅处理数据相同的管道可以使所有内容保持纯净,并且不会导致副作用,从而导致代码无法预测。

我认为您要寻找的东西将通过nested pipes解决(这是我认为最好的解决方案)

concatMap(event => this.getAuthenticationHeaders(event).pipe(
    map(headers => this.generateUploadId(event,headers).pipe())
))

并且它在诸如Marble.js

的某些RxJS后端库中大量使用

您可以使用类似于Result Selector的方法:

concatMap(event => this.getAuthenticationHeaders(event).pipe(
  map(headers => ({ headers,event }))
)),

或其他人建议的其他解决方案都可以使它起作用,但是您仍然会遇到与您提到的问题相同的问题,但是使用的代码更加清晰/可读。

您也可以将其转换为async/await方法,但是您将失去RxJS提供给您的反应性。

我的建议是尝试阅读有关反应式编程的更多信息,以及如何将其转变为思维方式,我将在此处提供一些链接,我认为这些链接非常好,并尝试了一些基于此的库RxJS之类的CycleJS之类的书,我建议您阅读有关函数式编程的知识,这对本书Mostly adequate guide to FP (in javascript)Composing Software

我推荐这个很棒的演讲RxJS Recipes,它将改变您使用RxJS的方式。

有用的资源:

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-