如何解决如何在链接的可观察对象之间传递结果 选项0结果选择器已弃用选项1.嵌套管道也称为“使用闭包”选项2自定义运算符此处也有说明
抽象问题:每次源Observable发出事件时,都需要触发一系列API调用和Angular服务。其中一些调用取决于先前的结果。
在我的示例中,源Observable startUpload$
触发了一系列依存调用。
使用解构可以这样写:
this.startUploadEvent$.pipe(
concatMap(event => this.getAuthenticationHeaders(event)),map(({ event,headers }) => this.generateUploadId(event,headers)),tap(({ event,headers,id }) => this.emitUploadStartEvent(id,event)),concatMap(({ event,id }) => this.createPdfDocument(event,id)),id,pdfId }) => this.uploadBilderForPdf(event,pdfId,mergeMap(({ event,cloudId }) => this.closePdf(cloudId,event,pdfId)),cloudId }) => this.emitUploadDoneEvent(id,cloudId)),).subscribe()
它几乎像是一种命令式方法。但这有一些问题:
- 解构链在代码上重复,并且变得越来越
{ event,cloudId }
- 需要方法(例如
generateUploadId(event,headers)
)来接收所有先前的值,以便即使方法本身不需要它也可以将它们传递给下一个管道 - 需要内部可观测对象(在方法内)以映射值,以便其他管道阶段可以破坏它们:
_
private closePdf(cloudId,pdfId) {
return this.httpClient.post(...,{ headers } )
.pipe(
//...,map(() => ({ event,cloudId }))
)
}
如果编译器能够处理样板(例如使用async await
)来编写如下所示的代码(没有上述任何问题),那就太好了:
private startUpload(event: StartUploadEvent) {
const headers = this.getAuthenticationHeaders(event)
const id = this.generateUploadId()
this.emitUploadStartEvent(id,event)
const pdfId = this.createPdfDocument(event,id)
this.uploadBilderForPdf(event,id)
const cloudId = this.closePdf(headers,pdfId)
this.emitUploadDoneEvent(id,cloudId)
return cloudId
}
如何在链接的可观察对象之间传递结果而又没有我提到的问题?我错过了一个rxjs概念吗?
解决方法
您的方法绝对不应该与上下文耦合,也不要考虑将结果映射到特定形状。
RxJS与功能编程有关。在函数式编程中,有一种类似于使参数适应参数 ref
它使我们能够将方法签名与上下文分离。
为了实现这一点,您可以编写上下文相关的map
,contentMap
,mergMap
运算符版本,以便最终的解决方案如下:
this.startUploadEvent$.pipe(
map(withKey('event')),concatMap_(({event}) => this.getAuthenticationHeaders(event),'headers'),map_(({ headers }) => this.generateUploadId(headers),'id'),tap(({ event,id }) => this.emitUploadStartEvent(id,event)),concatMap_(({ id }) => this.createPdfDocument(id),'pdfId'),concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId),'cloudId'),mergeMap_(({ cloudId }) => this.closePdf(cloudId)),tap(({id,event,cloudId}) => this.emitUploadDoneEvent(id,cloudId)),).subscribe(console.log);
在这些运算符后注意_
。
那些自定义运算符的目标是获取参数对象,然后通过投影函数并将投影结果添加到原始参数对象中。
function map_<K extends string,P,V>(project: (params: P) => V): OperatorFunction<P,P>;
function map_<K extends string,V>(project: (params: P) => V,key: K): OperatorFunction<P,P & Record<K,V>>;
function map_<K extends string,key?: K): OperatorFunction<P,P> {
return map(gatherParams(project,key));
}
function concatMap_<K extends string,V>(projection: (params: P) => Observable<V>): OperatorFunction<P,P>;
function concatMap_<K extends string,V>(projection: (params: P) => Observable<V>,V>>;
function concatMap_<K extends string,P> {
return concatMap(gatherParamsOperator(projection,key));
}
function mergeMap_<K extends string,P>;
function mergeMap_<K extends string,V>>;
function mergeMap_<K extends string,P> {
return mergeMap(gatherParamsOperator(projection,key));
}
// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string,V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string,V>(fn: (params: P) => V,key: K): (params: P) => P & Record<K,V>;
function gatherParams<K extends string,key?: K): (params: P) => P {
return (params: P) => {
if (typeof key === 'string') {
return Object.assign({},params,{ [key]: fn(params) } as Record<K,V>);
}
return params;
};
}
function gatherParamsOperator<K extends string,V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string,V>(fn: (params: P) => Observable<V>,key: K): (params: P) => Observable<P & Record<K,V>>;
function gatherParamsOperator<K extends string,key?: K): (params: P) => Observable<P> {
return (params: P) => {
return fn(params).pipe(map(value => gatherParams((_: P) => value,key)(params)));
};
}
function withKey<K extends string,V>(key: K): (value: V) => Record<K,V> {
return (value: V) => ({ [key]: value } as Record<K,V>);
}
我在这里使用了函数重载,因为某些主题我们不需要在参数中添加其他键。仅在使用this.closePdf(...)
方法的情况下,参数才应通过。
因此,您将获得以前具有安全类型的解耦版本:
看起来不是工程过度吗?
在大多数情况下,您应该遵循 YAGNI (不需要)的原则。而且最好不要给现有代码增加更多复杂性。对于这种情况,您应该遵循一些简单的实现,以便在操作员之间共享参数,如下所示:
ngOnInit() {
const params: Partial<Params> = {};
this.startUploadEvent$.pipe(
concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),map(headers => (params.headers = headers) && this.generateUploadId(headers)),tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id,concatMap(id => this.createPdfDocument(id)),concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),tap(() => this.emitUploadDoneEvent(params.pdfId,params.cloudId,params.event)),).subscribe(() => {
console.log(params)
});
其中Params
类型为:
interface Params {
event: any;
headers: any;
uploadId: any;
pdfId: any;
cloudId: any;
}
请注意我在作业(params.cloudId = cloudId)
中使用的括号。
还有许多其他方法,但是它们需要更改使用rxjs运算符的流程:
您当然不应该让方法接受与它们无关的参数!
您的主要问题:
如何在链接的可观察对象之间传递结果而又没有我提到的问题?
使用单个作用域(嵌套管道)
下面的代码与您的示例代码等效,无需传递不必要的属性。先前返回的值可通过进一步调用该函数的链来访问:
1 startUploadEvent$.pipe(
2 concatMap(event => getAuthenticationHeaders(event).pipe(
3 map(headers => generateUploadId(event,headers).pipe(
4 tap(id => emitUploadStartEvent(id,5 concatMap(id => createPdfDocument(event,headers,id)),6 concatMap(pdfId => uploadBilderForPdf(event,pdfId)),7 tap(cloudId => closePdf(cloudId,event))
8 ))
9 ))
10 ).subscribe();
请注意event
和headers
的下游访问方式。不需要将它们传递到不需要它们的函数中。
我错过了一个rxjs概念吗?
也许。?真的不是...:-)
诀窍是使用.pipe
来有效地对运算符进行分组,以便他们都可以访问输入参数。
通常,我们尝试将代码保持在.pipe
内部:
1 const greeting$ = userId$.pipe(
2 switchMap(id => http.get(`/users/${id}`)),3 map(response => response.data.userName),4 map(name => `Hello ${name}!`),5 tap(greeting => console.log(greeting))
6 );
但是该代码实际上与以下代码没有什么不同:
1 const greeting$ = userId$.pipe(
2 switchMap(id => http.get(`/users/${id}`).pipe(
3 map(response => response.data.userName),4 map(name => `Hello ${name}! (aka User #${id})`)
5 )),6 tap(greeting => console.log(greeting))
7 );
但是,在第二种情况下,第4行可以访问name
和id
,而在第一种情况下,它只能访问name
。
请注意,第一个签名为userId$.pipe(switchMap(),map(),tap())
第二个是:userId$.pipe(switchMap(),tap())
。
您可以:
-
将每个操作的结果分配给可观察的
-
根据早期结果链接后续函数调用
-
这些结果可以通过
在以后的操作调用中重用withLatestFrom
-
shareReplay
用于防止以后的withLatestFrom
订阅导致较早的功能重新执行function startUpload(event$: Observable<string>) { const headers$ = event$.pipe( concatMap(event => getAuthenticationHeaders(event)),shareReplay() ); const id$ = headers$.pipe( map(() => generateUploadId()),shareReplay() ); const emitUploadEvent$ = id$.pipe( withLatestFrom(event$),// use earlier result map(([id,event]) => emitUploadStartEvent(id,shareReplay() ); // etc }
如上所述,这些函数仅采用其所需的参数,并且没有传递。
演示:https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts
可以使用rxjs自定义运算符简化此模式(请注意,可以进一步完善,包括键入内容):
function call<T,R,TArgs extends any[],OArgs extends Observable<any>[]>(
operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,action: (...args: any[]) => R,ignoreInput: boolean,...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
return (input: Observable<T>) => input.pipe(
withLatestFrom(...observableArgs),operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),shareReplay(1)
);
}
可以这样使用:
function startUpload(event$: Observable<string>) {
const headers$ = event$.pipe(
call(concatMap,getAuthenticationHeaders,true)
);
const id$ = headers$.pipe(
call(map,generateUploadId,false)
);
const startEmitted$ = id$.pipe(
call(map,emitUploadStartEvent,true,event$)
);
const pdfId$ = startEmitted$.pipe(
call(map,createPdfDocument,false,event$,headers$,id$)
);
const uploaded$ = pdfId$.pipe(
call(map,uploadBuilderForPdf,pdfId$,id$)
);
const cloudId$ = uploaded$.pipe(
call(map,closePdf,pdfId$)
);
const uploadDone$ = cloudId$.pipe(
call(map,emitUploadDoneEvent,id$,event$)
);
// return cloudId$ instead of uploadDone$ but preserve observable chain
return uploadDone$.pipe(concatMap(() => cloudId$));
}
演示:https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts
,您可以使用一个对象作为数据集吗?像这样:
接口:
export interface Packet {
event: string;
headers?: string;
id?: number;
pdfId?: number;
cloudId?: number;
}
然后在代码中,如下所示:
服务:
this.startUploadEvent$.pipe(
concatMap(packet => this.doThingOne(packet)),map(packet => this.doThingTwo(packet)),tap(packet => this.doThingThree(packet)),// ...
);
这样,每种方法都可以使用所需对象的位,并传递其余部分。尽管这确实需要更改每种方法才能采用并使用对象。
,据我了解,您担心可读性,而不必在方法之间携带有效载荷。
您是否曾经考虑过将Observable转换为Promise?这里重要的是,可观察对象必须完成,以便兑现承诺并得以解决(与完成相同,但仅用于承诺)。
根据您的建议,请参见上文(例如异步等待),我是这个建议。
private async startUpload(event: StartUploadEvent) {
const headers = await this.getAuthenticationHeaders(event).toPromise();
const id = await this.generateUploadId().toPromise();
this.emitUploadStartEvent(id,event);
const pdfId = await this.createPdfDocument(event,id).toPromise();
await this.uploadBilderForPdf(event,pdfId,id).toPromise();
const cloudId = await this.closePdf(headers,pdfId).toPromise();
this.emitUploadDoneEvent(id,cloudId)
return cloudId
}
信息:您可以在此处阅读如果将一个可观察对象转换为一个承诺但未完成可观察对象将发生的情况:Why converted promise from Subject (Observable) does not work as expected
注意:我正在满足您的期望
,也许还有其他方法可以解决问题,而不会违反常见的最佳做法
您对这类代码产生的问题是正确的,抽象的解决方案是将合并结果的责任移到每个调用,并将正确的参数从方法传递到管道。
可以很容易地进行一些改进。
tap
运算符不会修改该值,因此您可以从解构中删除不需要的属性。
map
只是转换结果,所以
map(({ event,headers }) => this.generateUploadId(event,headers)),
我们可以写
map(({ event,headers }) => ({
event,id: this.generateUploadId(event,headers)
}))
和this.generateUploadId
不再需要返回对象。
对于高阶映射运算符,我想到了一些选择。 首先,大多数“ xMap”运算符都将结果选择器作为最后一个参数,其目的正是我们所需要的-将源值与结果结合在一起。结果选择器是depricated,所以嵌套管道是当前的处理方式,但让我们看一下使用结果选择器的样子
选项0。结果选择器(已弃用)
this.startUploadEvent$
.pipe(
concatMap(
event => this.getAuthenticationHeaders(event),(event,headers) => ({ event,headers }) // <-- Result Selector
)
);
选项1.嵌套管道(也称为“使用闭包”)
它与选项0非常相似,但是event
保留在闭包中,而不是内部可见的
this.startUploadEvent$
.pipe(
concatMap(
event => this.getAuthenticationHeaders(event)
.pipe(map(headers => ({ event,headers })))
)
);
选项2。自定义运算符(此处也有说明)
可以创建自定义运算符并获得与结果选择器非常相似的语法
function withResultSelector(operator,transformer) {
let sourceValue;
return pipe(
tap(value => (sourceValue = value)),operator,map(value => transformer(sourceValue,value))
);
}
用法:
this.startUploadEvent$
.pipe(
withResultSelector(
concatMap(event => this.getAuthenticationHeaders(event)),headers })
)
);
更进一步,可以提取重复的内容并使所有功能更实用:
const mergeAs = propName => (a,b) => ({ ...a,[propName]: b });
const opAndMergeAs = (operator,propName) => withResultSelector(operator,mergeAs(propName));
this.startUploadEvent$
.pipe(
opAndMergeAs(concatMap(event => this.getAuthenticationHeaders(event)),"headers")
);
为此编写适当的类型可能有点麻烦,但这是一个不同的问题
Playground我曾经写过答案。
,您提到的这些担忧和问题是正确的,但是我在这里看到的问题正在将您的思维方式从命令式方法转变为反应式/功能性方法,但让我们首先回顾一下命令式代码
private startUpload(event: StartUploadEvent) {
const headers = this.getAuthenticationHeaders(event)
const id = this.generateUploadId()
this.emitUploadStartEvent(id,event)
const pdfId = this.createPdfDocument(event,id)
this.uploadBilderForPdf(event,id)
const cloudId = this.closePdf(headers,pdfId)
this.emitUploadDoneEvent(id,cloudId)
return cloudId
}
在这里,您可以看到event
可以传递的内容更加干净,可以只获得想要的内容,然后将其传递给下一个函数,我们希望将此代码移至“响应式/函数式”方法。
从我的角度来看,主要问题是您使函数失去了它们所具有的上下文,例如getAuthenticationHeaders
根本不应该返回event
,而应该只返回headers
和其他功能也一样。
在处理RxJS(又名反应式方法)时,您会很好地处理这些问题,这没关系,因为它保留了功能概念,并使代码更可预测,因为pure
运算符应仅处理数据相同的管道可以使所有内容保持纯净,并且不会导致副作用,从而导致代码无法预测。
我认为您要寻找的东西将通过nested pipes
解决(这是我认为最好的解决方案)
concatMap(event => this.getAuthenticationHeaders(event).pipe(
map(headers => this.generateUploadId(event,headers).pipe())
))
并且它在诸如Marble.js
的某些RxJS后端库中大量使用您可以使用类似于Result Selector
的方法:
concatMap(event => this.getAuthenticationHeaders(event).pipe(
map(headers => ({ headers,event }))
)),
或其他人建议的其他解决方案都可以使它起作用,但是您仍然会遇到与您提到的问题相同的问题,但是使用的代码更加清晰/可读。
您也可以将其转换为async/await
方法,但是您将失去RxJS提供给您的反应性。
我的建议是尝试阅读有关反应式编程的更多信息,以及如何将其转变为思维方式,我将在此处提供一些链接,我认为这些链接非常好,并尝试了一些基于此的库RxJS之类的CycleJS之类的书,我建议您阅读有关函数式编程的知识,这对本书Mostly adequate guide to FP (in javascript) &Composing Software。
我推荐这个很棒的演讲RxJS Recipes,它将改变您使用RxJS的方式。
有用的资源:
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。