更新时间:2021-09-18 GMT+08:00
Java代码样例
使用springboot框架,添加相应pom依赖:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>${jackson.version}</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>${httpclient.version}</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpasyncclient</artifactId> <version>${httpasyncclient.version}</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> <version>4.5.6</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-orm</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> <version>${springboot.version}</version> </dependency>
application.properties配置项如下所示:
iamUrl=https://IP:端口/v3/auth/tokens password=xxxxxxxxx domainName=xxxxxxxxx name=xxxxxxxxx projectId=e078************************7bc2e receiverUrl=https://IP:端口/cdflapigw/inference/v1.0/xxxxxxxxxx
JAVA核心代码如下所示:
package com.huawei.softcom.ai.smartorder.replayer.file2rest; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.concurrent.FutureCallback; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.entity.StringEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.nio.conn.NoopIOSessionStrategy; import org.apache.http.nio.conn.SchemeIOSessionStrategy; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.nio.reactor.IOReactorException; import org.apache.http.ssl.SSLContextBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.*; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import javax.net.ssl.SSLContext; import java.io.IOException; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.text.MessageFormat; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @Component("restAlarmSender ") public class RestAlarmSender { private static final Logger logger = LoggerFactory.getLogger(RestAlarmSender.class); @Value("${receiverUrl:null}") private String receiverUrl; @Value("${batchSendSize:null}") private int batchSendSize; private String token; @Value("${iamUrl}") private String iamUrl; @Value("${password}") private String password; @Value("${domainName}") private String domainName; @Value("${name}") private String name; @Value("${projectId}") private String projectId; @Autowired private RestTemplate restTemplate; private LinkedBlockingQueue<RawAlarm> alarmsToSend = new LinkedBlockingQueue<>(); CloseableHttpAsyncClient httpclient = initHttpAsyncClient(); private static HttpClientContext context = HttpClientContext.create(); ObjectMapper mapper = new ObjectMapper(); public RestAlarmSender() { ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(this::sendAlarmsByBatch); } private void getToken(){ String url = iamUrl; HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); String json = "{\"auth\":{\"identity\":{\"password\":{\"user\":{\"password\":\""+ password + "\",\"domain\":{\"name\":\"" + domainName + "\"},\"name\":\""+ name +"\"}},\"methods\":[\"password\"]},\"scope\":{\"project\":{\"id\":\""+projectId+"\"}}}}"; HttpEntity<String> httpEntity = new HttpEntity<>(json, headers); try { ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, httpEntity, String.class); logger.info(response.getBody()); token = response.getHeaders().get("X-Subject-Token").get(0); } catch (Exception e) { logger.error("config URL error :" + e.getMessage()); } } public void sendRawAlarm(RawAlarm alarm) { // add to send queue alarmsToSend.add(alarm); } private void sendAlarmsByBatch() { while (!Thread.currentThread().isInterrupted()) { try { // blocking if empty RawAlarm first = alarmsToSend.take(); List<RawAlarm> alarmList = new ArrayList<>(); alarmList.add(first); while (!alarmsToSend.isEmpty() && alarmList.size() < batchSendSize) { alarmList.add(alarmsToSend.take()); } postAlarms(alarmList); } catch (InterruptedException ex) { logger.error(String.format(Locale.ENGLISH, "Sending alarms interrupted %s", ex.getMessage())); Thread.currentThread().interrupt(); } } } private void postAlarms(List<RawAlarm> alarms) { HttpPost httpPost = new HttpPost(receiverUrl); if(null == token){ getToken(); } httpPost.setHeader("Content-Type", "application/json"); httpPost.setHeader("x-access-address", "/replayer/v1.0/alarm/cdfl/batch"); httpPost.setHeader("X-Auth-Token", token); try { String alarmStr = mapper.writeValueAsString(alarms); StringEntity entity = new StringEntity(alarmStr, "UTF-8"); entity.setContentEncoding("UTF-8"); httpPost.setEntity(entity); httpclient.start(); httpclient.execute(httpPost, context, new FutureCallback<HttpResponse>() { @Override public void completed(HttpResponse result) { logger.info(MessageFormat.format("Sending alarm successfully, url:{0}, number of " + "alarms:{1}", receiverUrl, alarms.size())); } @Override public void failed(Exception e) { logger.error(MessageFormat.format("Sending alarm unsuccessfully, url:{0}, message: {1}", receiverUrl, e.getMessage())); } @Override public void cancelled() { // todo } }); }catch (IOException e1) { logger.error(MessageFormat.format("Sending alarm unsuccessfully, url:{0}, IOException", receiverUrl)); } catch (Exception e) { logger.error(MessageFormat.format("Sending alarm unsuccessfully, url:{0}, message: {1}", receiverUrl, e.getMessage())); } } }
父主题: 告警接入
