您的位置:首页 > 运维架构

spark streaming 应用程序 监控 邮件提醒

2018-03-05 15:15 555 查看
spark streaming应用程序,放到线上后,怎么监测spark streaming程序的阻塞状态,

虽然spark 提供了spark webUI去查看,但是作为开发人员总不能天天去看spark webUI页面吧,

去官网看,貌似通过请求spark 自带的jetty 服务器可以获取相关监测统计信息,

http://host:8088/proxy/application_1517299288666_7058/streaming/


返回的数据是html页面,可以通过正则去解析需要的信息;但是这样子很不方便,能不能在spark streaming 发生阻塞的时候给我发送邮件提醒甚至是钉钉提醒呢?

这个方法比较笨,有没有更好的方法呢?跟踪spark StreamingContext源码:

spark streaming StreamingJobProgressListener 监控器

// Atomic counter initialised to 0 (presumably hands out ids for input streams;
// its usage is not shown in this excerpt).
private val nextInputStreamId = new AtomicInteger(0)

// Checkpoint directory: when a checkpoint is present, its directory is also
// registered on the SparkContext and reused here; otherwise this stays null.
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
sc.setCheckpointDir(cp_.checkpointDir)
cp_.checkpointDir
} else {
null
}
}

// Checkpoint interval: taken from the checkpoint when one is present,
// otherwise the graph's batch duration is used.
private[streaming] val checkpointDuration: Duration = {
if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
}

private[streaming] val scheduler = new JobScheduler(this)

private[streaming] val waiter = new ContextWaiter

// The built-in progress listener is constructed together with the context itself.
private[streaming] val progressListener = new StreamingJobProgressListener(this)

// The streaming UI tab is only created when "spark.ui.enabled" is true (default: true).
private[streaming] val uiTab: Option[StreamingTab] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new StreamingTab(this))
} else {
None
}


StreamingJobProgressListener实现:

private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
extends StreamingListener with SparkListener {

private val waitingBatchUIData = new HashMap[Time, BatchUIData]
private val runningBatchUIData = new HashMap[Time, BatchUIData]
private val completedBatchUIData = new Queue[BatchUIData]
private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
......


然后我自己实现了一个Listener类,当发生阻塞的时候,可以发送邮件,以下实现比较简单

import org.apache.spark.streaming.scheduler._
import streaming.test.email.EmailSender
import org.slf4j._

/**
 * Streaming listener that logs per-batch timing information and sends an alert
 * e-mail when a completed batch's total delay reaches the alert threshold.
 *
 * Callbacks that the original only forwarded to `super` are not overridden:
 * the inherited implementations are used as-is.
 *
 * @param appName  application name embedded in the alert subject
 * @param duration value multiplied by 1000 to build the threshold (presumably the
 *                 batch interval in seconds); an alert fires once the total delay
 *                 is at least `10 * duration * 1000` milliseconds
 */
class BJJListener(private val appName: String, private val duration: Int) extends StreamingListener {

  private val logger = LoggerFactory.getLogger("BJJListener")

  // The original condition was `>= 6 * d * 1000 && >= 10 * d * 1000`; the first
  // clause is implied by the second, so only the 10x bound is kept. Computed as
  // Long so a large `duration` cannot overflow Int.
  private val alertThresholdMs: Long = 10L * duration * 1000L

  /** Renders an optional timestamp/delay for logging without calling `.get`. */
  private def show(value: Option[Long]): String = value.map(_.toString).getOrElse("n/a")

  /** Logs the batch time of every submitted batch. */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    val batchTime = batchSubmitted.batchInfo.batchTime
    // SLF4J only substitutes arguments into "{}" placeholders.
    logger.info("BJJListener  batchTime : {}", batchTime)
  }

  /** Logs the processing start time of every started batch. */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    val processingStartTime = batchStarted.batchInfo.processingStartTime
    logger.info("BJJListener  processingStartTime : {}", show(processingStartTime))
  }

  /**
   * Logs the timing of a completed batch and sends an alert e-mail when its
   * total delay is at or above the threshold.
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingStartTime = batchInfo.processingStartTime
    val processingEndTime = batchInfo.processingEndTime
    val processingDelay = batchInfo.processingDelay
    val totalDelay = batchInfo.totalDelay

    // Only alert when every value needed for the message is actually present,
    // instead of calling `.get` on the Options.
    for {
      total <- totalDelay if total >= alertThresholdMs
      start <- processingStartTime
      end <- processingEndTime
      processing <- processingDelay
    } {
      val monitorTitle = s"spark streaming $appName 程序阻塞异常警告"
      val monitorContent = s"BJJListener : processingStartTime -> $start, processingEndTime -> $end , " +
        s"processingDelay -> $processing , totalDelay -> $total, 请及时检查!"
      // NOTE(review): this call is made synchronously from the listener callback.
      EmailSender.sendMail(monitorTitle, monitorContent)
    }

    logger.info("BJJListener  processingEndTime : {}", show(processingEndTime))
    logger.info("BJJListener  processingDelay : {}", show(processingDelay))
    logger.info("BJJListener  totalDelay : {}", show(totalDelay))
  }

}


通过可插拔的方式添加自己实现的listener

// Plug the custom listener into the StreamingContext. The second argument is
// BJJListener's `duration` parameter, which the listener multiplies by 1000 to
// build its alert threshold (presumably the batch interval in seconds — confirm).
ssc.addStreamingListener(new BJJListener(appName, 10))


//spark streaming程序测试的例子:

import kafka.serializer.StringDecoder
import org.ansj.splitWord.analysis.NlpAnalysis
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Amy on 2018/2/2.
 *
 * Demo driver for the monitoring listener: reads messages from a Kafka topic
 * with the direct stream API, segments each message into words, prints word
 * counts per batch, and registers [[BJJListener]] on the streaming context.
 */
object test {
  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "D:\\mcyarn\\hadoop-common-2.2.0-bin-master")
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val appName = "spark Steaming test"
    // Single source of truth for the batch interval, shared by the context and
    // the listener so the two can no longer drift apart.
    val batchSeconds = 10
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(batchSeconds))

    val brokerList = "localhost:9092"
    val zookeeperConnect = "localhost:2181"
    val groupId = "baasdf20180302"
    val newsTopic = "test"

    val kafkaParams = Map("metadata.broker.list" -> brokerList, "group.id" -> groupId,
      "zookeeper.connect" -> zookeeperConnect,
      "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString)

    // The direct stream yields (key, value) pairs; the message payload is the
    // value. The original took `_._1` (the key), which is normally null.
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams,
      topics = Set(newsTopic)).map(_._2)

    kafkaStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {

        // Unioning the RDD with itself multiplies the input fourfold.
        val rdds = rdd.union(rdd).union(rdd).union(rdd)

        // Drop null payloads instead of carrying `null` through as a key, and
        // split the space-joined segmentation result so individual words are
        // counted rather than whole messages.
        val words = rdds
          .filter(_ != null)
          .flatMap(news => NlpAnalysis.parse(news).toStringWithOutNature(" ").split(" "))
          .filter(_.nonEmpty)

        val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCount.foreach(println)
        println(rdd.count())
      }
    }

    ssc.addStreamingListener(new BJJListener(appName, batchSeconds))

    ssc.start()
    ssc.awaitTermination()

  }
}


阻塞到什么程度发送邮件,可以自己决定;可以发送邮件,也可以发送钉钉等,比较方便。

EmailSendInfo 类

import java.util.Properties;

public class EmailSendInfo {

// 发送邮件的服务器的IP和端口
private String mailServerHost;
private String mailServerPort = "25";
// 邮件发送者的地址
private String fromAddress;
// 邮件接收者的地址
private String toAddress;
// 登陆邮件发送服务器的用户名和密码
private String userName;
private String password;
// 是否需要身份验证
private boolean validate = false;
// 邮件主题
private String subject;
// 邮件的文本内容
private String content;
// 邮件附件的文件名
private String[] attachFileNames;

/**
* 获得邮件会话属性
*/
public Properties getProperties() {
Properties p = new Properties();
p.put("mail.smtp.host", this.mailServerHost);
p.put("mail.smtp.port", this.mailServerPort);
p.put("mail.smtp.auth", validate ? "true" : "false");
return p;
}

public String getMailServerHost() {
return mailServerHost;
}

/**
*
* @param mailServerHost
*/
public void setMailServerHost(String mailServerHost) {
this.mailServerHost = mailServerHost;
}

public String getMailServerPort() {
return mailServerPort;
}

/**
*
* @param mailServerPort
*/
public void setMailServerPort(String mailServerPort) {
this.mailServerPort = mailServerPort;
}

public boolean isValidate() {
return validate;
}

/**
*
* @param validate
*/
public void setValidate(boolean validate) {
this.validate = validate;
}

public String[] getAttachFileNames() {
return attachFileNames;
}

/**
*
* @param fileNames
*/
public void setAttachFileNames(String[] fileNames) {
this.attachFileNames = fileNames;
}

public String getFromAddress() {
return fromAddress;
}

/**
*
* @param fromAddress
*/
public void setFromAddress(String fromAddress) {
this.fromAddress = fromAddress;
}

public String getPassword() {
return password;
}

/**
*
* @param password
*/
public void setPassword(String password) {
this.password = password;
}

public String getToAddress() {
return toAddress;
}

/**
*
* @param toAddress
*/
public void setToAddress(String toAddress) {
this.toAddress = toAddress;
}

public String getUserName() {
return userName;
}

/**
*
* @param userName
*/
public void setUserName(String userName) {
this.userName = userName;
}

public String getSubject() {
return subject;
}

/**
*
* @param subject
*/
public void setSubject(String subject) {
this.subject = subject;
}

public String getContent() {
return content;
}

/**
*
* @param textContent
*/
public void setContent(String textContent) {
this.content = textContent;
}
}


