Showing posts with label Hadoop-Development. Show all posts

Thursday, May 26, 2016

Working with Sqoop2 import command.



Note:
Configure PostgreSQL as the Sqoop2 metastore, then follow the steps below.
For configuration details, please see the previous post "PostgreSQL configuration for Sqoop2".

These steps were tested on MapR cluster 5.0 with CentOS 6.6.

 STEPS:
Step 1:
Check the available links and connectors.
sqoop:000> show link
+----+------+--------------+----------------+---------+
| Id | Name | Connector Id | Connector Name | Enabled |
+----+------+--------------+----------------+---------+
+----+------+--------------+----------------+---------+

sqoop:000> show connector
+----+------------------------+------------------+------------------------------------------------------+----------------------+
| Id | Name                   | Version          | Class                                                | Supported Directions |
+----+------------------------+------------------+------------------------------------------------------+----------------------+
| 1  | kite-connector         | 1.99.6-mapr-1507 | org.apache.sqoop.connector.kite.KiteConnector        | FROM/TO              |
| 2  | kafka-connector        | 1.99.6-mapr-1507 | org.apache.sqoop.connector.kafka.KafkaConnector      | TO                   |
| 3  | hdfs-connector         | 1.99.6-mapr-1507 | org.apache.sqoop.connector.hdfs.HdfsConnector        | FROM/TO              |
| 4  | generic-jdbc-connector | 1.99.6-mapr-1507 | org.apache.sqoop.connector.jdbc.GenericJdbcConnector | FROM/TO              |
+----+------------------------+------------------+------------------------------------------------------+----------------------+


Step 2:
Create a link for the RDBMS (the database we want to import data from).
NOTE: Provide the connector Id of generic-jdbc-connector for the -c argument. In this example the Id is 4.

sqoop:000> create link -c 4
Creating link for connector with id 4
Please fill following values to create new link object
Name: <mysql>
Link configuration
JDBC Driver Class: com.mysql.jdbc.Driver
JDBC Connection String: jdbc:mysql://<DB HostName>/<Database>
Username: <sqoop>
Password: <*****>
JDBC Connection Properties:<Optional>
There are currently 0 values in the map:
entry#
New link was successfully created with validation status OK and persistent id 2

sqoop:000> show link
+----+-------+--------------+------------------------+---------+
| Id | Name | Connector Id | Connector Name | Enabled |
+----+-------+--------------+------------------------+---------+
| 2 | mysql | 4 | generic-jdbc-connector | true |
+----+-------+--------------+------------------------+---------+


Step 3:
Create a link for the import location, i.e. the MFS location.

sqoop:000> create link -c 3
Creating link for connector with id 3
Please fill following values to create new link object
Name: maprfs
Link configuration
HDFS URI:maprfs://<CLDB HostName>:7222
Hadoop conf directory: /opt/mapr/hadoop/hadoop-0.20.2/conf
New link was successfully created with validation status OK and persistent id 4

sqoop:000> show link
+----+--------+--------------+------------------------+---------+
| Id | Name | Connector Id | Connector Name | Enabled |
+----+--------+--------------+------------------------+---------+
| 2 | mysql | 4 | generic-jdbc-connector | true |
| 4 | maprfs | 3 | hdfs-connector | true |
+----+--------+--------------+------------------------+---------+


Step 4:
Create a Job

sqoop:000> create job --from 2 --to 4
Creating job for links with from id 2 and to id 4
Please fill following values to create new job object
Name: testjob
From database configuration
Schema name: mysql
Table name: <TableName>
Table SQL statement:<Optional>
Table column names:<Optional>
Partition column name: <Provide a ColumnNamefor Partitioning>
Null value allowed for the partition column: true
Boundary query:<Optional>
Incremental read
Check column:<Optional>
Last value:<Optional>
To HDFS configuration
Override null value:<Optional>
Null value:<Optional>
Output format:
0 : TEXT_FILE
1 : SEQUENCE_FILE
Choose: 0
Compression format:
0 : NONE
1 : DEFAULT
2 : DEFLATE
3 : GZIP
4 : BZIP2
5 : LZO
6 : LZ4
7 : SNAPPY
8 : CUSTOM
Choose: 0
Custom compression format:
Output directory: </MFS LOCATION NAME>
Append mode:<Optional>
Throttling resources
Extractors:<Optional>
Loaders:<Optional>
New job was successfully created with validation status OK and persistent id 12

sqoop:000> show job

sqoop:000> start job -j <Job Id>
Ex: start job -j 12
Submission details
Job ID: 12
Server URL:
Created by: mapr
Creation date:
Lastly updated by: mapr
External ID: job_<ID>
http://<Host>:8088/proxy/application_1461206632562_0005/
Source Connector schema: Schema{TABLE SCHEMA WILL BE DISPLAYED HERE}

sqoop:000> status job -j <JobID>

PostgreSQL configuration for Sqoop2




Please follow the steps below to configure PostgreSQL for Sqoop2.
(Sqoop2 will store its metastore in PostgreSQL.)
These steps are specific to a MapR cluster environment.

Required Steps:
Step 1:
Install PostgreSQL using the command below.
$ yum install postgresql-server

Step 2:
Initialize the PostgreSQL database using the command below.
$ service postgresql initdb

Step 3: Change the parameter in the file specified below.
$ vim /var/lib/pgsql/data/postgresql.conf
listen_addresses = '10.10.71.19'
# Note: use the IP address of the host where PostgreSQL is installed.

Step 4:
Add the parameters below to the file specified.
$ vim /var/lib/pgsql/data/pg_hba.conf

# "local" is for Unix domain socket connections only
#local all all ident
local all all trust
# IPv4 local connections:
#host all all 127.0.0.1/32 trust
host all all 10.10.72.78/32 trust

# IPv6 local connections:
host all all ::1/128 ident

Step 5:
Comment out the existing repository parameters and add the new parameter values below in the specified file.
$ vi /opt/mapr/sqoop/sqoop-2.0.0/server/conf/sqoop.properties

org.apache.sqoop.repository.jdbc.handler=org.apache.sqoop.repository.postgresql.PostgresqlRepositoryHandler
org.apache.sqoop.repository.jdbc.transaction.isolation=READ_COMMITTED
org.apache.sqoop.repository.jdbc.maximum.connections=10
org.apache.sqoop.repository.jdbc.url=jdbc:postgresql://10.10.72.110:5432/sqoop
org.apache.sqoop.repository.jdbc.driver=org.postgresql.Driver
org.apache.sqoop.repository.jdbc.user=sqoop
org.apache.sqoop.repository.jdbc.password=sqoop
#org.apache.sqoop.repository.jdbc.properties.property=value


Step 6:
Download the PostgreSQL JDBC driver jar and place it in the location below.
/opt/mapr/sqoop/sqoop-2.0.0/lib

Step 7: Execute the command below.
$ chkconfig postgresql on

Step 8:
Open the PostgreSQL shell (psql) using the command below.
$ psql -U postgres

Step 9:
Create the sqoop role and the sqoop database using the commands below (run inside psql).

CREATE ROLE sqoop LOGIN ENCRYPTED PASSWORD 'sqoop'
NOSUPERUSER INHERIT CREATEDB NOCREATEROLE;

CREATE DATABASE "sqoop" WITH OWNER = sqoop TABLESPACE = pg_default;

Step 10:
Start the PostgreSQL server using the command below.
$ /usr/bin/pg_ctl -D /var/lib/pgsql/data -l logfile start



Oozie installation in MapR platform for unsecured cluster



Steps:
Log in as the root user and follow the steps below.
Step 1:
             $ cd /opt/mapr
             $ yum install mapr-oozie

Step 2: Add the properties below to the core-site.xml file.
           
           $vi /opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/core-site.xml
 <property>
  <name>hadoop.proxyuser.mapr.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.mapr.groups</name>
  <value>*</value>
</property>

Step 3: Reconfigure the cluster using the command below.
$/opt/mapr/server/configure.sh   -R

Step 4: Export OOZIE_URL in the CLI.

$export OOZIE_URL=http://10.10.80.242:11000/oozie

Step 5:
Restart the Oozie service from the CLI using the command below.
$maprcli node services -name oozie -action restart -nodes `hostname`

Step 6: Check the list of running services using the command below.
$maprcli node list -columns svc

Note:
These steps are for non-secure cluster only.


HP Vertica Cluster to HDFS platform Using SQOOP




Follow the steps below to import data from an HP Vertica cluster into HDFS.

Step 1:
Please download recent versions of the jars below and add them to the Sqoop library.
vertica-jdbc-7.1.2-0.jar
vertica-jdk5-6.1.3-0.jar
hadoop-vertica.jar

Step 2:
Run the import using a sqoop command like the one below.

> sqoop import \
    --driver com.vertica.jdbc.Driver \
    --connect jdbc:vertica://<HOSTNAME>:5433/<DATABASE-NAME> \
    --username <UNAME> \
    -P \
    --table <TABLE-NAME> \
    --target-dir <TARGET-DIRECTORY-NAME> \
    --as-textfile \
    -m <No-Mappers>



Saturday, August 1, 2015

Maven Project build for Storm-Kafka programs


/*Note:
The following is the pom.xml file for building Storm and Kafka projects with Maven
*/

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.comcom.nato.rt</groupId>
    <artifactId>storm-rt-usecases</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>storm-rt-usecases</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.1-incubating</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>0.9.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>${storm.topology}</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <id>create-my-bundle</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>

MapReduce iv) MapReduce Job for replacing the numeric values with the actual values for duplicate IP records

/*
Note:
This MR job converts the numeric input data back into the text data that was encoded by LibSvmMapper.
The input data:
121  0  155  175
122  1  157  176
123  0  155  175
128  1  157  176

output is:
100.20.20.0     India        150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.213

*/

package com.mr.dupip.replaceval;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {

        Path dupIPoutput = new Path(args[0]); // List of duplicate-value records
        Path dupIPs = new Path(args[1]); // Final output with duplicate IPs

        Configuration conf = getConf();

        Job replDupIpValJob = Job.getInstance(conf, "ReplaceDupIPValues");
        replDupIpValJob.setJarByClass(DupIPDriver.class);

        replDupIpValJob.setMapperClass(ReplValMapper.class);
        replDupIpValJob.setNumReduceTasks(0); // map-only job

        replDupIpValJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));

        replDupIpValJob.setMapOutputKeyClass(Text.class);
        replDupIpValJob.setMapOutputValueClass(Text.class);

        replDupIpValJob.setOutputKeyClass(Text.class);
        replDupIpValJob.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(replDupIpValJob, new Path(dupIPoutput + "/part-r-00000"));
        FileOutputFormat.setOutputPath(replDupIpValJob, dupIPs);

        return replDupIpValJob.waitForCompletion(true) ? 0 : 1;
    }
}


Mapper Code:
package com.mr.dupip.replaceval;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReplValMapper extends Mapper<LongWritable,Text, Text,NullWritable>{

public Map<String, String> uniqueKeyValMap = new HashMap<String, String>();
protected void setup(Context context) throws IOException {
URI[] uris = context.getCacheFiles();

for (URI uri : uris) {
System.out.println("~~~Cache File ::::" + uri.toString());
}
FileSystem fs = FileSystem.get(context.getConfiguration());
Path cacheFile = new Path(uris[0]);
BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));
String setupData = null;
while ((setupData = bf.readLine()) != null) {
//Eg:- c0:ea:e4:84:dd:8b~98
String[] words = setupData.split("~");
for (int i = 0; i < words.length; i++) {
uniqueKeyValMap.put(words[1], words[0]);
}
}
}
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Eg:- 55:576 96:46 34 18:576 44:576
String line=value.toString();
String[] words=line.split(" ");
String[] source_ip=words[0].split(":");//55:576
//source_ipMap.put(source_ip[1], source_ip[0]);//576,55
String[] pattern=words[1].split(":");//96:46
//patternMap.put(pattern[1], pattern[0]);//46,96
String[] dest_ip=words[2].split(":");//18:576
//dest_ipMap.put(dest_ip[1], dest_ip[0]);//576,18
String[] originator=words[3].split(":");//44:576
//orignatorMap.put(originator[1],originator[0]);//576,44
String sentence="";
sentence=uniqueKeyValMap.get(source_ip[0])+" "+uniqueKeyValMap.get(pattern[0])+" "+uniqueKeyValMap.get(dest_ip[0])+" "+uniqueKeyValMap.get(originator[0]);
context.write(new Text(sentence), NullWritable.get());
}

}
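Stripped of the Hadoop plumbing, the setup/map pair above is a dictionary lookup: build an id-to-value map from the cache file, then rewrite each record token by token. Here is a minimal plain-Java sketch of that logic (the class and method names are illustrative, not part of the original job):

```java
import java.util.HashMap;
import java.util.Map;

public class ReplaceSketch {

    // Build the id -> original-value map from cache-file lines like "c0:ea:e4:84:dd:8b~98".
    public static Map<String, String> buildMap(String[] cacheLines) {
        Map<String, String> map = new HashMap<String, String>();
        for (String line : cacheLines) {
            String[] parts = line.split("~");
            map.put(parts[1], parts[0]); // key = numeric id, value = original text
        }
        return map;
    }

    // Replace each "id:count" token of a record with the original value for that id.
    public static String replace(String record, Map<String, String> map) {
        StringBuilder sb = new StringBuilder();
        for (String token : record.split(" ")) {
            String id = token.split(":")[0];
            sb.append(map.get(id)).append(" ");
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        Map<String, String> map = buildMap(new String[] { "10.0.0.1~55", "India~96" });
        System.out.println(replace("55:576 96:46", map)); // prints "10.0.0.1 India"
    }
}
```

The mapper does exactly this per input line, with the map populated once in setup() from the distributed cache file.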

MapReduce iii) MapReduce Job for finding the duplicate IP fields



/*Note:
This MapReduce program reads the output of the previous LibSvmMapper MR program and writes only unique records, avoiding duplicates in the output.
Input data:
121  0  155  175
122  1  157  176
121  0  155  175
123  0  155  175
122  1  157  176
128  1  157  176

Output Data:
121  0  155  175
122  1  157  176
123  0  155  175
128  1  157  176
*/
package com.mr.dupip.replaceval;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {

        Path lsOpath = new Path(args[0]); // LibSvm data format in numeric form
        Path dupIPoutput = new Path(args[1]); // List of duplicate-value records

        Configuration conf = getConf();

        Job dupipJob = Job.getInstance(conf, "FindDupIpJob");
        dupipJob.setJarByClass(DupIPDriver.class);

        dupipJob.setMapperClass(DupIPMapper.class);
        dupipJob.setReducerClass(DupIpReducer.class);

        FileInputFormat.setInputPaths(dupipJob, lsOpath + "/part-m-*");
        FileOutputFormat.setOutputPath(dupipJob, dupIPoutput);

        // The UniqueValData file is produced by the LibSvmMapper class in the previous MapReduce code.
        dupipJob.addCacheFile(new URI("<ServerName&Path>/UniqueValData"));
        dupipJob.setMapOutputKeyClass(Text.class);
        dupipJob.setMapOutputValueClass(Text.class);
        dupipJob.setOutputKeyClass(Text.class);
        dupipJob.setOutputValueClass(NullWritable.class);
        return dupipJob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
package com.mr.dupip.replaceval;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DupIPMapper extends Mapper<LongWritable,Text, Text,Text>{

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line=value.toString();
String[] words=line.split(" ");
context.write(new Text(words[2].trim()),new Text(line.trim()));
}
}
Reducer Code:
package com.mr.dupip.replaceval;

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DupIpReducer extends Reducer<Text, Text, Text, NullWritable>{

HashSet<String> hs;
protected void setup(Context context) throws IOException {
hs=new HashSet<String>();
}
public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
for(Text line:values){
String line1=line.toString();
hs.add(line1);
}
for(String eachLine:hs){
if(hs.size()>1){
context.write(new Text(eachLine),NullWritable.get());
}
}
hs.clear();
}
}
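The core of the reducer above is set-based deduplication: exact duplicate lines collapse into a single entry of a HashSet. The same idea in plain Java, without the Hadoop plumbing (illustrative class name; a LinkedHashSet is used here so first-occurrence order is preserved, which the unordered HashSet in the reducer does not guarantee):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {

    // Drop exact duplicate records, keeping the first occurrence of each,
    // mirroring what the HashSet in DupIpReducer does within a key group.
    public static List<String> dedupe(List<String> records) {
        Set<String> seen = new LinkedHashSet<String>(records);
        return new ArrayList<String>(seen);
    }

    public static void main(String[] args) {
        List<String> out = dedupe(Arrays.asList(
                "121 0 155 175", "122 1 157 176", "121 0 155 175"));
        System.out.println(out); // prints [121 0 155 175, 122 1 157 176]
    }
}
```

In the real job the records are first grouped by the third field (the key emitted by DupIPMapper), so deduplication happens per IP value rather than globally.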

MapReduce ii) MapReduce Job for converting all text data into numeric data by assigning a unique code to each field

/*
This is a map-side join. The mapper converts every text field into a numeric code, which makes the data easier to process. After processing is done, the numeric values can be converted back into the original text data; that conversion code is provided in the next post.

Input data sample for converting text data into numeric data:
100.20.20.0    India    150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA      151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India    150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA      151.100.0.21    11.101.axc.asdfs.213
. . .
*/
package com.mr.dupip.replaceval;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {

        Path filterData = new Path(args[0]); // Filtered data with 4 columns
        Path lsOpath = new Path(args[1]); // LibSvm data format in numeric form

        Configuration conf = getConf();

        Job lsDataConvertjob = Job.getInstance(conf, "LibSVMdata");
        lsDataConvertjob.setJarByClass(DupIPDriver.class);

        lsDataConvertjob.setMapperClass(LibSvmMapper.class);
        lsDataConvertjob.setNumReduceTasks(0); // map-only job

        lsDataConvertjob.setMapOutputKeyClass(Text.class);
        lsDataConvertjob.setMapOutputValueClass(NullWritable.class);

        lsDataConvertjob.setOutputKeyClass(Text.class);
        lsDataConvertjob.setOutputValueClass(NullWritable.class);

        // This cache file is hard coded. It is the output file of the word-count
        // program written in the last post.
        lsDataConvertjob.addCacheFile(new URI("<ServerName&Path>/DWordCount/part-r-00000"));

        FileInputFormat.setInputPaths(lsDataConvertjob, filterData + "/part-*");
        FileOutputFormat.setOutputPath(lsDataConvertjob, lsOpath);

        return lsDataConvertjob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:

package com.mr.dupip.replaceval;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LibSvmMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    public Map<String, String> count = new HashMap<String, String>();
    public Map<String, String> uniqueId = new HashMap<String, String>();

    protected void setup(Context context) throws IOException {
        URI[] uris = context.getCacheFiles();
        for (URI uri : uris) {
            System.out.println("~~~Cache File ::::" + uri.toString());
        }
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path cacheFile = new Path(uris[0]);
        BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(cacheFile)));

        String setupData = null;
        int uniqueIdValue = 1;
        while ((setupData = bf.readLine()) != null) {
            // Each cache-file line is "<word>\t<count>" from the word-count job output.
            String[] words = setupData.split("\t");
            count.put(words[0], words[1]);
            uniqueId.put(words[0], "" + uniqueIdValue);
            uniqueIdValue++;
        }
        bf.close();

        // Persist the word -> unique-id mapping so later jobs can decode the data.
        FSDataOutputStream out = fs.create(new Path("<ServerName&Path>/UniqueValData"));
        for (Entry<String, String> entry : uniqueId.entrySet()) {
            String sentence = entry.getKey() + "~" + entry.getValue();
            out.writeBytes(sentence);
            out.writeBytes("\n");
        }
        out.close();
    }

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String s = value.toString();
        String[] words = s.split(" ");
        StringBuilder opRow = new StringBuilder();
        for (int i = 0; i < words.length; i++) {
            opRow.append(uniqueId.get(words[i])).append(":").append(count.get(words[i])).append(" ");
        }
        context.write(new Text(opRow.toString().trim()), NullWritable.get());
    }
}
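The encoding performed by LibSvmMapper can be sketched without Hadoop: assign each distinct word a sequential id and rewrite every record as "id:count" tokens. A minimal plain-Java sketch of that idea (the class and method names are illustrative; it assumes the same tab-separated word-count lines as input):

```java
import java.util.HashMap;
import java.util.Map;

public class EncodeSketch {

    public final Map<String, String> count = new HashMap<String, String>();
    public final Map<String, Integer> uniqueId = new HashMap<String, Integer>();
    private int nextId = 1;

    // One call per "<word>\t<count>" line of the word-count output.
    public void addCacheLine(String line) {
        String[] parts = line.split("\t");
        count.put(parts[0], parts[1]);
        uniqueId.put(parts[0], nextId++);
    }

    // Encode one space-separated record as "id:count" tokens.
    public String encode(String record) {
        StringBuilder sb = new StringBuilder();
        for (String word : record.split(" ")) {
            sb.append(uniqueId.get(word)).append(":").append(count.get(word)).append(" ");
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        EncodeSketch enc = new EncodeSketch();
        enc.addCacheLine("India\t2");
        System.out.println(enc.encode("India")); // prints "1:2"
    }
}
```

The uniqueId map here plays the same role as the UniqueValData file the mapper writes to MFS: it is the key for decoding the numeric records later.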



MapReduce i)WordCount Program





Data set Example:


Input Data Example:
100.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
101.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.212
102.20.20.0    India        150.100.0.20    00.120.0.abc.txxyt.00
103.21.21.1    USA        151.100.0.21    11.101.axc.asdfs.213
. . .
MapReduce code for WordCount Example:



package com.mr.dupip.replaceval;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DupIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ec = ToolRunner.run(new DupIPDriver(), args);
        System.out.println(ec);
    }

    public int run(String[] args) throws Exception {
        Path inpath = new Path(args[0]); // Raw syslog data
        Path wcOpath = new Path(args[1]); // List of word counts

        Configuration conf = getConf();
        Job wordCountjob = Job.getInstance(conf, "WordCount");
        wordCountjob.setJarByClass(DupIPDriver.class);
        wordCountjob.setMapperClass(WordMap.class);
        wordCountjob.setReducerClass(WordReducer.class);
        wordCountjob.setNumReduceTasks(1);

        wordCountjob.setOutputKeyClass(Text.class);
        wordCountjob.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(wordCountjob, inpath);
        FileOutputFormat.setOutputPath(wordCountjob, wcOpath);
        return wordCountjob.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
package com.mr.dupip.replaceval;


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMap extends Mapper<LongWritable,Text, Text, IntWritable>{

public void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
String s=value.toString();
for (String word : s.split(" ")) {
if (word.length()>0) {
context.write(new Text(word),new IntWritable(1));
}
}
}

}

Reducer Code:
package com.mr.dupip.replaceval;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int count=0;
for(IntWritable a : values){
int i=a.get();
count+=i;
}
context.write(key, new IntWritable(count));
}

}
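The mapper/reducer pair above implements the classic word count. For reference, here is the same counting logic in plain Java, without the Hadoop plumbing (the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {

    // Count space-separated words across the input lines, skipping empty tokens,
    // mirroring what WordMap (emit word,1) and WordReducer (sum per word) do together.
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (word.length() > 0) {
                    Integer c = counts.get(word);
                    counts.put(word, c == null ? 1 : c + 1);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(new String[] { "India USA", "India" });
        System.out.println(counts.get("India")); // prints 2
    }
}
```

In the MapReduce version, the shuffle phase does the grouping that the HashMap does here, which is what lets the job scale beyond a single machine's memory.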

Saturday, May 9, 2015

Apache Storm-Killing the Topology

Apache Storm
Killing the Storm topology

Once we have submitted a Storm topology to the cluster, we can kill it when required using the command below.
>storm kill {TopologyName}
Eg:

1.>storm kill SampleTopology
2.>storm kill StormTopology

The topology names above are single words. Clicking such a name on the UI page takes you to its navigation page.
If a topology name contains multiple words separated by spaces, the command above will not work:
1.>storm kill Sample Topology
(This topology cannot be killed this way; the command throws a RuntimeException: "Exception in thread "main" NotAliveException(msg:SampleStorm is not alive)".)
For such names we need to write a separate Java program to kill the topology.

Java program for killing Storm topologies whose names contain spaces.
Program:

package com.storm.killing_topology;

import java.util.*;

import org.apache.thrift7.TException;

import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus.Client;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class KillingTopology {

    public static void main(String[] args) throws NotAliveException, TException {
        Map conf = Utils.readStormConfig();
        Client client = NimbusClient.getConfiguredClient(conf).getClient();
        KillOptions killOpts = new KillOptions();
        // killOpts.set_wait_secs(waitSeconds); // time to wait before killing
        client.killTopologyWithOpts("Topology Name", killOpts); // provide the topology name
    }
}


Here, in place of "Topology Name" in quotes, we can provide topology names that contain spaces.

Then execute the above program using the command below.

>storm jar SampleStorm.jar com.storm.killing_topology.KillingTopology



Apache Storm -Sample Storm Topology Program

Storm Topology submission

The following is a sample Apache Storm program that submits a topology to the cluster.
It contains three parts:
1. Sample spout program
2. Sample bolt program
3. Sample topology submitter program

First, let's look at the sample spout program.
1.SampleStormSpout.java
package com.sample.storm_example;

import java.util.*;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SampleStormSpout extends BaseRichSpout

{   
    private static final long serialVersionUID=2L;
    private SpoutOutputCollector spoutOutputCollector;
   
    private static final Map<Integer,String> map = new HashMap<Integer,String>();
    static
    {
        map.put(0,"Kuamr");
        map.put(1,"Jinto");
        map.put(2, "Bharath");       
    }
    public void open(Map conf, TopologyContext context,SpoutOutputCollector spoutOutputCollector) 

   {
        this.spoutOutputCollector=spoutOutputCollector;       
    }
    public void nextTuple() 

   {
        final Random rand=new Random();
        int randNum=rand.nextInt(3);
        spoutOutputCollector.emit(new Values(map.get(randNum)));       
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("site"));       
    }
}




 2.SampleStormBolt.java




package com.sample.storm_example;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class SampleStormBolt extends BaseBasicBolt

 {
    private static final long serialVersionUID = 2L;
    public void execute(Tuple input, BasicOutputCollector collector) {   
        String test=input.getStringByField("site");
        System.out.println("Name of the input site ID is   "+test);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {     
    }
}



3.SampleTopology.java
package com.sample.storm_example;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;


public class SampleTopology {
   
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {   
        TopologyBuilder builder=new TopologyBuilder();       
        builder.setSpout("SampleStormSpout", new SampleStormSpout(), 2);
        builder.setBolt("SampleStormBolt", new SampleStormBolt(), 4).shuffleGrouping("SampleStormSpout");
        Config config=new Config();
        config.setDebug(true);       
        StormSubmitter.submitTopology("Storm_Learning_Topology",config,builder.createTopology()); 

/*If we want to run this program in local cluster use below code instead of above StormSubmitter class
Code Snippet:
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalClusterTopology", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();


*/   
            }
}



Note:
The above project has to be created as a  Maven Project.
pom.xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
      <exclusion>
        <artifactId>log4j-over-slf4j</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94.5</version>
    <exclusions>
      <exclusion>
        <artifactId>zookeeper</artifactId>
        <groupId>org.apache.zookeeper</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
  </dependency>
</dependencies>


Execution Procedure:
1. Run a Maven install (mvn install).
2. Get the jar from the target directory.
3. Place the jar on a node where the storm executable is available.
4. Use the command below to run the storm jar.
>storm jar SampleStorm-0.0.1-SNAPSHOT.jar com.sample.storm_example.SampleTopology
(The jar will be submitted to the cluster; verify the topology in the Storm UI.)