所有文章 > API产品 > 大数据运维实战:Presto如何自定义日志插件实现日志采集存储?

大数据运维实战:Presto如何自定义日志插件实现日志采集存储?

1.背景 

Presto的Web界面提供了方便查看运行任务状态信息的功能,然而这些信息存储在Presto的内存中。因此,当Presto发生重启时,内存数据会被清空。此外,一旦存储的任务数超过100条,我们也无法查看历史任务的状态信息。为了解决这个问题,我们需要将任务的状态信息存储起来。通常有以下两种常见的方法:

  1. 使用Presto Client REST API,可以方便地获取运行任务的状态,并进行自定义的存储。
  2. 通过实现事件监听器自定义插件,来监听任务的执行状态,然后进行自定义的存储。

本文主要介绍自定义插件实现的方式。

2.Presto Event Listener简介

Event Listener是Presto提供的一种扩展点,允许开发者捕获并响应Presto在执行查询过程中的关键事件。这些事件包括查询开始、查询完成、计划阶段开始和完成、以及各种统计信息的收集。通过实现自定义的Event Listener,用户可以记录详细的查询执行信息、进行性能分析、触发外部系统动作(如通知、日志记录或数据处理),甚至进行实时监控和警报。

2.1 自定义Event Listener实现步骤

 实现自定义Event Listener涉及到几个关键步骤:

  1. 定义EventListener类:首先,需要创建一个实现com.facebook.presto.spi.eventlistener.EventListener接口的类。在这个类中,你将重写一系列方法,如queryCreated、queryCompleted
    、splitCompleted等,以响应不同的事件。
  2. 创建EventListenerFactory类:为了使Presto能够识别并实例化你的EventListener,还需要实现com.facebook.presto.spi.eventlistener.EventListenerFactory接口。在这个工厂类中,实现getName方法来命名你的监听器,并实现create方法来根据Presto配置创建EventListener实例。
  3. 配置Presto:最后一步是在Presto的配置文件中注册你的EventListener。这通常在Presto的config.properties文件中完成,通过添加event.listener.names属性并列出你的监听器名称。 

2.2 EventListener 接口说明

public interface EventListener
{
//该方法用于处理查询创建事件,它接受一个 QueryCreatedEvent 类型的参数,该参数包含了查询创建时的详细信息。
default void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
}
//该方法用于处理查询完成事件,它接受一个 QueryCompletedEvent 类型的参数,该参数包含了查询完成时的详细信息。
default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
}
//该方法用于处理分割完成事件,它接受一个 SplitCompletedEvent 类型的参数,该参数包含了分割完成时的详细信息。
default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
}
}

3.实践案例

下面通过一个简单的例子来说明如何实现自定义Event Listener。假设我们的目标是创建一个监听器,用于记录查询任务的一些状态信息,并将这些信息发送kafka 中。

1.自定义一个工厂类QueryEventListenerFactory,实现了EventListenerFactory接口。

package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Map;

public class QueryEventListenerFactory implements EventListenerFactory {
@Override
public String getName() {
return "query-event-listener";
}

@Override
///直接使用Presto传递过来的Map<String, String>配置,
public EventListener create(Map<String, String> config) {
if (!config.containsKey("kafka.bootstrap.servers")) {
throw new RuntimeException("event-listener.properties file missing kafka.bootstrap.servers");
}
// 检查Kafka主题配置
if (!config.containsKey("kafka.topic")) {
throw new RuntimeException("event-listener.properties file missing kafka.topic");
}

return new QueryEventListener(config);
}
}

代码简介:

1.QueryEventListenerFactory类实现了EventListenerFactory接口,这个接口是用来创建事件监听器的工厂。
2.getName方法返回一个字符串"query-event-listener",表示这个工厂的名称。
3.create方法接收一个Map<String, String>类型的配置参数,用于创建事件监听器。在方法内部,首先检查配置中是否包含"kafka.bootstrap.servers"和"kafka.topic"这两个关键配置,如果缺少其中之一,就会抛出运行时异常。
4.如果配置完整,就会创建一个新的QueryEventListener对象,将配置参数传递给它,并返回该对象作为事件监听器。
5.说明:Presto在启动过程中会读取/etc 目录下以.properties结尾的配置文件,并将配置项以Map<String, String>的形式传递给EventListenerFactory的create方法。因此,QueryEventListenerFactory可以从这个Map中读取所需的配置信息,如果没有找到必要的配置项,会抛出异常。

2.自定义类 QueryEventPlugin 实现Plugin接口。

package com.ds;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import java.util.Arrays;

public class QueryEventPlugin implements Plugin {

@Override
public Iterable<EventListenerFactory> getEventListenerFactories() {
EventListenerFactory listenerFactory = new QueryEventListenerFactory();
return Arrays.asList(listenerFactory);
}
}

