更新时间:2023-09-27 GMT+08:00

For Each节点使用介绍

适用场景

当您进行作业开发时,如果某些任务的参数有差异、但处理逻辑全部一致,在这种情况下您可以通过For Each节点避免重复开发作业。

For Each节点可指定一个子作业循环执行,并通过数据集对子作业中的参数进行循环替换。关键参数如下:

  • 子作业:选择需要循环执行的作业。
  • 数据集:即不同子任务的参数值的集合。可以是给定的数据集,如“[['1'],['3'],['2']]”;也可以是EL表达式如“#{Job.getNodeOutput('preNodeName')}”,即前一个节点的输出值。
  • 子作业参数:参数名即子作业中定义的变量;参数值一般配置为数据集中的某组数据,每次运行中会将参数值传递到子作业以供使用。例如参数值填写为:#{Loop.current[0]},即将数据集中每行数据的第一个数值遍历传递给子作业。

For Each节点举例如图1所示 。从图中可以看出,子作业“foreach”中的参数名为“result”,参数值为一维数组数据集“[['1'],['3'],['2']] ”的遍历(即第一次循环为1,第二次循环为3,第三次循环为2)。

图1 for each节点

For Each节点与EL表达式

要想使用好For Each节点,您必须对EL表达式有所了解。EL表达式用法请参考EL表达式

下面为您展示For Each节点常用的一些EL表达式。

  • #{Loop.dataArray} :For循环节点输入的数据集,是一个二维数组。
  • #{Loop.current}:由于For循环节点在处理数据集的时候,是一行一行进行处理的,那Loop.current就表示当前处理到的某行数据,Loop.current是一个一维数组,一般定义格式为#{Loop.current[0]}、#{Loop.current[1]}或其他,0表示遍历到当前行的第一个值。
  • #{Loop.offset}:For循环节点在处理数据集时当前的偏移量,从0开始。
  • #{Job.getNodeOutput('preNodeName')}:获取前面节点的输出。

使用案例

案例场景

因数据规整要求,需要周期性地将多组DLI源数据表数据导入到对应的DLI目的表,如表1所示。

表1 需要导入的列表情况

源数据表名

目的表名

a_new

a

b_2

b

c_3

c

d_1

d

c_5

e

b_1

f

如果通过SQL节点分别执行导入脚本,需要开发大量脚本和节点,导致重复性工作。在这种情况下,我们可以使用For Each节点进行循环作业,节省开发工作量。

