flume整合redis
2016-06-24 17:48
585 查看
flume整合redis 其实与mysql差不多
这是代码部分:
agent1.sinks.sink1.type =Sink.RedisSink
agent1.sinks.sink1.REDIS_HOST = //这些都是你自己的连接参数 用到哪些就使用哪些。没有的可以不用
agent1.sinks.sink1.REDIS_PORT=6379
agent1.sinks.sink1.REDIS_PWD=
agent1.sinks.sink1.REDIS_INSTANCE=
agent1.sinks.sink1.REDIS_AUTH=
这是代码部分:
package Sink; import java.util.List; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import com.google.common.collect.Lists; import com.org.utils.CONSTANTS; import com.org.utils.RedisUtils; public class RedisSink extends AbstractSink implements Configurable{ private Logger LOG = LoggerFactory.getLogger(RedisSink.class); private Jedis jedis = null; private String REDIS_HOST; private int REDIS_PORT; private String REDIS_PWD; private String REDIS_INSTANCE; private String REDIS_AUTH; public Status process() throws EventDeliveryException { // TODO Auto-generated method stub Status result = Status.READY; Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event; String content; List<String> actions = Lists.newArrayList(); transaction.begin(); try{ event = channel.take(); if(event != null){ content = new String(event.getBody()); LOG.info("content:"+content); actions.add(content); } else { result = Status.BACKOFF; } if(actions.size() > 0){ for(String temp : actions){ LOG.info(temp+" "); /* * 日志解析存到redis上 *这里的temp就是flume接收的日志 * */ } } } transaction.commit(); } catch (Throwable e) { LOG.error("Fail to show"); transaction.rollback(); } finally { transaction.close(); } return result; } @Override public void start(){ super.start(); jedis = new Jedis(REDIS_HOST,REDIS_PORT); jedis.auth(REDIS_AUTH); }
//这里是接收flume参数的地方。
public void configure(Context context) { // TODO Auto-generated method stub REDIS_HOST= context.getString("REDIS_HOST"); REDIS_PORT=context.getInteger("REDIS_PORT"); REDIS_PWD=context.getString("REDIS_PWD"); REDIS_INSTANCE=context.getString("REDIS_INSTANCE"); REDIS_AUTH = context.getString("REDIS_AUTH"); } }flume 配置代码:
agent1.sinks.sink1.type =Sink.RedisSink
agent1.sinks.sink1.REDIS_HOST = //这些都是你自己的连接参数 用到哪些就使用哪些。没有的可以不用
agent1.sinks.sink1.REDIS_PORT=6379
agent1.sinks.sink1.REDIS_PWD=
agent1.sinks.sink1.REDIS_INSTANCE=
agent1.sinks.sink1.REDIS_AUTH=
相关文章推荐
- 分布式业务Redis安装与集群配置
- Redis客户端之Spring整合Jedis
- Redis之CentOS7安装
- redis主从复制原理
- redis常用命令
- Java中使用Jedis操作Redis
- php 将查询出的数组数据存入redis
- Cenos7 下安装redis 和操作redis
- redis-cluster集群配置
- Yii2-Redis使用小记 - Cache(转)
- redis-集群 入门
- redis学习那点事儿(2)key
- redis的安装和启动和检测和停止
- Redis之java操作篇(Jedis)
- mongodb,redis,mysql 简要对比
- 小心Redis漏洞让你服务器沦为肉鸡
- Mongodb与Redis应用指标对比
- StackExchange.Redis helper访问类封装
- keepalived实现redis主备切换
- keepalived实现redis主备切换