MapReduce之RecordReader五

文章目录

1. RecordReader概述

RecordReader又叫记录读取器,是用来加载数据并把数据转换为适合mapper读取的键值对。RecordReader实例是由输入格式定义的,默认的输入格式为TextInputFormat,提供了一个LineRecordReader。这个类会把输入文件的每一行作为一个新的值,关联到每一行的键则是该行在文件中的字节偏移量。RecordReader会在输入块上被重复地调用直到整个输入块被处理完毕,每一次调用RecordReader都会调用Mapper的map()方法。

SequenceFileInputFormat对应的RecordReader是SequenceFileRecordReader。LineRecordReader是每行的偏移量作为读入map的key,每行的内容作为读入map的value。很多时候hadoop内置的RecordReader并不能满足需求,比如在读取记录时,希望map读入的key值不是偏移量而是行号或者是文件名,这个时候可以自己定义RecordReader。

2. RecordReader的应用

2.1 RecordReader的实现步骤

  1. 继承抽象类RecordReader,实现RecordReader的一个实例。
  2. 实现自定义的InputFormat类,重写InputFormat中的CreateRecordReader()方法,返回值是自定义的RecordReader实例。

2.2 需求分析

分别计算奇数行和偶数行的累加和。

2.3 上传测试文件

在这里插入图片描述

hadoop fs -put recordread /recordread

2.4 执行代码

MyInputFormat

package com.mapreduce.recordread;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;

public class MyInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //返回自定义的RecordReader
        return new MyRecordReader();
    }
    //为了使切分数据时行号不发生混乱,这里设置为不进行切分
    protected boolean isSplitable(FileSystem fileSystem, Path filename){
        return false;
    }
}

MyRecordReader

package com.mapreduce.recordread;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;

public class MyRecordReader extends RecordReader<LongWritable, Text> {

    private long start; //起始位置(相对于整个分片而言)
    private long end; //结束位置(相对于整个分片而言)
    private long pos; //当前位置

    private FSDataInputStream fin = null; //文件输入流
    private LongWritable key = null;
    private Text value = null;
    private LineReader reader = null; //定义行阅读器
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException{
        FileSplit fileSplit = (FileSplit)split; //获取分片
        start = fileSplit.getStart(); //获取起始位置
        end = start + fileSplit.getLength(); //获取结束位置
        Configuration configuration = context.getConfiguration(); //创建配置
        Path path = fileSplit.getPath(); //获取文件路径
        FileSystem fileSystem = path.getFileSystem(configuration); //根据路径获取文件系统
        fin = fileSystem.open(path); //打开文件输入流
        fin.seek(start); //找到开始位置开始读取
        reader = new LineReader(fin); //创建一个行阅读器
        pos = 1; //将位置设为1,从第一行开始记录行号
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(key == null){
            key = new LongWritable();
        }
        key.set(pos);
        if(value == null){
            value = new Text();
        }
        if(reader.readLine(value) == 0){ //此处的value时用来存储给定的行,而返回值是读取的字节数,包括换行
            //如果只有一个换行也算一行
            return false;
        }
        pos++;
        return true;
    }

    @Override
    public LongWritable getCurrentKey(){return key;}

    @Override
    public Text getCurrentValue() {return value;}

    @Override
    public float getProgress(){return 0;}

    @Override
    public void close() throws IOException{
        if(fin != null) fin.close();
    }
}

MyMapper

package com.mapreduce.recordread;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException{
        //System.out.println("key = "+key+" value = "+value);
        context.write(key, value); //直接将读取的记录写出去
    }
}

MyPartitioner

package com.mapreduce.recordread;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<LongWritable, Text> {
    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        //偶数放到第二个分区进行计算
        if(key.get()%2 == 0){
            //偶数行将输入到reduce的key设置为1
            key.set(1);
            return 1;
        }else {
            //奇数放在第一个分区进行计算
            key.set(0);
            return 0;
        }
    }
}

