大数据编程期末大作业2023

目录

一、Hadoop基础操作

二、RDD编程

三、Spark SQL编程

四、Spark Streaming编程

五、Flume的安装配置


一、Hadoop基础操作

按要求完成以下操作:

1、在HDFS中创建目录 /user/root/你的名字。  例如:李四同学  /user/root/lisi,后同。

首先需要启动hdfs,在终端输入如下命令:

start-dfs.sh

在终端输入如下命令创建目录:

hdfs dfs -mkdir -p /user/root/***(你自己的名字全拼,下同)

2、创建本地文件lisi.txt,文件内容为包括Lisi love Hadoop等其他任意输入的6行英文句子,并将该文件上传到HDFS中第1题所创建的目录中。

在终端的root目录下面创建本地文件并输入题目要求的内容:

vim ***.txt

然后我们再在终端输入上传命令:

hdfs dfs -put ***.txt /user/root/***

3、查看上传到HDFS中的lisi.txt文件的内容。

直接在终端输入查看命令:

hdfs dfs -cat /user/root/***/***.txt

4、在Hadoop官方的示例程序包hadoop-mapreduce-examples-3.1.3.jar中,包括计算Pi值的测试模块,使用hadoop jar命令提交计算Pi的MapReduce任务。

首先,启动yarn,命令如下:

start-yarn.sh

然后进入到hadoop下的mapreduce目录中: 

cd /usr/local/servers/hadoop/share/hadoop/mapreduce/

最后执行如下命令即可计算Pi: 

hadoop jar hadoop-mapreduce-examples-3.1.3.jar pi 4 4

可以看出精度不是很高,上面命令后面的两个数字含义是,第一个4是运行4次map任务,第二个4是每个map任务投掷次数,总投掷次数就是两者相乘,想要提高精度就可以让数字变大,但是很容易出现作业计算失败的异常,这是因为计算内存不够,所以不能调的太大。 

5、直接将第4题的计算结果保存到/user/root/lisi目录中lisiPi文件里。

先将计算结果保存到本地系统home目录下:

hadoop jar hadoop-mapreduce-examples-3.1.3.jar pi 4 4 > /home/zhanghc/***PI.txt

然后将***PI.txt文件上传到HDFS的“/user/root/***”目录下并查看结果:

hdfs dfs -put /home/zhanghc/***PI.txt /user/root/***
hdfs dfs -cat /user/root/***/***PI.txt

二、RDD编程

现有一份2019年我国部分省份高考分数线数据文件exam2019.csv,共有四个数据字段,字段说明如表1所示:

表1 高考分数线数据字段说明

字段名称

说明

地区

省、直辖市或自治区

考生类别

考生报考类别,如理科

批次

划定的学校级别,如本科批次

分数线

达到所属批次的最低分

为了解2019年全国各地的高考分数线情况,请使用Spark编程,完成以下需求:

1、读取exam2019.csv并创建RDD。

首先将该文件上传到我们的终端,我是放在主目录“/home/zhanghc”下的。

然后启动pyspark:

pyspark

再读取我们的文件并创建RDD: 

>>> data = sc.textFile("file:///home/zhanghc/exam2019.csv")

2、查找出各地区本科批次的分数线。

# 对RDD数据进行map操作,拆分每一行数据
>>> data_map = data.map(lambda x: x.split(","))

# 对拆分后的RDD进行filter操作,过滤出本科的数据
>>> data_filter = data_map.filter(lambda x: x[2] == '本科批')

# 对过滤后的RDD进行map操作,抽取出地区和分数线
>>> data_result = data_filter.map(lambda x:(x[0],x[3]))

# 对抽取后的RDD进行reduceByKey操作,按地区进行分组
>>> data_reduce = data_result.reduceByKey(lambda x,y:x+','+y)

# 打印结果
>>> data_reduce.collect()

3、将结果以文本格式存储到HDFS上,命名为/user/root/你的名字exam2019。

>>> data_reduce.saveAsTextFile("hdfs://localhost:9000/user/root/***exam2019")

查看上传到HDFS的“***exam2019”中的文件内容:

三、Spark SQL编程

某餐饮企业预备使用大数据技术对过往餐饮点评大数据进行分析以提高服务与菜品质量,实现服务升级,具体情况如下:现有一份顾客对某城市餐饮店的点评数据restaurant.csv,记录了不同类别餐饮店在口味、环境、服务等方面的评分,数据共有12列,前10列数据字段的说明如表2所示,最后两列的数据为空则不描述。

表2 顾客对某城市餐饮店的点评数据字段说明

字段名称

字段名称

类别

餐饮店类别

