更新时间:2023-09-26 GMT+08:00
CloudEvents SDK
本章节介绍使用开源的CloudEvents Java SDK发布事件。
前提条件
- 获取并安装IntelliJ IDEA,如果未安装,请至IntelliJ IDEA官方网站下载。
- 在pom.xml中加入依赖。如何在Java环境中集成API请求签名的SDK请参考AK/SK签名认证操作指导。
<dependency> <groupId>io.cloudevents</groupId> <artifactId>cloudevents-json-jackson</artifactId> <version>${cloudevents.version}</version> </dependency> <dependency> <groupId>com.huawei.apigateway</groupId> <artifactId>java-sdk-core</artifactId> <version>3.1.2</version> </dependency>
“cloudevents.version”使用最新版本2.2.0。
发布事件
发布事件的示例代码如下(以下加粗内容需要根据实际情况替换):
import com.alibaba.fastjson.JSONObject;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Sample client that builds a CloudEvents event and publishes it to an event channel over HTTPS,
 * authenticating with an IAM token obtained via username/password.
 *
 * <p>The masked constants below (user name, password, domain, project id, channel id) are
 * placeholders and must be replaced with real values before running.
 *
 * <p>Tokens are cached in static maps guarded by {@code synchronized (expireTimeCache)}.
 */
@Slf4j
public class PublishService {
    private static final String NAME = "l******";
    private static final String PASSWORD = "******";
    private static final String DOMAIN_NAME = "paa******01";
    private static final String PROJECT_ID = "c5******f";
    private static final String CHANNEL_ID = "ff******3";
    private static final String ENDPOINT = "events.cn-north-4.myhuaweicloud.com";
    private static final String URL = "https://" + ENDPOINT + "/v1/" + PROJECT_ID + "/channels/" + CHANNEL_ID + "/events";
    private static final String SOURCE = "test_source";
    private static final String IAM_ENDPOINT = "iam.cn-north-4.myhuaweicloud.com";
    private static final String IAM_ADDRESS = "https://" + IAM_ENDPOINT + "/v3/auth/tokens";
    private static final String IAM_BODY = "{\"auth\": {\"identity\": {\"methods\": [\"password\"],\"password\": {\"user\": " +
            "{\"name\": \"" + NAME + "\",\"password\": \"" + PASSWORD + "\",\"domain\": {\"name\": \"" + DOMAIN_NAME + "\"}}}}," +
            "\"scope\": {\"project\": {\"id\": \"" + PROJECT_ID + "\"}}}}";

    /** A cached token is refreshed once it is within this many minutes of expiring. */
    private static final long TOKEN_REFRESH_MARGIN_MINUTES = 5;

    /** Cached tokens, keyed by {@code domainName + userName}. */
    private static final Map<String, String> tokenCache = new HashMap<>();

    /** Expiry instants (stored as UTC date-times) of the cached tokens, same keys as {@link #tokenCache}. */
    private static final Map<String, LocalDateTime> expireTimeCache = new HashMap<>();

