通过RestHighLevelClient接入OpenSearch集群
CSS服务的OpenSearch集群支持通过High Level REST Client进行数据查询和管理。High Level REST Client对OpenSearch的API进行了封装,用户只需要构造对应的结构即可对OpenSearch集群进行访问。Rest Client的详细使用指导请参见Java high-level REST client。
前提条件
- OpenSearch集群处于可用状态。
- 运行Java代码的服务器与OpenSearch集群之间网络互通。
- 根据集群的网络配置方式,获取集群的访问地址,操作指导请参见网络配置。
- 确认服务器已安装Java,要求JDK版本为11及以上,JDK11官网下载地址:Java Archive Downloads。
引入依赖
在运行Java代码的服务器中引入Java依赖。
CSS服务支持使用Elasticsearch 7.10.2 Java客户端连接OpenSearch集群,但是为确保更好的兼容性,建议使用与OpenSearch集群同版本的OpenSearch Java客户端连接OpenSearch集群。
当使用比OpenSearch集群更高版本的High Level REST Client,并且某些请求存在兼容性问题时,可以使用“RestHighLevelClient.getLowLevelClient()”方式直接获取Low Level REST Client,实现自定义的OpenSearch请求。操作指导请参见通过RestLowLevelClient接入OpenSearch集群。
可以根据实际使用的Java客户端类型选择参考示例。
接入集群
当使用不同类型的Java客户端库访问不同安全类型的OpenSearch集群时,示例代码会有差异,请根据实际业务场景选择参考文档。
Java客户端库 |
OpenSearch集群安全类型 |
是否加载安全证书 |
参考文档 |
---|---|---|---|
OpenSearch |
非安全模式 |
- |
|
OpenSearch |
安全模式+HTTP协议 安全模式+HTTPS协议 |
否 |
|
OpenSearch |
安全模式+HTTPS协议 |
是 |
|
Elasticsearch 7.10.2 |
非安全模式 |
- |
|
Elasticsearch 7.10.2 |
安全模式+HTTP协议 安全模式+HTTPS协议 |
否 |
|
Elasticsearch 7.10.2 |
安全模式+HTTPS协议 |
是 |
OpenSearch RestHighLevelClient连接非安全集群
使用OpenSearch RestHighLevelClient连接非安全模式的OpenSearch集群,并查询索引“test”是否存在。代码示例如下:
import org.apache.http.HttpHost; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.GetIndexRequest; import java.io.IOException; import java.util.Arrays; import java.util.List; public class RestHighLevelClientExample { public static void main(String[] args) throws IOException { List<String> host = Arrays.asList("{集群访问地址}"); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http")); final RestHighLevelClient client = new RestHighLevelClient(builder); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } /** * constructHttpHosts函数转换host集群节点IP列表 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } }
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
OpenSearch RestHighLevelClient连接安全集群(无证书)
使用OpenSearch RestHighLevelClient在不加载安全证书的情况下连接安全模式的OpenSearch集群(支持HTTP协议或HTTPS协议),查询索引“test”是否存在。代码示例如下:
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.Nullable; import java.io.IOException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; /** * Rest High Level连接安全集群(不使用证书) */ public class RestHighLevelClientExample { /** * 创建客户端的类,定义create函数用于创建客户端 */ public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException{ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); SSLContext sc = null; try { sc = SSLContext.getInstance("SSL"); sc.init(null, trustAllCerts, new SecureRandom()); } catch (KeyManagementException | NoSuchAlgorithmException e) { e.printStackTrace(); } SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier()); SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol)) .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout)) .setHttpClientConfigCallback(httpClientConfigCallback); final RestHighLevelClient client = new RestHighLevelClient(builder); logger.info("opensearch rest client build success {} ", client); ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); logger.info("opensearch rest client health response {} ", response); return client; } /** * constructHttpHosts函数转换host集群节点IP列表 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } /** * trustAllCerts忽略证书配置 */ public static TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return null; } } }; private static final Logger logger = LogManager.getLogger(RestHighLevelClientExampleHttpWithSecurityNoCert.class); static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback { @Nullable private final CredentialsProvider credentialsProvider; /** * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption. */ private final SSLIOSessionStrategy sslStrategy; /** * Create a new {@link SecuredHttpClientConfigCallback}. * * @param credentialsProvider The credential provider, if a username/password have been supplied * @param sslStrategy The SSL strategy, if SSL / TLS have been supplied * @throws NullPointerException if {@code sslStrategy} is {@code null} */ SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy, @Nullable final CredentialsProvider credentialsProvider) { this.sslStrategy = Objects.requireNonNull(sslStrategy); this.credentialsProvider = credentialsProvider; } /** * Get the {@link CredentialsProvider} that will be added to the HTTP client. * * @return Can be {@code null}. */ @Nullable CredentialsProvider getCredentialsProvider() { return credentialsProvider; } /** * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client. * * @return Never {@code null}. */ SSLIOSessionStrategy getSSLStrategy() { return sslStrategy; } /** * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider}, * * @param httpClientBuilder The client to configure. * @return Always {@code httpClientBuilder}. */ @Override public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { // enable SSL / TLS httpClientBuilder.setSSLStrategy(sslStrategy); // enable user authentication if (credentialsProvider != null) { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder; } } public static class NullHostNameVerifier implements HostnameVerifier { @Override public boolean verify(String arg0, SSLSession arg1) { return true; } } /** * main函数参考如下,调用上面的create函数创建客户端,查询test索引是否存在 */ public static void main(String[] args) throws IOException { RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 30000, 30000, 30000, "username", "password"); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } }
参数 |
描述 |
---|---|
host |
集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
集群的连接端口,默认是“9200”。 |
protocol |
连接协议,“http”或者“https”,根据集群实际情况填写。 |
connectTimeout |
socket连接超时时间(毫秒)。 |
connectionRequestTimeout |
socket连接请求超时时间(毫秒)。 |
socketTimeout |
socket请求超时时间(毫秒)。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
OpenSearch RestHighLevelClient连接安全集群(加载证书)
使用OpenSearch RestHighLevelClient在加载安全证书的情况下连接安全模式+HTTPS协议的OpenSearch集群,查询索引“test”是否存在。代码示例如下:

连接安全模式+HTTPS协议的OpenSearch集群需要先提前准备好安全证书,操作指导请参见获取并上传安全证书。
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.Nullable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.security.KeyStore; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; /** * Rest Hive Level连接安全集群(使用https证书) */ public class RestHighLevelClientExample { public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String certFilePath, String certPassword) throws IOException { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); SSLContext sc = null; try { TrustManager[] tm = {new MyX509TrustManager(certFilePath, certPassword)}; sc = SSLContext.getInstance("SSL", "SunJSSE"); //也可以使用SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); sc.init(null, tm, new SecureRandom()); } catch (Exception e) { e.printStackTrace(); } SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier()); SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol)) .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout)) .setHttpClientConfigCallback(httpClientConfigCallback); final RestHighLevelClient client = new RestHighLevelClient(builder); logger.info("opensearch rest client build success {} ", client); ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); logger.info("opensearch rest client health response {} ", response); return client; } /** * constructHttpHosts函数转换host集群节点IP列表 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } /** * SecuredHttpClientConfigCallback类定义 */ static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback { @Nullable private final CredentialsProvider credentialsProvider; private final SSLIOSessionStrategy sslStrategy; SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy, @Nullable final CredentialsProvider credentialsProvider) { this.sslStrategy = Objects.requireNonNull(sslStrategy); this.credentialsProvider = credentialsProvider; } @Nullable CredentialsProvider getCredentialsProvider() { return credentialsProvider; } SSLIOSessionStrategy getSSLStrategy() { return sslStrategy; } @Override public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { httpClientBuilder.setSSLStrategy(sslStrategy); if (credentialsProvider != null) { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder; } } private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class); public static class MyX509TrustManager implements X509TrustManager { X509TrustManager sunJSSEX509TrustManager; MyX509TrustManager(String certFilePath, String certPassword) throws Exception { File file = new File(certFilePath); if (!file.isFile()) { throw new Exception("Wrong Certification Path"); } System.out.println("Loading KeyStore " + file + "..."); InputStream in = new FileInputStream(file); KeyStore ks = KeyStore.getInstance("JKS"); ks.load(in, certPassword.toCharArray()); TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE"); tmf.init(ks); TrustManager[] tms = tmf.getTrustManagers(); for (TrustManager tm : tms) { if (tm instanceof X509TrustManager) { sunJSSEX509TrustManager = (X509TrustManager) tm; return; } } throw new Exception("Couldn't initialize"); } @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } } /** * main函数参考如下,调用上面的create函数创建客户端,查询test索引是否存在 */ public static void main(String[] args) throws IOException { RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 30000, 30000, 30000, "username", "password", "certFilePath", "certPassword"); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } }
参数 |
描述 |
---|---|
host |
集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
集群的连接端口,默认是“9200”。 |
protocol |
连接协议,此处填写“https”。 |
connectTimeout |
socket连接超时时间(毫秒)。 |
connectionRequestTimeout |
socket连接请求超时时间(毫秒)。 |
socketTimeout |
socket请求超时时间(毫秒)。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
certFilePath |
安全证书路径。 |
certPassword |
安全证书密码。 |
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch RestHighLevelClient连接非安全集群
通过Elasticsearch 7.10.2 RestHighLevelClient连接非安全模式的OpenSearch集群,并查询索引“test”是否存在。代码示例如下:
import org.apache.http.HttpHost; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import java.io.IOException; import java.util.Arrays; import java.util.List; public class RestHighLevelClientExample { public static void main(String[] args) throws IOException { List<String> host = Arrays.asList("{集群访问地址}"); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http")); final RestHighLevelClient client = new RestHighLevelClient(builder); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } /** * constructHttpHosts函数转换host集群节点IP列表 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } }
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch RestHighLevelClient连接安全集群(无证书)
使用Elasticsearch 7.10.2 RestHighLevelClient在不加载安全证书的情况下连接安全模式的OpenSearch集群(支持HTTP协议或HTTPS协议),查询索引“test”是否存在。代码示例如下:
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.Nullable; import java.io.IOException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; /** * Rest High Level连接安全集群(不使用证书) */ public class RestHighLevelClientExample { /** * 创建客户端的类,定义create函数用于创建客户端 */ public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException{ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); SSLContext sc = null; try { sc = SSLContext.getInstance("SSL"); sc.init(null, trustAllCerts, new SecureRandom()); } catch (KeyManagementException | NoSuchAlgorithmException e) { e.printStackTrace(); } SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier()); SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol)) .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout)) .setHttpClientConfigCallback(httpClientConfigCallback); final RestHighLevelClient client = new RestHighLevelClient(builder); logger.info("es rest client build success {} ", client); ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); logger.info("es rest client health response {} ", response); return client; } /** * constructHttpHosts函数转换host集群节点IP列表 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } /** * trustAllCerts忽略证书配置 */ public static TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return null; } } }; private static final Logger logger = LogManager.getLogger(RestHighLevelClientExampleHttpWithSecurityNoCert.class); static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback { @Nullable private final CredentialsProvider credentialsProvider; /** * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption. */ private final SSLIOSessionStrategy sslStrategy; /** * Create a new {@link SecuredHttpClientConfigCallback}. * * @param credentialsProvider The credential provider, if a username/password have been supplied * @param sslStrategy The SSL strategy, if SSL / TLS have been supplied * @throws NullPointerException if {@code sslStrategy} is {@code null} */ SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy, @Nullable final CredentialsProvider credentialsProvider) { this.sslStrategy = Objects.requireNonNull(sslStrategy); this.credentialsProvider = credentialsProvider; } /** * Get the {@link CredentialsProvider} that will be added to the HTTP client. * * @return Can be {@code null}. */ @Nullable CredentialsProvider getCredentialsProvider() { return credentialsProvider; } /** * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client. * * @return Never {@code null}. */ SSLIOSessionStrategy getSSLStrategy() { return sslStrategy; } /** * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider}, * * @param httpClientBuilder The client to configure. * @return Always {@code httpClientBuilder}. */ @Override public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { // enable SSL / TLS httpClientBuilder.setSSLStrategy(sslStrategy); // enable user authentication if (credentialsProvider != null) { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder; } } public static class NullHostNameVerifier implements HostnameVerifier { @Override public boolean verify(String arg0, SSLSession arg1) { return true; } } /** * main函数参考如下,调用上面的create函数创建客户端,查询test索引是否存在 */ public static void main(String[] args) throws IOException { RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 30000, 30000, 30000, "username", "password"); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } }
参数 |
描述 |
---|---|
host |
集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
集群的连接端口,默认是“9200”。 |
protocol |
连接协议,“http”或者“https”,根据集群实际情况填写。 |
connectTimeout |
socket连接超时时间(毫秒)。 |
connectionRequestTimeout |
socket连接请求超时时间(毫秒)。 |
socketTimeout |
socket请求超时时间(毫秒)。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch RestHighLevelClient连接安全集群(加载证书)

连接安全模式+HTTPS协议的OpenSearch集群需要先提前准备好安全证书,操作指导请参见获取并上传安全证书。
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.Nullable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.security.KeyStore; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; /** * Rest Hive Level连接安全集群(使用https证书) */ public class RestHighLevelClientExample { public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String certFilePath, String certPassword) throws IOException { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); SSLContext sc = null; try { TrustManager[] tm = {new MyX509TrustManager(certFilePath, certPassword)}; sc = SSLContext.getInstance("SSL", "SunJSSE"); //也可以使用SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); sc.init(null, tm, new SecureRandom()); } catch (Exception e) { e.printStackTrace(); } SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier()); SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol)) .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout)) .setHttpClientConfigCallback(httpClientConfigCallback); final RestHighLevelClient client = new RestHighLevelClient(builder); logger.info("es rest client build success {} ", client); ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); logger.info("es rest client health response {} ", response); return client; } /** * constructHttpHosts函数转换host集群节点IP列表 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } /** * SecuredHttpClientConfigCallback类定义 */ static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback { @Nullable private final CredentialsProvider credentialsProvider; private final SSLIOSessionStrategy sslStrategy; SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy, @Nullable final CredentialsProvider credentialsProvider) { this.sslStrategy = Objects.requireNonNull(sslStrategy); this.credentialsProvider = credentialsProvider; } @Nullable CredentialsProvider getCredentialsProvider() { return credentialsProvider; } SSLIOSessionStrategy getSSLStrategy() { return sslStrategy; } @Override public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { httpClientBuilder.setSSLStrategy(sslStrategy); if (credentialsProvider != null) { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder; } } private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class); public static class MyX509TrustManager implements X509TrustManager { X509TrustManager sunJSSEX509TrustManager; MyX509TrustManager(String certFilePath, String certPassword) throws Exception { File file = new File(certFilePath); if (!file.isFile()) { throw new Exception("Wrong Certification Path"); } System.out.println("Loading KeyStore " + file + "..."); InputStream in = new FileInputStream(file); KeyStore ks = KeyStore.getInstance("JKS"); ks.load(in, certPassword.toCharArray()); TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE"); tmf.init(ks); TrustManager[] tms = tmf.getTrustManagers(); for (TrustManager tm : tms) { if (tm instanceof X509TrustManager) { sunJSSEX509TrustManager = (X509TrustManager) tm; return; } } throw new Exception("Couldn't initialize"); } @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } } /** * main函数参考如下,调用上面的create函数创建客户端,查询test索引是否存在 */ public static void main(String[] args) throws IOException { RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 30000, 30000, 30000, "username", "password", "certFilePath", "certPassword"); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } }
参数 |
描述 |
---|---|
host |
集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
集群的连接端口,默认是“9200”。 |
protocol |
连接协议,此处填写“https”。 |
connectTimeout |
socket连接超时时间(毫秒)。 |
connectionRequestTimeout |
socket连接请求超时时间(毫秒)。 |
socketTimeout |
socket请求超时时间(毫秒)。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
certFilePath |
安全证书路径。 |
certPassword |
安全证书密码。 |
该示例代码为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
获取并上传安全证书
当接入安全模式+HTTPS协议的OpenSearch集群时,必须加载安全证书。可以参考如下步骤获取安全证书,并上传至客户端。
- 获取安全证书(CloudSearchService.cer)。
- 登录云搜索服务管理控制台。
- 在左侧导航栏,选择“集群管理 > OpenSearch”。
- 在集群列表,单击目标集群名称,进入集群详情页。
- 选择“概览”页签,在“配置信息”下方,单击“HTTPS访问”后面的“下载证书”获取安全证书。
图1 下载安全证书
- 转换安全证书(CloudSearchService.cer)。将下载的安全证书上传到客户端上,使用keytool工具将“.cer”证书转换成Java可以读取的“.jks”证书格式。
- 在Linux系统中,执行如下命令转换证书。
keytool -import -alias newname -keystore ./truststore.jks -file ./CloudSearchService.cer
- 在Windows系统中,执行如下命令转换证书。
keytool -import -alias newname -keystore .\truststore.jks -file .\CloudSearchService.cer
其中,newname是由用户自定义的证书名称。
该命令执行后,会提示设置证书密码,并确认密码。请保存该密码,后续接入集群会使用。
- 在Linux系统中,执行如下命令转换证书。