/*
This is a map-side join. The mapper converts every text field into numeric data,
which is needed for problems that require numeric processing. After all processing
is done, the numeric values can be converted back into the original text data;
that conversion code will be provided in the next blog.

Input data sample (text data to be converted into numeric data), filtered data
with 4 columns per row:

100.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
101.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.212
102.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.213
. . .
*/
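For context on the target format: LibSVM represents each row as space-separated index:value pairs. A minimal sketch of what the first sample row might become, with hypothetical ids (the real ids depend on the order of the word-count output read in setup(), and the values are the word-count frequencies):

Input row:  100.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
Output row: 1:1 2:2 3:2 4:2

Here, for example, India would get value 2 because it appears twice in the sample input.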
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.exit(ec);
    }

    public int run(String[] args) throws Exception {
        Path filterData = new Path(args[0]); // filtered input data with 4 columns
        Path lsOpath = new Path(args[1]);    // output: LibSVM-format data in numeric form

        Configuration conf = getConf();
        Job lsDataConvertjob = Job.getInstance(conf, "LibSVMdata");
        lsDataConvertjob.setJarByClass(DupIPDriver.class);
        lsDataConvertjob.setMapperClass(LibSvmMapper.class);
        lsDataConvertjob.setNumReduceTasks(0); // map-only job
        lsDataConvertjob.setMapOutputKeyClass(Text.class);
        lsDataConvertjob.setMapOutputValueClass(NullWritable.class);
        lsDataConvertjob.setOutputKeyClass(Text.class);
        lsDataConvertjob.setOutputValueClass(NullWritable.class);

        // The cache file is the output of the word-count program written in the
        // last blog; the path here is hard coded.
        // lsDataConvertjob.addCacheFile(new URI(wcOpath + "/part-*"));
        lsDataConvertjob.addCacheFile(new URI("<ServerName&Path>/DWordCount/part-r-00000"));

        FileInputFormat.setInputPaths(lsDataConvertjob, filterData + "/part-*");
        FileOutputFormat.setOutputPath(lsDataConvertjob, lsOpath);
        return lsDataConvertjob.waitForCompletion(true) ? 0 : 1;
    }
}
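A quick sketch of how this driver might be launched; the jar name below is a placeholder and the paths reuse the post's <ServerName&Path> convention, so adjust both to your cluster:

hadoop jar dupip.jar DupIPDriver <ServerName&Path>/FilterData <ServerName&Path>/LibSvmData

args[0] must point at the directory containing the filtered part-* files, and args[1] must be a not-yet-existing output directory, as usual for MapReduce jobs.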
Mapper Code:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LibSvmMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // word -> count (from the word-count output) and word -> generated unique id
    public Map<String, String> count = new HashMap<String, String>();
    public Map<String, String> uniqueId = new HashMap<String, String>();

    protected void setup(Context context) throws IOException {
        URI[] uris = context.getCacheFiles();
        for (URI uri : uris) {
            System.out.println("~~~Cache File ::::" + uri.toString());
        }
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path cacheFile = new Path(uris[0]);
        BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));

        // Each cache-file line is "word<TAB>count"; assign one incrementing id per word.
        String setupData = null;
        int uniqueIdValue = 1;
        while ((setupData = bf.readLine()) != null) {
            String[] words = setupData.split("\t");
            count.put(words[0], words[1]);
            uniqueId.put(words[0], "" + uniqueIdValue);
            uniqueIdValue++;
        }
        bf.close();

        // Persist the word -> id mapping so the numeric output can later be
        // converted back to text. Note: every map task writes the same path,
        // so this assumes a single map task.
        FSDataOutputStream out = fs.create(new Path("<ServerName&Path>/UniqueValData"));
        for (Entry<String, String> entry : uniqueId.entrySet()) {
            String sentence = entry.getKey() + "~" + entry.getValue();
            out.writeBytes(sentence);
            out.writeBytes("\n");
        }
        out.close();
    }

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String s = value.toString();
        String[] words = s.split(" ");
        StringBuilder opRow = new StringBuilder();
        for (int i = 0; i < words.length; i++) {
            // Skip tokens that never appeared in the word-count output,
            // which would otherwise produce "null:null" pairs.
            if (uniqueId.containsKey(words[i])) {
                opRow.append(uniqueId.get(words[i])).append(":")
                     .append(count.get(words[i])).append(" ");
            }
        }
        context.write(new Text(opRow.toString().trim()), NullWritable.get());
    }
}
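To make the cache-file handling concrete, here is a hypothetical walk-through (the words and counts are invented for illustration). Suppose the word-count output part-r-00000 contains these tab-separated lines:

India	2
150.100.0.20	2

setup() then builds count = {India=2, 150.100.0.20=2} and uniqueId = {India=1, 150.100.0.20=2}, and writes the lines "India~1" and "150.100.0.20~2" to UniqueValData. A later map() call on the input row "India 150.100.0.20" emits "1:2 2:2".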