/*Note:
This MapReduce program reads the output of the previous LibSvmMapper MR program and writes only the unique records; it avoids writing duplicate records to the output.
Input data:
121 0 155 175
122 1 157 176
121 0 155 175
123 0 155 175
122 1 157 176
128 1 157 176
Output Data:
121 0 155 175
122 1 157 176
123 0 155 175
128 1 157 176
*/
Driver Code:
package com.mr.dupip.replaceval;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path lsOpath = new Path(args[0]);      // LibSvm-format data in numeric form (output of the previous job)
        Path dupIPoutput = new Path(args[1]);  // de-duplicated (unique) records

        Configuration conf = new Configuration();
        Job dupipJob = Job.getInstance(conf, "FindDupIpJob");
        dupipJob.setJarByClass(DupIPDriver.class);
        dupipJob.setMapperClass(DupIPMapper.class);
        dupipJob.setReducerClass(DupIpReducer.class);

        FileInputFormat.setInputPaths(dupipJob, lsOpath + "/part-m-*");
        FileOutputFormat.setOutputPath(dupipJob, dupIPoutput);

        // The UniqueValData file is produced by the previous MapReduce job (LibSvmMapper class).
        dupipJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));

        dupipJob.setMapOutputKeyClass(Text.class);
        dupipJob.setMapOutputValueClass(Text.class);
        dupipJob.setOutputKeyClass(Text.class);
        dupipJob.setOutputValueClass(NullWritable.class);

        return dupipJob.waitForCompletion(true) ? 0 : 1;
    }
}
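Once the driver, mapper, and reducer below are compiled and packaged into a jar (the jar name here is only an assumption), the job can be launched through ToolRunner from the command line, for example: hadoop jar dupip-dedup.jar com.mr.dupip.replaceval.DupIPDriver <libsvm-output-dir> <dedup-output-dir>, where the first argument is the previous job's output directory and the second is the directory that will receive the de-duplicated records.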
Mapper Code:
package com.mr.dupip.replaceval;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DupIPMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        // Key each record by its third field so that records sharing that value
        // reach the same reducer; the full record is passed along as the value.
        context.write(new Text(words[2].trim()), new Text(line.trim()));
    }
}
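To make the mapper's keying concrete, here is a minimal standalone sketch (not part of the job; the class name is illustrative only) that applies the same split to one sample record from the input data above and prints the key/value pair the mapper would emit:

public class MapperKeyDemo {
    public static void main(String[] args) {
        String line = "121 0 155 175";        // sample record from the input data above
        String[] words = line.split(" ");
        System.out.println(words[2].trim());  // "155" -> map output key
        System.out.println(line.trim());      // full record -> map output value
    }
}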
Reducer Code:
package com.mr.dupip.replaceval;

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DupIpReducer extends Reducer<Text, Text, Text, NullWritable> {

    private HashSet<String> hs;

    @Override
    protected void setup(Context context) throws IOException {
        hs = new HashSet<String>();
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Adding every record of this key group to a HashSet collapses exact duplicates.
        for (Text line : values) {
            hs.add(line.toString());
        }
        // Emit each distinct record once; the duplicates were already dropped by the set.
        for (String eachLine : hs) {
            context.write(new Text(eachLine), NullWritable.get());
        }
        hs.clear(); // reset the set before the next key group
    }
}
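The de-duplication itself relies only on HashSet semantics. The following standalone sketch (illustrative class name, sample records taken from the input data above) shows how the set collapses the repeated record in the group for key 155 before anything is written:

import java.util.HashSet;

public class ReducerDedupDemo {
    public static void main(String[] args) {
        // Records that would arrive grouped under key "155" (the third field).
        String[] group = { "121 0 155 175", "121 0 155 175", "123 0 155 175" };
        HashSet<String> hs = new HashSet<String>();
        for (String line : group) {
            hs.add(line);                      // the repeated line is stored only once
        }
        for (String eachLine : hs) {
            System.out.println(eachLine);      // prints the two distinct records
        }
    }
}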