代码简介:

  • QueryEventPlugin 类实现了 Presto 的 Plugin 接口,这样它可以被 Presto 识别为一个插件。
  • 在 QueryEventPlugin 类中,重写了 getEventListenerFactories 方法,该方法返回一个可迭代的 EventListenerFactory 集合。
  • 在getEventListenerFactories方法中,创建了一个 QueryEventListenerFactory 的实例,并将其放入一个 Iterable 集合中返回。

3.自定义一个核心类QueryEventListener

package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class QueryEventListener implements EventListener {
private KafkaProducer<String, String> producer;
private String KafKaTopic;
private Map<String, String> config;

public QueryEventListener(Map<String, String> config) {
this.config = new HashMap<>();
this.config.putAll(config);
init();
}

private void init() {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", config.get("kafka.bootstrap.servers"));
kafkaProps.put("key.serializer", StringSerializer.class);
kafkaProps.put("value.serializer", StringSerializer.class);
this.producer = new KafkaProducer<>(kafkaProps);
this.KafKaTopic = (String) config.get("kafka.topic");
}

@Override
public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
}

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {

String queryId = queryCompletedEvent.getMetadata().getQueryId();
String querySql = queryCompletedEvent.getMetadata().getQuery();
String queryState = queryCompletedEvent.getMetadata().getQueryState();
String queryUser = queryCompletedEvent.getContext().getUser();
long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli();
long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO)
.toMillis();
long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis();
long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis();
long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis();
int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits();
double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory();
long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes();
long outputRows = queryCompletedEvent.getStatistics().getOutputRows();
long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
long totalRows = queryCompletedEvent.getStatistics().getTotalRows();
long writtenBytes = queryCompletedEvent.getStatistics().getWrittenOutputBytes();
long writtenRows = queryCompletedEvent.getStatistics().getWrittenOutputRows();
// 构造事件数据
String eventData = "{" +
"\"queryId\": \"" + queryId + "\"," +
"\"querySql\": \"" + querySql + "\"," +
"\"queryState\": \"" + queryState + "\"," +
"\"queryUser\": \"" + queryUser + "\"," +
"\"createTime\": " + createTime + "," +
"\"endTime\": " + endTime + "," +
"\"startTime\": " + startTime + "," +
"\"analysisTime\": " + analysisTime + "," +
"\"cpuTime\": " + cpuTime + "," +
"\"queuedTime\": " + queuedTime + "," +
"\"wallTime\": " + wallTime + "," +
"\"completedSplits\": " + completedSplits + "," +
"\"cumulativeMemory\": " + cumulativeMemory + "," +
"\"outputBytes\": " + outputBytes + "," +
"\"outputRows\": " + outputRows + "," +
"\"totalBytes\": " + totalBytes + "," +
"\"totalRows\": " + totalRows + "," +
"\"writtenBytes\": " + writtenBytes + "," +
"\"writtenRows\": " + writtenRows +
"}";
// 发送到 Kafka 主题
ProducerRecord<String, String> Completedrecord = new ProducerRecord<>(KafKaTopic, eventData);
producer.send(Completedrecord);

queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
int code = queryFailureInfo.getErrorCode().getCode();
String name = queryFailureInfo.getErrorCode().getName();
String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase();
String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase();
String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase();
String failuresJson = queryFailureInfo.getFailuresJson();
// 构造 JSON 字符串
String failureData = "{"
+ "\"code\": " + code + ","
+ "\"name\": \"" + name + "\","
+ "\"failureType\": \"" + failureType + "\","
+ "\"failureHost\": \"" + failureHost + "\","
+ "\"failureMessage\": \"" + failureMessage + "\","
+ "\"failureTask\": \"" + failureTask + "\","
+ "\"failuresJson\": \"" + failuresJson + "\""
+ "}";
// 发送到 Kafka 主题
ProducerRecord<String, String> failurerecord = new ProducerRecord<>(KafKaTopic, failureData);
producer.send(failurerecord);
});
}

@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MIN).toEpochMilli();
String payload = splitCompletedEvent.getPayload();
String queryId = splitCompletedEvent.getQueryId();
String stageId = splitCompletedEvent.getStageId();
long startTime = splitCompletedEvent.getStartTime().orElse(Instant.MIN).toEpochMilli();
String taskId = splitCompletedEvent.getTaskId();
long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes();
long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions();
long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis();
long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis();
long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis();
long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis();
////
// 构造拆分事件数据
String splitEventData = "{ "
+ "\"createTime\": " + createTime + ", "
+ "\"endTime\": " + endTime + ", "
+ "\"payload\": \"" + payload + "\", "
+ "\"queryId\": \"" + queryId + "\", "
+ "\"stageId\": \"" + stageId + "\", "
+ "\"startTime\": " + startTime + ", "
+ "\"taskId\": \"" + taskId + "\", "
+ "\"completedDataSizeBytes\": " + completedDataSizeBytes + ", "
+ "\"completedPositions\": " + completedPositions + ", "
+ "\"completedReadTime\": " + completedReadTime + ", "
+ "\"cpuTime\": " + cpuTime + ", "
+ "\"queuedTime\": " + queuedTime + ", "
+ "\"wallTime\": " + wallTime
+ " }";
// 发送到 Kafka 主题

