Java - MapReduce - reducer does not combine keys
I have a simple MapReduce job building a reverse index.
My mapper works correctly (I checked that) and outputs a key/value pair of word index and docid:tfidf:
Mapper (only the output is shown):
    context.write(new IntWritable(wordIndex), new Text(index + ":" + tfidf));
The job of the reducer is to combine these values. My implementation:
    public static class IndexerReducer extends Reducer<Text, IntWritable, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString() + " ");
            }
            context.write(key, new Text(sb.toString()));
        }
    }
However, it does not combine anything and the output looks the same as the mapper's. There are lines in the output with the same key, although the reducer is supposed to combine them - keys in the output file should be unique when a reducer is used, right?
This is a sample of how the reducer output looks (note: a simplified example):
    1 15:2.1
    1 13:4.3
    2 9:9.3
    2 43:7.9
    etc.
I expected this:
    1 15:2.1 13:4.3
    2 9:9.3 43:7.9
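For illustration, here is a plain-Java simulation (no Hadoop involved, all names made up) of what the shuffle-and-reduce stage is expected to do with the sample above: group the mapper's values by key, then concatenate each group, so every key appears exactly once in the output.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReduceSimulation {
    public static void main(String[] args) {
        // Mapper output from the simplified example: (key, "docid:tfidf") pairs.
        String[][] mapperOutput = {
            {"1", "15:2.1"}, {"1", "13:4.3"},
            {"2", "9:9.3"}, {"2", "43:7.9"}
        };

        // The shuffle phase groups values by key; the reduce phase then
        // concatenates each key's values, like the StringBuilder loop above.
        Map<String, StringBuilder> grouped = new LinkedHashMap<>();
        for (String[] pair : mapperOutput) {
            grouped.computeIfAbsent(pair[0], k -> new StringBuilder())
                   .append(pair[1]).append(" ");
        }

        // Each key is emitted exactly once, with all its values combined.
        for (Map.Entry<String, StringBuilder> e : grouped.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue().toString().trim());
        }
    }
}
```

This prints one line per key (1 and 2), which is the shape of output the question expects from a working reducer.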
For the sake of completeness, I am including the run method:
    @Override
    public int run(String[] arguments) throws Exception {
        ArgumentParser parser = new ArgumentParser("TextPreprocessor");

        parser.addArgument("input", true, true, "specify input directory");
        parser.addArgument("output", true, true, "specify output directory");
        parser.parseAndCheck(arguments);

        Path inputPath = new Path(parser.getString("input"));
        Path outputDir = new Path(parser.getString("output"));

        // Create configuration.
        Configuration conf = getConf();

        // Add distributed file vocabulary.
        DistributedCache.addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

        // Create job.
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(IndexerMapper.class);

        // Setup MapReduce.
        job.setMapperClass(IndexerMapper.class);
        job.setReducerClass(IndexerReducer.class);

        // Sort the output words in reversed order.
        job.setSortComparatorClass(WordCountComparator.class);

        job.setNumReduceTasks(1);

        // Specify (key, value).
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input.
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output.
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem hdfs = FileSystem.get(conf);

        // Delete output directory (if it exists).
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute the job.
        return job.waitForCompletion(true) ? 0 : 1;
    }
I would be glad for any hint about what is going on. I am new to MapReduce, so debugging tips are welcome!
Always use the @Override annotation.

You defined

    public static class IndexerReducer extends Reducer<Text, IntWritable, IntWritable, Text>

so your reduce method must look like this:

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException

Your method takes (IntWritable key, Iterable<Text> values), which does not match the generic parameters you declared, so it does not override Reducer.reduce at all - it is just an overload that Hadoop never calls. The framework falls back to the default identity reduce, which writes every value through unchanged, and that is why the output looks exactly like the mapper's, with duplicate keys. Had you annotated the method with @Override, the compiler would have rejected the mismatch immediately.
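Since the mapper shown in the question emits an IntWritable key and a Text value, the declared generic parameters <Text, IntWritable, ...> themselves appear swapped; the likely intended fix is to correct the class declaration rather than the method. A sketch of a type-consistent reducer, assuming the mapper output really is (IntWritable, Text), might look like this (not runnable without the Hadoop libraries on the classpath):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Generic parameters now match the mapper's output types:
// input (IntWritable, Text), output (IntWritable, Text).
public static class IndexerReducer
        extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override  // fails to compile if the signature drifts from Reducer.reduce
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append(" ");
        }
        // One output line per key, with all docid:tfidf values concatenated.
        context.write(key, new Text(sb.toString().trim()));
    }
}
```

With @Override in place, any future mismatch between the method signature and the class's generic parameters becomes a compile error instead of a silently ignored method.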