如何解决如何在KSQLDB中的输出主题上构造嵌套的JSON消息
我从一个源系统收到了以下事件有效负载
为以下json有效负载创建了Stream1
事件JSON 1
{
"event": {
"header": {
"name":"abc","version":"1.0","producer":"123","channel":"lab","countryCode":"US"
},"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},{"customerIdentifier":"234","customerIdType":"id"}
],"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},{"accountIdentifier":"Primary","accountIdType":"da"}
],"eventDetails":{
"offeramount":"40000","apr":"2.6%","minpayment":"400","status":"Approved"
}
}
}
事件JSON 2
{
"event": {
"header": {
"name":"abc","eventDetails":{
"offeramount":"70000","apr":"3.6%","minpayment":"600","status":"Rejected"
}
}
}
我已经在上述stream1上创建了聚合表
CREATE TABLE EVENT_TABLE AS
SELECT
avg(minpayment) as Avg_MinPayment,avg(apr) AS Avg_APr,avg(offeramount) AS Avgofferamount,status
FROM STREAM1
GROUP BY status
EMIT CHANGES;
Status | Avg_MinPayment | Avg_APr | Avgofferamount
-----------------------------------------
Approved | 400 | 2.6% | 40000
Rejected | 600 | 3.6% | 70000
我从KTable和KTable Topic json中得到了以上结果
汇总JSON1
打印'EVENT_TABLE';
{
"Status" : "Approved","Avg_Minpayment" : "400","Avg_APr" : "2.6%","offeramount" : "40000"
}
汇总JSON2
{
"Status" : "Rejected","Avg_Minpayment" : "600","Avg_APr" : "3.6%","offeramount" : "70000"
}
但是我必须构造并发布关于输出主题的最终目标json,例如以下json格式。我必须将标头和正文添加到聚合json1和聚合json2。
{
"event":{
"header":{
"name":"abc","body":{
"Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
]
}
}
解决方法
鉴于示例SQL不会产生示例输出,而是给出示例输入,因此并不清楚要实现的目标。实际上,您的示例SQL会因未知的列错误而失败。
类似以下将生成您的示例输出:
CREATE TABLE EVENT_TABLE AS
SELECT
status,avg(eventDetails->minpayment) as Avg_MinPayment,avg(eventDetails->apr) AS Avg_APr,avg(eventDetails->offeramount) AS Avgofferamount
FROM STREAM1
GROUP BY status
EMIT CHANGES;
接下来,您的示例输出...
Status | Avg_MinPayment | Avg_APr | Avgofferamount
-----------------------------------------
Approved | 400 | 2.6% | 40000
Rejected | 600 | 3.6% | 70000
...每种状态输出一行。但是,您说要实现的输出...
{
"event":{
"header":{
"name":"abc","version":"1.0","producer":"123","channel":"lab","countryCode":"US"
},"body":{
"Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
]
}
}
...包含这两种状态,即将两种示例输入消息合并到一个输出中。
如果我对您的理解正确,并且您确实确实想输出上述JSON,那么:
您首先需要包含event
信息。但是,哪些事件信息?如果您知道它们总是一样的,那么您可以使用:
CREATE TABLE EVENT_TABLE AS
SELECT
status,latest_by_offset(event) as event,avg(eventDetails->offeramount) AS Avgofferamount
FROM STREAM1
GROUP BY status
EMIT CHANGES;
latest_by_offset
聚合函数将从其看到的最后一条消息中捕获event
信息。尽管我不确定这是您想要的。您是否会收到带有不同 rejected
信息的其他accepted
和event
消息?如果是event
信息标识应将哪些邮件分组在一起,则类似的内容可能会为您提供所需的内容:
CREATE TABLE EVENT_TABLE AS
SELECT
event,collect_list(eventDetails) as body
FROM STREAM1
GROUP BY event
EMIT CHANGES;
如果关闭,则您可能需要使用STRUCT
构造函数和AS_VALUE
函数来重构输出。例如:
CREATE TABLE EVENT_TABLE AS
SELECT
event as key,AS_VALUE(event) as event,STRUCT(
keys := collect_list(eventDetails)
) as body
FROM STREAM1
GROUP BY event
EMIT CHANGES;
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。