所有文章 > 学习各类API > 探索 Flink SQL Gateway REST API

探索 Flink SQL Gateway REST API

Apache Flink 中的 SQL 网关提供了一种从 SQL 客户端以外的位置在 Flink 中运行 SQL 的方法。这涵盖了使用JDBC驱动程序(支持大量客户端连接)、通过HiveServer2端点的Hive客户端访问,以及直接针对REST端点的操作,都是为了找到最适合您需求的产品。

在我继续学习 Flink SQL 的旅程时,我想知道的一件事是,在生产场景中如何提交 Flink SQL 作业。在我看来,SQL Gateway的REST API会是一个很好的选择。您将 SQL 代码放在源代码管理下的文件中,然后使用部署管道针对终端节点提交该 SQL。值得注意的是,建议在将生产作业部署到 Flink 时使用应用程序模式,而 SQL 客户端和网关尚不支持此功能。当前有一个FLIP正在讨论中,但如果您倾向于使用应用程序模式,那么就需要将SQL打包在JAR文件中进行部署。当然,您也可以选择继续使用SQL客户端或网关,不过需要注意的是,在会话模式下运行作业时会存在一些限制,主要就是没有资源隔离。

在本文中,我将向您展示如何使用终端节点,包括使用 Postman 工具探索它,使用 HTTPie 从 shell 调用终端节点,最后使用一个可行的概念验证脚本来执行脚本中的语句。

该 API 有文档记录,有两个版本,每个版本都有自己的 OpenAPI YAML 规范。我将在此处查看 <span class=“inline-code”>v2</span>。请注意,根据文档,该规范目前仍处于实验阶段。

首先,让我们在本地启动 Flink 集群和 SQL 网关:

./bin/start-cluster.sh

./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost

Postman是什么

Postman 是一个方便的工具,用于使用 API 执行大量有用的操作。在这里,我只是借助它来创建针对终端节点的示例调用,以便快速了解它们之间的关系。你可以在 Web 上使用它,但假设你使用的是 Flink 的本地实例(或者至少是不可公开访问的实例),那么你需要桌面版本。请注意,为了访问我们在此处所使用的导入功能,您仍然需要先注册一个免费帐户。

在您的工作区下,单击导入并粘贴 SQL Gateway 的 OpenAPI YAML 文件的 URL (<span class=“inline-code”>https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v2_sql_gateway.yml</span>),该文件在文档页面上链接到该文件。将其作为 Postman 集合导入。

CleanShot 2024-03-07 于 17.21.51 1.png

现在,您将在 Postman 集合下看到所有 API 终端节点的列表,以及每个终端节点的预创建调用。转到 Environments -> Globals 并使用 SQL Gateway 的值定义 <span class=“inline-code”>baseUrl</span>。如果你在本地运行它,那么这将是 <span class=“inline-code”>http://localhost:8083</span>

CleanShot 2024-03-07 于 17.27.59.png

现在返回 Collections,在 Flink SQL Gateway REST API 文件夹下找到 <span class=“inline-code”>get Info</span> 调用。打开它并点击 发送.您应该会看到如下所示的成功响应:

CleanShot 2024-03-07 于 17.32.59.png

您还可以单击“代码”图标 (<span class=“inline-code”></></span> ) 查看各种不同语言和工具(包括 cURL 和 HTTPie)的调用。虽然目前这还不算是什么创新之举,但当你开始使用有效载荷时,就会发现它真的非常便捷。

CleanShot 2024-03-07 于 17.34.00.png

就像我们在上面手动填充全局变量 <span class=“inline-code”>baseURL</span>一样,我们也可以从一个调用中获取响应,并在另一个调用中使用它。这非常有用,因为我们需要使用 REST API 返回的两个变量(<span class=“inline-code”>sessionHandle</span> 和 <span class=“inline-code”>operationHandle</span>)。要在 Postman 中执行此操作,请将以下内容添加到请求窗格的 Tests (测试) 选项卡中:

var jsonData = JSON.parse(responseBody); postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);

这假设要填充的变量名为 <span class=“inline-code”>sessionHandle</span>并且它在响应的名为 <span class=“inline-code”>sessionHandle</span> 的根键中返回。它是:

$ http  --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
"sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}

在设置变量之后,你可以通过在双大括号中对其进行引用,来在其他调用中使用这个变量。如下所示:

CleanShot 2024-03-07 于 17.46.23.png

我在此分享了我的Postman集合的副本,其中已经为您完成了上述的变量配置。

现在,我们来了解一下如何从头开始将 SQL 语句实际提交到网关的工作流。

使用 Flink SQL 网关运行 SQL 语句

从本质上讲,最少的步骤如下。

  1. 建立会话(设置了可选配置参数)
  2. 提交 SQL 语句,这将生成一个 Operation。
  3. 检查 Operation (操作) 的状态,直到它完成
  4. 获取 Operation 的结果。

以下是执行每个操作的方法,使用 HTTPie 作为示例客户端并显示响应。我使用bash变量来存储会话和操作句柄的值。

