JavaRanch » Java Forums » Databases » MongoDB

Map Reducer with MongoDB hadoop driver throws Exception!

Joseph Hwang
Greenhorn

Joined: Aug 17, 2013
Posts: 14
I wrote a MapReduce job using the MongoDB Hadoop driver. Here is my code.

==== Mapper
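import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.io.BSONWritable;

public class MapperCarrier extends Mapper<LongWritable, Text, BSONWritable, IntWritable> {

    // Each emitted record counts as one delayed flight.
    private final static IntWritable outputValue = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // The key is the line's byte offset; offset 0 is the CSV header, so skip it.
        if (key.get() > 0) {

            String[] columns = value.toString().split(",");

            // split() never returns null; make sure column 15 exists before reading it.
            if (columns.length > 15) {

                if (!columns[15].equals("NA")) {
                    int depDelayTime = Integer.parseInt(columns[15]);

                    if (depDelayTime > 0) {
                        // "date" is year (column 0) + month (column 1).
                        BasicDBObject list = new BasicDBObject();
                        list.put("date", columns[0] + columns[1]);

                        // Key: { carrierCode: ..., delayTime: { departure: { date: ... } } }
                        BasicDBObjectBuilder builder = BasicDBObjectBuilder.start()
                                .add("carrierCode", columns[8])
                                .add("delayTime", new BasicDBObject("departure", list));

                        context.write(new BSONWritable(builder.get()), outputValue);
                    }
                }
....
....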
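The mapper's filtering rules can be exercised without a Hadoop cluster. The following is a plain-JDK approximation, not the poster's class: `DepartureDelayParser` and `Delay` are hypothetical names, the column positions (0 = year, 1 = month, 8 = carrier code, 15 = departure delay) are read off the mapper above, and, as an assumption, a non-numeric delay is treated as "skip" rather than letting `parseInt` fail the task.

```java
import java.util.Optional;

// Hypothetical, Hadoop-free sketch of the mapper's filtering rules.
// Column positions are taken from the posted MapperCarrier.
class DepartureDelayParser {

    static final int YEAR = 0;
    static final int MONTH = 1;
    static final int CARRIER = 8;
    static final int DEP_DELAY = 15;

    // Carrier code plus the year+month string the mapper stores as "date".
    static final class Delay {
        final String carrierCode;
        final String date;

        Delay(String carrierCode, String date) {
            this.carrierCode = carrierCode;
            this.date = date;
        }
    }

    // Returns the record the mapper would emit, or empty if it would skip the line.
    static Optional<Delay> parse(long byteOffset, String line) {
        // TextInputFormat keys are byte offsets; offset 0 is the header line of a file.
        if (byteOffset == 0) {
            return Optional.empty();
        }
        String[] columns = line.split(",");
        // Too few columns to contain a departure delay.
        if (columns.length <= DEP_DELAY) {
            return Optional.empty();
        }
        String raw = columns[DEP_DELAY];
        if (raw.equals("NA")) {
            return Optional.empty();
        }
        int depDelayTime;
        try {
            depDelayTime = Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            // Assumption: a malformed value is skipped instead of failing the task.
            return Optional.empty();
        }
        // Only flights that actually left late are counted.
        if (depDelayTime <= 0) {
            return Optional.empty();
        }
        return Optional.of(new Delay(columns[CARRIER], columns[YEAR] + columns[MONTH]));
    }
}
```

Keeping the parsing in a plain static method like this lets the edge cases (header, short line, `NA`, negative delay) be unit-tested before wiring it into `map()`.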

==== Reducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.io.MongoUpdateWritable;

public class ReducerCarrier extends Reducer<BSONWritable, IntWritable, NullWritable, MongoUpdateWritable> {

    @Override
    public void reduce(BSONWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        String code = (String) key.getDoc().get("carrierCode");
        BSONObject keyType = (BSONObject) key.getDoc().get("delayTime");

        BasicDBObject query = new BasicDBObject("carrierCode", code);

        // The key's delayTime sub-document holds either "departure" or "arrival".
        String type = null;
        if (keyType.keySet().contains("departure")) {
            type = "departure";
        } else if (keyType.keySet().contains("arrival")) {
            type = "arrival";
        }

        int sum = 0;
        if (type != null) {
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Match documents whose delayTime is { <type>: { date: ... } }.
            BSONObject time = (BSONObject) keyType.get(type);
            String date = (String) time.get("date");
            query.append("delayTime", new BasicDBObject(type, new BasicDBObject("date", date)));
        }

        // Add this key's total to the "times" array of the matching document.
        BasicDBObject update = new BasicDBObject("$addToSet", new BasicDBObject("times", Integer.valueOf(sum)));

        NullWritable nullKey = NullWritable.get();
        // Arguments: query, modifiers, upsert = true, multiUpdate = false.
        MongoUpdateWritable muw = new MongoUpdateWritable(query, update, true, false);

        context.write(nullKey, muw); // throws exception
    }
}
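For one reduce key, the reducer builds an upsert filter and an `$addToSet` modifier. The plain-JDK sketch below reproduces the shape of those two documents with `java.util.Map` standing in for `BasicDBObject`; `CarrierUpdateSketch` is a hypothetical name, not part of the driver. Since `BasicDBObject` is itself a `LinkedHashMap`, appending `delayTime` twice would simply keep the last value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free sketch of the two documents ReducerCarrier builds
// for one key. LinkedHashMap stands in for BasicDBObject.
class CarrierUpdateSketch {

    // Upsert filter: { carrierCode: <code>, delayTime: { <type>: { date: <date> } } }
    static Map<String, Object> query(String carrierCode, String delayType, String date) {
        Map<String, Object> dateDoc = new LinkedHashMap<>();
        dateDoc.put("date", date);

        Map<String, Object> delayTime = new LinkedHashMap<>();
        delayTime.put(delayType, dateDoc);

        Map<String, Object> query = new LinkedHashMap<>();
        query.put("carrierCode", carrierCode);
        query.put("delayTime", delayTime);
        return query;
    }

    // Modifier: { $addToSet: { times: <sum of the values for this key> } }
    static Map<String, Object> update(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        Map<String, Object> times = new LinkedHashMap<>();
        times.put("times", sum);

        Map<String, Object> update = new LinkedHashMap<>();
        update.put("$addToSet", times);
        return update;
    }
}
```

Note that `$addToSet` only adds a value not already in the array, so two periods with the same total would appear once in `times`.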

=== mongodb-default.xml
<configuration>
<property>
<name>mongo.job.verbose</name>
<value>true</value>
</property>
<property>
<name>mongo.job.background</name>
<value>false</value>
</property>
<property>
<name>mongo.input.uri</name>
<value><!--mongodb://127.0.0.1:27017/airCarrier.timeDelay--></value>
</property>
<property>
<name>mongo.output.uri</name>
<value>mongodb://127.0.0.1:27017/airCarrier.timeDelay</value>
</property>
<property>
<name>mongo.input.split.read_shard_chunks</name>
<value>false</value>
</property>
<property>
<name>mongo.input.split.create_input_splits</name>
<value>true</value>
</property>
<property>
<name>mongo.input.query</name>
<!--<value>{"x": {"$regex": "^eliot", "$options": ""}}</value>-->
<value></value>
</property>
<property>
<name>mongo.input.key</name>
<value>location</value>
</property>
<property>
<name>mongo.input.fields</name>
<value></value>
</property>
<property>
<name>mongo.input.sort</name>
<value></value>
</property>
<property>
<name>mongo.input.limit</name>
<value>0</value> <!-- 0 == no limit -->
</property>
<property>
<name>mongo.input.skip</name>
<value>0</value> <!-- 0 == no skip -->
</property>
<property>
<name>mapred.input.dir</name>
<value>file:///home/user01/input_tmp</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>file:///home/user01/output_tmp</value>
</property>
<property>
<name>mongo.job.mapper</name>
<value>com.aaa.mongo.MapperCarrier</value>
</property>
<property>
<name>mongo.job.reducer</name>
<value>com.aaa.mongo.ReducerCarrier</value>
</property>
<property>
<name>mongo.job.input.format</name>
<value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
<!--<value>com.mongodb.hadoop.MongoInputFormat</value>-->
</property>
<property>
<name>mongo.job.output.format</name>
<!--<value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>-->
<value>com.mongodb.hadoop.MongoOutputFormat</value>
</property>
<property>
<name>mongo.job.mapper.output.key</name>
<value>com.mongodb.hadoop.io.BSONWritable</value> <!-- not sure what this property does -->
</property>
<property>
<name>mongo.job.mapper.output.value</name>
<value>org.apache.hadoop.io.IntWritable</value> <!-- not sure what this property does -->
</property>
<property>
<name>mongo.job.output.key</name>
<value>com.mongodb.hadoop.io.BSONWritable</value> <!-- not sure what this property does -->
</property>
<property>
<name>mongo.job.output.value</name>
<value>org.apache.hadoop.io.IntWritable</value> <!-- not sure what this property does -->
</property>
<property>
<name>mongo.job.combiner</name>
<value>com.aaa.mongo.ReducerCarrier</value>
</property>
<property>
<name>mongo.job.partitioner</name>
<value></value>
</property>
<property>
<name>mongo.job.sort_comparator</name>
<value></value>
</property>
<property>
<name>mongo.input.split_size</name>
<value>8</value>
</property>

</configuration>
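The configuration registers `com.aaa.mongo.ReducerCarrier` as `mongo.job.combiner` as well as `mongo.job.reducer`. In Hadoop a combiner runs on the map side and its output is written back into the intermediate map output, so it has to consume and emit the mapper's output types (`BSONWritable`, `IntWritable`), not the job's final `NullWritable`/`MongoUpdateWritable` pair. The sketch below models that rule with hypothetical interfaces (`MiniReducer`, `MiniCombiner`, `SumCombiner` are illustrations, not Hadoop API) and a combiner that only pre-sums counts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of a reduce step: input key/value types KIN/VIN,
// output key/value types KOUT/VOUT. Not Hadoop's Reducer class.
interface MiniReducer<KIN, VIN, KOUT, VOUT> {
    void reduce(KIN key, Iterable<VIN> values, Map<KOUT, List<VOUT>> out);
}

// A combiner is a reduce step whose output types equal its input types,
// because its output is fed back into the map-side stream.
interface MiniCombiner<K, V> extends MiniReducer<K, V, K, V> {}

// Pre-sums counts per key and re-emits (key, partialSum) with the same types.
class SumCombiner implements MiniCombiner<String, Integer> {
    @Override
    public void reduce(String key, Iterable<Integer> values, Map<String, List<Integer>> out) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        out.computeIfAbsent(key, k -> new ArrayList<>()).add(sum);
    }
}
```

Because the posted reducer adds up `value.get()` rather than counting records, partial sums from a combiner of this shape would still yield the same totals. In the real job that would mean either no combiner, or a separate class typed `Reducer<BSONWritable, IntWritable, BSONWritable, IntWritable>`.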

Below is the exception:

Caused by: java.io.IOException: wrong key class: class org.apache.hadoop.io.NullWritable is not class com.mongodb.hadoop.io.BSONWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.aaa.mongo.ReducerCarrier.reduce(ReducerCarrier.java:60)
at com.aaa.mongo.ReducerCarrier.reduce(ReducerCarrier.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)
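The frames show the write failing inside `Task$NewCombinerRunner.combine`, that is, while `ReducerCarrier` runs as a combiner during the map-side spill. The intermediate writer (`IFile$Writer`) is bound to the configured map output key class and rejects a key of any other class, which produces the `NullWritable is not BSONWritable` message. Below is a simplified imitation of that check, not Hadoop's source; `KeyClassCheckingWriter`, `BSONKey`, and `NullKey` are hypothetical stand-ins.

```java
import java.io.IOException;

// Hypothetical stand-ins for com.mongodb.hadoop.io.BSONWritable and
// org.apache.hadoop.io.NullWritable.
class BSONKey {}
class NullKey {}

// Simplified imitation of the key-class check in the map-output writer:
// the writer is bound to one key class and rejects keys of any other class.
class KeyClassCheckingWriter {
    private final Class<?> keyClass;
    private int records;

    KeyClassCheckingWriter(Class<?> keyClass) {
        this.keyClass = keyClass;
    }

    void append(Object key) throws IOException {
        // This sketch compares classes exactly, so a different class is rejected.
        if (key.getClass() != keyClass) {
            throw new IOException("wrong key class: " + key.getClass() + " is not " + keyClass);
        }
        records++;
    }

    int records() {
        return records;
    }
}
```

Creating the writer with `BSONKey.class` and appending a `NullKey` fails with a message of the same form as the one in the trace.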

Any advice would be appreciated. Thanks in advance!
 