Storm Topology submission
The following is a sample Apache Storm program that submits a topology to the cluster.
It consists of three parts:
1. A sample spout program
2. A sample bolt program
3. A sample topology submitter program
Let us first take a look at the sample spout program.
1. SampleStormSpout.java
package com.sample.storm_example;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SampleStormSpout extends BaseRichSpout {

    private static final long serialVersionUID = 2L;
    private SpoutOutputCollector spoutOutputCollector;
    private final Random rand = new Random();

    // Static lookup table of sample names that this spout emits.
    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    static {
        map.put(0, "Kumar");
        map.put(1, "Jinto");
        map.put(2, "Bharath");
    }

    // Called once when the spout task is initialized; keep a handle to the collector.
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    // Called repeatedly by Storm; emits one randomly chosen name per call.
    public void nextTuple() {
        int randNum = rand.nextInt(3);
        spoutOutputCollector.emit(new Values(map.get(randNum)));
    }

    // Declares the single output field ("site") consumed by the downstream bolt.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("site"));
    }
}
2. SampleStormBolt.java
package com.sample.storm_example;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class SampleStormBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 2L;

    // Called for every tuple received from the spout; prints the value of the "site" field.
    public void execute(Tuple input, BasicOutputCollector collector) {
        String test = input.getStringByField("site");
        System.out.println("Name received on the site field is " + test);
    }

    // This bolt does not emit anything downstream, so no output fields are declared.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
3. SampleTopology.java
package com.sample.storm_example;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class SampleTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();

        // Wire the spout and bolt defined above. The bolt subscribes to the spout's
        // stream with shuffle grouping, which spreads tuples evenly across bolt tasks.
        builder.setSpout("SampleStormSpout", new SampleStormSpout(), 2);
        builder.setBolt("SampleStormBolt", new SampleStormBolt(), 4)
               .shuffleGrouping("SampleStormSpout");

        Config config = new Config();
        config.setDebug(true);

        // Submit the topology to the running Storm cluster.
        StormSubmitter.submitTopology("Storm_Learning_Topology", config, builder.createTopology());

        // To run this program on a local, in-process cluster instead of submitting it
        // with StormSubmitter, use backtype.storm.LocalCluster -- see the sketch below.
    }
}
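For quick testing without a running cluster, the same topology can be run in Storm's local mode, as mentioned in the comment above. The following is a minimal sketch of that variant; the class name, topology name, and sleep duration are illustrative assumptions, not part of the original example.
SampleLocalTopology.java
package com.sample.storm_example;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

// Illustrative local-mode runner (assumed class name); runs the topology inside the current JVM.
public class SampleLocalTopology {

    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("SampleStormSpout", new SampleStormSpout(), 2);
        builder.setBolt("SampleStormBolt", new SampleStormBolt(), 4)
               .shuffleGrouping("SampleStormSpout");

        Config config = new Config();
        config.setDebug(true);

        // Start an in-process cluster, let the topology run for ten seconds, then shut down.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalClusterTopology", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.killTopology("LocalClusterTopology");
        cluster.shutdown();
    }
}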
Note:
The above project has to be created as a Maven project, with the following repository and dependency sections in its pom.xml.
pom.xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
      <exclusion>
        <artifactId>log4j-over-slf4j</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94.5</version>
    <exclusions>
      <exclusion>
        <artifactId>zookeeper</artifactId>
        <groupId>org.apache.zookeeper</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
  </dependency>
</dependencies>
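If the topology ever needs its third-party dependencies (for example the Hadoop or HBase clients above) bundled into the jar that is submitted, a fat jar can be built. The build section below is only a sketch using the maven-shade-plugin; it assumes the storm dependency itself is marked with <scope>provided</scope> so that Storm's own classes are not packaged into the jar. It is not part of the original pom.
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.3</version>
      <executions>
        <execution>
          <!-- Build the shaded (fat) jar during the package phase -->
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>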
Execution Procedure:
1. Build the project with a Maven install (mvn clean install).
2. Collect the generated jar from the project's target directory.
3. Copy the jar to the machine where the storm client executable is available.
4. Use the following command to submit the topology:
>storm jar SampleStorm-0.0.1-SNAPSHOT.jar com.sample.storm_example.SampleTopology
(The jar is submitted to the cluster; the running topology can then be checked in the Storm UI, or from the command line as shown below.)
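Assuming the storm client is on the PATH, the same client can also list the running topologies and stop this one when it is no longer needed:
>storm list
>storm kill Storm_Learning_Topology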