Updated: 2024-12-25 GMT+08:00

Submitting a Spark Jar Job Using Delta on DLI

1. Add the following dependency:

<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.12</artifactId>
    <version>2.3.0</version>
</dependency>

2. Add the following two parameters to the SparkSession:

.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

3. Write the code (either SQL or the API can be used).

1) SQL development example. For detailed SQL syntax, see "Delta SQL Syntax Reference".

public static void main(String[] args)
{
    // Build a SparkSession with the Delta catalog and SQL extension enabled
    SparkSession spark = SparkSession
            .builder()
            .enableHiveSupport()
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .appName("DeltaDemo")
            .getOrCreate();
    // Create a partitioned Delta table backed by an OBS path
    String sql_create = "CREATE TABLE if not exists dligms.deltaTest1012 (\n" +
            "  id int,\n" +
            "  name string,\n" +
            "  start_year int,\n" +
            "  class string\n" +
            ") USING DELTA\n" +
            " partitioned by(start_year, class)\n" +
            " location 'obs://bucket_name/path/deltaTest1012'";
    spark.sql(sql_create);
    // Insert sample rows
    String sql_insert = "insert into dligms.deltaTest1012 values\n" +
            "(1, 'zhangsan', 2024, 'whlg0905')," +
            "(2, 'lisi', 2024, 'whlg0905')," +
            "(3, 'wangwu', 2024, 'whlg0905')," +
            "(4, 'zhaoliu', 2024, 'whlg0905')";
    spark.sql(sql_insert);
    // Query and print the table contents
    String sql_select = "select * from dligms.deltaTest1012";
    spark.sql(sql_select).show();
    spark.stop();
}
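Because the Delta extension is enabled, the same spark.sql() entry point also accepts Delta-specific statements such as UPDATE, DELETE, and time-travel queries, which plain Spark tables do not support. A minimal sketch, assuming the dligms.deltaTest1012 table created above and the open-source Delta Lake 2.3 syntax (check "Delta SQL Syntax Reference" for DLI specifics):

```java
// Update a row in place (not possible on a plain Parquet table)
spark.sql("UPDATE dligms.deltaTest1012 SET class = 'whlg0906' WHERE id = 2");

// Delete rows by predicate
spark.sql("DELETE FROM dligms.deltaTest1012 WHERE id = 4");

// Time travel: query the table as of an earlier version
spark.sql("SELECT * FROM dligms.deltaTest1012 VERSION AS OF 0").show();
```

Each statement commits a new version to the Delta transaction log, which is what makes the VERSION AS OF query possible.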

2) API development example (Java).

public static void main(String[] args)
{
    // Build a SparkSession with the Delta catalog and SQL extension enabled
    SparkSession spark = SparkSession
            .builder()
            .enableHiveSupport()
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .appName("DeltaDemo")
            .getOrCreate();
    // Create a partitioned Delta table through the DeltaTable builder API
    DeltaTable.createIfNotExists(spark)
            .tableName("deltaJava1011")
            .addColumn("id", "INT")
            .addColumn("name", "STRING")
            .addColumn("start_year", "INT")
            .addColumn("class", "STRING")
            .partitionedBy("start_year", "class")
            .location("obs://bucket_name/path/deltaTest1011")
            .execute();
    // Load CSV source data, inferring the schema from the header row
    Dataset<Row> data = spark.read().format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load("obs://bucket_name/path/export/test1011/");
    // Append the data into the Delta table (column order must match the table schema)
    data.write().insertInto("deltaJava1011");
    spark.stop();
}
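To verify the write, the Delta table can be read back either by its catalog name or directly from its storage path. A short sketch under the same session configuration (the OBS path below matches the location used in the example above):

```java
// Read via the table name registered in the catalog
Dataset<Row> byName = spark.table("deltaJava1011");
byName.show();

// Or read directly from the storage path using the delta format
Dataset<Row> byPath = spark.read().format("delta")
        .load("obs://bucket_name/path/deltaTest1011");
byPath.show();
```

Reading by path bypasses the catalog entirely, which can be useful when the table was created by another job or session.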

4. After compiling and packaging, create a Spark Jar job by referring to https://support.huaweicloud.com/devg-dli/dli_09_0205.html.