ProducerRecord<String, String> splitrecord = new ProducerRecord<>(KafKaTopic, splitEventData);
producer.send(splitrecord);

}

public void close() {
if (producer != null) {
producer.close();
}
}
}

代码简介:

在 QueryEventListener 类中,首先声明了一个 KafkaProducer 对象和一些其他实例变量。
构造函数接受一个 config 参数,该参数被用于初始化监听器的配置信息,并且在初始化过程中会建立 Kafka 生产者连接。
init 方法用于初始化 Kafka 生产者对象,配置 Kafka 连接信息,并设置 Kafka 主题。
queryCreated 方法用于处理查询创建事件,但在代码中未给出具体的处理逻辑。
queryCompleted 方法用于处理查询完成事件,根据传入的 queryCompletedEvent 对象,提取了查询完成时的一系列详细信息,并将这些信息构造成 JSON 格式的数据,然后发送到 Kafka 主题中。同时,如果查询失败,也会将相应的失败信息发送到 Kafka 主题中。
splitCompleted 方法用于处理分割完成事件,根据传入的 splitCompletedEvent 对象,提取了分割完成时的一系列详细信息,并将这些信息构造成 JSON 格式的数据,然后发送到 Kafka 主题中。
close 方法用于关闭 Kafka 生产者连接。

4.pom 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.ds</groupId>
<artifactId>PrestoHook</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>0.275</version>
<scope>compile</scope>
</dependency>

<!-- Kafka 相关依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version> <!-- 替换为你所使用的 Kafka 版本 -->
</dependency>
</dependencies>
<!-- 打包插件 -->
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>

</project>

5.具体使用步骤

打包说明:

在 Presto 中,服务提供者接口(Service Provider Interface,SPI)用于扩展 Presto 并提供额外的功能,例如加载连接器、功能、类型和系统访问控制等。SPI 通过元数据文件加载,这使得 Presto 能够动态地加载和识别这些扩展点。

为了使用 SPI 扩展 Presto,我们需要创建一个特定的元数据文件并将其放置在 src/main/resources/META-INF/services/ 目录下。这个元数据文件的名称应该与要扩展的接口的完全限定名相匹配。对于插件接口来说,文件的名称应该是 com.facebook.presto.spi.Plugin,文件的内容包含 com.ds.QueryEventPlugin。

在这个元数据文件中,我们需要列出实现了对应接口的类的完全限定名。在这种情况下,就是列出实现了 com.facebook.presto.spi.Plugin 接口的类的完全限定名。这样,当 Presto 加载时,它会检查这个元数据文件,并根据文件中列出的类的名称来加载对应的插件。

6.打成 jar 包文件,部署到 presto 集群,重启 presto 服务生效。

  1. 在 presto 的 home 路径的plugin路径下新建目录query-event-listener
  2. 将 jar 包文件放在上面创建的目录下,注意权限(启动 presto 服务的用户要有可读权限)
  3. 在presto 的 home 路径的 etc 路径下新建配置文件event-listener.properties,文件的内容如下:
event-listener.name=query-event-listener
kafka.bootstrap.servers=10.82.4.11:9092
kafka.topic=prestojob

注意:这里需要部署到所有的 presto 服务节点,然后重启 presto 服务生效。

7.验证效果

使用 presto 执行一条查询语句“select 1”,对应的日志输出到 kafka 的日志内容如下:

{"queryId": "20240604_090914_00005_bb9j6","querySql": "select 1","queryState": "FINISHED","queryUser": "presto","createTime": 1717492154262,"endTime": 1717492154326,"startTime": 1717492154264,"analysisTime": 16,"cpuTime": 6,"queuedTime": 1,"wallTime": 21,"completedSplits": 17,"cumulativeMemory": 0.0,"outputBytes": 5,"outputRows": 1,"totalBytes": 0,"totalRows": 0,"writtenBytes": 0,"writtenRows": 0}

后续就可以直接消费 kafka 里面的数据,实现自定义的处理。

4.结尾总结

自定义Event Listener是Presto生态系统中一个强大而灵活的功能,它不仅增强了系统的可观测性,还提供了与外部系统交互的能力。通过本文的介绍和示例,希望能激发读者在自己的Presto部署中探索和实现自定义Event Listener,从而更好地满足业务需求和优化数据处理流程。

本文章转载微信公众号@涤生大数据

#你可能也喜欢这些API文章!