Kafka学习笔记——使用Kafka记录APP的操作日志
2017-06-10 23:25
531 查看
上一篇文章我们讲到了Kafka的工作原理和如何使用Kafka的代码示例,这里我们开始讲解Kafka的实战,在实际的应用中我们如何使用kafka的。下面将介绍前台的操作日志定时推送到kafka,然后通过kafka将消息日志进行保存,方便大数据的统计分析形成运营报表。
我们先看看工程的目录结构:
kafka的版本是:
下面我们依次看下代码实现:
我们先看看工程的目录结构:
kafka的版本是:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.2.0</version> </dependency>
下面我们依次看下代码实现:
错误码字典类CodeConstant.java
public class CodeConstant { // 参数为空 public static int NULL_ERROR = -1; // 请求参数错误 public static int PARAM_ERROR = -2; // token错误 public static int TOKEN_ERROR = -3; }
返回信息实体类JsonMsg.java
public class JsonMsg { private int code; private String message; public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
kafka消息实体类Message.java
/** * kafka消息实体类 * @author fuyuwei * 2017年6月10日 下午10:57:17 */ public class Message implements Serializable { private static final long serialVersionUID = -6170235919490993626L; /** * 消息主键 */ protected String messageId; /** * 回复消息对应的源消息主键 */ protected String sourceMessageId; /** * 发送消息相关信息 */ protected String sender; /** * 消息体 */ protected byte[] messageBody; /** * 消息创建时间 */ protected long createTime; public Message(byte[] messageBody){ this.sender = getIps(); createMessageId(); this.messageBody = messageBody; this.createTime = System.currentTimeMillis(); } public String getIps(){ try { return InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } return ""; } /** * 消息转为在消息中间件传输的内容 * @return * @throws BusinessException */ public String toJSONString() throws BusinessException { createMessageId(); try { return JsonUtil.toJSon(this); } catch (BusinessException e) { throw e; } } /** * 接收到的消息转为实体对象 * @param content 消息内容 * @return 消息实体 * @throws BusinessException */ public Message toMessage(String content) throws BusinessException{ return JsonUtil.readValue(content, Message.class); } public String toString(){ String date =null; try { SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); date = sdf.format(new Date(createTime)); } catch (Exception e) { } StringBuffer sb = new StringBuffer(); sb.append("messageId:"+this.messageId+"\r\n").append("sourceMessageId:"+this.messageId+"\r\n") .append("sender:"+sender+"\r\n").append("messageBody"+messageBody+"\r\n") .append("createTime="+date+"\r\n"); return sb.toString(); } public String getMessageId() { return messageId; } private void createMessageId() { this.messageId = sender+createUUID(); } private String createUUID(){ String id = UUID.randomUUID().toString(); return id.substring(0,8)+id.substring(9,13)+id.substring(14,18)+id.substring(19,23)+id.substring(24); //去掉“-”符号 } public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public long getCreateTime() { return createTime; } public void setCreateTime(long createTime) { this.createTime = createTime; } public String getSourceMessageId() { return sourceMessageId; } public void setSourceMessageId(String sourceMessageId) { this.sourceMessageId = sourceMessageId; } public byte[] getMessageBody() { return messageBody; } public void setMessageBody(byte[] messageBody) { this.messageBody = messageBody; } public void setMessageId(String messageId) { this.messageId = messageId; } }
Http访问类KafkaLogController.java
/** * kafka log接收器 * @author fuyuwei * 2017年6月10日 下午8:00:07 */ @Controller public class KafkaLogController { private static MessageProducer producer = MessageProducer.getInstance(); /** * 接收前台传来的日志字符串 * 既然采用Http协议请求,务必考虑传输的安全性,添加了请求的参数拦截校验 * @author fuyuwei * 2017年6月10日 下午8:01:36 * @param req * @param resp * @throws Throwable */ @RequestMapping(value = "/kafka/log/receiveLog.do", method= RequestMethod.POST) public void receiveLog(HttpServletRequest req, HttpServletResponse resp) throws Throwable{ ServletInputStream is = req.getInputStream(); byte[] bytes = readStream(is); if(bytes == null || bytes.length == 0){ JsonMsg msg = new JsonMsg(); msg.setCode(CodeConstant.NULL_ERROR); msg.setMessage("the request data is null"); // 不设置缓存 RespUtil.responJson(resp, msg, 0); return; } Message message = new Message(bytes); producer.sendMessage("appLog", message); BizLogger.info("receiveLog","appLog",message.getMessageId()); } /** * 把日志字符串转换为字节流数组 * @author fuyuwei * 2017年6月10日 下午8:05:20 * @param inStream * @return */ public static byte[] readStream(InputStream inStream){ ByteArrayOutputStream outStream = new ByteArrayOutputStream(); BufferedInputStream inputStream = new BufferedInputStream(inStream); try { byte[] buffer = new byte[1024]; int len = -1; while((len = inputStream.read(buffer)) != -1){ outStream.write(buffer,0,len); } return outStream.toByteArray(); } catch (IOException e) { BizLogger.error(e, "inputStream.read failure..."); } return null; } }
Spring启动加载类InitMessageConsumer.java
public class InitMessageConsumer implements InitializingBean, DisposableBean { public MessageConsumer consumer; @Override public void destroy() throws Exception { } @Override public void afterPropertiesSet() throws Exception { } public void initMethod() { BizLogger.info("init MessageReceiver start"); consumer = new MessageConsumer("appLog", 2,"app-group", new MessageConsumerExecutor()); try { consumer.receiveMessage(); } catch (Exception e) { BizLogger.error(e, "InitAndDestroySeqBean initMethod"); } BizLogger.info("MessageReceiver init finish!"); } public void destroyMethod() { if (null != consumer) { consumer.close(); } } }
拦截器AccessInteceptor.java
public class AccessInteceptor implements HandlerInterceptor { @Override public void afterCompletion(HttpServletRequest req, HttpServletResponse res, Object o, Exception e) throws Exception { } @Override public void postHandle(HttpServletRequest req, HttpServletResponse res, Object o, ModelAndView m) throws Exception { } @Override public boolean preHandle(HttpServletRequest req, HttpServletResponse res, Object o) throws Exception { String flagImmei = req.getHeader("flagImmei"); String tk = req.getHeader("token"); if(flagImmei.length() > 40){ JsonMsg msg = new JsonMsg(); msg.setCode(CodeConstant.PARAM_ERROR); msg.setMessage("the request data is null"); // 不设置缓存 RespUtil.responJson(res, msg, 0); return false; } if(!AppAESUtil.check(tk, flagImmei)){ JsonMsg msg = new JsonMsg(); msg.setCode(CodeConstant.TOKEN_ERROR); msg.setMessage("the token is error"); RespUtil.responJson(res, msg, 0); return false; } return true; } }
消息生产者MessageProducer.java
public class MessageProducer implements MessageService { private Producer<String, byte[]> producer; private static MessageProducer instance = null; /** * 初始化生产者 */ private MessageProducer() { try { Properties properties = new Properties(); properties.load(new ClassPathResource("producer.properties").getInputStream()); producer = new KafkaProducer<>(properties); } catch (IOException e) { BizLogger.error(e, "load producer file fail!"); } } /** * 单例模式 * @author fuyuwei * 2017年6月10日 下午8:44:05 * @return */ public static synchronized MessageProducer getInstance() { if(instance == null){ synchronized(MessageProducer.class){ if(instance == null){ instance = new MessageProducer(); } } } return instance; } /** * 发送消息 */ public boolean sendMessage(String topic, Message message) throws Exception { Collection<Message> messages = new ArrayList<Message>(); messages.add(message); return sendMessage(topic, messages); } /** * 批量发送消息 */ public boolean sendMessage(String topic, Collection<Message> messages) throws Exception { if (messages == null || messages.isEmpty()) { return false; } for (Message message : messages) { ProducerRecord<String, byte[]> km = new ProducerRecord<String, byte[]>(topic, message.getMessageId(), message.getMessageBody()); producer.send(km); } return true; } /** * 关闭发送客户端 */ public void close() { producer.close(); } }
消息消费者MessageConsumer.java
public class MessageConsumer { private String topic; private int partitionsNum; private String topicConsumerGroup; private MessageExecutor executor; private ConsumerConnector connector; private ExecutorService threadPool; public MessageConsumer(String topic, int partitionsNum,String topicConsumerGroup, MessageExecutor executor){ this.topic = topic; this.executor = executor; this.partitionsNum = partitionsNum; this.topicConsumerGroup = topicConsumerGroup; createConsumerConsumer(); } /** * 初始化消息消费者,创建connector * @author fuyuwei * 2017年6月10日 下午11:02:26 * @return */ private boolean createConsumerConsumer() { try{ Properties properties = new Properties(); properties.load(new ClassPathResource("consumer.properties").getInputStream()); properties.put("group.id",topicConsumerGroup); ConsumerConfig config=new ConsumerConfig(properties); connector=Consumer.createJavaConsumerConnector(config); return true; }catch (IOException e) { BizLogger.error(e, "MessageConsumer","init kafka consumer properties error"); } return false; } /** * 接收消息,并启动线程放到线程池执行 * @author fuyuwei * 2017年6月10日 下午11:02:51 * @throws Exception */ public void receiveMessage() throws Exception{ Map<String,Integer> topics = new HashMap<String,Integer>(); topics.put(topic, partitionsNum); Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics); List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic); threadPool = Executors.newFixedThreadPool(partitionsNum); for(KafkaStream<byte[], byte[]> partition : partitions){ threadPool.submit(new ReceiverMessageRunner(partition)); } } public void close(){ try{ if (threadPool != null) threadPool.shutdownNow(); }catch(Exception e){ BizLogger.error(e, "MessageConsumer","close fail"); }finally{ if (connector != null) connector.shutdown(); } } private class ReceiverMessageRunner implements Runnable{ private KafkaStream<byte[], byte[]> partition; public ReceiverMessageRunner(KafkaStream<byte[], byte[]> partition) { this.partition = partition; } public void run(){ ConsumerIterator<byte[], byte[]> it = partition.iterator(); while(it.hasNext()){ MessageAndMetadata<byte[],byte[]> item = it.next(); executor.execute(item.message()); } } } }
执行消息的保存操作MessageConsumerExecutor.java
public class MessageConsumerExecutor implements MessageExecutor { @Override public void execute(byte[] message ) { try { BizLogger.info("ReceiverMessageExecutor","start Resolve message"); String random = randomString(); int totalLength = message.length; if(totalLength <= 4 ){ BizLogger.info("message length is not correct"); } byte[] header = new byte[4];// 4个字节的消息头 System.arraycopy(message, 0, header, 0, 4); int headerLength = Utility.byte2Int(header); if(headerLength >= totalLength){ BizLogger.info("message header is not correct","headerLength",headerLength,"totalLength",totalLength); return; } byte[] headerMessage = new byte[headerLength]; System.arraycopy(message, 4, headerMessage, 0, headerLength); BizLogger.info("start parse headerMessage"); NYMobStatHeader mobheader = NYMobStatHeader.parseFrom(headerMessage); BizLogger.info("header",mobheader.getAppVer(),mobheader.getSysVer(),mobheader.getSdkVer(),mobheader.getDeviceName(),mobheader.getTelcom(),mobheader.getImei(),mobheader.getNetworkType(),mobheader.getAppId(),mobheader.getUserId(),random + mobheader.getFileName()); int currentLength = 4 + headerLength; while (currentLength < totalLength) { byte[] bodyMessageFlag = new byte[4];// 4个字节代表消息体的长度 System.arraycopy(message, currentLength, bodyMessageFlag, 0, 4); int bodyLength = Utility.byte2Int(bodyMessageFlag); if(bodyLength >= totalLength){ BizLogger.info("message body is not correct"); return; } byte[] bodyMessage = new byte[bodyLength]; currentLength = currentLength + 4 ; System.arraycopy(message, currentLength, bodyMessage, 0, bodyLength); currentLength = currentLength + bodyLength; NYMobStatModel statModel = NYMobStatModel.parseFrom(bodyMessage); Map<String,String> maps = statModel.getEventAttributesMap(); StringBuffer keys = new StringBuffer(); if(maps != null){ Set<String> keySet=maps.keySet(); Iterator<String> iterator=keySet.iterator(); while(iterator.hasNext()){ String key=iterator.next(); String value = maps.get(key); keys.append(key).append(":").append(value).append(","); } } BizLogger.info("body",statModel.getDataType(),statModel.getCtime(),statModel.getEventId(),statModel.getEventLabel(),keys.toString(),statModel.getPageId(),statModel.getFromPageId(),statModel.getUserId(),random + mobheader.getFileName()); } } catch (InvalidProtocolBufferException e) { BizLogger.info("protobuff parse fail "); ErrorMessageLogger.info("ReceiverMessageExecutor","protobuff parse fail"); }catch (Exception e) { BizLogger.info("parse fail "); ErrorMessageLogger.info("ReceiverMessageExecutor","parse fail"); } } public static String randomString(){ String s = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; char[] c = s.toCharArray(); Random random = new Random(); StringBuffer buffer = new StringBuffer(); for(int i = 0;i< 5;i++){ buffer.append(c[random.nextInt(c.length)]); } return buffer.toString(); } }
定义保存消息操作接口类MessageExecutor.java
public interface MessageExecutor { public void execute(byte[] message) ; }
发送消息接口类MessageService.java
public interface MessageService { /** * 发送消息 * @param message 消息 * @return * @throws BusinessException */ public boolean sendMessage(String topic,Message message) throws Exception; /** * 批量发送消息 * @param messages 消息集合 * @return * @throws BusinessException */ public boolean sendMessage(String topic,Collection<Message> messages)throws Exception; }
序列化工具类GoogleprotobufUtils.java和往前台返回json信息的工具类RespUtil.java
public class RespUtil { /** * * @author fuyuwei * 2017年6月10日 下午8:23:41 * @param resp * @param msg * @param cachetime * @throws IOException */ public static void responJson(HttpServletResponse resp, JsonMsg msg,int cachetime) throws IOException { resp.setHeader("Access-Control-Allow-Origin", "*"); if (cachetime == 0) { resp.setHeader("Cache-Control", "no-cache"); resp.setHeader("Pragma", "no-cache"); } else { resp.setHeader("Cache-Control", (new StringBuilder()).append("max-age=").append(cachetime) .toString()); } resp.setContentType("application/json;charset=utf-8"); resp.getWriter().write(msg.toString()); resp.getWriter().close(); } }
消费者配置文件consumer.properties
zookeeper.connect=127.0.01:2181 # timeout in ms for connecting to zookeeper zookeeper.session.timeout.ms=20000 zookeeper.connectiontimeout.ms=1000000 zookeeper.sync.time.ms=20000 auto.commit.enable=true auto.commit.interval.ms=1000 queued.max.message.chunks=50 rebalance.max.retries=5 # 最大取多少块缓存到消费者(默认10) queued.max.message.chunks=50 # 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存 fetch.min.bytes=6553600 # 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 auto.offset.reset=largest # 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默认为kafka.serializer.DefaultDecoder,即byte[] derializer.class=kafka.serializer.DefaultDecoder
生产者配置producer.properties
bootstrap.servers=127.0.01:9092 partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.StringSerializer buffer.memory=33554432 linger.ms=0 acks=1 request.timeout.ms=10000
Spring文件配置spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd"> <!-- jsp视图解析器 --> <bean id="jspViewResolver" class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="viewClass" value="org.springframework.web.servlet.view.JstlView"/> <!-- <property name="prefix" value="/jsp/" /> --> <property name="viewNames" value="/jsp/*"/> <property name="suffix" value=".jsp"/> <property name="order" value="1"/> </bean> <mvc:default-servlet-handler/> <context:annotation-config/> <mvc:annotation-driven /> <bean id="initAndDestroySeqBean" class="com.swk.init.InitMessageConsumer" init-method="initMethod" destroy-method="destroyMethod"/> <!-- 扫描控制器类 --> <context:component-scan base-package="com/swk/controller"></context:component-scan> <bean class="org.springframework.web.servlet.mvc.annotation.DefaultAnnotationHandlerMapping"/> <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter"/> <!-- 采用注解方式配置MVC --> <mvc:annotation-driven content-negotiation-manager="contentNegotiationManager"> </mvc:annotation-driven> <mvc:interceptors> <mvc:interceptor> <mvc:mapping path="/kafka/log/*" /> <bean class="com.swk.interceptor.AccessInteceptor" /> </mvc:interceptor> </mvc:interceptors> <bean id="contentNegotiationManager" class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean"> <property name="favorPathExtension" value="true"/> <property name="favorParameter" value="true"/> <property name="defaultContentType" value="text/html"/> <property name="mediaTypes"> <value> json=application/json xml=application/xml </value> </property> </bean> </beans>
相关文章推荐
- Kafka学习笔记——使用Kafka记录APP的操作日志
- MVC使用Log4Net进行错误日志记录学习笔记4
- MVC使用Log4Net进行错误日志记录学习笔记4
- 微软企业库5.0 学习之路——第九步、使用PolicyInjection模块进行AOP—PART4——建立自定义Call Handler实现用户操作日志记录
- AWS学习笔记(五)--启用CloudTrail记录AWS 账户操作日志
- [EntLib]微软企业库5.0 学习之路——第九步、使用PolicyInjection模块进行AOP—PART4——建立自定义Call Handler实现用户操作日志记录
- 学习WP7应用开发的笔记--在App和Page中使用资源集合的注意点1
- WPF and Silverlight 学习笔记(二十七):基本图形的使用(2)Path和位图操作
- C#--使用XML文件记录操作日志
- C#学习笔记14——Trace、Debug和TraceSource的使用以及日志设计
- ASP学习笔记1操作必须使用一个可更新的查询
- ORACLE DBA学习笔记--日志文件(使用LogMiner分析日志)
- 【VC++ 中使用ADO操作数据库学习笔记】_ConnectionPtr指针的基本用法
- 实例学习SSIS(四)--使用日志记录和错误流重定向
- [Linux学习笔记]第1天:操作系统的发展史,linux诞生,Linux发行商,shell,terminal,shell使用技巧,文件目录,基本操作
- SilverLight学习笔记--关于使用IValueConvert对绑定数据的格式化操作
- 实例学习SSIS(四)-- 使用日志记录和错误流重定向
- [翻译] 使用ASP.NET MVC操作过滤器记录日志
- cocos2d-x学习笔记17:记录存储2:SQLite基本使用 推荐