如何解决Kafka Streams 应用程序在启动后几秒钟内退出
我希望我的应用程序继续运行并且根本不停止,但我的应用程序在几秒钟后立即退出而不给出任何错误或警告。 我一直在试图找出原因,但到目前为止我还没有发现任何有用的东西。
我已经在构建和运行选项下提供了我的资源配置文件路径。
我正在使用配置工厂从资源 json 中读取文件。
我正在使用带有思想 intelliJ 的 Scala 编程语言。 本地服务器中的 Kafka,源为 sql server,目标为 mongodb。 kafka 工具来查看主题和数据。
解决方法
根据我的经验,在没有登录 Kafka Streams(以及一般的 JVM 应用程序)的情况下突然死亡是由本机 OOM 引起的。
Kafka Streams 默认使用 RocksDB 来存储流状态。使用默认设置有点占用内存。由于 RocksDB 是原生的,它不会受到你分配的堆的约束或使用,如果原生库分配内存失败,JVM 就会死掉,不会抛出异常。
您可以通过运行以下命令来验证此假设: 留言 |尾-n 10 或者一些这样的,在失败后立即。您应该会看到一个相当明确的报告,指出由于内存分配失败而牺牲了一个进程。
您可以通过提供 RocksDBConfigSetter 的实现来调整 RocksDB 配置设置来流式传输属性。减少测试环境中的主题分区数量会有很大帮助,因为 RocksDB 表的数量会随着分区数量的增加而增加。
,你能指定你使用的是哪个 kafka 客户端吗? https://github.com/zio/zio-kafka 开箱即用:
import zio._
import zio.console._
import zio.duration._
import zio.kafka.consumer._
import zio.kafka.serde._
object Main extends App {
def run(args: List[String]) = {
val settings: ConsumerSettings =
ConsumerSettings(List("localhost:9092"))
.withGroupId("group")
.withCloseTimeout(30.seconds)
val subscription = Subscription.topics("topic")
Consumer.consumeWith(settings,subscription,Serde.string,Serde.string) { case (key,value) =>
putStrLn(s"Received message ${key}: ${value}")
.ignore
// Perform an effect with the received message
}.exitCode
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。