如何解决通过Apache Flink将SQL查询的结果写入文件
我有以下任务:
- 使用对Hive表的SQL请求创建作业;
- 在远程Flink群集上运行此作业;
- 在文件中收集此作业的结果(最好使用HDFS)。
注意
因为有必要在远程Flink群集上运行此作业,所以我无法以简单的方式使用 TableEnvironment 。这张票证中提到了此问题:https://issues.apache.org/jira/browse/FLINK-18095。对于当前解决方案,我使用http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-Environment-for-Remote-Execution-td35691.html中的adivce。
代码
EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
// create remote env
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost",8081,"/path/to/my/jar");
// create StreamTableEnvironment
TableConfig tableConfig = new TableConfig();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
CatalogManager catalogManager = CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
batchSettings.getBuiltInCatalogName(),new GenericInMemoryCatalog(
batchSettings.getBuiltInCatalogName(),batchSettings.getBuiltInDatabaseName()))
.executionConfig(
streamExecutionEnvironment.getConfig())
.build();
ModuleManager moduleManager = new ModuleManager();
BatchExecutor batchExecutor = new BatchExecutor(streamExecutionEnvironment);
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig,catalogManager,moduleManager);
StreamTableEnvironmentImpl tableEnv = new StreamTableEnvironmentImpl(
catalogManager,moduleManager,functionCatalog,tableConfig,streamExecutionEnvironment,new BatchPlanner(batchExecutor,catalogManager),batchExecutor,false);
// configure HiveCatalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // a local path
HiveCatalog hive = new HiveCatalog(name,defaultDatabase,hiveConfDir);
tableEnv.registerCatalog("myhive",hive);
tableEnv.useCatalog("myhive");
// request to Hive
Table table = tableEnv.sqlQuery("select * from myhive.`default`.test");
问题
在此步骤中,我可以调用 table.execute()方法,然后通过 collect()方法获得 CloseableIterator 。但就我而言,由于请求的缘故,我可以获得大量的行,因此将其收集到文件(HDFS中的ORC)将是完美的选择。
我如何达到目标?
解决方法
Table.execute().collect()
将视图结果返回给客户端以进行交互。您可以使用文件系统连接器,并使用INSERT INTO
将视图写入文件。例如:
// create a filesystem table
tableEnvironment.executeSql("CREATE TABLE MyUserTable (\n" +
" column_name1 INT,\n" +
" column_name2 STRING,\n" +
" ..." +
" \n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = 'hdfs://path/to/your/file',\n" +
" 'format' = 'orc' \n" +
")");
// submit the job
tableEnvironment.executeSql("insert into MyUserTable select * from myhive.`default`.test");
查看有关文件系统连接器的更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。