Saturday, August 1, 2015
Maven Project build for Storm-Kafka programs
/*Note:
The following is the pom.xml file used to build Storm and Kafka projects with Maven.
*/
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.comcom.nato.rt</groupId>
  <artifactId>storm-rt-usecases</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>storm-rt-usecases</name>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.2</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.1-incubating</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-hbase</artifactId>
      <version>0.9.4</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>${storm.topology}</mainClass>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <id>create-my-bundle</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
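With this pom in place, the project can be built and a topology main class can be launched through the exec plugin. A minimal usage sketch (com.example.SampleTopology is a hypothetical main class, not part of this project; pass your own class via the storm.topology property):
>mvn clean package
(the assembly plugin builds the jar-with-dependencies bundle during the package phase)
>mvn compile exec:java -Dstorm.topology=com.example.SampleTopology
(the exec plugin resolves ${storm.topology} as the main class to run)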
MapReduce iv) MapReduce job for replacing the numeric values with the actual text values, for the duplicate-IP records only.
/*Note:
This MR job converts the numeric input data (produced by the LibSvmMapper job below) back into the original text data.
The input data:
121 0 155 175
122 1 157 176
123 0 155 175
128 1 157 176
The output is:
100.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
101.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.212
102.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.213
*/
Driver Code:
package com.mr.dupip.replaceval;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Configured supplies the getConf()/setConf() methods required by the Tool interface.
public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {
        Path dupIPoutput = new Path(args[0]); // list of duplicate-value records
        Path dupIPs = new Path(args[1]);      // final output with the duplicate IPs in text form

        Configuration conf = new Configuration();
        Job replDupIpValJob = Job.getInstance(conf, "ReplaceDupIPValues");
        replDupIpValJob.setJarByClass(DupIPDriver.class);
        replDupIpValJob.setMapperClass(ReplValMapper.class);
        //replDupIpValJob.setReducerClass(ReplValReducer.class);
        replDupIpValJob.setNumReduceTasks(0);
        replDupIpValJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));
        replDupIpValJob.setMapOutputKeyClass(Text.class);
        replDupIpValJob.setMapOutputValueClass(Text.class);
        replDupIpValJob.setOutputKeyClass(Text.class);
        replDupIpValJob.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(replDupIpValJob, new Path(dupIPoutput + "/part-r-00000"));
        FileOutputFormat.setOutputPath(replDupIpValJob, dupIPs);
        return replDupIpValJob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
package com.mr.dupip.replaceval;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReplValMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    public Map<String, String> uniqueKeyValMap = new HashMap<String, String>();

    protected void setup(Context context) throws IOException {
        URI[] uris = context.getCacheFiles();
        for (URI uri : uris) {
            System.out.println("~~~Cache File ::::" + uri.toString());
        }
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path cacheFile = new Path(uris[0]);
        BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));
        String setupData = null;
        while ((setupData = bf.readLine()) != null) {
            // Eg:- c0:ea:e4:84:dd:8b~98  (original value ~ numeric code)
            String[] words = setupData.split("~");
            for (int i = 0; i < words.length; i++) {
                uniqueKeyValMap.put(words[1], words[0]); // numeric code -> original value
            }
        }
    }

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Eg:- 55:576 96:46 34 18:576 44:576
        String line = value.toString();
        String[] words = line.split(" ");
        String[] source_ip = words[0].split(":");   // 55:576
        //source_ipMap.put(source_ip[1], source_ip[0]); // 576,55
        String[] pattern = words[1].split(":");     // 96:46
        //patternMap.put(pattern[1], pattern[0]);   // 46,96
        String[] dest_ip = words[2].split(":");     // 18:576
        //dest_ipMap.put(dest_ip[1], dest_ip[0]);   // 576,18
        String[] originator = words[3].split(":");  // 44:576
        //orignatorMap.put(originator[1], originator[0]); // 576,44
        String sentence = "";
        sentence = uniqueKeyValMap.get(source_ip[0]) + " "
                 + uniqueKeyValMap.get(pattern[0]) + " "
                 + uniqueKeyValMap.get(dest_ip[0]) + " "
                 + uniqueKeyValMap.get(originator[0]);
        context.write(new Text(sentence), NullWritable.get());
    }
}
MapReduce iii) MapReduce job for finding the duplicate IP fields
/*Note:
This MapReduce program reads the output of the LibSvmMapper MR program (job ii below) and writes only the unique records, avoiding duplicates.
Input data:
121 0 155 175
122 1 157 176
121 0 155 175
123 0 155 175
122 1 157 176
128 1 157 176
Output Data:
121 0 155 175
122 1 157 176
123 0 155 175
128 1 157 176
*/
Driver Code:
package com.mr.dupip.replaceval;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {
        Path lsOpath = new Path(args[0]);     // LibSVM-style data in numeric form
        Path dupIPoutput = new Path(args[1]); // list of duplicate-value records

        Configuration conf = new Configuration();
        Job dupipJob = Job.getInstance(conf, "FindDupIpJob");
        dupipJob.setJarByClass(DupIPDriver.class);
        dupipJob.setMapperClass(DupIPMapper.class);
        dupipJob.setReducerClass(DupIpReducer.class);
        FileInputFormat.setInputPaths(dupipJob, lsOpath + "/part-m-*");
        FileOutputFormat.setOutputPath(dupipJob, dupIPoutput);
        // The UniqueValData file is written by the LibSvmMapper class of the previous MapReduce job (job ii below).
        dupipJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));
        dupipJob.setMapOutputKeyClass(Text.class);
        dupipJob.setMapOutputValueClass(Text.class);
        dupipJob.setOutputKeyClass(Text.class);
        dupipJob.setOutputValueClass(NullWritable.class);
        return dupipJob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
package com.mr.dupip.replaceval;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DupIPMapper extends Mapper<LongWritable, Text, Text, Text> {

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        // key on the third field so that records sharing the same value are grouped in the reducer
        context.write(new Text(words[2].trim()), new Text(line.trim()));
    }
}
Reducer Code:
package com.mr.dupip.replaceval;
import java.io.IOException;
import java.util.HashSet;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DupIpReducer extends Reducer<Text, Text, Text, NullWritable> {

    HashSet<String> hs;

    protected void setup(Context context) throws IOException {
        hs = new HashSet<String>();
    }

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // collect the distinct record lines for this key
        for (Text line : values) {
            String line1 = line.toString();
            hs.add(line1);
        }
        // emit the distinct lines only when more than one distinct record shares the key
        for (String eachLine : hs) {
            if (hs.size() > 1) {
                context.write(new Text(eachLine), NullWritable.get());
            }
        }
        hs.clear();
    }
}
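To make the reducer's behaviour concrete, here is a small worked illustration using the sample records from the note above (the grouping key is the third field emitted by DupIPMapper):
key 155 -> { 121 0 155 175, 123 0 155 175 } : more than one distinct record shares the key, so both are written once each
key 157 -> { 122 1 157 176, 128 1 157 176 } : more than one distinct record shares the key, so both are written once each
Exact duplicate lines collapse inside the HashSet, and a key whose group holds only a single distinct record is skipped; that is how the duplicated input lines are reduced to the four unique output records.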
MapReduce ii) MapReduce job for converting all text data into numeric data by assigning a unique code to each field.
/*
It is a map-side join. This mapper converts every text field into a numeric code so that later processing is easier. After all the processing is done, the numeric values can be converted back into the original text data; that conversion code is provided in the next blog post.
Input data sample for converting the text data into numeric data:
100.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
101.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.212
102.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.213
. . .
*/
Driver Code:
package com.mr.dupip.replaceval;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {
        Path filterData = new Path(args[0]); // filtered data with 4 columns
        Path lsOpath = new Path(args[1]);    // LibSVM-style data in numeric form

        Configuration conf = new Configuration();
        Job lsDataConvertjob = Job.getInstance(conf, "LibSVMdata");
        lsDataConvertjob.setJarByClass(DupIPDriver.class);
        lsDataConvertjob.setMapperClass(LibSvmMapper.class);
        lsDataConvertjob.setNumReduceTasks(0);
        lsDataConvertjob.setMapOutputKeyClass(Text.class);
        lsDataConvertjob.setMapOutputValueClass(NullWritable.class);
        lsDataConvertjob.setOutputKeyClass(Text.class);
        lsDataConvertjob.setOutputValueClass(NullWritable.class);
        //lsDataConvertjob.addCacheFile(new URI(wcOpath + "/part-*"));
        // This cache file is hard coded. It is the output of the word count program
        // written in the previous blog post (job i below).
        lsDataConvertjob.addCacheFile(new URI("<ServerName&Path>/DWordCount/part-r-00000"));
        FileInputFormat.setInputPaths(lsDataConvertjob, filterData + "/part-*");
        FileOutputFormat.setOutputPath(lsDataConvertjob, lsOpath);
        return lsDataConvertjob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
package com.mr.dupip.replaceval;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LibSvmMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    public Map<String, String> count = new HashMap<String, String>();    // word -> count (from the word count output)
    public Map<String, String> uniqueId = new HashMap<String, String>(); // word -> unique numeric code

    protected void setup(Context context) throws IOException {
        URI[] uris = context.getCacheFiles();
        for (URI uri : uris) {
            System.out.println("~~~Cache File ::::" + uri.toString());
        }
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path cacheFile = new Path(uris[0]);
        BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));
        // BufferedReader bf = new BufferedReader(new FileReader(cacheFiles[0].toString()));
        String setupData = null;
        int uniqueIdValue = 1;
        while ((setupData = bf.readLine()) != null) {
            // each line of the word count output is: word <TAB> count
            String[] words = setupData.split("\t");
            for (int i = 0; i < words.length; i++) {
                count.put(words[0], words[1]);
                uniqueId.put(words[0], "" + uniqueIdValue);
                uniqueIdValue++;
            }
        }
        // Write the word ~ unique-code mapping out so the later jobs can use it as a cache file.
        FSDataOutputStream out = fs.create(new Path("<ServerName&Path>/UniqueValData"));
        for (Entry<String, String> entry : uniqueId.entrySet()) {
            String sentence = entry.getKey() + "~" + entry.getValue();
            out.writeBytes(sentence);
            out.writeBytes("\n");
        }
        out.close();
    }

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String s = value.toString();
        String words[] = s.split(" ");
        StringBuilder opRow = new StringBuilder();
        for (int i = 0; i < words.length; i++) {
            opRow.append(uniqueId.get(words[i])).append(":").append(count.get(words[i])).append(" ");
        }
        context.write(new Text(opRow.toString().trim()), NullWritable.get());
    }
}
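As a rough illustration of what this mapper emits (the specific counts and codes below are made up for the example; the real values come from the cached word count output), suppose the mapper has assigned these codes:
100.20.20.0 (count 2) -> code 1
India (count 2) -> code 2
150.100.0.20 (count 2) -> code 3
00.120.0.abc.txxyt.00 (count 2) -> code 4
Then the input record
100.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
is written out as
1:2 2:2 3:2 4:2
and the UniqueValData side file receives lines such as 100.20.20.0~1, which the jobs above use to map the numeric codes back to the original text.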
MapReduce i) WordCount program
Data set example:
Input Data Example:
100.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
101.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.212
102.20.20.0 India 150.100.0.20 00.120.0.abc.txxyt.00
103.21.21.1 USA 151.100.0.21 11.101.axc.asdfs.213
. . .
MapReduce code for WordCount
Example:
Driver Code:
package com.mr.dupip.replaceval;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {
        Path inpath = new Path(args[0]);  // raw syslog data
        Path wcOpath = new Path(args[1]); // list of word counts

        Configuration conf = new Configuration();
        Job wordCountjob = Job.getInstance(conf, "WordCount");
        wordCountjob.setJarByClass(DupIPDriver.class);
        wordCountjob.setMapperClass(WordMap.class);
        wordCountjob.setReducerClass(WordReducer.class);
        wordCountjob.setNumReduceTasks(1);
        /*
        wordCountjob.setMapOutputKeyClass(Text.class);
        wordCountjob.setMapOutputValueClass(Text.class);
        */
        wordCountjob.setOutputKeyClass(Text.class);
        wordCountjob.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(wordCountjob, inpath);
        FileOutputFormat.setOutputPath(wordCountjob, wcOpath);
        return wordCountjob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
package com.mr.dupip.replaceval;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String s = value.toString();
        for (String word : s.split(" ")) {
            if (word.length() > 0) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
}
Reducer Code:
package com.mr.dupip.replaceval;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable a : values) {
            count += a.get();
        }
        context.write(key, new IntWritable(count));
    }
}
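Taken together, the four jobs form a small chain that is run in order from i) to iv), each one consuming the previous job's output directory plus the cached side files. A rough run sketch (the jar name mr-dupip.jar and the HDFS paths are placeholders; since all four drivers above share the class name DupIPDriver, in practice each job is packaged into its own jar or the drivers are renamed):
>hadoop jar mr-dupip.jar com.mr.dupip.replaceval.DupIPDriver /data/syslog /data/wordcount
(i. word counts, later used as the distributed-cache file for job ii)
>hadoop jar mr-dupip.jar com.mr.dupip.replaceval.DupIPDriver /data/filtered /data/libsvm
(ii. text-to-numeric conversion; also writes the UniqueValData mapping file)
>hadoop jar mr-dupip.jar com.mr.dupip.replaceval.DupIPDriver /data/libsvm /data/dup-records
(iii. find the records that share a duplicate IP field)
>hadoop jar mr-dupip.jar com.mr.dupip.replaceval.DupIPDriver /data/dup-records /data/dup-ips-text
(iv. replace the numeric codes of the duplicate records with the original text values)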
Saturday, May 9, 2015
Apache Storm - Killing the Topology
Apache Storm
Killing the Storm topology
Once we submit a Storm topology to the cluster, we can kill it with the command below if required.
>storm kill {TopologyName}
Eg:
1.>storm kill SampleTopology
2.>storm kill StormTopology
The topology names above are single words. Clicking one of them on the UI page takes you to its navigation page.
If a topology name contains multiple words separated by spaces, the above command will not work.
1.>storm kill Sample Topology
(This topology cannot be killed this way; the command fails with a runtime exception: "Exception in thread "main" NotAliveException(msg:SampleStorm is not alive)")
So for this case we need to write a separate Java program to kill it.
Java program for killing a Storm topology whose name contains spaces.
Program:
package com.storm.killing_topology;
import java.util.*;
import org.apache.thrift7.TException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus.Client;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class KillingTopology {
    public static void main(String[] args) throws NotAliveException, TException {
        Map conf = Utils.readStormConfig();
        Client client = NimbusClient.getConfiguredClient(conf).getClient();
        KillOptions killOpts = new KillOptions();
        //killOpts.set_wait_secs(waitSeconds); // time to wait before killing
        client.killTopologyWithOpts("Topology Name", killOpts); // provide the topology name here
    }
}
Here, in place of "Topology Name" in quotes, we provide the topology name, which can contain spaces.
Then execute the above program with the command below.
>storm jar SampleStorm.jar com.storm.killing_topology.KillingTopology
Apache Storm - Sample Storm Topology Program
Storm Topology submission
The following is a sample Apache Storm program that submits a topology to the cluster.
It contains three parts:
1.Sample Spout program
2.Sample Bolt Program
3.Sample Topology submitter program
Now let us have a look at the sample spout program.
1.SampleStormSpout.java
package com.sample.storm_example;
import java.util.*;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class SampleStormSpout extends BaseRichSpout
{
private static final long serialVersionUID=2L;
private SpoutOutputCollector spoutOutputCollector;
private static final Map<Integer,String> map = new HashMap<Integer,String>();
static
{
map.put(0,"Kuamr");
map.put(1,"Jinto");
map.put(2, "Bharath");
}
public void open(Map conf, TopologyContext context,SpoutOutputCollector spoutOutputCollector)
{
this.spoutOutputCollector=spoutOutputCollector;
}
public void nextTuple()
{
final Random rand=new Random();
int randNum=rand.nextInt(3);
spoutOutputCollector.emit(new Values(map.get(randNum)));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("site"));
}
}
2.SampleStormBolt.java
package com.sample.storm_example;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class SampleStormBolt extends BaseBasicBolt
{
private static final long serialVersionUID=2L;
public void execute(Tuple input, BasicOutputCollector collector) {
String test=input.getStringByField("site");
System.out.println("Name of the input site ID is "+test);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
3.SampleTopology.java
package com.sample.storm_example;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
public class SampleTopology {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("SampleStormSpout", new SampleStormSpout(), 2);
// connect the bolt to the spout's stream; without a grouping the bolt would receive no tuples
builder.setBolt("SampleStormBolt", new SampleStormBolt(), 4).shuffleGrouping("SampleStormSpout");
Config config=new Config();
config.setDebug(true);
StormSubmitter.submitTopology("Storm_Learning_Topology",config,builder.createTopology());
/*If we want to run this program in local cluster use below code instead of above StormSubmitter class
Code Snippet:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalClusterTopology", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
*/
}
}
Note:
The above project has to be created as a Maven Project.
pom.xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
      <exclusion>
        <artifactId>log4j-over-slf4j</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94.5</version>
    <exclusions>
      <exclusion>
        <artifactId>zookeeper</artifactId>
        <groupId>org.apache.zookeeper</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
  </dependency>
</dependencies>
Execution Procedure:
1. Run it as a Maven install.
2. Get the jar from the target directory.
3. Place the jar where the storm executable can reach it.
4. Use the command below to submit the jar:
>storm jar SampleStorm-0.0.1-SNAPSHOT.jar com.sample.storm_example.SampleTopology
(The jar will be submitted to the cluster; check the topology in the Storm UI.)