您的位置:首页 > 其它

Kafka异常处理(消费者不消费数据)

2018-01-05 14:04 826 查看


问题

生产问题:通过 OffsetMonitor 监控发现,运行了一个月的 Kafka 突然出现消费 lag,而且消费端不再消费数据。


分析

在客户端用 try...catch 捕获异常:



2017-08-27 09:47:48,103 ERROR [com.ecar.eoc.message.platform.kafka.Kafka211Context] - [kafka_Exception———->>org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions
to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms

, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.]


解决

暂时没有找到引起该报错的根本原因。不过提醒如果客户端失败,是需要重连服务器的。修改消费客户端的代码,异常重连。

监控KafkaConsumer对象
KafkaConsumer<String, String> c = null;
1

异常捕获后KafkaConsumer置为null



监控KafkaConsumer对象为null后重新初始化代码。 



整个代码见附件(公司对java代码加密了,只能用txt文件。下载请转为.java):
package com.ecar.eoc.message.platform.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

import com.ecar.commons.cf.common.constant.ClientConstant;
import com.ecar.commons.cf.common.context.CFContext;
import com.ecar.commons.cf.common.context.CfMonitor;
import com.ecar.commons.cf.common.exception.CfException;
import com.ecar.commons.cf.common.vo.Head;
import com.ecar.commons.cf.common.vo.MsgDatagram;
import com.ecar.commons.cf.mq.async.ThreadPoolManager;
import com.ecar.commons.cmf.init.context.SpringConext;
import com.ecar.commons.cmf.util.PropertiesUtil;
import com.ecar.commons.cmf.util.XmlConfigUtil;
import com.ecar.commons.mail.bean.MailSenderInfo;
import com.ecar.commons.mail.utils.SimpleMailSender;
import com.ecar.eoc.message.platform.business.IMqService;
import com.ecar.eoc.message.platform.exception.MessageException;
import com.ecar.eoc.message.platform.utils.CmdIDUtil;

/**
 * Kafka consumer context for the message platform.
 *
 * <p>Runs a poll loop that reads records from the configured topic, dispatches
 * each record to the {@link IMqService} matching its command id, and commits
 * offsets manually after dispatch. A watchdog thread observes the consumption
 * heartbeat ({@link #time}) and sends a rate-limited alert mail when no record
 * has been consumed for over a minute. When the poll loop dies (e.g. a
 * CommitFailedException after a rebalance) the consumer is closed and the
 * reference cleared, so {@link #monitor()} rebuilds it — reconnect-on-failure.
 */
public class Kafka211Context implements CFContext, CfMonitor
{
    private static Logger logger = Logger.getLogger(Kafka211Context.class);

    // volatile is required for the double-checked locking in getConext()
    // to be safe under the Java Memory Model.
    private static volatile Kafka211Context conext = null;

    private int state = ClientConstant.INT_SCUUCED;

    // Heartbeat: timestamp of the most recent consumed record. Written by the
    // poll loop thread, read by the watchdog thread -> volatile for visibility.
    private static volatile Long time = System.currentTimeMillis();

    // Pause (ms) between monitor() passes in run(); configured in init().
    private int monitorIntervalTime;

    // When the last alert mail was sent; throttles repeated alerts.
    // Shared between watchdog iterations -> volatile for visibility.
    private volatile Long lastWarnTime;

    // The live consumer. Cleared (null) whenever the poll loop dies so that
    // monitor() knows it must rebuild the consumer.
    KafkaConsumer<String, String> c = null;

    /**
     * Returns the process-wide singleton, creating it on first use
     * (double-checked locking; {@link #conext} is volatile).
     */
    public static Kafka211Context getConext()
    {
        Kafka211Context instance = conext;
        if (instance != null)
        {
            return instance;
        }
        synchronized (Kafka211Context.class)
        {
            if (conext == null)
            {
                conext = new Kafka211Context();
            }
            return conext;
        }
    }

    public static void setConext(Kafka211Context conext)
    {
        Kafka211Context.conext = conext;
    }

