Kafka Exception Handling (Consumer Stops Consuming Data)
2018-01-05 14:04
Problem
A production issue: OffsetMonitor showed that a Kafka consumer that had been running for a month suddenly built up lag, and the consumer side stopped consuming data entirely.
Analysis
We added a try...catch in the consumer client and captured this exception:

```
2017-08-27 09:47:48,103 ERROR [com.ecar.eoc.message.platform.kafka.Kafka211Context] - [kafka_Exception---------->>org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.]
```

In other words, processing between two poll() calls took longer than max.poll.interval.ms, so the group coordinator evicted this consumer and reassigned its partitions, and the subsequent offset commit was rejected.
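The error message itself names the two knobs: give the poll loop more time, or hand it less work per poll. A minimal sketch of the relevant settings (requires a 0.10.1+ client, where max.poll.interval.ms exists; the broker address, group id, and values below are illustrative, not from our configuration):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "trip-consumer");           // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Allow more time between poll() calls before the group evicts this member
        // (default in recent clients: 300000 ms)...
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
        // ...and/or shrink each batch so one loop iteration finishes faster
        // (default in recent clients: 500 records).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
```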
Solution
We have not yet found the root cause of the error. In any case, when the client fails it must reconnect to the broker, so we changed the consumer client to reconnect on exception by monitoring the KafkaConsumer object:
- Hold the consumer in a field the monitor can inspect: `KafkaConsumer<String, String> c = null;`
- When an exception is caught, set the KafkaConsumer to null.
- When the monitor thread sees that the KafkaConsumer is null, re-initialize the consumer (a minimal sketch of this pattern follows).
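Here is a minimal sketch of that reconnect pattern, with the business logic stripped out (the class and method names below are simplified for illustration and do not appear in the full code):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReconnectingConsumer {
    private KafkaConsumer<String, String> consumer; // null means "needs (re)initialization"

    private void consumeLoop(Properties props, String topic) {
        try {
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                }
                consumer.commitSync(); // may throw CommitFailedException after a rebalance
            }
        } catch (Exception e) {
            consumer = null; // flag for the monitor below
        }
    }

    // Called periodically by a monitor thread: rebuild the consumer if it died.
    private void monitor(Properties props, String topic) {
        if (consumer == null) {
            consumeLoop(props, topic);
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value()); // placeholder for real handling
    }
}
```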
The full code is below (company policy encrypts .java files, so the attachment is a .txt file; rename it to .java after downloading):
```java
package com.ecar.eoc.message.platform.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

import com.ecar.commons.cf.common.constant.ClientConstant;
import com.ecar.commons.cf.common.context.CFContext;
import com.ecar.commons.cf.common.context.CfMonitor;
import com.ecar.commons.cf.common.exception.CfException;
import com.ecar.commons.cf.common.vo.Head;
import com.ecar.commons.cf.common.vo.MsgDatagram;
import com.ecar.commons.cf.mq.async.ThreadPoolManager;
import com.ecar.commons.cmf.init.context.SpringConext;
import com.ecar.commons.cmf.util.PropertiesUtil;
import com.ecar.commons.cmf.util.XmlConfigUtil;
import com.ecar.commons.mail.bean.MailSenderInfo;
import com.ecar.commons.mail.utils.SimpleMailSender;
import com.ecar.eoc.message.platform.business.IMqService;
import com.ecar.eoc.message.platform.exception.MessageException;
import com.ecar.eoc.message.platform.utils.CmdIDUtil;

public class Kafka211Context implements CFContext, CfMonitor {

    private static Logger logger = Logger.getLogger(Kafka211Context.class);

    private static Kafka211Context conext = null;

    private int state = ClientConstant.INT_SCUUCED;

    // timestamp of the most recent Kafka consumption
    private static Long time = System.currentTimeMillis();

    // monitor interval (ms)
    private int monitorIntervalTime;

    private Long lastWarnTime;

    // null means the consumer is dead and must be re-initialized
    KafkaConsumer<String, String> c = null;

    public static Kafka211Context getConext() {
        if (conext != null) {
            return conext;
        }
        synchronized (Kafka211Context.class) {
            if (conext != null) {
                return conext;
            }
            conext = new Kafka211Context();
        }
        return conext;
    }

    public static void setConext(Kafka211Context conext) {
        Kafka211Context.conext = conext;
    }

    @SuppressWarnings("static-access")
    @Override
    public void init() throws CfException {
        this.monitorIntervalTime = XmlConfigUtil.getValueInt("config.mq-config.monitorIntervalTime");
        if (monitorIntervalTime == 0) {
            this.monitorIntervalTime = 10000;
        }
        this.conext = this;
    }

    @Override
    public void destroy() throws CfException {
        logger.info("Kafka context destroy...");
    }

    @Override
    public void start() throws CfException {
        synchronized (this) {
            if (null != conext) {
                try {
                    new Thread(this).start();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
                // start a watchdog thread that alerts when no data has been consumed for too long
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        lastWarnTime = time;
                        while (true) {
                            try {
                                if (System.currentTimeMillis() - time > 1 * 60 * 1000) {
                                    logger.error("kafka error...........................");
                                    int durations = PropertiesUtil.getValueInt("kafka_error_email_notice", 10);
                                    if (lastWarnTime == null
                                            || System.currentTimeMillis() - lastWarnTime > durations * 60 * 1000) {
                                        lastWarnTime = System.currentTimeMillis();
                                        // send an alert mail
                                        String receiverMail = PropertiesUtil.getValueString(
                                                "UBI_WEEK_REPORT_RECEIVE_MAIL",
                                                "373934266@qq.com,houmingbo@e-car.cn");
                                        SimpleMailSender sender = new SimpleMailSender();
                                        MailSenderInfo mailInfo = new MailSenderInfo();
                                        mailInfo.setToAddress(receiverMail);
                                        mailInfo.setContent("The server with IP "
                                                + PropertiesUtil.getValueString("kafka_error_server_ip", "not configured")
                                                + " has not consumed data from Kafka for " + durations
                                                + " minutes, please handle it promptly!!!");
                                        mailInfo.setSubject("trip server alert");
                                        boolean result = sender.sendHtmlMailSSL(mailInfo);
                                        if (result) {
                                            logger.info("sendEmailForWeekReport send mail success!!!");
                                        } else {
                                            logger.info("sendEmailForWeekReport send mail fail!!!");
                                        }
                                    }
                                }
                            } catch (Exception e2) {
                                logger.error("kafka monitor error, cause by ", e2);
                            }
                            try {
                                Thread.sleep(15000L);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        }
    }

    private void customering() {
        try {
            // consumer configuration
            Properties properties = new Properties();
            properties.put("bootstrap.servers", PropertiesUtil.getValueString("kafka.bootstrap.servers"));
            // disable auto-commit; offsets are committed manually below
            properties.put("enable.auto.commit", "false");
            properties.put("auto.offset.reset", "earliest");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id", PropertiesUtil.getValueString("kafka.group.id", ""));
            properties.put("auto.commit.interval.ms", "1000");
            // ExecutorService executor = Executors.newFixedThreadPool(200);
            if (Boolean.valueOf(PropertiesUtil.getValueString("kafka.production.environment", "false"))) {
                // run the consumer
                c = new KafkaConsumer<String, String>(properties);
                c.subscribe(Arrays.asList(PropertiesUtil.getValueString("kafka.topic", "")));
                List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
                IMqService ser = null;
                // minimum batch size before committing
                final int minBatchSize = 1;
                ThreadPoolManager poolManager = ThreadPoolManager.getInstance();
                while (true) {
                    ConsumerRecords<String, String> records = c.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        time = System.currentTimeMillis();
                        try {
                            String[] value = record.value().split("@@@");
                            MsgDatagram datagram = com.alibaba.fastjson.JSONObject.parseObject(value[1],
                                    MsgDatagram.class);
                            String cmdId = datagram.getCmdId();
                            ser = getMqServiceByCmdID(cmdId);
                            if (ser == null) {
                                throw new MessageException("40002", "no IMqService mapped to this CMDID");
                            }
                            Head head = new Head();
                            head.setAppID(datagram.getAppType());
                            head.setCmdId(datagram.getCmdId());
                            head.setSerialNo("" + datagram.getSerialNo());
                            head.setTerminalId(datagram.getTerminalId());
                            head.setToken(datagram.getToken());
                            Map<String, String> map = new HashMap<String, String>();
                            String topicName = value[0];
                            poolManager.execute(new ConsumerThread(ser, datagram, topicName, head, map));
                        } catch (MessageException e) {
                            logger.error("consumed message: " + record.value() + e);
                        } catch (Exception e) {
                            logger.error("consumed message: " + record.value() + e);
                        }
                        buffer.add(record);
                    }
                    if (buffer.size() >= minBatchSize) {
                        // processing succeeded, commit the offsets manually
                        c.commitSync();
                        // LOG.info("commit finished");
                        buffer.clear();
                    }
                }
            }
        } catch (Exception e) {
            // drop the consumer so monitor() re-initializes it
            c = null;
            logger.error("kafka_Exception---------->>" + e);
        }
    }

    @Override
    public void monitor() throws CfException {
        if (null == c) {
            this.customering();
        }
    }

    @Override
    public void run() {
        conext.customering();
        while (true) {
            try {
                this.monitor();
                Thread.sleep(monitorIntervalTime);
            } catch (Exception e) {
                logger.error("Kafka monitor error,", e);
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e1) {
                }
            }
        }
    }

    private IMqService getMqServiceByCmdID(String cmdId) throws MessageException {
        if (null == cmdId) {
            throw new MessageException("40001", "CMDID is null");
        }
        return (IMqService) SpringConext.getSpring().getBean(CmdIDUtil.getSpringBeanName(cmdId));
    }

    public int getState() {
        return state;
    }

    public int getMonitorIntervalTime() {
        return monitorIntervalTime;
    }
}
```
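One caveat about the catch block in customering(): it discards the failed consumer without calling close(), so the old instance keeps its sockets and group membership until the broker times it out. A possible variation (the resetConsumer helper below is hypothetical, not part of the original class) closes the consumer before nulling the field:

```java
// Hypothetical replacement for the catch block in customering();
// assumes the `c` and `logger` fields of Kafka211Context above.
private void resetConsumer(Exception cause) {
    logger.error("kafka_Exception---------->>" + cause);
    try {
        if (c != null) {
            c.close(); // leave the group cleanly and free network resources
        }
    } catch (Exception closeError) {
        logger.error("error closing KafkaConsumer", closeError);
    } finally {
        c = null; // monitor() sees null on its next pass and re-runs customering()
    }
}
```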