Spark流式状态管理updateStateByKey、mapWithState等

通常使用Spark的流式框架如Spark Streaming,做无状态的流式计算是非常方便的,仅需处理每个批次时间间隔内的数据即可,不需要关注之前的数据,这是建立在业务需求对批次之间的数据没有联系的基础之上的。

但如果我们要跨批次做一些数据统计,比如batch是3秒,但要统计每1分钟的用户行为,那么就要在整个流式链条中维护一个状态来保存近1分钟的用户行为。

那么如果维护这样一个状态呢?一般情况下,主要通过以下几种方式:

1. spark内置算子:updateStateByKey、mapWithState

2. 第三方存储系统维护状态:如redis、alluxio、HBase这里主要以spark内置算子:updateStateByKey、mapWithState为例,通过一些示例代码(不涉及offset管理),来看看如何进行状态维护。

 

updateStateByKey

分析相关源码发现,这个算子的核心思想就是将之前有状态的RDD和当前的RDD做一次cogroup,得到一个新的状态的RDD,具有如下特点:

1. 可以设置初始状态

2. key超时删除。用updatefunc返回None值。updateFunc不管是否有已保存状态key的新数据到来,都会被已存在状态的key调用,新增的key也会调用3. 不适合大数据量状态存储,尤其是key的维度比较高、value状态比较大的

/**
* @author:微信公众号:大数据学习与分享
*/
object StateOperator {

  private val brokers = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
  private val topics = "test"
  private val groupId = "test"
  private val batchTime = "10"
  private val mapwithstateCKDir = "/mapwithstate"
  private val updateStateByKeyCKDir = "/mapwithstate"

  def main(args: Array[String]): Unit = {
    
    val ssc = StreamingContext.getOrCreate(updateStateByKeyCKDir,() => createContext(brokers,topics,groupId,batchTime,updateStateByKeyCKDir))

    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(brokers: String,topics: String,groupId: String,batchTime: String,checkpointDirectory: String): StreamingContext = {

    val conf = new SparkConf().setAppName("testState").setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition","5")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(conf,Seconds(batchTime.toInt))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers,"group.id" -> groupId,"auto.offset.reset" -> "smallest")

    val streams = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicsSet)
      .map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)

    ssc.checkpoint("/redis/updateStateByKey")

    val initialRDD = ssc.sparkContext.parallelize(List(("word",0)))

    //updateStateByKey 底层核心是对preStateRDD(之前数据状态的RDD)和当前批次的RDD进行cogroup
    val stateStreams = streams.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true,initialRDD)

    stateStreams.checkpoint(Duration(5))

    stateStreams.foreachRDD { rdd =>
      val res = rdd.map { case (word,count) => (count,word) }.sortByKey(false).take(10).map { case (v,k) => (k,v) }
      res.foreach(println)
    }

    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  //无论当前批次RDD有多少key(比如preStateRDD有而当前批次没有)都需要对所有的数据进行cogroup并调用一次定义的updateFunc函数
  val updateFunc = (iterator: Iterator[(String,Seq[Int],Option[Int])]) => {
    iterator.flatMap(t => Some(t._2.sum + t._3.getOrElse(0)).map(v => (t._1,v)))
  }

}

 

通过updateStateByKey获得的是整个状态的数据,而且每次状态更新时都要将当前批次过来的数据与之前保存的状态进行cogroup操作,并且对所有数据都调用自定义的函数进行一次计算。

随着时间推移,数据量不断增长,需要维护的状态越来越大,会非常影响性能。如果不能在当前批次将数据处理完成,很容易造成数据堆积,影响程序稳定运行甚至宕掉,这就引出了mapWithState。

 

mapWithState

支持输出全量的状态和更新的状态,还支持对状态超时管理,用户可以根据业务需求选择需要的输出,性能优于于updateStateByKey。

def main(args: Array[String]): Unit = {
    //单词统计
    val ssc = StreamingContext.getOrCreate(mapwithstateCKDir,mapwithstateCKDir))

    ssc.start()
    ssc.awaitTermination()
}

