更新时间:2024-10-16 GMT+08:00
分享

应用端加工RoaringBitmap结果集并入库GaussDB(DWS)开发示例

GaussDB(DWS) 自8.1.3版本后支持位图功能(RoaringBitmap),在使用JAVA语言基于GaussDB(DWS)进行二次开发时,可以使用CopyManager接口,实现小批量RoaringBitmap的数据入库至GaussDB(DWS)。

如针对大批量数据入库,需要在应用侧增加算力进行优化,否则会影响入库性能。

加工RoaringBitmap

  1. 访问Maven下载开源的RoaringBitmap的jar包,推荐下载0.9.15版本。

    POM文件依赖项配置如下:
    1
    2
    3
    4
    5
    6
    7
    <dependencies>
     <dependency>
     <groupId>org.roaringbitmap</groupId>
     <artifactId>RoaringBitmap</artifactId>
     <version>0.9.15</version>
     </dependency>
     </dependencies>
    

  2. 调用jar包实现Roaringbitmap的转换。

    大致过程是声明一个RoaringBitmap,调用add方法,将要转化的int转化成Roaringbitmap类型,再对数据进行序列化。代码示例如下:

    1
    2
    3
    4
    5
    6
    7
    RoaringBitmap rr2 = new RoaringBitmap ();
    for (int i = 1; i < 10000000;  i++)  {
          rr2.add(i);
    }
    ByteArrayOutputStream a = new ByteArrayOutputStream();
    DataOutputStream b = new DataOutputStream(a);
    rr2.serialize(b);
    

数据入库

调用CopyManager进行入库,实现数据不落地,小批量RoaringBitmap入库。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//以下用例以gsjdbc4.jar为例,如果要使用gsjdbc200.jar,请替换驱动类名(将代码中的“org.postgresql”替换成“com.huawei.gauss200.jdbc”)与连接URL串前缀(将“jdbc:postgresql”替换为“jdbc:gaussdb”)。

package rb_demo;

import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.roaringbitmap.RoaringBitmap;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class rb_demo {

	private static String hexStr =  "0123456789ABCDEF";
	
	public static String bytesToHex(byte[] bytes) {
		StringBuffer sb = new StringBuffer();
		for (int i = 0; i < bytes.length; i++) {
			String hex = Integer.toHexString(bytes[i] & 0xFF);
			if (hex.length() < 2) {
				sb.append(0);
			}
			sb.append(hex);
		}
		return sb.toString();
	}

	public static Connection GetConnection(String username, String passwd) {
		String driver = "org.postgresql.Driver";
		String sourceURL = "jdbc:postgresql://10.185.180.161:8000/gaussdb";  //数据库URL
		Connection conn = null;
		try {
			// 加载数据库驱动。
			Class.forName(driver).newInstance();
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}

		try {
			// 创建数据库连接。
			conn = DriverManager.getConnection(sourceURL, username, passwd);
			System.out.println("Connection succeed!");
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}

		return conn;
	}

	public static void main(String[] args) throws IOException {

		RoaringBitmap rr2 = new RoaringBitmap();

		for (int i = 1; i < 10000000; i++) {
			rr2.add(i);
		}
		
		ByteArrayOutputStream a = new ByteArrayOutputStream();
		
		DataOutputStream b = new DataOutputStream(a);
		rr2.serialize(b);

		Connection conn = GetConnection("test", "Gauss_234");  //数据库用户名、密码
		Statement pstmt = null;
		try {
			conn.setAutoCommit(true);
			pstmt = conn.createStatement();

			pstmt.execute("drop table if exists t_rb");
			pstmt.execute("create table t_rb(c1 int, c2 roaringbitmap) distribute by hash (c1);");

			StringReader sr = null;
			CopyManager cm = null;
			cm = new CopyManager((BaseConnection) conn);

			String delimiter = "|";
			StringBuffer tuples = new StringBuffer();
			tuples.append("1" + delimiter + "\\x" + bytesToHex(a.toByteArray()));


			StringBuffer sb = new StringBuffer();
			sb.append(tuples.toString());

			sr = new StringReader(tuples.toString());
			String sql = "copy t_rb from STDIN with (delimiter '|', NOESCAPING)";
			
			long rows = cm.copyIn(sql, sr);// 执行copy入库

			pstmt.close();
		} catch (SQLException e) {
			if (pstmt != null) {
				try {
					pstmt.close();
				} catch (SQLException e1) {
					e1.printStackTrace();
				}
			}
			e.printStackTrace();
		}
	}
}

相关文档