如何解决尚不支持处理时间临时连接
我使用的是 Flink 1.12.0,我正在阅读 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#processing-time-temporal-join。看起来支持处理时间临时连接。
但是,当我运行以下应用程序时,它会抱怨 Processing-time temporal join is not supported yet
。我很困惑是代码错误还是Flink真的不支持Processing-time temporal join。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl1 =
"""
create table s1(
id STRING,tradeDate TIMESTAMP,price DOUBLE,pt as PROCTIME()
) with (
'connector' = 'filesystem','path' = 'D:/T018_LookupJoin_stock.csv','format' = 'csv'
)
""".stripMargin(' ')
tenv.executeSql(ddl1)
val ddl2 =
"""
create table s2(
id STRING primary key not enforced,name STRING,tradeDate TIMESTAMP
) with (
'connector' = 'filesystem','path' = 'D:/T018_LookupJoin_stocktimechanging.csv','format' = 'csv'
)
""".stripMargin(' ')
tenv.executeSql(ddl2)
val sql =
"""
select s1.id,s1.price,s1.tradeDate,u.name,u.tradeDate
from s1 join s2 for SYSTEM_TIME as of s1.pt as u
on s1.id = u.id
""".stripMargin(' ')
tenv.sqlQuery(sql).toAppendStream[Row].print()
env.execute()
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。