如何解决Quarkus BackPressure配置
我使用quarkus smallrye反应性消息和kafka获得了以下堆栈跟踪:
2020-07-24 01:38:31,662 ERROR [io.sma.rea.mes.kafka] (executor-thread-870) SRMSG18207: Unable to dispatch message to Kafka: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit tick 211 due to lack of requests
at io.smallrye.mutiny.operators.multi.builders.IntervalMulti$IntervalRunnable.run(IntervalMulti.java:83)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
at java.lang.Thread.run(Thread.java:748)
at org.jboss.threads.JBossThread.run(JBossThread.java:479)
所以我读了https://smallrye.io/smallrye-mutiny/#_how_do_i_control_the_back_pressure
根据文档,我添加了BackPressure控件。
之前:
@Outgoing( "eqs-crossing-xxx" )
public Multi< EQSAlert > eqsCrossingXXX_XXX(){
final String series = CrossingEnum.XXX_XXX.getSeries();
final String equipment = CrossingEnum.XXX_XXX.getEquipment();
final String vehicleRegex = vehicleRegexService.getRegexBySeries( series );
log.info( "Incoming request for {} - {}",series,equipment);
log.info( "Vehicle regex : {}",vehicleRegex );
return Multi
.createFrom()
.ticks()
.every(
Duration.ofSeconds( poolingInterval )
)
.concatMap(i -> {
final Multi<CrossingState> crossingStateBySeriesAndEquipment = CrossingState.getCrossingStateBySeriesAndEquipment(client,equipment);
return crossingStateBySeriesAndEquipment.flatMap(crossingState ->
crossingState.isActive() ?
EQSAlert.getEQSAlertBySeriesAndEquipment(
client,vehicleRegex,equipment
)
:
Multi.createFrom().empty()
);
});
}
之后:
@Outgoing( "eqs-crossing-xxx" )
public Multi< EQSAlert > eqsCrossingXXX_XXX(){
final String series = CrossingEnum.XXX_XXX.getSeries();
final String equipment = CrossingEnum.XXX_XXX.getEquipment();
final String vehicleRegex = vehicleRegexService.getRegexBySeries( series );
log.info( "Incoming request for {} - {}",vehicleRegex );
return Multi
.createFrom()
.ticks()
.every(
Duration.ofSeconds( poolingInterval )
)
.onOverflow()
.buffer(10)
.concatMap(i -> {
final Multi<CrossingState> crossingStateBySeriesAndEquipment = CrossingState.getCrossingStateBySeriesAndEquipment(client,equipment
)
:
Multi.createFrom().empty()
);
});
}
现在一切都好了。
我的帖子的目的是了解我为什么需要这样做?
为什么缓冲区不能保留u?
如您所见,我每5秒钟执行一次简单的sql函数调用(poolingInterval)。该函数返回一些记录(通过池返回不超过10条记录)
所以流量很低
请让我说些什么来了解缓冲区管理。
谢谢
解决方法
您的下游单位时间内只能消耗一定数量的物品。这取决于您在做什么。默认情况下,Kafka将自身限制为5个并发写入(您可以配置它)。
因此,如果排放更多,则以无界的方式(不是背压感知的)方式,下游无法跟上。添加10个项目的缓冲区可能会解决一些小问题,但这可能还不够。
drop
更为激进;如果下游无法跟上,则仅丢弃这些物品。 dropPreviousItems
正在删除已收到的项目。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。