    public static void main(String[] args) {
        try {
            PublishService publishService = new PublishService();
            CloudEvent cloudEvent = publishService.buildCloudEvent(SOURCE);
            publishService.publish(cloudEvent);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    /**
     * Returns an IAM token, reusing a cached one while it is still comfortably valid.
     *
     * <p>The two parameters only form the cache key; the request body sent to IAM is the
     * constant {@code IAM_BODY}, which is built from the class-level credentials.
     *
     * @param userName   IAM account user name
     * @param domainName IAM account domain name
     * @return the value of the {@code X-Subject-Token} response header
     * @throws IllegalStateException if the response carries no {@code X-Subject-Token} header
     * @throws Exception             if the HTTP request or response parsing fails
     */
    public String getToken(String userName, String domainName) throws Exception {
        synchronized (expireTimeCache) {
            String cacheKey = domainName + userName;
            String cachedToken = tokenCache.get(cacheKey);
            LocalDateTime expireTime = expireTimeCache.get(cacheKey);
            if (cachedToken != null && expireTime != null) {
                // Compare in UTC (the cache stores UTC) and refresh *before* expiry:
                // the token is reused only if it is still valid 5 minutes from now.
                LocalDateTime refreshDeadline =
                        LocalDateTime.now(ZoneOffset.UTC).plusMinutes(TOKEN_REFRESH_MARGIN_MINUTES);
                if (refreshDeadline.isBefore(expireTime)) {
                    return cachedToken;
                }
            }

            HttpResponse postResponse = getPostResponse(IAM_ADDRESS, new HashMap<>(), IAM_BODY);
            HttpEntity entity = postResponse.getEntity();
            String entityString = entity == null ? null : EntityUtils.toString(entity, "UTF-8");

            Header tokenHeader = postResponse.getFirstHeader("X-Subject-Token");
            if (tokenHeader == null) {
                // Typically an authentication failure; report it instead of failing on a missing element.
                throw new IllegalStateException("IAM response has no X-Subject-Token header, status: "
                        + postResponse.getStatusLine() + ", body: " + entityString);
            }
            String token = tokenHeader.getValue();

            // Cache only when the expiry is known, so a token is never cached without a deadline.
            LocalDateTime expiresAt = parseExpiresAt(entityString);
            if (expiresAt != null) {
                tokenCache.put(cacheKey, token);
                expireTimeCache.put(cacheKey, expiresAt);
            }
            return token;
        }
    }

    /**
     * Extracts {@code token.expires_at} from the IAM response body and normalizes it to UTC.
     *
     * @param responseBody JSON response body, may be {@code null}
     * @return the expiry as a UTC date-time, or {@code null} if the body does not contain it
     */
    private static LocalDateTime parseExpiresAt(String responseBody) {
        if (responseBody == null || responseBody.isEmpty()) {
            return null;
        }
        JSONObject root = JSONObject.parseObject(responseBody);
        JSONObject tokenInfo = root == null ? null : root.getJSONObject("token");
        String expiresAt = tokenInfo == null ? null : tokenInfo.getString("expires_at");
        if (expiresAt == null) {
            return null;
        }
        // ISO-8601 with offset; handles timestamps with or without fractional seconds.
        return OffsetDateTime.parse(expiresAt).withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime();
    }

    /**
     * Publishes a CloudEvents event to the configured channel and prints the response body.
     *
     * @param cloudEvent the event to publish
     * @throws Exception if obtaining the token or the HTTP request fails
     */
    public void publish(CloudEvent cloudEvent) throws Exception {
        String body = buildBody(cloudEvent);
        Map<String, String> headers = new HashMap<>();
        // Argument order matches getToken(userName, domainName).
        headers.put("X-Auth-Token", getToken(NAME, DOMAIN_NAME));
        HttpEntity resEntity = getPostResponse(URL, headers, body).getEntity();
        if (resEntity != null) {
            System.out.println(System.getProperty("line.separator") + EntityUtils.toString(resEntity, "UTF-8"));
        }
    }

    /**
     * Builds a CloudEvents v1 event with a random id, the given source and the current time.
     *
     * @param source event source name
     * @return the built CloudEvents event
     */
    private CloudEvent buildCloudEvent(String source) {
        io.cloudevents.core.v1.CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create(source))
                // NOTE(review): the event *type* is set to the JSON format's content type constant;
                // confirm this is the intended event type for the target channel.
                .withType(JsonFormat.CONTENT_TYPE)
                .withTime(OffsetDateTime.now());
        return cloudEventBuilder.build();
    }

    /**
     * Serializes the event as JSON and wraps it in the {@code {"events": [...]}} request envelope.
     *
     * @param cloudEvent the event to serialize
     * @return the request body
     */
    private static String buildBody(CloudEvent cloudEvent) {
        JsonFormat jsonFormat = ((JsonFormat) Objects.requireNonNull(EventFormatProvider.getInstance()
                .resolveFormat("application/cloudevents+json"))).withForceNonJsonDataToString();
        EventFormatProvider.getInstance().registerFormat(jsonFormat);
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        return "{\"events\": [" + new String(jsonFormat.serialize(cloudEvent), StandardCharsets.UTF_8) + "]}";
    }

    /**
     * Sends an HTTPS POST with the given headers and JSON body.
     *
     * <p>The response entity is buffered in memory before the HTTP client is closed, so the
     * returned response (status line, headers, entity) can be read after this method returns
     * and no client or connection is leaked.
     *
     * @param url     target URL
     * @param headers request headers
     * @param body    request body
     * @return the response, with its entity (if any) fully buffered
     * @throws Exception if building the SSL context or executing the request fails
     */
    private HttpResponse getPostResponse(String url, Map<String, String> headers, String body) throws Exception {
        // NOTE(review): hostname verification is disabled here (NoopHostnameVerifier). Credentials and
        // tokens travel over this connection; prefer the default verifier for production use.
        SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(
                SSLContexts.custom().loadTrustMaterial(null, new TrustSelfSignedStrategy()).build(),
                NoopHostnameVerifier.INSTANCE);
        HttpPost httpPost = new HttpPost(url);
        headers.forEach(httpPost::addHeader);
        httpPost.addHeader("Content-Type", "application/json");
        httpPost.setEntity(new StringEntity(body, "UTF-8"));
        try (CloseableHttpClient client = HttpClients.custom().setSSLSocketFactory(scsf).build()) {
            HttpResponse response = client.execute(httpPost);
            HttpEntity entity = response.getEntity();
            if (entity != null) {
                // Read the content now; the underlying connection is gone once the client closes.
                response.setEntity(new BufferedHttpEntity(entity));
            }
            return response;
        }
    }
}
参数说明: