您的位置:首页 > 数据库 > Redis

flume整合redis

2016-06-24 17:48 585 查看
flume整合redis 其实与mysql差不多

这是代码部分:

package Sink;

import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;

import com.google.common.collect.Lists;
import com.org.utils.CONSTANTS;
import com.org.utils.RedisUtils;

public class RedisSink extends AbstractSink implements Configurable{
private Logger LOG = LoggerFactory.getLogger(RedisSink.class);
private Jedis jedis = null;
private String REDIS_HOST;
private int REDIS_PORT;
private String REDIS_PWD;
private String REDIS_INSTANCE;
private String REDIS_AUTH;

public Status process() throws EventDeliveryException {

// TODO Auto-generated method stub
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event;
String content;
List<String> actions = Lists.newArrayList();
transaction.begin();
try{

event = channel.take();
if(event != null){
content = new String(event.getBody());
LOG.info("content:"+content);
actions.add(content);
} else {
result = Status.BACKOFF;

}
if(actions.size() > 0){
for(String temp : actions){
LOG.info(temp+" ");
/*
* 日志解析存到redis上
*这里的temp就是flume接收的日志
* */

}
}
}
transaction.commit();
} catch (Throwable e) {
LOG.error("Fail to show");
transaction.rollback();
} finally {
transaction.close();
}
return result;
}
@Override
public void start(){
super.start();
jedis = new Jedis(REDIS_HOST,REDIS_PORT);
jedis.auth(REDIS_AUTH);
}
//这里是接收flume参数的地方。
public void configure(Context context) {
// TODO Auto-generated method stub
REDIS_HOST= context.getString("REDIS_HOST");
REDIS_PORT=context.getInteger("REDIS_PORT");
REDIS_PWD=context.getString("REDIS_PWD");
REDIS_INSTANCE=context.getString("REDIS_INSTANCE");
REDIS_AUTH = context.getString("REDIS_AUTH");

}
}
flume 配置代码:

 agent1.sinks.sink1.type =Sink.RedisSink

 agent1.sinks.sink1.REDIS_HOST = //这些都是你自己的连接参数 用到哪些就使用哪些。没有的可以不用

 agent1.sinks.sink1.REDIS_PORT=6379

 agent1.sinks.sink1.REDIS_PWD=

 agent1.sinks.sink1.REDIS_INSTANCE=

 agent1.sinks.sink1.REDIS_AUTH=
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: