wood burning stoves 2.0*
The moose likes Hadoop and the fly likes Java Hadoop Big Moose Saloon
  Search | Java FAQ | Recent Topics | Flagged Topics | Hot Topics | Zero Replies
Register / Login
JavaRanch » Java Forums » Databases » Hadoop
Bookmark "Java Hadoop" Watch "Java Hadoop" New topic
Author

Java Hadoop

Prats Shah
Greenhorn

Joined: Feb 28, 2013
Posts: 9
The program is not even entering the Reducer's code. Can I know why this is happening?

package counter;

import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Scanner;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
public static class Map extends
Mapper<LongWritable, Text, Text, Text>
{

private Text word = new Text();

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
int positiveCount=0,negativeCount=0,neutralCount=0,tokenCount=0;
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);

Scanner scanner1; // For positive words file
Scanner scanner2; //For negative words File
String timeStamp="";
while (tokenizer.hasMoreTokens())
{

scanner1 = new Scanner(new File("/home/hadoop/Downloads/positive.txt")); //Path

scanner2 = new Scanner(new File("/home/hadoop/Downloads/negative.txt")); //Path

String tempToken=tokenizer.nextToken();

tokenCount++;

if(tokenCount<4 || (tokenCount>=5 && tokenCount<=6)) // Because 4th Token is TimeStamp and from 7th onwards its Tweets
continue;
if(tokenCount==4) // Gives TimeStamp
timeStamp=tempToken;
if(tokenCount>=7) // Tweets starts from 7th Token
{
neutralCount++;

while (scanner1.hasNextLine()) // For Positive words File
{
String nextLine1 = scanner1.nextLine();
if (nextLine1.equalsIgnoreCase(tempToken))
{
positiveCount++;
neutralCount--; // To neutralise above increment
break; // If word fetching successful break
}
}
while (scanner2.hasNextLine()) // For Negative words File
{
String nextLine2 = scanner2.nextLine();
if (nextLine2.equalsIgnoreCase(tempToken))
{
negativeCount++;
neutralCount--; // To neutralise above increment
break; // If word fetching successful break
}
}
}

}
String ts=""; // Temporary TimeStamp to truncate seconds ( 12:48:53=>12:48 )
StringTokenizer tt=new StringTokenizer(timeStamp,":"); // Temporary Tokenizer for ts
// System.out.println(timeStamp+" "+positiveCount+" "+neutralCount+" "+negativeCount);
try
{
ts=tt.nextToken()+":"+tt.nextToken(); // 12:48

}
catch(Exception ex)
{

}

word.set(ts);

value=new Text(Integer.toString(positiveCount)+"."+Integer.toString(neutralCount)+"."+Integer.toString(negativeCount));
// Value=>2,2,1 (Means 2 Positive, 2 Neutral and 1 Negative)

//System.out.println(value);

context.write(word, value);// Mapper writing Key Value to shared area
}

}
public static class Reduce extends
Reducer<Text, Iterator<Text>, Text, Text> {

public void reduce(Text key, Iterator<Text> values,
Context context) throws IOException, InterruptedException
{

int positive=0,negative=0,neutral=0; //Positive,Neutral and Negative Count
int ps,ng,nt; // Temporary Positive Neutral and Negative
String temp="";

while(values.hasNext())
{
String tempValue=values.next().toString(); // Temporary String from values
StringTokenizer tempToken=new StringTokenizer(tempValue,".");//Separated based on '.' See Mapper's Output

ps=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison
nt=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison
ng=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison

if(ps>ng)
positive++;
else if (ng>ps)
negative++;
else
neutral++;

}
// Now positive, neutral and negative gives total counts for same Keys as for example 12:48 12,3,7

if(positive>negative)
temp="1.0.0"; // Finally Maximum Positive Tweets at specific TimeStamp
else if(negative>positive)
temp="0.0.1"; // Finally Maximum Negative Tweets at specific TimeStamp
else
temp="0.1.0"; // Finally Maximum Neutral Tweets at specific TimeStamp

//String output=Integer.toString(positive)+"."+Integer.toString(neutral)+"."+Integer.toString(negative);

context.write(key, new Text(temp));

}
}

/**
 * Configures and runs the job, reading from {@code counterinput} and
 * writing to {@code counteroutput}. Any output left by a previous run is
 * deleted first.
 *
 * <p>The process exits with status 0 when the job succeeds and 1 when it
 * fails, so scripts calling this driver can detect failure.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");
    // Tells Hadoop which jar to ship to the cluster; without it the tasks
    // cannot load the Map and Reduce classes when run outside local mode.
    job.setJarByClass(WordCount.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path("counterinput"));
    // Erase previous run output (if any)
    FileSystem.get(conf).delete(new Path("counteroutput"), true);
    FileOutputFormat.setOutputPath(job, new Path("counteroutput"));

    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
}

}
Hussein Baghdadi
clojure forum advocate
Bartender

Joined: Nov 08, 2003
Posts: 3479

Please don't duplicate your question.

How did you know that the compiler isn't even going through the reducer's code?
 
wood burning stoves
 
subject: Java Hadoop