使用SQL Gateway提交Flink SQL作业
本章节适用于MRS 3.6.0-LTS及以后版本。
操作场景
本章节介绍通过REST方式使用SQL Gateway提交Flink SQL作业。
使用SQL Gateway提交Flink SQL作业
- 已安装集群客户端,例如安装目录为“/opt/hadoopclient”。
- 集群已启用Kerberos认证(安全模式)时,需在“客户端安装目录/Flink/flink/conf/flink-conf.yaml”中增加如下配置,集群未启用Kerberos认证(普通模式)时跳过该步骤。
env.java.opts.sql-gateway: -Djava.security.auth.login.config=/客户端安装目录/Flink/flink/conf/jaas.conf
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录。
cd /opt/hadoopclient
- 执行以下命令初始化环境变量。
source /opt/hadoopclient/bigdata_env
- 执行以下命令,启动一个Yarn session集群,并记录yarn-session ID(yid)。
yarn-session.sh -t ssl/ -nm "session-SQL" -d
- 启动一个SQL Gateway服务,并指定6获取的yarn-session ID。
sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=SQL Gateway服务IP地址 -Dsql-gateway.endpoint.rest.bind-port=SQL Gateway服务端口 -Dsql-gateway.endpoint.rest.port=SQL Gateway服务端口 -Dhigh-availability.cluster-id=yarn-session ID
示例:
sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost -Dsql-gateway.endpoint.rest.bind-port=32325 -Dsql-gateway.endpoint.rest.port=32325 -Dhigh-availability.cluster-id=application_1734003203762_0003
- SQL Gateway服务IP地址:可设置为当前节点IP地址或localhost。
- SQL Gateway服务端口:可设置为未被占用的端口号,如“32325”。
- yarn-session ID:为6获取的yarn-session ID,如“application_ 1734003203762_0003”。
图1 启动SQL Gateway服务
- 执行以下命令查看服务端口是否启动成功。
netstat -an|grep 32325图2 如下图所示,表示启动成功。
- 通过REST向SQL Gateway提交SQL作业。
- 打开session,执行以下命令,记录返回的sessionHandle。
curl -X POST http://SQL Gateway服务IP地址:SQL Gateway服务端口/sessions
- 执行SQL作业,记录返回的operationHandle。
curl -X POST http://SQL Gateway服务IP地址:SQL Gateway服务端口/sessions/${sessionHandle}/statements/ --data '{"statement": "SELECT 1"}'
- 执行以下命令,获取SQL作业的执行结果。
curl -X GET http://SQL Gateway服务IP地址:SQL Gateway服务端口/sessions/${sessionHandle}/operations/${operationHandle}/result/0
图3 提交SQL作业并查询执行结果
- 打开session,执行以下命令,记录返回的sessionHandle。
- 执行以下命令停止SQL Gateway服务。
sql-gateway.sh stop
使用SQL Gateway更多SQL示例
- 创建Catalog
执行以下命令创建Catalog:
curl -X POST http://SQL Gateway服务IP地址:SQL Gateway服务端口/sessions/${sessionHandle}/statements/ --data '{"statement": "CREATE CATALOG hive_catalog WITH ('\''type'\'' = '\''hive'\'','\''default-database'\'' = '\''default'\'','\''hive-version'\'' = '\''3.1.0'\'','\''hive-conf-dir'\'' = '\''/opt/mrsclient/Hive/config/'\'')"}' - 使用Catalog
执行以下命令使用Catalog:
curl -X POST http://SQL Gateway服务IP地址:SQL Gateway服务端口/sessions/${sessionHandle}/statements/ --data '{"statement": "use catalog hive_catalog;"}' - 建表
执行以下命令在Catalog中创建datagen表:
curl -X POST http://SQL Gateway服务IP地址:SQL Gateway服务端口/sessions/${sessionHandle}/statements/ --data '{"statement": "CREATE TABLE datagen ( f_sequence INT) WITH ('\''connector'\'' = '\''datagen'\'','\''rows-per-second'\''='\''1'\'');"}'执行以下命令在Catalog中创建print表:
curl -X POST http://SQL Gateway服务IP地址:SQL Gateway服务端口/sessions/${sessionHandle}/statements/ --data '{"statement": "CREATE TABLE print( f_sequence INT) WITH ('\''connector'\'' = '\''print'\'');"}' - Catalog中获取执行计划
执行以下命令在Catalog中获取执行计划:
curl -X POST http://SQL Gateway服务IP地址:SQL Gateway服务端口/sessions/${sessionHandle}/statements/ --data '{"statement": "explain Insert into print select * from datagen;"}'