Saturday, May 9, 2015

Apache Storm - Killing the Topology


Once we submit a Storm topology to the cluster, we can kill it with the following command.
>storm kill {TopologyName}
E.g.:

1.>storm kill SampleTopology
2.>storm kill StormTopology

The above topology names are single words. If we click one on the UI page, it takes us to that topology's page.
If a topology name contains multiple words separated by spaces, the above command will not work.
1.>storm kill Sample Topology
(This topology cannot be killed this way; the shell passes only the first word as the name, so it throws a runtime exception: "Exception in thread "main" NotAliveException(msg:Sample is not alive)")
So for this we need to write a separate Java program to kill it.
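To see why the command fails, note how the shell tokenizes the command line before Storm ever sees it. This small self-contained sketch (the class name is made up for illustration; no Storm classes involved) mimics the CLI picking up only the first word after `kill` as the topology name:

```java
public class KillArgsDemo {
    public static void main(String[] args) {
        // the shell splits the command line on whitespace
        String commandLine = "storm kill Sample Topology";
        String[] tokens = commandLine.split(" ");
        // the kill command treats only the first argument after "kill" as the name
        String topologyName = tokens[2];
        System.out.println(topologyName); // prints "Sample", not "Sample Topology"
    }
}
```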

Java program for killing a Storm topology whose name contains spaces.
Program:

package com.storm.killing_topology;

import java.util.Map;

import org.apache.thrift7.TException;

import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus.Client;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class KillingTopology {

    public static void main(String[] args) throws NotAliveException, TException {
        // read the cluster configuration (storm.yaml) and connect to Nimbus
        Map conf = Utils.readStormConfig();
        Client client = NimbusClient.getConfiguredClient(conf).getClient();
        KillOptions killOpts = new KillOptions();
        // killOpts.set_wait_secs(waitSeconds); // optional: seconds to wait before killing
        client.killTopologyWithOpts("Topology Name", killOpts); // provide the topology name; spaces are allowed here
    }
}


Here, in place of "Topology Name" in quotes, we provide the topology name, which can contain spaces.

Then execute the above program using the command below.

>storm jar SampleStorm.jar com.storm.killing_topology.KillingTopology



Apache Storm - Sample Storm Topology Program

Storm Topology submission

The following is a sample Apache Storm program that submits a topology to the cluster.
It contains three parts:
1. Sample spout program
2. Sample bolt program
3. Sample topology submitter program

Now let's have a look at the sample spout program.
1.SampleStormSpout.java
package com.sample.storm_example;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SampleStormSpout extends BaseRichSpout {

    private static final long serialVersionUID = 2L;
    private SpoutOutputCollector spoutOutputCollector;
    private static final Random rand = new Random();

    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    static {
        map.put(0, "Kumar");
        map.put(1, "Jinto");
        map.put(2, "Bharath");
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // emit one of the three names at random
        int randNum = rand.nextInt(3);
        spoutOutputCollector.emit(new Values(map.get(randNum)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("site"));
    }
}
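The random-selection logic inside nextTuple() can be tried on its own. This standalone sketch (class name hypothetical, no Storm dependency, sample names illustrative) just exercises the map lookup the spout performs on each call:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class SpoutSelectionDemo {
    public static void main(String[] args) {
        Map<Integer, String> map = new HashMap<Integer, String>();
        map.put(0, "Kumar");
        map.put(1, "Jinto");
        map.put(2, "Bharath");
        Random rand = new Random();
        // nextInt(3) returns 0, 1, or 2, so the lookup never misses
        for (int i = 0; i < 5; i++) {
            String name = map.get(rand.nextInt(3));
            System.out.println(name);
        }
    }
}
```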




2.SampleStormBolt.java




package com.sample.storm_example;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class SampleStormBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 2L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        // read the "site" field emitted by the spout
        String test = input.getStringByField("site");
        System.out.println("Name of the input site is " + test);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt emits nothing downstream
    }
}
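Conceptually, getStringByField resolves a field name declared by the upstream spout to the value emitted in that position. This standalone sketch (a plain map as a stand-in for a Tuple; names hypothetical) mirrors that lookup without a cluster:

```java
import java.util.HashMap;
import java.util.Map;

public class BoltLookupDemo {
    public static void main(String[] args) {
        // a tuple pairs the fields declared by the spout with the values it emitted
        Map<String, Object> tuple = new HashMap<String, Object>();
        tuple.put("site", "Bharath");
        // analogous to input.getStringByField("site") in the bolt
        String test = (String) tuple.get("site");
        System.out.println("Name of the input site is " + test);
    }
}
```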



3.SampleTopology.java
package com.sample.storm_example;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class SampleTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("SampleStormSpout", new SampleStormSpout(), 2);
        // subscribe the bolt to the spout's stream with a shuffle grouping
        builder.setBolt("SampleStormBolt", new SampleStormBolt(), 4)
               .shuffleGrouping("SampleStormSpout");
        Config config = new Config();
        config.setDebug(true);
        StormSubmitter.submitTopology("Storm_Sample_Topology", config, builder.createTopology());

        /* To run this program on a local cluster, use the code below instead of the StormSubmitter call above
           (also import backtype.storm.LocalCluster and handle InterruptedException for the sleep).
        Code snippet:
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalClusterTopology", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
        */
    }
}



Note:
The above project has to be created as a Maven project.
pom.xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
      <exclusion>
        <artifactId>log4j-over-slf4j</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94.5</version>
    <exclusions>
      <exclusion>
        <artifactId>zookeeper</artifactId>
        <groupId>org.apache.zookeeper</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
  </dependency>
</dependencies>


Execution procedure:
1. Run it as a Maven install (mvn install).
2. Get the jar from the target directory.
3. Place the jar at the storm executable location.
4. Use the below command to run the storm jar.
>storm jar SampleStorm-0.0.1-SNAPSHOT.jar com.sample.storm_example.SampleTopology
(The jar will be submitted to the cluster; verify it in the Storm UI.)