MyReducer

package com.mapreduce.recordread;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<LongWritable, Text, Text, LongWritable> {
    private Text outKey = new Text();
    private LongWritable outValue = new LongWritable();

    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException{
        System.out.println("奇数行还是偶数行:"+key);

        long sum = 0;
        for(Text value:values){
            sum += Long.parseLong(value.toString());
        }
        //判断奇偶数
        if(key.get() == 0){
            outKey.set("奇数之和为:");
        }else{
            outKey.set("偶数之和为:");
        }
        outValue.set(sum);
        context.write(outKey, outValue);
    }
}

RecordReaderApp

package com.mapreduce.recordread;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.net.URI;

public class RecordReaderApp {

    private static final String INPUT_PATH = "hdfs://master001:9000/recordreader";
    private static final String  OUTPUT_PATH = "hdfs://master001:9000/recordput";

    public static void main(String[] args) throws Exception{
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        //提升代码的健壮性
        final FileSystem fileSystem = FileSystem.get(URI.create(INPUT_PATH), conf);
        if(fileSystem.exists(new Path(OUTPUT_PATH))){
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "RecordReaderApp");
        //run jar class 主方法
        job.setJarByClass(RecordReaderApp.class);
        //设置map
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        //设置reduce
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //设置partition
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);
        //设置input format
        job.setInputFormatClass(MyInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        //设置output format
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        //提交job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

2.5 效果截图

在这里插入图片描述

3. 小结

如果程序或者集群出现任何BUG,欢迎下方留言讨论。

原文地址:https://blog.csdn.net/RivenDong/article/details/100825368

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


hadoop搭建准备工作三台虚拟机:master、node1、node2检查时间是否同步:date检查java的jdk是否被安装好:java-version修改主机名三台分别执行vim/etc/hostname并将内容指定为对应的主机名 关闭防火墙:systemctlstopfirewalld   a.查看防火墙状态:systemctlstatu
文件的更名和移动:    获取文件详细信息       遇到的问题:不能直接在web上上传文件。   权限问题:修改后即可正常创建  参考:https://blog.csdn.net/weixin_44575660/article/details/118687993
目录一、背景1)小文件是如何产生的?2)文件块大小设置3)HDFS分块目的二、HDFS小文件问题处理方案1)HadoopArchive(HAR)2)Sequencefile3)CombineFileInputFormat4)开启JVM重用5)合并本地的小文件,上传到HDFS(appendToFile)6)合并HDFS的小文件,下载到本地(getmerge)三、HDFS小文件问题处理实战操
目录一、概述二、HadoopDataNode多目录磁盘配置1)配置hdfs-site.xml2)配置详解1、dfs.datanode.data.dir2、dfs.datanode.fsdataset.volume.choosing.policy3、dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction4、dfs.datanode.available
平台搭建(伪分布式)伪分布式搭建在VM中搭建std-master修改配置文件centos7-cl1.vmdkstd-master.vmx-将配置文件中vm的版本号改成自己电脑对应的vm版本修改客户端的操作系统为centos764位打开虚拟机修改虚拟机网络cd/etc/sysconfigetwork-scripts
 一、HDFS概述 1.1、HDFS产出背景及定义 1.1.1、HDFS产生背景   随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS只是分布式
