文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Flink开发指南(普通模式)/
开发Flink应用/
Flink读取HBase表样例程序/
Flink HBase样例程序(Java)
更新时间:2024-06-05 GMT+08:00
Flink HBase样例程序(Java)
功能介绍
通过调用Flink API读写HBase数据。
代码样例
下面列出WriteHBase和ReadHBase主要逻辑代码作为演示。
完整代码参见com.huawei.bigdata.flink.examples.WriteHBase和com.huawei.bigdata.flink.examples.ReadHBase。
- WriteHBase主要逻辑代码
public static void main(String[] args) throws Exception { System.out.println("use command as: "); System.out.println( "./bin/flink run --class com.huawei.bigdata.flink.examples.WriteHBase" + " /opt/test.jar --tableName t1 --confDir /tmp/hbaseConf"); System.out.println( "******************************************************************************************"); System.out.println("<tableName> hbase tableName"); System.out.println("<confDir> hbase conf dir"); System.out.println( "******************************************************************************************"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); ParameterTool paraTool = ParameterTool.fromArgs(args); DataStream<Row> messageStream = env.addSource(new SimpleStringGenerator()); messageStream.addSink( new HBaseWriteSink(paraTool.get("tableName"), createConf(paraTool.get("confDir")))); env.execute("WriteHBase"); } private static org.apache.hadoop.conf.Configuration createConf(String confDir) { LOG.info("Create HBase configuration."); org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfigurationUtil.getHBaseConfiguration(); if (confDir != null) { File hbaseSite = new File(confDir + File.separator + "hbase-site.xml"); if (hbaseSite.exists()) { LOG.info("Add hbase-site.xml"); hbaseConf.addResource(new Path(hbaseSite.getPath())); } File coreSite = new File(confDir + File.separator + "core-site.xml"); if (coreSite.exists()) { LOG.info("Add core-site.xml"); hbaseConf.addResource(new Path(coreSite.getPath())); } File hdfsSite = new File(confDir + File.separator + "hdfs-site.xml"); if (hdfsSite.exists()) { LOG.info("Add hdfs-site.xml"); hbaseConf.addResource(new Path(hdfsSite.getPath())); } } LOG.info("HBase configuration created successfully."); return hbaseConf; } /** * @since 8.2.0 */ private static class HBaseWriteSink extends RichSinkFunction<Row> { private Connection conn; private BufferedMutator bufferedMutator; private String tableName; private final byte[] serializedConfig; private Admin admin; private org.apache.hadoop.conf.Configuration hbaseConf; private long flushTimeIntervalMillis = 5000; //5s private long preFlushTime; public HBaseWriteSink(String sourceTable, org.apache.hadoop.conf.Configuration conf) { this.tableName = sourceTable; this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf); } private void deserializeConfiguration() { LOG.info("Deserialize HBase configuration."); hbaseConf = HBaseConfigurationUtil.deserializeConfiguration( serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration()); LOG.info("Deserialization successfully."); } private void createTable() throws IOException { LOG.info("Create HBase Table."); if (admin.tableExists(TableName.valueOf(tableName))) { LOG.info("Table already exists."); return; } // Specify the table descriptor. TableDescriptorBuilder htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); // Set the column family name to f1. ColumnFamilyDescriptorBuilder hcd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f1")); // Set data encoding methods. HBase provides DIFF,FAST_DIFF,PREFIX hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY hcd.setCompressionType(Compression.Algorithm.SNAPPY); htd.setColumnFamily(hcd.build()); try { admin.createTable(htd.build()); } catch (IOException e) { if (!(e instanceof TableExistsException) || !admin.tableExists(TableName.valueOf(tableName))) { throw e; } LOG.info("Table already exists, ignore."); } LOG.info("Table created successfully."); } @Override public void open(Configuration parameters) throws Exception { LOG.info("Write sink open"); super.open(parameters); deserializeConfiguration(); conn = ConnectionFactory.createConnection(hbaseConf); admin = conn.getAdmin(); createTable(); bufferedMutator = conn.getBufferedMutator(TableName.valueOf(tableName)); preFlushTime = System.currentTimeMillis(); } @Override public void close() throws Exception { LOG.info("Close HBase Connection."); try { if (admin != null) { admin.close(); admin = null; } if (bufferedMutator != null) { bufferedMutator.close(); bufferedMutator = null; } if (conn != null) { conn.close(); conn = null; } } catch (IOException e) { LOG.error("Close HBase Exception:", e); throw new RuntimeException(e); } LOG.info("Close successfully."); } @Override public void invoke(Row value, Context context) throws Exception { LOG.info("Write data to HBase."); Put put = new Put(Bytes.toBytes(value.getField(0).toString())); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), (Bytes.toBytes(value.getField(1).toString()))); bufferedMutator.mutate(put); if (preFlushTime + flushTimeIntervalMillis >= System.currentTimeMillis()) { LOG.info("Flush data to HBase."); bufferedMutator.flush(); preFlushTime = System.currentTimeMillis(); LOG.info("Flush successfully."); } else { LOG.info("Skip Flush."); } LOG.info("Write successfully."); } } /** * @since 8.2.0 */ public static class SimpleStringGenerator implements SourceFunction<Row> { private static final long serialVersionUID = 2174904787118597072L; boolean running = true; long i = 0; Random random = new Random(); @Override public void run(SourceContext<Row> ctx) throws Exception { while (running) { Row row = new Row(2); row.setField(0, "rk" + random.nextLong()); row.setField(1, "v" + random.nextLong()); ctx.collect(row); Thread.sleep(1000); } } @Override public void cancel() { running = false; } }
- ReadHBase主要逻辑代码
public static void main(String[] args) throws Exception { System.out.println("use command as: "); System.out.println( "./bin/flink run --class com.huawei.bigdata.flink.examples.ReadHBase" + " /opt/test.jar --tableName t1 --confDir /tmp/hbaseConf"); System.out.println( "******************************************************************************************"); System.out.println("<tableName> hbase tableName"); System.out.println("<confDir> hbase conf dir"); System.out.println( "******************************************************************************************"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); ParameterTool paraTool = ParameterTool.fromArgs(args); DataStream<Row> messageStream = env.addSource( new HBaseReaderSource( paraTool.get("tableName"), createConf(paraTool.get("confDir")))); messageStream .rebalance() .map( new MapFunction<Row, String>() { @Override public String map(Row s) throws Exception { return "Flink says " + s + System.getProperty("line.separator"); } }) .print(); env.execute("ReadHBase"); } private static org.apache.hadoop.conf.Configuration createConf(String confDir) { LOG.info("Create HBase configuration."); org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfigurationUtil.getHBaseConfiguration(); if (confDir != null) { File hbaseSite = new File(confDir + File.separator + "hbase-site.xml"); if (hbaseSite.exists()) { LOG.info("Add hbase-site.xml"); hbaseConf.addResource(new Path(hbaseSite.getPath())); } File coreSite = new File(confDir + File.separator + "core-site.xml"); if (coreSite.exists()) { LOG.info("Add core-site.xml"); hbaseConf.addResource(new Path(coreSite.getPath())); } File hdfsSite = new File(confDir + File.separator + "hdfs-site.xml"); if (hdfsSite.exists()) { LOG.info("Add hdfs-site.xml"); hbaseConf.addResource(new Path(hdfsSite.getPath())); } } LOG.info("HBase configuration created successfully."); return hbaseConf; } private static class HBaseReaderSource extends RichSourceFunction<Row> { private Connection conn; private Table table; private Scan scan; private String tableName; private final byte[] serializedConfig; private Admin admin; private org.apache.hadoop.conf.Configuration hbaseConf; public HBaseReaderSource(String sourceTable, org.apache.hadoop.conf.Configuration conf) { this.tableName = sourceTable; this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf); } @Override public void open(Configuration parameters) throws Exception { LOG.info("Read source open"); super.open(parameters); deserializeConfiguration(); conn = ConnectionFactory.createConnection(hbaseConf); admin = conn.getAdmin(); if (!admin.tableExists(TableName.valueOf(tableName))) { throw new IOException("table does not exist."); } table = conn.getTable(TableName.valueOf(tableName)); scan = new Scan(); } private void deserializeConfiguration() { LOG.info("Deserialize HBase configuration."); hbaseConf = HBaseConfigurationUtil.deserializeConfiguration( serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration()); LOG.info("Deserialization successfully."); } @Override public void run(SourceContext<Row> sourceContext) throws Exception { LOG.info("Read source run"); try (ResultScanner scanner = table.getScanner(scan)) { Iterator<Result> iterator = scanner.iterator(); while (iterator.hasNext()) { Result result = iterator.next(); String rowKey = Bytes.toString(result.getRow()); byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1")); Row row = new Row(2); row.setField(0, rowKey); row.setField(1, Bytes.toString(value)); sourceContext.collect(row); LOG.info("Send data successfully."); } } LOG.info("Read successfully."); } @Override public void close() throws Exception { closeHBase(); } private void closeHBase() { LOG.info("Close HBase Connection."); try { if (admin != null) { admin.close(); admin = null; } if (table != null) { table.close(); table = null; } if (conn != null) { conn.close(); conn = null; } } catch (IOException e) { LOG.error("Close HBase Exception:", e); throw new RuntimeException(e); } LOG.info("Close successfully."); } @Override public void cancel() { closeHBase(); } }
父主题: Flink读取HBase表样例程序