配置方法

  1. 准备源表和目的表。为了便于后续作业运行验证,需要先创建DLI源数据表和目的表,并给源数据表插入数据。

    1. 创建DLI表。您可以在DataArts Studio数据开发中,新建DLI SQL脚本执行以下SQL命令,也可以在数据湖探索(DLI)服务控制台中的SQL编辑器中执行以下SQL命令:
      /* 创建数据表 */
      CREATE TABLE a_new (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE b_2 (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE c_3 (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE d_1 (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE c_5 (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE b_1 (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE a (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE b (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE c (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE d (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE e (name STRING, score INT) STORED AS PARQUET;
      CREATE TABLE f (name STRING, score INT) STORED AS PARQUET;
    2. 给源数据表插入数据。您可以在DataArts Studio数据开发模块中,新建DLI SQL脚本执行以下SQL命令,也可以在数据湖探索(DLI)服务控制台中的SQL编辑器中执行以下SQL命令:
      /* 源数据表插入数据 */
      INSERT INTO a_new VALUES ('ZHAO','90'),('QIAN','88'),('SUN','93');
      INSERT INTO b_2 VALUES ('LI','94'),('ZHOU','85');
      INSERT INTO c_3 VALUES ('WU','79');
      INSERT INTO d_1 VALUES ('ZHENG','87'),('WANG','97');
      INSERT INTO c_5 VALUES ('FENG','83');
      INSERT INTO b_1 VALUES ('CEHN','99');

  2. 准备数据集数据。您可以通过以下方式之一获取数据集:

    1. 您可以将表1数据导入到DLI表中,然后将SQL脚本读取的结果作为数据集。
    2. 您可以将表1数据保存在OBS的CSV文件中,然后通过DLI SQL或DWS SQL创建OBS外表关联这个CSV文件,然后将OBS外表查询的结果作为数据集。DLI创建外表请参见OBS输入流,DWS创建外表请参见创建外表
    3. 您可以将表1数据保存在HDFS的CSV文件中,然后通过HIVE SQL创建Hive外表关联这个CSV文件,然后将HIVE外表查询的结果作为数据集。MRS创建外表请参见创建表
    本例以方式1进行说明,将表1中的数据导入到DLI表(Table_List)中。您可以在DataArts Studio数据开发模块中,新建DLI SQL脚本执行以下SQL命令导入数据,也可以在数据湖探索(DLI)服务控制台中的SQL编辑器中执行以下SQL命令:
    /* 创建数据表TABLE_LIST,然后插入表1数据,最后查看生成的表数据 */
    CREATE TABLE Table_List (Source STRING, Destination STRING) STORED AS PARQUET;
    INSERT INTO Table_List VALUES ('a_new','a'),('b_2','b'),('c_3','c'),('d_1','d'),('c_5','e'),('b_1','f');
    SELECT * FROM Table_List;

    生成的Table_List表数据如下:

    图2 Table_List表数据

  3. 创建要循环运行的子作业ForeachDemo。在本次操作中,定义循环执行的是一个包含了DLI SQL节点的任务。

    1. 进入DataArts Studio数据开发模块选择“作业开发”页面,新建作业ForeachDemo,然后选择DLI SQL节点,编排图3所示的作业。
      DLI SQL的语句中把要替换的变量配成${}这种参数的形式。在下面的SQL语句中,所做的操作是把${Source}表中的数据全部导入${Destination}中,${fromTable}、${toTable} 就是要替换的变量参数。SQL语句为:
      INSERT INTO ${Destination} select * from ${Source};

      此处不能使用EL表达式#{Job.getParam("job_param_name")} ,因为此表达式只能直接获取当前作业里配置的参数的value,并不能获取到父作业传递过来的参数值,也不能获取到工作空间里面配置的全局变量,作用域仅为本作业。

      而表达式${job_param_name},既可以获取到父作业传递过来的参数值,也可以获取到全局配置的变量。

      图3 循环执行子作业
    2. 配置完成SQL语句后,在子作业中配置作业参数。此处仅需要配置参数名,用于主作业ForeachDemo_master中的For Each节点识别子作业参数;参数值无需填写。
      图4 配置子作业参数

    3. 配置完成后保存作业。

  4. 创建For Each节点所在的主作业ForeachDemo_master

    1. 进入DataArts Studio数据开发模块选择“作业开发”页面,新建数据开发主作业ForeachDemo_master。选择DLI SQL节点和For Each节点,选中连线图标并拖动,编排图5所示的作业。
      图5 编排作业
    2. 配置DLI SQL节点属性,此处配置为SQL语句,语句内容如下所示。DLI SQL节点负责读取DLI表Table_List中的内容作为数据集。
      SELECT * FROM Table_List;
      图6 DLI SQL节点配置
    3. 配置For Each节点属性。
      • 子作业:子作业选择步骤2已经开发完成的子作业“ForeachDemo”。
      • 数据集:数据集就是DLI SQL节点的Select语句的执行结果。使用EL表达式#{Job.getNodeOutput('preDLI')},其中preDLI为前一个节点的名称。
      • 子作业参数:用于将数据集中的数据传递到子作业以供使用。Source对应的是数据集Table_List表的第一列,Destination是第二列,所以配置的EL表达式分别为#{Loop.current[0]}#{Loop.current[1]}
      图7 配置For Each节点
    4. 配置完成后保存作业。

  5. 测试运行主作业。

    1. 点击主作业画布上方的“测试运行”按钮,测试作业运行情况。主作业运行后,会通过For Each节点自动调用运行子作业。
    2. 点击左侧导航栏中的“实例监控”,进入实例监控中查看作业运行情况。等待作业运行成功后,就能查看For Each节点生成的子作业实例,由于数据集中有6行数据,所以这里就对应产生了6个子作业实例。
      图8 查看作业实例
    3. 查看对应的6个DLI目的表中是否已被插入预期的数据。您可以在DataArts Studio数据开发模块中,新建DLI SQL脚本执行以下SQL命令导入数据,也可以在数据湖探索(DLI)服务控制台中的SQL编辑器中执行以下SQL命令:
      /* 查看表a数据,其他表数据请修改命令后运行 */
      SELECT * FROM a;

      将查询到的表数据与给源数据表插入数据步骤中的数据进行对比,可以发现数据插入符合预期。

      图9 目的表数据

更多案例参考

For Each节点可与其他节点配合,实现更丰富的功能。您可以参考以下案例,了解For Each节点的更多用法。