    /**
     * Reads the monitor interval from the XML config (defaults to 10 s when
     * absent or zero) and registers this instance as the singleton.
     */
    @Override
    public void init() throws CfException
    {
        this.monitorIntervalTime = XmlConfigUtil.getValueInt("config.mq-config.monitorIntervalTime");
        if (monitorIntervalTime == 0)
        {
            this.monitorIntervalTime = 10000;
        }
        conext = this;
    }

    @Override
    public void destroy() throws CfException
    {
        logger.info("Kafka context destroy...");
    }

    /**
     * Starts the consumer thread (this object's {@link #run()}) plus a
     * watchdog thread that alerts by mail when consumption stalls.
     */
    @Override
    public void start() throws CfException
    {
        synchronized (this)
        {
            if (null == conext)
            {
                return;
            }
            try
            {
                new Thread(this).start();
            }
            catch (Exception e1)
            {
                logger.error("failed to start kafka consumer thread,", e1);
            }
            // Watchdog: periodically check the consumption heartbeat.
            new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    lastWarnTime = time;
                    while (true)
                    {
                        try
                        {
                            checkConsumptionAndAlert();
                        }
                        catch (Exception e2)
                        {
                            logger.error("kafka zhou monitor error,cause by ", e2);
                        }
                        try
                        {
                            Thread.sleep(15000L);
                        }
                        catch (InterruptedException e)
                        {
                            // Restore interrupt status and stop the watchdog.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }).start();
        }
    }

    /**
     * One watchdog pass: if nothing has been consumed for more than a minute,
     * send an alert mail — at most once every "kafka_error_email_notice"
     * minutes (default 10).
     */
    private void checkConsumptionAndAlert()
    {
        if (System.currentTimeMillis() - time <= 1 * 60 * 1000)
        {
            return; // consumption is healthy
        }
        logger.error("kafak error...........................");
        int durations = PropertiesUtil.getValueInt("kafka_error_email_notice", 10);
        if (lastWarnTime != null && System.currentTimeMillis() - lastWarnTime <= durations * 60 * 1000)
        {
            return; // alerted recently; throttle
        }
        lastWarnTime = System.currentTimeMillis();
        String receiverMail = PropertiesUtil.getValueString(
                "UBI_WEEK_REPORT_RECEIVE_MAIL",
                "373934266@qq.com,houmingbo@e-car.cn");
        SimpleMailSender sender = new SimpleMailSender();
        MailSenderInfo mailInfo = new MailSenderInfo();
        mailInfo.setToAddress(receiverMail);
        mailInfo.setContent("ip为"
                + PropertiesUtil.getValueString("kafka_error_server_ip", "未配置")
                + "的服务器有" + durations
                + "分钟未从kafka消费到数据,请及时处理!!!");
        mailInfo.setSubject("trip服务器报警");
        boolean result = sender.sendHtmlMailSSL(mailInfo);
        if (result)
        {
            logger.info("sendEmailForWeekReport send mail success!!!");
        }
        else
        {
            logger.info("sendEmailForWeekReport send mail fail!!!");
        }
    }

    /**
     * Main poll loop: builds a KafkaConsumer, subscribes to the configured
     * topic and dispatches every record to the IMqService thread pool.
     * Offsets are committed manually after each drained batch. On any error
     * the consumer is closed and {@link #c} cleared so monitor() reconnects.
     */
    private void customering()
    {
        try
        {
            Properties properties = buildConsumerProperties();
            // Only consume in the production environment.
            if (!Boolean.valueOf(PropertiesUtil.getValueString("kafka.production.environment", "false")))
            {
                return;
            }
            c = new KafkaConsumer<String, String>(properties);
            c.subscribe(Arrays.asList(PropertiesUtil.getValueString("kafka.topic", "")));
            List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
            // Commit as soon as at least one record has been dispatched.
            final int minBatchSize = 1;
            ThreadPoolManager poolManager = ThreadPoolManager.getInstance();
            while (true)
            {
                ConsumerRecords<String, String> records = c.poll(100);
                for (ConsumerRecord<String, String> record : records)
                {
                    time = System.currentTimeMillis(); // heartbeat for the watchdog
                    dispatchRecord(record, poolManager);
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize)
                {
                    // Manual offset commit, after the batch was dispatched.
                    c.commitSync();
                    buffer.clear();
                }
            }
        }
        catch (Exception e)
        {
            // Close before dropping the reference: a plain "c = null" leaks
            // the consumer's sockets and heartbeat thread.
            closeConsumerQuietly();
            c = null; // signals monitor() to re-create the consumer
            logger.error("kafka_Exception---------->>", e);
        }
    }

    /**
     * Consumer configuration: manual offset commit, earliest reset, String
     * (de)serialization; servers/group/topic come from the properties files.
     */
    private Properties buildConsumerProperties()
    {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", PropertiesUtil.getValueString("kafka.bootstrap.servers"));
        // Offsets are committed manually after dispatch (see commitSync()).
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", PropertiesUtil.getValueString("kafka.group.id", ""));
        properties.put("auto.commit.interval.ms", "1000");
        return properties;
    }

    /**
     * Parses one record (format "topic@@@json") and hands it to the
     * IMqService matching its command id. Per-record errors are logged with
     * full stack traces and never break the poll loop.
     */
    private void dispatchRecord(ConsumerRecord<String, String> record, ThreadPoolManager poolManager)
    {
        try
        {
            String[] value = record.value().split("@@@");
            MsgDatagram datagram = com.alibaba.fastjson.JSONObject.parseObject(value[1], MsgDatagram.class);
            String cmdId = datagram.getCmdId();
            IMqService ser = getMqServiceByCmdID(cmdId);
            if (ser == null)
            {
                throw new MessageException("40002", "CMDID对应的IMqService为空");
            }
            Head head = new Head();
            head.setAppID(datagram.getAppType());
            head.setCmdId(datagram.getCmdId());
            head.setSerialNo("" + datagram.getSerialNo());
            head.setTerminalId(datagram.getTerminalId());
            head.setToken(datagram.getToken());
            Map<String, String> map = new HashMap<String, String>();
            String topicName = value[0];
            poolManager.execute(new ConsumerThread(ser, datagram, topicName, head, map));
        }
        catch (MessageException e)
        {
            // Pass the throwable as the second argument to keep the stack trace.
            logger.error("消费者的名字为:" + ",消费的消息为:" + record.value(), e);
        }
        catch (Exception e)
        {
            logger.error("消费者的名字为:" + ",消费的消息为:" + record.value(), e);
        }
    }

    /** Closes the current consumer, logging (not propagating) any failure. */
    private void closeConsumerQuietly()
    {
        if (c != null)
        {
            try
            {
                c.close();
            }
            catch (Exception e)
            {
                logger.error("error closing kafka consumer,", e);
            }
        }
    }

    /**
     * Re-enters the poll loop whenever the consumer reference was cleared by
     * a previous failure (reconnect-on-error).
     */
    @Override
    public void monitor() throws CfException
    {
        if (null == c)
        {
            this.customering();
        }
    }

    /**
     * Consumer thread body: runs the poll loop once, then keeps checking
     * every monitorIntervalTime ms whether the consumer must be rebuilt.
     */
    @Override
    public void run()
    {
        customering();
        while (true)
        {
            try
            {
                this.monitor();
                Thread.sleep(monitorIntervalTime);
            }
            catch (Exception e)
            {
                logger.error("Kafka monitor error,", e);
                try
                {
                    Thread.sleep(2000L);
                }
                catch (InterruptedException e1)
                {
                    // Restore interrupt status so the interruption is observable.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Resolves the Spring bean implementing {@link IMqService} for a command id.
     *
     * @throws MessageException 40001 when the command id is null
     */
    private IMqService getMqServiceByCmdID(String cmdId) throws MessageException
    {
        if (null == cmdId)
        {
            throw new MessageException("40001", "CMDID为空");
        }
        return (IMqService) SpringConext.getSpring().getBean(CmdIDUtil.getSpringBeanName(cmdId));
    }

    public int getState()
    {
        return state;
    }

    public int getMonitorIntervalTime()
    {
        return monitorIntervalTime;
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