Spark Streaming application monitoring with email alerts
2018-03-05 15:15
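Once a Spark Streaming application is running in production, how do you monitor whether it is blocked, that is, whether batches are piling up faster than they can be processed?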
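Spark does provide the web UI for this, but as a developer you can't spend every day staring at the web UI page.
From the official documentation, it seems the monitoring statistics can also be obtained by requesting the pages served by Spark's built-in Jetty server, for example: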
http://host:8088/proxy/application_1517299288666_7058/streaming/
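The response is an HTML page, so the values you need have to be extracted with regular expressions, which is inconvenient. Could Spark Streaming instead send me an email, or even a DingTalk message, when it becomes blocked?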
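For reference, the scraping approach could look roughly like the sketch below. It assumes the fetched HTML contains a label of the form `Active Batches (N)`; the actual markup of the Streaming tab may differ between Spark versions, so treat the pattern as an assumption to check against your own page. `StreamingPageParser` is a hypothetical name.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that extracts the active-batch count from Streaming tab HTML. */
final class StreamingPageParser {
    // Assumed label format, e.g. "Active Batches (3)"; check it against your Spark version's page
    private static final Pattern ACTIVE_BATCHES =
            Pattern.compile("Active Batches\\s*\\((\\d+)\\)");

    private StreamingPageParser() {}

    /** Returns the number shown after the assumed label, or empty if the label is absent. */
    static OptionalInt activeBatches(String html) {
        Matcher m = ACTIVE_BATCHES.matcher(html);
        return m.find() ? OptionalInt.of(Integer.parseInt(m.group(1))) : OptionalInt.empty();
    }
}
```

Polling the page and alerting when the count keeps growing would work, but it depends on HTML that was never meant to be a stable interface.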
Scraping HTML is a clumsy approach, so is there a better one? Tracing through the Spark StreamingContext source code:
StreamingContext creates a StreamingJobProgressListener, the monitor that feeds the Streaming tab of the web UI:
```scala
private val nextInputStreamId = new AtomicInteger(0)

private[streaming] var checkpointDir: String = {
  if (isCheckpointPresent) {
    sc.setCheckpointDir(cp_.checkpointDir)
    cp_.checkpointDir
  } else {
    null
  }
}

private[streaming] val checkpointDuration: Duration = {
  if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
}

private[streaming] val scheduler = new JobScheduler(this)

private[streaming] val waiter = new ContextWaiter

private[streaming] val progressListener = new StreamingJobProgressListener(this)

private[streaming] val uiTab: Option[StreamingTab] =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(new StreamingTab(this))
  } else {
    None
  }
```
Part of the StreamingJobProgressListener implementation:
```scala
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
  extends StreamingListener with SparkListener {

  private val waitingBatchUIData = new HashMap[Time, BatchUIData]
  private val runningBatchUIData = new HashMap[Time, BatchUIData]
  private val completedBatchUIData = new Queue[BatchUIData]
  private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
  private var totalCompletedBatches = 0L
  private var totalReceivedRecords = 0L
  private var totalProcessedRecords = 0L
  private val receiverInfos = new HashMap[Int, ReceiverInfo]
  ......
```
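The listener above tracks waiting, running and completed batches in separate collections (`waitingBatchUIData`, `runningBatchUIData`, `completedBatchUIData`). A custom listener can mirror that bookkeeping to spot a growing queue, which can show up before the total delay gets large. Below is a Spark-free sketch; the assumption is that you call `submitted`, `started` and `completed` from `onBatchSubmitted`, `onBatchStarted` and `onBatchCompleted` with the batch time in milliseconds. `BatchBacklog` is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker mirroring the waiting/running bookkeeping of StreamingJobProgressListener. */
final class BatchBacklog {
    // Concurrent sets so the counts can be read from a thread other than the one posting events
    private final Set<Long> waiting = ConcurrentHashMap.newKeySet();
    private final Set<Long> running = ConcurrentHashMap.newKeySet();

    /** A batch was generated and queued. */
    void submitted(long batchTimeMs) { waiting.add(batchTimeMs); }

    /** A batch left the queue and began processing. */
    void started(long batchTimeMs) {
        waiting.remove(batchTimeMs);
        running.add(batchTimeMs);
    }

    /** A batch finished processing. */
    void completed(long batchTimeMs) {
        waiting.remove(batchTimeMs);
        running.remove(batchTimeMs);
    }

    /** Batches generated but not yet started: a steadily rising value means the job is falling behind. */
    int waitingCount() { return waiting.size(); }

    int runningCount() { return running.size(); }
}
```

An alert rule could then be something like "waitingCount() has stayed above some limit for several consecutive batches", with the limit chosen for your own workload.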
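So I implemented my own listener class along the same lines, which sends an email when the job is blocked. The implementation below is fairly simple: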
```scala
import org.apache.spark.streaming.scheduler._
import streaming.test.email.EmailSender
import org.slf4j._

/**
 * Sends an email when a batch's total delay grows too large.
 *
 * @param appName  application name used in the alert subject
 * @param duration batch interval in seconds
 */
class BJJListener(private val appName: String, private val duration: Int) extends StreamingListener {

  private val logger = LoggerFactory.getLogger("BJJListener")

  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit =
    super.onReceiverStarted(receiverStarted)

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit =
    super.onReceiverError(receiverError)

  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit =
    super.onReceiverStopped(receiverStopped)

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    super.onBatchSubmitted(batchSubmitted)
    val batchTime = batchSubmitted.batchInfo.batchTime
    // SLF4J only prints arguments that have a "{}" placeholder, so interpolate the value instead
    logger.info(s"BJJListener batchTime : $batchTime")
  }

  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    val processingStartTime = batchStarted.batchInfo.processingStartTime
    logger.info(s"BJJListener processingStartTime : $processingStartTime")
  }

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingStartTime = batchInfo.processingStartTime
    val processingEndTime = batchInfo.processingEndTime
    val processingDelay = batchInfo.processingDelay
    val totalDelay = batchInfo.totalDelay // scheduling delay + processing delay, in ms

    // Alert once the total delay reaches 10 batch intervals (choose your own threshold)
    if (totalDelay.exists(_ >= 10L * duration * 1000)) {
      val monitorTitle = s"Spark Streaming $appName blocking alert"
      val monitorContent =
        s"BJJListener : processingStartTime -> ${processingStartTime.getOrElse(-1L)}, " +
          s"processingEndTime -> ${processingEndTime.getOrElse(-1L)}, " +
          s"processingDelay -> ${processingDelay.getOrElse(-1L)}, " +
          s"totalDelay -> ${totalDelay.getOrElse(-1L)}, please check promptly!"
      EmailSender.sendMail(monitorTitle, monitorContent)
    }
    logger.info(s"BJJListener processingEndTime : $processingEndTime")
    logger.info(s"BJJListener processingDelay : $processingDelay")
    logger.info(s"BJJListener totalDelay : $totalDelay")
  }

  override def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit =
    super.onOutputOperationStarted(outputOperationStarted)

  override def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit =
    super.onOutputOperationCompleted(outputOperationCompleted)
}
```
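BJJListener calls `EmailSender.sendMail` for every completed batch whose delay is over the threshold. Once a job stalls, that can mean one email per batch (every 10 seconds with a 10-second interval). The sketch below keeps the "is it blocked?" rule from the listener (total delay of at least N batch intervals, so 10 × 10 × 1000 = 100,000 ms for a 10-second interval and N = 10) and adds a cooldown between alerts. The cooldown is an assumption of this sketch, not part of the original code, and `DelayAlertPolicy` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: decides when a batch counts as blocked and rate-limits the alerts. */
final class DelayAlertPolicy {
    private final long thresholdMs; // alert once totalDelay reaches this many milliseconds
    private final long cooldownMs;  // minimum gap between two alerts
    private final AtomicLong lastAlertMs = new AtomicLong(Long.MIN_VALUE); // MIN_VALUE = never alerted

    /**
     * @param batchIntervalSec batch interval in seconds (the "duration" passed to BJJListener)
     * @param multiple         how many batch intervals of delay count as blocked
     * @param cooldownMs       minimum time between alerts, in milliseconds
     */
    DelayAlertPolicy(int batchIntervalSec, int multiple, long cooldownMs) {
        this.thresholdMs = (long) multiple * batchIntervalSec * 1000L;
        this.cooldownMs = cooldownMs;
    }

    /** Returns true if an alert should be sent for a batch with this total delay at time nowMs. */
    boolean shouldAlert(long totalDelayMs, long nowMs) {
        if (totalDelayMs < thresholdMs) {
            return false;
        }
        long last = lastAlertMs.get();
        // Check "never alerted" explicitly so nowMs - Long.MIN_VALUE cannot overflow
        boolean due = last == Long.MIN_VALUE || nowMs - last >= cooldownMs;
        // compareAndSet ensures only one caller wins if two batches complete concurrently
        return due && lastAlertMs.compareAndSet(last, nowMs);
    }
}
```

Inside `onBatchCompleted` one would call `policy.shouldAlert(totalDelayMs, System.currentTimeMillis())` and only send the email when it returns true.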
The listener is pluggable, so you just register your own implementation on the StreamingContext:
```scala
// appName goes into the alert subject; 10 is the batch interval in seconds
ssc.addStreamingListener(new BJJListener(appName, 10))
```
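`addStreamingListener` is an observer-style hook. The callbacks of Spark's `StreamingListener` all have default empty implementations, so a listener overrides only what it needs, and every registered listener receives each event. The self-contained sketch below mimics that shape in plain Java to show the idea. `BatchListener` and `MiniListenerBus` are illustrative names, not Spark classes, and unlike this sketch Spark delivers events asynchronously on a listener-bus thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative listener: a default no-op body means implementers override only what they need. */
interface BatchListener {
    default void onBatchCompleted(long batchTimeMs, long totalDelayMs) {}
}

/** Illustrative bus: keeps registered listeners and forwards every event to each of them. */
final class MiniListenerBus {
    // Copy-on-write so listeners can be added while events are being posted
    private final List<BatchListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(BatchListener listener) { listeners.add(listener); }

    /** Delivers the event to every registered listener, in registration order. */
    void postBatchCompleted(long batchTimeMs, long totalDelayMs) {
        for (BatchListener l : listeners) {
            l.onBatchCompleted(batchTimeMs, totalDelayMs);
        }
    }
}
```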
An example Spark Streaming program for testing:
```scala
import kafka.serializer.StringDecoder
import org.ansj.splitWord.analysis.NlpAnalysis
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Amy on 2018/2/2.
 */
object test {
  def main(args: Array[String]): Unit = {
    // Only needed when running locally on Windows (location of the Hadoop binaries)
    System.setProperty("hadoop.home.dir", "D:\\mcyarn\\hadoop-common-2.2.0-bin-master")
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val appName = "spark Streaming test"
    val conf = new SparkConf().setMaster("local[2]").setAppName(appName)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10)) // 10-second batch interval

    val brokerList = "localhost:9092"
    val zookeeperConnect = "localhost:2181"
    val groupId = "baasdf20180302"
    val newsTopic = "test"
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupId,
      "zookeeper.connect" -> zookeeperConnect,
      "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString)

    // The direct stream yields (key, message) pairs; keep the message text
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics = Set(newsTopic)).map(_._2)

    kafkaStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Quadruple the batch (union with itself) so processing takes longer in the test
        val rdds = rdd.union(rdd).union(rdd).union(rdd)
        // Segment each message into space-separated words, skipping null messages
        val words = rdds.filter(_ != null)
          .flatMap(news => NlpAnalysis.parse(news).toStringWithOutNature(" ").split(" "))
        val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCount.foreach(println)
        println(rdd.count())
      }
    })

    ssc.addStreamingListener(new BJJListener(appName, 10))
    ssc.start()
    ssc.awaitTermination()
  }
}
```
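How far behind the job has to fall before an alert goes out is up to you, and the alert can be an email, a DingTalk message, or something else, which makes this approach convenient.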
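As a sketch of the DingTalk option: a DingTalk custom group robot accepts a JSON POST to its webhook URL (of the form `https://oapi.dingtalk.com/robot/send?access_token=<token>`) with a text message body such as `{"msgtype":"text","text":{"content":"..."}}`. The class below assumes Java 11+ (`java.net.http`). Depending on the robot's security settings, the message may also need a keyword or a signed URL, which this sketch does not handle. `DingTalkNotifier` is a hypothetical name.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical DingTalk notifier; the webhook URL comes from your own robot configuration. */
final class DingTalkNotifier {
    private final HttpClient client = HttpClient.newHttpClient();
    private final URI webhook;

    DingTalkNotifier(String webhookUrl) { this.webhook = URI.create(webhookUrl); }

    /** Builds a text-message body, escaping characters that would break the JSON string. */
    static String textPayload(String content) {
        StringBuilder sb = new StringBuilder();
        for (char c : content.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    // Other control characters must be written as unicode escapes in JSON
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return "{\"msgtype\":\"text\",\"text\":{\"content\":\"" + sb + "\"}}";
    }

    /** POSTs the alert to the webhook and returns the HTTP status code. */
    int send(String content) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(webhook)
                .header("Content-Type", "application/json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(textPayload(content)))
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode();
    }
}
```

In the listener, the `EmailSender.sendMail(...)` call could be swapped for (or combined with) `notifier.send(monitorTitle + "\n" + monitorContent)`.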
The EmailSendInfo class:
```java
import java.util.Properties;

public class EmailSendInfo {
    // Host and port of the outgoing mail (SMTP) server
    private String mailServerHost;
    private String mailServerPort = "25";
    // Sender address
    private String fromAddress;
    // Recipient address
    private String toAddress;
    // User name and password for logging in to the SMTP server
    private String userName;
    private String password;
    // Whether the server requires authentication
    private boolean validate = false;
    // Mail subject
    private String subject;
    // Plain-text body of the mail
    private String content;
    // File names of attachments
    private String[] attachFileNames;

    /** Builds the JavaMail session properties. */
    public Properties getProperties() {
        Properties p = new Properties();
        p.put("mail.smtp.host", this.mailServerHost);
        p.put("mail.smtp.port", this.mailServerPort);
        p.put("mail.smtp.auth", validate ? "true" : "false");
        return p;
    }

    public String getMailServerHost() { return mailServerHost; }
    public void setMailServerHost(String mailServerHost) { this.mailServerHost = mailServerHost; }

    public String getMailServerPort() { return mailServerPort; }
    public void setMailServerPort(String mailServerPort) { this.mailServerPort = mailServerPort; }

    public boolean isValidate() { return validate; }
    public void setValidate(boolean validate) { this.validate = validate; }

    public String[] getAttachFileNames() { return attachFileNames; }
    public void setAttachFileNames(String[] fileNames) { this.attachFileNames = fileNames; }

    public String getFromAddress() { return fromAddress; }
    public void setFromAddress(String fromAddress) { this.fromAddress = fromAddress; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    public String getToAddress() { return toAddress; }
    public void setToAddress(String toAddress) { this.toAddress = toAddress; }

    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }

    public String getSubject() { return subject; }
    public void setSubject(String subject) { this.subject = subject; }

    public String getContent() { return content; }
    public void setContent(String textContent) { this.content = textContent; }
}
```
The EmailAuthenticator class:
```java
import javax.mail.*;

public class EmailAuthenticator extends Authenticator {
    private String userName;
    private String password;

    public EmailAuthenticator() {}

    public EmailAuthenticator(String username, String password) {
        this.userName = username;
        this.password = password;
    }

    @Override
    protected PasswordAuthentication getPasswordAuthentication() {
        return new PasswordAuthentication(userName, password);
    }

    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
}
```
The EmailSender class:
```java
import javax.mail.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.util.Date;
import java.util.Properties;

public class EmailSender {

    private static boolean sendTextMail(EmailSendInfo mailInfo) {
        boolean sendStatus = false; // whether the mail was sent successfully
        // Create a password authenticator only if the server requires authentication
        EmailAuthenticator authenticator = null;
        Properties pro = mailInfo.getProperties();
        if (mailInfo.isValidate()) {
            authenticator = new EmailAuthenticator(mailInfo.getUserName(), mailInfo.getPassword());
        }
        // Build a mail session from the session properties and the authenticator
        Session sendMailSession = Session.getInstance(pro, authenticator);
        // [Debugging only] print the SMTP conversation
        sendMailSession.setDebug(true);
        try {
            // Create a message for this session
            MimeMessage mailMessage = new MimeMessage(sendMailSession);
            // Sender
            Address from = new InternetAddress(mailInfo.getFromAddress());
            mailMessage.setFrom(from);
            // Recipient
            Address to = new InternetAddress(mailInfo.getToAddress());
            mailMessage.setRecipient(Message.RecipientType.TO, to);
            // Subject, sent date and plain-text body
            mailMessage.setSubject(mailInfo.getSubject(), "UTF-8");
            mailMessage.setSentDate(new Date());
            mailMessage.setText(mailInfo.getContent(), "UTF-8");
            // Send the mail
            Transport.send(mailMessage);
            sendStatus = true;
        } catch (MessagingException ex) {
            ex.printStackTrace();
        }
        return sendStatus;
    }

    public static void sendMail(String monitorTitle, String monitorContent) {
        // Placeholder account settings: replace with your own
        String fromaddr = "xxxx@yqzbw.com";
        String toaddr = "xxxx@yqzbw.com";
        String port = "25";
        String host = "smtp.exmail.qq.com";
        String userName = "xxxxxg@yqzbw.com";
        String password = "12345678";

        EmailSendInfo mailInfo = new EmailSendInfo();
        mailInfo.setMailServerHost(host);
        mailInfo.setMailServerPort(port);
        mailInfo.setValidate(true);
        mailInfo.setUserName(userName);
        mailInfo.setPassword(password);
        mailInfo.setFromAddress(fromaddr);
        mailInfo.setToAddress(toaddr);
        mailInfo.setSubject(monitorTitle);
        mailInfo.setContent(monitorContent);
        // Send as a plain-text mail
        EmailSender.sendTextMail(mailInfo);
    }
}
```
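`sendMail` is synchronous: `Transport.send` performs the whole SMTP exchange on the calling thread. In BJJListener that is the thread delivering Spark's listener events, so a slow or unreachable mail server can hold up later streaming events. A minimal sketch of handing alerts to a single background thread follows. `AsyncAlerter` is a hypothetical name. It accepts any `BiConsumer<String, String>`, so `EmailSender::sendMail` fits directly.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical wrapper that sends alerts on a background thread instead of the caller's. */
final class AsyncAlerter implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "alert-sender");
        t.setDaemon(true); // do not keep the JVM alive just for pending alerts
        return t;
    });
    private final BiConsumer<String, String> sender;

    AsyncAlerter(BiConsumer<String, String> sender) { this.sender = sender; }

    /** Queues the alert and returns immediately; a failing send is logged, not rethrown. */
    void alert(String title, String content) {
        executor.execute(() -> {
            try {
                sender.accept(title, content);
            } catch (RuntimeException e) {
                System.err.println("alert failed: " + e);
            }
        });
    }

    /** Stops accepting alerts and waits briefly for queued ones to finish. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```

For example, a listener could hold `new AsyncAlerter(EmailSender::sendMail)` and call `alerter.alert(monitorTitle, monitorContent)` where it currently calls `EmailSender.sendMail`.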
The implementation above is fairly simple. If you're an expert with improvements, feel free to send me a private message.