14个文本转图像AI API
探索 Flink SQL Gateway REST API
Apache Flink 中的 SQL 网关提供了一种从 SQL 客户端以外的位置在 Flink 中运行 SQL 的方法。这涵盖了使用JDBC驱动程序(支持大量客户端连接)、通过HiveServer2端点的Hive客户端访问,以及直接针对REST端点的操作,都是为了找到最适合您需求的产品。
在我继续学习 Flink SQL 的旅程时,我想知道的一件事是,在生产场景中如何提交 Flink SQL 作业。在我看来,SQL Gateway的REST API会是一个很好的选择。您将 SQL 代码放在源代码管理下的文件中,然后使用部署管道针对终端节点提交该 SQL。值得注意的是,建议在将生产作业部署到 Flink 时使用应用程序模式,而 SQL 客户端和网关尚不支持此功能。当前有一个FLIP正在讨论中,但如果您倾向于使用应用程序模式,那么就需要将SQL打包在JAR文件中进行部署。当然,您也可以选择继续使用SQL客户端或网关,不过需要注意的是,在会话模式下运行作业时会存在一些限制,主要就是没有资源隔离。
在本文中,我将向您展示如何使用终端节点,包括使用 Postman 工具探索它,使用 HTTPie 从 shell 调用终端节点,最后使用一个可行的概念验证脚本来执行脚本中的语句。
该 API 有文档记录,有两个版本,每个版本都有自己的 OpenAPI YAML 规范。我将在此处查看 <span class=“inline-code”>v2</span>。请注意,根据文档,该规范目前仍处于实验阶段。
首先,让我们在本地启动 Flink 集群和 SQL 网关:
./bin/start-cluster.sh
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
Postman是什么
Postman 是一个方便的工具,用于使用 API 执行大量有用的操作。在这里,我只是借助它来创建针对终端节点的示例调用,以便快速了解它们之间的关系。你可以在 Web 上使用它,但假设你使用的是 Flink 的本地实例(或者至少是不可公开访问的实例),那么你需要桌面版本。请注意,为了访问我们在此处所使用的导入功能,您仍然需要先注册一个免费帐户。
在您的工作区下,单击导入并粘贴 SQL Gateway 的 OpenAPI YAML 文件的 URL (<span class=“inline-code”>https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v2_sql_gateway.yml</span>)
,该文件在文档页面上链接到该文件。将其作为 Postman 集合导入。
现在,您将在 Postman 集合下看到所有 API 终端节点的列表,以及每个终端节点的预创建调用。转到 Environments -> Globals 并使用 SQL Gateway 的值定义 <span class=“inline-code”>baseUrl</span>
。如果你在本地运行它,那么这将是 <span class=“inline-code”>http://localhost:8083</span>
现在返回 Collections,在 Flink SQL Gateway REST API 文件夹下找到 <span class=“inline-code”>get Info</span>
调用。打开它并点击 发送.您应该会看到如下所示的成功响应:
您还可以单击“代码”图标 (<span class=“inline-code”></></span> )
查看各种不同语言和工具(包括 cURL 和 HTTPie)的调用。虽然目前这还不算是什么创新之举,但当你开始使用有效载荷时,就会发现它真的非常便捷。
就像我们在上面手动填充全局变量 <span class=“inline-code”>baseURL</span>
一样,我们也可以从一个调用中获取响应,并在另一个调用中使用它。这非常有用,因为我们需要使用 REST API 返回的两个变量(<span class=“inline-code”>sessionHandle</span> 和 <span class=“inline-code”>operationHandle</span>)
。要在 Postman 中执行此操作,请将以下内容添加到请求窗格的 Tests (测试) 选项卡中:
var jsonData = JSON.parse(responseBody); postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);
这假设要填充的变量名为 <span class=“inline-code”>sessionHandle</span>
并且它在响应的名为 <span class=“inline-code”>sessionHandle</span>
的根键中返回。它是:
$ http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8
{
"sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}
在设置变量之后,你可以通过在双大括号中对其进行引用,来在其他调用中使用这个变量。如下所示:
我在此分享了我的Postman集合的副本,其中已经为您完成了上述的变量配置。
现在,我们来了解一下如何从头开始将 SQL 语句实际提交到网关的工作流。
使用 Flink SQL 网关运行 SQL 语句
从本质上讲,最少的步骤如下。
- 建立会话(设置了可选配置参数)
- 提交 SQL 语句,这将生成一个 Operation。
- 检查 Operation (操作) 的状态,直到它完成
- 获取 Operation 的结果。
以下是执行每个操作的方法,使用 HTTPie 作为示例客户端并显示响应。我使用bash变量来存储会话和操作句柄的值。
检查连接和 Flink 版本
$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
Accept:'application/json'
{
"productName": "Apache Flink",
"version": "1.18.1"
}
1. 创建会话
POST /会话
$ printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8
{
"sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}
$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
[可选]验证会话并读取会话配置
请注意,此处的 <span class=“inline-code”>runtime-mode</span>
是从上面在会话创建中传递的 <span class=“inline-code”>properties</span>
中设置的。
$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
Copy
{
"properties": {
"env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNN
AMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=AL
L-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-op
ens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/ja
va.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.ut
il.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
"execution.attached": "true",
"execution.runtime-mode": "batch",
"execution.savepoint-restore-mode": "NO_CLAIM",
"execution.savepoint.ignore-unclaimed-state": "false",
"execution.shutdown-on-attached-exit": "false",
"execution.target": "remote",
"jobmanager.bind-host": "localhost",
"jobmanager.execution.failover-strategy": "region",
"jobmanager.memory.process.size": "1600m",
"jobmanager.rpc.address": "localhost",
"jobmanager.rpc.port": "6123",
"parallelism.default": "1",
"pipeline.classpaths": "",
"pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
"rest.address": "localhost",
"rest.bind-address": "localhost",
"sql-gateway.endpoint.rest.address": "localhost",
"table.catalog-store.file.path": "./conf/catalogs",
"table.catalog-store.kind": "file",
"table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
"taskmanager.bind-host": "localhost",
"taskmanager.host": "localhost",
"taskmanager.memory.process.size": "1728m",
"taskmanager.numberOfTaskSlots": "1"
}
}
2. 提交 SQL 语句
$ printf '{
"statement": "CREATE TABLE t_foo WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8
{
"operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}
$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"
3. 获取 Statement 执行状态
$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8
{
"status": "FINISHED"
}
4. 获得结果
$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
Copy
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"fb1a5f06643364bc82a9a4e0bd3e9c10"
]
}
]
},
"nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}
因为 <span class=“inline-code”>resultType</span>
不是 <span class=“inline-code”>EOS</span>
并且有一个值<span class=“inline-code”>nextResultUri</span>
它告诉我们还有更多要获取 – 在 <span class=“inline-code”>nextResultUri</span>
中指定的位置:
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
5. 整理
正确的做法是在会话结束后将其关闭:
$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8
{
"status": "CLOSED"
}
使用 Shell 脚本执行 Flink SQL:
我们可以使用上述所有内容和一些 bash 来编写脚本:
host='localhost:8083'
SESSIONHANDLE=$(printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST $host'/sessions' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.sessionHandle')
echo "Got session handle: "$SESSIONHANDLE
SQL_STATEMENT_ONE_LINE=$(tr '\n' ' ' < rmoff.sql)
OPERATIONHANDLE=$(printf '{
"statement": "%s"
}' "$SQL_STATEMENT_ONE_LINE" | http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.operationHandle')
echo "Got operation handle: "$OPERATIONHANDLE
while [ 1 -eq 1 ]
do
STATUS=$(http --follow --timeout 3600 GET $host'/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json' | jq -r '.status')
echo $STATUS
if [ $STATUS != "RUNNING" ]; then
break
fi
sleep 2
done
echo "\n\n----- 📃 RESULTS 📃 -----\n"
URL='/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON'
while [ 1 -eq 1 ]
do
RESULT=$(http --follow --timeout 3600 GET $host$URL \
Accept:'application/json')
echo $RESULT | jq '.'
URL=$(echo $RESULT | jq -r '.nextResultUri // ""')
if [ -z $URL ]; then
break
fi
echo "(next result chunk 👇)"
done
echo "Closing session 🗑️"
http --follow --timeout 3600 DELETE $host'/sessions/'$SESSIONHANDLE
我们将实际的 SQL 放入一个名为 <span class=“inline-code”>rmoff.sql</span>
的文件中:
CREATE TABLE t_foo WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/flink-test',
'format' = 'csv',
'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;
现在,当我们运行 shell 脚本时,我们得到这个:
Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED
----- 📃 RESULTS 📃 -----
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"615365befee24c53d1efa195f9d72eee"
]
}
]
},
"nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk 👇)
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
Closing session 🗑️
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8
{
"status": "CLOSED"
}
我们实际运行的SQL语句会将一个CSV文件写入<span class="inline-code">/tmp</span>
文件夹,因此让我们检查一下该文件是否已成功生成并有效:
$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff wheel 21 7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1
很好,这完全符合我们的预期。
总结
如果您想了解有关 Flink SQL 的更多信息,您可能有兴趣了解有关 Catalog 角色的更多信息、使用 Catalog 的动手示例,或者深入了解如何将 JAR 与 Flink SQL 结合使用。
您可能还想尝试一下我们的Decodable服务,它提供了完全托管的Apache Flink和Debezium。通过我们的CLI和API,您可以轻松地使用Flink SQL来部署管道。
原文链接:https://www.decodable.co/blog/exploring-the-flink-sql-gateway-rest-api