storm-cassandra
2014-06-13 09:36
183 查看
通过一个动态可配置的
具体操作:
Basic Usage
均以key/map形式调用Storm topology配置中为Cassandra指定的hostname、port、keyspace,以允许topology中的多个实例可以连接同一个Cassandra实例。key在构造函数中指定以表示要使用的map:
上述创建的
则Cassandra row如下 ( from
Cassandra Write Function
如果Function不会emit任何数据,则Storm Trident 过滤掉original Tuple。在写入Cassandra之后,
emit 一个值,不要忘记 声明输出域。
Cassandra Counter Columns
The Counter Column与上相似,必须具体化rowKey 和value来具体化计数。所有其他field将按此计数累加。
上述创建的bolt 写入名为"
给定如下Tuple:
将为 "
backtype.storm.Bolt实现将Storm和Cassandra集成在一起,在类Class中指定column family, row key, and column name/values,则Bolt将Storm
Tuple对象写入Cassandra cluster的Column Family。
具体操作:
Basic UsageCassandraBolt
TridentCassandraLookupFunction
TridentCassandraWriteFunction
均以key/map形式调用Storm topology配置中为Cassandra指定的hostname、port、keyspace,以允许topology中的多个实例可以连接同一个Cassandra实例。key在构造函数中指定以表示要使用的map:
Map<String, Object> cassandraConfig = new HashMap<String, Object>(); cassandraConfig.put(StormCassandraConstants.CASSANDRA_HOST, "localhost:9160"); cassandraConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, "testKeyspace"); Config config = new Config(); config.put("CassandraLocal", cassandraConfig);
CassandraBolt的构造器将column family name和 row key field的值作为参数:
IRichBolt cassandraBolt = new CassandraBolt("columnFamily", "rowKey");
上述创建的
CassandraBolt写入名为"
columnFamily" 的column family,并且查询/使用
backtype.storm.tuple.Tuple对象中的"
rowKey"作为 Cassandra row key。
backtype.storm.Tuple接受的每一field,
CassandraBolt将写入name/value对。比如,上述的构造器如下:
{rowKey: 12345, field1: "foo", field2: "bar}
则Cassandra row如下 ( from
cassandra-cli):
RowKey: 12345 => (column=field1, value=foo, timestamp=1321938505071001) => (column=field2, value=bar, timestamp=1321938505072000)
Cassandra Write Function
如果Function不会emit任何数据,则Storm Trident 过滤掉original Tuple。在写入Cassandra之后,
TridentCassandraWriteFunction则emit一个static Object值以进行其他处理,此主要使Tuple继续,而static value或由构造器或调用setValueToEmitAfterWrite方法设置,而如果设置为NULL则使得此Function不能emit而且Storm将过滤掉此Tuple。默认的行为是不emit,如果Function
emit 一个值,不要忘记 声明输出域。
Cassandra Counter Columns
The Counter Column与上相似,必须具体化rowKey 和value来具体化计数。所有其他field将按此计数累加。
CassandraCounterBatchingBolt logPersistenceBolt = new CassandraCounterBatchingBolt( "columnFamily", "RowKeyField", "IncrementAmountField" );
上述创建的bolt 写入名为"
columnFamily"的 column family并用名为 "
RowKeyField"指定需要接受的Tuple。 此Tuple的其他field则隐含增加
IncrementAmountField指定的值。
给定如下Tuple:
{rowKey: 12345, IncrementAmount: 1L, IncrementColumn: 'SomeCounter'}
将为 "
SomeCounter" 增加 1L。
相关文章推荐
- Storm – Kafka – Cassandra for Big Data System
- Storm/Cassandra集成错误:NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback
- zookeeper+storm+cassandra的集群部署以及问题
- Storm – Kafka – Cassandra for Big Data System
- Storm-0.9.0.1安装部署 指导
- Storm安装
- Storm系列(五)架构分析之Nimbus启动过程
- PHPStorm+PHP5.6+WIN7+IIS7
- 大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合
- Remove advertisement of Storm 5
- Kafka+Storm+HDFS整合实践
- Storm系列(十一)架构分析之Supervisor-管理Worker进程的事件线程
- 005-采用storm程序对目录下文件的单词计数本地模式运行
- Storm Distributed RPC(DRPC)分布式远程过程调用
- hadoop、hbase、storm官方论坛交流群
- PHP学习(wampServer+PhpStorm+XDebug)——环境相关配置参考(下)
- storm 事务和DRPC结合
- Storm:storm架构
- storm中worker、executor、task之间的关系
- Storm UnresolvedAddressException异常