配置workers进入hadoop/etc/hadoop  编辑workers文件  然后分发给另外两个服务器     准备启动集群第一次需要初始化.  初始化完成后增加了data文件,  进入上面那个路径,就能看到当前服务器的版本号  启动HDFS  启动完毕102  
这周我对ssm框架进行了更深一步的开发,加入了多用户,并对除登录外的请求进行了拦截,这样用户在未登录的时候是访问不到资源的。并且对hadoop进行了初步的学习,包括虚拟机的安装等等。下周会对hadoop进行更深一步的学习,加油! 
前言通过在Hadoop1安装Hadoop,然后配置相应的配置文件,最后将Hadoop所有文件同步到其他Hadoop节点。一、集群规划#主机名‘master/hadoop1’‘slave01/hadoop2’‘slave02/hadoop3’#启动节点NamenodeNodemanagerNodemanager
1.先杀死进程(先进入到hadoop版本文件里,我的是/opt/module/hadoop-3.1.3/)sbin/stop-dfs.sh2.删除每个集群上的data以及logsrm-rfdata/logs/3.格式化hdfsnamenode-format4.再启动sbin/sart-dfs.sh
查看文件目录的健康信息执行如下的命令:hdfsfsck/user/hadoop-twq/cmd可以查看/user/hadoop-twq/cmd目录的健康信息:其中有一个比较重要的信息,就是Corruptblocks,表示损坏的数据块的数量查看文件中损坏的块(-list-corruptfileblocks)[hadoop-twq@master~]
titlecopyrightdatetagscategoriesHadoop2.8.0的环境搭建true2019-08-0912:12:44-0700LiunxHadoopLiunxHadoop此文为在centos7下安装Hadoop集群前期准备Hadoop下载Hadoop的下载本文下载的是2.8.0版本的Hadoop安装3个虚拟机并实现ssh免密码的登录
这是我的地图publicstaticclassMapClassextendsMapper<LongWritable,Text,Text,Text>{publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{String[]fields=value.toString().s
组件:Hadoop三大核心组件:HDFS(HadoopDistributedFileSystem):分布式文件系统,数据存放在这里,提供对应用程序数据的高吞吐量访问。YARN(YetAnotherResourceNegotiator):资源管理调度系统,分配比如硬盘内存等资源。用这些资源来运行程序的计算MapReduce:分布式运算框架
查看Hadoop安全模式hadoopdfsadmin-safemodegetSafemodeisOFF进入Hadoop安全模式root@centos:/$hadoopdfsadmin-safemodeenter SafemodeisON推出安全模式nange@ubuntu:/$hadoopdfsadmin-safemodeleave SafemodeisOFF
当我尝试运行sqoop命令时,我收到错误,说没有连接字符串的管理器我尝试运行的内容:sqoopexport--connect"jdbc:vertica://xxxxxxxx.com:5433/PPS_GIIA"--usernamexxxxx--passwordxxxxx--tableCountry-m1--export-dir/Eservices/SIPOC/SQLimport/part-m-0000--
好程序员大数据学习路线Hadoop学习干货分享,ApacheHadoop为可靠的,可扩展的分布式计算开发开源软件。ApacheHadoop软件库是一个框架,它允许使用简单的编程模型跨计算机群集分布式处理大型数据集(海量的数据)。包括这些模块:HadoopCommon:支持其他Hadoop模块的常用工具。Hadoop
我正在使用java,我正在尝试编写一个mapreduce,它将接收一个包含多个gz文件的文件夹.我一直在寻找,但我发现的所有教程都放弃了如何处理简单的文本文件,但没有找到解决我问题的任何东西.我在我的工作场所问过,但只提到scala,我并不熟悉.任何帮助,将不胜感激.解决方法:Hadoop检查
linux下开机自启:在/etc/init.d目录下新建文件elasticsearch并敲入shell脚本:#!/bin/sh#chkconfig:23458005#description:elasticsearchexportJAVA_HOME=/home/hadoop/jdk/jdk1.8.0_172exportJAVA_BIN=/home/hadoop/jdk/jdk1.8.0_172/binexportPATH=$PATH:$JAVA_HOME/bi
离线数据处理的主要工具Hive是必须极其熟练地掌握和精通的,但Hive背后是Hadoop的HDFS和M叩Reduce,需要会MapReduce编程么?从笔者的工作实践以及了解来看,这不是必须掌握的,但是数据开发人员必须掌握其概念、架构和工作原理,也就是说,不但要知其然,而且要知其所以然。1.起源