Saturday, August 1, 2015


Maven Project build for Storm-Kafka programs


/*Note:
The following is the pom.xml file for Storm and kafka Projects build using Maven
*/

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.comcom.nato.rt</groupId>
    <artifactId>storm-rt-usecases</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>storm-rt-usecases</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.1-incubating</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>0.9.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>${storm.topology}</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <id>create-my-bundle</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>

MapReduce .iv)MapReduce Job for Replcacing the Numeric values into Actual values to only duplicate IP fields.

/*
Note:
This MR job  converts the input numeric data into text data which is converted in LIbSvmMapper.
The input data:
121  0  155  175
122  1  157   176
123  0  155  175
 128  1  157   176

output is:
100.20.20.0     India        150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.213

*/

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver implements Tool {

public static void main(String[] args) throws Exception {
int ec=ToolRunner.run(new DupIPDriver(), args);
System.out.println(ec);
}

public int run(String[] args) throws Exception {

Path dupIPoutput=new Path(args[0]);//List of Duplicate value records
Path dupIPs=new Path(args[1]);// Final output with duplicate Ip's

Configuration conf=new Configuration();

Job replDupIpValJob=Job.getInstance(conf, "ReplaceDupIPValues");

replDupIpValJob.setJarByClass(DupIPDriver.class);

replDupIpValJob.setMapperClass(ReplValMapper.class);
//replDupIpValJob.setReducerClass(ReplValReducer.class);
replDupIpValJob.setNumReduceTasks(0);

replDupIpValJob.addCacheFile(new URI("<ServerName&PAth>UniqueValData"));

replDupIpValJob.setMapOutputKeyClass(Text.class);
replDupIpValJob.setMapOutputValueClass(Text.class);

replDupIpValJob.setOutputKeyClass(Text.class);
replDupIpValJob.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(replDupIpValJob, new Path(dupIPoutput+"/part-r-00000"));
FileOutputFormat.setOutputPath(replDupIpValJob, dupIPs);

replDupIpValJob.waitForCompletion(true);
}
}


Mapper Code:
package com.mr.dupip.replaceval;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class ReplValMapper extends Mapper<LongWritable,Text, Text,NullWritable>{

public Map<String, String> uniqueKeyValMap = new HashMap<String, String>();
/
protected void setup(Context context) throws IOException {
URI[] uris = context.getCacheFiles();

for (URI uri : uris) {
System.out.println("~~~Cache File ::::" + uri.toString());
}
FileSystem fs = FileSystem.get(context.getConfiguration());
Path cacheFile = new Path(uris[0]);
BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));
String setupData = null;
while ((setupData = bf.readLine()) != null) {
//Eg:- c0:ea:e4:84:dd:8b~98
String[] words = setupData.split("~");
for (int i = 0; i < words.length; i++) {
uniqueKeyValMap.put(words[1], words[0]);
}
}
}
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Eg:- 55:576 96:46 34 18:576 44:576
String line=value.toString();
String[] words=line.split(" ");
String[] source_ip=words[0].split(":");//55:576
//source_ipMap.put(source_ip[1], source_ip[0]);//576,55
String[] pattern=words[1].split(":");//96:46
//patternMap.put(pattern[1], pattern[0]);//46,96
String[] dest_ip=words[2].split(":");//18:576
//dest_ipMap.put(dest_ip[1], dest_ip[0]);//576,18
String[] originator=words[3].split(":");//44:576
//orignatorMap.put(originator[1],originator[0]);//576,44
String sentence="";
sentence=uniqueKeyValMap.get(source_ip[0])+" "+uniqueKeyValMap.get(pattern[0])+" "+uniqueKeyValMap.get(dest_ip[0])+" "+uniqueKeyValMap.get(originator[0]);
context.write(new Text(sentence), NullWritable.get());
}

}

MapReduce iii)MapReduce Job for finding the Diplicate IP fields



