
Orc

Function

The Apache Orc format allows you to read and write Orc data. For details, see Orc Format.

Supported Connectors

  • FileSystem

Parameter Description

Table 1 Parameters

  Parameter   Mandatory   Default Value   Data Type   Description
  format      Yes         None            String      The format to use. Set this parameter to orc.

The Orc format also supports the table properties defined by Orc (see Table properties). For example, you can configure orc.compress=SNAPPY to enable Snappy compression.
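For example, a sink table can forward Orc table properties through its WITH clause. The following is a minimal sketch, assuming a filesystem sink; the table name, column names, and bucket path are placeholders:

    CREATE TABLE orcSink (
      user_id string,
      pay_amount double
    ) WITH (
      'connector' = 'filesystem',
      -- Placeholder OBS path.
      'path' = 'obs://bucketName/orcPath',
      'format' = 'orc',
      -- Orc table property forwarded to the writer; enables Snappy compression.
      'orc.compress' = 'SNAPPY'
    );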

Data Type Mapping

Orc format type mapping is compatible with Apache Hive. The following table lists the type mapping from Flink type to Orc type.

Table 2 Data type mapping

  Flink SQL Type   Orc Physical Type   Orc Logical Type
  CHAR             bytes               CHAR
  VARCHAR          bytes               VARCHAR
  STRING           bytes               STRING
  BOOLEAN          long                BOOLEAN
  BYTES            bytes               BINARY
  DECIMAL          decimal             DECIMAL
  TINYINT          long                BYTE
  SMALLINT         long                SHORT
  INT              long                INT
  BIGINT           long                LONG
  FLOAT            double              FLOAT
  DOUBLE           double              DOUBLE
  DATE             long                DATE
  TIMESTAMP        timestamp           TIMESTAMP
  ARRAY            -                   LIST
  MAP              -                   MAP
  ROW              -                   STRUCT
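To illustrate the mapping, the following sketch declares a filesystem sink whose columns use several of the Flink SQL types listed above; the table name, column names, and OBS path are placeholders:

    CREATE TABLE typedOrcSink (
      id bigint,                 -- written as Orc long (LONG)
      price decimal(10, 2),      -- written as Orc decimal (DECIMAL)
      created_time timestamp(3), -- written as Orc timestamp (TIMESTAMP)
      tags array<string>         -- written as Orc LIST of STRING
    ) WITH (
      'connector' = 'filesystem',
      -- Placeholder OBS path.
      'path' = 'obs://bucketName/typedOrcPath',
      'format' = 'orc'
    );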

Example

Use Kafka to send data and write the data to OBS in Orc format through a FileSystem sink.

  1. Create a datasource connection for communication with the VPC and subnet where Kafka is located and bind the connection to the queue. Set a security group and inbound rule to allow access from the queue, and test the connectivity of the queue using the Kafka IP address. For example, locate a general-purpose queue where the job runs and choose More > Test Address Connectivity in the Operation column. If the connection is successful, the datasource is bound to the queue. Otherwise, the binding fails.
  2. Create a Flink OpenSource SQL job and enable checkpointing. Copy the following statements and submit the job:

    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic-pattern' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    
    CREATE TABLE sink (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'filesystem',
      'format' = 'orc',
      'path' = 'obs://xx'
    );
    insert into sink select * from kafkaSource;    

  3. Insert the following data into the source Kafka topic:

    202103251505050001,appshop,2021-03-25 15:05:05,500.00,400.00,2021-03-25 15:10:00,0003,Cindy,330108
    
    202103241606060001,appShop,2021-03-24 16:06:06,200.00,180.00,2021-03-24 16:10:06,0001,Alice,330106

  4. Read the Orc files in the OBS path configured in the sink table (see the read-back sketch after this list). The data results are as follows:

    202103251505050001, appshop, 2021-03-25 15:05:05, 500.0, 400.0, 2021-03-25 15:10:00, 0003, Cindy, 330108
    
    202103241606060001, appShop, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106
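The Orc files written by the sink can also be read back with a filesystem source table. The following is a sketch only, assuming the same OBS path as the sink table above and a Print result table (printSink) for viewing the rows:

    CREATE TABLE orcSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'filesystem',
      -- Same OBS path as configured in the sink table.
      'path' = 'obs://xx',
      'format' = 'orc'
    );

    CREATE TABLE printSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );

    insert into printSink select * from orcSource;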