写入OpenTSDB数据
功能简介
使用OpenTSDB的接口(/api/put)写入数据。
函数genWeatherData()用于模拟生成气象数据,函数putData()用于将气象数据发送到OpenTSDB服务端。
样例代码
/**
 * Posts the simulated weather data to the OpenTSDB server.
 *
 * <p>The target URL is {@code BASE_URL + tmpURL}; it is stored in {@code PUT_URL}
 * before the request is sent. The body is the JSON produced by
 * {@link #genWeatherData()}. Only HTTP 204 (No Content) is treated as success;
 * any other status is logged as an error and the success message is skipped.
 *
 * <p>I/O failures are caught and logged; this method does not throw them.
 *
 * @param tmpURL path (and query string) appended to {@code BASE_URL}
 */
private void putData(String tmpURL) {
    PUT_URL = BASE_URL + tmpURL;
    LOG.info("start to put data in opentsdb, the url is " + PUT_URL);
    try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
        HttpPost httpPost = new HttpPost(PUT_URL);
        // The request must have a timeout configured.
        addTimeout(httpPost);

        String weatherData = genWeatherData();
        // JSON is exchanged as UTF-8; ISO-8859-1 would corrupt any character
        // outside Latin-1 in metric names or tag values.
        StringEntity entity = new StringEntity(weatherData, "UTF-8");
        entity.setContentType("application/json");
        httpPost.setEntity(entity);

        HttpResponse response = httpClient.execute(httpPost);
        int statusCode = response.getStatusLine().getStatusCode();
        LOG.info("Status Code : " + statusCode);
        if (statusCode != HttpStatus.SC_NO_CONTENT) {
            // Do not fall through to the success message on a failed request.
            LOG.error("Request failed! " + response.getStatusLine());
            return;
        }
        LOG.info("put data to opentsdb successfully.");
    } catch (IOException e) {
        LOG.error("Failed to put data.", e);
    }
}
/**
 * A single data point: a metric name, a timestamp, a value and a set of tags.
 *
 * <p>Instances are collected in {@code genWeatherData()} and serialized with Gson,
 * so the field names and their declaration order are left untouched.
 */
static class DataPoint {
    /** Metric name, for example {@code "city.temp"}. */
    public String metric;

    /** Time of the sample (presumably Unix epoch seconds; confirm against the server). */
    public Long timestamp;

    /** Measured value of the metric at {@link #timestamp}. */
    public Double value;

    /** Tag key/value pairs attached to this point. */
    public Map<String, String> tags;

    /**
     * Creates a data point from its four components.
     *
     * @param metric    metric name
     * @param timestamp time of the sample
     * @param value     measured value
     * @param tags      tag key/value pairs; the map is stored as-is, not copied
     */
    public DataPoint(String metric, Long timestamp, Double value, Map<String, String> tags) {
        this.tags = tags;
        this.value = value;
        this.timestamp = timestamp;
        this.metric = metric;
    }
}
/**
 * Builds the simulated weather data set and returns it as a JSON string.
 *
 * <p>The set holds 24 {@code city.temp} points followed by 24 {@code city.hum}
 * points. Both series start at timestamp 1498838400 and advance by 3600 per
 * sample, and every point carries the same tags (city=Shenzhen, region=Longgang).
 *
 * @return the list of data points serialized with Gson
 */
private String genWeatherData() {
    // Both series share the same time axis: first sample plus a fixed step.
    final long firstTimestamp = 1498838400L;
    final long step = 3600L;

    // Data of air temperature, one entry per sample in time order.
    final double[] temperatures = {
        28.0, 27.0, 27.0, 27.0, 27.0, 27.0, 27.0, 27.0,
        29.0, 30.0, 32.0, 32.0, 33.0, 33.0, 32.0, 32.0,
        31.0, 30.0, 30.0, 29.0, 29.0, 29.0, 28.0, 28.0
    };
    // Data of humidity, one entry per sample in time order.
    final double[] humidities = {
        54.0, 53.0, 52.0, 51.0, 50.0, 49.0, 48.0, 46.0,
        46.0, 48.0, 48.0, 49.0, 49.0, 50.0, 50.0, 50.0,
        51.0, 51.0, 51.0, 51.0, 52.0, 53.0, 54.0, 54.0
    };

    Map<String, String> tags = ImmutableMap.of("city", "Shenzhen", "region", "Longgang");
    List<DataPoint> dataPoints = new ArrayList<DataPoint>(temperatures.length + humidities.length);

    // Temperature points go first, then humidity, matching the emitted JSON order.
    for (int i = 0; i < temperatures.length; i++) {
        dataPoints.add(new DataPoint("city.temp", firstTimestamp + i * step, temperatures[i], tags));
    }
    for (int i = 0; i < humidities.length; i++) {
        dataPoints.add(new DataPoint("city.hum", firstTimestamp + i * step, humidities[i], tags));
    }

    return new Gson().toJson(dataPoints);
}
PUT_URL中加入了sync参数,表示必须等到数据写入HBase后才可以返回,强烈建议使用此参数;如果不使用sync,表示采用异步写入HBase的方式,可能存在丢失数据的风险。具体信息请参考OpenTSDB应用开发常见问题。