如何解决Angular 10,rxjs-订阅循环吗? 顺序请求完成并行请求缓冲的并行请求
我正在向一个API发出多个并发DELETE
请求,导致服务器端出现了死锁。
如果删除的数目固定,则可以执行以下操作:
myAPI.delete(id).subscribe(res => {
myAPI.delete(id2).subscribe(res => {
myAPI.delete(id3).subscribe(res => {
})
})
})
等待上一次删除成功完成,然后再进行新的删除(保密)。
问题是我可以收到1
至N
个请求,因此我正在寻找类似于for
订阅说明的东西。
有解决这个问题的优雅方法吗?
解决方法
如果需要删除一组ID,则可以将from
运算符与concatMap
和toArray
结合使用。最后一个是仅在完成所有操作后才调用订阅侦听器:
const ids = [ 1,2,3 ];
from(ids).pipe(
concatMap((id) => deleteThis(id)),toArray()
).subscribe(() => {
console.log('all deleted');
})
working stack具有可观察的模拟请求
,有多种方法。
顺序请求
您可以使用RxJS range
运算符和映射运算符concatMap
来依次调用每个请求。
import { range } from 'rxjs';
import { concatMap } from 'rxjs';
range(1,N).pipe( // <-- will emit 1-N numbers
concatMap(id => myAPI.delete(id)) // <-- set the correct `id` for each request here
).subscribe(
res => console.log(res),err => console.log(err),);
由于我们使用的是concatMap
运算符,因此每个id
的请求都将在上一个请求完成之前等待。
您还可以将switchMap
,flatMap
和exhaustMap
之类的其他映射运算符用于不同的行为。区别b / n他们here。
其他方法-并行请求
完成并行请求
您可以使用RxJS toArray
和forkJoin()
运算符进行多个并发调用。它同时订阅
import { range,forkJoin } from 'rxjs';
import { concatMap,toArray } from 'rxjs/operators';
range(1,N).pipe( // <-- will emit 1-N numbers
concatMap(id => myAPI.delete(id)),// <-- set the correct `id` for each request here
toArray(),// <-- buffer all notifications and emit once as an array
concatMap(reqs => forkJoin(reqs))
).subscribe(
res => console.log(res),);
缓冲的并行请求
大多数浏览器对单个域(例如Chrome-6)的最大并行请求数有硬性限制。如果遇到此问题,可以使用RxJS bufferCount
运算符而不是toArray
运算符来控制并行请求的最大数量。
import { from,bufferCount } from 'rxjs';
range(1,// <-- set the correct `id` for each request here
bufferCount(6),// <-- adjust number of parallel requests here
concatMap(reqs => forkJoin(reqs))
).subscribe(
res => console.log(res),);
,
另一种解决方案是使用concat运算符:
import { concat } from 'rxjs';
import { toArray } from 'rxjs/operators';
concat(
...[1,3].map(id => this.delete(id)),)
.pipe(toArray())
.subscribe(() => console.log('all deleted'))
或没有toArray()
运算符:
import { concat } from 'rxjs';
concat(...[1,3].map(id => this.delete(id)))
.subscribe(
res => {
console.log('one deleted')
},err => {},() => {
console.log('all deleted')
})
,
const N = 10;
range(1,N)
.pipe(concatMap(deleteById),ignoreElements())
.subscribe({
complete: () => console.log('Complete')
});
function deleteById(id: number): Observable<String> {
return of(`deleted-${id}`).pipe(delay(1000));
}
如果您想同时发出多个请求:
const N = 10;
const CHUNKED = 3; // (3 requests same time) concat (3 requests same time) ...
range(1,N)
.pipe(
bufferCount(CHUNKED),concatMap((ids) => forkJoin(ids.map(deleteById))),ignoreElements()
)
.subscribe({
complete: () => console.log(`Complete`)
});
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。