package ex3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.StringTokenizer;

/**
 * MapReduce job that groups the words of the input text into anagram sets.
 *
 * <p>Every word is keyed by its letters in sorted order, so all anagrams of
 * a word share the same key. For each key with more than one distinct word
 * the job emits the number of distinct words and the words themselves,
 * comma-separated.
 */
public class Anagram {

    /**
     * Mapper – breaks the input line into tokens (filtering some common
     * punctuation marks) and emits (sorted letters, lower-cased word).
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        /** Characters treated as token separators. */
        private static final String DELIMITERS = " \t\n\r\f,.:()!?-";

        // Reused across calls to avoid allocating a Text per record.
        private final Text sortedText = new Text();
        private final Text outputValue = new Text();

        /**
         * Emits one (sorted letters, word) pair per token of the line.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the (sorted letters, word) pairs
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer =
                    new StringTokenizer(value.toString(), DELIMITERS, false);
            while (tokenizer.hasMoreTokens()) {
                // Locale.ROOT keeps the key independent of the machine's
                // default locale (e.g. Turkish dotless i).
                String token = tokenizer.nextToken().trim().toLowerCase(Locale.ROOT);
                if (token.isEmpty()) {
                    // trim() strips control characters that are not in
                    // DELIMITERS, which can leave nothing behind.
                    continue;
                }
                sortedText.set(sort(token));
                outputValue.set(token);
                context.write(sortedText, outputValue);
            }
        }

        /**
         * Sorts the characters of the word, generating the grouping key.
         *
         * @param input the word to normalise
         * @return the characters of {@code input} in ascending order
         */
        protected String sort(String input) {
            char[] cs = input.toCharArray();
            Arrays.sort(cs);
            return new String(cs);
        }
    }

    /**
     * Combiner (optional) – removes duplicate words for a key before the
     * pairs are handed on to the reducer.
     */
    public static class Combiner extends Reducer<Text, Text, Text, Text> {

        /**
         * Writes each distinct word of the group once, under the same key.
         *
         * @param key     sorted letters shared by all words in the group
         * @param values  words produced by the mapper for this key
         * @param context sink for the de-duplicated (key, word) pairs
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Hadoop reuses the Text instance returned by the iterator, so
            // the set must hold immutable String copies, not the Text itself.
            Set<String> uniques = new HashSet<String>();
            for (Text value : values) {
                if (uniques.add(value.toString())) {
                    context.write(key, value);
                }
            }
        }
    }

    /**
     * Reducer – collects the anagrams of a key and outputs the number of
     * distinct words (key) and all the words joined by commas (value).
     */
    public static class Reduce extends Reducer<Text, Text, IntWritable, Text> {

        // Reused across calls to avoid allocating writables per group.
        private final IntWritable count = new IntWritable();
        private final Text outputValue = new Text();

        /**
         * Emits (number of distinct words, comma-separated words) when the
         * group holds more than one distinct word; emits nothing otherwise.
         *
         * @param key     sorted letters shared by all words in the group
         * @param values  words received for this key, possibly repeated
         * @param context sink for the (count, words) pair
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // String copies are stored because Hadoop reuses the Text
            // instance returned by the iterator.
            Set<String> uniques = new HashSet<String>();
            // Append the anagrams on the same line, in first-seen order.
            StringBuilder builder = new StringBuilder();
            for (Text value : values) {
                String word = value.toString();
                if (uniques.add(word)) {
                    // Separator goes before every word except the first.
                    if (uniques.size() > 1) {
                        builder.append(',');
                    }
                    builder.append(word);
                }
            }

            // A single distinct word has no anagram partner: skip it.
            int size = uniques.size();
            if (size > 1) {
                count.set(size);
                outputValue.set(builder.toString());
                context.write(count, outputValue);
            }
        }
    }

    /**
     * Configures and runs the job, then reports the elapsed time on stderr.
     * Exits with status 0 when the job succeeds, 1 when it fails and 2 when
     * the arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the
     *             output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: Anagram <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "anagram");
        // Tells Hadoop which jar to ship so the task JVMs can load the
        // nested Map/Combiner/Reduce classes.
        job.setJarByClass(Anagram.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Must match what Reduce writes: (IntWritable count, Text words).
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        long begin = System.currentTimeMillis();
        boolean success = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        long second = (end - begin) / 1000;
        System.err.println(job.getJobName() + " takes " + second + " seconds");

        System.exit(success ? 0 : 1);
    }
}