Saturday, August 1, 2015
Maven Project build for Storm-Kafka programs
/*Note:
The following is the pom.xml file for Storm and kafka Projects build using Maven
*/
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.comcom.nato.rt</groupId>
<artifactId>storm-rt-usecases</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>storm-rt-usecases</name>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.1-incubating</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>0.9.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>${storm.topology}</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>create-my-bundle</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
MapReduce .iv)MapReduce Job for Replcacing the Numeric values into Actual values to only duplicate IP fields.
/*
Note:
This MR job converts the input numeric data into text data which is converted in LIbSvmMapper.
The input data:
121 0 155 175
122 1 157 176
123 0 155 175
128 1 157 176
output is:
*/
Note:
This MR job converts the input numeric data into text data which is converted in LIbSvmMapper.
The input data:
121 0 155 175
122 1 157 176
123 0 155 175
128 1 157 176
output is:
100.20.20.0
India
150.100.0.20 00.120.0.abc.txxyt.00
101.21.21.1
USA 151.100.0.21
11.101.axc.asdfs.212
102.20.20.0
India
150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1
USA 151.100.0.21
11.101.axc.asdfs.213
*/
import
org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner;
public class DupIPDriver implements
Tool {
public static void main(String[] args)
throws Exception {
int ec=ToolRunner.run(new
DupIPDriver(), args);
System.out.println(ec);
}
public int run(String[] args) throws
Exception {
Path dupIPoutput=new
Path(args[0]);//List of Duplicate value records
Path dupIPs=new Path(args[1]);//
Final output with duplicate Ip's
Configuration conf=new
Configuration();
Job
replDupIpValJob=Job.getInstance(conf, "ReplaceDupIPValues");
replDupIpValJob.setJarByClass(DupIPDriver.class);
replDupIpValJob.setMapperClass(ReplValMapper.class);
//replDupIpValJob.setReducerClass(ReplValReducer.class);
replDupIpValJob.setNumReduceTasks(0);
replDupIpValJob.addCacheFile(new
URI("<ServerName&PAth>UniqueValData"));
replDupIpValJob.setMapOutputKeyClass(Text.class);
replDupIpValJob.setMapOutputValueClass(Text.class);
replDupIpValJob.setOutputKeyClass(Text.class);
replDupIpValJob.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(replDupIpValJob,
new Path(dupIPoutput+"/part-r-00000"));
FileOutputFormat.setOutputPath(replDupIpValJob,
dupIPs);
replDupIpValJob.waitForCompletion(true);
}
}
Mapper Code:
package com.mr.dupip.replaceval;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Mapper.Context;
public class ReplValMapper extends
Mapper<LongWritable,Text, Text,NullWritable>{
public Map<String, String>
uniqueKeyValMap = new HashMap<String, String>();
/
protected void setup(Context context)
throws IOException {
URI[] uris = context.getCacheFiles();
for (URI uri : uris) {
System.out.println("~~~Cache
File ::::" + uri.toString());
}
FileSystem fs =
FileSystem.get(context.getConfiguration());
Path cacheFile = new Path(uris[0]);
BufferedReader bf = new
BufferedReader(new InputStreamReader(fs.open(cacheFile)));
String setupData = null;
while ((setupData = bf.readLine()) !=
null) {
//Eg:- c0:ea:e4:84:dd:8b~98
String[] words =
setupData.split("~");
for (int i = 0; i < words.length;
i++) {
uniqueKeyValMap.put(words[1],
words[0]);
}
}
}
protected void map(LongWritable key,
Text value, Context context) throws IOException, InterruptedException
{
//Eg:- 55:576 96:46 34 18:576
44:576
String line=value.toString();
String[] words=line.split(" ");
String[]
source_ip=words[0].split(":");//55:576
//source_ipMap.put(source_ip[1],
source_ip[0]);//576,55
String[]
pattern=words[1].split(":");//96:46
//patternMap.put(pattern[1],
pattern[0]);//46,96
String[]
dest_ip=words[2].split(":");//18:576
//dest_ipMap.put(dest_ip[1],
dest_ip[0]);//576,18
String[]
originator=words[3].split(":");//44:576
//orignatorMap.put(originator[1],originator[0]);//576,44
String sentence="";
sentence=uniqueKeyValMap.get(source_ip[0])+"
"+uniqueKeyValMap.get(pattern[0])+"
"+uniqueKeyValMap.get(dest_ip[0])+"
"+uniqueKeyValMap.get(originator[0]);
context.write(new Text(sentence),
NullWritable.get());
}
}
MapReduce iii)MapReduce Job for finding the Diplicate IP fields
/*Note:
This MapReduce program reads the data from the last LIbSvmMapper MR Program and Reads unique records.It avoids writing duplicate records.
Input data:
121 0 155 175
122 1 157 176
121 0 155 175
123 0 155 175
122 1 157 176
128 1 157 176
Output Data:
121 0 155 175
122 1 157 176
123 0 155 175
128 1 157 176
*/
import
org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner;
public class DupIPDriver implements
Tool {
public static void main(String[] args)
throws Exception {
int ec=ToolRunner.run(new
DupIPDriver(), args);
System.out.println(ec);
}
public int run(String[] args) throws
Exception {
Path lsOpath=new
Path(args[0]);////LibSvm Data format in numeric form
Path dupIPoutput=new
Path(args[1]);//List of Duplicate value records
Configuration conf=new
Configuration();
Job dupipJob=Job.getInstance(conf,
"FindDupIpJob");
dupipJob.setJarByClass(DupIPDriver.class);
dupipJob.setMapperClass(DupIPMapper.class);
dupipJob.setReducerClass(DupIpReducer.class);
FileInputFormat.setInputPaths(dupipJob,
lsOpath+"/part-m-*");
FileOutputFormat.setOutputPath(dupipJob,
dupIPoutput);
//This UniqueValData file MapReduce code output available in previous MapReduce code LibSvmMapper //Class.
dupipJob.addCacheFile(new
URI("<ServerName&Path>/UniqueValData"));
dupipJob.setMapOutputKeyClass(Text.class);
dupipJob.setMapOutputValueClass(Text.class);
dupipJob.setOutputKeyClass(Text.class);
dupipJob.setOutputValueClass(NullWritable.class);
dupipJob.waitForCompletion(true);
}
}
Mapper Code:
package com.mr.dupip.replaceval;
import java.io.IOException;
import
org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
public class DupIPMapper extends
Mapper<LongWritable,Text, Text,Text>{
protected void map(LongWritable key,
Text value, Context context) throws IOException, InterruptedException
{
String line=value.toString();
String[] words=line.split(" ");
context.write(new
Text(words[2].trim()),new Text(line.trim()));
}
}
Reducer Code:
package com.mr.dupip.replaceval;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.Reducer.Context;
public class DupIpReducer extends
Reducer<Text, Text, Text, NullWritable>{
HashSet<String> hs;
protected void setup(Context context)
throws IOException {
hs=new HashSet<String>();
}
public void reduce(Text key,
Iterable<Text> values,Context context) throws IOException,
InterruptedException {
for(Text line:values){
String line1=line.toString();
hs.add(line1);
}
for(String eachLine:hs){
if(hs.size()>1){
context.write(new
Text(eachLine),NullWritable.get());
}
}
hs.clear();
}
}
MapReduce ii)mapReduce Job For Converting all text data into numeric Data by assigning a unique code for each field.
/*
Its a MapSide join.This mapper convert all text data fields into a numeric data for easy processing needed problems.After all procesing has done we can get back the numeric values into our original text data.
That conversion code will be provided into next blog.
Input data Sample for converting into Text data into Numeric data
Input data Sample for converting into Text data into Numeric data
Input Data Example:
100.20.20.0
India
150.100.0.20 00.120.0.abc.txxyt.00
101.21.21.1
USA 151.100.0.21
11.101.axc.asdfs.212
102.20.20.0
India
150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1
USA 151.100.0.21
11.101.axc.asdfs.213
. . .
*/
import
org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner;
public class DupIPDriver implements
Tool {
public static void main(String[] args)
throws Exception {
int ec=ToolRunner.run(new
DupIPDriver(), args);
System.out.println(ec);
}
public int run(String[] args) throws
Exception {
Path filterData=new
Path(args[0]);//Filterin Data with 4 columns
Path lsOpath=new
Path(args[1]);////LibSvm Data format in numeric form
Configuration conf=new
Configuration();
Job
lsDataConvertjob=Job.getInstance(conf,"LibSVMdata");
lsDataConvertjob.setJarByClass(DupIPDriver.class);
lsDataConvertjob.setMapperClass(LibSvmMapper.class);
lsDataConvertjob.setNumReduceTasks(0);
lsDataConvertjob.setMapOutputKeyClass(Text.class);
lsDataConvertjob.setMapOutputValueClass(NullWritable.class);
lsDataConvertjob.setOutputKeyClass(Text.class);
lsDataConvertjob.setOutputValueClass(NullWritable.class);
//lsDataConvertjob.addCacheFile(new
URI(wcOpath+"/part-*"));
//This cache file is hard coded.It
is taken from the wordcount program output file for .
//this word count program has been written in the last blog.
//DataConvertjob.addCacheFile(new
URI("<ServerName & Path>/DWordCount/part-r-00000"));
lsDataConvertjob.addCacheFile(new
URI("<ServerName&Path>/DWordCount/part-r-00000"));
FileInputFormat.setInputPaths(lsDataConvertjob,filterData+"/part-*");
FileOutputFormat.setOutputPath(lsDataConvertjob,
lsOpath);
lsDataConvertjob.waitForCompletion(true);
}
}
Mapper Code:
public class LibSvmMapper extends
Mapper<LongWritable, Text, Text, NullWritable> {
public Map<String, String> count
= new HashMap<String, String>();
public Map<String, String>
uniqueId = new HashMap<String, String>();
protected void setup(Context context)
throws IOException {
URI[] uris = context.getCacheFiles();
for (URI uri : uris) {
System.out.println("~~~Cache
File ::::" + uri.toString());
}
FileSystem fs =
FileSystem.get(context.getConfiguration());
Path cacheFile = new Path(uris[0]);
BufferedReader bf = new
BufferedReader(new InputStreamReader(
fs.open(cacheFile)));
// BufferedReader bf = new
BufferedReader(new
//
FileReader(cacheFiles[0].toString()));
String setupData = null;
int uniqueIdValue = 1;
while ((setupData = bf.readLine()) !=
null) {
String[] words =
setupData.split("\t");
for (int i = 0; i < words.length;
i++) {
count.put(words[0], words[1]);
uniqueId.put(words[0],
""+uniqueIdValue);
uniqueIdValue++;
}
}
//FSDataOutputStream out =
fs.create(new Path("<ServerName&Path>/UniqueValData"));
FSDataOutputStream out =
fs.create(new Path("<ServerName&Path>/UniqueValData"));
for (Entry<String, String>
entry : uniqueId.entrySet()){
String
sentence=entry.getKey()+"~"+entry.getValue();
out.writeBytes(sentence);
out.writeBytes("\n");
}
}
protected void map(LongWritable key,
Text value, Context context)
throws IOException,
InterruptedException {
String s = value.toString();
String words[] = s.split(" ");
for(int i=0; i<words.length;i++){
opRow.append(uniqueId.get(words[i])).append(":").append(count.get(words[i])).append("
");
}
context.write(new
Text(opRow.toString().trim()), NullWritable.get());
}
}
MapReduce i)WordCount Program
Data set Example:
Input Data Example:
100.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
Input Data Example:
100.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
101.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.212
102.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.213
. . .
. . . .
102.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.213
. . .
. . . .
MapReduce code for WordCount
Example:
import
org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner;
public class DupIPDriver implements
Tool {
public static void main(String[] args)
throws Exception {
int ec=ToolRunner.run(new
DupIPDriver(), args);
System.out.println(ec);
}
public int run(String[] args) throws
Exception {
Path inpath=new Path(args[0]);//Raw
Syslog Data
Path wcOpath=new
Path(args[1]);//List of Wordcounts
Configuration conf=new
Configuration();
Job
wordCountjob=Job.getInstance(conf,"WordCount");
wordCountjob.setJarByClass(DupIPDriver.class);
wordCountjob.setMapperClass(WordMap.class);
wordCountjob.setReducerClass(WordReducer.class);
wordCountjob.setNumReduceTasks(1);
/*
wordCountjob.setMapOutputKeyClass(Text.class);
wordCountjob.setMapOutputValueClass(Text.class);
*/
wordCountjob.setOutputKeyClass(Text.class);
wordCountjob.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(wordCountjob,
inpath);
FileOutputFormat.setOutputPath(wordCountjob,
wcOpath);
wordCountjob.waitForCompletion(true);
}
}
Maper Code:
package com.mr.dupip.replaceval;
import java.io.IOException;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
public class WordMap extends
Mapper<LongWritable,Text, Text, IntWritable>{
public void map(LongWritable key, Text
value,Context context) throws IOException, InterruptedException {
String s=value.toString();
for (String word : s.split(" "))
{
if (word.length()>0) {
context.write(new Text(word),new
IntWritable(1));
}
}
}
}
Reducer Code:
package com.mr.dupip.replaceval;
import java.io.IOException;
import java.util.Iterator;
import
org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.Reducer.Context;
public class WordReducer extends
Reducer<Text, IntWritable, Text, IntWritable>{
public void reduce(Text key,
Iterable<IntWritable> values,Context context) throws
IOException, InterruptedException {
int count=0;
for(IntWritable a : values){
int i=a.get();
count+=i;
}
context.write(key, new
IntWritable(count));
}
}
Subscribe to:
Posts (Atom)