Saturday, August 1, 2015


Maven Project build for Storm-Kafka programs


/*Note:
The following is the pom.xml file for building Storm and Kafka projects with Maven.
*/

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.comcom.nato.rt</groupId>
    <artifactId>storm-rt-usecases</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>storm-rt-usecases</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.1-incubating</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>0.9.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>${storm.topology}</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <id>create-my-bundle</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>
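
For reference (this is not part of the original pom), with the exec-maven-plugin configuration above a topology class can usually be run straight from Maven by passing its fully qualified name through the storm.topology property, and the maven-assembly-plugin produces a jar-with-dependencies under target/ that can be submitted to the cluster. A rough sketch of the commands, assuming the project builds cleanly and the placeholder below is replaced with a real topology class:

>mvn clean compile exec:java -Dstorm.topology=<fully.qualified.TopologyClass>
>mvn clean package
>storm jar target/storm-rt-usecases-0.0.1-SNAPSHOT-jar-with-dependencies.jar <fully.qualified.TopologyClass>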

MapReduce iv) MapReduce Job for Replacing the Numeric Values with Actual Values for the Duplicate IP Fields

/*
Note:
This MR job converts the numeric input data back into the original text data; the numeric form was produced earlier by the LibSvmMapper job.
The input data:
121  0  155  175
122  1  157  176
123  0  155  175
128  1  157  176

output is:
100.20.20.0    India    150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA      151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India    150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA      151.100.0.21    11.101.axc.asdfs.213
*/

package com.mr.dupip.replaceval;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {

        Path dupIPoutput = new Path(args[0]);// List of duplicate value records (output of the previous job)
        Path dupIPs = new Path(args[1]);// Final output with duplicate IPs in text form

        Configuration conf = new Configuration();

        Job replDupIpValJob = Job.getInstance(conf, "ReplaceDupIPValues");

        replDupIpValJob.setJarByClass(DupIPDriver.class);

        replDupIpValJob.setMapperClass(ReplValMapper.class);
        // Map-only job: no reducer is needed.
        replDupIpValJob.setNumReduceTasks(0);

        // Lookup file written by the LibSvmMapper job (value~code pairs).
        replDupIpValJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));

        replDupIpValJob.setMapOutputKeyClass(Text.class);
        replDupIpValJob.setMapOutputValueClass(Text.class);

        replDupIpValJob.setOutputKeyClass(Text.class);
        replDupIpValJob.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(replDupIpValJob, new Path(dupIPoutput + "/part-r-00000"));
        FileOutputFormat.setOutputPath(replDupIpValJob, dupIPs);

        return replDupIpValJob.waitForCompletion(true) ? 0 : 1;
    }
}


Mapper Code:
package com.mr.dupip.replaceval;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReplValMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Lookup table loaded from the distributed cache: numeric code -> original text value.
    public Map<String, String> uniqueKeyValMap = new HashMap<String, String>();

    protected void setup(Context context) throws IOException {
        URI[] uris = context.getCacheFiles();

        for (URI uri : uris) {
            System.out.println("~~~Cache File ::::" + uri.toString());
        }
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path cacheFile = new Path(uris[0]);
        BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));
        String setupData = null;
        while ((setupData = bf.readLine()) != null) {
            //Eg:- c0:ea:e4:84:dd:8b~98
            String[] words = setupData.split("~");
            uniqueKeyValMap.put(words[1], words[0]);
        }
        bf.close();
    }

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Eg:- 55:576 96:46 34 18:576 44:576
        String line = value.toString();
        String[] words = line.split(" ");
        String[] source_ip = words[0].split(":");//55:576
        String[] pattern = words[1].split(":");//96:46
        String[] dest_ip = words[2].split(":");//18:576
        String[] originator = words[3].split(":");//44:576
        // Replace each numeric code with its original text value from the lookup table.
        String sentence = uniqueKeyValMap.get(source_ip[0]) + " " + uniqueKeyValMap.get(pattern[0]) + " "
                + uniqueKeyValMap.get(dest_ip[0]) + " " + uniqueKeyValMap.get(originator[0]);
        context.write(new Text(sentence), NullWritable.get());
    }

}
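
For reference (not shown in the original post), a driver like this would typically be launched with the hadoop jar command once the classes are packaged; the jar name below is only a placeholder, and the two paths are the arguments read in run():

>hadoop jar mr-dupip-jobs.jar com.mr.dupip.replaceval.DupIPDriver <dupIPoutputPath> <finalOutputPath>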

MapReduce iii) MapReduce Job for Finding the Duplicate IP Fields



/*Note:
This MapReduce program reads the output of the LibSvmMapper MR program, groups the records by IP field, and writes each distinct record only once, so duplicate records are not written to the output twice.
Input data:
121  0  155  175
122  1  157  176
121  0  155  175
123  0  155  175
122  1  157  176
128  1  157  176

Output Data:
121  0  155  175
122  1  157  176
123  0  155  175
128  1  157  176
*/
package com.mr.dupip.replaceval;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {

        Path lsOpath = new Path(args[0]);// LibSvm-style data in numeric form (output of the LibSvmMapper job)
        Path dupIPoutput = new Path(args[1]);// List of duplicate value records

        Configuration conf = new Configuration();

        Job dupipJob = Job.getInstance(conf, "FindDupIpJob");
        dupipJob.setJarByClass(DupIPDriver.class);

        dupipJob.setMapperClass(DupIPMapper.class);
        dupipJob.setReducerClass(DupIpReducer.class);

        FileInputFormat.setInputPaths(dupipJob, lsOpath + "/part-m-*");
        FileOutputFormat.setOutputPath(dupipJob, dupIPoutput);

        // The UniqueValData file is written by the LibSvmMapper class in the previous MapReduce job.
        dupipJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));

        dupipJob.setMapOutputKeyClass(Text.class);
        dupipJob.setMapOutputValueClass(Text.class);
        dupipJob.setOutputKeyClass(Text.class);
        dupipJob.setOutputValueClass(NullWritable.class);

        return dupipJob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
package com.mr.dupip.replaceval;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DupIPMapper extends Mapper<LongWritable, Text, Text, Text> {

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        // Key on the third field (IP code) so that records sharing it are grouped together.
        context.write(new Text(words[2].trim()), new Text(line.trim()));
    }
}
Reducer Code:
package com.mr.dupip.replaceval;

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DupIpReducer extends Reducer<Text, Text, Text, NullWritable> {

    HashSet<String> hs;

    protected void setup(Context context) throws IOException {
        hs = new HashSet<String>();
    }

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Collect the distinct records that share this key (exact duplicates collapse in the set).
        for (Text line : values) {
            hs.add(line.toString());
        }
        // Write the records only when the key has more than one distinct record.
        if (hs.size() > 1) {
            for (String eachLine : hs) {
                context.write(new Text(eachLine), NullWritable.get());
            }
        }
        hs.clear();
    }
}

MapReduce ii) MapReduce Job for Converting All Text Data into Numeric Data by Assigning a Unique Code to Each Field

/*
This is a map-side join. The mapper converts all of the text data fields into numeric data, which makes the later processing steps easier. After all of the processing is done, the numeric values can be converted back into the original text data; that conversion code is given in the MapReduce iv) section above.
Input data sample for converting the text data into numeric data:

Input Data Example:
100.20.20.0     India        150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.213
. . .
*/
package com.mr.dupip.replaceval;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {

        Path filterData = new Path(args[0]);// Filtered data with 4 columns
        Path lsOpath = new Path(args[1]);// LibSvm-style data in numeric form

        Configuration conf = new Configuration();

        Job lsDataConvertjob = Job.getInstance(conf, "LibSVMdata");

        lsDataConvertjob.setJarByClass(DupIPDriver.class);

        lsDataConvertjob.setMapperClass(LibSvmMapper.class);
        // Map-only job: no reducer is needed.
        lsDataConvertjob.setNumReduceTasks(0);

        lsDataConvertjob.setMapOutputKeyClass(Text.class);
        lsDataConvertjob.setMapOutputValueClass(NullWritable.class);

        lsDataConvertjob.setOutputKeyClass(Text.class);
        lsDataConvertjob.setOutputValueClass(NullWritable.class);

        // This cache file is hard coded. It is the output of the word count program
        // shown in the MapReduce i) section below.
        lsDataConvertjob.addCacheFile(new URI("<ServerName&Path>/DWordCount/part-r-00000"));

        FileInputFormat.setInputPaths(lsDataConvertjob, filterData + "/part-*");
        FileOutputFormat.setOutputPath(lsDataConvertjob, lsOpath);

        return lsDataConvertjob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:


package com.mr.dupip.replaceval;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LibSvmMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // word -> count, loaded from the word count output in the distributed cache.
    public Map<String, String> count = new HashMap<String, String>();

    // word -> unique numeric id assigned while reading the cache file.
    public Map<String, String> uniqueId = new HashMap<String, String>();

    protected void setup(Context context) throws IOException {

        URI[] uris = context.getCacheFiles();

        for (URI uri : uris) {
            System.out.println("~~~Cache File ::::" + uri.toString());
        }

        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path cacheFile = new Path(uris[0]);
        BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));

        String setupData = null;
        int uniqueIdValue = 1;

        while ((setupData = bf.readLine()) != null) {
            // Word count output is tab separated: word<TAB>count
            String[] words = setupData.split("\t");
            count.put(words[0], words[1]);
            uniqueId.put(words[0], "" + uniqueIdValue);
            uniqueIdValue++;
        }
        bf.close();

        // Write the word~uniqueId mapping so later jobs can convert the numeric data back to text.
        FSDataOutputStream out = fs.create(new Path("<ServerName&Path>/UniqueValData"));
        for (Entry<String, String> entry : uniqueId.entrySet()) {
            String sentence = entry.getKey() + "~" + entry.getValue();
            out.writeBytes(sentence);
            out.writeBytes("\n");
        }
        out.close();
    }

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String s = value.toString();
        String words[] = s.split(" ");

        // Emit each field as uniqueId:count, which gives a LibSVM-style numeric row.
        StringBuilder opRow = new StringBuilder();
        for (int i = 0; i < words.length; i++) {
            opRow.append(uniqueId.get(words[i])).append(":").append(count.get(words[i])).append(" ");
        }

        context.write(new Text(opRow.toString().trim()), NullWritable.get());
    }
}



MapReduce i) WordCount Program





Input Data Example:
100.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.213
. . .

MapReduce code for the WordCount Example:



package com.mr.dupip.replaceval;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {
        Path inpath = new Path(args[0]);// Raw syslog data
        Path wcOpath = new Path(args[1]);// List of word counts

        Configuration conf = new Configuration();
        Job wordCountjob = Job.getInstance(conf, "WordCount");
        wordCountjob.setJarByClass(DupIPDriver.class);
        wordCountjob.setMapperClass(WordMap.class);
        wordCountjob.setReducerClass(WordReducer.class);
        wordCountjob.setNumReduceTasks(1);

        wordCountjob.setOutputKeyClass(Text.class);
        wordCountjob.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(wordCountjob, inpath);
        FileOutputFormat.setOutputPath(wordCountjob, wcOpath);
        return wordCountjob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
package com.mr.dupip.replaceval;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        // Emit each non-empty word with a count of 1.
        for (String word : s.split(" ")) {
            if (word.length() > 0) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
}

Reducer Code:
package com.mr.dupip.replaceval;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts for each word.
        int count = 0;
        for (IntWritable a : values) {
            count += a.get();
        }
        context.write(key, new IntWritable(count));
    }
}
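
For reference, the reducer output above is written by the default TextOutputFormat as tab-separated word/count pairs. This part-r-00000 file is the one the LibSvmMapper job in the MapReduce ii) section loads from the distributed cache and splits on "\t". A rough illustration of the format for the sample data (the counts here are illustrative only):

India	2
USA	2
150.100.0.20	2
. . .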

Saturday, May 9, 2015

Apache Storm - Killing the Topology

Apache Storm
Killing the Storm topology

Once we submit a Storm topology to the cluster, we can kill it, if required, with the command below.
>storm kill {TopologyName}
Eg:

1.>storm kill SampleTopology
2.>storm kill StormTopology

The above topology names are a single word. If we click on one of them in the Storm UI page, it takes us to that topology's page.
If a topology name contains multiple words separated by spaces, the above command will not work.
1.>storm kill Sample Topology
(This topology cannot be killed that way; the command throws a runtime exception: "Exception in thread "main" NotAliveException(msg:SampleStorm is not alive)")
So we need to write a separate Java program to kill it.

Java program for killing a Storm topology whose name contains spaces.
Program:

package com.storm.killing_topology;

import java.util.*;

import org.apache.thrift7.TException;

import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus.Client;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class KillingTopology {

    public static void main(String[] args) throws NotAliveException, TException {
        Map conf = Utils.readStormConfig();
        Client client = NimbusClient.getConfiguredClient(conf).getClient();
        KillOptions killOpts = new KillOptions();
        // killOpts.set_wait_secs(waitSeconds); // time to wait before killing
        client.killTopologyWithOpts("Topology Name", killOpts); // provide the topology name
    }
}


Here, in place of "Topology Name" in quotes, we can provide a topology name that contains spaces.

Then execute the above program using the command below.

>storm jar SampleStorm.jar com.storm.killing_topology.KillingTopology
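
As a small variant (not from the original post), the topology name could instead be passed on the command line, so the class does not have to be edited and rebuilt for every topology. A minimal sketch under that assumption (the class name KillTopologyByArg is hypothetical); all arguments after the class name are joined back together so a name with spaces can be supplied:

package com.storm.killing_topology;

import java.util.Map;

import org.apache.thrift7.TException;

import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus.Client;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class KillTopologyByArg {

    public static void main(String[] args) throws NotAliveException, TException {
        // Rebuild the topology name from all command-line arguments,
        // so "Sample Topology" arrives as a single name containing a space.
        StringBuilder name = new StringBuilder();
        for (int i = 0; i < args.length; i++) {
            if (i > 0) {
                name.append(" ");
            }
            name.append(args[i]);
        }

        Map conf = Utils.readStormConfig();
        Client client = NimbusClient.getConfiguredClient(conf).getClient();
        KillOptions killOpts = new KillOptions();
        client.killTopologyWithOpts(name.toString(), killOpts);
    }
}

It could then be run in the same way as above, for example:
>storm jar SampleStorm.jar com.storm.killing_topology.KillTopologyByArg Sample Topology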



Apache Storm - Sample Storm Topology Program

Storm Topology Submission

The following is a sample Apache Storm program that submits a topology to the cluster.
It contains three parts:
1. A sample spout program
2. A sample bolt program
3. A sample topology submitter program

Now let us have a look at the sample spout program.
1.SampleStormSpout.java
package com.sample.storm_example;

import java.util.*;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SampleStormSpout extends BaseRichSpout {

    private static final long serialVersionUID = 2L;
    private SpoutOutputCollector spoutOutputCollector;

    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    static {
        map.put(0, "Kuamr");
        map.put(1, "Jinto");
        map.put(2, "Bharath");
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // Emit one of the three names at random.
        final Random rand = new Random();
        int randNum = rand.nextInt(3);
        spoutOutputCollector.emit(new Values(map.get(randNum)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("site"));
    }
}




2.SampleStormBolt.java

package com.sample.storm_example;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class SampleStormBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 2L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        String test = input.getStringByField("site");
        System.out.println("Name of the input site ID is   " + test);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}



3.SampleTopology.java
package com.sample.storm_example;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class SampleTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("LearningStormSpout", new SampleStormSpout(), 2);
        // Connect the bolt to the spout with a shuffle grouping so it receives the emitted tuples.
        builder.setBolt("LearningStormBolt", new SampleStormBolt(), 4).shuffleGrouping("LearningStormSpout");
        Config config = new Config();
        config.setDebug(true);
        StormSubmitter.submitTopology("Storm_Learning_Topology", config, builder.createTopology());

        /* If we want to run this program on a local cluster, use the code below instead of the StormSubmitter class above.
        Code Snippet:
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalClusterTopology", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
        */
    }
}



Note:
The above project has to be created as a Maven project.
pom.xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
      <exclusion>
        <artifactId>log4j-over-slf4j</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94.5</version>
    <exclusions>
      <exclusion>
        <artifactId>zookeeper</artifactId>
        <groupId>org.apache.zookeeper</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
  </dependency>
</dependencies>


Execution Procedure:
1. Run a Maven install on the project.
2. Get the generated jar.
3. Place the jar on the machine where the storm executable is available.
4. Use the command below to run the storm jar.
>storm jar SampleStorm-0.0.1-SNAPSHOT.jar com.sample.storm_example.SampleTopology
(The jar will be submitted to the cluster; check it in the Storm UI.)