Storm学习小结(二)——集成JDBC和Redis
2016-08-02 13:22
399 查看
Storm学习笔记(续)
Storm集成JDBC
在自己写的一个Demo中使用了数据库存储的操作,发现一些问题,场景大致是:spout从kafka中读取数据,发送给bolt1计算得到一个数据,bolt1发给bolt2存储到Redis,bolt2发送给bolt3,在bolt3中存储到Mysql。在前面两个bolt中都没啥问题,在bolt3中,当并发量非常大时,存在获取不到数据库连接的问题,笔者首先使用的是原声jdbc,然后换了C3P0连接池,都会在不同时机出现上述问题,于是去看了看Storm的官方文档,使用了官方提供的JDBC集成方式,发现自己遇到的问题很好的解决了。所以这里主要来说下Storm对JDBC的官方集成支持。首先来看看官方文档的说明。
Storm提供的JDBC集成是针对单表来操作的,也就是说一个jdbc bolt不管是查询还是插入操作,针对的是单表。当然,我们自己可以改写,后面再说。这里需要用到两个很重要的组件,分别是ConnectionProvider和JdbcMapper,其中ConnectionProvider负责获得数据库连接,JdbcMapper负责将Tupe和数据表里的字段做映射。
storm-jdbc提供的官方API中包括了查询和插入操作,因为查询比较简单,这里是以插入数据的角度总结的,关于查询,只要理解了插入,查询比较简单,在最后附了一块代码,感兴趣的可以看看。
添加依赖,不需要的话跳过
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-jdbc</artifactId> <version>1.0.1</version> </dependency>
1. ConnectionProvider和JdbcMapper组件介绍
(1)ConnectionProvider
根据文档说明,我们可以自己去实现一个ConnectionProvider,同时Storm也给我们提供了一个很好的实现org.apache.storm.jdbc.common.HikariCPConnectionProvider,查了下源码,这里使用的是HikariDataSource。HikariDataSource据说是“当前性能最好的数据源”,或许我上述问题的解决,关键问题就在于此~闲话不多说,通常来说,我们使用既定实现就够用了,使用方式如下:
Map hikariConfigMap = Maps.newHashMap(); hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test"); hikariConfigMap.put("dataSource.user","root"); hikariConfigMap.put("dataSource.password","password"); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
(2)JdbcMapper:
如该组件的名字示意,JdbcMapper是个Mapper映射,将tupe和数据表的row做映射,也就是说,tupe中的field对应row中的column。同样的,我们也可以自己去实现JdbcMapper,或者使用Storm为我们提供的简单实现SimpleJdbcMapper。实现JdbcMapper接口只需要实现一个方法:
public interface JdbcMapper extends Serializable { List<Column> getColumns(ITuple tuple); }
Column是storm-jdbc定义的一个VO,对应于tupe中的field和表中的某一个字段。举个栗子,假如我们要插入的数据表包含三个字段col1,col2,col3,那么我们应该在实现该方法时,使得返回的List中包含三个elements,分别对应这三个字段。Column主要包含三个重要的属性,分别是Name/Val/Type,即列名/列值/列数据类型,在实现Mapper时,对于每个column分别需要指定这三个属性。在执行insert操作时,其中Name对应数据表中的字段名,Val一般而言便是从Tupe中获取而来(将要插入的值),Type通常是java.sql.Type的一种。
实现该Mapper的作用就是将数据表的字段和List里的Colum一一对应,因为storm-jdbc是通过这种映射来操作数据表的。在实现该方法时,需要尤其注意返回的List的顺序,即在往List中添加Column时,column的顺序应该和数据表中的字段顺序一一对应,storm-jdbc仅根据顺序来插入,而不会根据name来维护顺序,所以这里需要格外小心。
storm-jdbc为我们提供了简单的实现——SimpleJdbcMapper。如其名字所示,这是个简单的Mapper,它假设我们tupe中需要存储到数据库的field和数据库中的字段名一样,也就是说,如果将插入的表中有字段a1、a2,那么tupe中也有这样的a1、a2的field。SimpleJdbcMapper提供了两个构造器,分别是
public SimpleJdbcMapper(String tableName, ConnectionProvider connectionProvider) { Validate.notEmpty(tableName); Validate.notNull(connectionProvider); int queryTimeoutSecs = 30; connectionProvider.prepare(); JdbcClient client = new JdbcClient(connectionProvider, queryTimeoutSecs); this.schemaColumns = client.getColumnSchema(tableName); } public SimpleJdbcMapper(List<Column> schemaColumns) { Validate.notEmpty(schemaColumns); this.schemaColumns = schemaColumns; }
在SimpleJdbcMapper的源码中,他将根据schemaColumns来实现JdbcMapper接口中的List getColumns(ITuple tuple)方法,即从schemaColumns获取需要插入那几个字段,从tupe中获取需要插入的值,然后交给jdbcClient去执行插入操作。而这两个构造器的主要作用就是来初始化schemaColumns。如果我们使用第一个,那么它将通过connectionProvider和table自己去获取数据表的metadata,以初始化schemaColumns;如果我们使用第二个,我们需要自己提供一个组装好的schemaColumns。SimpleJdbcMapper的源码如下:
public class SimpleJdbcMapper implements JdbcMapper { private List<Column> schemaColumns; public SimpleJdbcMapper(String tableName, ConnectionProvider connectionProvider) { Validate.notEmpty(tableName); Validate.notNull(connectionProvider); int queryTimeoutSecs = 30; connectionProvider.prepare(); JdbcClient client = new JdbcClient(connectionProvider, queryTimeoutSecs); this.schemaColumns = client.getColumnSchema(tableName); } public SimpleJdbcMapper(List<Column> schemaColumns) { Validate.notEmpty(schemaColumns); this.schemaColumns = schemaColumns; } @Override public List<Column> getColumns(ITuple tuple) { List<Column> columns = new ArrayList<Column>(); for(Column column : schemaColumns) { String columnName = column.getColumnName(); Integer columnSqlType = column.getSqlType(); if(Util.getJavaType(columnSqlType).equals(String.class)) { String value = tuple.getStringByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Short.class)) { Short value = tuple.getShortByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Integer.class)) { Integer value = tuple.getIntegerByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Long.class)) { Long value = tuple.getLongByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Double.class)) { Double value = tuple.getDoubleByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Float.class)) { Float value = tuple.getFloatByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Boolean.class)) { Boolean value = tuple.getBooleanByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(byte[].class)) { byte[] value = tuple.getBinaryByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Date.class)) { Long value = tuple.getLongByField(columnName); columns.add(new Column(columnName, new Date(value), columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Time.class)) { Long value = tuple.getLongByField(columnName); columns.add(new Column(columnName, new Time(value), columnSqlType)); } else if(Util.getJavaType(columnSqlType).equals(Timestamp.class)) { Long value = tuple.getLongByField(columnName); columns.add(new Column(columnName, new Timestamp(value), columnSqlType)); } else { throw new RuntimeException("Unsupported java type in tuple " + Util.getJavaType(columnSqlType)); } } return columns; } }
有了以上ConnectionProvider和JdbcMapper之后,我们就可以去构建一个InsertBolt,storm-jdbc也给我们提供了API。该实现使用起来比较简单:
Map hikariConfigMap = Maps.newHashMap(); hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test"); hikariConfigMap.put("dataSource.user","root"); hikariConfigMap.put("dataSource.password","password"); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);String tableName = "user_details";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName(tableName)
.withQueryTimeoutSecs(30);
上面的代码已经是一个完整的bolt,在使用SimpleJdbcMapper时,通过指定数据连接的方式初始化Mapper,也就是上面所说的第一种构造器。通过这种方式会比较受限制,因为这种机制要求我们,将要插入的数据表中的所有字段信息,都要在tupe中获取到对应的值,然而我们在应用中往往不会这么刚好符合上述的这种假定,我们可能只需要插入一个表中M个字段中的N个(N
Storm集成Redis
相对而言,Storm集成redis比较简单,官方也提供了包装好的RedisStoreBolt和RedisLookupBolt,感兴趣的自己去apache官网查看,这里说说比较灵活的方式:继承AbstractRedisBolt。首先贴上源码:
public abstract class AbstractRedisBolt extends BaseRichBolt { protected OutputCollector collector; private transient JedisCommandsInstanceContainer container; private JedisPoolConfig jedisPoolConfig; private JedisClusterConfig jedisClusterConfig; /** * Constructor for single Redis environment (JedisPool) * @param config configuration for initializing JedisPool */ public AbstractRedisBolt(JedisPoolConfig config) { this.jedisPoolConfig = config; } /** * Constructor for Redis Cluster environment (JedisCluster) * @param config configuration for initializing JedisCluster */ public AbstractRedisBolt(JedisClusterConfig config) { this.jedisClusterConfig = config; } /** * {@inheritDoc} */ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { // FIXME: stores map (stormConf), topologyContext and expose these to derived classes this.collector = collector; if (jedisPoolConfig != null) { this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig); } else if (jedisClusterConfig != null) { this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig); } else { throw new IllegalArgumentException("Jedis configuration not found"); } } /** * Borrow JedisCommands instance from container.<p/> * JedisCommands is an interface which contains single key operations. * @return implementation of JedisCommands * @see JedisCommandsInstanceContainer#getInstance() */ protected JedisCommands getInstance() { return this.container.getInstance(); } /** * Return borrowed instance to container. * @param instance borrowed object */ protected void returnInstance(JedisCommands instance) { this.container.returnInstance(instance); } }
通过源码我们看到,AbstractRedisBolt提供了两个构造器,分别用来初始化JedisPool或者JedisCluster,笔者在这里用的是JedisPool的方式。使用JedisPool,则需要提供JedisPoolConfig实例,用来初始化pool。具体使用,我们贴上官网的例子,大家根据例子结合自己的需求修改,比较easy。
public static class LookupWordTotalCountBolt extends AbstractRedisBolt { private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class); private static final Random RANDOM = new Random(); public LookupWordTotalCountBolt(JedisPoolConfig config) { super(config); } public LookupWordTotalCountBolt(JedisClusterConfig config) { super(config); } @Override public void execute(Tuple input) { JedisCommands jedisCommands = null; try { jedisCommands = getInstance(); String wordName = input.getStringByField("word"); String countStr = jedisCommands.get(wordName); if (countStr != null) { int count = Integer.parseInt(countStr); this.collector.emit(new Values(wordName, count)); // print lookup result with low probability if(RANDOM.nextInt(1000) > 995) { LOG.info("Lookup result - word : " + wordName + " / count : " + count); } } else { // skip LOG.warn("Word not found in Redis - word : " + wordName); } } finally { if (jedisCommands != null) { returnInstance(jedisCommands); } this.collector.ack(input); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // wordName, count declarer.declare(new Fields("wordName", "count")); } }
不用理会代码中的Random相关,其目的只是为了打印日志而又不想全部打印,就以较小的概率打印日志。主要的代码是
jedisCommands = getInstance(); jedisCommands.get(wordName); returnInstance(jedisCommands);
即拿到jedis实例、执行插入或者查找、放回jedis实例,具体操作自己实现,就不多说啦。
相关文章推荐
- 系统集成学习小结
- redis学习小结
- redis 学习笔记(5)-Spring与Jedis的集成
- redis 学习笔记(5)-Spring与Jedis的集成
- SODBASE CEP学习(四)续:类SQL语言EPL与Storm或jStorm集成-滑动窗口
- JDBC学习小结
- SODBASE CEP学习(四)续:类SQL语言EPL与Storm或jStorm集成-使用分布式缓存
- redis 学习笔记(5)-Spring与Jedis的集成
- redis 学习笔记(5)-Spring与Jedis的集成
- redis学习小结
- 持续集成学习笔记-入门篇(10)小结
- [学习小结]Spring对JDBC的支持
- Android学习之 移动应用<App>微信支付集成小结
- redis 学习笔记(5)-Spring与Jedis的集成
- storm学习小结三:编写拓扑实践
- 框架学习之Spring 第四节 Spring集成JDBC组件开发
- spring学习--集成jdbc
- storm学习小结一:storm概述
- mysql的jdbc入门学习小结
- JDBC学习小结