如何解决Apache Flink SQL InvalidProgramException:所选排序键不是可排序类型
这是我的Flink SQL
SELECT t.reportCode FROM query_record_info as t LEFT JOIN credit_report_head as c ON t.reportCode = c.reportCode
当我运行它时,我错了
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a sortable type
at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
at org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:468)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:467)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:467)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:270)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:178)
at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:147)
at org.myorg.quickstart.CreditTest.main(CreditTest.java:108)
但是如果我从sql中删除LEFT
,没问题
SELECT t.reportCode FROM query_record_info as t JOIN credit_report_head as c ON t.reportCode = c.reportCode
我是flink的入门者。期待您的回复
解决方法
// 创建环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
CreditReport creditReport = JSONUtil.toBean(jsonStr,CreditReport.class);
DataSource<CreditReport> reportDataSource = env.fromElements(creditReport);
//从资源中抽取出,记录信息
FlatMapFunction<CreditReport,QueryRecordInfo> queryRecordInfoFlat = new FlatMapFunction<CreditReport,QueryRecordInfo>() {
@Override
public void flatMap(CreditReport value,Collector<QueryRecordInfo> out) throws Exception {
List<QueryRecordInfo> queryRecordInfos = creditReport.getQueryRecordInfos();
for (QueryRecordInfo queryRecordInfo : queryRecordInfos) {
out.collect(queryRecordInfo);
}
}
};
//从资源中抽取出,记录报告头
FlatMapFunction<CreditReport,CreditReportHead> queryRecordHeaderFlat = new FlatMapFunction<CreditReport,CreditReportHead>() {
@Override
public void flatMap(CreditReport value,Collector<CreditReportHead> out) throws Exception {
out.collect(value.getCreditReportHead());
}
};
DataSet<QueryRecordInfo> records = reportDataSource.flatMap(queryRecordInfoFlat);
tableEnv.createTemporaryView(QueryRecordInfo,records);
DataSet<CreditReportHead> headers = reportDataSource.flatMap(queryRecordHeaderFlat);
tableEnv.createTemporaryView(CreditReportHead,headers);
Table queryResult = tableEnv.sqlQuery("SELECT t.reportCode as reportCode,reason as reason FROM credit_report_head as t left JOIN query_record_info as c ON t.reportCode = c.reportCode");
DataSet<ReportCode> reportHeadDataSet = tableEnv.toDataSet(queryResult,ReportCode.class);
reportHeadDataSet.print();
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。