行政区

餐饮店所在位置区域

点评数

有多少人进行了点评

口味

口味评分

环境

环境评分

服务

服务评分

人均消费

人均消费(单位:元)

城市

餐饮店所在城市

Lng

经度

Lat

纬度

为探究人们对该城市餐饮店的点评分布情况,分析客户在餐饮方面的消费喜好,请使用Spark SQL进行编程,完成如下需求:

1、读取restaurant.csv数据,删除最后为空值的两列,再删除含有空值的行。

# 读取文件
>>> df = spark.read.csv("file:///home/zhanghc/restaurant.csv",header=True)

# 删除最后两列
>>> df = df.drop(df._c10).drop(df._c11)

# 删除含有空值的行
>>> df = df.na.drop()

# 查看结果
>>> df.show()

2、筛选出口味评分大于7分的数据。

>>> result1 = df.filter(df.口味 > 7)
>>> result1.show()

3、统计各类别餐饮店点评数,并按降序排列。

# 选出需要的列,转换成rdd
>>> dps = df.select('类别','点评数').rdd

# 计算每种类别餐饮点评数的总和
>>> dps = dps.map(lambda x:(x[0],int(x[1]))).reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1],False)

# 将计算得出的表格标签进行修改
>>> dps = dps.toDF().withColumnRenamed('_1','类别').withColumnRenamed('_2','点评数')

显示结果
>>> dps.show()

 

4、将步骤2和步骤3的结果保存到HDFS上,命名为/user/root/你的名字restaurant。

>>> result1.rdd.saveAsTextFile("hdfs://localhost:9000/user/root/***restaurant1")
>>> dps.rdd.saveAsTextFile("hdfs://localhost:9000/user/root/***restaurant2")

查看上传到HDFS的“***restaurant”中的文件内容:

四、Spark Streaming编程

现有一份某饭店的菜单数据文件menu.txt,部分数据如表3所示,每一行有3个字段,分别表示菜品ID、菜名和单价(单位:元)。

表3 某饭店的菜单数据

1 香菇肥牛 58

2 麻婆豆腐 32

3 红烧茄子 15

4 小炒凉粉 16

5 京酱肉丝 22

6 剁椒鱼头 48

7 土豆炖鸡 38

8 锅巴香虾 66

一位顾客依次点了麻婆豆腐、土豆炖鸡、红烧茄子和香菇肥牛共4个菜,为实时计算顾客点餐的费用,请使用Spark streaming 编程完成以下操作:

1、在虚拟机上启动8888端口。

直接在终端输入如下命令:

nc -lk 8888

2、使用Spark streaming连接虚拟机的8888端口,并实时统计顾客点餐的总费用。

创建一个py程序名为prizeSum.py,并填入如下代码:

vi prizeSum.py
# /home/zhanghc/prizeSum.py
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import sys

# 从sys.argv中获取主机名和端口号
if len(sys.argv) != 3:
    print("Usage:prizeSum.py <hostname> <port>",file=sys.stderr)
    exit(-1)

# 创建SparkContext
sc = SparkContext(appName="pythonSparkStreamingPrizeSum")

# 创建StreamingContext
ssc = StreamingContext(sc,5)

# 创建函数,实现累加
def accumulate(values,sums):
    return sum(values) + (sums or 0)

# 设置检查点目录
ssc.checkpoint("file:///home/zhanghc/")

initialStateRDD = sc.parallelize([])

# 从指定的主机和端口接收数据流
lines = ssc.socketTextStream(sys.argv[1],int(sys.argv[2]))

# 将数据流中的每一行转换为一个元组
costs= lines.map(lambda x: x.split(" "))

# 将每一行的价格累加
totalCost = costs.map(lambda x: ("总价",int(x[2]))).updateStateByKey(accumulate,initialRDD=initialStateRDD)
totalCost.map(lambda x: x.values())

# 打印结果
totalCost.pprint()

# 启动Streaming处理流
ssc.start()

# 等待程序终止
ssc.awaitTermination()

3、启动Spark streaming程序,在8888端口输入顾客所点的菜单数据,如“3 红烧茄子15”,查看顾客本次点餐的总费用。

启动prizeSum.py程序:

spark-submit prizeSum.py localhost 8888

五、Flume的安装配置

Flume是非常流行的日志采集系统,可以作为Spark Streaming的高级数据源。请到Flume官网下载Flume安装文件(版本不限),并将其安装到你的系统中。要求把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink(这里把Sink类型设置为avro),由Sink把消息推送给Spark Streaming,由自己编写的Spark Streaming应用程序对消息进行处理。

1、安装Flume

(1)下载Flume:

到Flume官网下载Flume1.7.0安装文件,下载地址如下:

http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

下载完成后上传到虚拟机的“/usr/local/uploads”目录下。

(2)解压安装包:

首先进入到“uploads”目录下。

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local     #解压到“/usr/local”目录下
cd /usr/local
mv apache-flume-1.7.0-bin flume     #将解压的文件修改名字为flume,简化操作
chown -R hadoop:hadoop ./flume      #把/usr/local/flume目录的权限赋予当前登录Linux系统的用户,这里假设是hadoop用户

(3)配置环境变量:

首先,修改/etc/profile配置文件:

vi /etc/profile

export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
export FLUME_CONF_DIR=$FLUME_HOME/conf

使文件生效:

source /etc/profile

下面修改 flume-env.sh 配置文件:

cd /usr/local/flume/conf
cp flume-env.sh.template flume-env.sh
vi flume-env.sh

在文件中增加一行内容,用于设置JAVA_HOME变量:

export JAVA_HOME=/usr/local/servers/jdk

然后,保存flume-env.sh文件,并退出vim编辑器。

(4)查看Flume版本信息:

cd /usr/local/flume
./bin/flume-ng version

2、使用Flume作为Spark Streaming数据源

(1)在“/usr/local/flume/conf”目录下创建两个conf文件:

#/usr/local/flume/conf/avro.conf
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1

# Describe/configure the source
  a1.sources.r1.type = avro
  a1.sources.r1.channels = c1
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 4141
    #注意这个端口名,在后面的教程中会用得到

# Describe the sink
  a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
#/usr/local/flume/conf/netcat.conf
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
        #同上,记住该端口名

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

(2)Spark准备工作:

首先,到官网下载spark-streaming-flume_2.11-2.3.4.jar:

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume

上面的网址要是打不开,可以用下面的这个网址:

https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume_2.11

把这个jar文件放到“/usr/local/spark/jars/flume”目录下。

cd /usr/local/spark/jars
mkdir flume
cd flume
cp /usr/local/uploads/spark-streaming-flume_2.11-2.3.4.jar .

然后,修改spark目录下conf/spark-env.sh文件中的SPARK_DIST_CLASSPATH变量。把flume的相关jar包添加到此文件中。

cd /usr/local/spark/conf
vi spark-env.sh
:/usr/local/spark/jars/flume/*:/usr/local/flume/lib/*

这样,Spark环境就准备好了。

(3)创建flume-to-spark.conf

cd /usr/local/flume/conf
vi flume-to-spark.conf
#flume-to-spark.conf: A single-node Flume configuration
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 33333

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = localhost
        a1.sinks.k1.port =44444

        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000000
        a1.channels.c1.transactionCapacity = 1000000

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

#说明:
1、Flume suorce类为netcat,绑定到localhost的33333端口,消息可以通过telnet localhost 33333 发送到flume suorce
2、Flume Sink类为avro,绑定44444端口,flume sink通过localhost 44444端口把消息发送出来。而spark streaming程序一直监听44444端口。

#注意!!先不要启动Flume agent,因为44444端口还没打开,sink的消息无处可去,44444端口由spark streaming程序打开。

(4)编写Spark程序使用Flume数据源

A、创建python文件

cd /home/zhanghc/sparkcode
mkdir flume
cd flume
vi FlumeEventCount.py

在FlumeEventCount.py中输入以下代码: 

#/home/zhanghc/sparkcode/flume/FlumeEventCount.py
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import pyspark
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: flume_wordcount.py <hostname> <port>",file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="FlumeEventCount")
    ssc = StreamingContext(sc,2)

    hostname= sys.argv[1]
    port = int(sys.argv[2])
    stream = FlumeUtils.createStream(ssc,hostname,port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.count().map(lambda cnt : "Recieve " + str(cnt) +" Flume events!!!!").pprint()

    ssc.start()
    ssc.awaitTermination()

B、测试运行效果

注意:可能需要安装pyspark,命令为:

pip3 install pyspark

 首先,启动Spark streaming程序:

./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* /home/zhanghc/sparkcode/flume/FlumeEventCount.py localhost 44444

然后,启动一个新的终端,启动Flume Agent:

cd /usr/local/flume
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

最后,再启动一个新的终端连接33333端口:

先要安装telnet:

yum install telnet

最后输入命令连接33333端口:

telnet localhost 33333

现在你可以在最后这个终端里输入一些字符了。在你输入字符后可以看到第一个终端会显示如下的信息:

-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 1 flume events!!!

原文地址:https://blog.csdn.net/Morse_Chen/article/details/135304286

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。