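Accessing an Elasticsearch Cluster Through the Rest Low Level Client
The High Level Client is a wrapper built on top of the Low Level Client. If the High Level Client's method calls (for example, “.search” or “.bulk”) do not meet your needs or run into compatibility problems, you can use the Low Level Client instead. You can even obtain it directly from a High Level Client by calling “HighLevelClient.getLowLevelClient()”. With the Low Level Client you define the request structure yourself, which is more flexible and covers every request format that Elasticsearch supports, such as GET, POST, DELETE, and HEAD.
This document describes how to configure the Rest Low Level Client to access a CSS cluster. There are three ways to connect, and each of them can be done either by creating a Rest Client (that is, the Rest Low Level Client) directly, or by creating a High Level Client and then calling getLowLevelClient() to obtain the Low Level Client.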
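- Connecting to a non-security cluster through the Rest Low Level Client: for clusters in non-security mode
- Connecting to a security cluster through the Rest Low Level Client (without a security certificate): for clusters in security mode with HTTP, and clusters in security mode with HTTPS when the certificate is ignored
- Connecting to a security cluster through the Rest Low Level Client (with a security certificate): for clusters in security mode with HTTPS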
Precautions
Keep the Rest Low Level Client version the same as the Elasticsearch version. For example, if the Elasticsearch cluster you need to access is version 7.6.2, use Rest Low Level Client 7.6.2 as well.
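Prerequisites
- The CSS cluster is available.
- The server that runs the Java code can reach the CSS cluster over the network.
- Obtain the cluster access address according to the network configuration chosen for the cluster. For details, see Network Configuration.
- JDK 1.8 is installed on the server. JDK 1.8 can be downloaded from: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html.
- Import the Apache version of the client through Maven. The code examples use version 7.6.2.
Here 7.6.2 is the version number of the Elasticsearch Java client.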
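<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.6.2</version>
</dependency>
<!-- Needed only by the examples that use RestHighLevelClient. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.6.2</version>
</dependency>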
Connecting to a Non-Security Cluster Through the Rest Low Level Client
- Method 1: Create a Rest Low Level Client directly
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class Main {

    public static void main(String[] args) throws IOException {
        List<String> host = Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx");
        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http"));

        // Create the Rest Low Level Client. try-with-resources closes it even if the request fails.
        try (RestClient lowLevelClient = builder.build()) {
            // Check whether the "test" index exists: 200 is returned if it exists, 404 if it does not.
            Request request = new Request("HEAD", "/test");
            Response response = lowLevelClient.performRequest(request);
            System.out.println(response.getStatusLine().getStatusCode());
        }
    }

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }
}
- Method 2: Create a High Level Client and then call getLowLevelClient() to obtain the Low Level Client
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class Main {

    public static void main(String[] args) throws IOException {
        List<String> host = Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx");
        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http"));

        // Closing the High Level Client also closes the Low Level Client it wraps.
        try (RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder)) {
            // Create the High Level Client, then call getLowLevelClient() to obtain the Low Level Client.
            // Compared with creating the client directly, only this line differs.
            final RestClient lowLevelClient = restHighLevelClient.getLowLevelClient();

            // Check whether the "test" index exists: 200 is returned if it exists, 404 if it does not.
            Request request = new Request("HEAD", "/test");
            Response response = lowLevelClient.performRequest(request);
            System.out.println(response.getStatusLine().getStatusCode());
        }
    }

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }
}
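Here, host is the access address of the cluster; when there are multiple IP addresses, separate them with commas (,). test is the name of the index being queried.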
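The examples pass each node address as a separate list element. If the addresses arrive as a single comma-separated string, for instance from a configuration file, they can be split first. The following is a minimal sketch using only the JDK; the class name `HostListParser` and the method `parse` are hypothetical helpers for illustration and are not part of the Elasticsearch client.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not part of the Elasticsearch client. */
final class HostListParser {

    private HostListParser() {
    }

    /** Splits a comma-separated address string, trimming whitespace and dropping empty entries. */
    static List<String> parse(String commaSeparatedHosts) {
        return Arrays.stream(commaSeparatedHosts.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Example addresses are placeholders.
        System.out.println(parse("192.168.0.10, 192.168.0.11"));
    }
}
```

The returned list can then be passed as the host argument of constructHttpHosts.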
Connecting to a Security Cluster Through the Rest Low Level Client (Without a Security Certificate)
- Method 1: Create a Rest Low Level Client directly
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.protocol.HttpContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.Nullable;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

public class Main {

    private static final Logger logger = LogManager.getLogger(Main.class);

    /**
     * Creates the client.
     */
    public static RestClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        SSLContext sc;
        try {
            sc = SSLContext.getInstance("SSL");
            sc.init(null, trustAllCerts, new SecureRandom());
        } catch (KeyManagementException | NoSuchAlgorithmException e) {
            // Fail fast instead of continuing with an uninitialized SSL context.
            throw new IOException("Failed to initialize the SSL context", e);
        }
        SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
        SecuredHttpClientConfigCallback httpClientConfigCallback =
            new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider);
        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientConfigCallback);
        final RestClient client = builder.build();
        logger.info("es rest client build success {} ", client);
        return client;
    }

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    /**
     * Trust manager that ignores certificates: it accepts any client or server certificate.
     */
    public static TrustManager[] trustAllCerts = new TrustManager[] {
        new X509TrustManager() {
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                // The interface contract requires a non-null array.
                return new X509Certificate[0];
            }
        }
    };

    /**
     * Sets the connection keep-alive time. Mainly intended for workloads with many short-lived
     * connections or a low request rate. This example defines the strategy but does not install it;
     * to apply it, call httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE)
     * in customizeHttpClient.
     */
    public static class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
        public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();

        private CustomConnectionKeepAliveStrategy() {
            super();
        }

        /**
         * Maximum keep-alive time, in minutes.
         * The default here is 10 minutes; adjust it as needed. Watch the number of TCP connections in the
         * TIME_WAIT state on the client machine, and increase this value if there are too many.
         */
        private final long MAX_KEEP_ALIVE_MINUTES = 10;

        @Override
        public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
            long keepAliveDuration = super.getKeepAliveDuration(response, context);
            // A value below 0 means keep alive indefinitely; replace it with the default maximum.
            if (keepAliveDuration < 0) {
                return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
            }
            return keepAliveDuration;
        }
    }

    static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
        @Nullable
        private final CredentialsProvider credentialsProvider;

        /**
         * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption.
         */
        private final SSLIOSessionStrategy sslStrategy;

        /**
         * Create a new {@link SecuredHttpClientConfigCallback}.
         *
         * @param credentialsProvider The credential provider, if a username/password have been supplied
         * @param sslStrategy The SSL strategy, if SSL / TLS have been supplied
         * @throws NullPointerException if {@code sslStrategy} is {@code null}
         */
        SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
            @Nullable final CredentialsProvider credentialsProvider) {
            this.sslStrategy = Objects.requireNonNull(sslStrategy);
            this.credentialsProvider = credentialsProvider;
        }

        /**
         * Get the {@link CredentialsProvider} that will be added to the HTTP client.
         *
         * @return Can be {@code null}.
         */
        @Nullable
        CredentialsProvider getCredentialsProvider() {
            return credentialsProvider;
        }

        /**
         * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client.
         *
         * @return Never {@code null}.
         */
        SSLIOSessionStrategy getSSLStrategy() {
            return sslStrategy;
        }

        /**
         * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider}.
         *
         * @param httpClientBuilder The client to configure.
         * @return Always {@code httpClientBuilder}.
         */
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
            // enable SSL / TLS
            httpClientBuilder.setSSLStrategy(sslStrategy);
            // enable user authentication
            if (credentialsProvider != null) {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            return httpClientBuilder;
        }
    }

    /**
     * Hostname verifier that accepts every host name.
     */
    public static class NullHostNameVerifier implements HostnameVerifier {
        @Override
        public boolean verify(String arg0, SSLSession arg1) {
            return true;
        }
    }

    /**
     * Sample main function: calls create to build the Rest Low Level Client and checks whether the "test" index exists.
     */
    public static void main(String[] args) throws IOException {
        RestClient lowLevelClient = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "http",
            1000, 1000, 1000, "username", "password");
        Request request = new Request("HEAD", "/test");
        Response response = lowLevelClient.performRequest(request);
        System.out.println(response.getStatusLine().getStatusCode());
        lowLevelClient.close();
    }
}
- Method 2: Create a High Level Client and then call getLowLevelClient() to obtain the Low Level Client
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.protocol.HttpContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Nullable;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

public class Main {

    private static final Logger logger = LogManager.getLogger(Main.class);

    /**
     * Creates the client.
     */
    public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        SSLContext sc;
        try {
            sc = SSLContext.getInstance("SSL");
            sc.init(null, trustAllCerts, new SecureRandom());
        } catch (KeyManagementException | NoSuchAlgorithmException e) {
            // Fail fast instead of continuing with an uninitialized SSL context.
            throw new IOException("Failed to initialize the SSL context", e);
        }
        SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
        SecuredHttpClientConfigCallback httpClientConfigCallback =
            new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider);
        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientConfigCallback);
        final RestHighLevelClient client = new RestHighLevelClient(builder);
        logger.info("es rest client build success {} ", client);
        return client;
    }

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    /**
     * Trust manager that ignores certificates: it accepts any client or server certificate.
     */
    public static TrustManager[] trustAllCerts = new TrustManager[] {
        new X509TrustManager() {
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                // The interface contract requires a non-null array.
                return new X509Certificate[0];
            }
        }
    };

    /**
     * Sets the connection keep-alive time. Mainly intended for workloads with many short-lived
     * connections or a low request rate. This example defines the strategy but does not install it;
     * to apply it, call httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE)
     * in customizeHttpClient.
     */
    public static class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
        public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();

        private CustomConnectionKeepAliveStrategy() {
            super();
        }

        /**
         * Maximum keep-alive time, in minutes.
         * The default here is 10 minutes; adjust it as needed. Watch the number of TCP connections in the
         * TIME_WAIT state on the client machine, and increase this value if there are too many.
         */
        private final long MAX_KEEP_ALIVE_MINUTES = 10;

        @Override
        public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
            long keepAliveDuration = super.getKeepAliveDuration(response, context);
            // A value below 0 means keep alive indefinitely; replace it with the default maximum.
            if (keepAliveDuration < 0) {
                return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
            }
            return keepAliveDuration;
        }
    }

    static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
        @Nullable
        private final CredentialsProvider credentialsProvider;

        /**
         * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption.
         */
        private final SSLIOSessionStrategy sslStrategy;

        /**
         * Create a new {@link SecuredHttpClientConfigCallback}.
         *
         * @param credentialsProvider The credential provider, if a username/password have been supplied
         * @param sslStrategy The SSL strategy, if SSL / TLS have been supplied
         * @throws NullPointerException if {@code sslStrategy} is {@code null}
         */
        SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
            @Nullable final CredentialsProvider credentialsProvider) {
            this.sslStrategy = Objects.requireNonNull(sslStrategy);
            this.credentialsProvider = credentialsProvider;
        }

        /**
         * Get the {@link CredentialsProvider} that will be added to the HTTP client.
         *
         * @return Can be {@code null}.
         */
        @Nullable
        CredentialsProvider getCredentialsProvider() {
            return credentialsProvider;
        }

        /**
         * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client.
         *
         * @return Never {@code null}.
         */
        SSLIOSessionStrategy getSSLStrategy() {
            return sslStrategy;
        }

        /**
         * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider}.
         *
         * @param httpClientBuilder The client to configure.
         * @return Always {@code httpClientBuilder}.
         */
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
            // enable SSL / TLS
            httpClientBuilder.setSSLStrategy(sslStrategy);
            // enable user authentication
            if (credentialsProvider != null) {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            return httpClientBuilder;
        }
    }

    /**
     * Hostname verifier that accepts every host name.
     */
    public static class NullHostNameVerifier implements HostnameVerifier {
        @Override
        public boolean verify(String arg0, SSLSession arg1) {
            return true;
        }
    }

    /**
     * Sample main function: calls create to build the High Level Client, calls getLowLevelClient()
     * to obtain the Low Level Client, and checks whether the "test" index exists.
     */
    public static void main(String[] args) throws IOException {
        RestHighLevelClient client = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "http",
            1000, 1000, 1000, "username", "password");
        RestClient lowLevelClient = client.getLowLevelClient();
        Request request = new Request("HEAD", "/test");
        Response response = lowLevelClient.performRequest(request);
        System.out.println(response.getStatusLine().getStatusCode());
        // Closing the High Level Client also closes the Low Level Client it wraps.
        client.close();
    }
}
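Parameter | Description |
---|---|
host | Access address of the Elasticsearch cluster. When there are multiple IP addresses, separate them with commas (,). |
port | Connection port of the Elasticsearch cluster. The default is “9200”. |
protocol | Connection protocol, “http” or “https”. Set it according to the cluster. |
connectTimeout | Socket connection timeout, in milliseconds. |
connectionRequestTimeout | Timeout for requesting a connection from the connection pool, in milliseconds. |
socketTimeout | Socket request (read) timeout, in milliseconds. |
username | Username for accessing the cluster. |
password | Password of the user. |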
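To make the role of username and password more concrete, the sketch below computes the value of an HTTP Basic `Authorization` header from the two parameters using only the JDK. It assumes the cluster authenticates requests with HTTP Basic authentication; the class name `BasicAuthHeader` is a hypothetical helper for illustration and is not part of the Elasticsearch client, which handles this internally through the CredentialsProvider.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper for illustration; assumes HTTP Basic authentication. */
final class BasicAuthHeader {

    private BasicAuthHeader() {
    }

    /** Returns "Basic " followed by the Base64 encoding of "username:password". */
    static String value(String username, String password) {
        String pair = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Placeholder credentials, as in the examples above.
        System.out.println(value("username", "password")); // prints "Basic dXNlcm5hbWU6cGFzc3dvcmQ="
    }
}
```

A header built this way is handy for a quick manual check of the credentials with a generic HTTP tool before wiring up the full client.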
Connecting to a Security Cluster Through the Rest Low Level Client (With a Security Certificate)
- Method 1: Create a Rest Low Level Client directly
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

public class Main {

    private static final Logger logger = LogManager.getLogger(Main.class);

    /**
     * Creates the client.
     */
    public static RestClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password,
        String cerFilePath, String cerPassword) throws IOException {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        SSLContext sc;
        try {
            TrustManager[] tm = {new MyX509TrustManager(cerFilePath, cerPassword)};
            // SSLContext.getInstance("TLSv1.2") can also be used here.
            sc = SSLContext.getInstance("SSL", "SunJSSE");
            sc.init(null, tm, new SecureRandom());
        } catch (Exception e) {
            // Fail fast instead of continuing with an uninitialized SSL context.
            throw new IOException("Failed to initialize the SSL context", e);
        }
        SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
        SecuredHttpClientConfigCallback httpClientConfigCallback =
            new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider);
        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientConfigCallback);
        final RestClient client = builder.build();
        logger.info("es rest client build success {} ", client);
        return client;
    }

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
        @Nullable
        private final CredentialsProvider credentialsProvider;
        private final SSLIOSessionStrategy sslStrategy;

        SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
            @Nullable final CredentialsProvider credentialsProvider) {
            this.sslStrategy = Objects.requireNonNull(sslStrategy);
            this.credentialsProvider = credentialsProvider;
        }

        @Nullable
        CredentialsProvider getCredentialsProvider() {
            return credentialsProvider;
        }

        SSLIOSessionStrategy getSSLStrategy() {
            return sslStrategy;
        }

        @Override
        public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
            // Enable SSL / TLS.
            httpClientBuilder.setSSLStrategy(sslStrategy);
            // Enable user authentication.
            if (credentialsProvider != null) {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            return httpClientBuilder;
        }
    }

    /**
     * Trust manager backed by the certificates in the JKS keystore at cerFilePath.
     */
    public static class MyX509TrustManager implements X509TrustManager {
        X509TrustManager sunJSSEX509TrustManager;

        MyX509TrustManager(String cerFilePath, String cerPassword) throws Exception {
            File file = new File(cerFilePath);
            if (!file.isFile()) {
                throw new Exception("Wrong Certification Path");
            }
            System.out.println("Loading KeyStore " + file + "...");
            KeyStore ks = KeyStore.getInstance("JKS");
            // try-with-resources closes the keystore file after loading.
            try (InputStream in = new FileInputStream(file)) {
                ks.load(in, cerPassword.toCharArray());
            }
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
            tmf.init(ks);
            TrustManager[] tms = tmf.getTrustManagers();
            for (TrustManager tm : tms) {
                if (tm instanceof X509TrustManager) {
                    sunJSSEX509TrustManager = (X509TrustManager) tm;
                    return;
                }
            }
            throw new Exception("Couldn't initialize");
        }

        // The checks delegate to the trust manager loaded from the keystore,
        // so the server certificate is actually verified against it.
        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            sunJSSEX509TrustManager.checkClientTrusted(chain, authType);
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            sunJSSEX509TrustManager.checkServerTrusted(chain, authType);
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return sunJSSEX509TrustManager.getAcceptedIssuers();
        }
    }

    /**
     * Sample main function: calls create to build the Rest Low Level Client and checks whether the "test" index exists.
     */
    public static void main(String[] args) throws IOException {
        RestClient lowLevelClient = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "https",
            1000, 1000, 1000, "username", "password", "cerFilePath", "cerPassword");
        Request request = new Request("HEAD", "/test");
        Response response = lowLevelClient.performRequest(request);
        System.out.println(response.getStatusLine().getStatusCode());
        lowLevelClient.close();
    }
}
- Method 2: Create a High Level Client and then call getLowLevelClient() to obtain the Low Level Client
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

public class Main {

    private static final Logger logger = LogManager.getLogger(Main.class);

    /**
     * Creates the client.
     */
    public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password,
        String cerFilePath, String cerPassword) throws IOException {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        SSLContext sc;
        try {
            TrustManager[] tm = {new MyX509TrustManager(cerFilePath, cerPassword)};
            // SSLContext.getInstance("TLSv1.2") can also be used here.
            sc = SSLContext.getInstance("SSL", "SunJSSE");
            sc.init(null, tm, new SecureRandom());
        } catch (Exception e) {
            // Fail fast instead of continuing with an uninitialized SSL context.
            throw new IOException("Failed to initialize the SSL context", e);
        }
        SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
        SecuredHttpClientConfigCallback httpClientConfigCallback =
            new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider);
        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientConfigCallback);
        final RestHighLevelClient client = new RestHighLevelClient(builder);
        logger.info("es rest client build success {} ", client);
        // Query the cluster health once to confirm that the connection works.
        ClusterHealthRequest request = new ClusterHealthRequest();
        ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
        logger.info("es rest client health response {} ", response);
        return client;
    }

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
        @Nullable
        private final CredentialsProvider credentialsProvider;
        private final SSLIOSessionStrategy sslStrategy;

        SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
            @Nullable final CredentialsProvider credentialsProvider) {
            this.sslStrategy = Objects.requireNonNull(sslStrategy);
            this.credentialsProvider = credentialsProvider;
        }

        @Nullable
        CredentialsProvider getCredentialsProvider() {
            return credentialsProvider;
        }

        SSLIOSessionStrategy getSSLStrategy() {
            return sslStrategy;
        }

        @Override
        public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
            // Enable SSL / TLS.
            httpClientBuilder.setSSLStrategy(sslStrategy);
            // Enable user authentication.
            if (credentialsProvider != null) {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            return httpClientBuilder;
        }
    }

    /**
     * Trust manager backed by the certificates in the JKS keystore at cerFilePath.
     */
    public static class MyX509TrustManager implements X509TrustManager {
        X509TrustManager sunJSSEX509TrustManager;

        MyX509TrustManager(String cerFilePath, String cerPassword) throws Exception {
            File file = new File(cerFilePath);
            if (!file.isFile()) {
                throw new Exception("Wrong Certification Path");
            }
            System.out.println("Loading KeyStore " + file + "...");
            KeyStore ks = KeyStore.getInstance("JKS");
            // try-with-resources closes the keystore file after loading.
            try (InputStream in = new FileInputStream(file)) {
                ks.load(in, cerPassword.toCharArray());
            }
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
            tmf.init(ks);
            TrustManager[] tms = tmf.getTrustManagers();
            for (TrustManager tm : tms) {
                if (tm instanceof X509TrustManager) {
                    sunJSSEX509TrustManager = (X509TrustManager) tm;
                    return;
                }
            }
            throw new Exception("Couldn't initialize");
        }

        // The checks delegate to the trust manager loaded from the keystore,
        // so the server certificate is actually verified against it.
        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            sunJSSEX509TrustManager.checkClientTrusted(chain, authType);
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            sunJSSEX509TrustManager.checkServerTrusted(chain, authType);
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return sunJSSEX509TrustManager.getAcceptedIssuers();
        }
    }

    /**
     * Sample main function: calls create to build the High Level Client, calls getLowLevelClient()
     * to obtain the Low Level Client, and checks whether the "test" index exists.
     */
    public static void main(String[] args) throws IOException {
        RestHighLevelClient client = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "https",
            1000, 1000, 1000, "username", "password", "cerFilePath", "cerPassword");
        RestClient lowLevelClient = client.getLowLevelClient();
        Request request = new Request("HEAD", "/test");
        Response response = lowLevelClient.performRequest(request);
        System.out.println(response.getStatusLine().getStatusCode());
        // Closing the High Level Client also closes the Low Level Client it wraps.
        client.close();
    }
}
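Parameter | Description |
---|---|
host | Access address of the Elasticsearch cluster. When there are multiple IP addresses, separate them with commas (,). |
port | Connection port of the Elasticsearch cluster. The default is “9200”. |
protocol | Connection protocol. Set it to “https” here. |
connectTimeout | Socket connection timeout, in milliseconds. |
connectionRequestTimeout | Timeout for requesting a connection from the connection pool, in milliseconds. |
socketTimeout | Socket request (read) timeout, in milliseconds. |
username | Username for accessing the cluster. |
password | Password of the user. |
cerFilePath | Path of the certificate. |
cerPassword | Password of the certificate. |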
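As a cross-check of the cerFilePath and cerPassword parameters, the sketch below loads a JKS truststore and builds an SSLContext whose trust managers validate server certificates against it, using only JDK classes. It is an illustration under stated assumptions, not the documented method: the names `TrustStoreContext` and `fromTrustStore` are hypothetical, the file is assumed to be in JKS format as in the examples, and TLSv1.2 is requested as the alternative mentioned in the example comments.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper for illustration; assumes a JKS truststore file. */
final class TrustStoreContext {

    private TrustStoreContext() {
    }

    /** Builds an SSLContext that trusts only the certificates stored in the given JKS file. */
    static SSLContext fromTrustStore(Path trustStorePath, char[] password)
        throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(trustStorePath)) {
            keyStore.load(in, password);
        }
        // The factory's trust managers perform the actual certificate validation.
        TrustManagerFactory factory =
            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        factory.init(keyStore);
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(null, factory.getTrustManagers(), null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("usage: TrustStoreContext <cerFilePath> <cerPassword>");
            return;
        }
        SSLContext context = fromTrustStore(Paths.get(args[0]), args[1].toCharArray());
        System.out.println("SSL context ready, protocol: " + context.getProtocol());
    }
}
```

An SSLContext built this way can be passed to the SSLIOSessionStrategy constructor in place of the one created in the create function.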