通过RestHighLevelClient接入OpenSearch集群
CSS服务的OpenSearch集群支持通过High Level REST Client进行数据查询和管理。High Level REST Client对OpenSearch的API进行了封装,用户只需要构造对应的结构即可对OpenSearch集群进行访问。Rest Client的详细使用指导请参见Java high-level REST client。
前提条件
- OpenSearch集群处于可用状态。
- 运行Java代码的服务器与OpenSearch集群之间网络互通。
- 根据集群的网络配置方式,获取集群的访问地址,操作指导请参见网络配置。
- 确认服务器已安装Java,要求JDK版本为11及以上,JDK11官网下载地址:Java Archive Downloads。
引入依赖
在运行Java代码的服务器中引入Java依赖。
CSS服务支持使用Elasticsearch 7.10.2 Java客户端连接OpenSearch集群,但是为确保更好的兼容性,建议使用与OpenSearch集群同版本的OpenSearch Java客户端连接OpenSearch集群。
当使用比OpenSearch集群更高版本的High Level REST Client,并且某些请求存在兼容性问题时,可以使用“RestHighLevelClient.getLowLevelClient()”方式直接获取Low Level REST Client,实现自定义的OpenSearch请求。操作指导请参见通过RestLowLevelClient接入OpenSearch集群。
可以根据实际使用的Java客户端类型选择参考示例。
接入集群
当使用不同类型的Java客户端库访问不同安全类型的OpenSearch集群时,示例代码会有差异,请根据实际业务场景选择参考文档。
|
Java客户端库 |
OpenSearch集群安全类型 |
是否加载安全证书 |
参考文档 |
|---|---|---|---|
|
OpenSearch |
非安全模式 |
- |
|
|
OpenSearch |
安全模式+HTTP协议 安全模式+HTTPS协议 |
否 |
|
|
OpenSearch |
安全模式+HTTPS协议 |
是 |
|
|
Elasticsearch 7.10.2 |
非安全模式 |
- |
|
|
Elasticsearch 7.10.2 |
安全模式+HTTP协议 安全模式+HTTPS协议 |
否 |
|
|
Elasticsearch 7.10.2 |
安全模式+HTTPS协议 |
是 |
OpenSearch RestHighLevelClient连接非安全集群
使用OpenSearch RestHighLevelClient连接非安全模式的OpenSearch集群,并查询索引“test”是否存在。代码示例如下:
import org.apache.http.HttpHost;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class RestHighLevelClientExample {
public static void main(String[] args) throws IOException {
List<String> host = Arrays.asList("{集群访问地址}");
RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http"));
final RestHighLevelClient client = new RestHighLevelClient(builder);
GetIndexRequest indexRequest = new GetIndexRequest("test");
boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
System.out.println(exists);
client.close();
}
/**
* constructHttpHosts函数转换host集群节点IP列表
*/
public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
}
}
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
OpenSearch RestHighLevelClient连接安全集群(无证书)
使用OpenSearch RestHighLevelClient在不加载安全证书的情况下连接安全模式的OpenSearch集群(支持HTTP协议或HTTPS协议),查询索引“test”是否存在。代码示例如下:
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.Nullable;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
/**
* Rest High Level连接安全集群(不使用证书)
*/
public class RestHighLevelClientExample {
/**
* 创建客户端的类,定义create函数用于创建客户端
*/
public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException{
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
SSLContext sc = null;
try {
sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, new SecureRandom());
} catch (KeyManagementException | NoSuchAlgorithmException e) {
e.printStackTrace();
}
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
credentialsProvider);
RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
.setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout))
.setHttpClientConfigCallback(httpClientConfigCallback);
final RestHighLevelClient client = new RestHighLevelClient(builder);
logger.info("opensearch rest client build success {} ", client);
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
logger.info("opensearch rest client health response {} ", response);
return client;
}
/**
* constructHttpHosts函数转换host集群节点IP列表
*/
public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
}
/**
* trustAllCerts忽略证书配置
*/
public static TrustManager[] trustAllCerts = new TrustManager[] {
new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
}
};
private static final Logger logger = LogManager.getLogger(RestHighLevelClientExampleHttpWithSecurityNoCert.class);
static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
@Nullable
private final CredentialsProvider credentialsProvider;
/**
* The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption.
*/
private final SSLIOSessionStrategy sslStrategy;
/**
* Create a new {@link SecuredHttpClientConfigCallback}.
*
* @param credentialsProvider The credential provider, if a username/password have been supplied
* @param sslStrategy The SSL strategy, if SSL / TLS have been supplied
* @throws NullPointerException if {@code sslStrategy} is {@code null}
*/
SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
@Nullable final CredentialsProvider credentialsProvider) {
this.sslStrategy = Objects.requireNonNull(sslStrategy);
this.credentialsProvider = credentialsProvider;
}
/**
* Get the {@link CredentialsProvider} that will be added to the HTTP client.
*
* @return Can be {@code null}.
*/
@Nullable
CredentialsProvider getCredentialsProvider() {
return credentialsProvider;
}
/**
* Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client.
*
* @return Never {@code null}.
*/
SSLIOSessionStrategy getSSLStrategy() {
return sslStrategy;
}
/**
* Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider},
*
* @param httpClientBuilder The client to configure.
* @return Always {@code httpClientBuilder}.
*/
@Override
public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
// enable SSL / TLS
httpClientBuilder.setSSLStrategy(sslStrategy);
// enable user authentication
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
}
}
public static class NullHostNameVerifier implements HostnameVerifier {
@Override
public boolean verify(String arg0, SSLSession arg1) {
return true;
}
}
/**
* main函数参考如下,调用上面的create函数创建客户端,查询test索引是否存在
*/
public static void main(String[] args) throws IOException {
RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 30000, 30000, 30000, "username", "password");
GetIndexRequest indexRequest = new GetIndexRequest("test");
boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
System.out.println(exists);
client.close();
}
}
|
参数 |
描述 |
|---|---|
|
host |
集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
|
port |
集群的连接端口,默认是“9200”。 |
|
protocol |
连接协议,“http”或者“https”,根据集群实际情况填写。 |
|
connectTimeout |
socket连接超时时间(毫秒)。 |
|
connectionRequestTimeout |
socket连接请求超时时间(毫秒)。 |
|
socketTimeout |
socket请求超时时间(毫秒)。 |
|
username |
访问集群的用户名。 |
|
password |
用户名对应的密码。 |
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
OpenSearch RestHighLevelClient连接安全集群(加载证书)
使用OpenSearch RestHighLevelClient在加载安全证书的情况下连接安全模式+HTTPS协议的OpenSearch集群,查询索引“test”是否存在。代码示例如下:
连接安全模式+HTTPS协议的OpenSearch集群需要先提前准备好安全证书,操作指导请参见获取并上传安全证书。
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
/**
* Rest Hive Level连接安全集群(使用https证书)
*/
public class RestHighLevelClientExample {
public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String certFilePath,
String certPassword) throws IOException {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
SSLContext sc = null;
try {
TrustManager[] tm = {new MyX509TrustManager(certFilePath, certPassword)};
sc = SSLContext.getInstance("SSL", "SunJSSE");
//也可以使用SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sc.init(null, tm, new SecureRandom());
} catch (Exception e) {
e.printStackTrace();
}
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
credentialsProvider);
RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
.setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout))
.setHttpClientConfigCallback(httpClientConfigCallback);
final RestHighLevelClient client = new RestHighLevelClient(builder);
logger.info("opensearch rest client build success {} ", client);
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
logger.info("opensearch rest client health response {} ", response);
return client;
}
/**
* constructHttpHosts函数转换host集群节点IP列表
*/
public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
}
/**
* SecuredHttpClientConfigCallback类定义
*/
static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
@Nullable
private final CredentialsProvider credentialsProvider;
private final SSLIOSessionStrategy sslStrategy;
SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
@Nullable final CredentialsProvider credentialsProvider) {
this.sslStrategy = Objects.requireNonNull(sslStrategy);
this.credentialsProvider = credentialsProvider;
}
@Nullable
CredentialsProvider getCredentialsProvider() {
return credentialsProvider;
}
SSLIOSessionStrategy getSSLStrategy() {
return sslStrategy;
}
@Override
public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setSSLStrategy(sslStrategy);
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
}
}
private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class);
public static class MyX509TrustManager implements X509TrustManager {
X509TrustManager sunJSSEX509TrustManager;
MyX509TrustManager(String certFilePath, String certPassword) throws Exception {
File file = new File(certFilePath);
if (!file.isFile()) {
throw new Exception("Wrong Certification Path");
}
System.out.println("Loading KeyStore " + file + "...");
InputStream in = new FileInputStream(file);
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(in, certPassword.toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
tmf.init(ks);
TrustManager[] tms = tmf.getTrustManagers();
for (TrustManager tm : tms) {
if (tm instanceof X509TrustManager) {
sunJSSEX509TrustManager = (X509TrustManager) tm;
return;
}
}
throw new Exception("Couldn't initialize");
}
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
/**
* main函数参考如下,调用上面的create函数创建客户端,查询test索引是否存在
*/
public static void main(String[] args) throws IOException {
RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 30000, 30000, 30000, "username", "password", "certFilePath", "certPassword");
GetIndexRequest indexRequest = new GetIndexRequest("test");
boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
System.out.println(exists);
client.close();
}
}
|
参数 |
描述 |
|---|---|
|
host |
集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
|
port |
集群的连接端口,默认是“9200”。 |
|
protocol |
连接协议,此处填写“https”。 |
|
connectTimeout |
socket连接超时时间(毫秒)。 |
|
connectionRequestTimeout |
socket连接请求超时时间(毫秒)。 |
|
socketTimeout |
socket请求超时时间(毫秒)。 |
|
username |
访问集群的用户名。 |
|
password |
用户名对应的密码。 |
|
certFilePath |
安全证书路径。 |
|
certPassword |
安全证书密码。 |
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch RestHighLevelClient连接非安全集群
通过Elasticsearch 7.10.2 RestHighLevelClient连接非安全模式的OpenSearch集群,并查询索引“test”是否存在。代码示例如下:
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class RestHighLevelClientExample {
public static void main(String[] args) throws IOException {
List<String> host = Arrays.asList("{集群访问地址}");
RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http"));
final RestHighLevelClient client = new RestHighLevelClient(builder);
GetIndexRequest indexRequest = new GetIndexRequest("test");
boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
System.out.println(exists);
client.close();
}
/**
* constructHttpHosts函数转换host集群节点IP列表
*/
public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
}
}
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch RestHighLevelClient连接安全集群(无证书)
使用Elasticsearch 7.10.2 RestHighLevelClient在不加载安全证书的情况下连接安全模式的OpenSearch集群(支持HTTP协议或HTTPS协议),查询索引“test”是否存在。代码示例如下:
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Nullable;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
/**
* Rest High Level连接安全集群(不使用证书)
*/
public class RestHighLevelClientExample {
/**
* 创建客户端的类,定义create函数用于创建客户端
*/
public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException{
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
SSLContext sc = null;
try {
sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, new SecureRandom());
} catch (KeyManagementException | NoSuchAlgorithmException e) {
e.printStackTrace();
}
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
credentialsProvider);
RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
.setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout))
.setHttpClientConfigCallback(httpClientConfigCallback);
final RestHighLevelClient client = new RestHighLevelClient(builder);
logger.info("es rest client build success {} ", client);
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
logger.info("es rest client health response {} ", response);
return client;
}
/**
* constructHttpHosts函数转换host集群节点IP列表
*/
public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
}
/**
* trustAllCerts忽略证书配置
*/
public static TrustManager[] trustAllCerts = new TrustManager[] {
new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
}
};
private static final Logger logger = LogManager.getLogger(RestHighLevelClientExampleHttpWithSecurityNoCert.class);
static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
@Nullable
private final CredentialsProvider credentialsProvider;
/**
* The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption.
*/
private final SSLIOSessionStrategy sslStrategy;
/**
* Create a new {@link SecuredHttpClientConfigCallback}.
*
* @param credentialsProvider The credential provider, if a username/password have been supplied
* @param sslStrategy The SSL strategy, if SSL / TLS have been supplied
* @throws NullPointerException if {@code sslStrategy} is {@code null}
*/
SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
@Nullable final CredentialsProvider credentialsProvider) {
this.sslStrategy = Objects.requireNonNull(sslStrategy);
this.credentialsProvider = credentialsProvider;
}
/**
* Get the {@link CredentialsProvider} that will be added to the HTTP client.
*
* @return Can be {@code null}.
*/
@Nullable
CredentialsProvider getCredentialsProvider() {
return credentialsProvider;
}
/**
* Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client.
*
* @return Never {@code null}.
*/
SSLIOSessionStrategy getSSLStrategy() {
return sslStrategy;
}
/**
* Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider},
*
* @param httpClientBuilder The client to configure.
* @return Always {@code httpClientBuilder}.
*/
@Override
public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
// enable SSL / TLS
httpClientBuilder.setSSLStrategy(sslStrategy);
// enable user authentication
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
}
}
public static class NullHostNameVerifier implements HostnameVerifier {
@Override
public boolean verify(String arg0, SSLSession arg1) {
return true;
}
}
/**
* main函数参考如下,调用上面的create函数创建客户端,查询test索引是否存在
*/
public static void main(String[] args) throws IOException {
RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 30000, 30000, 30000, "username", "password");
GetIndexRequest indexRequest = new GetIndexRequest("test");
boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
System.out.println(exists);
client.close();
}
}
|
参数 |
描述 |
|---|---|
|
host |
集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
|
port |
集群的连接端口,默认是“9200”。 |
|
protocol |
连接协议,“http”或者“https”,根据集群实际情况填写。 |
|
connectTimeout |
socket连接超时时间(毫秒)。 |
|
connectionRequestTimeout |
socket连接请求超时时间(毫秒)。 |
|
socketTimeout |
socket请求超时时间(毫秒)。 |
|
username |
访问集群的用户名。 |
|
password |
用户名对应的密码。 |
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch RestHighLevelClient连接安全集群(加载证书)
连接安全模式+HTTPS协议的OpenSearch集群需要先提前准备好安全证书,操作指导请参见获取并上传安全证书。
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
/**
* Rest Hive Level连接安全集群(使用https证书)
*/
public class RestHighLevelClientExample {
public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String certFilePath,
String certPassword) throws IOException {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
SSLContext sc = null;
try {
TrustManager[] tm = {new MyX509TrustManager(certFilePath, certPassword)};
sc = SSLContext.getInstance("SSL", "SunJSSE");
//也可以使用SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sc.init(null, tm, new SecureRandom());
} catch (Exception e) {
e.printStackTrace();
}
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
credentialsProvider);
RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
.setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout))
.setHttpClientConfigCallback(httpClientConfigCallback);
final RestHighLevelClient client = new RestHighLevelClient(builder);
logger.info("es rest client build success {} ", client);
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
logger.info("es rest client health response {} ", response);
return client;
}
/**
* constructHttpHosts函数转换host集群节点IP列表
*/
public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
}
/**
* SecuredHttpClientConfigCallback类定义
*/
static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
@Nullable
private final CredentialsProvider credentialsProvider;
private final SSLIOSessionStrategy sslStrategy;
SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
@Nullable final CredentialsProvider credentialsProvider) {
this.sslStrategy = Objects.requireNonNull(sslStrategy);
this.credentialsProvider = credentialsProvider;
}
@Nullable
CredentialsProvider getCredentialsProvider() {
return credentialsProvider;
}
SSLIOSessionStrategy getSSLStrategy() {
return sslStrategy;
}
@Override
public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setSSLStrategy(sslStrategy);
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
}
}
private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class);
public static class MyX509TrustManager implements X509TrustManager {
X509TrustManager sunJSSEX509TrustManager;
MyX509TrustManager(String certFilePath, String certPassword) throws Exception {
File file = new File(certFilePath);
if (!file.isFile()) {
throw new Exception("Wrong Certification Path");
}
System.out.println("Loading KeyStore " + file + "...");
InputStream in = new FileInputStream(file);
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(in, certPassword.toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
tmf.init(ks);
TrustManager[] tms = tmf.getTrustManagers();
for (TrustManager tm : tms) {
if (tm instanceof X509TrustManager) {
sunJSSEX509TrustManager = (X509TrustManager) tm;
return;
}
}
throw new Exception("Couldn't initialize");
}
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
/**
* main函数参考如下,调用上面的create函数创建客户端,查询test索引是否存在
*/
public static void main(String[] args) throws IOException {
RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 30000, 30000, 30000, "username", "password", "certFilePath", "certPassword");
GetIndexRequest indexRequest = new GetIndexRequest("test");
boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
System.out.println(exists);
client.close();
}
}
|
参数 |
描述 |
|---|---|
|
host |
集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
|
port |
集群的连接端口,默认是“9200”。 |
|
protocol |
连接协议,此处填写“https”。 |
|
connectTimeout |
socket连接超时时间(毫秒)。 |
|
connectionRequestTimeout |
socket连接请求超时时间(毫秒)。 |
|
socketTimeout |
socket请求超时时间(毫秒)。 |
|
username |
访问集群的用户名。 |
|
password |
用户名对应的密码。 |
|
certFilePath |
安全证书路径。 |
|
certPassword |
安全证书密码。 |
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
获取并上传安全证书
当接入安全模式+HTTPS协议的OpenSearch集群时,必须加载安全证书。可以参考如下步骤获取安全证书,并上传至客户端。
- 获取安全证书(CloudSearchService.cer)。
- 登录云搜索服务管理控制台。
- 在左侧导航栏,选择“集群管理 > OpenSearch”。
- 在集群列表,单击目标集群名称,进入集群详情页。
- 选择“概览”页签,在“配置信息”下方,单击“HTTPS访问”后面的“下载证书”获取安全证书。
图1 下载安全证书
- 转换安全证书(CloudSearchService.cer)。将下载的安全证书上传到客户端上,使用keytool工具将“.cer”证书转换成Java可以读取的“.jks”证书格式。
- 在Linux系统中,执行如下命令转换证书。
keytool -import -alias newname -keystore ./truststore.jks -file ./CloudSearchService.cer - 在Windows系统中,执行如下命令转换证书。
keytool -import -alias newname -keystore .\truststore.jks -file .\CloudSearchService.cer
其中,newname是由用户自定义的证书名称。
该命令执行后,会提示设置证书密码,并确认密码。请保存该密码,后续接入集群会使用。
- 在Linux系统中,执行如下命令转换证书。