更新时间:2024-01-19 GMT+08:00

获取SQL节点的输出结果值

当您在数据开发模块进行作业开发,需要获取SQL节点的输出结果值,并将结果应用于后续作业节点或判断时,可参考本教程获取SQL节点的输出结果。

场景说明

使用EL表达式#{Job.getNodeOutput("前一节点名")}获取的前一节点的输出结果时,输出结果为二维数组形式 ,形如[["Dean",...,"08"],...,["Smith",...,"53"]]所示。为获取其中的值,本案例提供了如表1所示的两个常见方法示例。

表1 获取结果值常见方法

方法

关键配置

适用场景要求

通过StringUtil提取输出结果值

当SQL节点的输出结果只有一个字段,形如[["11"]]所示时,可以通过StringUtil内嵌对象EL表达式分割二维数组,获取前一节点输出的字段值:

#{StringUtil.split(StringUtil.split(StringUtil.split(Job.getNodeOutput("前一节点名"),"]")[0],"[")[0],"\\"")[0]}

通过StringUtil提取输出结果值配置简单,但对适用场景有如下要求:

  • 前一SQL节点的输出结果只有一个字段,形如[["11"]]所示。
  • 输出结果值数据类型为String,需要应用场景支持String数据类型。例如当需要使用IF条件判断输出结果值的数值大小时,不支持String类型,则不能使用本方法。

通过For Each节点提取输出结果值

通过For Each节点,循环获取数据集中二维数组的值:

  • For Each节点数据集:#{Job.getNodeOutput('前一节点名')}
  • For Each节点子作业参数:#{Loop.current[索引]}

通过For Each节点输出结果值适用场景更广泛,但需将作业拆分为主作业和子作业。

通过StringUtil提取输出结果值

场景说明

通过StringUtil内嵌对象EL表达式分割二维数组结果,获取前一节点输出的字段值,输出结果类型为String。

本例中,MRS Hive SQL节点返回单字段二维数组,Kafka Client节点发送的数据定义为StringUtil内嵌对象EL表达式,通过此表达式即可分割二维数组,获取MRS Hive SQL节点输出的字段值。

为便于查看最终获得的结果值,本例选择Kafka Client节点进行演示。在实际使用中,您可以根据您的业务需求选择后续节点类型,在节点任务中应用StringUtil内嵌对象EL表达式,即可获取前一节点返回的数据值。

图1 作业样例
其中,Kafka Client节点的关键配置为“发送数据”参数,取值如下:
#{StringUtil.split(StringUtil.split(StringUtil.split(Job.getNodeOutput("count95"),"]")[0],"[")[0],"\\"")[0]}

配置方法

  1. 登录DataArts Studio控制台,找到所需要的DataArts Studio实例,单击实例卡片上的“进入控制台”,进入概览页面。
  2. 选择“空间管理”页签,在工作空间列表中,找到所需要的工作空间,单击工作空间的“数据开发”,系统跳转至数据开发页面。
  3. 构造原始表格student_score。新建临时Hive SQL脚本,选择Hive连接和数据库后,粘贴如下SQL语句并运行,运行成功后即可删除此脚本。

    CREATE TABLE `student_score` (`name` String COMMENT '', `score` INT COMMENT '');
    INSERT INTO
        student_score
    VALUES
        ('ZHAO', '90'),
        ('QIAN', '88'),
        ('SUN', '93'),
        ('LI', '94'),
        ('ZHOU', '85'),
        ('WU', '79'),
        ('ZHENG', '87'),
        ('WANG', '97'),
        ('FENG', '83'),
        ('CEHN', '99');

  4. 新建MRS Hive SQL节点调用的Hive SQL脚本。新建Hive SQL脚本,选择Hive连接和数据库后,粘贴如下SQL语句并提交版本,脚本命名为count95。

    --从student_score表中统计成绩在95分以上的人数--
    SELECT count(*) FROM student_score WHERE score> "95" ; 

  5. 在“作业开发”页面,新建数据开发作业。选择一个MRS Hive SQL节点和一个Kafka Client节点,选中连线图标并拖动,编排如图1所示的作业。
  6. 配置MRS Hive SQL节点参数。SQL脚本选择4中提交的脚本count95,选择Hive连接和数据库。

    图2 配置MRS Hive SQL节点参数

  7. 配置Kafka Client节点参数。发送数据定义为:#{StringUtil.split(StringUtil.split(StringUtil.split(Job.getNodeOutput("count95"),"]")[0],"[")[0],"\\"")[0]},选择Kafka连接和Topic名称。

    图3 配置Kafka Client节点参数

  8. 作业节点配置完成后,选择测试运行。待作业测试运行成功后,在Kafka Client节点上右键查看日志,可以发现MRS Hive SQL节点返回的二维数组[["2"]]已被清洗为2

    您可以将Kafka Client节点中的发送数据定义为#{Job.getNodeOutput("count95")},然后作业运行后查看Kafka Client节点日志,则可以验证MRS Hive SQL节点返回的结果为二维数组[["2"]]

    图4 查看Kafka Client节点日志

通过For Each节点提取输出结果值

场景说明

结合For Each节点及其支持的Loop内嵌对象EL表达式#{Loop.current[0]},循环获取前一节点输出的结果值。

