How to fix the MapReduce runtime error "Initialization of all the collectors failed"?
My Hadoop version is 2.7.7.
I wrote a MapReduce program to find which record of each order is the latest. The records look like this:
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277482,108.92466,34.27657
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277488,108.92527,34.27658
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277506,108.9276,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277476,108.92399,34.27655
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277515,108.9291,34.2766
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277512,108.92859,34.2766
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277497,108.92627,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277509,108.92809,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277500,108.92667,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277491,108.92561,34.27658
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277479,108.92434,34.27657
The fields are: driver_id, order_id, unix_time, longitude, latitude.
I want to get the last record of each order (sorted by unix_time), but I ran into this problem:
20/09/26 16:45:32 INFO client.RMProxy: Connecting to ResourceManager at master/10.251.254.88:8032
20/09/26 16:45:33 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/09/26 16:45:33 INFO input.FileInputFormat: Total input paths to process : 1
20/09/26 16:46:43 INFO hdfs.DFSClient: Exception in createBlockOutputStream
java.io.IOException: Got error, status message , ack with firstBadLink as 10.251.253.236:50010
    at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1497)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1400)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
20/09/26 16:46:43 INFO hdfs.DFSClient: Abandoning BP-1227587342-10.251.254.88-1600940262422:blk_1073742778_1954
20/09/26 16:46:43 INFO hdfs.DFSClient: Excluding datanode DatanodeInfoWithStorage[10.251.253.236:50010,DS-21c974dd-296a-489d-8512-0e13947176b2,DISK]
20/09/26 16:46:43 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 70082ms (threshold=30000ms)
20/09/26 16:46:43 INFO mapreduce.JobSubmitter: number of splits:21
20/09/26 16:46:43 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1600941960831_0027
20/09/26 16:46:44 INFO impl.YarnClientImpl: Submitted application application_1600941960831_0027
20/09/26 16:46:44 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1600941960831_0027/
20/09/26 16:46:44 INFO mapreduce.Job: Running job: job_1600941960831_0027
20/09/26 16:47:38 INFO mapreduce.Job: Job job_1600941960831_0027 running in uber mode : false
20/09/26 16:47:38 INFO mapreduce.Job:  map 0% reduce 0%
20/09/26 16:47:46 INFO mapreduce.Job: Task Id : attempt_1600941960831_0027_m_000005_0, Status : FAILED
Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class cn.mikyan.OrdersDriver
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:414)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.ClassCastException: class cn.mikyan.OrdersDriver
    at java.lang.Class.asSubclass(Class.java:3404)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:887)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1004)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
    ... 9 more

Container killed by the ApplicationMaster. Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
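If I read the stack trace right, the IOException is just a wrapper: the real failure is the ClassCastException thrown from JobConf.getOutputKeyComparator, where Hadoop takes the job's configured key class and casts it to WritableComparable via Class.asSubclass. To convince myself that this is the mechanism, I wrote a tiny standalone repro (CastCheck is my own throwaway class, not part of the job):

import org.apache.hadoop.io.WritableComparable;

public class CastCheck {
    public static void main(String[] args) {
        // The class the job apparently registered as its key class:
        Class<?> keyClass = cn.mikyan.OrdersDriver.class;
        // Throws java.lang.ClassCastException: class cn.mikyan.OrdersDriver,
        // the same message as in the failed task's log above.
        keyClass.asSubclass(WritableComparable.class);
    }
}

So whatever class the job hands to the sort phase as its key must implement WritableComparable, and the exception says that class is currently cn.mikyan.OrdersDriver.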
Here is my code (sorry for the length):
package cn.mikyan.Group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import cn.mikyan.writable.Orders;

public class OrderGrouping extends WritableComparator {
    public OrderGrouping() {
        super(Orders.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Orders acc1 = (Orders) a;
        Orders acc2 = (Orders) b;
        System.out.println("3333333333333333333333333333333333333333333333333333333333333333333333333"); // debug marker
        return acc1.getOrderid().compareTo(acc2.getOrderid()); // group solely by order ID
    }
}
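My understanding of the flow: compareTo in Orders decides the sort order of keys, and OrderGrouping decides which consecutive keys go into one reduce() call; since it compares only orderid, all records of an order should form one group. I checked that with a small standalone scaffold (GroupingCheck is mine, not part of the job):

import cn.mikyan.Group.OrderGrouping;
import cn.mikyan.writable.Orders;

public class GroupingCheck {
    public static void main(String[] args) {
        Orders a = new Orders("d1", "o1", "1475277482", "108.92466", "34.27657");
        Orders b = new Orders("d1", "o1", "1475277515", "108.9291", "34.2766");
        // Prints 0, meaning "same group": both records go to one reduce()
        // call even though their timestamps differ.
        System.out.println(new OrderGrouping().compare(a, b));
    }
}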
package cn.mikyan.mapper;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import cn.mikyan.writable.Orders;

public class LogCleanMapper extends Mapper<Object, Text, Orders, NullWritable> {
    private String[] fields;
    private String driverid; // driver id
    private String orderid;  // order id
    private String time;     // unix timestamp
    private String xp;       // longitude
    private String yp;       // latitude
    private Orders order;

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        fields = value.toString().split(",");
        if (fields == null || fields.length != 5) { // skip malformed records
            return;
        }
        driverid = fields[0];
        orderid = fields[1];
        time = fields[2];
        xp = fields[3];
        yp = fields[4];
        System.out.println("3333333333333333333333333333333333333333333333333333"); // debug marker
        order = new Orders(driverid, orderid, time, xp, yp);
        System.out.println(order.toString()); // debug marker
        context.write(order, NullWritable.get());
    }
}
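To rule out bad input, I also checked the mapper's parsing on one sample line with a throwaway test (ParseCheck is mine, not part of the job):

public class ParseCheck {
    public static void main(String[] args) {
        String line = "a01b8439e1e42ffcd286241b04d9b1b5,"
                + "f11440a64a0f084fe346a398c62aa9ad,1475277482,108.92466,34.27657";
        String[] fields = line.split(",");
        // The mapper expects exactly 5 fields: driver_id, order_id, unix_time, longitude, latitude.
        System.out.println(fields.length == 5 ? "OK, unix_time = " + fields[2] : "malformed record");
    }
}

This prints the OK branch, so the records themselves do not seem to be the problem.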
package cn.mikyan.reduce;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import cn.mikyan.writable.Orders;

public class OrderReducer extends Reducer<Orders, NullWritable, Orders, NullWritable> {
    @Override
    protected void reduce(Orders key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        for (NullWritable v2 : values) {
            context.write(key, NullWritable.get());
            break; // keep only the first key of the group; the sort comparator puts the newest record first
        }
    }
}
package cn.mikyan.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Orders implements WritableComparable<Orders> {
    private String driverid; // driver_id
    private String orderid;  // order_id
    private String time;     // unix_time
    private String xp;       // longitude
    private String yp;       // latitude

    public Orders() {
    }

    public Orders(String driverid, String orderid, String time, String xp, String yp) {
        this.driverid = driverid;
        this.orderid = orderid;
        this.time = time;
        this.xp = xp;
        this.yp = yp;
    }

    @Override
    public void write(DataOutput out) throws IOException { // serialize
        out.writeUTF(driverid);
        out.writeUTF(orderid);
        out.writeUTF(time);
        out.writeUTF(xp);
        out.writeUTF(yp);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialize, in the same field order
        driverid = in.readUTF();
        orderid = in.readUTF();
        time = in.readUTF();
        xp = in.readUTF();
        yp = in.readUTF();
    }

    @Override
    public int compareTo(Orders o) {
        System.out.println("44444444444444444444444444444444444444444444444444444"); // debug marker
        int tem = orderid.compareTo(o.orderid);
        if (tem == 0) { // same order ID: tie-break on time
            if (time.compareTo(o.time) > 0) {
                return -1; // -1: descending, 1: ascending
            }
            return 1;
        }
        return tem;
    }

    @Override
    public String toString() {
        return driverid + "," + orderid + "," + time + "," + xp + "," + yp;
    }

    public String getDriverid() {
        return driverid;
    }

    public void setDriverid(String driverid) {
        this.driverid = driverid;
    }

    public String getOrderid() {
        return orderid;
    }

    public void setOrderid(String orderid) {
        this.orderid = orderid;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getXp() {
        return xp;
    }

    public void setXp(String xp) {
        this.xp = xp;
    }

    public String getYp() {
        return yp;
    }

    public void setYp(String yp) {
        this.yp = yp;
    }
}
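One thing I am unsure about in compareTo: unix_time is compared as a string. That happens to work while every timestamp has ten digits, but comparing numerically seems safer. This is the variant I am considering (my own change, untested inside the job):

// Alternative compareTo body comparing timestamps numerically (newest first).
// Same grouping by order ID; only the tie-break differs from my posted code.
@Override
public int compareTo(Orders o) {
    int tem = orderid.compareTo(o.orderid);
    if (tem != 0) {
        return tem; // different orders: sort by order ID
    }
    // Same order: descending by numeric unix_time, so the latest record sorts first.
    return -Long.compare(Long.parseLong(time), Long.parseLong(o.time));
}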
package cn.mikyan;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import cn.mikyan.Group.OrderGrouping;
import cn.mikyan.mapper.LogCleanMapper;
import cn.mikyan.reduce.OrderReducer;

public class OrdersDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.job.queuename", "order");

        Job job = Job.getInstance(conf);
        job.setJarByClass(OrdersDriver.class);
        job.setJobName("Secondary Sort");
        job.setMapperClass(LogCleanMapper.class);
        job.setCombinerClass(OrderReducer.class);
        job.setReducerClass(OrderReducer.class);
        job.setOutputKeyClass(OrdersDriver.class);
        job.setOutputValueClass(NullWritable.class);
        job.setGroupingComparatorClass(OrderGrouping.class);

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Path outfile = new Path(otherArgs[1]);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, outfile);

        // delete the output directory if it already exists
        FileSystem fs = outfile.getFileSystem(conf);
        if (fs.exists(outfile)) {
            fs.delete(outfile, true);
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
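Reading the driver again against the stack trace: I pass OrdersDriver.class to job.setOutputKeyClass, and that is exactly the class named in the ClassCastException. Should the key class be the Orders writable instead? This is the configuration I now suspect was intended (my guess, derived only from the stack trace, untested):

// Suspected fix in the driver; untested.
job.setMapOutputKeyClass(Orders.class);         // the map output key is the Orders writable
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Orders.class);            // not OrdersDriver.class
job.setOutputValueClass(NullWritable.class);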
Any answer would be greatly appreciated, thank you.