更新时间:2024-08-03 GMT+08:00

Flink HBase样例程序(Java)

功能介绍

通过调用Flink API读写HBase数据。

代码样例

下面列出WriteHBase和ReadHBase主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteHBase和com.huawei.bigdata.flink.examples.ReadHBase。

  • WriteHBase主要逻辑代码
    /**
     * Entry point of the WriteHBase sample.
     *
     * <p>Prints a usage banner, then builds a parallelism-1 streaming job that feeds the rows
     * produced by {@code SimpleStringGenerator} into an {@code HBaseWriteSink} and executes it
     * under the job name "WriteHBase".
     *
     * @param args command-line options; {@code --tableName} and {@code --confDir} are read
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        final String separatorLine =
                "******************************************************************************************";
        final String usage =
                "./bin/flink run --class com.huawei.bigdata.flink.examples.WriteHBase"
                        + " /opt/test.jar --tableName t1 --confDir /tmp/hbaseConf";

        System.out.println("use command as: ");
        System.out.println(usage);
        System.out.println(separatorLine);
        System.out.println("<tableName> hbase tableName");
        System.out.println("<confDir> hbase conf dir");
        System.out.println(separatorLine);

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        ParameterTool options = ParameterTool.fromArgs(args);
        DataStream<Row> generatedRows = environment.addSource(new SimpleStringGenerator());

        // Resolve the sink arguments in the same order the sink constructor receives them.
        String targetTable = options.get("tableName");
        org.apache.hadoop.conf.Configuration sinkConf = createConf(options.get("confDir"));
        generatedRows.addSink(new HBaseWriteSink(targetTable, sinkConf));

        environment.execute("WriteHBase");
    }
    
        /**
         * Builds the Hadoop configuration used to talk to HBase.
         *
         * <p>Starts from {@code HBaseConfigurationUtil.getHBaseConfiguration()} and, when
         * {@code confDir} is not null, adds {@code hbase-site.xml}, {@code core-site.xml} and
         * {@code hdfs-site.xml} from that directory (in that order), skipping any file that does
         * not exist.
         *
         * @param confDir directory expected to hold the site files; may be {@code null}
         * @return the assembled configuration
         */
        private static org.apache.hadoop.conf.Configuration createConf(String confDir) {
            LOG.info("Create HBase configuration.");
            org.apache.hadoop.conf.Configuration configuration = HBaseConfigurationUtil.getHBaseConfiguration();
            if (confDir != null) {
                // Order matters: resources added later are layered on top of earlier ones.
                String[] siteFiles = {"hbase-site.xml", "core-site.xml", "hdfs-site.xml"};
                for (String siteFile : siteFiles) {
                    File candidate = new File(confDir + File.separator + siteFile);
                    if (candidate.exists()) {
                        LOG.info("Add " + siteFile);
                        configuration.addResource(new Path(candidate.getPath()));
                    }
                }
            }
            LOG.info("HBase configuration created successfully.");
            return configuration;
        }
    
        /**
         * Flink sink that writes each incoming {@link Row} to an HBase table through a
         * {@link BufferedMutator}.
         *
         * <p>Field 0 of the row is used as the row key and field 1 as the value of column
         * {@code f1:q1}. The target table is created in {@link #open(Configuration)} when it does
         * not exist yet. Buffered mutations are flushed explicitly once at least
         * {@link #FLUSH_INTERVAL_MILLIS} milliseconds have passed since the previous flush; the
         * check is only made when a record arrives.
         *
         * @since 8.2.0
         */
        private static class HBaseWriteSink extends RichSinkFunction<Row> {
            private static final long serialVersionUID = 1L;

            /** Minimum time between two explicit flushes of the buffered mutator (5s). */
            private static final long FLUSH_INTERVAL_MILLIS = 5000L;

            private final String tableName;
            // The Hadoop configuration is carried as bytes and rebuilt in open();
            // presumably because Configuration itself is not java.io.Serializable.
            private final byte[] serializedConfig;

            // Runtime-only handles, created in open(); marked transient so they are never
            // considered part of the serialized function.
            private transient Connection conn;
            private transient BufferedMutator bufferedMutator;
            private transient Admin admin;
            private transient org.apache.hadoop.conf.Configuration hbaseConf;
            private transient long preFlushTime;

            /**
             * @param sourceTable name of the HBase table to write to
             * @param conf HBase configuration; stored in serialized form
             */
            public HBaseWriteSink(String sourceTable, org.apache.hadoop.conf.Configuration conf) {
                this.tableName = sourceTable;
                this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
            }

            /** Rebuilds {@link #hbaseConf} from the bytes captured in the constructor. */
            private void deserializeConfiguration() {
                LOG.info("Deserialize HBase configuration.");
                hbaseConf = HBaseConfigurationUtil.deserializeConfiguration(
                    serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
                LOG.info("Deserialization successfully.");
            }

            /**
             * Creates the target table with a single column family {@code f1} (FAST_DIFF block
             * encoding, SNAPPY compression) unless it already exists.
             *
             * @throws IOException if the existence check or the creation fails
             */
            private void createTable() throws IOException {
                LOG.info("Create HBase Table.");
                TableName target = TableName.valueOf(tableName);
                if (admin.tableExists(target)) {
                    LOG.info("Table already exists.");
                    return;
                }
                // Specify the table descriptor.
                TableDescriptorBuilder htd = TableDescriptorBuilder.newBuilder(target);
                // Set the column family name to f1.
                ColumnFamilyDescriptorBuilder hcd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f1"));
                // Set data encoding methods. HBase provides DIFF,FAST_DIFF,PREFIX
                hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
                // Set compression methods, HBase provides two default compression
                // methods:GZ and SNAPPY
                hcd.setCompressionType(Compression.Algorithm.SNAPPY);
                htd.setColumnFamily(hcd.build());
                try {
                    admin.createTable(htd.build());
                } catch (TableExistsException e) {
                    // The table may have been created between the existence check and the
                    // create call; only tolerate the failure if it really exists now.
                    if (!admin.tableExists(target)) {
                        throw e;
                    }
                    LOG.info("Table already exists, ignore.");
                    // Return here so the "created" message below is not logged for a table
                    // this sink did not create.
                    return;
                }
                LOG.info("Table created successfully.");
            }

            /**
             * Opens the HBase connection, ensures the table exists and obtains the buffered
             * mutator used by {@link #invoke(Row, Context)}.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                LOG.info("Write sink open");
                super.open(parameters);
                deserializeConfiguration();
                conn = ConnectionFactory.createConnection(hbaseConf);
                admin = conn.getAdmin();
                createTable();
                bufferedMutator = conn.getBufferedMutator(TableName.valueOf(tableName));
                preFlushTime = System.currentTimeMillis();
            }

            /**
             * Closes the admin, the buffered mutator and the connection.
             *
             * @throws RuntimeException wrapping the {@link IOException} if any close fails
             */
            @Override
            public void close() throws Exception {
                LOG.info("Close HBase Connection.");
                // try-with-resources closes in reverse declaration order (admin, mutator,
                // connection), skips null resources, and still closes the remaining ones when
                // an earlier close throws, so a failing admin no longer leaks the connection.
                try (Connection connToClose = conn;
                        BufferedMutator mutatorToClose = bufferedMutator;
                        Admin adminToClose = admin) {
                    // Drop the field references so a repeated close() is a no-op.
                    admin = null;
                    bufferedMutator = null;
                    conn = null;
                } catch (IOException e) {
                    LOG.error("Close HBase Exception:", e);
                    throw new RuntimeException(e);
                }
                LOG.info("Close successfully.");
            }

            /**
             * Buffers one Put for the given row and flushes the mutator if the flush interval
             * has elapsed.
             *
             * @param value row whose field 0 is the row key and field 1 the cell value
             * @param context sink context (unused)
             */
            @Override
            public void invoke(Row value, Context context) throws Exception {
                LOG.info("Write data to HBase.");
                Put put = new Put(Bytes.toBytes(value.getField(0).toString()));
                put.addColumn(
                        Bytes.toBytes("f1"),
                        Bytes.toBytes("q1"),
                        Bytes.toBytes(value.getField(1).toString()));
                bufferedMutator.mutate(put);

                // Flush only once the interval has elapsed since the previous flush. The
                // former check was inverted: it flushed on every record inside the interval
                // and stopped flushing for good after a gap longer than the interval.
                if (System.currentTimeMillis() - preFlushTime >= FLUSH_INTERVAL_MILLIS) {
                    LOG.info("Flush data to HBase.");
                    bufferedMutator.flush();
                    preFlushTime = System.currentTimeMillis();
                    LOG.info("Flush successfully.");
                } else {
                    LOG.info("Skip Flush.");
                }

                LOG.info("Write successfully.");
            }
        }
    
        /**
         * Source that emits one two-field {@link Row} per second until it is cancelled.
         *
         * <p>Field 0 is {@code "rk"} followed by a random long and field 1 is {@code "v"}
         * followed by a random long.
         *
         * @since 8.2.0
         */
        public static class SimpleStringGenerator implements SourceFunction<Row> {
            private static final long serialVersionUID = 2174904787118597072L;

            // Written by cancel() and polled by the run() loop. volatile guarantees the loop
            // observes the update when the two methods are invoked from different threads.
            private volatile boolean running = true;
            private final Random random = new Random();

            /**
             * Emits rows in a loop, sleeping one second between rows, until {@link #cancel()}
             * is called.
             *
             * @param ctx context used to emit the generated rows
             * @throws Exception if the sleep is interrupted or emitting fails
             */
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (running) {
                    Row row = new Row(2);
                    row.setField(0, "rk" + random.nextLong());
                    row.setField(1, "v" + random.nextLong());
                    ctx.collect(row);
                    Thread.sleep(1000);
                }
            }

            /** Asks the {@link #run(SourceContext)} loop to stop after its current iteration. */
            @Override
            public void cancel() {
                running = false;
            }
        }
  • ReadHBase主要逻辑代码
        /**
         * Entry point of the ReadHBase sample.
         *
         * <p>Prints a usage banner, then builds a parallelism-1 streaming job that reads rows
         * through an {@code HBaseReaderSource}, rebalances them, formats each one as
         * "Flink says &lt;row&gt;" plus a line separator, prints the result and executes the job
         * under the name "ReadHBase".
         *
         * @param args command-line options; {@code --tableName} and {@code --confDir} are read
         * @throws Exception propagated from building or executing the job
         */
        public static void main(String[] args) throws Exception {
            final String separatorLine =
                    "******************************************************************************************";
            final String usage =
                    "./bin/flink run --class com.huawei.bigdata.flink.examples.ReadHBase"
                            + " /opt/test.jar --tableName t1 --confDir /tmp/hbaseConf";

            System.out.println("use command as: ");
            System.out.println(usage);

            System.out.println(separatorLine);
            System.out.println("<tableName> hbase tableName");
            System.out.println("<confDir> hbase conf dir");
            System.out.println(separatorLine);

            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setParallelism(1);
            ParameterTool options = ParameterTool.fromArgs(args);

            // Resolve the source arguments in the same order the source constructor receives them.
            String sourceTable = options.get("tableName");
            org.apache.hadoop.conf.Configuration sourceConf = createConf(options.get("confDir"));
            DataStream<Row> scannedRows = environment.addSource(new HBaseReaderSource(sourceTable, sourceConf));

            MapFunction<Row, String> decorate =
                    new MapFunction<Row, String>() {
                        @Override
                        public String map(Row row) throws Exception {
                            return "Flink says " + row + System.getProperty("line.separator");
                        }
                    };

            scannedRows.rebalance().map(decorate).print();
            environment.execute("ReadHBase");
        }
    
        /**
         * Builds the Hadoop configuration used to talk to HBase.
         *
         * <p>Starts from {@code HBaseConfigurationUtil.getHBaseConfiguration()} and, when
         * {@code confDir} is not null, adds {@code hbase-site.xml}, {@code core-site.xml} and
         * {@code hdfs-site.xml} from that directory (in that order), skipping any file that does
         * not exist.
         *
         * @param confDir directory expected to hold the site files; may be {@code null}
         * @return the assembled configuration
         */
        private static org.apache.hadoop.conf.Configuration createConf(String confDir) {
            LOG.info("Create HBase configuration.");
            org.apache.hadoop.conf.Configuration configuration = HBaseConfigurationUtil.getHBaseConfiguration();
            if (confDir != null) {
                // Order matters: resources added later are layered on top of earlier ones.
                String[] siteFiles = {"hbase-site.xml", "core-site.xml", "hdfs-site.xml"};
                for (String siteFile : siteFiles) {
                    File candidate = new File(confDir + File.separator + siteFile);
                    if (candidate.exists()) {
                        LOG.info("Add " + siteFile);
                        configuration.addResource(new Path(candidate.getPath()));
                    }
                }
            }
            LOG.info("HBase configuration created successfully.");
            return configuration;
        }
    
        /**
         * Flink source that scans an HBase table once and emits every result as a two-field
         * {@link Row}: field 0 is the row key and field 1 the value of column {@code f1:q1},
         * both decoded with {@code Bytes.toString}.
         */
        private static class HBaseReaderSource extends RichSourceFunction<Row> {
            private static final long serialVersionUID = 1L;

            private final String tableName;
            // The Hadoop configuration is carried as bytes and rebuilt in open();
            // presumably because Configuration itself is not java.io.Serializable.
            private final byte[] serializedConfig;

            // Runtime-only handles, created in open(); marked transient so they are never
            // considered part of the serialized function.
            private transient Connection conn;
            private transient Table table;
            private transient Scan scan;
            private transient Admin admin;
            private transient org.apache.hadoop.conf.Configuration hbaseConf;

            // Written by cancel() and polled by the scan loop in run(). Deliberately NOT
            // transient: a transient boolean would come back as false after deserialization
            // and the source would never emit anything.
            private volatile boolean running = true;

            /**
             * @param sourceTable name of the HBase table to scan
             * @param conf HBase configuration; stored in serialized form
             */
            public HBaseReaderSource(String sourceTable, org.apache.hadoop.conf.Configuration conf) {
                this.tableName = sourceTable;
                this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
            }

            /**
             * Opens the HBase connection and prepares a full-table scan.
             *
             * @throws IOException if the table does not exist
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                LOG.info("Read source open");
                super.open(parameters);
                deserializeConfiguration();
                conn = ConnectionFactory.createConnection(hbaseConf);
                admin = conn.getAdmin();
                if (!admin.tableExists(TableName.valueOf(tableName))) {
                    throw new IOException("table does not exist.");
                }
                table = conn.getTable(TableName.valueOf(tableName));
                scan = new Scan();
            }

            /** Rebuilds {@link #hbaseConf} from the bytes captured in the constructor. */
            private void deserializeConfiguration() {
                LOG.info("Deserialize HBase configuration.");
                hbaseConf = HBaseConfigurationUtil.deserializeConfiguration(
                    serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
                LOG.info("Deserialization successfully.");
            }

            /**
             * Scans the table and emits one row per result until the scanner is exhausted or
             * {@link #cancel()} has been called.
             *
             * @param sourceContext context used to emit the rows
             */
            @Override
            public void run(SourceContext<Row> sourceContext) throws Exception {
                LOG.info("Read source run");
                try (ResultScanner scanner = table.getScanner(scan)) {
                    Iterator<Result> iterator = scanner.iterator();
                    // Check the flag first so a cancelled source does not fetch another result.
                    while (running && iterator.hasNext()) {
                        Result result = iterator.next();
                        String rowKey = Bytes.toString(result.getRow());
                        byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
                        Row row = new Row(2);
                        row.setField(0, rowKey);
                        row.setField(1, Bytes.toString(value));
                        sourceContext.collect(row);
                        LOG.info("Send data successfully.");
                    }
                }

                LOG.info("Read successfully.");
            }

            /** Releases the HBase resources held by this source. */
            @Override
            public void close() throws Exception {
                closeHBase();
            }

            /**
             * Closes the admin, the table and the connection.
             *
             * @throws RuntimeException wrapping the {@link IOException} if any close fails
             */
            private void closeHBase() {
                LOG.info("Close HBase Connection.");
                // try-with-resources closes in reverse declaration order (admin, table,
                // connection), skips null resources, and still closes the remaining ones when
                // an earlier close throws.
                try (Connection connToClose = conn;
                        Table tableToClose = table;
                        Admin adminToClose = admin) {
                    // Drop the field references so a repeated call is a no-op.
                    admin = null;
                    table = null;
                    conn = null;
                } catch (IOException e) {
                    LOG.error("Close HBase Exception:", e);
                    throw new RuntimeException(e);
                }
                LOG.info("Close successfully.");
            }

            /**
             * Signals the scan loop in {@link #run(SourceContext)} to stop.
             *
             * <p>The resources are intentionally not closed here: closing the table and the
             * connection while run() may still be iterating the scanner would make the scan
             * fail mid-iteration. Cleanup is left to {@link #close()}.
             */
            @Override
            public void cancel() {
                running = false;
            }
        }