Flink将实时产生的数据流式写入到hdfs中
import cn.itcast.day03.source.custom.MyNoParallelSource;
import cn.itcast.day03.source.custom.Order;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
/**
* 将实时产生的数据流式写入到hdfs中
*/
public class StreamHDFSDemo {
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1)创建flink流处理的运行环境
* 2)构建数据源(自定义数据源)
* 3)开启checkpoint
* 4)设置一个并行度写入数据
* 5)数据实时写入到hdfs(指定分桶策略和滚动策略)
* 6)递交作业执行
*/
//TODO 1)创建flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2)构建数据源(自定义数据源)
DataStreamSource<Order> orderDataStreamSource = env.addSource(new MyNoParallelSource());
//TODO 3)开启checkpoint
env.enableCheckpointing(5000);//每隔五秒钟进行一次checkpoint(向所有的算子发送栅栏barrier,先想source算子发。。。),周期性执行
//TODO 4)设置一个并行度写入数据
env.setParallelism(1);
//TODO 5)数据实时写入到hdfs(指定分桶策略和滚动策略)
//指定写入数据的路径
String outputPath = "hdfs://node1:8020/test/output/streamingfile";
//定义写入文件的名称和格式
OutputFileConfig config = OutputFileConfig.builder()
.withPartPrefix("order")
.withPartSuffix(".txt")
.build();
//指定行编码,一次写入一行数据
StreamingFileSink streamingFileSink = StreamingFileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("utf-8"))
/**
* 指定分桶策略:
* DateTimeBucketAssigner:默认的分桶策略,默认基于时间的分配器,每小时产生一个桶,指定时间格式:yyyy-MM-dd-HH
* BasePathBucketAssigner:把所有的文件放到一个基本路径下的分配器(全局桶)
*/
.withBucketAssigner(new DateTimeBucketAssigner<>())
/**
* 指定滚动策略:
* DefaultRollingPolicy
* CheckpointRollingPolicy
* OnCheckpointRollingPolicy
*/
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(5)) //设置滚动时间间隔,每5秒钟产生一个文件
.withInactivityInterval(TimeUnit.SECONDS.toMillis(2)) //设置不活动的时间间隔,未写入数据处于不活动状态时滚动文件
.withMaxPartSize(1024*1024*1024)//设置文件大小,达到一个1G大小时滚动文件
.build()
).withOutputFileConfig(config).build();
orderDataStreamSource.addSink(streamingFileSink);
//TODO 6)递交作业执行
env.execute();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。