Saturday, August 1, 2015

MapReduce iii) MapReduce Job for Finding Duplicate IP Fields



/*Note:
This MapReduce program reads the data produced by the previous LibSvmMapper MR program and writes only unique records, so duplicate records do not appear in the output.
Input data:
Input data:
121  0  155  175
122  1  157  176
121  0  155  175
123  0  155  175
122  1  157  176
128  1  157  176

Output Data:
121  0  155  175
122  1  157  176
123  0  155  175
128  1  157  176
*/
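
Before looking at the job itself, the transformation can be checked locally. The sketch below is only an illustration in plain Java (not part of the job): it groups the sample records by their third field and keeps each distinct record once for keys that group more than one distinct record, which reproduces the records listed in the output above (order aside).

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DupIpLocalCheck {
    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "121 0 155 175", "122 1 157 176", "121 0 155 175",
                "123 0 155 175", "122 1 157 176", "128 1 157 176");

        // Group the distinct records by their third field (the IP field).
        Map<String, Set<String>> groups = new LinkedHashMap<String, Set<String>>();
        for (String record : input) {
            String ip = record.split("\\s+")[2];
            if (!groups.containsKey(ip)) {
                groups.put(ip, new LinkedHashSet<String>());
            }
            groups.get(ip).add(record);
        }

        // Emit each distinct record once for keys with more than one distinct record.
        for (Set<String> records : groups.values()) {
            if (records.size() > 1) {
                for (String record : records) {
                    System.out.println(record);
                }
            }
        }
    }
}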
Driver Code:
package com.mr.dupip.replaceval;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.exit(ec);
    }

    public int run(String[] args) throws Exception {

        Path lsOpath = new Path(args[0]);       // LibSvm data (numeric form) from the previous job
        Path dupIPoutput = new Path(args[1]);   // output: records whose IP field is duplicated

        Configuration conf = getConf();         // configuration supplied by ToolRunner

        Job dupipJob = Job.getInstance(conf, "FindDupIpJob");
        dupipJob.setJarByClass(DupIPDriver.class);

        dupipJob.setMapperClass(DupIPMapper.class);
        dupipJob.setReducerClass(DupIpReducer.class);

        FileInputFormat.setInputPaths(dupipJob, lsOpath + "/part-m-*");
        FileOutputFormat.setOutputPath(dupipJob, dupIPoutput);

        // The UniqueValData file is produced by the previous MapReduce job (LibSvmMapper class);
        // see the sketch after this listing for one way a mapper could read it from the cache.
        dupipJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));

        dupipJob.setMapOutputKeyClass(Text.class);
        dupipJob.setMapOutputValueClass(Text.class);
        dupipJob.setOutputKeyClass(Text.class);
        dupipJob.setOutputValueClass(NullWritable.class);

        return dupipJob.waitForCompletion(true) ? 0 : 1;
    }
}
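
The driver registers UniqueValData in the distributed cache, but the mapper shown below never reads it. As a reference, here is a minimal sketch (assumed, not part of the original listing) of how a mapper's setup() could load that cached file; the class and field names are illustrative only.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheAwareMapper extends Mapper<LongWritable, Text, Text, Text> {

    private final Set<String> uniqueValues = new HashSet<String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();   // files registered via Job.addCacheFile()
        if (cacheFiles != null && cacheFiles.length > 0) {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(cacheFiles[0]))));
            String line;
            while ((line = reader.readLine()) != null) {
                uniqueValues.add(line.trim());        // keep each cached value for lookups in map()
            }
            reader.close();
        }
    }
}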
Mapper Code:
package com.mr.dupip.replaceval;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DupIPMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\\s+");   // split on one or more whitespace characters
        // Key on the third field (the IP field) so duplicate IPs group in the reducer;
        // the whole record is passed through as the value.
        context.write(new Text(words[2].trim()), new Text(line.trim()));
    }
}
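
As a quick reference, the snippet below (an illustration only, runnable as a plain Java main) shows the key/value pair this mapper emits for one record from the sample input.

public class MapperKeyDemo {
    public static void main(String[] args) {
        String line = "121 0 155 175";                  // one record from the sample input
        String[] words = line.split("\\s+");            // same split as DupIPMapper
        System.out.println("key   = " + words[2]);      // key   = 155
        System.out.println("value = " + line.trim());   // value = 121 0 155 175
    }
}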
Reducer Code:
package com.mr.dupip.replaceval;

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DupIpReducer extends Reducer<Text, Text, Text, NullWritable> {

    private HashSet<String> hs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        hs = new HashSet<String>();
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct records that share this IP-field key.
        for (Text line : values) {
            hs.add(line.toString());
        }
        // Write each distinct record once, but only when more than one
        // distinct record shares this key.
        if (hs.size() > 1) {
            for (String eachLine : hs) {
                context.write(new Text(eachLine), NullWritable.get());
            }
        }
        hs.clear();
    }
}
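
One consequence of the size check above, shown by the plain-Java sketch below (an illustration with hypothetical data, not part of the job): a key whose values collapse to a single distinct record produces no output at all.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SingleRecordKeyDemo {
    public static void main(String[] args) {
        // Hypothetical values grouped under one key; both copies are identical.
        List<String> values = Arrays.asList("130 1 160 180", "130 1 160 180");

        Set<String> hs = new HashSet<String>(values);   // same dedup as DupIpReducer
        if (hs.size() > 1) {                            // guard from the reducer above
            for (String record : hs) {
                System.out.println(record);
            }
        }
        // Nothing is printed: the values collapse to one distinct record.
    }
}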
