
Spark Streaming Kafka OffsetOutOfRangeException: Analysis and Solution


Original article:
http://blog.csdn.net/xueba207/article/details/51174818
Since upgrading Spark from 1.3 to 1.6, Kafka streaming problems have come up frequently. Here is the latest one.

One of our jobs uses a Kafka DirectStream to read data from a topic and process it. A test job among them had been stopped for a few days; when it was restarted, it threw kafka.common.OffsetOutOfRangeException. Below is a record of the analysis and the fix.


Exception Analysis

Literally, this is an offset out-of-range exception for a Kafka topic. The job uses a Kafka DirectStream and, after each successfully processed batch, writes the corresponding offsets to ZooKeeper (a sketch of this commit pattern follows the two definitions below). Like an array index out of bounds, an offset can be out of range at either end, head or tail:

Head out-of-range: the offset saved in ZooKeeper is before the offset of the oldest message still present in the topic (zk_offset < earliest_offset).
Tail out-of-range: the offset saved in ZooKeeper is after the offset of the newest message in the topic (zk_offset > last_offset).
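
As a reminder, a minimal sketch of that per-batch commit pattern (Spark 1.6 / spark-streaming-kafka 0.8 API); the processing step and the ZooKeeper write are placeholders:

// Sketch: after each successfully processed batch, record the consumed
// offset ranges so the next batch (or a restart) resumes from them.
directStream.foreachRDD(rdd -> {
  OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  // ... process the batch ...
  for (OffsetRange r : ranges) {
    // persist r.topic(), r.partition(), r.untilOffset() to ZooKeeper
  }
});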

Because the code uses the approach from an earlier post, tail out-of-range is impossible, so the suspicion fell on head out-of-range. What could cause the head to go out of range?

Considering that the broker configuration had been changed to keep messages for only 24 hours:

log.retention.hours=24(The minimum age of a log file to be eligible for deletion)
Therefore, the most likely explanation is that unconsumed data in Kafka had been deleted by the broker, leaving the offset stored in ZooKeeper to the left of the earliest message offset still present: an offset that used to be valid had become invalid.


Verifying the Hypothesis

Set the Kafka broker's retention time to 2 minutes.

Config file kafka/config/server.properties:

log.retention.hours=168 -> log.retention.minutes=2

1. After changing the config, restart Kafka.
2. Use zk shell commands to read the zk_offset saved by the job (see the sketch after this list).
3. Stop the Spark Streaming Kafka DirectStream job.
4. Send data to the Kafka topic, then wait for a while (more than two minutes).
5. Restart the streaming job: the exception reproduces.
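
Step 2 can also be checked programmatically: compare the offsets saved in ZooKeeper with the earliest offsets still on the broker, for example via the KafkaOffsetTool class listed in the Solution section below (broker, topic, and group names here are placeholders):

// Sketch: fetch the earliest offsets still available on the broker; the
// exception reproduces exactly when zk_offset < earliest for some partition.
Map<TopicAndPartition, Long> earliest = KafkaOffsetTool.getInstance()
    .getEarliestOffset("broker1:9092", Lists.newArrayList("my_topic"), "offset-check");
for (Map.Entry<TopicAndPartition, Long> e : earliest.entrySet()) {
  System.out.println(e.getKey() + " earliest=" + e.getValue());
}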

The reproduction confirms the cause: because of the log.retention.hours setting, the broker deleted some of the topic's data, and the streaming job had not consumed those messages within the retention window, so the offset in ZooKeeper ended up to the left of earliest_offset, raising the exception.


Solution

The first fix that comes to mind is to make the streaming job consume the topic's data promptly, keeping the consumer lag below the configured log.retention time.

A better approach, though, is to keep the job running when the problem occurs: whenever zk_offset < earliest_offset is detected, correct zk_offset to a valid value.

The fix reuses the idea from the earlier post on solving the Spark Streaming 'numRecords must not be negative' problem.

Code:
package com.frey.v1.utils.kafka;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * KafkaOffsetTool: looks up the earliest and latest offsets of topic
 * partitions through the SimpleConsumer API.
 *
 * @author FREY
 * @date 2016/4/11
 */
public class KafkaOffsetTool {

  private static KafkaOffsetTool instance;
  private static final int TIMEOUT = 100000;
  private static final int BUFFERSIZE = 64 * 1024;

  private KafkaOffsetTool() {
  }

  public static synchronized KafkaOffsetTool getInstance() {
    if (instance == null) {
      instance = new KafkaOffsetTool();
    }
    return instance;
  }

  /**
   * Get the latest (log-end) offset of every partition of the given topics.
   *
   * @param brokerList comma-separated list of host:port pairs
   * @param topics     topics to query
   * @param groupId    client id to use for the offset requests
   * @return latest offset per TopicAndPartition
   */
  public Map<TopicAndPartition, Long> getLastOffset(String brokerList, List<String> topics,
      String groupId) {

    Map<TopicAndPartition, Long> topicAndPartitionLongMap = Maps.newHashMap();

    Map<TopicAndPartition, Broker> topicAndPartitionBrokerMap = findLeader(brokerList, topics);

    for (Map.Entry<TopicAndPartition, Broker> topicAndPartitionBrokerEntry
        : topicAndPartitionBrokerMap.entrySet()) {
      // each partition must be queried on its leader broker
      Broker leaderBroker = topicAndPartitionBrokerEntry.getValue();

      SimpleConsumer simpleConsumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(),
          TIMEOUT, BUFFERSIZE, groupId);
      try {
        long readOffset = getTopicAndPartitionLastOffset(simpleConsumer,
            topicAndPartitionBrokerEntry.getKey(), groupId);
        topicAndPartitionLongMap.put(topicAndPartitionBrokerEntry.getKey(), readOffset);
      } finally {
        simpleConsumer.close();
      }
    }

    return topicAndPartitionLongMap;
  }

  /**
   * Get the earliest offset still available of every partition of the given topics.
   *
   * @param brokerList comma-separated list of host:port pairs
   * @param topics     topics to query
   * @param groupId    client id to use for the offset requests
   * @return earliest offset per TopicAndPartition
   */
  public Map<TopicAndPartition, Long> getEarliestOffset(String brokerList, List<String> topics,
      String groupId) {

    Map<TopicAndPartition, Long> topicAndPartitionLongMap = Maps.newHashMap();

    Map<TopicAndPartition, Broker> topicAndPartitionBrokerMap = findLeader(brokerList, topics);

    for (Map.Entry<TopicAndPartition, Broker> topicAndPartitionBrokerEntry
        : topicAndPartitionBrokerMap.entrySet()) {
      // each partition must be queried on its leader broker
      Broker leaderBroker = topicAndPartitionBrokerEntry.getValue();

      SimpleConsumer simpleConsumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(),
          TIMEOUT, BUFFERSIZE, groupId);
      try {
        long readOffset = getTopicAndPartitionEarliestOffset(simpleConsumer,
            topicAndPartitionBrokerEntry.getKey(), groupId);
        topicAndPartitionLongMap.put(topicAndPartitionBrokerEntry.getKey(), readOffset);
      } finally {
        simpleConsumer.close();
      }
    }

    return topicAndPartitionLongMap;
  }

  /**
   * Find the leader broker of every TopicAndPartition of the given topics.
   *
   * @param brokerList comma-separated list of host:port pairs
   * @param topics     topics to query
   * @return leader broker per TopicAndPartition
   */
  private Map<TopicAndPartition, Broker> findLeader(String brokerList, List<String> topics) {
    // broker hostnames and the host -> port mapping
    String[] brokerUrlArray = getBrokerUrlFromBrokerList(brokerList);
    Map<String, Integer> brokerPortMap = getPortFromBrokerList(brokerList);

    Map<TopicAndPartition, Broker> topicAndPartitionBrokerMap = Maps.newHashMap();

    for (String broker : brokerUrlArray) {

      SimpleConsumer consumer = null;
      try {
        // throwaway consumer used only for the metadata request
        consumer = new SimpleConsumer(broker, brokerPortMap.get(broker), TIMEOUT, BUFFERSIZE,
            "leaderLookup" + new Date().getTime());

        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        TopicMetadataResponse resp = consumer.send(req);

        List<TopicMetadata> metaData = resp.topicsMetadata();

        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            TopicAndPartition topicAndPartition =
                new TopicAndPartition(item.topic(), part.partitionId());
            topicAndPartitionBrokerMap.put(topicAndPartition, part.leader());
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }
    return topicAndPartitionBrokerMap;
  }

  /**
   * Get the latest offset of a single partition.
   *
   * @param consumer          consumer connected to the partition's leader broker
   * @param topicAndPartition partition to query
   * @param clientName        client id to use for the request
   * @return the latest offset, or 0 on error
   */
  private long getTopicAndPartitionLastOffset(SimpleConsumer consumer,
      TopicAndPartition topicAndPartition, String clientName) {
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
        kafka.api.OffsetRequest.LatestTime(), 1));

    OffsetRequest request = new OffsetRequest(
        requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
      System.out.println("Error fetching offset data from the broker. Reason: "
          + response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
      return 0;
    }
    long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
    return offsets[0];
  }

  /**
   * Get the earliest offset still available of a single partition.
   *
   * @param consumer          consumer connected to the partition's leader broker
   * @param topicAndPartition partition to query
   * @param clientName        client id to use for the request
   * @return the earliest offset, or 0 on error
   */
  private long getTopicAndPartitionEarliestOffset(SimpleConsumer consumer,
      TopicAndPartition topicAndPartition, String clientName) {
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
        kafka.api.OffsetRequest.EarliestTime(), 1));

    OffsetRequest request = new OffsetRequest(
        requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
      System.out.println("Error fetching offset data from the broker. Reason: "
          + response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
      return 0;
    }
    long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
    return offsets[0];
  }

  /**
   * Extract the broker hostnames from the broker list.
   *
   * @param brokerlist comma-separated list of host:port pairs
   * @return broker hostnames
   */
  private String[] getBrokerUrlFromBrokerList(String brokerlist) {
    String[] brokers = brokerlist.split(",");
    for (int i = 0; i < brokers.length; i++) {
      brokers[i] = brokers[i].split(":")[0];
    }
    return brokers;
  }

  /**
   * Build the broker host -> port mapping from the broker list.
   *
   * @param brokerlist comma-separated list of host:port pairs
   * @return port per broker host
   */
  private Map<String, Integer> getPortFromBrokerList(String brokerlist) {
    Map<String, Integer> map = new HashMap<String, Integer>();
    String[] brokers = brokerlist.split(",");
    for (String item : brokers) {
      String[] itemArr = item.split(":");
      if (itemArr.length > 1) {
        map.put(itemArr[0], Integer.parseInt(itemArr[1]));
      }
    }
    return map;
  }

  public static void main(String[] args) {
    List<String> topics = Lists.newArrayList();
    topics.add("my_topic");
    //    topics.add("bugfix");
    Map<TopicAndPartition, Long> topicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getEarliestOffset("broker1:9092,broker2:9092", topics,
            "com.frey.group");

    for (Map.Entry<TopicAndPartition, Long> entry : topicAndPartitionLongMap.entrySet()) {
      System.out.println(entry.getKey().topic() + "-" + entry.getKey().partition() + ":"
          + entry.getValue());
    }
  }
}

Core offset-correction code:

/** correct the offsets (begin) */

// latest offsets
Map<TopicAndPartition, Long> latestTopicAndPartitionLongMap =
    KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
        Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));

// earliest offsets
Map<TopicAndPartition, Long> earliestTopicAndPartitionLongMap =
    KafkaOffsetTool.getInstance().getEarliestOffset(kafkaParams.get("metadata.broker.list"),
        Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));

for (Map.Entry<TopicAndPartition, Long> topicAndPartitionLongEntry : fromOffsets.entrySet()) {

  long zkOffset = topicAndPartitionLongEntry.getValue();
  long latestOffset = latestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
  long earliestOffset = earliestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());

  // the zkOffset is outside the range of offsets still available on the broker
  if (zkOffset > latestOffset || zkOffset < earliestOffset) {
    // reset the offset to earliestOffset
    logger.warn("correcting offset: " + zkOffset + " -> " + earliestOffset);
    topicAndPartitionLongEntry.setValue(earliestOffset);
  }
}
/** correct the offsets (end) */
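
Once fromOffsets has been corrected this way, it can be handed to createDirectStream as usual. A minimal sketch against the Spark 1.6 Java API (KafkaUtils from spark-streaming-kafka, StringDecoder from kafka.serializer); the String key/value types and the message-only handler are assumptions that depend on the job:

// Sketch: build the direct stream from the corrected fromOffsets map.
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
    jssc,                                        // JavaStreamingContext
    String.class, String.class,                  // key / value types (assumed)
    StringDecoder.class, StringDecoder.class,    // key / value decoders (assumed)
    String.class,                                // record type produced by the handler
    kafkaParams, fromOffsets,
    (Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message);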