如何解决由于$ foo发出,WithLatestFrom$ foo不发出,这可能是由于使用merge
我正在实现一个可观察对象,可以在“分配”之前将其订阅。将其视为提升一个可观察对象的定义,这样我就不必担心我创建其他可观察对象派生的可观察对象的顺序,我称之为ColdSubject。
ColdSubject可以正常工作(我可以在其中添加可观察对象,只有当有人订阅ColdObservable时,它的运算符才会得到评估)。
尽管withLatestFrom
在等待obs$
的过程中永远不会发射,尽管可以观察到它多次“等待”发射给订户!
export class ColdSubject<T> {
// If you subscribe to this before an observable has been added to $acc,you will be notified as soon as one is added,and if you subscribe to this after an observable is added to acc$ you will also be notified
public obs$: Observable<T>;
public acc$ = new BehaviorSubject<Observable<T>>(merge());
constructor() {
this.obs$ = this.acc$.pipe(switchMap(v => v));
}
addObservable(newObservable: Observable<T>) {
this.acc$.next(merge(this.acc$.getValue(),newObservable))
}
}
const foo = new ColdSubject<number>();
# I know this observable is waiting for withLatestFrom because "Tap yeet" is logged
of('yeet').pipe(
tap(v => console.log(`tap ${v}`)),withLatestFrom(foo.obs$)
).subscribe(v => {
console.log(`WithLatestFrom ${v}`);
});
# This observable will begin emitting 5 seconds into the script,because I wait 5 seconds to subscribe to it
foo.addObservable(
interval(1000).pipe(
take(5),tap(v => console.log(`Interval ${v}`))
)
);
# Subscribe 5 seconds into script start,so I know that my ColdSubject only evaluates its observables once they're subscribed to
setTimeout(
() => foo.obs$.subscribe(v => console.log(`Subscribe ${v}`)),5000
);
为什么foo.obs$
发出几次,而等待其最新值的操作却不发出?
解决方法
查看source code可以看到withLatestFrom
由_next
触发,而next
由源Observable调用protected _next(value: T) {
if (this.toRespond.length === 0) {
/**
* value - emitted by the source Observable
* ...this.values - emitted by the Observables passed to `withLatestFrom`
*/
const args = [value,...this.values];
if (this.project) {
this._tryProject(args);
} else {
this.destination.next(args);
}
}
}
触发:
withLatestFrom
您的问题是您的源代码立即完成,而传递给foo.obs
的Observable尚未发出。到combineLatest
发出时,您的Observable源早已完成。
在您的情况下,我建议使用combineLatest(of("yeet"),foo.obs$)
.pipe(
tap(v => console.log(`tap ${v}`)),)
.subscribe(v => {});
,如下所示:
declare
v_txt varchar2(4000) := '["Project title afor BYU heads","The values are,\n exactly up to the requirement and analysis done by the team.
Also it is difficult to,\n prepare a scenario notwithstanding the fact it is difficult. This user story is going to be slightly complex however it is up to the team","Active","Disabled","25 tonnes of fuel","www.examplesites.com/html.asp&net;","Apprehension","","25","Stable"]';
begin
push_data(100,substr(v_txt,2,length(v_txt) - 1));
end;
/
,
of('yeet')发出并完成,因此withLatestFrom将在源完成时完成。
将您的订阅更改为
of('yeet').pipe(
tap(v => console.log(`tap ${v}`)),withLatestFrom(foo.obs$)
).subscribe({ complete: () => console.log('yeet complete') });
您将看到它实际上是完整的。
https://stackblitz.com/edit/rxjs-6-opeartors-2nuam1?file=index.ts
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。