Spark streaming kafka OffsetOutOfRangeException 异常分析与解决
2017-03-29 10:46
567 查看
Spark streaming kafka OffsetOutOfRangeException
异常分析与解决
原文地址:
http://blog.csdn.net/xueba207/article/details/51174818
自从把Spark 从1.3升级到1.6之后,kafka Streaming相关问题频出。最近又遇到了一个。
job中使用Kafka DirectStream 读取topic中数据,然后做处理。其中有个测试job,停止了几天,再次启动时爆出了kafka.common.OffsetOutOfRangeException。下文记录下异常分析与解决过程。
从字面意思上,说是kafka topic的offset越界异常;在job中使用的是Kafka DirectStream,每成功处理一批数据,就把对应的offset更新到zookeeper中;和数组越界异常一样,offset越界应该分为头越界和尾越界,如下图所示。
头部越界: zookeeper中保存的offset在topic中仍然存在的最老message的offset之前时(zk_offset < earliest_offset);
尾部越界: zookeeper中保存的offset在topic中最新message的offset之后时(zk_offset > last_offset)
因为代码中采用了之前文章的方法,因此不可能是尾部越界,因此猜测是头部越界。
是什么导致头部越界呢?
考虑到kafka broker配置中修改了message的保持时间为24小时:
log.retention.hours=24(The minimum age of a log file to be eligible for deletion)
因此,应该是kafka 中未被消费的数据被broker清除了,使得zk中的offset落在仍存在的最老message offset的左侧,本来合法的offset变得不非法了。
改kafka broker 的retention time 为2分钟
配置文件
kafka/config/server.properties
修改完成后重启kafka。
使用zk shell 命令得到解析器所保存的zk_offset
停止spark streaming kafka DirectStream job
发送数据到kafka topic,等待一段时间(超过两分钟)
启动streaming job,复现该异常。
通过异常验证可以导致异常的原因为:kafka broker因为log.retention.hours的配置,导致topic中有些数据被清除,而在retention时间范围内streaming job都没有把将要被清除的message消费掉,因此zk中offset落在了earliest_offset的左侧,引发异常。
首先想到的方法就是 streaming job要及时消费掉topic中的数据,消费延迟不得大于log.retention.time的配置。
但是更好的办法是在遇到该问题时,依然能让job正常运行,因此就需要在发现
同样使用Spark Streaming ‘numRecords must not be negative’问题解决,解决思路的方法。
代码:
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
矫正offset核心代码:
异常分析与解决
原文地址:
http://blog.csdn.net/xueba207/article/details/51174818
自从把Spark 从1.3升级到1.6之后,kafka Streaming相关问题频出。最近又遇到了一个。
job中使用Kafka DirectStream 读取topic中数据,然后做处理。其中有个测试job,停止了几天,再次启动时爆出了kafka.common.OffsetOutOfRangeException。下文记录下异常分析与解决过程。
异常分析
从字面意思上,说是kafka topic的offset越界异常;在job中使用的是Kafka DirectStream,每成功处理一批数据,就把对应的offset更新到zookeeper中;和数组越界异常一样,offset越界应该分为头越界和尾越界,如下图所示。 头部越界: zookeeper中保存的offset在topic中仍然存在的最老message的offset之前时(zk_offset < earliest_offset);
尾部越界: zookeeper中保存的offset在topic中最新message的offset之后时(zk_offset > last_offset)
因为代码中采用了之前文章的方法,因此不可能是尾部越界,因此猜测是头部越界。
是什么导致头部越界呢?
考虑到kafka broker配置中修改了message的保持时间为24小时:
log.retention.hours=24(The minimum age of a log file to be eligible for deletion)
因此,应该是kafka 中未被消费的数据被broker清除了,使得zk中的offset落在仍存在的最老message offset的左侧,本来合法的offset变得不非法了。
验证猜测
改kafka broker 的retention time 为2分钟 配置文件
kafka/config/server.properties
log.retention.hours=168 -> log.retention.minutes=2
修改完成后重启kafka。
使用zk shell 命令得到解析器所保存的zk_offset
停止spark streaming kafka DirectStream job
发送数据到kafka topic,等待一段时间(超过两分钟)
启动streaming job,复现该异常。
通过异常验证可以导致异常的原因为:kafka broker因为log.retention.hours的配置,导致topic中有些数据被清除,而在retention时间范围内streaming job都没有把将要被清除的message消费掉,因此zk中offset落在了earliest_offset的左侧,引发异常。
解决方法
首先想到的方法就是 streaming job要及时消费掉topic中的数据,消费延迟不得大于log.retention.time的配置。 但是更好的办法是在遇到该问题时,依然能让job正常运行,因此就需要在发现
zk_offset<earliest_offset时矫正zk_offset为合法值。
同样使用Spark Streaming ‘numRecords must not be negative’问题解决,解决思路的方法。
代码:
package com.frey.v1.utils.kafka; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import kafka.api.PartitionOffsetRequestInfo; import kafka.cluster.Broker; import kafka.common.TopicAndPartition; import kafka.javaapi.*; import kafka.javaapi.consumer.SimpleConsumer; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; /** * KafkaOffsetTool * * @author FREY * @date 2016/4/11 */ public class KafkaOffsetTool { private static KafkaOffsetTool instance; final int TIMEOUT = 100000; final int BUFFERSIZE = 64 * 1024; private KafkaOffsetTool() { } public static synchronized KafkaOffsetTool getInstance() { if (instance == null) { instance = new KafkaOffsetTool(); } return instance; } public Map<TopicAndPartition, Long> getLastOffset(String brokerList, List<String> topics, String groupId) { Map<TopicAndPartition, Long> topicAndPartitionLongMap = Maps.newHashMap(); Map<TopicAndPartition, Broker> topicAndPartitionBrokerMap = KafkaOffsetTool.getInstance().findLeader(brokerList, topics); for (Map.Entry<TopicAndPartition, Broker> topicAndPartitionBrokerEntry : topicAndPartitionBrokerMap .entrySet()) { // get leader broker Broker leaderBroker = topicAndPartitionBrokerEntry.getValue(); SimpleConsumer simpleConsumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), TIMEOUT, BUFFERSIZE, groupId); long readOffset = getTopicAndPartitionLastOffset(simpleConsumer, topicAndPartitionBrokerEntry.getKey(), groupId); topicAndPartitionLongMap.put(topicAndPartitionBrokerEntry.getKey(), readOffset); } return topicAndPartitionLongMap; } /** * * @param brokerList * @param topics * @param groupId * @return */ public Map<TopicAndPartition, Long> getEarliestOffset(String brokerList, List<String> topics, String groupId) { Map<TopicAndPartition, Long> topicAndPartitionLongMap = Maps.newHashMap(); Map<TopicAndPartition, Broker> topicAndPartitionBrokerMap = KafkaOffsetTool.getInstance().findLeader(brokerList, topics); for (Map.Entry<TopicAndPartition, Broker> topicAndPartitionBrokerEntry : topicAndPartitionBrokerMap .entrySet()) { // get leader broker Broker leaderBroker = topicAndPartitionBrokerEntry.getValue(); SimpleConsumer simpleConsumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), TIMEOUT, BUFFERSIZE, groupId); long readOffset = getTopicAndPartitionEarliestOffset(simpleConsumer, topicAndPartitionBrokerEntry.getKey(), groupId); topicAndPartitionLongMap.put(topicAndPartitionBrokerEntry.getKey(), readOffset); } return topicAndPartitionLongMap; } /** * 得到所有的 TopicAndPartition * * @param brokerList * @param topics * @return topicAndPartitions */ private Map<TopicAndPartition, Broker> findLeader(String brokerList, List<String> topics) { // get broker's url array String[] brokerUrlArray = getBorkerUrlFromBrokerList(brokerList); // get broker's port map Map<String, Integer> brokerPortMap = getPortFromBrokerList(brokerList); // create array list of TopicAndPartition Map<TopicAndPartition, Broker> topicAndPartitionBrokerMap = Maps.newHashMap(); for (String broker : brokerUrlArray) { SimpleConsumer consumer = null; try { // new instance of simple Consumer consumer = new SimpleConsumer(broker, brokerPortMap.get(broker), TIMEOUT, BUFFERSIZE, "leaderLookup" + new Date().getTime()); TopicMetadataRequest req = new TopicMetadataRequest(topics); TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { TopicAndPartition topicAndPartition = new TopicAndPartition(item.topic(), part.partitionId()); topicAndPartitionBrokerMap.put(topicAndPartition, part.leader()); } } } catch (Exception e) { e.printStackTrace(); } finally { if (consumer != null) consumer.close(); } } return topicAndPartitionBrokerMap; } /** * get last offset * @param consumer * @param topicAndPartition * @param clientName * @return */ private long getTopicAndPartitionLastOffset(SimpleConsumer consumer, TopicAndPartition topicAndPartition, String clientName) { Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( kafka.api.OffsetRequest.LatestTime(), 1)); OffsetRequest request = new OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out .println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topicAndPartition.topic(), topicAndPartition.partition())); return 0; } long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition()); return offsets[0]; } /** * get earliest offset * @param consumer * @param topicAndPartition * @param clientName * @return */ private long getTopicAndPartitionEarliestOffset(SimpleConsumer consumer, TopicAndPartition topicAndPartition, String clientName) { Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( kafka.api.OffsetRequest.EarliestTime(), 1)); OffsetRequest request = new OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out .println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topicAndPartition.topic(), topicAndPartition.partition())); return 0; } long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition()); return offsets[0]; } /** * 得到所有的broker url * * @param brokerlist * @return */ private String[] getBorkerUrlFromBrokerList(String brokerlist) { String[] brokers = brokerlist.split(","); for (int i = 0; i < brokers.length; i++) { brokers[i] = brokers[i].split(":")[0]; } return brokers; } /** * 得到broker url 与 其port 的映射关系 * * @param brokerlist * @return */ private Map<String, Integer> getPortFromBrokerList(String brokerlist) { Map<String, Integer> map = new HashMap<String, Integer>(); String[] brokers = brokerlist.split(","); for (String item : brokers) { String[] itemArr = item.split(":"); if (itemArr.length > 1) { map.put(itemArr[0], Integer.parseInt(itemArr[1])); } } return map; } public static void main(String[] args) { List<String> topics = Lists.newArrayList(); topics.add("my_topic"); // topics.add("bugfix"); Map<TopicAndPartition, Long> topicAndPartitionLongMap = KafkaOffsetTool.getInstance().getEarliestOffset("broker1:9092,broker2:9092", topics, "com.frey.group"); for (Map.Entry<TopicAndPartition, Long> entry : topicAndPartitionLongMap.entrySet()) { System.out.println(entry.getKey().topic() + "-"+ entry.getKey().partition() + ":" + entry.getValue()); } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
矫正offset核心代码:
/** 以下 矫正 offset */ // lastest offsets Map<TopicAndPartition, Long> lastestTopicAndPartitionLongMap = KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"), Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID)); // earliest offsets Map<TopicAndPartition, Long> earliestTopicAndPartitionLongMap = KafkaOffsetTool.getInstance().getEarliestOffset(kafkaParams.get("metadata.broker.list"), Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID)); for (Map.Entry<TopicAndPartition, Long> topicAndPartitionLongEntry : fromOffsets.entrySet()) { long zkOffset = topicAndPartitionLongEntry.getValue(); long lastestOffset = lastestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey()); long earliestOffset = earliestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey()); // zkoffset 不在可用message offset区间内 if (zkOffset > lastestOffset || zkOffset < earliestOffset) { // set offset = earliestOffset logger.warn("矫正offset: " + zkOffset +" -> "+ earliestOffset); topicAndPartitionLongEntry.setValue(earliestOffset); } } /** 以上 矫正 offset */
相关文章推荐
- spark streaming kafka OffsetOutOfRangeException 异常分析与解决
- java.lang.NoSuchMethodError和kafka.common.OffsetOutOfRangeException(Spark)
- Kafka kafka.common.OffsetOutOfRangeException 问题处理
- Kafka 副本OffsetOutOfRangeException
- kafka的OffsetOutOfRangeException
- 解决图片缩放时异常:java.lang.IllegalArgumentException: pointerIndex out of range
- eclipse Java文件打开异常解决方法 :java.lang.StringIndexOutOfBoundsException: String index out of range: 26
- Caused by: kafka.common.OffsetOutOfRangeException
- ASP.net 异常详细信息: System.ArgumentOutOfRangeException: 索引超出范围。必须为非负值并小于集合大小。解决方法
- kafka.common.OffsetOutOfRangeException 问题处理
- AsParallel \AsQueryable<T>().ToList() [System.ArgumentOutOfRangeException was unhandled" 索引超出范围。必须为非负值并小于集合大小]解决方法
- SQL Server出现System.OutOfMemoryException异常的解决方法
- kafka-storm spout拉取数据问题offset out of range(todo)
- 解决多指操作放大缩小 指针错误 java.lang.IllegalArgumentException: pointerIndex out of range
- sqlserver,执行生成脚本时“引发类型为“System.OutOfMemoryException”的异常”(已解决)
- java.lang.IllegalArgumentException: pointerIndex out of range 问题的两种解决办法
- sqlserver,执行生成脚本时“引发类型为“System.OutOfMemoryException”的异常”(已解决)
- (WeakReference )弱引用解决OutOfMemoryException异常
- C# B/S程序中使用DropDownList出现System.ArgumentOutOfRangeException: “DropDownList1”有一个无效 SelectedValue,因为它不在项目列表中的解决方法
- java.lang.IllegalArgumentException: timeout arguments out of range异常