Spark基础【完善案例一、框架式开发模式再回顾】

一 完善需求一

1 存在问题

问题一:在过滤数据时同一个RDD重复使用,造成数据的重复读取

因为join可能存在笛卡尔乘积,而join底层实现就是corgroup,所以corgroup可能存在笛卡尔乘积,源码如下

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}

问题二:corgroup可能存在存在shuffle【性能瓶颈】,源码如下

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}


class CoGroupedRDD[K: ClassTag](
    @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_] =>
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        // 窄依赖不会有shuffle
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](
          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
      }
    }
  }

2 需求优化

解决问题一:添加代码

fileDatas.persist(StorageLevel.MEMORY_AND_DISK)

解决问题二:

  • 思路

    	(品类ID,点击)Data
      	(品类ID,下单)Data
      	(品类ID,支付)Data
      	 最终合成(品类ID,(点击,下单,支付))
    转换思路:
    	 将3个合成1个,聚合操作也可以
    
    使用reduceByKey(虽然存在shuffle,但不会出现笛卡尔乘积)
    但是其输入输出如下:
    	(Int,Int)=>(Int),如果将数据变为以下格式,就可以使用reduceByKey
    	(点击,下单,支付),(点击,下单,支付)=>(点击,下单,支付)
    如何转换:
    	(品类ID,点击)=>(品类ID,(点击,0, 0))
    	(品类ID,下单)=>(品类ID,(0,下单, 0))
    	(品类ID,支付)=>(品类ID,(0,0, 支付))
    最终合成:
    	(品类ID,(点击,下单, 支付))
    

修改需求一中第五步,代码如下

// TODO 5 对统计结果进行排序,先点击,再下单,最后支付
// (品类ID,点击)=>(品类ID,(点击,0, 0))
// (品类ID,下单)=>(品类ID,(0,下单, 0))
// (品类ID,支付)=>(品类ID,(0,0, 支付))
val clickMapDatas: RDD[(String, (Int, Int, Int))] = clickCntDatas.map {
  case (cid, cnt) => {
    (cid, (cnt, 0, 0))
  }
}
val orderMapDatas: RDD[(String, (Int, Int, Int))] = orderCntDatas.map {
  case (cid, cnt) => {
    (cid, (0, cnt, 0))
  }
}
val payMapDatas: RDD[(String, (Int, Int, Int))] = payCntDatas.map {
  case (cid, cnt) => {
    (cid, (0, 0, cnt))
  }
}
// 将三个独立的数据集合并在一起,用于reduceByKey
val unionRDD: RDD[(String, (Int, Int, Int))] = clickMapDatas.union(orderMapDatas).union(payMapDatas)