/*Note:
This MapReduce program reads the data from the last LIbSvmMapper MR Program and Reads unique records.It avoids writing duplicate records.
Input data:
121  0  155  175
122  1  157   176
121  0  155  175
123  0  155  175
122  1  157   176
128  1  157   176

Output Data:
121  0  155  175
122  1  157   176
123  0  155  175
 128  1  157   176
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver implements Tool {

public static void main(String[] args) throws Exception {
int ec=ToolRunner.run(new DupIPDriver(), args);
System.out.println(ec);
}

public int run(String[] args) throws Exception {


Path lsOpath=new Path(args[0]);////LibSvm Data format in numeric form
Path dupIPoutput=new Path(args[1]);//List of Duplicate value records


Configuration conf=new Configuration();

Job dupipJob=Job.getInstance(conf, "FindDupIpJob");
dupipJob.setJarByClass(DupIPDriver.class);

dupipJob.setMapperClass(DupIPMapper.class);
dupipJob.setReducerClass(DupIpReducer.class);

FileInputFormat.setInputPaths(dupipJob, lsOpath+"/part-m-*");
FileOutputFormat.setOutputPath(dupipJob, dupIPoutput);

//This UniqueValData file MapReduce code output available in previous MapReduce code LibSvmMapper //Class.
dupipJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));
dupipJob.setMapOutputKeyClass(Text.class);
dupipJob.setMapOutputValueClass(Text.class);
dupipJob.setOutputKeyClass(Text.class);
dupipJob.setOutputValueClass(NullWritable.class);
dupipJob.waitForCompletion(true);
}
}
Mapper Code:
package com.mr.dupip.replaceval;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DupIPMapper extends Mapper<LongWritable,Text, Text,Text>{

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line=value.toString();
String[] words=line.split(" ");
context.write(new Text(words[2].trim()),new Text(line.trim()));
}
}
Reducer Code:
package com.mr.dupip.replaceval;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class DupIpReducer extends Reducer<Text, Text, Text, NullWritable>{

HashSet<String> hs;
protected void setup(Context context) throws IOException {
hs=new HashSet<String>();
}
public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
for(Text line:values){
String line1=line.toString();
hs.add(line1);
}
for(String eachLine:hs){
if(hs.size()>1){
context.write(new Text(eachLine),NullWritable.get());
}
}
hs.clear();
}
}

MapReduce ii)mapReduce Job For Converting all text data into numeric Data by assigning a unique code for each field.

/*
Its a MapSide join.This mapper convert all text data fields into a numeric data for easy processing needed problems.After all procesing has done we can get back the numeric values into our  original text data.
That conversion code will be provided into next blog.
Input data Sample for converting into Text data into Numeric data

Input Data Example:
100.20.20.0     India        150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.213
. . .
*/
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;



public class DupIPDriver implements Tool {



public static void main(String[] args) throws Exception {

int ec=ToolRunner.run(new DupIPDriver(), args);

System.out.println(ec);

}



public int run(String[] args) throws Exception {




Path filterData=new Path(args[0]);//Filterin Data with 4 columns
Path lsOpath=new Path(args[1]);////LibSvm Data format in numeric form



Configuration conf=new Configuration();

Job lsDataConvertjob=Job.getInstance(conf,"LibSVMdata");

lsDataConvertjob.setJarByClass(DupIPDriver.class);


lsDataConvertjob.setMapperClass(LibSvmMapper.class);

lsDataConvertjob.setNumReduceTasks(0);


lsDataConvertjob.setMapOutputKeyClass(Text.class);

lsDataConvertjob.setMapOutputValueClass(NullWritable.class);


lsDataConvertjob.setOutputKeyClass(Text.class);

lsDataConvertjob.setOutputValueClass(NullWritable.class);


//lsDataConvertjob.addCacheFile(new URI(wcOpath+"/part-*"));

//This cache file is hard coded.It is taken from the wordcount program output file for .
//this word count program has been written in the last blog.

//DataConvertjob.addCacheFile(new URI("<ServerName & Path>/DWordCount/part-r-00000"));

lsDataConvertjob.addCacheFile(new URI("<ServerName&Path>/DWordCount/part-r-00000"));


FileInputFormat.setInputPaths(lsDataConvertjob,filterData+"/part-*");

FileOutputFormat.setOutputPath(lsDataConvertjob, lsOpath);



lsDataConvertjob.waitForCompletion(true);
}
}
Mapper Code:


