更新时间:2022-05-09 GMT+08:00
分享

java样例代码

开发说明

redis只支持增强型跨源。只能使用包年包月队列。

  • 前提条件

    在DLI管理控制台上已完成创建增强跨源连接,并绑定包年包月队列。具体操作请参考《数据湖探索用户指南》。

  • 代码实现
    1. 导入依赖。
      • 涉及到的mvn依赖库
        1
        2
        3
        4
        5
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
      • import相关依赖包
        1
        2
        3
        4
        5
        6
        7
        8
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.sql.*;
        import org.apache.spark.sql.types.DataTypes;
        import org.apache.spark.sql.types.StructField;
        import org.apache.spark.sql.types.StructType;
        import java.util.*;
        
    2. 创建会话
      1
      2
      3
      4
      5
      6
      7
      8
      SparkConf sparkConf = new SparkConf();
      sparkConf.setAppName("datasource-redis")
              .set("spark.redis.host", "192.168.4.199")
              .set("spark.redis.port", "6379")
              .set("spark.redis.auth", "******")
              .set("spark.driver.allowMultipleContexts","true");
      JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
      SQLContext sqlContext = new SQLContext(javaSparkContext);
      
  • 通过DataFrame API 访问
    1. 读取json数据为DataFrame
      1
      2
      3
      4
      JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
              "{\"id\":\"1\",\"name\":\"zhangsan\",\"age\":\"18\"}",
              "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
      Dataset dataFrame = sqlContext.read().json(javaRDD);
      
    2. 构造redis连接配置参数
      1
      2
      3
      Map map = new HashMap<String, String>();
      map.put("table","person");
      map.put("key.column","id");
      
    3. 保存数据到redis
      1
      dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
      
    4. 读取redis中数据
      1
      sqlContext.read().format("redis").options(map).load().show();
      
    5. 操作结果

  • 提交Spark作业
    1. 将写好的java代码文件上传至DLI中。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
    2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
      • 提交作业时,需要指定Module模块,名称为:sys.datasource.redis。
      • 通过控制台提交作业请参考《数据湖探索用户指南》中的“表6-选择依赖资源参数说明”。
      • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。

完整示例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Test_Redis_DaraFrame {
  public static void main(String[] args) {
    //create a SparkSession session  
    SparkConf sparkConf = new SparkConf();  
    sparkConf.setAppName("datasource-redis")
             .set("spark.redis.host", "192.168.4.199")
             .set("spark.redis.port", "6379")
             .set("spark.redis.auth", "******")
             .set("spark.driver.allowMultipleContexts","true");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(javaSparkContext);

    //Read RDD in JSON format to create DataFrame
    JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
            "{\"id\":\"1\",\"name\":\"zhangsan\",\"age\":\"18\"}",
            "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
    Dataset dataFrame = sqlContext.read().json(javaRDD);

    Map map = new HashMap<String, String>();
    map.put("table","person");
    map.put("key.column","id");
    dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();      
    sqlContext.read().format("redis").options(map).load().show();

  }
}
分享:

    相关文档

    相关产品

close