aspose file tools*
The moose likes MongoDB and the fly likes Map Reducer with MongoDB hadoop driver throws Exception! Big Moose Saloon
  Search | Java FAQ | Recent Topics | Flagged Topics | Hot Topics | Zero Replies
Register / Login


Win a copy of EJB 3 in Action this week in the EJB and other Java EE Technologies forum!
JavaRanch » Java Forums » Databases » MongoDB
Bookmark "Map Reducer with MongoDB hadoop driver throws Exception!" Watch "Map Reducer with MongoDB hadoop driver throws Exception!" New topic
Author

Map Reducer with MongoDB hadoop driver throws Exception!

Joseph Hwang
Greenhorn

Joined: Aug 17, 2013
Posts: 13
I made mapreduce codes with MongoDB Hadoop Driver. These are my codes.

==== Mapper
public class MapperCarrier extends Mapper<LongWritable, Text, BSONWritable, IntWritable> {

private final static IntWritable outputValue = new IntWritable(1);

private BasicDBObjectBuilder builder = null;

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

if(key.get() >0) {

String[] columns = value.toString().split(",");

if(columns != null && columns.length >0) {

if(!columns[15].equals("NA")) {
int depDelayTime = Integer.parseInt(columns[15]);

if(depDelayTime>0) {
BasicDBObject list = new BasicDBObject();
list.put("date",columns[0]+columns[1]);

builder = BasicDBObjectBuilder.start().add("carrierCode", columns[8]).add("delayTime", new BasicDBObject("departure",list));

context.write(new BSONWritable(builder.get()), outputValue);
}
}
....
....

==== Reducer
public class ReducerCarrier extends Reducer<BSONWritable, IntWritable, NullWritable, MongoUpdateWritable> {

@Override
public void reduce(BSONWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{

String code = (String)key.getDoc().get("carrierCode");
BSONObject keyType = (BSONObject)key.getDoc().get("delayTime");

BasicDBObject query = new BasicDBObject("carrierCode", code );

int sum = 0;

if(keyType.keySet().contains("departure")) {

query.append("delayTime", "departure");

for(IntWritable value : values) {
sum += value.get();
}
BSONObject time = (BSONObject)keyType.get("departure");
String date = (String)time.get("date");
query.append("delayTime", new BasicDBObject("departure",new BasicDBObject("date", date)));
} else if(keyType.keySet().contains("arrival")) {

query.append("delayTime", "arrival");

for(IntWritable value : values) {
sum += value.get();
}
BSONObject time = (BSONObject)keyType.get("arrival");
String date = (String)time.get("date");
query.append("delayTime", new BasicDBObject("arrival",new BasicDBObject("date", date)));
}

BasicDBObject update = new BasicDBObject("$addToSet", new BasicDBObject("times",new Integer(sum)));

NullWritable nullKey = NullWritable.get();
MongoUpdateWritable muw = new MongoUpdateWritable(query,update,true,false);


context.write(nullKey, muw); // throws exception
}
}

=== mongodb-default.xml
<configuration>
<property>
<name>mongo.job.verbose</name>
<value>true</value>
</property>
<property>
<name>mongo.job.background</name>
<value>false</value>
</property>
<property>
<name>mongo.input.uri</name>
<value><!--mongodb://127.0.0.1:27017/airCarrier.timeDelay--></value>
</property>
<property>
<name>mongo.output.uri</name>
<value>mongodb://127.0.0.1:27017/airCarrier.timeDelay</value>
</property>
<property>
<name>mongo.input.split.read_shard_chunks</name>
<value>false</value>
</property>
<property>
<name>mongo.input.split.create_input_splits</name>
<value>true</value>
</property>
<property>
<name>mongo.input.query</name>
<!--<value>{"x": {"$regex": "^eliot", "$options": ""}}</value>-->
<value></value>
</property>
<property>
<name>mongo.input.key</name>
<value>location</value>
</property>
<property>
<name>mongo.input.fields</name>
<value></value>
</property>
<property>
<name>mongo.input.sort</name>
<value></value>
</property>
<property>
<name>mongo.input.limit</name>
<value>0</value> <!-- 0 == no limit -->
</property>
<property>
<name>mongo.input.skip</name>
<value>0</value> <!-- 0 == no skip -->
</property>
<property>
<name>mapred.input.dir</name>
<value>file:///home/user01/input_tmp</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>file:///home/user01/output_tmp</value>
</property>
<property>
<name>mongo.job.mapper</name>
<value>com.aaa.mongo.MapperCarrier</value>
</property>
<property>
<name>mongo.job.reducer</name>
<value>com.aaa.mongo.ReducerCarrier</value>
</property>
<property>
<name>mongo.job.input.format</name>
<value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
<!--<value>com.mongodb.hadoop.MongoInputFormat</value>-->
</property>
<property>
<name>mongo.job.output.format</name>
<!--<value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>-->
<value>com.mongodb.hadoop.MongoOutputFormat</value>
</property>
<property>
<name>mongo.job.mapper.output.key</name>
<value>com.mongodb.hadoop.io.BSONWritable</value> <== I have no idea of this element
</property>
<property>
<name>mongo.job.mapper.output.value</name>
<value>org.apache.hadoop.io.IntWritable</value> <== I have no idea of this element
</property>
<property>
<name>mongo.job.output.key</name>
<value>com.mongodb.hadoop.io.BSONWritable</value> <== I have no idea of this element
</property>
<property>
<name>mongo.job.output.value</name>
<value>org.apache.hadoop.io.IntWritable</value> <== I have no idea of this element
</property>
<property>
<name>mongo.job.combiner</name>
<value>com.aaa.mongo.ReducerCarrier</value>
</property>
<property>
<name>mongo.job.partitioner</name>
<value></value>
</property>
<property>
<name>mongo.job.sort_comparator</name>
<value></value>
</property>
<property>
<name>mongo.input.split_size</name>
<value>8</value>
</property>

</configuration>

Belows are the exception:

Caused by: java.io.IOException: wrong key class: class org.apache.hadoop.io.NullWritable is not class com.mongodb.hadoop.io.BSONWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.aaa.mongo.ReducerCarrier.reduce(ReducerCarrier.java:60)
at com.aaa.mongo.ReducerCarrier.reduce(ReducerCarrier.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)

I need your advice! Thanks in advanced!
 
I agree. Here's the link: http://aspose.com/file-tools
 
subject: Map Reducer with MongoDB hadoop driver throws Exception!
 
Similar Threads
WordCount program giving exception.
Java Hadoop
HADOOP Reducer code not working...!
how to get record from MongoDB
mapreduce giving a wrong count