Easy, Real-Time Big Data Analysis Using Storm
2012-12-31 20:54
Conceptually straightforward and easy to
work with, Storm makes handling big data analysis a breeze.
Today, companies regularly generate terabytes of data in their daily operations. The sources include everything from data captured from network
sensors, to the Web, social media, transactional business data, and data created in other business contexts. Given the volume of data being generated, real-time computation has become a major challenge faced by many organizations. A scalable real-time computation
system that we have used effectively is the open-source Storm tool, which was developed at Twitter and is sometimes referred to as "real-time Hadoop." However, Storm is far simpler to use
than Hadoop in that it does not require mastering an alternate universe of new technologies simply to handle big data jobs.
This article explains how to use Storm. The example project, called "Speeding Alert System," analyzes real-time data and, when the speed of a vehicle exceeds a predefined threshold, raises a trigger and writes the relevant data to a database.
Storm
Whereas Hadoop relies on batch processing, Storm is a real-time, distributed, fault-tolerant computation system. Like Hadoop, it can process huge amounts of data, but it does so in real time and with guaranteed reliability; that is, every message will be processed. Storm also offers fault tolerance and distributed computation, which make it suitable for processing huge amounts of data on different machines. It has these features as well:
It has simple scalability. To scale, you simply add machines and change the parallelism settings of the topology. Storm's use of Apache ZooKeeper for cluster coordination lets it scale to large cluster sizes.
It guarantees processing of every message.
Storm clusters are easy to manage.
Storm is fault tolerant: Once a topology is submitted, Storm runs the topology until it is
killed or the cluster is shut down. Also, if there are faults during execution, reassignment of tasks is handled by Storm.
Topologies in Storm can be defined in any language, although typically Java is used.
To follow the rest of the article, you first need to install and set up Storm. The steps are straightforward:
Download the Storm archive from the official Storm website.
Unpack the archive, add its bin/ directory to your PATH, and make sure the bin/storm script is executable.
Storm Components
A Storm cluster consists mainly of master and worker nodes, with coordination handled by ZooKeeper.
Master Node:
The master node runs a daemon, Nimbus, which is responsible for distributing code around the cluster, assigning tasks, and monitoring failures. It is similar to the JobTracker in Hadoop.
Worker Node:
The worker node runs a daemon, Supervisor, which listens for assigned work and runs worker processes as required. Each worker node executes a subset of a topology. Coordination between Nimbus and the supervisors is managed by a ZooKeeper system or cluster.
Zookeeper:
ZooKeeper is responsible for maintaining the coordination service between the supervisors and the master.
The logic for a real-time application is packaged into a Storm "topology." A topology consists of a graph of spouts (data sources) and bolts (data operations) that are connected with stream groupings (coordination). Let's look at these terms in greater depth.
Spout:
In simple terms, a spout reads data from a source for use in the topology. A spout can be either reliable or unreliable. A reliable spout resends a tuple (an ordered list of data items) if Storm fails to process it. An unreliable spout does not track a tuple once it has been emitted. The main method in a spout is nextTuple(), which either emits a new tuple to the topology or simply returns if there is nothing to emit.
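The bookkeeping behind a reliable spout can be sketched in plain Java, with no Storm dependency. The class below is a hypothetical model, not Storm's API: it remembers every emitted record under an ID until that ID is acknowledged, and puts the record back in the queue if processing fails. In Storm itself the corresponding callbacks on a spout are ack() and fail().

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, Storm-free model of a reliable spout's bookkeeping.
class ReliableSourceSketch {
    private final Queue<String> toEmit = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    // Make a record available for emission.
    void offer(String record) {
        toEmit.add(record);
    }

    // Like nextTuple(): emit one record if available, otherwise return null.
    Long emitNext() {
        String record = toEmit.poll();
        if (record == null) {
            return null;
        }
        long id = nextId++;
        pending.put(id, record); // remember it until it is acked or failed
        return id;
    }

    // The record was fully processed: forget it.
    void ack(long id) {
        pending.remove(id);
    }

    // Processing failed: queue the record for another attempt.
    void fail(long id) {
        String record = pending.remove(id);
        if (record != null) {
            toEmit.add(record);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

An unreliable spout would simply skip the pending map and never replay anything.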
Bolt:
A bolt is responsible for all the processing that happens in a topology. Bolts can do anything from filtering to joins, aggregations, talking to files or databases, and so on. Bolts receive data from a spout for processing and may in turn emit tuples to another bolt in the case of complex stream transformations. The main method in a bolt is execute(), which accepts a tuple as input.
In both the spout and the bolt, to emit tuples to more than one stream, the streams can be declared and specified in declareStream().
Stream Groupings:
A stream grouping defines how a stream should be partitioned among a bolt's tasks. Storm provides built-in stream groupings: shuffle grouping, fields grouping, all grouping, global grouping, none grouping, direct grouping, and local-or-shuffle grouping. A custom grouping can also be added by implementing the CustomStreamGrouping interface.
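To make the partitioning idea concrete, here is a minimal sketch in plain Java of how a fields grouping can route tuples: hash the grouping field and take the result modulo the number of bolt tasks, so that equal values always reach the same task. The class and method names are hypothetical, and the code illustrates the idea rather than Storm's internal implementation.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical illustration of fields grouping; not Storm's internal code.
class FieldsGroupingSketch {
    // Pick the task index for a tuple based on the value of one field.
    static int chooseTask(List<Object> tuple, int fieldIndex, int numTasks) {
        int hash = Objects.hashCode(tuple.get(fieldIndex));
        // floorMod keeps the result in [0, numTasks) even for negative hashes.
        return Math.floorMod(hash, numTasks);
    }
}
```

Grouping on the vehicle number, for instance, would send every reading for one vehicle to the same task, which matters when a bolt keeps per-vehicle counters. A shuffle grouping would instead spread tuples across tasks without regard to their contents.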
Implementation
For our use case, we designed a topology of one spout and two bolts that can process a huge amount of data (log files) and trigger an alarm when a specific value crosses a predefined threshold. Using a Storm topology, the log file is read line by line and the incoming data is monitored. In terms of Storm components, the spout reads the incoming data. It not only reads the data from existing files, but it also monitors for new files. As soon as a file is modified, the spout reads the new entry and, after converting it to tuples (a format that can be read by a bolt), emits the tuples to the bolt to perform threshold analysis, which finds any record that has exceeded the threshold.
The next section explains the use case in detail.
Threshold Analysis
In this article, we will be mainly concentrating on two types of threshold analysis: instant threshold and time series threshold.
Instant threshold checks if the value of a field has exceeded the threshold value at that instant and raises a trigger if the condition is satisfied. For example, raise a trigger if the speed of a vehicle exceeds 80 km/h.
Time series threshold checks if the value of a field has exceeded the threshold value within a given time window and raises a trigger if the condition is satisfied. For example, raise a trigger if the speed of a vehicle exceeds 80 km/h more than once in the last five minutes.
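The difference between the two checks can be sketched in a few lines of plain Java. This is an illustrative model with hypothetical names, not the bolt code from this article: the instant check looks at one reading, while the time series check keeps the timestamps of recent violations and triggers only when more than a given number fall inside the window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the two threshold styles; names are illustrative.
class ThresholdSketch {
    // Instant threshold: trigger as soon as a single reading exceeds the limit.
    static boolean instantExceeded(double value, double threshold) {
        return value > threshold;
    }

    // Time series threshold: trigger when the current reading exceeds the limit
    // and more than `frequency` violations lie within the last `windowMillis`.
    static class TimeSeriesThreshold {
        private final double threshold;
        private final int frequency;
        private final long windowMillis;
        private final Deque<Long> hits = new ArrayDeque<>();

        TimeSeriesThreshold(double threshold, int frequency, long windowMillis) {
            this.threshold = threshold;
            this.frequency = frequency;
            this.windowMillis = windowMillis;
        }

        boolean record(double value, long nowMillis) {
            // Drop violations that have fallen out of the window.
            while (!hits.isEmpty() && nowMillis - hits.peekFirst() > windowMillis) {
                hits.removeFirst();
            }
            if (value <= threshold) {
                return false;
            }
            hits.addLast(nowMillis);
            return hits.size() > frequency;
        }
    }
}
```

With a threshold of 80, a frequency of 1, and a five-minute window, the first reading above 80 km/h does not trigger, but a second one within five minutes does.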
Listing One shows a log file of the type we'll use, which contains vehicle data information such as vehicle number, speed at which the vehicle
is traveling, and location in which the information is captured.
Listing One: A log file with entries of vehicles passing through the checkpoint.
AB 123, 60, North city
BC 123, 70, South city
CD 234, 40, South city
DE 123, 40, East city
EF 123, 90, South city
GH 123, 50, West city
A corresponding XML file is created, which consists of the schema for the incoming data. It is used for parsing the log file. The schema XML
and its corresponding description are shown in the table.
The XML file and the log file are in a directory that is monitored by the spout constantly for real-time changes. The topology we use for this
example is shown in Figure 1.
Figure 1: Topology created in Storm to process real-time data.
As shown in Figure 1, the FileListenerSpout accepts the input log file, reads the data line by line, and emits the data to the ThresholdCalculatorBolt for further threshold processing. Once the processing is done, the contents of the line for which the threshold is calculated are emitted to the DBWriterBolt, where they are persisted in the database (or an alert is raised). The detailed implementation of this process is explained next.
Spout Implementation
The spout takes a log file and the XML descriptor file as input. The XML file contains the schema corresponding to the log file. Let us consider an example log file, which has vehicle data such as the vehicle number, the speed at which the vehicle is travelling, and the location in which the information is captured. (See Figure 2.)
Figure 2: Flow of data from log files to Spout.
Listing Two shows the specific XML file for a tuple, which specifies the fields and the delimiter separating the fields in a log file. Both the XML file and the data are kept in a directory
whose path is specified in the spout.
Listing Two: An XML file created for describing the log file.
    <TUPLEINFO>
      <FIELDLIST>
        <FIELD>
          <COLUMNNAME>vehicle_number</COLUMNNAME>
          <COLUMNTYPE>string</COLUMNTYPE>
        </FIELD>
        <FIELD>
          <COLUMNNAME>speed</COLUMNNAME>
          <COLUMNTYPE>int</COLUMNTYPE>
        </FIELD>
        <FIELD>
          <COLUMNNAME>location</COLUMNNAME>
          <COLUMNTYPE>string</COLUMNTYPE>
        </FIELD>
      </FIELDLIST>
      <DELIMITER>,</DELIMITER>
    </TUPLEINFO>
An instance of Spout is initialized with constructor parameters of Directory, Path, and a TupleInfo object. The TupleInfo object stores the necessary information about the log file, such as the fields, the delimiter, and the type of each field. This object is created by deserializing the XML file using XStream.
Spout implementation steps are:
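The delimiter and field list drive how each log line is split. As a rough sketch in plain Java (the helper name and signature are hypothetical, and the schema is passed in as simple arguments rather than as a TupleInfo object), a line can be split on the literal delimiter, checked against the expected number of fields, and trimmed:

```java
import java.util.regex.Pattern;

// Hypothetical helper; mirrors the delimiter handling described for the spout.
class LineParserSketch {
    // Split one log line into trimmed fields; return null if the count mismatches.
    static String[] parse(String line, String delimiter, int expectedFields) {
        // Pattern.quote treats the delimiter literally, so "|" needs no manual escaping.
        String[] fields = line.split(Pattern.quote(delimiter), -1);
        if (fields.length != expectedFields) {
            return null;
        }
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fields[i].trim();
        }
        return fields;
    }
}
```

Trimming matters for a file like Listing One, where each comma is followed by a space: without it, the speed column would arrive as " 60" and fail numeric parsing later.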
Listen to changes on individual log files. Monitor the directory for the addition of new log files.
Convert rows read by the spout to tuples after declaring fields for them.
Declare the grouping between spout and bolt, deciding the way in which tuples are given to bolt.
The code for Spout is shown in Listing Three.
Listing Three: Logic in Open, nextTuple, and declareOutputFields methods of Spout.
    // Called once when the spout starts: keep the collector and open the log file.
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            fileReader = new BufferedReader(new FileReader(new File(file)));
        } catch (FileNotFoundException e) {
            System.exit(1);
        }
    }

    // Called repeatedly by Storm: emit any lines added since the last call.
    public void nextTuple() {
        listenFile();
    }

    protected void listenFile() {
        Utils.sleep(2000);
        String line = null;
        try {
            while ((line = fileReader.readLine()) != null) {
                String[] fields = null;
                // "|" is a regex metacharacter, so it must be escaped before splitting.
                if (tupleInfo.getDelimiter().equals("|"))
                    fields = line.split("\\" + tupleInfo.getDelimiter());
                else
                    fields = line.split(tupleInfo.getDelimiter());
                // Remove the whitespace that follows each delimiter in the log file.
                for (int i = 0; i < fields.length; i++)
                    fields[i] = fields[i].trim();
                // Emit only rows whose field count matches the schema.
                if (tupleInfo.getFieldList().size() == fields.length)
                    _collector.emit(new Values(fields));
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    // Name the output fields after the columns declared in the XML schema.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] fieldsArr = new String[tupleInfo.getFieldList().size()];
        for (int i = 0; i < tupleInfo.getFieldList().size(); i++) {
            fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();
        }
        declarer.declare(new Fields(fieldsArr));
    }
declareOutputFields() decides the format in which the tuple is emitted, so that the bolt can decode the tuple in a similar fashion. Spout keeps listening for data added to the log file and, as soon as data is added, reads it and emits it to the bolt for processing.
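One way to pick up only the newly added data is to remember how far into the file the previous read got. The following plain-Java sketch (a hypothetical class, independent of the spout code in this article) polls a file with RandomAccessFile and returns the lines appended since the last call:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of polling a log file for appended lines.
class FileTailSketch {
    private final String path;
    private long offset = 0; // byte position where the next read starts

    FileTailSketch(String path) {
        this.path = path;
    }

    // Return the lines appended since the previous call.
    List<String> readNewLines() throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            file.seek(offset);
            String line;
            // readLine() maps each byte to one char, which is fine for ASCII logs.
            while ((line = file.readLine()) != null) {
                lines.add(line);
            }
            offset = file.getFilePointer();
        }
        return lines;
    }
}
```

This sketch assumes the writer appends whole lines; a line that is only partly written when the poll happens would be returned in two pieces.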
Bolt Implementation
The output of Spout is given to Bolt for further processing. The topology we have considered for our use case consists of two bolts, as shown in Figure 3.
Figure 3: Flow of data from Spout to Bolt.
ThresholdCalculatorBolt
The tuples emitted by the spout are received by the ThresholdCalculatorBolt for threshold processing. It accepts several inputs for the threshold check:
Threshold value to check
Threshold column number to check
Threshold column data type
Threshold check operator
Threshold frequency of occurrence
Threshold time window
A class, shown in Listing Four, is defined to hold these values.
Listing Four: ThresholdInfo class.
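    public class ThresholdInfo implements Serializable {
        private String action;             // comparison operator, such as ">" or "=="
        private String rule;
        private Object thresholdValue;     // value to compare against
        private int thresholdColNumber;    // 1-based column index in the tuple
        private Integer timeWindow;        // null when no time window applies
        private int frequencyOfOccurence;  // count that must be exceeded before emitting

        // Getters and setters omitted.
    }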
Based on the values provided in these fields, the threshold check is made in the execute() method, as shown in Listing Five. The code mostly consists of parsing and checking the incoming values.
Listing Five: Code for threshold check.
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (tuple != null) {
            List<Object> inputTupleList = (List<Object>) tuple.getValues();
            int thresholdColNum = thresholdInfo.getThresholdColNumber();
            Object thresholdValue = thresholdInfo.getThresholdValue();
            String thresholdDataType =
                tupleInfo.getFieldList().get(thresholdColNum - 1).getColumnType();
            Integer timeWindow = thresholdInfo.getTimeWindow();
            int frequency = thresholdInfo.getFrequencyOfOccurence();

            if (thresholdDataType.equalsIgnoreCase("string")) {
                String valueToCheck = inputTupleList.get(thresholdColNum - 1).toString();
                String frequencyChkOp = thresholdInfo.getAction();
                if (timeWindow != null) {
                    long curTime = System.currentTimeMillis();
                    // Elapsed time in minutes (milliseconds / 60,000).
                    long diffInMinutes = (curTime - startTime) / (60 * 1000);
                    if (diffInMinutes >= timeWindow) {
                        if (frequencyChkOp.equals("==")) {
                            if (valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {
                                count.incrementAndGet();
                                if (count.get() > frequency)
                                    splitAndEmit(inputTupleList, collector);
                            }
                        } else if (frequencyChkOp.equals("!=")) {
                            if (!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {
                                count.incrementAndGet();
                                if (count.get() > frequency)
                                    splitAndEmit(inputTupleList, collector);
                            }
                        } else
                            System.out.println("Operator not supported");
                    }
                } else {
                    if (frequencyChkOp.equals("==")) {
                        if (valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {
                            count.incrementAndGet();
                            if (count.get() > frequency)
                                splitAndEmit(inputTupleList, collector);
                        }
                    } else if (frequencyChkOp.equals("!=")) {
                        if (!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {
                            count.incrementAndGet();
                            if (count.get() > frequency)
                                splitAndEmit(inputTupleList, collector);
                        }
                    }
                }
            } else if (thresholdDataType.equalsIgnoreCase("int")
                    || thresholdDataType.equalsIgnoreCase("double")
                    || thresholdDataType.equalsIgnoreCase("float")
                    || thresholdDataType.equalsIgnoreCase("long")
                    || thresholdDataType.equalsIgnoreCase("short")) {
                String frequencyChkOp = thresholdInfo.getAction();
                if (timeWindow != null) {
                    // Parse as double so that float and double columns are accepted too.
                    double valueToCheck = Double.parseDouble(
                        inputTupleList.get(thresholdColNum - 1).toString().trim());
                    long curTime = System.currentTimeMillis();
                    long diffInMinutes = (curTime - startTime) / (60 * 1000);
                    System.out.println("Difference in minutes=" + diffInMinutes);
                    if (diffInMinutes >= timeWindow) {
                        if (frequencyChkOp.equals("<")) {
                            if (valueToCheck < Double.parseDouble(thresholdValue.toString())) {
                                count.incrementAndGet();
                                if (count.get() > frequency)
                                    splitAndEmit(inputTupleList, collector);
                            }
                        } else if (frequencyChkOp.equals(">")) {
                            if (valueToCheck > Double.parseDouble(thresholdValue.toString())) {
                                count.incrementAndGet();
                                if (count.get() > frequency)
                                    splitAndEmit(inputTupleList, collector);
                            }
                        } else if (frequencyChkOp.equals("==")) {
                            if (valueToCheck == Double.parseDouble(thresholdValue.toString())) {
                                count.incrementAndGet();
                                if (count.get() > frequency)
                                    splitAndEmit(inputTupleList, collector);
                            }
                        } else if (frequencyChkOp.equals("!=")) {
                            . . .
                        }
                    }
                } else
                    splitAndEmit(null, collector);
            } else {
                System.err.println("Emitting null in bolt");
                splitAndEmit(null, collector);
            }
        }
    }
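The operator branches in the listing repeat the same comparison pattern. As a possible simplification, sketched here with a hypothetical helper that is not part of the original code, the numeric comparison can be isolated in one method that maps the operator string to a boolean result:

```java
// Hypothetical helper that collapses the repeated operator branches.
class OperatorCheckSketch {
    // Evaluate "value <operator> threshold" for the supported operators.
    static boolean matches(double value, String operator, double threshold) {
        switch (operator) {
            case "<":
                return value < threshold;
            case ">":
                return value > threshold;
            case "==":
                return value == threshold;
            case "!=":
                return value != threshold;
            default:
                throw new IllegalArgumentException("Operator not supported: " + operator);
        }
    }
}
```

The bolt would then only need to increment the counter and emit when matches(...) returns true, regardless of which operator was configured.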
The tuples emitted by the threshold bolt are passed to the next bolt, which is the DBWriterBolt in our case.
DBWriterBolt
The processed tuple has to be persisted for raising a trigger or for further use. DBWriterBolt does the job of persisting the tuples into the database. The table is created in prepare(), which is the first method Storm invokes on the bolt. Code for this method is given in Listing Six.
Listing Six: Code for creation of tables.
    public void prepare(Map stormConf, TopologyContext context) {
        // Load the JDBC driver.
        try {
            Class.forName(dbClass);
        } catch (ClassNotFoundException e) {
            System.out.println("Driver not found");
            e.printStackTrace();
        }
        try {
            connection = DriverManager.getConnection(
                "jdbc:mysql://" + databaseIP + ":" + databasePort + "/" + databaseName,
                userName, pwd);
            connection.prepareStatement("DROP TABLE IF EXISTS " + tableName).execute();

            // Build the CREATE TABLE statement from the schema fields.
            StringBuilder createQuery = new StringBuilder(
                "CREATE TABLE IF NOT EXISTS " + tableName + "(");
            for (Field fields : tupleInfo.getFieldList()) {
                if (fields.getColumnType().equalsIgnoreCase("String"))
                    createQuery.append(fields.getColumnName() + " VARCHAR(500),");
                else
                    createQuery.append(fields.getColumnName() + " " + fields.getColumnType() + ",");
            }
            createQuery.append("thresholdTimeStamp timestamp)");
            connection.prepareStatement(createQuery.toString()).execute();

            // Build the parameterized INSERT statement reused by execute().
            StringBuilder insertQuery = new StringBuilder("INSERT INTO " + tableName + "(");
            for (Field fields : tupleInfo.getFieldList()) {
                insertQuery.append(fields.getColumnName() + ",");
            }
            insertQuery.append("thresholdTimeStamp").append(") values (");
            for (Field fields : tupleInfo.getFieldList()) {
                insertQuery.append("?,");
            }
            insertQuery.append("?)");
            prepStatement = connection.prepareStatement(insertQuery.toString());
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
Insertion of data is done in batches. The logic for insertion is provided in execute(), as shown in Listing Seven, and consists mostly of parsing the variety of possible input types.
Listing Seven: Code for insertion of data.
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        batchExecuted = false;
        if (tuple != null) {
            List<Object> inputTupleList = (List<Object>) tuple.getValues();
            int dbIndex = 0;
            for (int i = 0; i < tupleInfo.getFieldList().size(); i++) {
                Field field = tupleInfo.getFieldList().get(i);
                String type = field.getColumnType();
                String value = inputTupleList.get(i).toString().trim();
                try {
                    dbIndex = i + 1; // JDBC parameters are 1-based
                    if (type.equalsIgnoreCase("String"))
                        prepStatement.setString(dbIndex, value);
                    else if (type.equalsIgnoreCase("int"))
                        prepStatement.setInt(dbIndex, Integer.parseInt(value));
                    else if (type.equalsIgnoreCase("long"))
                        prepStatement.setLong(dbIndex, Long.parseLong(value));
                    else if (type.equalsIgnoreCase("float"))
                        prepStatement.setFloat(dbIndex, Float.parseFloat(value));
                    else if (type.equalsIgnoreCase("double"))
                        prepStatement.setDouble(dbIndex, Double.parseDouble(value));
                    else if (type.equalsIgnoreCase("short"))
                        prepStatement.setShort(dbIndex, Short.parseShort(value));
                    else if (type.equalsIgnoreCase("boolean"))
                        prepStatement.setBoolean(dbIndex, Boolean.parseBoolean(value));
                    else if (type.equalsIgnoreCase("byte"))
                        prepStatement.setByte(dbIndex, Byte.parseByte(value));
                    else if (type.equalsIgnoreCase("Date")) {
                        Date dateToAdd = null;
                        if (!(inputTupleList.get(i) instanceof Date)) {
                            // HH parses the hour on a 24-hour clock.
                            DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            try {
                                dateToAdd = df.parse(value);
                            } catch (ParseException e) {
                                System.err.println("Data type not valid");
                            }
                        } else {
                            dateToAdd = (Date) inputTupleList.get(i);
                        }
                        java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());
                        prepStatement.setDate(dbIndex, sqlDate);
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            Date now = new Date();
            try {
                // The last parameter is the thresholdTimeStamp column.
                prepStatement.setTimestamp(dbIndex + 1, new java.sql.Timestamp(now.getTime()));
                prepStatement.addBatch();
                counter.incrementAndGet();
                if (counter.get() == batchSize)
                    executeBatch();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        } else {
            // No tuple: flush a partial batch that has waited longer than the time window.
            long curTime = System.currentTimeMillis();
            long diffInSeconds = (curTime - startTime) / 1000;
            if (counter.get() < batchSize && diffInSeconds > batchTimeWindowInSeconds) {
                try {
                    executeBatch();
                    startTime = System.currentTimeMillis();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void executeBatch() throws SQLException {
        batchExecuted = true;
        prepStatement.executeBatch();
        counter = new AtomicInteger(0);
    }
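The batching policy in the listing (flush when the batch is full, or when a partial batch has waited too long) can be seen more easily in isolation. The following plain-Java sketch uses hypothetical names and an in-memory list as a stand-in for the database, with the current time passed in explicitly so the behavior is easy to follow:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of size-or-time batching; the flushed list stands in for the database.
class BatchBufferSketch {
    private final int batchSize;
    private final long maxAgeMillis;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long lastFlushMillis;

    BatchBufferSketch(int batchSize, long maxAgeMillis, long nowMillis) {
        this.batchSize = batchSize;
        this.maxAgeMillis = maxAgeMillis;
        this.lastFlushMillis = nowMillis;
    }

    // Add one row; flush as soon as the batch is full.
    void add(String row, long nowMillis) {
        pending.add(row);
        if (pending.size() >= batchSize) {
            flush(nowMillis);
        }
    }

    // Call periodically: flush a partial batch that has waited too long.
    void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - lastFlushMillis > maxAgeMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        lastFlushMillis = nowMillis;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }
}
```

The time-based flush keeps a slow trickle of alerts from sitting unwritten until the batch eventually fills.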
Once the spout and bolts are ready, a topology is built with the topology builder so that they can be executed. The next section explains the execution steps.
Running and Testing the Topology in a Local Cluster
Define the topology using TopologyBuilder, which exposes the Java API for specifying a topology for Storm to execute.
Use StormSubmitter to submit the topology to the cluster. It takes the name of the topology, the configuration, and the topology itself as input.
Submit the topology.
Listing Eight: Building and executing a topology.
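    // Package names as used by Storm 0.8.x; later Apache releases use org.apache.storm.
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;

    public class StormMain {
        public static void main(String[] args) throws AlreadyAliveException,
                InvalidTopologyException, InterruptedException {
            ParallelFileSpout parallelFileSpout = new ParallelFileSpout();
            ThresholdBolt thresholdBolt = new ThresholdBolt();
            DBWriterBolt dbWriterBolt = new DBWriterBolt();

            // Wire the spout to the threshold bolt, and that bolt to the database writer.
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", parallelFileSpout, 1);
            builder.setBolt("thresholdBolt", thresholdBolt, 1).shuffleGrouping("spout");
            builder.setBolt("dbWriterBolt", dbWriterBolt, 1).shuffleGrouping("thresholdBolt");

            Config conf = new Config();
            if (args != null && args.length > 0) {
                // A topology name was given: submit to a real cluster.
                conf.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // No arguments: run in an in-process local cluster for testing.
                conf.setDebug(true);
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("Threshold_Test", conf, builder.createTopology());
            }
        }
    }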
Once built, the topology is submitted to a local cluster. After it is submitted, it runs without requiring any modifications until it is explicitly killed or the cluster is shut down. This is another big advantage of Storm.
This comparatively simple example shows the ease with which it's possible to set up and use Storm once you understand the basic concepts of topology, spout, and bolt. The code is straightforward
and both scalability and speed are provided by Storm. So, if you're looking to handle big data and don't want to traverse the Hadoop universe, you might well find that using Storm is a simple and elegant solution.
Shruthi Kumar works as a technology analyst and Siddharth Patankar is a software engineer with the Cloud Center of Excellence at Infosys Labs.
Ref: http://www.drdobbs.com/cloud/easy-real-time-big-data-analysis-using-s/240143874?pgno=1