检查连接和 Flink 版本

$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
Accept:'application/json'
{
"productName": "Apache Flink",
"version": "1.18.1"
}

1. 创建会话

POST /会话

$ printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
"sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}

$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"

[可选]验证会话并读取会话配置

请注意,此处的 <span class=“inline-code”>runtime-mode</span> 是从上面在会话创建中传递的 <span class=“inline-code”>properties</span> 中设置的。

$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
Copy
{
"properties": {
"env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNN
AMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=AL
L-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-op
ens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/ja
va.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.ut
il.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
"execution.attached": "true",
"execution.runtime-mode": "batch",
"execution.savepoint-restore-mode": "NO_CLAIM",
"execution.savepoint.ignore-unclaimed-state": "false",
"execution.shutdown-on-attached-exit": "false",
"execution.target": "remote",
"jobmanager.bind-host": "localhost",
"jobmanager.execution.failover-strategy": "region",
"jobmanager.memory.process.size": "1600m",
"jobmanager.rpc.address": "localhost",
"jobmanager.rpc.port": "6123",
"parallelism.default": "1",
"pipeline.classpaths": "",
"pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
"rest.address": "localhost",
"rest.bind-address": "localhost",
"sql-gateway.endpoint.rest.address": "localhost",
"table.catalog-store.file.path": "./conf/catalogs",
"table.catalog-store.kind": "file",
"table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
"taskmanager.bind-host": "localhost",
"taskmanager.host": "localhost",
"taskmanager.memory.process.size": "1728m",
"taskmanager.numberOfTaskSlots": "1"
}
}

2. 提交 SQL 语句

$ printf '{
"statement": "CREATE TABLE t_foo WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8

{
"operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}

$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"

3. 获取 Statement 执行状态

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8

{
"status": "FINISHED"
}

4. 获得结果

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
Copy
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"fb1a5f06643364bc82a9a4e0bd3e9c10"
]
}
]
},
"nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}

因为 <span class=“inline-code”>resultType</span> 不是 <span class=“inline-code”>EOS</span>并且有一个值<span class=“inline-code”>nextResultUri</span>它告诉我们还有更多要获取 – 在 <span class=“inline-code”>nextResultUri</span>中指定的位置:

{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}

5. 整理

正确的做法是在会话结束后将其关闭:

$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
"status": "CLOSED"
}

使用 Shell 脚本执行 Flink SQL

我们可以使用上述所有内容和一些 bash 来编写脚本:

host='localhost:8083'

SESSIONHANDLE=$(printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST $host'/sessions' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.sessionHandle')

echo "Got session handle: "$SESSIONHANDLE


SQL_STATEMENT_ONE_LINE=$(tr '\n' ' ' < rmoff.sql)

OPERATIONHANDLE=$(printf '{
"statement": "%s"
}' "$SQL_STATEMENT_ONE_LINE" | http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.operationHandle')

echo "Got operation handle: "$OPERATIONHANDLE

while [ 1 -eq 1 ]
do
STATUS=$(http --follow --timeout 3600 GET $host'/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json' | jq -r '.status')
echo $STATUS
if [ $STATUS != "RUNNING" ]; then
break
fi
sleep 2
done

echo "\n\n----- 📃 RESULTS 📃 -----\n"
URL='/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON'
while [ 1 -eq 1 ]
do
RESULT=$(http --follow --timeout 3600 GET $host$URL \
Accept:'application/json')
echo $RESULT | jq '.'
URL=$(echo $RESULT | jq -r '.nextResultUri // ""')
if [ -z $URL ]; then
break
fi
echo "(next result chunk 👇)"
done

echo "Closing session 🗑️"
http --follow --timeout 3600 DELETE $host'/sessions/'$SESSIONHANDLE

我们将实际的 SQL 放入一个名为 <span class=“inline-code”>rmoff.sql</span> 的文件中:

CREATE TABLE t_foo WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/flink-test',
'format' = 'csv',
'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;

现在,当我们运行 shell 脚本时,我们得到这个:

Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED


----- 📃 RESULTS 📃 -----

{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"615365befee24c53d1efa195f9d72eee"
]
}
]
},
"nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk 👇)
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
Closing session 🗑️
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
"status": "CLOSED"
}

我们实际运行的SQL语句会将一个CSV文件写入<span class="inline-code">/tmp</span>文件夹,因此让我们检查一下该文件是否已成功生成并有效:

$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff wheel 21 7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1

很好,这完全符合我们的预期。

总结

如果您想了解有关 Flink SQL 的更多信息,您可能有兴趣了解有关 Catalog 角色的更多信息、使用 Catalog 的动手示例,或者深入了解如何将 JAR 与 Flink SQL 结合使用。

您可能还想尝试一下我们的Decodable服务,它提供了完全托管的Apache Flink和Debezium。通过我们的CLI和API,您可以轻松地使用Flink SQL来部署管道。

原文链接:https://www.decodable.co/blog/exploring-the-flink-sql-gateway-rest-api

#你可能也喜欢这些API文章!