Updated on 2025-08-22 GMT+08:00

Flink SQL Row-Level Filtering

This section applies only to MRS 3.3.1 or later.

Scenarios

When using Flink SQL, you can apply row-level filter conditions to authorize user access to specific rows while restricting visibility of others.

Prerequisites

  • Kerberos authentication is enabled for the cluster (the cluster is in security mode). Ranger, Hive, and Flink have been installed and are running properly.
  • The user, user group, or role with the required permissions has been created, and the user has been added to the hive group.
  • This feature can be used only on the FlinkServer platform.
  • The default dialect is required for Flink SQL.
  • The case of the filter criteria fields must be the same as that of the Flink SQL fields.
  • This feature is not available for Hive view tables.

Configuring Flink SQL Row Filtering

  1. Configure custom FlinkServer parameters.

    1. Log in to FusionInsight Manager.
    2. Choose Cluster > Services > Flink, click Configurations and then All Configurations, choose FlinkServer(Role) > Customization, and locate flink.customized.configs.
    3. Add the custom parameter table.ranger.security.authorization, set it to true, and click Save to save the settings.
    4. Click the Instances tab, select all FlinkServer instances, and choose More > Restart Instance to restart the FlinkServer instances as prompted.

  2. Configure Hive row-level filtering for a specified user. For details, see Hive Row-Level Data Filtering.

    For example, add the filter condition a<>1 to the datagen table in the default database for user test.

  3. Submit a Flink SQL job through FlinkServer as a user with the required permissions. For details, see Creating a Job.

    The following is a SQL example:
    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'hive-version' = '3.1.0',
      'default-database' = 'default',
      'cluster.name' = 'flink_hive'
    );
    CREATE TABLE print (a INT) WITH ('connector' = 'print');
    CREATE TABLE IF NOT EXISTS myhive.`default`.datagen (a INT) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    );
    set table.sql-dialect = default;
    insert into
      print
    select
      *
    from
      myhive.`default`.datagen;

    You can view the configured filter conditions on the native Flink job page. In this example, a<>1 is the filter condition.
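
    A deterministic way to confirm that the filter is enforced, given as an added example that is not part of the original procedure. It assumes hypothetical tables datagen_check and print_check, and that the same a<>1 row filter has been configured in Ranger for user test on default.datagen_check. The bounded sequence source emits 0 through 3, so a job submitted as test is expected to print 0, 2, and 3 only.

```sql
-- Added verification example; datagen_check and print_check are hypothetical names.
-- Assumes a Ranger row filter a<>1 on default.datagen_check for user test.
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-version' = '3.1.0',
  'default-database' = 'default',
  'cluster.name' = 'flink_hive'
);
CREATE TABLE print_check (a INT) WITH ('connector' = 'print');
-- Bounded sequence source: emits a = 0, 1, 2, 3 and then finishes,
-- unlike the random unbounded source, where a = 1 almost never occurs.
CREATE TABLE IF NOT EXISTS myhive.`default`.datagen_check (a INT) WITH (
  'connector' = 'datagen',
  'fields.a.kind' = 'sequence',
  'fields.a.start' = '0',
  'fields.a.end' = '3'
);
-- The default dialect is a prerequisite for row-level filtering.
SET table.sql-dialect = default;
-- The column name uses the same case as the filter condition (a, not A).
INSERT INTO print_check
SELECT a FROM myhive.`default`.datagen_check;
```

    If a row with a = 1 appears in the print output, the filter is not being applied: recheck that table.ranger.security.authorization is set to true on FlinkServer, that the job was submitted through FlinkServer, and that the field case in the filter matches the table.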