val reduceRDD: RDD[(String, (Int, Int, Int))] = unionRDD.reduceByKey(
  (t1, t2) => {
    (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
  }
)
val top10: Array[(String, (Int, Int, Int))] = reduceRDD.sortBy(_._2,false).take(10)

二 进一步优化

以上程序中存在四次reduceByKey,意味着存在四次shuffle

数据的最终表示形式:
	(品类ID,(点击,下单, 支付))
由以下数据转换而来:
	(品类ID,点击)=>(品类ID,(点击,0, 0))
	(品类ID,下单)=>(品类ID,(0,下单, 0))
	(品类ID,支付)=>(品类ID,(0,0, 支付))
每组数据中,点击,下单,支付都为sum
若不进行求和,可减少三次reduceByKey,减少三次shuffle
	(品类ID,点击)=>(品类ID,(1,0, 0))
	(品类ID,点击)=>(品类ID,(1,0, 0))
	(品类ID,点击)=>(品类ID,(1,0, 0))
	(品类ID,下单)=>(品类ID,(0,1, 0))
	(品类ID,下单)=>(品类ID,(0,1, 0))
	(品类ID,下单)=>(品类ID,(0,1, 0))
	(品类ID,支付)=>(品类ID,(0,0, 1))
	(品类ID,支付)=>(品类ID,(0,0, 1))
	(品类ID,支付)=>(品类ID,(0,0, 1))
将每一条数据转换成以上三类即可,同时减少了过滤操作

代码如下

def main(args: Array[String]): Unit = {

  val conf = new SparkConf().setMaster("local").setAppName("HotCategoryTop10")
  val sc = new SparkContext(conf)

  // TODO  1 读取文件,获取原始数据
  val fileDatas: RDD[String] = sc.textFile("data/user_visit_action.txt")

  // flatMap要求返回的必须是一个可以迭代的集合
  val flatMap: RDD[(String, (Int, Int, Int))] = fileDatas.flatMap(
    data => {
      val datas: mutable.ArrayOps.ofRef[String] = data.split("_")
      // 点击数据的场景
      if (datas(6) != "-1") {
        List((datas(6), (1, 0, 0)))
        // 下单数据的场景
      } else if (datas(8) != "null") {
        val id: String = datas(8)
        val ids: Array[String] = id.split(",")
        ids.map({
          id => (id, (0, 1, 0))
        })
        // 支付数据场景
      } else if (datas(10) != "null") {
        val id: String = datas(10)
        val ids: Array[String] = id.split(",")
        ids.map({
          id => (id, (0, 0, 1))
        })
      } else {
        Nil
      }
    }
  )
  val top10: Array[(String, (Int, Int, Int))] = flatMap.reduceByKey(
    (t1, t2) => {
      (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
    }
  ).sortBy(_._2, false).take(10)

  top10.foreach(println)

  sc.stop()
}

三 使用累加器完成需求一

自定义累加器实现需求一

def main(args: Array[String]): Unit = {

  val conf = new SparkConf().setMaster("local").setAppName("HotCategoryTop10")
  val sc = new SparkContext(conf)

  // TODO  1 读取文件,获取原始数据
  val fileDatas: RDD[String] = sc.textFile("data/user_visit_action.txt")

  // 创建累加器对象
  val acc = new HotCategoryAccumulator()
  // 注册累加器
  sc.register(acc,"HotCategory")

  fileDatas.foreach(
    data => {
      val datas: Array[String] = data.split("_")
      if( datas(6) != "-1"){
        // 点击的场景
        acc.add( (datas(6),"click"))
      }else if ( datas(8) != "null"){
        // 下单的场景
        val id: String = datas(8)
        val ids: Array[String] = id.split(",")
        ids.foreach(
          id => {
            acc.add(id,"order")
          }
        )
      } else if(datas(10) != "null"){
        // 支付的场景
        val id: String = datas(10)
        val ids: Array[String] = id.split(",")
        ids.foreach(
          id => {
            acc.add(id,"pay")
          }
        )
      }
    }
  )

  // TODO 获取累加器结果
  val resultMap: mutable.Map[String, HotCategoryCount] = acc.value
  
  val top10: List[HotCategoryCount] = resultMap.map(_._2).toList.sortWith(
    (left, right) => {
      if (left.clickCnt > right.clickCnt) {
        true
      } else if (left.clickCnt == right.clickCnt) {
        if (left.orderCnt > right.orderCnt) {
          true
        } else if (left.clickCnt == right.clickCnt) {
          left.payCnt > right.payCnt
        } else {
          false
        }
      } else {
        false
      }
    }
  ).take(10)

  top10.foreach(println)

  sc.stop()
}

case class HotCategoryCount ( cid:String, var clickCnt : Int, var orderCnt : Int, var payCnt : Int)

// TODO 自定义热门点击累加器
// 1 继承AccumulatorV2
// 2 定义泛型
//      IN:   (品类ID,行为类型)
//      OUT:   Map[品类ID,HotCategoryCount]
// 3 重写方法(3 + 3)
class HotCategoryAccumulator extends AccumulatorV2[(String,String), mutable.Map[String,HotCategoryCount] ]{

  private val map = mutable.Map[String,HotCategoryCount]()

  override def isZero: Boolean = {
    map.isEmpty
  }

  override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategoryCount]] = {
    new HotCategoryAccumulator()
  }

  override def reset(): Unit = {
    map.clear()
  }

  override def add(v: (String, String)): Unit = {
    val (cid,actionType) = v
    val hcc: HotCategoryCount = map.getOrElse(cid,HotCategoryCount(cid,0,0,0))
    actionType match {
      case "click" => hcc.clickCnt += 1
      case "order" => hcc.orderCnt += 1
      case "pay" => hcc.payCnt += 1
    }
    map.update(cid,hcc)
  }

  override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategoryCount]]): Unit = {
    other.value.foreach{
      case (cid, otherHcc) => {
        val thisHcc: HotCategoryCount = map.getOrElse(cid,HotCategoryCount(cid,0,0,0))
        thisHcc.clickCnt += otherHcc.clickCnt
        thisHcc.orderCnt += otherHcc.orderCnt
        thisHcc.payCnt += otherHcc.payCnt
        map.update(cid,thisHcc)
      }
    }
  }

  override def value: mutable.Map[String, HotCategoryCount] = {
    map
  }
}

