
Part 2: A real-time big data analysis example with Storm: using Storm to detect vehicles exceeding 80 km/h

2015-04-22 12:17
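Bolt implementation
The output of the Spout is handed to Bolts for further processing. Thinking through our use case, the topology needs the two Bolts shown in Figure 3.
Figure 3: Data flow from the Spout to the Bolts.
ThresholdCalculatorBolt
The tuples emitted by the Spout are received by ThresholdCalculatorBolt, which performs the threshold processing. It takes several inputs for the check:

threshold value to check

threshold column number (the position of the field to check after the line is split)

threshold data type (the type of that field)

threshold check operator (== or != for strings; <, >, == or != for numbers)

threshold frequency of occurrence

threshold time window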
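To make the data flow in Figure 3 concrete without a running cluster, here is a minimal Storm-free sketch in plain Java. The record layout vehicleId,speed,time and all class and method names are hypothetical; the three static methods merely stand in for the Spout, ThresholdCalculatorBolt and DBWriterBolt, using column 2 (speed), the operator > and a threshold of 80 km/h.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Storm-free sketch of Figure 3: spout -> threshold bolt -> DB writer.
// The record layout "vehicleId,speed,time" is an assumption for illustration.
public class SpeedFlowSketch {

    public static final int SPEED_COLUMN = 2;        // 1-based, like thresholdColNumber
    public static final double SPEED_LIMIT = 80.0;   // km/h, the threshold value

    // "Spout": emits each raw line split into its fields.
    public static List<List<String>> spout(List<String> lines) {
        List<List<String>> tuples = new ArrayList<>();
        for (String line : lines) {
            tuples.add(Arrays.asList(line.split(",")));
        }
        return tuples;
    }

    // "Threshold bolt": passes on only tuples whose speed is strictly above the limit.
    public static List<List<String>> thresholdBolt(List<List<String>> tuples) {
        List<List<String>> out = new ArrayList<>();
        for (List<String> t : tuples) {
            double speed = Double.parseDouble(t.get(SPEED_COLUMN - 1).trim());
            if (speed > SPEED_LIMIT) {
                out.add(t);
            }
        }
        return out;
    }

    // "DB writer": collects the vehicle ids it would persist.
    public static List<String> dbWriter(List<List<String>> tuples) {
        List<String> stored = new ArrayList<>();
        for (List<String> t : tuples) {
            stored.add(t.get(0));
        }
        return stored;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "car-1,72,2015-04-22 12:00:00",
                "car-2,95,2015-04-22 12:00:05",
                "car-3,80,2015-04-22 12:00:07",
                "car-4,81,2015-04-22 12:00:09");
        System.out.println(dbWriter(thresholdBolt(spout(lines))));
    }
}
```

Running main prints [car-2, car-4]; car-3, at exactly 80 km/h, is not flagged because the comparison is a strict >.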

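The class in Listing Four holds these values.
Listing Four: the ThresholdInfo class
import java.io.Serializable;

public class ThresholdInfo implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String action;              // comparison operator: "==", "!=", "<", ">"
    private String rule;
    private Object thresholdValue;      // value to compare against, e.g. 80
    private int thresholdColNumber;     // 1-based position of the field to check
    private Integer timeWindow;         // time window in minutes; null means no window
    private int frequencyOfOccurence;   // tuples are emitted only after the match count exceeds this

    public String getAction() { return action; }
    public String getRule() { return rule; }
    public Object getThresholdValue() { return thresholdValue; }
    public int getThresholdColNumber() { return thresholdColNumber; }
    public Integer getTimeWindow() { return timeWindow; }
    public int getFrequencyOfOccurence() { return frequencyOfOccurence; }
}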
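ThresholdInfo implements Serializable because Storm serializes spout and bolt objects, including the fields they hold, with Java serialization when the topology is built, and deserializes them again on the workers. The sketch below shows that round trip on a hypothetical stand-in class (Threshold, with a constructor the listing does not show); it is not part of the original code. The same constraint is one reason resources such as JDBC connections are usually opened in prepare() rather than created when the bolt is constructed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch: a ThresholdInfo-like object survives a Java serialization round trip,
// which is what happens to a bolt's fields when a topology is shipped to the workers.
public class SerializationRoundTrip {

    public static class Threshold implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String action;
        public final Object thresholdValue;
        public final int thresholdColNumber;
        public final Integer timeWindow;   // null = no time window

        public Threshold(String action, Object thresholdValue, int thresholdColNumber, Integer timeWindow) {
            this.action = action;
            this.thresholdValue = thresholdValue;
            this.thresholdColNumber = thresholdColNumber;
            this.timeWindow = timeWindow;
        }
    }

    public static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();   // stream is closed (and flushed) at this point
    }

    public static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Threshold original = new Threshold(">", 80, 2, null);
        Threshold copy = (Threshold) deserialize(serialize(original));
        System.out.println(copy.action + " " + copy.thresholdValue + " (column " + copy.thresholdColNumber + ")");
    }
}
```

main prints > 80 (column 2). If one of the fields held a non-serializable object, serialize() would fail with a NotSerializableException wrapped in an UncheckedIOException.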

Based on the values supplied in these fields, the threshold check is carried out by the execute() method in Listing Five. Most of the code parses the received values and checks them.
Listing Five: the threshold check
public void execute(Tuple tuple, BasicOutputCollector collector)
{
    if (tuple != null)
    {
        List<Object> inputTupleList = tuple.getValues();
        // thresholdColNumber is 1-based; list indices are 0-based
        int thresholdColNum = thresholdInfo.getThresholdColNumber();
        Object thresholdValue = thresholdInfo.getThresholdValue();
        String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum - 1).getColumnType();
        Integer timeWindow = thresholdInfo.getTimeWindow();
        int frequency = thresholdInfo.getFrequencyOfOccurence();
        String frequencyChkOp = thresholdInfo.getAction();

        // With a time window, tuples are checked only once timeWindow minutes have
        // passed since startTime; without one, every tuple is checked.
        boolean windowElapsed = true;
        if (timeWindow != null)
        {
            long diffInMinutes = (System.currentTimeMillis() - startTime) / (60 * 1000);
            windowElapsed = diffInMinutes >= timeWindow;
        }

        if (thresholdDataType.equalsIgnoreCase("string"))
        {
            String valueToCheck = inputTupleList.get(thresholdColNum - 1).toString();
            boolean matched = false;
            if (frequencyChkOp.equals("=="))
                matched = valueToCheck.equalsIgnoreCase(thresholdValue.toString());
            else if (frequencyChkOp.equals("!="))
                matched = !valueToCheck.equalsIgnoreCase(thresholdValue.toString());
            else
                System.out.println("Operator not supported");
            // emit only after the value has matched more than frequency times
            if (windowElapsed && matched && count.incrementAndGet() > frequency)
                splitAndEmit(inputTupleList, collector);
        }
        else if (thresholdDataType.equalsIgnoreCase("int") || thresholdDataType.equalsIgnoreCase("double")
                || thresholdDataType.equalsIgnoreCase("float") || thresholdDataType.equalsIgnoreCase("long")
                || thresholdDataType.equalsIgnoreCase("short"))
        {
            // parse as double so that float/double columns (e.g. a speed of 80.5) work too
            double valueToCheck = Double.parseDouble(inputTupleList.get(thresholdColNum - 1).toString());
            double limit = Double.parseDouble(thresholdValue.toString());
            boolean matched = false;
            if (frequencyChkOp.equals("<"))
                matched = valueToCheck < limit;
            else if (frequencyChkOp.equals(">"))
                matched = valueToCheck > limit;
            else if (frequencyChkOp.equals("=="))
                matched = valueToCheck == limit;
            else if (frequencyChkOp.equals("!="))
            {
                // . . .
            }
            if (windowElapsed && matched && count.incrementAndGet() > frequency)
                splitAndEmit(inputTupleList, collector);
        }
        else
            splitAndEmit(null, collector);
    }
    else
    {
        System.err.println("Emitting null in bolt");
        splitAndEmit(null, collector);
    }
}
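The operator and frequency handling in Listing Five can be tried out without Storm. The class below is a hypothetical, simplified sketch: it covers numeric values only and leaves out the time window. As in Listing Five, a tuple is emitted only when its value matches and the running match count has exceeded frequencyOfOccurence.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the numeric threshold check in Listing Five,
// without Storm and without the time window.
public class ThresholdChecker {
    private final String operator;    // "<", ">", "==" or "!="
    private final double threshold;   // e.g. 80 km/h
    private final int frequency;      // matches tolerated before emitting
    private final AtomicInteger count = new AtomicInteger(0);

    public ThresholdChecker(String operator, double threshold, int frequency) {
        this.operator = operator;
        this.threshold = threshold;
        this.frequency = frequency;
    }

    private boolean matches(double value) {
        switch (operator) {
            case "<":  return value < threshold;
            case ">":  return value > threshold;
            case "==": return value == threshold;
            case "!=": return value != threshold;
            default:   throw new IllegalArgumentException("Operator not supported: " + operator);
        }
    }

    // Returns true when the tuple carrying this value should be emitted downstream.
    // The counter is only incremented for matching values (short-circuit &&).
    public boolean offer(double value) {
        return matches(value) && count.incrementAndGet() > frequency;
    }

    public static void main(String[] args) {
        ThresholdChecker speeding = new ThresholdChecker(">", 80, 2);
        double[] speeds = {95, 70, 88, 101, 120};
        for (double s : speeds) {
            System.out.println(s + " -> " + speeding.offer(s));
        }
    }
}
```

For the speeding rule (operator >, threshold 80, frequency 2), main prints false for 95, 70 and 88 and true for 101 and 120: the first two speeding readings are tolerated, and 70 does not match at all. As in Listing Five, the counter is never reset, so every later match is emitted.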

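The tuples emitted by this Bolt are passed to the next Bolt, which in our use case is DBWriterBolt.
DBWriterBolt
The processed tuples must be persisted so that they can raise a trigger or be used for further processing. DBWriterBolt does this by storing each tuple in a database. The table is created in prepare(), which Storm calls once when the bolt task starts, before any tuple reaches execute(). The code is shown in Listing Six.
Listing Six: creating the table
public void prepare(Map stormConf, TopologyContext context)
{
    try
    {
        Class.forName(dbClass);   // load the JDBC driver class
    }
    catch (ClassNotFoundException e)
    {
        System.err.println("Driver not found");
        e.printStackTrace();
    }
    try
    {
        connection = DriverManager.getConnection(
                "jdbc:mysql://" + databaseIP + ":" + databasePort + "/" + databaseName, userName, pwd);
        // any existing table is dropped, so rows from earlier runs are lost
        connection.prepareStatement("DROP TABLE IF EXISTS " + tableName).execute();

        // one column per tuple field, plus a timestamp column
        StringBuilder createQuery = new StringBuilder("CREATE TABLE IF NOT EXISTS " + tableName + "(");
        for (Field field : tupleInfo.getFieldList())
        {
            String type = field.getColumnType().toLowerCase();
            String sqlType;
            // map the Java type names used in the field list to MySQL column types
            switch (type)
            {
                case "string": sqlType = "VARCHAR(500)"; break;
                case "long":   sqlType = "BIGINT"; break;     // LONG in MySQL means MEDIUMTEXT
                case "short":  sqlType = "SMALLINT"; break;   // MySQL has no SHORT type
                case "byte":   sqlType = "TINYINT"; break;    // MySQL has no BYTE type
                default:       sqlType = type;                // int, float, double, boolean, date
            }
            createQuery.append(field.getColumnName()).append(' ').append(sqlType).append(',');
        }
        createQuery.append("thresholdTimeStamp timestamp)");
        connection.prepareStatement(createQuery.toString()).execute();

        // Insert query: INSERT INTO table(col1,...,thresholdTimeStamp) values (?,...,?)
        StringBuilder insertQuery = new StringBuilder("INSERT INTO " + tableName + "(");
        for (Field field : tupleInfo.getFieldList())
            insertQuery.append(field.getColumnName()).append(',');
        insertQuery.append("thresholdTimeStamp) values (");
        for (int i = 0; i < tupleInfo.getFieldList().size(); i++)
            insertQuery.append("?,");
        insertQuery.append("?)");
        prepStatement = connection.prepareStatement(insertQuery.toString());
    }
    catch (SQLException e)
    {
        e.printStackTrace();
    }
}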
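The two SQL strings that prepare() builds can be checked without a database. The sketch below repeats the string building for a hypothetical table speeding with two hypothetical fields, vehicleId (String) and speed (double): the String field becomes a VARCHAR(500) column, double is used directly as the column type, and a thresholdTimeStamp column is appended.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the CREATE TABLE and INSERT strings built in prepare().
public class DbQuerySketch {

    // columns: column name -> type name, in field-list order
    public static String createQuery(String table, Map<String, String> columns) {
        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS " + table + "(");
        for (Map.Entry<String, String> c : columns.entrySet()) {
            String type = c.getValue().equalsIgnoreCase("String") ? "VARCHAR(500)" : c.getValue();
            sb.append(c.getKey()).append(' ').append(type).append(',');
        }
        return sb.append("thresholdTimeStamp timestamp)").toString();
    }

    public static String insertQuery(String table, Map<String, String> columns) {
        List<String> names = new ArrayList<>(columns.keySet());
        names.add("thresholdTimeStamp");
        List<String> marks = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            marks.add("?");   // one placeholder per column, including the timestamp
        }
        return "INSERT INTO " + table + "(" + String.join(",", names)
                + ") values (" + String.join(",", marks) + ")";
    }

    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>();   // keeps field order
        cols.put("vehicleId", "String");
        cols.put("speed", "double");
        System.out.println(createQuery("speeding", cols));
        System.out.println(insertQuery("speeding", cols));
    }
}
```

main prints
CREATE TABLE IF NOT EXISTS speeding(vehicleId VARCHAR(500),speed double,thresholdTimeStamp timestamp)
INSERT INTO speeding(vehicleId,speed,thresholdTimeStamp) values (?,?,?)
There is always one more ? than there are fields; the extra placeholder is the timestamp that is bound last when a row is inserted.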

Data is written to the database in batches. The insert logic is in the execute() method in Listing Seven; most of the code parses the different input types that may arrive.
Listing Seven: inserting the data
// Date is java.util.Date; DateFormat, SimpleDateFormat and ParseException come from java.text
public void execute(Tuple tuple, BasicOutputCollector collector)
{
    batchExecuted = false;
    if (tuple != null)
    {
        List<Object> inputTupleList = tuple.getValues();
        int dbIndex = 0;
        for (int i = 0; i < inputTupleList.size(); i++)
        {
            Field field = tupleInfo.getFieldList().get(i);
            String value = inputTupleList.get(i).toString();
            try
            {
                dbIndex = i + 1;   // JDBC parameter indices are 1-based
                if (field.getColumnType().equalsIgnoreCase("String"))
                    prepStatement.setString(dbIndex, value);
                else if (field.getColumnType().equalsIgnoreCase("int"))
                    prepStatement.setInt(dbIndex, Integer.parseInt(value));
                else if (field.getColumnType().equalsIgnoreCase("long"))
                    prepStatement.setLong(dbIndex, Long.parseLong(value));
                else if (field.getColumnType().equalsIgnoreCase("float"))
                    prepStatement.setFloat(dbIndex, Float.parseFloat(value));
                else if (field.getColumnType().equalsIgnoreCase("double"))
                    prepStatement.setDouble(dbIndex, Double.parseDouble(value));
                else if (field.getColumnType().equalsIgnoreCase("short"))
                    prepStatement.setShort(dbIndex, Short.parseShort(value));
                else if (field.getColumnType().equalsIgnoreCase("boolean"))
                    prepStatement.setBoolean(dbIndex, Boolean.parseBoolean(value));
                else if (field.getColumnType().equalsIgnoreCase("byte"))
                    prepStatement.setByte(dbIndex, Byte.parseByte(value));
                else if (field.getColumnType().equalsIgnoreCase("Date"))
                {
                    Date dateToAdd = null;
                    if (inputTupleList.get(i) instanceof Date)
                        dateToAdd = (Date) inputTupleList.get(i);
                    else
                    {
                        // HH = hour of day (0-23); "hh" would read 12:17 as 00:17
                        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try
                        {
                            dateToAdd = df.parse(value);
                        }
                        catch (ParseException e)
                        {
                            System.err.println("Data type not valid");
                        }
                    }
                    // bind the date in both cases; bind NULL if parsing failed
                    if (dateToAdd != null)
                        prepStatement.setDate(dbIndex, new java.sql.Date(dateToAdd.getTime()));
                    else
                        prepStatement.setNull(dbIndex, java.sql.Types.DATE);
                }
            }
            catch (SQLException e)
            {
                e.printStackTrace();
            }
        }
        try
        {
            // last parameter: the time at which the row was written
            prepStatement.setTimestamp(dbIndex + 1, new java.sql.Timestamp(System.currentTimeMillis()));
            prepStatement.addBatch();
            if (counter.incrementAndGet() == batchSize)
                executeBatch();   // the batch is full
        }
        catch (SQLException e1)
        {
            e1.printStackTrace();
        }
    }
    else
    {
        // no data: flush a partial batch once it has waited longer than the time window
        long diffInSeconds = (System.currentTimeMillis() - startTime) / 1000;
        if (counter.get() < batchSize && diffInSeconds > batchTimeWindowInSeconds)
        {
            try
            {
                executeBatch();
                startTime = System.currentTimeMillis();
            }
            catch (SQLException e)
            {
                e.printStackTrace();
            }
        }
    }
}

public void executeBatch() throws SQLException
{
    batchExecuted = true;
    prepStatement.executeBatch();
    counter.set(0);   // start counting the next batch
}
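As we read Listing Seven, a batch is executed when batchSize rows are queued or, when execute() is called without a tuple, once a partial batch has waited longer than batchTimeWindowInSeconds. The class below is a hypothetical sketch of that rule without JDBC: the clock is passed in explicitly so the behavior can be tested, and "flushing" records the batch size instead of calling PreparedStatement.executeBatch().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of DBWriterBolt's batching rule with an explicit clock (milliseconds).
// "Flushing" records the batch size instead of calling PreparedStatement.executeBatch().
public class BatchPolicy {
    private final int batchSize;
    private final long windowMillis;
    private long windowStart;                                 // plays the role of startTime
    private int pending = 0;                                  // plays the role of counter
    private final List<Integer> flushed = new ArrayList<>();

    public BatchPolicy(int batchSize, long windowSeconds, long startMillis) {
        this.batchSize = batchSize;
        this.windowMillis = windowSeconds * 1000;
        this.windowStart = startMillis;
    }

    // A tuple with data arrived: queue one row and flush once the batch is full.
    public void add() {
        pending++;
        if (pending == batchSize) {
            flush();
        }
    }

    // execute() was called without data: flush a partial batch that has waited too long.
    public void tick(long nowMillis) {
        if (pending > 0 && pending < batchSize && nowMillis - windowStart > windowMillis) {
            flush();
            windowStart = nowMillis;   // as in Listing Seven, only the timed flush restarts the window
        }
    }

    private void flush() {
        flushed.add(pending);
        pending = 0;
    }

    public List<Integer> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        BatchPolicy policy = new BatchPolicy(3, 10, 0);
        policy.add();
        policy.add();
        policy.add();          // third row: full batch, flushed
        policy.add();          // one row pending
        policy.tick(8000);     // 8 s since the window started: keep waiting
        policy.tick(14000);    // 14 s > 10 s: flush the partial batch
        System.out.println(policy.flushedBatches());
    }
}
```

With batchSize 3 and a 10-second window, main prints [3, 1]: the first three rows go out as a full batch, and the fourth is flushed by the 14-second tick. The pending > 0 check is an addition of this sketch so that it never records an empty batch.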

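Once the Spout and Bolts are ready, the topology builder assembles the topology and prepares it for execution. The steps are as follows.
Running and testing the topology on a local cluster:

Build the topology with TopologyBuilder.

Use StormSubmitter to submit the topology to the cluster, passing the topology name, the configuration and the topology object as arguments.

Submit the topology.

Listing Eight: building and running the topology
// Storm 0.9.x package names; Storm 1.0 and later use org.apache.storm instead of backtype.storm
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class StormMain
{
    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException, InterruptedException
    {
        ParallelFileSpout parallelFileSpout = new ParallelFileSpout();
        ThresholdBolt thresholdBolt = new ThresholdBolt();   // the threshold-checking bolt described above
        DBWriterBolt dbWriterBolt = new DBWriterBolt();

        // spout -> thresholdBolt -> dbWriterBolt, one task each
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", parallelFileSpout, 1);
        builder.setBolt("thresholdBolt", thresholdBolt, 1).shuffleGrouping("spout");
        builder.setBolt("dbWriterBolt", dbWriterBolt, 1).shuffleGrouping("thresholdBolt");

        Config conf = new Config();
        if (args != null && args.length > 0)
        {
            // cluster mode: the first argument is the topology name
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else
        {
            // local mode for testing
            conf.setDebug(true);
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Threshold_Test", conf, builder.createTopology());
        }
    }
}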

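After the topology is built, it is submitted to the local cluster. Once submitted, it keeps running without any further changes until it is killed or the cluster shuts down, which is another of Storm's major strengths. This simple example shows that once you understand the concepts of topologies, spouts and bolts, you can easily use Storm for real-time processing. If you want to process big data without wading into the Hadoop ecosystem, Storm is a good choice.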
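Building on this article, we developed a real-time traffic-condition system for Shenzhen based on Storm. The source code is open source on GitHub:
https://github.com/whughchen/RealTimeTraffic
Feel free to follow the project and suggest improvements.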
-------------------------------------------------
Related post: Storm in practice: a real-time traffic analysis and route recommendation system for Shenzhen