Flink HBase Sample Program (Java)
Description
Call Flink APIs to read data from and write data to HBase.
Sample Code
The following examples show the main logic code of WriteHBase and ReadHBase.
For the complete code, see com.huawei.bigdata.flink.examples.WriteHBase and com.huawei.bigdata.flink.examples.ReadHBase.
- Main logic code of WriteHBase
public static void main(String[] args) throws Exception {
    System.out.println("use command as: ");
    System.out.println(
            "./bin/flink run --class com.huawei.bigdata.flink.examples.WriteHBase"
                    + " /opt/test.jar --tableName t1 --confDir /tmp/hbaseConf");
    System.out.println(
            "******************************************************************************************");
    System.out.println("<tableName> hbase tableName");
    System.out.println("<confDir> hbase conf dir");
    System.out.println(
            "******************************************************************************************");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    DataStream<Row> messageStream = env.addSource(new SimpleStringGenerator());
    messageStream.addSink(
            new HBaseWriteSink(paraTool.get("tableName"), createConf(paraTool.get("confDir"))));
    env.execute("WriteHBase");
}

private static org.apache.hadoop.conf.Configuration createConf(String confDir) {
    LOG.info("Create HBase configuration.");
    org.apache.hadoop.conf.Configuration hbaseConf =
            HBaseConfigurationUtil.getHBaseConfiguration();
    if (confDir != null) {
        File hbaseSite = new File(confDir + File.separator + "hbase-site.xml");
        if (hbaseSite.exists()) {
            LOG.info("Add hbase-site.xml");
            hbaseConf.addResource(new Path(hbaseSite.getPath()));
        }
        File coreSite = new File(confDir + File.separator + "core-site.xml");
        if (coreSite.exists()) {
            LOG.info("Add core-site.xml");
            hbaseConf.addResource(new Path(coreSite.getPath()));
        }
        File hdfsSite = new File(confDir + File.separator + "hdfs-site.xml");
        if (hdfsSite.exists()) {
            LOG.info("Add hdfs-site.xml");
            hbaseConf.addResource(new Path(hdfsSite.getPath()));
        }
    }
    LOG.info("HBase configuration created successfully.");
    return hbaseConf;
}

/**
 * @since 8.2.0
 */
private static class HBaseWriteSink extends RichSinkFunction<Row> {
    private Connection conn;
    private BufferedMutator bufferedMutator;
    private String tableName;
    private final byte[] serializedConfig;
    private Admin admin;
    private org.apache.hadoop.conf.Configuration hbaseConf;
    private long flushTimeIntervalMillis = 5000; // 5s
    private long preFlushTime;

    public HBaseWriteSink(String sourceTable, org.apache.hadoop.conf.Configuration conf) {
        this.tableName = sourceTable;
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
    }

    private void deserializeConfiguration() {
        LOG.info("Deserialize HBase configuration.");
        hbaseConf =
                HBaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
        LOG.info("Deserialization successful.");
    }

    private void createTable() throws IOException {
        LOG.info("Create HBase Table.");
        if (admin.tableExists(TableName.valueOf(tableName))) {
            LOG.info("Table already exists.");
            return;
        }
        // Specify the table descriptor.
        TableDescriptorBuilder htd =
                TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
        // Set the column family name to f1.
        ColumnFamilyDescriptorBuilder hcd =
                ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f1"));
        // Set the data block encoding. HBase provides DIFF, FAST_DIFF, and PREFIX.
        hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
        // Set the compression algorithm. HBase provides two default algorithms: GZ and SNAPPY.
        hcd.setCompressionType(Compression.Algorithm.SNAPPY);
        htd.setColumnFamily(hcd.build());
        try {
            admin.createTable(htd.build());
        } catch (IOException e) {
            if (!(e instanceof TableExistsException)
                    || !admin.tableExists(TableName.valueOf(tableName))) {
                throw e;
            }
            LOG.info("Table already exists, ignore.");
        }
        LOG.info("Table created successfully.");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.info("Write sink open");
        super.open(parameters);
        deserializeConfiguration();
        conn = ConnectionFactory.createConnection(hbaseConf);
        admin = conn.getAdmin();
        createTable();
        bufferedMutator = conn.getBufferedMutator(TableName.valueOf(tableName));
        preFlushTime = System.currentTimeMillis();
    }

    @Override
    public void close() throws Exception {
        LOG.info("Close HBase Connection.");
        try {
            if (admin != null) {
                admin.close();
                admin = null;
            }
            if (bufferedMutator != null) {
                bufferedMutator.close();
                bufferedMutator = null;
            }
            if (conn != null) {
                conn.close();
                conn = null;
            }
        } catch (IOException e) {
            LOG.error("Close HBase Exception:", e);
            throw new RuntimeException(e);
        }
        LOG.info("Close successfully.");
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        LOG.info("Write data to HBase.");
        Put put = new Put(Bytes.toBytes(value.getField(0).toString()));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"),
                Bytes.toBytes(value.getField(1).toString()));
        bufferedMutator.mutate(put);
        // Flush only after the configured interval has elapsed since the last flush.
        if (preFlushTime + flushTimeIntervalMillis <= System.currentTimeMillis()) {
            LOG.info("Flush data to HBase.");
            bufferedMutator.flush();
            preFlushTime = System.currentTimeMillis();
            LOG.info("Flush successfully.");
        } else {
            LOG.info("Skip Flush.");
        }
        LOG.info("Write successfully.");
    }
}

/**
 * @since 8.2.0
 */
public static class SimpleStringGenerator implements SourceFunction<Row> {
    private static final long serialVersionUID = 2174904787118597072L;
    boolean running = true;
    long i = 0;
    Random random = new Random();

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (running) {
            Row row = new Row(2);
            row.setField(0, "rk" + random.nextLong());
            row.setField(1, "v" + random.nextLong());
            ctx.collect(row);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
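In this sample, open() creates the BufferedMutator with default settings and invoke() flushes it on a fixed time interval. If a size-based safety net is also wanted, the standard HBase client API accepts a BufferedMutatorParams object when the mutator is created. The following is a minimal sketch; the helper name createSizeBoundedMutator and the 4 MB threshold are illustrative assumptions, not part of the sample project:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;

// Hypothetical helper: creates a BufferedMutator whose client-side write buffer
// flushes automatically once roughly 4 MB of Puts are buffered, complementing
// the time-based flush in HBaseWriteSink.invoke().
static BufferedMutator createSizeBoundedMutator(Connection conn, String tableName)
        throws IOException {
    BufferedMutatorParams params =
            new BufferedMutatorParams(TableName.valueOf(tableName))
                    .writeBufferSize(4 * 1024 * 1024L); // bytes; tune for the workload
    return conn.getBufferedMutator(params);
}

With such a helper, the call in open() would become bufferedMutator = createSizeBoundedMutator(conn, tableName), and the time-based flush in invoke() could stay as an upper bound on write latency.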
- Main logic code of ReadHBase
public static void main(String[] args) throws Exception {
    System.out.println("use command as: ");
    System.out.println(
            "./bin/flink run --class com.huawei.bigdata.flink.examples.ReadHBase"
                    + " /opt/test.jar --tableName t1 --confDir /tmp/hbaseConf");
    System.out.println(
            "******************************************************************************************");
    System.out.println("<tableName> hbase tableName");
    System.out.println("<confDir> hbase conf dir");
    System.out.println(
            "******************************************************************************************");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    DataStream<Row> messageStream =
            env.addSource(
                    new HBaseReaderSource(
                            paraTool.get("tableName"), createConf(paraTool.get("confDir"))));
    messageStream
            .rebalance()
            .map(
                    new MapFunction<Row, String>() {
                        @Override
                        public String map(Row s) throws Exception {
                            return "Flink says " + s + System.getProperty("line.separator");
                        }
                    })
            .print();
    env.execute("ReadHBase");
}

private static org.apache.hadoop.conf.Configuration createConf(String confDir) {
    LOG.info("Create HBase configuration.");
    org.apache.hadoop.conf.Configuration hbaseConf =
            HBaseConfigurationUtil.getHBaseConfiguration();
    if (confDir != null) {
        File hbaseSite = new File(confDir + File.separator + "hbase-site.xml");
        if (hbaseSite.exists()) {
            LOG.info("Add hbase-site.xml");
            hbaseConf.addResource(new Path(hbaseSite.getPath()));
        }
        File coreSite = new File(confDir + File.separator + "core-site.xml");
        if (coreSite.exists()) {
            LOG.info("Add core-site.xml");
            hbaseConf.addResource(new Path(coreSite.getPath()));
        }
        File hdfsSite = new File(confDir + File.separator + "hdfs-site.xml");
        if (hdfsSite.exists()) {
            LOG.info("Add hdfs-site.xml");
            hbaseConf.addResource(new Path(hdfsSite.getPath()));
        }
    }
    LOG.info("HBase configuration created successfully.");
    return hbaseConf;
}

private static class HBaseReaderSource extends RichSourceFunction<Row> {
    private Connection conn;
    private Table table;
    private Scan scan;
    private String tableName;
    private final byte[] serializedConfig;
    private Admin admin;
    private org.apache.hadoop.conf.Configuration hbaseConf;

    public HBaseReaderSource(String sourceTable, org.apache.hadoop.conf.Configuration conf) {
        this.tableName = sourceTable;
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.info("Read source open");
        super.open(parameters);
        deserializeConfiguration();
        conn = ConnectionFactory.createConnection(hbaseConf);
        admin = conn.getAdmin();
        if (!admin.tableExists(TableName.valueOf(tableName))) {
            throw new IOException("table does not exist.");
        }
        table = conn.getTable(TableName.valueOf(tableName));
        scan = new Scan();
    }

    private void deserializeConfiguration() {
        LOG.info("Deserialize HBase configuration.");
        hbaseConf =
                HBaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
        LOG.info("Deserialization successful.");
    }

    @Override
    public void run(SourceContext<Row> sourceContext) throws Exception {
        LOG.info("Read source run");
        try (ResultScanner scanner = table.getScanner(scan)) {
            Iterator<Result> iterator = scanner.iterator();
            while (iterator.hasNext()) {
                Result result = iterator.next();
                String rowKey = Bytes.toString(result.getRow());
                byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
                Row row = new Row(2);
                row.setField(0, rowKey);
                row.setField(1, Bytes.toString(value));
                sourceContext.collect(row);
                LOG.info("Send data successfully.");
            }
        }
        LOG.info("Read successfully.");
    }

    @Override
    public void close() throws Exception {
        closeHBase();
    }

    private void closeHBase() {
        LOG.info("Close HBase Connection.");
        try {
            if (admin != null) {
                admin.close();
                admin = null;
            }
            if (table != null) {
                table.close();
                table = null;
            }
            if (conn != null) {
                conn.close();
                conn = null;
            }
        } catch (IOException e) {
            LOG.error("Close HBase Exception:", e);
            throw new RuntimeException(e);
        }
        LOG.info("Close successfully.");
    }

    @Override
    public void cancel() {
        closeHBase();
    }
}
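HBaseReaderSource.open() builds an unrestricted Scan, so run() fetches every column of every row even though only f1:q1 is consumed. On large tables, narrowing the scan reduces RegionServer and network load. The following is a minimal sketch using standard Scan methods; the caching value is an illustrative assumption, not a value from the sample:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch: fetch only the column that run() actually reads, and batch more rows
// per RPC round trip. Both calls are standard HBase client API.
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1")); // only f1:q1 is used downstream
scan.setCaching(1000); // illustrative value: rows returned per scanner RPC

If this replaces the scan = new Scan() line in open(), the rest of the source is unchanged: table.getScanner(scan) in run() simply returns the narrower result set.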