如何解决某些 x 条消息中的 KSQL 平均值
我有一个 kafka-confluent 正在运行的实例和一个名为“mqtt-source-1”的主题,它读取这个结构化的 json 消息
{
"device1": {
"t": "timestamp","id": "deviceid","type": "presence","m": [
{
"t": "nowTimestamp()","tz": "now()","k": "device_temperature","v": "double(0,40)","u": "C"
},{
"t": "nowTimestamp()","k": "battery_level","v": "double(3.26,3.56)","u": "V"
}
]
},"device2": {
"t": "timestamp","u": "V"
}
]
}
}
是否可以得到由 m->k 分组的 v 的平均值和针对特定 x 条消息计算的 deviceid?
解决方法
如果还没有完成,您需要创建一个能够正确解析传入消息的主题。这篇博文描述了如何在 JSON 中处理 JSON 和数组:https://rmoff.net/2020/05/26/working-with-json-nested-arrays-in-ksqldb-example/
现在,如果我正确理解你的最后一句话,你想按 {m->k,deviceid} 聚合。您可以使用 CREATE TABLE <<name>> AS SELECT <<your desired columns>> FROM <<input stream that contains the parsed JSONs>>
语句执行此操作。使用普通 SQL 语法按 {m->k,deviceid} 分组并创建一个应用 AVG(m->v) 的列。应该这样做。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。