更新时间:2024-08-03 GMT+08:00

写入OpenTSDB数据

功能简介

使用OpenTSDB的接口(/api/put)写入数据。

函数genWeatherData()模拟生成的气象数据,函数putData()发送气象数据到OpenTSDB服务端。

样例代码

以下代码片段在com.huawei.bigdata.opentsdb.examples包的"OpentsdbExample"类的putData方法中。
private void putData(String tmpURL) {
  PUT_URL = BASE_URL + tmpURL;
  LOG.info("start to put data in opentsdb, the url is " + PUT_URL);
  try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
    HttpPost httpPost = new HttpPost(PUT_URL);//请求需要设置超时时间
    addTimeout(httpPost);
    String weatherData = genWeatherData();
    StringEntity entity = new StringEntity(weatherData, "ISO-8859-1");
    entity.setContentType("application/json");
    httpPost.setEntity(entity);
    HttpResponse response = httpClient.execute(httpPost);
    int statusCode = response.getStatusLine().getStatusCode();
    LOG.info("Status Code : " + statusCode);
    if (statusCode != HttpStatus.SC_NO_CONTENT) {
      LOG.info("Request failed! " + response.getStatusLine());
    }
    LOG.info("put data to opentsdb successfully.");
  } catch (IOException e) {
    LOG.error("Failed to put data.", e);
  }
}

static class DataPoint {
  public String metric;
  public Long timestamp;
  public Double value;
  public Map<String, String> tags;
  public DataPoint(String metric, Long timestamp, Double value, Map<String, String> tags) {
    this.metric = metric;
    this.timestamp = timestamp;
    this.value = value;
    this.tags = tags;
  }
}

private String genWeatherData() {
  List<DataPoint> dataPoints = new ArrayList<DataPoint>();
  Map<String, String> tags = ImmutableMap.of("city", "Shenzhen", "region", "Longgang");

  // Data of air temperature
  dataPoints.add(new DataPoint("city.temp", 1498838400L, 28.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498842000L, 27.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498845600L, 27.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498849200L, 27.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498852800L, 27.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498856400L, 27.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498860000L, 27.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498863600L, 27.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498867200L, 29.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498870800L, 30.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498874400L, 32.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498878000L, 32.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498881600L, 33.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498885200L, 33.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498888800L, 32.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498892400L, 32.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498896000L, 31.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498899600L, 30.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498903200L, 30.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498906800L, 29.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498910400L, 29.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498914000L, 29.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498917600L, 28.0, tags));
  dataPoints.add(new DataPoint("city.temp", 1498921200L, 28.0, tags));

  // Data of humidity
  dataPoints.add(new DataPoint("city.hum", 1498838400L, 54.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498842000L, 53.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498845600L, 52.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498849200L, 51.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498852800L, 50.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498856400L, 49.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498860000L, 48.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498863600L, 46.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498867200L, 46.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498870800L, 48.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498874400L, 48.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498878000L, 49.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498881600L, 49.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498885200L, 50.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498888800L, 50.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498892400L, 50.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498896000L, 51.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498899600L, 51.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498903200L, 51.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498906800L, 51.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498910400L, 52.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498914000L, 53.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498917600L, 54.0, tags));
  dataPoints.add(new DataPoint("city.hum", 1498921200L, 54.0, tags));

  Gson gson = new Gson();
  return gson.toJson(dataPoints);
}

PUT_URL中加入了sync参数,表示必须等到数据写入HBase后才可以返回,强烈建议使用此参数;如果不使用sync,表示采用异步写入HBase的方式,可能存在丢失数据的风险。具体信息请参考OpenTSDB应用开发常见问题