本例中,MRS Hive SQL节点返回多字段的二维数组,选择For Each节点和EL表达式#{Loop.current[]},再通过For Each循环调用Kafka Client节点子作业,Kafka Client节点发送的数据也定义为#{Loop.current[]},通过此配置即可获取MRS Hive SQL节点输出的结果值。

为便于查看最终获得的结果值,本例中For Each节点子作业选择Kafka Client节点进行演示。在实际使用中,您可以根据您的业务需求选择子作业节点类型,在节点任务中应用Loop内嵌对象EL表达式,即可获取For Each前一节点返回的结果值。

For Each节点主作业编排如图5所示。其中,For Each节点的关键配置如下:
  • 数据集:数据集就是HIVE SQL节点的Select语句的执行结果。使用EL表达式#{Job.getNodeOutput("select95")},其中select95为前一个节点的名称。
  • 子作业参数:子作业参数是子作业中定义的参数名,然后在主作业中定义的参数值,传递到子作业以供使用。此处子作业参数名定义为namescore,其值为分别为数据集中的第一列和第二列数值,使用EL表达式#{Loop.current[0]}#{Loop.current[1]}
图5 主作业样例

而For Each节点中所选的子作业,则需要定义For Each节点中的子作业参数名,以便让主作业识别参数定义,作业如图6所示。

图6 子作业样例

配置方法

开发子作业

  1. 登录DataArts Studio控制台,找到所需要的DataArts Studio实例,单击实例卡片上的“进入控制台”,进入概览页面。
  2. 选择“空间管理”页签,在工作空间列表中,找到所需要的工作空间,单击工作空间的“数据开发”,系统跳转至数据开发页面。
  3. 在“作业开发”页面,新建数据开发子作业EL_test_slave。选择一个Kafka Client节点,并配置作业参数,编排图6所示的作业。

    此处需将参数名填写为namescore,仅用于主作业的For Each节点识别子作业参数;参数值无需填写。

  4. 配置Kafka Client节点参数。发送数据定义为:${name}: ${score},选择Kafka连接和Topic名称。

    此处不能使用EL表达式#{Job.getParam("job_param_name")} ,因为此表达式只能直接获取当前作业里配置的参数的value,并不能获取到父作业传递过来的参数值,也不能获取到工作空间里面配置的全局变量,作用域仅为本作业。

    而表达式${job_param_name},既可以获取到父作业传递过来的参数值,也可以获取到全局配置的变量。

    图7 配置Kafka Client节点参数

  5. 配置完成后提交子作业。

开发主作业

  1. 在“作业开发”主页面,进入脚本开发。
  2. 构造原始表格student_score。新建临时Hive SQL脚本,选择Hive连接和数据库后,粘贴如下SQL语句并运行,运行成功后即可删除此脚本。

    CREATE TABLE `student_score` (`name` String COMMENT '', `score` INT COMMENT '');
    INSERT INTO
        student_score
    VALUES
        ('ZHAO', '90'),
        ('QIAN', '88'),
        ('SUN', '93'),
        ('LI', '94'),
        ('ZHOU', '85'),
        ('WU', '79'),
        ('ZHENG', '87'),
        ('WANG', '97'),
        ('FENG', '83'),
        ('CEHN', '99');

  3. 新建MRS Hive SQL节点调用的Hive SQL脚本。新建Hive SQL脚本,选择Hive连接和数据库后,粘贴如下SQL语句并提交版本,脚本命名为select95。

    --从student_score表中展示成绩在95分以上的姓名和成绩--
    SELECT * FROM student_score WHERE score> "95" ; 

  4. 在“作业开发”页面,新建数据开发主作业EL_test_master。选择一个HIVE SQL节点和一个For Each节点,选中连线图标并拖动,编排图5所示的作业。
  5. 配置MRS Hive SQL节点参数。SQL脚本选择3中提交的脚本select95,选择Hive连接和数据库。

    图8 配置MRS Hive SQL节点参数

  6. 配置For Each节点属性,如图9所示。

    • 子作业:子作业选择已经开发完成的子作业EL_test_slave
    • 数据集:数据集就是HIVE SQL节点的Select语句的执行结果。使用EL表达式#{Job.getNodeOutput("select95")},其中select95为前一个节点的名称。
    • 子作业参数:子作业参数是子作业中定义的参数名,然后在主作业中定义的参数值,传递到子作业以供使用。此处子作业参数名定义为namescore,其值为分别为数据集中的第一列和第二列数值,使用EL表达式#{Loop.current[0]}#{Loop.current[1]}
    图9 配置For Each节点参数

  7. 配置完成后保存作业。

测试运行主作业

  1. 单击主作业EL_test_master画布上方的“测试运行”按钮,测试作业运行情况。主作业运行后,会通过For Each节点循环调用运行子作业EL_test_slave
  2. 单击左侧导航栏中的“实例监控”,进入实例监控中查看作业运行结果。
  3. 待作业运行完成后,从实例监控中找到子作业EL_test_slave的循环运行结果,如图10所示。

    图10 子作业运行结果

  4. 查看子作业EL_test_slave在循环运行中的结果日志,从日志中可以看到,结合For Each节点及其支持的Loop内嵌对象EL表达式,成功获取For Each前一节点输出的结果值。

    图11 查看日志