您的位置:首页 > 编程语言 > Java开发

java操作nsq数据插入elasticsearch

2017-09-21 11:08 846 查看
在网上找了很多资料关于怎么从nsq中通过java拉取数据存到es中去,因为之前没有接触过nsq和elasticsearch。所以没有一套完整的流程案例,也折腾了好几天。写下来给大家借鉴,也给自己做个记录,以后忘了拿出来看看。

java调用Nsq消息服务

首先,要了解一下nsq是一个什么东西,百度一下就可以稍做了解,关于安装可以参照以下博客(在windows下安装,其他系统还没了解):http://blog.csdn.net/tian_lai_yuyuh/article/details/52700174

要通过java调用nsq消息服务,nsq官方提供了很多的client。



这里我使用的是javaNSQclient

在github上也有案例,可以参考https://github.com/brainlag/JavaNSQClient

用maven构建项目,在Pom添加依赖:

javaNSQclient:

<dependency>
<groupId>com.github.brainlag</groupId>
<artifactId>nsq-client</artifactId>
<version>1.0.0.RC4</version>
</dependency>


然后根据例子建立生产者和消费者:

生产者:

NSQProducer producer = new NSQProducer().addAddress("localhost", 4150).start();
producer.produce("TestTopic", ("this is a message").getBytes());


消费者:

NSQLookup lookup = new DefaultNSQLookup();
lookup.addLookupAddress("localhost", 4161);
NSQConsumer consumer = new NSQConsumer(lookup, "speedtest", "dustin", (message) -> {
System.out.println("received: " + message);
//now mark the message as finished.
message.finished();

//or you could requeue it, which indicates a failure and puts it back on the queue.
//message.requeue();
});

consumer.start();


添加相应信息后,就可以消费数据啦。

插入到elasticsearch中。



这是我的目录结构。

实体类

private String id;

private String ip;

private String level;

private String push_time;

private String body;

private String uuid;

private String service_name;

@ID
private String uuid;


其中实体类中必须要有id属性(案例中创建了一个id属性,在插入时并不赋值,es会自动生成唯一键的id),如果在原来的数据中有一个类似于唯一键的,直接在该属性上添加@ID 注解(如上的uuid,假设是唯一键),如果没有就添加一个ID属性并get,set。不然运行时会报错。

repository类

@Repository
public interface DataRepository extends     ElasticsearchRepository<Data,String>{

//继承ElasticsearchRepository的操作功能
//ElasticsearchRepository接口中有相应的增删改差的功能。如有其//他需求可以重写接口中的方法。
}


配置elasticsearch地址:

.yml和properties文件都可以,此处我选用的是yml格式:

spring:
data:

4000
elasticsearch:
cluster-nodes: 192.168.1.8:9300
cluster-name: poch-test
repositories:
enabled: true
properties:

client.transport.ignore_cluster_name: true


service层

先写service接口(此处省略).

在service实现类中注入repository。



调用其save方法进行插入。

最后在controller控制层中调用即可,如果nsq中拉取的数据没有唯一键的,在最后的save方法前,通过map来代替bean进行转换就可以啦。

第一次写博客,如有写的不好的地方请指教!

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: