How Do I Create an OBS Partitioned Table for a Flink SQL Job?
Scenario
When using a Flink SQL job, you need to create an OBS partition table for subsequent batch processing.
Procedure
In the following example, the day field is used as the partition field with the parquet encoding format to dump car_info data to OBS. For more information, see File System Sink Stream (Recommended).
1 2 3 4 5 6 7 8 9 10 11 12 13 |
create sink stream car_infos ( carId string, carOwner string, average_speed double, day string ) partitioned by (day) with ( type = "filesystem", file.path = "obs://obs-sink/car_infos", encode = "parquet", ak = "{{myAk}}", sk = "{{mySk}}" ); |
Structure of the data storage directory in OBS: obs://obs-sink/car_infos/day=xx/part-x-x.
After the data is generated, the OBS partition table can be established for subsequent batch processing through the following SQL statements:
- Create an OBS partitioned table.
1 2 3 4 5 6 7 8
create table car_infos ( carId string, carOwner string, average_speed double ) partitioned by (day string) stored as parquet location 'obs://obs-sink/car-infos';
- Restore partition information from the associated OBS path.
1
alter table car_infos recover partitions;
Flink SQL FAQs
- How Much Data Can Be Processed in a Day by a Flink SQL Job?
- Does Data in the Temporary Stream of Flink SQL Need to Be Cleared Periodically? How Do I Clear the Data?
- Why Is a Message Displayed Indicating That the OBS Bucket Is Not Authorized When I Select an OBS Bucket for a Flink SQL Job?
- How Do I Create an OBS Partitioned Table for a Flink SQL Job?
- How Do I Change the Number of Kafka Partitions of a Flink SQL Job Without Stopping It?
- How Do I Dump Data to OBS and Create an OBS Partitioned Table?
- Why Is Error Message "DLI.0005" Displayed When I Use an EL Expression to Create a Table in a Flink SQL Job?
- Why Is No Data Queried in the DLI Table Created Using the OBS File Path When Data Is Written to OBS by a Flink Job Output Stream?
- Why Does a Flink SQL Job Fails to Be Executed, and Is "connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null" Displayed in the Log?
- Why Is Error "Not authorized" Reported When a Flink SQL Job Reads DIS Data?
- Data Writing Fails After a Flink SQL Job Consumed Kafka and Sank Data to the Elasticsearch Cluster
- How Does Flink Opensource SQL Parse Nested JSON?
- Why Is the RDS Database Time Read by a Flink Opensource SQL Job Different from RDS Database Time?
- What Are the Syntax Differences Between Flink SQL and Flink Opensource SQL?
- Why Does Job Submission Fail When the failure-handler Parameter of the Elasticsearch Result Table for a Flink Opensource SQL Job Is Set to retry_rejected?
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbotmore