如何解决如何并行运行TableAPI和DataStreamAPI以使用Python定义的功能
我在DataStream API上有一项工作,它运行良好,但是我需要使用计算所得的DataStream<Event>
并将其传递给TableAPI来调用注册python函数,然后将结果传递回新的DataStream
以重新处理该调用的结果。这里有两个问题,一个是我可以像这样工作:
/*DataStream Job*/
StreamExecutionEnvironment env = EnvironmentConfiguration.getEnv();
final DataStream<Event> eventsStream = RabbitMQConnector.eventStreamObject(env)
.flatMap(new RabbitMQConsumer())
.uid("cep.objects_mapper_id")
.name("Event Mapper")
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event,timestamp) -> event.timestamp.getTime()))
.name("Watermarks Added");
/*TableAPI job*/
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(),fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files","test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable","python.exe");
fsTableEnv.getConfig().getConfiguration().setString("python.executable","python.exe");
fsTableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
SingleOutputStreamOperator<String> stream = eventsStream.map(x -> x.name);
Table source = fsTableEnv.fromDataStream(stream).as("name");
Table result = source.select("func1(name)");
DataStream<String> finalRes = fsTableEnv.toAppendStream(result,String.class);
finalRes.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
});
env.execute(job_name);
在这个示例中,我一点问题都没有,但是python函数从不返回,我担心除非我执行result.exeute();
,否则它将永远不会被调用,那么当我从上面应用相同的示例时和之后
finalRes.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
});
执行result.execute();
执行表,python函数起作用,但是直到TableAPI完成后才执行DataStreamAPI作业,但是由于从未初始化DataStreamAPI作业,因此使用者无法使用因此,应先发送到TableAPI然后再发送到python函数的流始终为空。
我的问题是:有什么办法可以并行运行两个作业,还是一个接一个地运行? 注意:我将创建一个TimerTask,以便在DataStreamAPI作业启动后等待一段时间,然后启动TableAPI作业(并行度为1),它似乎可以正常工作,但是TableAPI作业已创建并停止了很多次。
还有更好的方法吗? 希望有人理解我的问题。
谢谢!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。