Ways to Read and Write HBase (3): Flink
Published: 2019-05-25


1. Overview of the ways to access HBase

They fall mainly into:

  1. Reading and writing HBase with the plain Java API;
  2. Reading and writing HBase from Spark;
  3. Reading and writing HBase from Flink;
  4. Reading and writing HBase through Phoenix.

The first is the fairly low-level but efficient API that HBase itself provides; the second and third are the ways Spark and Flink, respectively, integrate with HBase; the last is the JDBC interface provided by the third-party Phoenix plugin, and that JDBC route can also be called from Spark and Flink.

Note:

This post uses HBase 2.1.2, Flink 1.7.2 and Scala 2.12.
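For reference, here is a build.sbt sketch with the dependencies such a project would typically need. The exact coordinates and the choice of sbt are my assumption, not taken from the original repository:

// build.sbt (sketch)
scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.7.2",
  "org.apache.flink" %% "flink-streaming-scala" % "1.7.2",
  "org.apache.flink" %% "flink-hbase"           % "1.7.2",  // provides TableInputFormat and related classes
  "org.apache.flink" %% "flink-connector-kafka" % "1.7.2",  // Kafka source used in section 2.2
  "org.apache.hbase"  % "hbase-client"          % "2.1.2",
  "org.apache.hbase"  % "hbase-common"          % "2.1.2"
)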

2. Reading and writing HBase with Flink Streaming and Flink DataSet

There are two ways to read HBase data in Flink:

  • Extend RichSourceFunction and override its methods (Flink Streaming)
  • Implement a custom TableInputFormat (Flink Streaming and Flink DataSet)

There are likewise two ways to write data to HBase from Flink:

  • Extend RichSinkFunction and override its methods (Flink Streaming)
  • Implement the OutputFormat interface (Flink Streaming and Flink DataSet)

Notes:

① Flink Streaming can use either option above; Flink DataSet batch jobs can only read by implementing TableInputFormat and only write by implementing the OutputFormat interface.

② TableInputFormat lives in flink-hbase_2.12:1.7.2, which is built against HBase 1.4.3. Since this project uses HBase 2.1.2, TableInputFormat has to be rewritten.

   

2.1 The two ways to read HBase from Flink

Note: before running the read examples, you can first run the Flink DataSet batch write from section 2.2.2 (the OutputFormat implementation) so that the HBase table test already contains some rows.
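If the test table does not exist yet, it can be created with the HBase 2.x admin API. This is a minimal sketch of my own, not code from the original post; the ZooKeeper address matches the one used in the examples below:

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateTestTable {
  def main(args: Array[String]): Unit = {
    val config = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(config)
    val admin = conn.getAdmin
    val tableName = TableName.valueOf("test")
    // create table 'test' with column family 'cf1' if it is not there yet
    if (!admin.tableExists(tableName)) {
      val descriptor = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf1"))
        .build()
      admin.createTable(descriptor)
    }
    admin.close()
    conn.close()
  }
}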

  

2.1.1 Extend RichSourceFunction and override its methods:

package cn.swordfall.hbaseOnFlink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

/**
  * @Author: Yang JianQiu
  * @Date: 2019/2/28 18:05
  *
  * HBase as a data source:
  * read records from HBase and emit them as a stream.
  *
  * Reading from HBase, option 1: extend RichSourceFunction and override its methods.
  */
class HBaseReader extends RichSourceFunction[(String, String)]{

  private var conn: Connection = null
  private var table: Table = null
  private var scan: Scan = null

  /**
    * Open the HBase client connection in open().
    */
  override def open(parameters: Configuration): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

    val tableName: TableName = TableName.valueOf("test")
    val cf1: String = "cf1"
    conn = ConnectionFactory.createConnection(config)
    table = conn.getTable(tableName)
    scan = new Scan()
    scan.withStartRow(Bytes.toBytes("100"))
    scan.withStopRow(Bytes.toBytes("107"))
    scan.addFamily(Bytes.toBytes(cf1))
  }

  /**
    * run() comes from the Java interface SourceFunction; IDEA's Ctrl+O does not offer it
    * for quick generation, so write the override by hand.
    */
  override def run(sourceContext: SourceContext[(String, String)]): Unit = {
    val rs = table.getScanner(scan)
    val iterator = rs.iterator()
    while (iterator.hasNext){
      val result = iterator.next()
      val rowKey = Bytes.toString(result.getRow)
      val sb: StringBuffer = new StringBuffer()
      for (cell: Cell <- result.listCells().asScala){
        val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        sb.append(value).append("_")
      }
      val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
      sourceContext.collect((rowKey, valueString))
    }
  }

  /**
    * Must be implemented (no-op here).
    */
  override def cancel(): Unit = {
  }

  /**
    * Close the HBase table and connection.
    */
  override def close(): Unit = {
    try {
      if (table != null) {
        table.close()
      }
      if (conn != null) {
        conn.close()
      }
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
}

Calling the HBaseReader class (which extends RichSourceFunction) from a Flink Streaming job:

/**
  * Read from HBase, option 1: extend RichSourceFunction and override its methods.
  */
def readFromHBaseWithRichSourceFunction(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

  val dataStream: DataStream[(String, String)] = env.addSource(new HBaseReader)
  dataStream.map(x => println(x._1 + " " + x._2))
  env.execute()
}

2.1.2 Implement a custom TableInputFormat:

Because of the version mismatch, three classes from flink-hbase_2.12:1.7.2 need to be rewritten: TableInputSplit, AbstractTableInputFormat and TableInputFormat.

TableInputSplit rewritten as CustomTableInputSplit:

package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.core.io.LocatableInputSplit;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:50
 */
public class CustomTableInputSplit extends LocatableInputSplit {

    private static final long serialVersionUID = 1L;

    /** The name of the table to retrieve data from. */
    private final byte[] tableName;

    /** The start row of the split. */
    private final byte[] startRow;

    /** The end row of the split. */
    private final byte[] endRow;

    /**
     * Creates a new table input split.
     *
     * @param splitNumber the number of the input split
     * @param hostnames   the names of the hosts storing the data the input split refers to
     * @param tableName   the name of the table to retrieve data from
     * @param startRow    the start row of the split
     * @param endRow      the end row of the split
     */
    CustomTableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName,
                          final byte[] startRow, final byte[] endRow) {
        super(splitNumber, hostnames);
        this.tableName = tableName;
        this.startRow = startRow;
        this.endRow = endRow;
    }

    /** Returns the table name. */
    public byte[] getTableName() {
        return this.tableName;
    }

    /** Returns the start row. */
    public byte[] getStartRow() {
        return this.startRow;
    }

    /** Returns the end row. */
    public byte[] getEndRow() {
        return this.endRow;
    }
}

AbstractTableInputFormat rewritten as CustomAbstractTableInputFormat:

package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.addons.hbase.AbstractTableInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:16
 *
 * flink-hbase_2.12:1.7.2 is built against HBase 1.4.3, while this project uses HBase 2.1.2.
 * AbstractTableInputFormat calls HBase 1.4.3 APIs that were removed in 2.1.2, so the class is
 * rewritten here using the new Scan API (withStartRow/withStopRow instead of setStartRow/setStopRow).
 */
public abstract class CustomAbstractTableInputFormat<T> extends RichInputFormat<T, CustomTableInputSplit> {

    protected static final Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);

    // helper variable to decide whether the input is exhausted or not
    protected boolean endReached = false;

    protected transient HTable table = null;
    protected transient Scan scan = null;

    /** HBase iterator wrapper. */
    protected ResultScanner resultScanner = null;

    protected byte[] currentRow;
    protected long scannedRows;

    /**
     * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
     *
     * @return The appropriate instance of Scan for this use case.
     */
    protected abstract Scan getScanner();

    /**
     * What table is to be read. Per instance of a TableInputFormat derivative only a single table name is possible.
     *
     * @return The name of the table
     */
    protected abstract String getTableName();

    /**
     * Maps the Result returned by HBase into the output type T.
     *
     * @param r The Result instance from HBase that needs to be converted
     * @return The appropriate instance of T that contains the data of Result.
     */
    protected abstract T mapResultToOutType(Result r);

    /**
     * Creates a Scan object and opens the HTable connection.
     * These are opened here because they are needed in createInputSplits, which is called before openInputFormat.
     * The connection is opened in this method and closed in closeInputFormat().
     *
     * @param parameters The configuration that is to be used
     * @see Configuration
     */
    @Override
    public abstract void configure(Configuration parameters);

    @Override
    public void open(CustomTableInputSplit split) throws IOException {
        if (table == null) {
            throw new IOException("The HBase table has not been opened! " +
                "This needs to be done in configure().");
        }
        if (scan == null) {
            throw new IOException("Scan has not been initialized! " +
                "This needs to be done in configure().");
        }
        if (split == null) {
            throw new IOException("Input split is null!");
        }

        logSplitInfo("opening", split);

        // set scan range; HBase 2.x uses withStartRow/withStopRow instead of setStartRow/setStopRow
        currentRow = split.getStartRow();
        scan.withStartRow(currentRow);
        scan.withStopRow(split.getEndRow());

        resultScanner = table.getScanner(scan);
        endReached = false;
        scannedRows = 0;
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        try {
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        } catch (Exception e) {
            resultScanner.close();
            // workaround for timeout on scan
            LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
            scan.withStartRow(currentRow);
            resultScanner = table.getScanner(scan);
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        }
        endReached = true;
        return null;
    }

    private void logSplitInfo(String action, CustomTableInputSplit split) {
        int splitId = split.getSplitNumber();
        String splitStart = Bytes.toString(split.getStartRow());
        String splitEnd = Bytes.toString(split.getEndRow());
        String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
        String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
        String[] hostnames = split.getHostnames();
        LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return endReached;
    }

    @Override
    public void close() throws IOException {
        LOG.info("Closing split (scanned {} rows)", scannedRows);
        currentRow = null;
        try {
            if (resultScanner != null) {
                resultScanner.close();
            }
        } finally {
            resultScanner = null;
        }
    }

    @Override
    public void closeInputFormat() throws IOException {
        try {
            if (table != null) {
                table.close();
            }
        } finally {
            table = null;
        }
    }

    @Override
    public CustomTableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
        if (table == null) {
            throw new IOException("The HBase table has not been opened! " +
                "This needs to be done in configure().");
        }
        if (scan == null) {
            throw new IOException("Scan has not been initialized! " +
                "This needs to be done in configure().");
        }

        // Get the starting and ending row keys for every region in the currently open table
        final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
        if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
            throw new IOException("Expecting at least one region.");
        }
        final byte[] startRow = scan.getStartRow();
        final byte[] stopRow = scan.getStopRow();
        final boolean scanWithNoLowerBound = startRow.length == 0;
        final boolean scanWithNoUpperBound = stopRow.length == 0;

        final List<CustomTableInputSplit> splits = new ArrayList<>(minNumSplits);
        for (int i = 0; i < keys.getFirst().length; i++) {
            final byte[] startKey = keys.getFirst()[i];
            final byte[] endKey = keys.getSecond()[i];
            final String regionLocation = table.getRegionLocator().getRegionLocation(startKey, false).getHostnamePort();
            // Test if the given region is to be included in the InputSplit while splitting the regions of a table
            if (!includeRegionInScan(startKey, endKey)) {
                continue;
            }
            // Find the region on which the given row is being served
            final String[] hosts = new String[]{regionLocation};

            // Determine if regions contains keys used by the scan
            boolean isLastRegion = endKey.length == 0;
            if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
                    && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
                final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
                final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) && !isLastRegion ? endKey : stopRow;
                int id = splits.size();
                final CustomTableInputSplit split = new CustomTableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop);
                splits.add(split);
            }
        }
        LOG.info("Created " + splits.size() + " splits");
        for (CustomTableInputSplit split : splits) {
            logSplitInfo("created", split);
        }
        return splits.toArray(new CustomTableInputSplit[splits.size()]);
    }

    /**
     * Test if the given region is to be included in the scan while splitting the regions of a table.
     *
     * @param startKey Start key of the region
     * @param endKey   End key of the region
     * @return true, if this region needs to be included as part of the input (default).
     */
    protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
        return true;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(CustomTableInputSplit[] inputSplits) {
        return new LocatableInputSplitAssigner(inputSplits);
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return null;
    }
}

TableInputFormat rewritten as CustomTableInputFormat:

package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:15
 *
 * flink-hbase_2.12:1.7.2 is built against HBase 1.4.3, while this project uses HBase 2.1.2,
 * so the TableInputFormat from flink-hbase_2.12:1.7.2 is rewritten here.
 */
public abstract class CustomTableInputFormat<T extends Tuple> extends CustomAbstractTableInputFormat<T> {

    private static final long serialVersionUID = 1L;

    /**
     * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
     *
     * @return The appropriate instance of Scan for this use case.
     */
    @Override
    protected abstract Scan getScanner();

    /**
     * What table is to be read. Per instance of a TableInputFormat derivative only a single table name is possible.
     *
     * @return The name of the table
     */
    @Override
    protected abstract String getTableName();

    /**
     * The output from HBase is always an instance of Result.
     * This method copies the data in the Result instance into the required Tuple.
     *
     * @param r The Result instance from HBase that needs to be converted
     * @return The appropriate instance of Tuple that contains the needed information.
     */
    protected abstract T mapResultToTuple(Result r);

    /**
     * Creates a Scan object and opens the HTable connection.
     * These are opened here because they are needed in createInputSplits, which is called before openInputFormat.
     * So the connection is opened in configure(Configuration) and closed in closeInputFormat().
     *
     * @param parameters The configuration that is to be used
     * @see Configuration
     */
    @Override
    public void configure(Configuration parameters) {
        table = createTable();
        if (table != null) {
            scan = getScanner();
        }
    }

    /**
     * Create an HTable instance and set it into this format.
     * Note: in this rewrite it only builds the HBase configuration and returns null;
     * subclasses are expected to override configure() and open the table themselves.
     */
    private HTable createTable() {
        LOG.info("Initializing HBaseConfiguration");
        // use files found in the classpath
        org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
        try {
            return null;
        } catch (Exception e) {
            LOG.error("Error instantiating a new HTable instance", e);
        }
        return null;
    }

    @Override
    protected T mapResultToOutType(Result r) {
        return mapResultToTuple(r);
    }
}

Extend the custom CustomTableInputFormat to open the HBase connection and perform the read:

package cn.swordfall.hbaseOnFlink

import java.io.IOException

import cn.swordfall.hbaseOnFlink.flink172_hbase212.CustomTableInputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:14
  *
  * Reading from HBase, option 2: implement the (rewritten) TableInputFormat interface.
  */
class HBaseInputFormat extends CustomTableInputFormat[Tuple2[String, String]]{

  // the reusable output tuple
  val tuple2 = new Tuple2[String, String]

  /**
    * Open the HBase connection.
    */
  override def configure(parameters: Configuration): Unit = {
    val tableName: TableName = TableName.valueOf("test")
    val cf1 = "cf1"
    var conn: Connection = null
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create

    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

    try {
      conn = ConnectionFactory.createConnection(config)
      table = conn.getTable(tableName).asInstanceOf[HTable]
      scan = new Scan()
      scan.withStartRow(Bytes.toBytes("001"))
      scan.withStopRow(Bytes.toBytes("201"))
      scan.addFamily(Bytes.toBytes(cf1))
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
  }

  /**
    * Convert each HBase Result into the output tuple.
    */
  override def mapResultToTuple(result: Result): Tuple2[String, String] = {
    val rowKey = Bytes.toString(result.getRow)
    val sb = new StringBuffer()
    for (cell: Cell <- result.listCells().asScala){
      val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
      sb.append(value).append("_")
    }
    val value = sb.replace(sb.length() - 1, sb.length(), "").toString
    tuple2.setField(rowKey, 0)
    tuple2.setField(value, 1)
    tuple2
  }

  /**
    * The table to read.
    */
  override def getTableName: String = "test"

  /**
    * The Scan to use.
    */
  override def getScanner: Scan = {
    scan
  }
}

Calling the HBaseInputFormat class (which extends CustomTableInputFormat) from a Flink Streaming job:

/**
  * Read from HBase, option 2: implement the TableInputFormat interface.
  */
def readFromHBaseWithTableInputFormat(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

  val dataStream = env.createInput(new HBaseInputFormat)
  dataStream.filter(_.f0.startsWith("10")).print()
  env.execute()
}

And the Flink DataSet (batch) version:

/**
  * Read HBase data (batch): implement the TableInputFormat interface.
  */
def readFromHBaseWithTableInputFormat(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val dataStream = env.createInput(new HBaseInputFormat)
  dataStream.filter(_.f1.startsWith("20")).print()
}

2.2 The two ways to write to HBase from Flink

In the Flink Streaming write examples the data comes from Kafka: start a standalone Kafka instance and use kafka-console-producer.sh to send the following records to the topic "test":

100,hello,20
101,nice,24
102,beautiful,26
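If you prefer to send the sample rows from code rather than from the console producer, a minimal Scala producer sketch could look like the one below. The broker address is the same one used by the consumers further down; the object name is hypothetical and not part of the original post:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceTestRows {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.187.201:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // the three sample records shown above
    Seq("100,hello,20", "101,nice,24", "102,beautiful,26")
      .foreach(line => producer.send(new ProducerRecord[String, String]("test", line)))
    producer.close() // close() also flushes any buffered records
  }
}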

2.2.1 Extend RichSinkFunction and override its methods:

package cn.swordfall.hbaseOnFlink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:34
  *
  * Writing to HBase, option 1: extend RichSinkFunction and override its methods.
  *
  * Note: Flink processes records one at a time, so do not flush to HBase after every record;
  * that puts heavy pressure on HBase and spawns many threads that can bring the cluster down.
  * A production job must control the flush frequency.
  *
  * Solution: keep a counter initialized in open() and flush, say, every 500 records, or collect
  * records in a list and flush once the list reaches a certain size.
  */
class HBaseWriter extends RichSinkFunction[String]{

  var conn: Connection = null
  val scan: Scan = null
  var mutator: BufferedMutator = null
  var count = 0

  /**
    * Open the HBase connection.
    */
  override def open(parameters: Configuration): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

    conn = ConnectionFactory.createConnection(config)

    val tableName: TableName = TableName.valueOf("test")
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // set a 1 MB write buffer; when it fills up the data is flushed to HBase automatically
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  /**
    * Write each incoming record to HBase.
    */
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    val cf1 = "cf1"
    val array: Array[String] = value.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
    // flush every 2000 records
    if (count >= 2000){
      mutator.flush()
      count = 0
    }
    count = count + 1
  }

  /**
    * Close the connection.
    */
  override def close(): Unit = {
    if (conn != null) conn.close()
  }
}

 

Calling the HBaseWriter class (which extends RichSinkFunction) from a Flink Streaming job:

/**
  * Write to HBase, option 1: extend RichSinkFunction and override its methods.
  */
def write2HBaseWithRichSinkFunction(): Unit = {
  val topic = "test"
  val props = new Properties
  props.put("bootstrap.servers", "192.168.187.201:9092")
  props.put("group.id", "kv_flink")
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

  val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
  val dataStream: DataStream[String] = env.addSource(myConsumer)
  // write to HBase
  dataStream.addSink(new HBaseWriter)
  env.execute()
}

2.2.2 Implement the OutputFormat interface:

package cn.swordfall.hbaseOnFlink

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:40
  *
  * Writing to HBase, option 2: implement the OutputFormat interface.
  */
class HBaseOutputFormat extends OutputFormat[String]{

  val zkServer = "192.168.187.201"
  val port = "2181"
  var conn: Connection = null
  var mutator: BufferedMutator = null
  var count = 0

  /**
    * Configures the output format. Always called first on the instantiated format.
    */
  override def configure(configuration: Configuration): Unit = {
  }

  /**
    * Opens a parallel instance of the output format, so the HBase connection,
    * configuration and table setup happen here.
    */
  override def open(i: Int, i1: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

    conn = ConnectionFactory.createConnection(config)
    val tableName: TableName = TableName.valueOf("test")
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // set a 1 MB write buffer; when it fills up the data is flushed to HBase automatically
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  /**
    * Writes one record to the data sink, so this is where the HBase write API is called.
    */
  override def writeRecord(it: String): Unit = {
    val cf1 = "cf1"
    val array: Array[String] = it.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
    // flush every 4 records; when called from a batch job this threshold must not be larger
    // than the total number of records, otherwise the data never reaches HBase
    if (count >= 4){
      mutator.flush()
      count = 0
    }
    count = count + 1
  }

  /**
    * Close the connection.
    */
  override def close(): Unit = {
    try {
      if (conn != null) conn.close()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
}

Calling the HBaseOutputFormat class (which implements OutputFormat) from a Flink Streaming job:

/**
  * Write to HBase, option 2: implement the OutputFormat interface.
  */
def write2HBaseWithOutputFormat(): Unit = {
  val topic = "test"
  val props = new Properties
  props.put("bootstrap.servers", "192.168.187.201:9092")
  props.put("group.id", "kv_flink")
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

  val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
  val dataStream: DataStream[String] = env.addSource(myConsumer)
  dataStream.writeUsingOutputFormat(new HBaseOutputFormat)
  env.execute()
}

And the Flink DataSet (batch) version:

/**
  * Write to HBase (batch): implement the OutputFormat interface.
  */
def write2HBaseWithOutputFormat(): Unit = {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  // define the data
  val dataSet: DataSet[String] = env.fromElements("103,zhangsan,20", "104,lisi,21", "105,wangwu,22", "106,zhaolilu,23")
  dataSet.output(new HBaseOutputFormat)
  // execute() must be called for data-sink jobs; without it nothing actually runs
  env.execute()
}

Note:

  When the sink is used from a batch job, make sure the flush threshold in HBaseOutputFormat's writeRecord method is not larger than the total number of records in the batch; otherwise the buffered data never reaches HBase.
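One way to make the batch case more robust (a sketch of my own, not from the original post) is to flush and close the BufferedMutator in HBaseOutputFormat's close() method, so that whatever is still buffered gets written out even when the flush threshold was never reached:

// drop-in replacement for close() in the HBaseOutputFormat class above
override def close(): Unit = {
  try {
    if (mutator != null) {
      mutator.flush()
      mutator.close()
    }
    if (conn != null) conn.close()
  } catch {
    case e: Exception => println(e.getMessage)
  }
}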

3. Summary

[Other related articles]

  See: reading and writing HBase with the plain Java API

  See: reading and writing HBase from Spark

GitHub repository: (the Flink read/write HBase code, in both Java and Scala versions)

[References]

https://blog.csdn.net/liguohuabigdata/article/details/78588861

 https://blog.csdn.net/aA518189/article/details/86544844

 

查看>>