Submitting a Spark Jar Job with Delta on DLI
1. Add the following dependency:
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.12</artifactId>
    <version>2.3.0</version>
</dependency>
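If the project is built with Gradle rather than Maven, the same artifact can be declared as follows (a sketch; only the coordinates are taken from the Maven dependency above, the rest is standard Gradle convention):

```
dependencies {
    // Same Delta Lake artifact as the Maven dependency in step 1.
    implementation 'io.delta:delta-core_2.12:2.3.0'
}
```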
2. Add the following two options when building the SparkSession:
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
3. Write the job code (either SQL or the API can be used).
1) A SQL-based example is shown below; for the full syntax, see "Delta SQL Syntax Reference".
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .enableHiveSupport()
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .appName("DeltaDemo")
        .getOrCreate();

    // Create a partitioned Delta table backed by OBS storage.
    String sql_create = "CREATE TABLE if not exists dligms.deltaTest1012 (\n"
        + " id int,\n"
        + " name string,\n"
        + " start_year int,\n"
        + " class string\n"
        + ") USING DELTA\n"
        + " partitioned by(start_year, class)\n"
        + " location 'obs://bucket_name/path/deltaTest1012'";
    spark.sql(sql_create);

    // Insert sample rows.
    String sql_insert = "insert into dligms.deltaTest1012 values\n"
        + "(1, 'zhangsan', 2024, 'whlg0905'),"
        + "(2, 'lisi', 2024, 'whlg0905'),"
        + "(3, 'wangwu', 2024, 'whlg0905'),"
        + "(4, 'zhaoliu', 2024, 'whlg0905')";
    spark.sql(sql_insert);

    // Query the table and print the result.
    String sql_select = "select * from dligms.deltaTest1012";
    spark.sql(sql_select).show();

    spark.stop();
}
2) An API-based example (Java) is shown below.
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .enableHiveSupport()
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .appName("DeltaDemo")
        .getOrCreate();

    // Create the Delta table through the DeltaTable builder API.
    DeltaTable.createIfNotExists(spark)
        .tableName("deltaJava1011")
        .addColumn("id", "INT")
        .addColumn("name", "STRING")
        .addColumn("start_year", "INT")
        .addColumn("class", "STRING")
        .partitionedBy("start_year", "class")
        .location("obs://bucket_name/path/deltaTest1011")
        .execute();

    // Load CSV data from OBS and append it to the Delta table.
    Dataset<Row> data = spark.read().format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("obs://bucket_name/path/export/test1011/");
    data.write().insertInto("deltaJava1011");

    spark.stop();
}
4. After compiling and packaging the code, create a Spark Jar job as described at https://support.huaweicloud.com/devg-dli/dli_09_0205.html.
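When packaging, the delta-core dependency from step 1 typically needs to be bundled into the job JAR, while Spark itself is supplied by the DLI runtime. One common way to do this with Maven is the shade plugin; the sketch below is an assumption about your build setup, not a DLI requirement:

```
<build>
  <plugins>
    <!-- Bundle delta-core (and other non-provided dependencies) into a single job JAR. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.4.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

With this layout, Spark dependencies (such as spark-sql) would be declared with <scope>provided</scope> so they are available at compile time but excluded from the shaded JAR.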