EmailAuthenticator 类

import javax.mail.*;

public class EmailAuthenticator extends Authenticator{

private String userName;
private String password;

public EmailAuthenticator(){}

public EmailAuthenticator(String username, String password) {
this.userName = username;
this.password = password;
}

protected PasswordAuthentication getPasswordAuthentication(){
return new PasswordAuthentication(userName, password);
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

}


EmailSender

import javax.mail.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.util.Date;
import java.util.Properties;

/**
 * Sends plain-text alert e-mails over SMTP using a fixed, built-in mail
 * account configuration.
 */
public class EmailSender {

    /**
     * Connect and read timeout in milliseconds. JavaMail's SMTP timeouts are
     * infinite by default, so an unresponsive server would block the caller
     * forever without them.
     */
    private static final String SMTP_TIMEOUT_MS = "10000";

    /**
     * Builds a mail session from {@code mailInfo} and sends one UTF-8 text message.
     *
     * @param mailInfo server, credentials, addresses, subject and content
     * @return {@code true} if the message was handed to the transport without a
     *         {@link MessagingException}; {@code false} otherwise
     */
    private static boolean sendTextMail(EmailSendInfo mailInfo) {
        // getProperties() returns a fresh instance, so adding keys here is local.
        Properties pro = mailInfo.getProperties();
        pro.setProperty("mail.smtp.connectiontimeout", SMTP_TIMEOUT_MS);
        pro.setProperty("mail.smtp.timeout", SMTP_TIMEOUT_MS);

        // Create a password authenticator only when authentication is required.
        EmailAuthenticator authenticator = null;
        if (mailInfo.isValidate()) {
            authenticator = new EmailAuthenticator(mailInfo.getUserName(), mailInfo.getPassword());
        }
        // Build the sending session from the session properties and the authenticator.
        Session sendMailSession = Session.getInstance(pro, authenticator);
        // Session debug output dumps the SMTP dialogue to stdout, so it is off
        // unless the JVM is started with -Dmail.debug=true.
        sendMailSession.setDebug(Boolean.getBoolean("mail.debug"));
        try {
            // Create the message on this session.
            MimeMessage mailMessage = new MimeMessage(sendMailSession);
            // Sender.
            Address from = new InternetAddress(mailInfo.getFromAddress());
            mailMessage.setFrom(from);
            // Recipient.
            Address to = new InternetAddress(mailInfo.getToAddress());
            mailMessage.setRecipient(Message.RecipientType.TO, to);
            // Subject, send date and body.
            mailMessage.setSubject(mailInfo.getSubject(), "UTF-8");
            mailMessage.setSentDate(new Date());
            String mailContent = mailInfo.getContent();
            mailMessage.setText(mailContent, "UTF-8");

            // Send the message.
            Transport.send(mailMessage);
            return true;
        } catch (MessagingException ex) {
            ex.printStackTrace();
            return false;
        }
    }

    /**
     * Sends a monitoring alert with the given subject and body using the
     * built-in account settings. Failures are reported on stderr by
     * {@code sendTextMail} and are not propagated to the caller.
     *
     * @param monitorTitle   mail subject
     * @param monitorContent mail body (plain text)
     */
    public static void sendMail(String monitorTitle, String monitorContent){
        // NOTE(review): account and password are hard-coded in source; move
        // them to external configuration before using this in production.
        String fromaddr = "xxxx@yqzbw.com";
        String toaddr = "xxxx@yqzbw.com";
        String port = "25";
        String host = "smtp.exmail.qq.com";
        String userName = "xxxxxg@yqzbw.com";
        String password = "12345678";

        EmailSendInfo mailInfo = new EmailSendInfo();
        mailInfo.setMailServerHost(host);
        // The original declared `port` but never applied it; set it explicitly
        // so changing the value above actually takes effect.
        mailInfo.setMailServerPort(port);
        mailInfo.setValidate(true);
        mailInfo.setUserName(userName);
        mailInfo.setPassword(password);
        mailInfo.setFromAddress(fromaddr);
        mailInfo.setToAddress(toaddr);
        mailInfo.setSubject(monitorTitle);
        mailInfo.setContent(monitorContent);
        // Send as a plain-text mail.
        EmailSender.sendTextMail(mailInfo);
    }

}


以上实现比较简单,有牛逼的大神,补充后,可以私信我
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: