如何解决RuntimeService和FunctionInitializationContext获得关键状态之间有什么区别
我有一个处理功能和CheckpointedFunction可以与操作员状态一起使用,还可以与key state
一起使用。我想知道在哪里获取关键状态的正确位置。以下是演示我的问题的代码:
第一种方法是使用getRuntimeService获取键状态。
class TestCheckpointFunctionProcessFunction extends ProcessFunction[(String,Long),String] with CheckpointedFunction {
var sumPerKeyValueState: ValueState[Long] = null
override def open(parameters: Configuration): Unit = {
val sumPerKeyStateDesc = new ValueStateDescriptor[Long]("sumPerKeyState",classOf[Long])
sumPerKeyValueState = getRuntimeContext.getState(sumPerKeyStateDesc)
}
override def processElement(value: (String,ctx: ProcessFunction[(String,String]#Context,out: Collector[String]): Unit = {
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
}
override def initializeState(context: FunctionInitializationContext): Unit = {
}
}
第二种方法是按如下方式使用FunctionInitializationContext:
class TestCheckpointFunctionProcessFunction extends ProcessFunction[(String,String] with CheckpointedFunction {
var sumPerKeyValueState: ValueState[Long] = null
override def open(parameters: Configuration): Unit = {
//
}
override def processElement(value: (String,out: Collector[String]): Unit = {
//
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
//
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val sumPerKeyStateDesc = new ValueStateDescriptor[Long]("sumPerKeyState",classOf[Long])
sumPerKeyValueState = context.getKeyedStateStore.getState(sumPerKeyStateDesc)
}
}
谢谢,我想问哪种方法正确或首选。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。