
MapReduce job with the MongoDB Hadoop driver throws an exception!

 
Joseph Hwang
Greenhorn
Posts: 17
I wrote a MapReduce job with the MongoDB Hadoop driver. Here is my code.

==== Mapper
public class MapperCarrier extends Mapper<LongWritable, Text, BSONWritable, IntWritable> {

    private static final IntWritable outputValue = new IntWritable(1);

    private BasicDBObjectBuilder builder = null;

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Skip the header row at byte offset 0.
        if (key.get() > 0) {

            String[] columns = value.toString().split(",");

            if (columns != null && columns.length > 0) {

                // Column 15 holds the departure delay; "NA" means no value.
                if (!columns[15].equals("NA")) {
                    int depDelayTime = Integer.parseInt(columns[15]);

                    if (depDelayTime > 0) {
                        BasicDBObject list = new BasicDBObject();
                        list.put("date", columns[0] + columns[1]);

                        builder = BasicDBObjectBuilder.start()
                                .add("carrierCode", columns[8])
                                .add("delayTime", new BasicDBObject("departure", list));

                        // Emit the BSON document as the map output key with a count of 1.
                        context.write(new BSONWritable(builder.get()), outputValue);
                    }
                }
                ....
                ....

==== Reducer
public class ReducerCarrier extends Reducer<BSONWritable, IntWritable, NullWritable, MongoUpdateWritable> {

    @Override
    public void reduce(BSONWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        String code = (String) key.getDoc().get("carrierCode");
        BSONObject keyType = (BSONObject) key.getDoc().get("delayTime");

        BasicDBObject query = new BasicDBObject("carrierCode", code);

        int sum = 0;

        if (keyType.keySet().contains("departure")) {

            query.append("delayTime", "departure");

            // Sum the 1-counts emitted by the mapper for this key.
            for (IntWritable value : values) {
                sum += value.get();
            }
            BSONObject time = (BSONObject) keyType.get("departure");
            String date = (String) time.get("date");
            query.append("delayTime", new BasicDBObject("departure", new BasicDBObject("date", date)));

        } else if (keyType.keySet().contains("arrival")) {

            query.append("delayTime", "arrival");

            for (IntWritable value : values) {
                sum += value.get();
            }
            BSONObject time = (BSONObject) keyType.get("arrival");
            String date = (String) time.get("date");
            query.append("delayTime", new BasicDBObject("arrival", new BasicDBObject("date", date)));
        }

        // $addToSet pushes the summed count into the "times" array of the matched document.
        BasicDBObject update = new BasicDBObject("$addToSet", new BasicDBObject("times", new Integer(sum)));

        NullWritable nullKey = NullWritable.get();
        MongoUpdateWritable muw = new MongoUpdateWritable(query, update, true, false);

        context.write(nullKey, muw); // this is the line that throws the exception
    }
}
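
As I understand it, MongoUpdateWritable(query, update, true, false) is meant to perform an upsert, non-multi update against the output collection. To show what I intend each reducer output to do, here is a rough standalone sketch using the legacy MongoDB Java driver (the same API BasicDBObject comes from). The carrier code and date are placeholder values, not real data, and UpdateSketch is just a made-up class name.

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class UpdateSketch {
    public static void main(String[] args) {
        // Same database/collection as mongo.output.uri below.
        MongoClient client = new MongoClient("127.0.0.1", 27017);
        DB db = client.getDB("airCarrier");
        DBCollection col = db.getCollection("timeDelay");

        // Query and update built the same way as in ReducerCarrier (placeholder values).
        BasicDBObject query = new BasicDBObject("carrierCode", "XX")
                .append("delayTime", new BasicDBObject("departure", new BasicDBObject("date", "20081")));
        BasicDBObject update = new BasicDBObject("$addToSet", new BasicDBObject("times", 5));

        // upsert = true, multi = false, mirroring MongoUpdateWritable(query, update, true, false).
        col.update(query, update, true, false);

        client.close();
    }
}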

=== mongodb-default.xml
<configuration>
<property>
<name>mongo.job.verbose</name>
<value>true</value>
</property>
<property>
<name>mongo.job.background</name>
<value>false</value>
</property>
<property>
<name>mongo.input.uri</name>
<value><!--mongodb://127.0.0.1:27017/airCarrier.timeDelay--></value>
</property>
<property>
<name>mongo.output.uri</name>
<value>mongodb://127.0.0.1:27017/airCarrier.timeDelay</value>
</property>
<property>
<name>mongo.input.split.read_shard_chunks</name>
<value>false</value>
</property>
<property>
<name>mongo.input.split.create_input_splits</name>
<value>true</value>
</property>
<property>
<name>mongo.input.query</name>
<!--<value>{"x": {"$regex": "^eliot", "$options": ""}}</value>-->
<value></value>
</property>
<property>
<name>mongo.input.key</name>
<value>location</value>
</property>
<property>
<name>mongo.input.fields</name>
<value></value>
</property>
<property>
<name>mongo.input.sort</name>
<value></value>
</property>
<property>
<name>mongo.input.limit</name>
<value>0</value> <!-- 0 == no limit -->
</property>
<property>
<name>mongo.input.skip</name>
<value>0</value> <!-- 0 == no skip -->
</property>
<property>
<name>mapred.input.dir</name>
<value>file:///home/user01/input_tmp</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>file:///home/user01/output_tmp</value>
</property>
<property>
<name>mongo.job.mapper</name>
<value>com.aaa.mongo.MapperCarrier</value>
</property>
<property>
<name>mongo.job.reducer</name>
<value>com.aaa.mongo.ReducerCarrier</value>
</property>
<property>
<name>mongo.job.input.format</name>
<value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
<!--<value>com.mongodb.hadoop.MongoInputFormat</value>-->
</property>
<property>
<name>mongo.job.output.format</name>
<!--<value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>-->
<value>com.mongodb.hadoop.MongoOutputFormat</value>
</property>
<property>
<name>mongo.job.mapper.output.key</name>
<value>com.mongodb.hadoop.io.BSONWritable</value> <!-- I'm not sure this element is right -->
</property>
<property>
<name>mongo.job.mapper.output.value</name>
<value>org.apache.hadoop.io.IntWritable</value> <!-- I'm not sure this element is right -->
</property>
<property>
<name>mongo.job.output.key</name>
<value>com.mongodb.hadoop.io.BSONWritable</value> <!-- I'm not sure this element is right -->
</property>
<property>
<name>mongo.job.output.value</name>
<value>org.apache.hadoop.io.IntWritable</value> <!-- I'm not sure this element is right -->
</property>
<property>
<name>mongo.job.combiner</name>
<value>com.aaa.mongo.ReducerCarrier</value>
</property>
<property>
<name>mongo.job.partitioner</name>
<value></value>
</property>
<property>
<name>mongo.job.sort_comparator</name>
<value></value>
</property>
<property>
<name>mongo.input.split_size</name>
<value>8</value>
</property>

</configuration>
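
In case it helps, this is how I understand those mongo.job.* type properties to line up with the generic types declared on MapperCarrier and ReducerCarrier. The sketch below uses only the plain Hadoop Job API, not the mongo-hadoop connector's own bootstrap code, and the driver class name CarrierJobSketch is made up; it just shows which key/value classes I think the job ends up declaring.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.io.MongoUpdateWritable;

public class CarrierJobSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource("mongodb-default.xml"); // the XML above, on the classpath

        Job job = Job.getInstance(conf, "carrier-delay");
        job.setJarByClass(CarrierJobSketch.class);

        job.setMapperClass(MapperCarrier.class);
        job.setReducerClass(ReducerCarrier.class);
        job.setCombinerClass(ReducerCarrier.class); // mirrors mongo.job.combiner above

        // mongo.job.mapper.output.* -> map output types (MapperCarrier's last two generics)
        job.setMapOutputKeyClass(BSONWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Final output types as declared by ReducerCarrier's last two generics.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MongoUpdateWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path("file:///home/user01/input_tmp"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}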

Below is the exception:

Caused by: java.io.IOException: wrong key class: class org.apache.hadoop.io.NullWritable is not class com.mongodb.hadoop.io.BSONWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.aaa.mongo.ReducerCarrier.reduce(ReducerCarrier.java:60)
at com.aaa.mongo.ReducerCarrier.reduce(ReducerCarrier.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)

I need your advice. Thanks in advance!
 