Saturday, August 1, 2015

MapReduce ii) MapReduce Job for Converting All Text Data into Numeric Data by Assigning a Unique Code to Each Field

/*
This is a map-side join. The mapper converts every text field into a numeric code so the
data is easier to process; once processing is done, the numeric values can be converted
back to the original text data. That conversion code will be provided in the next blog post.

Sample input for converting text data into numeric data:

100.20.20.0    India    150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA      151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India    150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA      151.100.0.21    11.101.axc.asdfs.213
. . .
*/
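/*
Output shape (an illustrative sketch, not actual job output): every field in a row is
replaced by "<uniqueId>:<count>", with both values looked up from the cached word-count
output, so a 4-field input row becomes four space-separated pairs such as
"3:1 1:2 5:2 7:2" (the ids and counts shown here are made up for illustration).
*/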
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {

        Path filterData = new Path(args[0]); // filtered input data with 4 columns
        Path lsOpath = new Path(args[1]);    // output path for the LibSVM-style numeric data

        Configuration conf = new Configuration();
        Job lsDataConvertjob = Job.getInstance(conf, "LibSVMdata");
        lsDataConvertjob.setJarByClass(DupIPDriver.class);

        lsDataConvertjob.setMapperClass(LibSvmMapper.class);
        lsDataConvertjob.setNumReduceTasks(0);

        lsDataConvertjob.setMapOutputKeyClass(Text.class);
        lsDataConvertjob.setMapOutputValueClass(NullWritable.class);

        lsDataConvertjob.setOutputKeyClass(Text.class);
        lsDataConvertjob.setOutputValueClass(NullWritable.class);


        // The cache file path is hard-coded here; it is the output of the word-count
        // job written up in the previous blog post (see the note after this class for
        // a way to pass it in as an argument instead).
        lsDataConvertjob.addCacheFile(new URI("<ServerName&Path>/DWordCount/part-r-00000"));


        FileInputFormat.setInputPaths(lsDataConvertjob, filterData + "/part-*");
        FileOutputFormat.setOutputPath(lsDataConvertjob, lsOpath);



        return lsDataConvertjob.waitForCompletion(true) ? 0 : 1;
    }
}
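As a minimal sketch of avoiding the hard-coded cache path (the third command-line argument and the wcOpath variable below are illustrative, not part of the original post), the word-count output directory could be passed in and wired up inside run():

        // Sketch only: take the word-count output directory as a third argument,
        // e.g. DupIPDriver <filteredInput> <libSvmOutput> <wordCountOutput>.
        Path wcOpath = new Path(args[2]);
        lsDataConvertjob.addCacheFile(new URI(wcOpath.toString() + "/part-r-00000"));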
Mapper Code:


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LibSvmMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // field value -> its count, loaded from the cached word-count output
    public Map<String, String> count = new HashMap<String, String>();

    // field value -> the unique numeric code assigned to it in setup()
    public Map<String, String> uniqueId = new HashMap<String, String>();


    protected void setup(Context context) throws IOException {

        URI[] uris = context.getCacheFiles();
        for (URI uri : uris) {
            System.out.println("~~~Cache File ::::" + uri.toString());
        }

        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path cacheFile = new Path(uris[0]);
        BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));

        // Each line of the cached word-count output is "field<TAB>count". Record the
        // count for every field and assign it the next unique numeric code.
        String setupData = null;
        int uniqueIdValue = 1;
        while ((setupData = bf.readLine()) != null) {
            String[] words = setupData.split("\t");
            count.put(words[0], words[1]);
            uniqueId.put(words[0], "" + uniqueIdValue);
            uniqueIdValue++;
        }
        bf.close();

        // Persist the field-to-code mapping so the numeric output can later be
        // converted back to the original text values.
        FSDataOutputStream out = fs.create(new Path("<ServerName&Path>/UniqueValData"));
        for (Entry<String, String> entry : uniqueId.entrySet()) {
            String sentence = entry.getKey() + "~" + entry.getValue();
            out.writeBytes(sentence);
            out.writeBytes("\n");
        }
        out.close();
    }

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String s = value.toString();
        // Split on any run of whitespace so tab- or multi-space-separated rows both work.
        String[] words = s.split("\\s+");

        // Replace every field with "<uniqueId>:<count>" looked up from the cache maps.
        StringBuilder opRow = new StringBuilder();
        for (int i = 0; i < words.length; i++) {
            opRow.append(uniqueId.get(words[i])).append(":").append(count.get(words[i])).append(" ");
        }

        context.write(new Text(opRow.toString().trim()), NullWritable.get());
    }
}
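To sanity-check the conversion logic outside Hadoop, the body of map() can be exercised with plain Java maps. This is only a sketch: the class name, field values, ids, and counts below are made up for illustration and are not part of the original job.

import java.util.HashMap;
import java.util.Map;

public class LibSvmConversionSketch {

    public static void main(String[] args) {
        // Stand-ins for the lookup tables that setup() builds from the cached word-count output.
        Map<String, String> count = new HashMap<String, String>();
        Map<String, String> uniqueId = new HashMap<String, String>();
        count.put("100.20.20.0", "1");
        count.put("India", "2");
        uniqueId.put("100.20.20.0", "1");
        uniqueId.put("India", "2");

        // Same replacement rule as LibSvmMapper.map(): field -> "<uniqueId>:<count>".
        String row = "100.20.20.0 India";
        StringBuilder opRow = new StringBuilder();
        for (String field : row.split("\\s+")) {
            opRow.append(uniqueId.get(field)).append(":").append(count.get(field)).append(" ");
        }

        System.out.println(opRow.toString().trim()); // prints "1:1 2:2"
    }
}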