四 框架式开发模式

将需求一更改为框架式开发模式,以便将需求一的结果传给其他应用使用

1 Application

object HotCategoryTop10Application extends App with TApplication {

  execute(appName = "HotCategoryTop10") {
    //将用户请求传递给Controller层
    val controller = new HotCategoryTop10Controller
    //获取从Controller层传回的结果
    controller.dispatch()
  }
}

2 Controller

class HotCategoryTop10Controller extends TController{

  private val hotCategoryTop10Service = new HotCategoryTop10Service

  override def dispatch(): Unit = {
    val result: Array[(String, (Int, Int, Int))] = hotCategoryTop10Service.analysis()
    result.foreach(println)
  }
}

3 Service

class HotCategoryTop10Service extends TService{
  private val hotCategoryTop10Dao = new HotCategoryTop10Dao

  override def analysis() = {

    val fileDatas: RDD[String] = hotCategoryTop10Dao.readFileBySpark("data/user_visit_action.txt")

    // flatMap要求返回的必须是一个可以迭代的集合
    val flatMap: RDD[(String, (Int, Int, Int))] = fileDatas.flatMap(
      data => {
        val datas: mutable.ArrayOps.ofRef[String] = data.split("_")
        // 点击数据的场景
        if (datas(6) != "-1") {
          List((datas(6), (1, 0, 0)))
          // 下单数据的场景
        } else if (datas(8) != null) {
          val id: String = datas(8)
          val ids: Array[String] = id.split(",")
          ids.map({
            id => (id, (0, 1, 0))
          })
          // 支付数据场景
        } else if (datas(10) != null) {
          val id: String = datas(10)
          val ids: Array[String] = id.split(",")
          ids.map({
            id => (id, (0, 0, 1))
          })
        } else {
          Nil
        }
      }
    )
    val top10: Array[(String, (Int, Int, Int))] = flatMap.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    ).sortBy(_._2, false).take(10)
    top10
  }
}

4 Dao

class HotCategoryTop10Dao extends TDao{

}

5 TApplication

//抽取Application层的通用方法
trait TApplication {
  def execute(master:String = "local[*]",appName:String)(op: => Unit): Unit = {

    val conf = new SparkConf().setMaster(master).setAppName(appName)
    val sc = new SparkContext(conf)
    EnvCache.put(sc)
    try {
      op
    } catch {
      case e: Exception => e.printStackTrace()
    }

    sc.stop()
    EnvCache.clear()
  }

}

6 TController层

//Controller层的运行方法,由实现类实现
trait TController {
  def dispatch(): Unit
}

7 TDao

trait TDao {
  //读取文件,返回文件内容
  def readFile(path: String) = {
    //EnvCache.get获取文件根目录
    val source: BufferedSource = Source.fromFile(EnvCache.get() + path)
    val lines = source.getLines().toList
    source.close()
    lines
  }

  def readFileBySpark(path: String) = {
    //EnvCache.get获取文件根目录
    EnvCache.get().asInstanceOf[SparkContext].textFile(path)
  }
}

8 TService

//service层的执行方法,由实现类实现
trait TService {
  def analysis(): Any = {

  }
  def analysis(data : Any ): Any = {

  }
}

9 util

object EnvCache {
  //在线程中开辟一块空间,这个空间内的数据可以供任意层取出
  //ThreadLocal不能解决线程安全问题,只是共享数据
  private val envCache: ThreadLocal[Object] = new ThreadLocal[Object]

  def put(data: Object): Unit = {
    envCache.set(data)
  }

  def get() = {
    envCache.get()
  }

  def clear(): Unit = {
    envCache.remove()
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340