总览:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions}
import trigger.StockPrice
import org.apache.flink.api.scala.createTypeInformation

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import scala.util.Try

/**
 * Reads comma-separated stock records from the Kafka topic `stockPrice` and
 * inserts them into the MySQL table `stock_price` through a Flink JDBC table.
 *
 * Each Kafka message is expected to look like `code,price,sale_date`, where
 * `price` parses as a Double and `sale_date` as a Long.
 */
object kafkaToMysql {

  /**
   * Parses one Kafka message into a `(code, price, saleDate)` tuple.
   *
   * @param line raw message text, fields separated by `,`
   * @return the parsed tuple, or `None` when the line has fewer than three
   *         fields or a numeric field cannot be parsed
   */
  private def parseStockRecord(line: String): Option[(String, Double, Long)] =
    line.split(",") match {
      // Extra trailing fields are tolerated, as in the index-based original.
      case Array(code, price, saleDate, _*) =>
        Try((code, price.toDouble, saleDate.toLong)).toOption
      case _ =>
        None
    }

  /**
   * Builds and runs the Kafka -> MySQL pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Streaming environment, single parallel instance.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Table environment on top of the streaming environment (Blink planner, streaming mode).
    val envSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, envSettings)

    // Kafka connection settings: broker address and consumer group.
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "10.10.10.10:9092")
    kafkaProps.setProperty("group.id", "gksk-bigdata")

    val kafkaSource =
      new FlinkKafkaConsumer[String]("stockPrice", new SimpleStringSchema(), kafkaProps)
    // Start from the earliest offset and commit offsets when checkpoints complete.
    kafkaSource.setStartFromEarliest()
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    val ds = env.addSource(kafkaSource)

    // The sink table has three columns, so every text line must become a
    // 3-tuple. Malformed lines are dropped here: an exception thrown inside
    // the operator would fail the job, and because the source restarts from
    // the earliest offset the same bad record would fail it again.
    val parsedStream: DataStream[(String, Double, Long)] =
      ds.flatMap((line: String) => parseStockRecord(line).toList)

    // Table view over the parsed stream; this is what gets inserted below.
    val datasourceTable = tableEnv.fromDataStream(parsedStream)

    // Flink table definition backed by the MySQL table `stock_price` via JDBC.
    // NOTE(review): URL and credentials are hard-coded; consider externalizing them.
    val sinkSQL =
      """
        |create table stock_price_flink
        |(
        |code varchar(20) null,
        |price Double null,
        |sale_date BigInt null
        |) with(
        |'connector.type'='jdbc',
        |'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
        |'connector.table'='stock_price',
        |'connector.driver'='com.mysql.jdbc.Driver',
        |'connector.username'='root',
        |'connector.password'='root'
        |)
        |""".stripMargin

    // Register the table in the catalog.
    tableEnv.executeSql(sinkSQL)

    // Query the registered table by name and print its rows.
    val result = tableEnv.sqlQuery("select * from stock_price_flink")
    result.toRetractStream[(String, Double, Long)].print()

    // Insert the Kafka records into the JDBC-backed table.
    datasourceTable.executeInsert("stock_price_flink")

    // Run the DataStream part of the program (the print sink above).
    env.execute()
  }
}
Column types of query result and sink for registered table 'default_catalog.default_database.stock_price_flink' do not match.
Query result schema: f0:String
TableSink schema: [code:String,price:Double,sale_date:Date]
原因:数据流中的每条数据只是一个 String(对应查询结果中的 f0 一列),而 sink 表有 3 个字段,列的数量和类型都不匹配。
解决:先把数据流中的字符串按逗号拆分,并转换成与 sink 表字段一一对应的三元组,再写入。
Task not serializable
原因:下面这段 map 代码的闭包捕获了无法序列化的对象(原文未说明具体对象,推测是 Stock_Price_table 定义在方法或不可序列化的类内部),因此 Flink 无法序列化该算子:
// Snippet shown as the code that raised "Task not serializable".
// It splits each line on "," and builds a Stock_Price_table from field 0 and
// field 2 parsed as a "yyyy-MM-dd HH:mm:ss" timestamp.
// NOTE(review): Stock_Price_table is not defined in this excerpt; presumably its
// definition site is what makes the closure non-serializable -- confirm.
val splited_stream= ds.map(s => s.split(",")) .map(s => Stock_Price_table(s(0),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(s(2) )))
原文地址:https://blog.csdn.net/hzp666/article/details/128828198
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。