Presto Event Listener开发
2019-07-30 22:12
417 查看
简介
同Hive Hook一样,Presto也支持自定义实现Event Listener,用于侦听Presto引擎执行查询时发生的事件,并作出相应的处理。我们可以利用该功能实现诸如自定义日志记录、调试和性能分析插件,帮助我们更好的运维Presto集群。但是不同于Hive Hook的是,在Presto集群中,一次只能有一个Event Listener处于活动状态。
Event Listener作为Plugin监听以下事件:
- Query Creation(查询建立相关信息)
- Query completion (success or failure)(查询执行相关信息,包含成功查询的细节信息,失败查询的错误码等信息)
- Split completion (success or failure)(split执行信息,同理包含成功和失败的细节信息)
了解Hook及Listener模式的朋友对于其步骤应该很清楚了,我们只需要:
- 实现Presto Event Listener和EventListenerFactory接口。
- 正确的打包我们的jar。
- 部署,放到Presto指定目录,修改配置文件。
接口
- 实现EventListener,该类是我们的核心逻辑所在,供包含上面所说的三个事件:
public interface EventListener { //query创建的详细信息 default void queryCreated(QueryCreatedEvent queryCreatedEvent) { } //query执行的详细信息 default void queryCompleted(QueryCompletedEvent queryCompletedEvent) { } //split执行的详细信息 default void splitCompleted(SplitCompletedEvent splitCompletedEvent) { } }
- 实现EventListenerFactory创建我们自己实现的EventListener
- 实现Plugin接口,实现getEventListenerFactories()方法,获取我们自己实现的EventListenerFactory
- 添加配置信息,为etc/event-listener.properties。其中event-listener.name为必备属性,其他属性为我们plugin所需要的信息。
示例
由于集群运维的需要,先需要将用户的查询历史、查询花费的时间等信息进行统计,以便于后续对各个业务的查询进行优先级分级和评分,方便后续Presto集群稳定性易用性的维护。这里给出一个简单的将这些信息存储到Mysql数据库的样例。
Maven Pom
<dependency> <groupId>com.facebook.presto</groupId> <artifactId>presto-spi</artifactId> <version>0.220</version> <scope>compile</scope> </dependency>
QueryEventListenerFactory
public class QueryEventListenerFactory implements EventListenerFactory { @Override public String getName() { return "query-event-listener"; } @Override public EventListener create(Map<String, String> config) { if (!config.containsKey("jdbc.uri")) { throw new RuntimeException("/etc/event-listener.properties file missing jdbc.uri"); } if (!config.containsKey("jdbc.user")) { throw new RuntimeException("/etc/event-listener.properties file missing jdbc.user"); } if (!config.containsKey("jdbc.pwd")) { throw new RuntimeException("/etc/event-listener.properties file missing jdbc.pwd"); } return new QueryEventListener(config); } }
QueryEventPlugin
public class QueryEventPlugin implements Plugin { @Override public Iterable<EventListenerFactory> getEventListenerFactories() { EventListenerFactory listenerFactory = new QueryEventListenerFactory(); return Arrays.asList(listenerFactory); } }
QueryEventListener
public class QueryEventListener implements EventListener { private Map<String, String> config; private Connection connection; public QueryEventListener(Map<String, String> config) { this.config = new HashMap<>(); this.config.putAll(config); init(); } private void init() { try { if (connection == null || !connection.isValid(10)) { Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager .getConnection(config.get("jdbc.uri"), config.get("jdbc.user"), config.get("jdbc.pwd")); } } catch (SQLException | ClassNotFoundException e) { e.printStackTrace(); } } @Override public void queryCreated(QueryCreatedEvent queryCreatedEvent) { } @Override public void queryCompleted(QueryCompletedEvent queryCompletedEvent) { String queryId = queryCompletedEvent.getMetadata().getQueryId(); String querySql = queryCompletedEvent.getMetadata().getQuery(); String queryState = queryCompletedEvent.getMetadata().getQueryState(); String queryUser = queryCompletedEvent.getContext().getUser(); long createTime = queryCompletedEvent.getCreateTime().toEpochMilli(); long endTime = queryCompletedEvent.getEndTime().toEpochMilli(); long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli(); //insert into query execution table long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO) .toMillis(); long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis(); long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis(); long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis(); int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits(); double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory(); long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes(); long outputRows = queryCompletedEvent.getStatistics().getOutputRows(); long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes(); long totalRows = queryCompletedEvent.getStatistics().getTotalRows(); long writtenBytes = queryCompletedEvent.getStatistics().getWrittenBytes(); long writtenRows = queryCompletedEvent.getStatistics().getWrittenRows(); //insert into query info table queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> { int code = queryFailureInfo.getErrorCode().getCode(); String name = queryFailureInfo.getErrorCode().getName(); String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase(); String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase(); String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase(); String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase(); String failuresJson = queryFailureInfo.getFailuresJson(); // insert into failed query table }); } @Override public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { long createTime = splitCompletedEvent.getCreateTime().toEpochMilli(); long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MIN).toEpochMilli(); String payload = splitCompletedEvent.getPayload(); String queryId = splitCompletedEvent.getQueryId(); String stageId = splitCompletedEvent.getStageId(); long startTime = splitCompletedEvent.getStartTime().orElse(Instant.MIN).toEpochMilli(); String taskId = splitCompletedEvent.getTaskId(); long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes(); long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions(); long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis(); long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis(); long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis(); long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis(); //insert into stage info table } }
打包
- Presto使用服务提供者接口(SPI)来扩展Presto。Presto使用SPI加载连接器,功能,类型和系统访问控制。SPI通过元数据文件加载。我们还需要创建
src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin
元数据文件。该文件应包含我们插件的类名如:com.ji3jin.presto.listener.QueryEventListener
- 执行
mvn clean install
打包
部署
- 创建配置文件etc/event-listener.properties
event-listener.name=query-event-listener jdbc.uri=jdbc:mysql://localhost:3306/presto_monitor jdbc.user=presto jdbc.pwd=presto123
- 在presto根目录下创建
query-event-listener
目录,名称与我们上面event listener的name一致 - 将我们的jar包和mysql connector的jar包拷贝到上面创建的目录
- 重新启动Presto服务即可
好了,现在你可以执行查询,然后就可以在Mysql中看到你的查询历史和相关时间的统计信息了。如果你目前的工作对此也有需要,还等什么,快动手实现一个吧。
欢迎关注我的公众号:叁金大数据
相关文章推荐
- presto 插件开发
- 如何开发 Presto 自定义函数(UDF)
- 最近在研究一个c#控件的开发,需求是要在SplitContainer控件中分割条部分作为容器放入各类控件。开发过程中收集的资料罗列如下
- Android 2.2 开发环境配置参考资料
- 敏捷软件开发模型--SCRUM
- 我开发了一套软件,要另外安装一套驱动程序,我要如何在installShield中一起安装
- WinForm"不错的Vista风格水晶按钮"控件使用(附:源码demo) 之配餐系统的开发
- Silverlight 5 3d游戏开发(2) 用3D绘制2D图形
- 在Windows中安装iPhone开发环境
- Android开发指南-二维图形
- 使用 C# 开发智能手机软件:推箱子(四)
- JMatrix开发手记 1
- Struts使用Tiles辅助开发
- 基于Asterisk的VoIP开发指南——(2)Asterisk AGI程序编写指南
- 敏捷开发般若敏捷系列之二:什么是敏捷(上)(无住,不住于法,破法执)
- 转载 Maven2插件开发详解
- 开发管理 CheckLists(17) -敏捷开发 SCRUM评估会议
- 《IOS开发中常用的开发技巧》
- Ruby开发集成环境
- 指尖下的js ——多触式web前端开发之一:对于Touch的处理