如何解决Hadoop HDFS MapReduce输出到MongoDb错误
我参考了网上的演示代码并修改了一些细节,但运行时出现了错误。我有两台机器:一台 master 和一台 worker1,使用的是 GitHub 上的 mongo-hadoop 连接器。
java.lang.Exception: java.lang.IllegalArgumentException: Wrong FS:
file:/data/hadoop/tmp/attempt_local1130820149_0001_r_000000_0/_MONGO_OUT_TEMP/_out,expected:
hdfs://192.168.56.200:9000
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:559)
Caused by: java.lang.IllegalArgumentException: Wrong FS:
file:/data/hadoop/tmp/attempt_local1130820149_0001_r_000000_0/_MONGO_OUT_TEMP/_out,expected:
hdfs://192.168.56.200:9000
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:730)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:233)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:530)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:527)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:541)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:468)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
at com.mongodb.hadoop.output.MongoRecordWriter.<init>(MongoRecordWriter.java:61)
at com.mongodb.hadoop.mapred.output.MongoRecordWriter.<init>(MongoRecordWriter.java:35)
at com.mongodb.hadoop.mapred.MongoOutputFormat.getRecordWriter(MongoOutputFormat.java:45)
at org.apache.hadoop.mapred.ReduceTask$OldTrackingRecordWriter.<init>(ReduceTask.java:485)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:415)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:393)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:347)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
错误信息的含义:实际写入的文件系统是 file:/data/hadoop/tmp/attempt_local1130820149_0001_r_000000_0/_MONGO_OUT_TEMP/_out(本地文件系统),而预期的文件系统是 hdfs://192.168.56.200:9000。
我搜索了很多资料,但其他人遇到这个错误的原因和解决办法似乎都不适用于我的情况。
这是我的代码
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
public class Experiment extends Configured implements Tool {

    /**
     * Configures and submits a word-count MapReduce job (old mapred API)
     * whose results are written to MongoDB via mongo-hadoop's
     * {@link MongoOutputFormat}.
     *
     * @param args args[0] = input path, args[1] = MongoDB output URI
     *             (e.g. mongodb://host:27017/db.collection)
     * @return 0 on success, -1 on bad usage
     * @throws Exception if job submission or execution fails
     */
    public int run(final String[] args) throws Exception {
        // Robustness: fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when arguments are missing.
        if (args.length < 2) {
            System.err.println("Usage: Experiment <input path> <mongo output uri>");
            return -1;
        }
        final Configuration conf = getConf();
        conf.set("mongo.output.uri", args[1]);

        final JobConf job = new JobConf(conf);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setJarByClass(Experiment.class);
        job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputFormat(MongoOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        // BUG FIX: MyReducer collects IntWritable values, so the declared
        // output value class must be IntWritable. The original declared
        // BSONWritable, which disagrees with the reducer's
        // OutputCollector<Text, IntWritable> and the map output value class.
        job.setOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // NOTE(review): the "Wrong FS: file:/... expected: hdfs://..." error in
        // the question's stack trace shows the job running under LocalJobRunner
        // (local attempt dirs like file:/data/hadoop/tmp/attempt_local...) while
        // fs.defaultFS points at HDFS; mongo-hadoop's MongoRecordWriter then
        // resolves its local temp path against the HDFS filesystem and fails.
        // Submit the job to the cluster (mapreduce.framework.name=yarn) or run
        // fully local — TODO confirm against the cluster configuration.
        JobClient.runJob(job);
        return 0;
    }

    /** Tokenizes each input line and emits a (word, 1) pair per token. */
    public static class MyMapper extends MapReduceBase
            implements Mapper<Object, Text, Text, IntWritable> {
        // Reused singletons to avoid per-record object allocation.
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        // IDIOM FIX: the original implemented the raw Mapper interface with
        // (Object, Object, ...) parameters; typing it removes unchecked
        // warnings and matches TextInputFormat's Text values.
        @Override
        public void map(Object key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            final String line = value.toString();
            final StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, ONE);
            }
        }
    }

    /** Sums the counts for each word and emits (word, total). */
    public static class MyReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(final String[] args) throws Exception {
        final int res = ToolRunner.run(new Experiment(), args);
        System.exit(res);
    }
}
我的运行命令如下:
Hm.jar Experiment mongo-hadoop/* mongodb://127.0.0.1:27017/company.employees
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。