How do you count the words in a file with Hadoop MapReduce, and what is the best way to partition the results by each word's starting character?
The approach below uses a mapper that emits (word, 1) pairs, a combiner and a reducer that sum those counts, and a custom Partitioner that routes each word to one of 26 reducers based on its first letter, so each reducer's output file holds the counts for words sharing a starting character.
package com.orienit.hadoop.training.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCountJob implements Tool {
private Configuration conf;
@Override
public Configuration getConf() {
return conf; // getting the configuration
}
@Override
public void setConf(Configuration conf) {
this.conf = conf; // setting the configuration
}
@Override
public int run(String[] args) throws Exception {
// initializing the job configuration
Job wordCountJob = new Job(getConf());
// setting the job name
wordCountJob.setJobName("Orien IT WordCount Job");
// point Hadoop at the jar containing this class
wordCountJob.setJarByClass(this.getClass());
// setting custom mapper class
wordCountJob.setMapperClass(WordCountMapper.class);
// setting custom reducer class
wordCountJob.setReducerClass(WordCountReducer.class);
// setting custom combiner class
wordCountJob.setCombinerClass(WordCountCombiner.class);
// setting no of reducers
wordCountJob.setNumReduceTasks(26);
// setting custom partitioner class
wordCountJob.setPartitionerClass(WordCountPartitioner.class);
// setting mapper output key class: K2
wordCountJob.setMapOutputKeyClass(Text.class);
// setting mapper output value class: V2
wordCountJob.setMapOutputValueClass(LongWritable.class);
// setting reducer output key class: K3
wordCountJob.setOutputKeyClass(Text.class);
// setting reducer output value class: V3
wordCountJob.setOutputValueClass(LongWritable.class);
// setting the input format class, i.e. for K1, V1
wordCountJob.setInputFormatClass(TextInputFormat.class);
// setting the output format class
wordCountJob.setOutputFormatClass(TextOutputFormat.class);
// setting the input file path
FileInputFormat.addInputPath(wordCountJob, new Path(args[0]));
// setting the output folder path
FileOutputFormat.setOutputPath(wordCountJob, new Path(args[1]));
Path outputPath = new Path(args[1]);
// delete the output folder if it already exists
outputPath.getFileSystem(conf).delete(outputPath, true);
// to execute the job and return the status
return wordCountJob.waitForCompletion(true) ? 0 : -1;
}
public static void main(String[] args) throws Exception {
// run the job with the given arguments and propagate its exit status
System.exit(ToolRunner.run(new Configuration(), new WordCountJob(), args));
}
}
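To launch the job, package these classes into a jar and submit it with the hadoop jar command (the jar name below is illustrative):

hadoop jar wordcount.jar com.orienit.hadoop.training.wordcount.WordCountJob <input path> <output folder>

With 26 reduce tasks and the partitioner shown below, the output folder will contain 26 part files, one per partition; words starting with a through z map one-to-one onto them, and other leading characters fold into the same buckets via the modulo.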
package com.orienit.hadoop.training.wordcount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private Text text = new Text();
private final static LongWritable one = new LongWritable(1);
enum MyCounter {
MAPS
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer words = new StringTokenizer(line, " ");
Counter staticCounter = context.getCounter(MyCounter.MAPS);
Counter dynamicCounter = context.getCounter("OrienIT", "how many maps");
while (words.hasMoreTokens()) {
staticCounter.increment(1); // increment the static counter
dynamicCounter.increment(1); // increment the dynamic counter
text.set(words.nextToken());
context.write(text, one); // write the map output
}
}
}
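After waitForCompletion returns, the driver can read back both counters. A minimal sketch of what the tail of run() could look like, assuming the wordCountJob instance from the driver above:

    // inside run(), replacing the final return statement
    boolean ok = wordCountJob.waitForCompletion(true);
    long staticCount = wordCountJob.getCounters()
            .findCounter(WordCountMapper.MyCounter.MAPS).getValue();
    long dynamicCount = wordCountJob.getCounters()
            .findCounter("OrienIT", "how many maps").getValue();
    System.out.println("words mapped (static counter):  " + staticCount);
    System.out.println("words mapped (dynamic counter): " + dynamicCount);
    return ok ? 0 : -1;

Both lookups hit the same per-job counter store; the enum form is checked at compile time, while the group/name form can be created on the fly.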
package com.orienit.hadoop.training.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartitioner extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text text, LongWritable lw, int noOfReducers) {
String word = text.toString().toLowerCase();
// route by first character: 'a' -> 0, 'b' -> 1, ... 'z' -> 25 (other leading characters fold into these buckets)
return Math.abs(word.charAt(0) - 'a') % noOfReducers;
}
}
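A quick local check of where a few sample words land (a hypothetical helper class, not part of the original job; it assumes the 26 reducers configured in the driver):

    package com.orienit.hadoop.training.wordcount;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    public class WordCountPartitionerDemo {
        public static void main(String[] args) {
            WordCountPartitioner partitioner = new WordCountPartitioner();
            LongWritable one = new LongWritable(1);
            for (String word : new String[] { "Apple", "banana", "Zebra", "42nd" }) {
                // partition index = abs(lowercased first char - 'a') mod 26
                int partition = partitioner.getPartition(new Text(word), one, 26);
                System.out.println(word + " -> partition " + partition);
            }
        }
    }

"Apple" and "banana" land in partitions 0 and 1, "Zebra" in 25, and "42nd" folds into partition 19, illustrating that non-letter prefixes still get a valid partition but share buckets with letters.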
package com.orienit.hadoop.training.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
InterruptedException {
long sum = 0;
// a single for-each pass; repeatedly calling values.iterator() as in some examples only works because Hadoop hands back the same iterator
for (LongWritable count : values) {
sum += count.get();
}
context.write(key, new LongWritable(sum)); // write the reducer output
}
}
package com.orienit.hadoop.training.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
InterruptedException {
long sum = 0;
for (LongWritable count : values) {
sum += count.get();
}
context.write(key, new LongWritable(sum)); // emit partial sums for the reducer to finish
}
}
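Since this combiner is the same logic as WordCountReducer, the driver could simply reuse the reducer class as the combiner instead of maintaining a separate class:

    wordCountJob.setCombinerClass(WordCountReducer.class);

Either way, Hadoop treats the combiner as an optimization: it may run zero, one, or several times over a map's output, so it must be safe to apply repeatedly, which a plain sum is.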
*********************************************************************************************
Write a Distributed Grep program
package com.orienit.hadoop.training.grep;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class GrepJob implements Tool {
private Configuration conf;
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public int run(String[] args) throws Exception {
Job grepJob = new Job(getConf());
grepJob.setJobName("OrienIT Grep Job");
grepJob.setJarByClass(this.getClass());
grepJob.setMapperClass(GrepMapper.class);
grepJob.setNumReduceTasks(0);
grepJob.setOutputKeyClass(Text.class);
grepJob.setOutputValueClass(NullWritable.class);
grepJob.setInputFormatClass(TextInputFormat.class);
grepJob.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(grepJob, new Path(args[0]));
FileOutputFormat.setOutputPath(grepJob, new Path(args[1]));
Path outputPath = new Path(args[1]);
// delete the output folder if it already exists
outputPath.getFileSystem(conf).delete(outputPath, true);
return grepJob.waitForCompletion(true) ? 0 : -1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// the search string is passed to the mappers through the configuration
conf.set("grep-arg", "Hyderabad");
System.exit(ToolRunner.run(conf, new GrepJob(), args));
}
}
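Because the driver goes through ToolRunner, the hardcoded search string can also be overridden at submit time with a generic -D option; GenericOptionsParser applies it to the configuration before run() is called (jar name illustrative):

hadoop jar grep.jar com.orienit.hadoop.training.grep.GrepJob -D grep-arg=Hyderabad <input path> <output folder>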
package com.orienit.hadoop.training.grep;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class GrepMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// emit the whole line when it contains the configured search string
if (value.toString().contains(context.getConfiguration().get("grep-arg"))) {
context.write(value, NullWritable.get());
}
}
}
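contains() only matches fixed strings. For regex matching closer to the real grep, a variant mapper could compile the pattern once per task in setup() rather than once per record; a sketch under that assumption (hypothetical class name, same grep-arg property):

    package com.orienit.hadoop.training.grep;
    import java.io.IOException;
    import java.util.regex.Pattern;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    public class RegexGrepMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private Pattern pattern;
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // compile the regex once per map task
            pattern = Pattern.compile(context.getConfiguration().get("grep-arg"));
        }
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // emit the line if the pattern occurs anywhere in it
            if (pattern.matcher(value.toString()).find()) {
                context.write(value, NullWritable.get());
            }
        }
    }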
*********************************************************************************************
Write a Distributed Sed program
package com.orienit.hadoop.training.sed;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SedJob implements Tool {
private Configuration conf;
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public int run(String[] args) throws Exception {
Job sedjob = new Job(getConf());
sedjob.setJobName("OrienIT Sed Job");
sedjob.setJarByClass(this.getClass());
sedjob.setMapperClass(SedMapper.class);
sedjob.setNumReduceTasks(0);
sedjob.setOutputKeyClass(Text.class);
sedjob.setOutputValueClass(NullWritable.class);
sedjob.setInputFormatClass(TextInputFormat.class);
sedjob.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(sedjob, new Path(args[0]));
FileOutputFormat.setOutputPath(sedjob, new Path(args[1]));
Path outputPath = new Path(args[1]);
// delete the output folder if it already exists
outputPath.getFileSystem(conf).delete(outputPath, true);
return sedjob.waitForCompletion(true) ? 0 : -1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// the search pattern and its replacement are passed to the mappers through the configuration
conf.set("sed-arg1", "hadoop");
conf.set("sed-arg2", "BigData");
System.exit(ToolRunner.run(conf, new SedJob(), args));
}
}
package com.orienit.hadoop.training.sed;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SedMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String from = context.getConfiguration().get("sed-arg1");
if (line.contains(from)) {
// replaceAll treats sed-arg1 as a regex, mirroring sed's s/pattern/replacement/ semantics
context.write(new Text(line.replaceAll(from, context.getConfiguration().get("sed-arg2"))), NullWritable.get());
} else {
// pass unmatched lines through unchanged
context.write(value, NullWritable.get());
}
}
}
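One caveat: String.replaceAll interprets sed-arg1 as a regular expression and sed-arg2 as a replacement template (where $ and \ are special). That mirrors sed, but if plain literal substitution is wanted, String.replace sidesteps both pitfalls and is a no-op on lines without a match, so the if/else collapses; a sketch of the alternative map body:

    String line = value.toString();
    String from = context.getConfiguration().get("sed-arg1");
    String to = context.getConfiguration().get("sed-arg2");
    // replace() substitutes literally and leaves non-matching lines unchanged
    context.write(new Text(line.replace(from, to)), NullWritable.get());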