def createContext(brokers: String,checkpointDirectory: String): StreamingContext = {

 val conf = new SparkConf().setAppName("testState").setMaster("local[*]")
   .set("spark.streaming.kafka.maxRatePerPartition","5")
   .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
 val ssc = new StreamingContext(conf,Seconds(batchTime.toInt))

 val topicsSet = topics.split(",").toSet
 val kafkaParams = Map[String,"auto.offset.reset" -> "smallest")

 val messages = KafkaUtils.createDirectStream[String,1L)).reduceByKey(_ + _)

   val stateStreams = messages.mapWithState(StateSpec.function(mapFunc).timeout(Seconds(60))).stateSnapshots()
   //.checkpoint(Duration(5))

   stateStreams.foreachRDD { (rdd,time) =>
     println("========do something")
   }

   ssc.checkpoint(checkpointDirectory)
   ssc
 }

 //key为word,value为当前批次值,state为本批次之前的状态值
 val mapFunc = (key: String,value: Option[Long],state: State[Long]) => {
   //检测是否过期
   if (state.isTimingOut()) {
     println(s"$key is timing out")
   } else {
     val sum = state.getOption().getOrElse(0L) + value.getOrElse(0L)
     val output = (key,sum)
     //更新状态
     state.update(sum)
     output
   }
 }
 
 val mapFunction = (time: Time,word: String,count: Option[Int],state: State[Int]) => {
   val sum = count.getOrElse(0) + state.getOption().getOrElse(0)
   val output = (word,sum)
   state.update(sum)
   Option(output)
 }

 

虽然mapWithState相对于updateStateByKey性能更优,但仍然不适合大数据量的状态维护,此时就需要借用第三方存储来进行状态的维护了,redis、alluxio、HBase是常用的选择。

redis比较适合维护key具有超时处理机制的场景使用;alluxio的吞吐量更高,适合于数据量更大时的场景处理。

具体采用哪种方式,要结合实际的业务场景、数据量、性能等多方面的考量。

 

关注微信公众号:大数据学习与分享,获取更对技术干货

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


1.SparkStreaming是什么?SparkStreaming是SparkCore的扩展API用来支持高吞吐、高容错的处理流式数据数据源可以是:Kafka、TCPsockets、Flume、Twitter等流式数据源处理数据:可以用SparkCore的算子map、reduce、join、window
本篇内容介绍了“Spark通讯录相似度计算怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这...
本篇文章给大家分享的是有关如何进行Spark数据分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说...
本篇内容主要讲解“Spark Shuffle和Hadoop Shuffle有哪些区别”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“S...
这篇文章主要介绍“TSDB的数据怎么利用Hadoop/spark集群做数据分析”,在日常操作中,相信很多人在TSDB的数据怎么利用Hadoop/spark集群做数据分析问题上存在疑惑...
本篇内容介绍了“Hadoop与Spark性能原理是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这
小编给大家分享一下Hadoop和Spark有什么不同,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们
这篇文章主要讲解了“Hadoop和Spark的Shuffle过程有什么不同”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习...
本篇文章给大家分享的是有关基于CDP7.1.1的Spark3.0技术预览版本分析是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获...
这篇文章主要介绍“Spark中foreachRDD、foreachPartition和foreach的区别是什么”,在日常操作中,相信很多人在Spark中foreachRDD、foreachPartition和foreach的...
本篇内容主要讲解“spark的动态分区裁剪怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spark的动态分...
本篇内容介绍了“spark的动态分区裁剪下物理计划怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下
这篇文章给大家介绍基于Spark和TensorFlow 的机器学习实践是怎么样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。EMR E-Learning平台...
这篇文章将为大家详细讲解有关如何进行EMR Spark-SQL性能极致优化的分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识...
如何进行SparkSQL与Hive metastore Parquet转换的分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决...
如何浅析Hive和Spark SQL读文件时的输入任务划分,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个...
这篇文章将为大家详细讲解有关Hive on Spark参数如何调优,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。前言Hive on Spa...
这篇文章将为大家详细讲解有关fs.defaultFS变更使spark-sql查询hive失败是怎么回事,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以...
这篇文章将为大家详细讲解有关怎么解析SparkCore和SparkSQL,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解...
怎么快速搭建Spark开发环境,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一,搭建本...