
Spark Streaming: ingesting Kafka data into HBase in real time, with an integrated REST service

2017-11-22 17:46


1: Overall project flow

Flume pushes data into Kafka; Spark Streaming consumes the Kafka data and processes it in real time; the results are written to HBase; and the algorithm/model layer reads the processed data through a REST service:

Flume -> Kafka -> Spark Streaming -> HBase -> REST service -> algorithm model


2: Component versions on the server

Java 1.7.0_65
Scala 2.11.8
Spark 2.1.0
Flume 1.6.0
Kafka kafka_2.10-0.8.2.1
HBase 1.0.0
Server OS: Aliyun Linux release 6 15.01
Dubbo 2.8.4
IDE: Eclipse


3: Project code layout

(The original post shows a screenshot of the project structure here.)

4: The pom file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com</groupId>
<artifactId>streaming_hbase_rest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>com.streaming_hbase_rest</name>
<url>http://maven.apache.org</url>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.6</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.12</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.8.4</version>
</dependency>

<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.15.0-GA</version>
</dependency>

<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>1.1.7</version>
</dependency>

<dependency>
<groupId>org.glassfish.grizzly</groupId>
<artifactId>grizzly-core</artifactId>
<version>2.1.4</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.1</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.39</version>
</dependency>

<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.1</version>
</dependency>

<dependency>
<groupId>org.apache.bsf</groupId>
<artifactId>bsf-api</artifactId>
<version>3.1</version>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>

<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.5.0</version>
</dependency>

<dependency>
<groupId>com.googlecode.xmemcached</groupId>
<artifactId>xmemcached</artifactId>
<version>1.3.6</version>
</dependency>

<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-simple</artifactId>
<version>2.6.1</version>
</dependency>

<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
<version>2.6.1</version>
</dependency>

<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.8.0</version>
</dependency>

<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.7</version>
</dependency>

<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.2</version>
</dependency>

<!-- jedis is declared again further down with version 2.9.0; when the same
artifact is declared twice in one pom, Maven resolves to the later
declaration, so the 2.1.0 entry that originally stood here has been dropped. -->

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>1.0.0.GA</version>
</dependency>

<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>4.2.0.Final</version>
</dependency>

<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>0.4</version>
</dependency>

<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-jaxrs</artifactId>
<version>3.0.7.Final</version>
</dependency>

<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-client</artifactId>
<version>3.0.7.Final</version>
</dependency>

<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-netty</artifactId>
<version>3.0.7.Final</version>
</dependency>

<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-jdk-http</artifactId>
<version>3.0.7.Final</version>
</dependency>

<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-jackson-provider</artifactId>
<version>3.0.7.Final</version>
</dependency>

<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-jaxb-provider</artifactId>
<version>3.0.7.Final</version>
</dependency>

<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>8.0.11</version>
</dependency>

<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-logging-juli</artifactId>
<version>8.0.11</version>
</dependency>

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.24.0</version>
</dependency>

<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.26</version>
</dependency>

<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>1.55</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.6.RELEASE</version>
</dependency>

<!-- ==== Spark dependencies ==== -->
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<!-- The original pom declared scala-library twice (2.10.4 and 2.11.8); with
duplicate declarations Maven keeps the last one, so only 2.11.8 is kept here.
Note that the Spark artifacts below still mix _2.10 and _2.11 suffixes, which
is fragile: aligning everything on one Scala binary version is safer. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-tags_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-launcher_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-launcher_2.10</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-network-shuffle_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-unsafe_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_2.10</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-crypto -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/net.sf.py4j/py4j -->
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.21.Final</version>
</dependency>

<!-- https://mvnrepository.com/artifact/net.razorvine/pyrolite -->
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
<version>4.13</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.3</version>
</dependency>
</dependencies>
</project>
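
One caveat on the pom as listed: SparkStreamingMain below imports org.apache.spark.streaming.kafka.KafkaUtils, which lives in neither spark-core nor spark-streaming but in a separate connector artifact that the pom above does not show. If you rebuild the project, you will most likely also need something like the following (my assumption: the 0.8 connector build matching Spark 2.1.0, since the broker here is kafka_2.10-0.8.2.1):

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.0</version>
</dependency>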


5: REST service code


- UserClick

Since this requirement only counts clicks per userId, the JavaBean has just two fields.

package com.streaming_hbase_rest;

import java.io.Serializable;

import javax.validation.constraints.NotNull;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement
public class UserClick implements Serializable {

    @NotNull
    private String userId;
    @NotNull
    private String clickNum;

    public UserClick() {
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getClickNum() {
        return clickNum;
    }

    public void setClickNum(String clickNum) {
        this.clickNum = clickNum;
    }

}


- UserClickRestService

The interface exposed to consumers of the service.

package com.streaming_hbase_rest;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("clickNum")
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
public interface UserClickRestService {

    @GET
    @Path("{userId}")
    String getClickNum(@PathParam("userId") String userId);

}
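
With the Dubbo provider configuration shown later (rest protocol on port 8889, embedded Tomcat), this interface is served at GET http://<host>:8889/clickNum/{userId}. Because of the @Produces annotation, a .json or .xml suffix on the userId selects the representation; the consumer example at the end of this post relies on the .json form.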


- UserClickRestServiceImpl

The implementation of the interface; it queries HBase and returns the click count of a given user to the caller.

package com.streaming_hbase_rest;

import javax.ws.rs.PathParam;

import com.streaming_hbase_rest.utils.HbaseUtils;

public class UserClickRestServiceImpl implements UserClickRestService {

    public HbaseUtils hu = new HbaseUtils();

    @Override
    public String getClickNum(@PathParam("userId") String userId) {
        System.out.println("REST query started");
        System.out.println("userId for this query: " + userId);
        String clickNum = hu.getClickNumByUserid("app_clickNum", userId);
        System.out.println("Query result: " + clickNum);
        // Compare with equals(), not ==; the original relied on string interning
        return "null".equals(clickNum) ? "0" : clickNum;
    }

}


- spring-dubbo-accessrest-provider.xml

The Dubbo provider configuration. The consumer side is not covered here because it needs no configuration at all; the interface can be invoked directly through a URL.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:c="http://www.springframework.org/schema/c"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation=
    "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd ">

    <!-- Provider application info, used for dependency bookkeeping -->
    <dubbo:application name="countClickNumByUser-provider" />

    <!-- Expose the service address through the ZooKeeper registry -->
    <dubbo:registry address="zookeeper://master:2181" check="false" subscribe="false" />

    <!-- Expose the service on port 8889 via the rest protocol on embedded Tomcat -->
    <dubbo:protocol name="rest" port="8889" server="tomcat" />

    <!-- Declare the service interface to expose -->
    <dubbo:service interface="com.streaming_hbase_rest.UserClickRestService" ref="userClickRestService_hbase" version="1.0.0">
        <dubbo:method name="getClickNum" timeout="100000000" retries="2" loadbalance="leastactive" actives="50" />
    </dubbo:service>

    <!-- Wire the implementation like any local bean -->
    <bean id="userClickRestService_hbase" class="com.streaming_hbase_rest.UserClickRestServiceImpl" />

</beans>
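
A note on the server="tomcat" attribute: this is what the tomcat-embed-core and tomcat-embed-logging-juli entries in the pom are for. Dubbo's rest protocol is built on RESTEasy (hence the resteasy-* dependencies as well) and hosts the service on that embedded container.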

That covers the REST side. The service is started together with the streaming job, which is described later.



6: Utility classes


- TimeUtils

A simple helper for converting Unix timestamps into formatted times.

package com.streaming_hbase_rest.utils;

import java.text.SimpleDateFormat;

/**
 * Created by MR.wu on 2017/10/19.
 */
public class TimeUtils {

    /**
     * Convert a 10-digit (second-resolution) Unix timestamp string to a formatted time.
     */
    public String processJavaTimestamp(String timestamp) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long lt = 0;
        String clickTime;
        try {
            lt = Long.parseLong(timestamp);
        } catch (NumberFormatException e) {
            e.printStackTrace();
        }
        // The input is in seconds; java.util.Date expects milliseconds
        java.util.Date date = new java.util.Date(lt * 1000L);
        clickTime = simpleDateFormat.format(date);
        return clickTime;
    }

}
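
A quick usage check; the exact output depends on the JVM's default time zone (the value shown assumes UTC+8):

// Prints "2017-10-19 16:00:00" on a machine in UTC+8
System.out.println(new TimeUtils().processJavaTimestamp("1508400000"));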


- Json_redis

A JSON-parsing helper. The first version of this project was Redis-based, hence the slightly misleading class name; don't read too much into it.
Adapt the parsing to your own data as needed.

package com.streaming_hbase_rest.utils;

import java.io.Serializable;

import net.sf.json.JSONObject;

public class Json_redis implements Serializable {

    public TimeUtils timeUtils = new TimeUtils();

    /**
     * Parse one JSON log record into a flat String array.
     */
    public String[] pressJson(JSONObject log_data) {
        System.out.println("Processing JSON data >>>>> " + log_data);
        String[] jsonKey = new String[] { "product", "userAgent", "inTime",
                "clickElement", "userId", "clickTime", "outTime", "location" };
        String apk_type = "null";     // device platform
        String user_id = "null";      // user id
        String apk_version = "null";  // app version
        String product = log_data.getString(jsonKey[0]);
        // Validate the product field, which is expected to look like "type|version"
        if (product.indexOf("|") == -1 || product.equals("|")) {
            // Malformed: no "|" at all, or a bare "|"; return "null" for both parts
            apk_type = "null";
            apk_version = "null";
        } else if (product.startsWith("|")) { // nothing before the separator
            apk_type = "null";
            apk_version = product.split("\\|")[1];
        } else if (product.endsWith("|")) { // nothing after the separator
            apk_type = product.split("\\|")[0];
            apk_version = "null";
        } else { // well-formed
            apk_type = product.split("\\|")[0];
            apk_version = product.split("\\|")[1];
        }
        String userAgents = log_data.getString(jsonKey[1]);
        String device_no = "null";
        String device_name = "null";
        // Validate userAgent the same way; expected shape is "deviceNo|deviceModel"
        if (userAgents.equals("|") || userAgents.indexOf("|") == -1) {
            device_no = "null";   // device number
            device_name = "null"; // device model
        } else if (userAgents.endsWith("|")) { // nothing after the separator
            device_no = userAgents.split("\\|")[0];
            device_name = "null";
        } else if (userAgents.startsWith("|")) { // nothing before the separator
            device_no = "null";
            device_name = userAgents.split("\\|")[1];
            System.out.println(userAgents.split("\\|").length);
        } else { // well-formed
            device_no = userAgents.split("\\|")[0];
            device_name = userAgents.split("\\|")[1];
        }
        String click_element = log_data.getString(jsonKey[3]);
        user_id = log_data.getString(jsonKey[4]);
        String click_time = log_data.getString(jsonKey[5]);

        String in_time = "null";
        String out_time = "null";

        // Null-check before calling equals() (the original called equals() first)
        if (click_time != null && !click_time.isEmpty()
                && !click_time.equals("null")) {
            System.out.println("+++++++++++++++++++++++++click_time=>>>" + click_time);
            // Convert the click-time timestamp to a formatted time
            click_time = timeUtils.processJavaTimestamp(click_time);
        } else {
            click_time = "null";
        }
        // Only read the location field if it is present
        String location = "null";
        if (log_data.toString().contains("location")) {
            location = log_data.getString(jsonKey[7]);
        }

        // Only read inTime/outTime when both are present. The original tested
        // equals("inTime") and equals("outTime"), which can never both be true,
        // so these fields were silently dropped; contains() matches the
        // location check above and appears to be the intent.
        if (log_data.toString().contains("inTime")
                && log_data.toString().contains("outTime")) {
            in_time = log_data.getString(jsonKey[2]);
            out_time = log_data.getString(jsonKey[6]);
        }

        if (in_time != null && !in_time.equals("null")) {
            in_time = timeUtils.processJavaTimestamp(in_time);
        } else {
            in_time = "null";
        }

        if (out_time != null && !out_time.equals("null")) {
            out_time = timeUtils.processJavaTimestamp(out_time);
        } else {
            out_time = "null";
        }

        String[] canshu = new String[] { user_id, click_element, apk_type,
                apk_version, device_no, device_name, click_time, in_time,
                out_time, location };
        return canshu;
    }

}
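
To make the expected record shape concrete, here is a hypothetical input (every field value below is invented for illustration) driving pressJson:

import com.streaming_hbase_rest.utils.Json_redis;

import net.sf.json.JSONObject;

public class PressJsonExample {

    public static void main(String[] args) {
        // Hypothetical record: product is "type|version", userAgent is "deviceNo|model"
        JSONObject log = JSONObject.fromObject(
                "{\"product\":\"android|1.2.3\",\"userAgent\":\"860000000000001|MI-5\","
              + "\"clickElement\":\"btn_buy\",\"userId\":\"2353087\","
              + "\"clickTime\":\"1508400000\",\"location\":\"30.2,120.1\"}");
        String[] fields = new Json_redis().pressJson(log);
        // fields = { user_id, click_element, apk_type, apk_version, device_no,
        //            device_name, click_time, in_time, out_time, location }
        // here apk_type = "android", apk_version = "1.2.3", click_time is converted
        // to a formatted time, and in_time/out_time stay "null" because the record
        // carries no inTime/outTime keys
        for (String f : fields) {
            System.out.println(f);
        }
    }

}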


- HbaseUtils

package com.streaming_hbase_rest.utils;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseUtils {

    public static Configuration conf = null;
    public static Connection connect = null;

    static {
        conf = HBaseConfiguration.create();
        conf.addResource("hbase-site.xml"); // must be on the classpath
        try {
            connect = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /* Get the stored click count for one user. */
    public String getClickNumByUserid(String tableName, String user_id) {
        System.out.println("Querying historical click count by userId");
        Table table;
        String value = "null";
        try {
            table = connect.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(user_id));
            System.out.println("get>>>>>" + get);

            Result result = table.get(get);

            if (!result.isEmpty()) {
                for (Cell cell : result.listCells()) {
                    String column = new String(CellUtil.cloneQualifier(cell));
                    if (column.equals("clickNum")) {
                        value = new String(CellUtil.cloneValue(cell));
                    }
                }
            }
            table.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return value;
    }

    /* Upsert the aggregated click count; the row key is the userId. */
    public void insertClickData(String tableName, String[] parameters)
            throws IOException {
        TableName tn = TableName.valueOf(tableName);
        Table table = connect.getTable(tn);
        Put put = new Put(Bytes.toBytes(parameters[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(parameters[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("clickNum"), Bytes.toBytes(parameters[1]));
        table.put(put);
        table.close();
        System.out.println("add data Success!");
    }

    /* Append one raw event; the row key is userId + current millis to keep rows unique. */
    public void insertOriginalData(String tableName, String[] parameters)
            throws IOException {
        TableName tn = TableName.valueOf(tableName);
        Table table = connect.getTable(tn);
        Long time = System.currentTimeMillis();
        String rowkey = parameters[0] + time.toString();

        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(parameters[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("click_element"), Bytes.toBytes(parameters[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("apk_type"), Bytes.toBytes(parameters[2]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("apk_version"), Bytes.toBytes(parameters[3]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("device_no"), Bytes.toBytes(parameters[4]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("device_name"), Bytes.toBytes(parameters[5]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("click_time"), Bytes.toBytes(parameters[6]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("in_time"), Bytes.toBytes(parameters[7]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("out_time"), Bytes.toBytes(parameters[8]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("location"), Bytes.toBytes(parameters[9]));
        table.put(put);
        table.close();
        System.out.println("add data Success!");
    }

}
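
Both insert methods assume the two tables already exist with an info column family. A minimal sketch (mine, not from the original post) for creating them once with the HBase 1.0 client API:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTables {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.addResource("hbase-site.xml");
        try (Connection connect = ConnectionFactory.createConnection(conf);
                Admin admin = connect.getAdmin()) {
            // Table and column-family names match HbaseUtils above
            for (String name : new String[] { "app_clickNum", "app_spark_streaming_history" }) {
                TableName tn = TableName.valueOf(name);
                if (!admin.tableExists(tn)) {
                    HTableDescriptor desc = new HTableDescriptor(tn);
                    desc.addFamily(new HColumnDescriptor("info"));
                    admin.createTable(desc);
                }
            }
        }
    }

}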


7: Program entry point


- SparkStreamingMain

This class starts both the streaming job and the REST service.

package com.streaming_hbase_rest;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import scala.Tuple2;

import com.streaming_hbase_rest.utils.HbaseUtils;
import com.streaming_hbase_rest.utils.Json_redis;

/**
 * Created by Mr.wu on 2017/9/18.
 */
public class SparkStreamingMain {

    public static HbaseUtils hu = new HbaseUtils();
    public static Json_redis js = new Json_redis();

    public static void main(String[] args) throws IOException {
        System.out.println("Starting RestService");
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "dubbo/spring-dubbo-accessrest-provider.xml");
        context.start();
        System.out.println("RestService provider started");
        startStreaming();
        System.in.read();
    }

    public static void startStreaming() {
        System.out.println("Starting streaming >>>>>>>>>>>>>>>>>>");
        // ZooKeeper quorum the Kafka receiver connects to
        String zkQuorum = "master:2181,slave1:2181,slave2:2181,slave3:2181,dw:2181";
        // consumer group of the topic
        String group = "group1";
        // topic names, comma-separated
        String topics = "countClickNumByUserid";
        // receiver threads per topic
        int numThreads = 2;
        SparkConf sparkConf = new SparkConf()
                .setAppName("CountClickNumByUser_hbase").setMaster("local[*]")
                .set("spark.streaming.receiver.writeAheadLog.enable", "true");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(10000));
        jssc.checkpoint("/sparkstreaming_rest/checkpoint"); // checkpoint directory
        // map of topic name to receiver thread count
        Map<String, Integer> topicmap = new HashMap<>();
        for (String topic : topics.split(",")) {
            topicmap.put(topic, numThreads);
        }

        System.out.println("Reading data from Kafka >>");
        // (The ZooKeeper-less alternative, KafkaUtils.createDirectStream, which was
        // commented out here in the original, is shown in the error summary below.)

        // Receiver-based stream: offsets are tracked by ZooKeeper
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils
                .createStream(jssc, zkQuorum, group, topicmap);

        System.out.println("Printing a sample of the Kafka data >>");
        lines.print();
        // lines.count().print();
        lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
            @Override
            public void call(JavaPairRDD<String, String> rdd) throws Exception {
                rdd.foreach(new VoidFunction<Tuple2<String, String>>() {

                    @Override
                    public void call(Tuple2<String, String> arg0)
                            throws Exception {
                        System.out.println("Processing record >>");
                        String APP_log_data = arg0._2;
                        JSONArray jsonArray = null;
                        try {
                            // The payload looks like "<header>^<json>"; keep the JSON part
                            APP_log_data = APP_log_data.split("\\^")[1];
                            JSONObject log_data2 = JSONObject
                                    .fromObject(APP_log_data);
                            jsonArray = log_data2.getJSONArray("data");
                        } catch (Exception e) {
                            // Malformed record: skip it (the original swallowed the
                            // exception and then hit a NullPointerException below)
                            return;
                        }
                        for (int i = 0; i < jsonArray.size(); i++) {
                            JSONObject log_data = jsonArray.getJSONObject(i);

                            String[] parameters = js.pressJson(log_data);
                            hu.insertOriginalData(
                                    "app_spark_streaming_history", parameters);
                            String user_id = parameters[0];
                            String click_element = parameters[1];

                            // If user_id and click element are present, update the
                            // per-user counter (equals(), not ==, unlike the original)
                            if (!"null".equals(user_id) && !"null".equals(click_element)) {

                                // Look up this user's historical count
                                String historyClickNum = hu.getClickNumByUserid(
                                        "app_clickNum", user_id);

                                System.out.println("user_id of current record: "
                                        + user_id
                                        + ", historyClickNum from the table: "
                                        + historyClickNum);
                                if ("null".equals(historyClickNum)) {
                                    System.out.println("No history; initialising to 1");
                                    hu.insertClickData("app_clickNum",
                                            new String[] { user_id, "1" });
                                } else {
                                    System.out.println("History found; incrementing");
                                    int clickNum = Integer.parseInt(historyClickNum) + 1;
                                    System.out.println("Click count after increment: "
                                            + clickNum);
                                    hu.insertClickData("app_clickNum",
                                            new String[] { user_id,
                                                    String.valueOf(clickNum) });
                                }
                            }
                        }
                    }
                });
            }
        });
        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

That is all of the code. Next, the problems I ran into during development.


8: Errors and bugs encountered

When integrating Spark Streaming with the REST service I hit a javax.servlet package conflict (unfortunately I did not record the exact error message). My fix was to move the dependency below to the very beginning of the pom file, which made the problem go away:

<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
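
Why moving the declaration helps (my reading, not verified against this exact dependency tree): when conflicting versions of the same classes sit at equal depth, Maven's conflict mediation takes the first declaration it encounters, so listing javax.servlet-api at the top lets it win over the servlet classes pulled in transitively by Jetty, Tomcat, and the Spark artifacts.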

When accessing HBase, the connection at first reported no error at all; the program just sat there waiting, and only after a long time did it fail with something like "could not find location..." (I did not keep the exact message either). Checking the ZooKeeper registration tree on the server showed there was no hbase node under the root, and checking HBase's configuration file turned up this setting:

<property>
<name>zookeeper.znode.parent</name>
<value>/mnt/usr/local/zookeeper/hbase</value>
</property>

Going back to the ZooKeeper tree confirmed that HBase had registered itself under /mnt/usr/local/zookeeper/hbase.

Embarrassing, a lesson learned the hard way!

There are two ways to fix it.

The first (the one I used): change the HBase configuration to

<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>

The second: specify the zookeeper.znode.parent directory when creating the HBase configuration in code (untested; I went with the first approach). See the sketch below.
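
A minimal sketch of that second option, assuming you want the client to follow the non-default parent znode rather than moving HBase:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// e.g. in HbaseUtils' static initializer, before createConnection():
Configuration conf = HBaseConfiguration.create();
// Untested alternative: point the client at the znode HBase actually registered under
conf.set("zookeeper.znode.parent", "/mnt/usr/local/zookeeper/hbase");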

The project finally ran, but monitoring then revealed severe data loss: roughly a few million lines per day coming in, yet only a few tens of thousands processed. The original version of the code differed from what is shown above; it ran a flatMap directly on the DStream without ever going through foreachRDD (the screenshot of that wrong version in the original post is not reproduced here). Most demos found online skip foreachRDD in the same way. If you are taking this into real production work, make sure you do the processing inside foreachRDD.
While dealing with the data loss I also tried both KafkaUtils.createStream and KafkaUtils.createDirectStream for pulling data from Kafka. The difference between the two:

KafkaUtils.createStream is ZooKeeper-based; offsets are managed by ZooKeeper.

KafkaUtils.createDirectStream bypasses ZooKeeper and talks to Kafka directly, so you have to keep track of offsets yourself.

In my comparison, KafkaUtils.createStream with WAL and checkpointing enabled was slightly slower than KafkaUtils.createDirectStream.
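
For reference, a minimal sketch of the direct approach, mirroring the block left commented out in SparkStreamingMain (it needs the spark-streaming-kafka 0.8 connector flagged in the pom section; broker list and topic are the values from that commented-out code):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class DirectStreamSketch {

    // Returns a receiver-less Kafka stream; offsets are not stored in ZooKeeper
    public static JavaPairInputDStream<String, String> create(JavaStreamingContext jssc) {
        Map<String, String> kafkaParams = new HashMap<>();
        // Brokers are contacted directly instead of going through ZooKeeper
        kafkaParams.put("metadata.broker.list",
                "master:9092,slave1:9092,slave2:9092,slave3:9092,dw:9092");
        Set<String> topics = new HashSet<>();
        topics.add("countClickNumByUserid");
        // Offsets live only in the streaming checkpoint, so keep checkpointing
        // enabled (or commit offsets yourself) to survive a restart
        return KafkaUtils.createDirectStream(jssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);
    }

}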


Summary:


Since I had not worked with Spark before this, I took plenty of wrong turns along the way. The demos I found online all had problems of one kind or another, and almost none were production-grade, so once the project was working I wanted to write it up as a reference. There is still a lot that could be improved; corrections and criticism are welcome.


Finally, what calling the REST service looks like (shown as screenshots in the original post):



- Calling the service directly through a URL, with no configuration at all, to query the click count of userid 2353087

- Calling through code; here I look up every id listed in a properties file

import java.io.FileInputStream;
import java.util.Iterator;
import java.util.Properties;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

public class TestConsumer {

    public static void main(String[] args) throws Exception {
        final String port = "8889";
        Properties pro = new Properties();
        FileInputStream in = new FileInputStream("aa.properties");
        pro.load(in);
        in.close();
        Iterator<String> it = pro.stringPropertyNames().iterator();

        while (it.hasNext()) {
            String key = it.next();
            System.out.println(key);
            // ".json" selects the JSON representation declared by @Produces
            getUser("http://localhost:" + port + "/clickNum/" + key + ".json");
            Thread.sleep(3000);
        }
    }

    private static void getUser(String url) {
        System.out.println("Getting user via " + url);
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(url);
        Response response = target.request().get();
        try {
            if (response.getStatus() != 200) {
                throw new RuntimeException("Failed with HTTP error code : " + response.getStatus());
            }
            System.out.println("Successfully got result: " + response.readEntity(String.class));
        } finally {
            client.close();
        }
    }

}