public class LibSvmMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
public Map<String, String> count = new HashMap<String, String>();

public Map<String, String> uniqueId = new HashMap<String, String>();


protected void setup(Context context) throws IOException {

URI[] uris = context.getCacheFiles();



for (URI uri : uris) {

System.out.println("~~~Cache File ::::" + uri.toString());

}

FileSystem fs = FileSystem.get(context.getConfiguration());

Path cacheFile = new Path(uris[0]);

BufferedReader bf = new BufferedReader(new InputStreamReader(

fs.open(cacheFile)));

// BufferedReader bf = new BufferedReader(new

// FileReader(cacheFiles[0].toString()));



String setupData = null;

int uniqueIdValue = 1;

while ((setupData = bf.readLine()) != null) {

String[] words = setupData.split("\t");

for (int i = 0; i < words.length; i++) {

count.put(words[0], words[1]);

uniqueId.put(words[0], ""+uniqueIdValue);

uniqueIdValue++;

}

}


//FSDataOutputStream out = fs.create(new Path("<ServerName&Path>/UniqueValData"));

FSDataOutputStream out = fs.create(new Path("<ServerName&Path>/UniqueValData"));

for (Entry<String, String> entry : uniqueId.entrySet()){

String sentence=entry.getKey()+"~"+entry.getValue();

out.writeBytes(sentence);

out.writeBytes("\n");


}

}

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String s = value.toString();

String words[] = s.split(" ");





for(int i=0; i<words.length;i++){

opRow.append(uniqueId.get(words[i])).append(":").append(count.get(words[i])).append(" ");

}


context.write(new Text(opRow.toString().trim()), NullWritable.get());



}

}



MapReduce i)WordCount Program





Data set Example:


Input Data Example:
100.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.213
. . .
. .  .  .
MapReduce code for WordCount Example:



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver implements Tool {

public static void main(String[] args) throws Exception {
int ec=ToolRunner.run(new DupIPDriver(), args);
System.out.println(ec);
}

public int run(String[] args) throws Exception {
Path inpath=new Path(args[0]);//Raw Syslog Data
Path wcOpath=new Path(args[1]);//List of Wordcounts

Configuration conf=new Configuration();
Job wordCountjob=Job.getInstance(conf,"WordCount");
wordCountjob.setJarByClass(DupIPDriver.class);
wordCountjob.setMapperClass(WordMap.class);
wordCountjob.setReducerClass(WordReducer.class);
wordCountjob.setNumReduceTasks(1);
/*
wordCountjob.setMapOutputKeyClass(Text.class);
wordCountjob.setMapOutputValueClass(Text.class);
*/
wordCountjob.setOutputKeyClass(Text.class);
wordCountjob.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(wordCountjob, inpath);
FileOutputFormat.setOutputPath(wordCountjob, wcOpath);
wordCountjob.waitForCompletion(true);
}
}
Maper Code:
package com.mr.dupip.replaceval;


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMap extends Mapper<LongWritable,Text, Text, IntWritable>{

public void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
String s=value.toString();
for (String word : s.split(" ")) {
if (word.length()>0) {
context.write(new Text(word),new IntWritable(1));
}
}
}

}

Reducer Code:
package com.mr.dupip.replaceval;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int count=0;
for(IntWritable a : values){
int i=a.get();
count+=i;
}
context.write(key, new IntWritable(count));
}

}