如何解决当物化视图中出现新的 MAX 值时,具有唯一的 Kafka ksqlDB 事件
对 ksqlDB 事件和物化视图感到困惑。当表聚合/物化视图中确定的新高股价出现时,我想在专用主题/流中收到通知,但我正在为每个事件报告高价,而不仅仅是在出现新高价事件时.
这是我的工作示例/设置。
为股票创建基础流和主题。
ksql> create stream stocks (symbol VARCHAR KEY,company VARCHAR,price DECIMAL(9,2))
> WITH (KAFKA_TOPIC='stocks',PARTITIONS=1,VALUE_FORMAT='json');
Message
----------------
Stream created
----------------
添加一些 Acme Corp 股票价格的初始数据
ksql> insert into stocks (symbol,company,price) values ('ACME','Acme Corp',111.11);
ksql> insert into stocks (symbol,111.12);
ksql> insert into stocks (symbol,111.13);
打印基础主题以证明数据存在。
ksql> print 'stocks' from beginning limit 3;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:26:57.169 Z,key: 1094929733,value: {"COMPANY":"Acme Corp","PRICE":111.11}
rowtime: 2021/03/21 14:27:01.717 Z,"PRICE":111.12}
rowtime: 2021/03/21 14:27:04.546 Z,"PRICE":111.13}
Topic printing ceased
创建聚合/物化视图以显示最高股价。
ksql> create table stock_highs as
> select symbol,max(price) as high
> from stocks
> group by symbol
> emit changes;
Message
--------------------------------------------
Created query with ID CTAS_STOCK_HIGHS_115
--------------------------------------------
查询以进行目视检查。
ksql> select * from stock_highs where symbol = 'ACME';
+--------------+------------------------+
|SYMBOL |HIGH |
+--------------+------------------------+
|ACME |111.13 |
Query terminated
在单独的终端(终端 2)中使用消费者(又名打印 STOCK_HIGHS)来观察股票高价的变化。
ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z,value: {"HIGH":111.13}
回到原始终端(终端 1),使用 ksql 客户端插入更多数据以强制更新库存高位。
ksql> insert into stocks (symbol,111.20);
ksql> insert into stocks (symbol,111.15);
上述插入应该给出 111.20 的新高价,而忽略 111.15 的价格,因此,在我的想法和用例中,我想从 STOCK_HIGHS 主题中获得一个事件/消息,显示 111.20 的新高股价,但是不是 111.15 的价格。但是,我在消费者(终端 2)中收到了两个新事件。
ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z,value: {"HIGH":111.13}
rowtime: 2021/03/21 14:37:51.234 Z,value: {"HIGH":111.20}
rowtime: 2021/03/21 14:39:03.301 Z,value: {"HIGH":111.20}
问题是我真的只想在新高价格出现时收到通知或“事件”,这样我就可以让消费者在新高价格出现时离开并做一些有意义的事情。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。