/*
Note:
This MR job converts the input numeric data into text data, which is converted in LibSvmMapper.
The input data:
121 0 155 175
122 1 157 176
123 0 155 175
128 1 157 176
output is:
100.20.20.0
India
150.100.0.20 00.120.0.abc.txxyt.00
101.21.21.1
USA 151.100.0.21
11.101.axc.asdfs.212
102.20.20.0
India
150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1
USA 151.100.0.21
11.101.axc.asdfs.213
*/
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DupIPDriver implements
Tool {
public static void main(String[] args)
throws Exception {
int ec=ToolRunner.run(new
DupIPDriver(), args);
System.out.println(ec);
}
public int run(String[] args) throws
Exception {
Path dupIPoutput=new
Path(args[0]);//List of Duplicate value records
Path dupIPs=new Path(args[1]);//
Final output with duplicate Ip's
Configuration conf=new
Configuration();
Job
replDupIpValJob=Job.getInstance(conf, "ReplaceDupIPValues");
replDupIpValJob.setJarByClass(DupIPDriver.class);
replDupIpValJob.setMapperClass(ReplValMapper.class);
//replDupIpValJob.setReducerClass(ReplValReducer.class);
replDupIpValJob.setNumReduceTasks(0);
replDupIpValJob.addCacheFile(new
URI("<ServerName&PAth>UniqueValData"));
replDupIpValJob.setMapOutputKeyClass(Text.class);
replDupIpValJob.setMapOutputValueClass(Text.class);
replDupIpValJob.setOutputKeyClass(Text.class);
replDupIpValJob.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(replDupIpValJob,
new Path(dupIPoutput+"/part-r-00000"));
FileOutputFormat.setOutputPath(replDupIpValJob,
dupIPs);
replDupIpValJob.waitForCompletion(true);
}
}
// Mapper code (separate source file, ReplValMapper.java):
package com.mr.dupip.replaceval;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Mapper.Context;
public class ReplValMapper extends
Mapper<LongWritable,Text, Text,NullWritable>{
public Map<String, String>
uniqueKeyValMap = new HashMap<String, String>();
/
protected void setup(Context context)
throws IOException {
URI[] uris = context.getCacheFiles();
for (URI uri : uris) {
System.out.println("~~~Cache
File ::::" + uri.toString());
}
FileSystem fs =
FileSystem.get(context.getConfiguration());
Path cacheFile = new Path(uris[0]);
BufferedReader bf = new
BufferedReader(new InputStreamReader(fs.open(cacheFile)));
String setupData = null;
while ((setupData = bf.readLine()) !=
null) {
//Eg:- c0:ea:e4:84:dd:8b~98
String[] words =
setupData.split("~");
for (int i = 0; i < words.length;
i++) {
uniqueKeyValMap.put(words[1],
words[0]);
}
}
}
protected void map(LongWritable key,
Text value, Context context) throws IOException, InterruptedException
{
//Eg:- 55:576 96:46 34 18:576
44:576
String line=value.toString();
String[] words=line.split(" ");
String[]
source_ip=words[0].split(":");//55:576
//source_ipMap.put(source_ip[1],
source_ip[0]);//576,55
String[]
pattern=words[1].split(":");//96:46
//patternMap.put(pattern[1],
pattern[0]);//46,96
String[]
dest_ip=words[2].split(":");//18:576
//dest_ipMap.put(dest_ip[1],
dest_ip[0]);//576,18
String[]
originator=words[3].split(":");//44:576
//orignatorMap.put(originator[1],originator[0]);//576,44
String sentence="";
sentence=uniqueKeyValMap.get(source_ip[0])+"
"+uniqueKeyValMap.get(pattern[0])+"
"+uniqueKeyValMap.get(dest_ip[0])+"
"+uniqueKeyValMap.get(originator[0]);
context.write(new Text(sentence),
NullWritable.get());
}
}
No comments:
Post a Comment