更新时间:2021-09-18 GMT+08:00
分享

Java代码样例

使用springboot框架,添加相应pom依赖:

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>${httpasyncclient.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.5.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
            <version>${springboot.version}</version>
        </dependency>

application.properties配置项如下所示(此外,代码还会读取整数配置项batchSendSize,即单批发送的告警条数上限,请按需添加):

iamUrl=https://IP:端口/v3/auth/tokens
password=xxxxxxxxx
domainName=xxxxxxxxx
name=xxxxxxxxx
projectId=e078************************7bc2e
receiverUrl=https://IP:端口/cdflapigw/inference/v1.0/xxxxxxxxxx

Java核心代码如下所示:

package com.huawei.softcom.ai.smartorder.replayer.file2rest;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.ssl.SSLContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

@Component("restAlarmSender ")
public class RestAlarmSender {
    private static final Logger logger = LoggerFactory.getLogger(RestAlarmSender.class);

    @Value("${receiverUrl:null}")
    private String receiverUrl;

    @Value("${batchSendSize:null}")
    private int batchSendSize;

    private String token;

    @Value("${iamUrl}")
    private String iamUrl;

    @Value("${password}")
    private String password;

    @Value("${domainName}")
    private String domainName;

    @Value("${name}")
    private String name;

    @Value("${projectId}")
    private String projectId;

    @Autowired
    private RestTemplate restTemplate;

    private LinkedBlockingQueue<RawAlarm> alarmsToSend = new LinkedBlockingQueue<>();

    CloseableHttpAsyncClient httpclient = initHttpAsyncClient();

    private static HttpClientContext context = HttpClientContext.create();

    ObjectMapper mapper = new ObjectMapper();

    public RestAlarmSender() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(this::sendAlarmsByBatch);
    }

    private void getToken(){
        String url = iamUrl;
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        String json = "{\"auth\":{\"identity\":{\"password\":{\"user\":{\"password\":\""+ password + "\",\"domain\":{\"name\":\"" + domainName + "\"},\"name\":\""+ name +"\"}},\"methods\":[\"password\"]},\"scope\":{\"project\":{\"id\":\""+projectId+"\"}}}}";
        HttpEntity<String> httpEntity = new HttpEntity<>(json, headers);
        try {
            ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, httpEntity, String.class);
            logger.info(response.getBody());
            token = response.getHeaders().get("X-Subject-Token").get(0);
        } catch (Exception e) {
            logger.error("config URL error :" + e.getMessage());
        }
    }

    public void sendRawAlarm(RawAlarm alarm) {
        // add to send queue
        alarmsToSend.add(alarm);
    }

    private void sendAlarmsByBatch() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // blocking if empty
                RawAlarm first = alarmsToSend.take();

                List<RawAlarm> alarmList = new ArrayList<>();
                alarmList.add(first);
                while (!alarmsToSend.isEmpty() && alarmList.size() < batchSendSize) {
                    alarmList.add(alarmsToSend.take());
                }
                postAlarms(alarmList);
            } catch (InterruptedException ex) {
                logger.error(String.format(Locale.ENGLISH, "Sending alarms interrupted %s", ex.getMessage()));
                Thread.currentThread().interrupt();
            }
        }
    }

    private void postAlarms(List<RawAlarm> alarms) {
        HttpPost httpPost = new HttpPost(receiverUrl);

        if(null == token){
            getToken();
        }

        httpPost.setHeader("Content-Type", "application/json");
        httpPost.setHeader("x-access-address", "/replayer/v1.0/alarm/cdfl/batch");
        httpPost.setHeader("X-Auth-Token", token);

        try {
            String alarmStr = mapper.writeValueAsString(alarms);
            StringEntity entity = new StringEntity(alarmStr, "UTF-8");
            entity.setContentEncoding("UTF-8");
            httpPost.setEntity(entity);
            httpclient.start();
            httpclient.execute(httpPost, context, new FutureCallback<HttpResponse>() {
                @Override
                public void completed(HttpResponse result) {
                    logger.info(MessageFormat.format("Sending alarm successfully, url:{0}, number of " + "alarms:{1}",
                        receiverUrl, alarms.size()));
                }

                @Override
                public void failed(Exception e) {
                    logger.error(MessageFormat.format("Sending alarm unsuccessfully, url:{0}, message: {1}",
                        receiverUrl, e.getMessage()));
                }

                @Override
                public void cancelled() {
                    // todo
                }
            });
        }catch (IOException e1) {
            logger.error(MessageFormat.format("Sending alarm unsuccessfully, url:{0}, IOException", receiverUrl));
        }
        catch (Exception e) {
            logger.error(MessageFormat.format("Sending alarm unsuccessfully, url:{0}, message: {1}", receiverUrl,
                e.getMessage()));
        }
    }
}
分享:

    相关文档

    相关产品

close