Updated: 2024-08-15 GMT+08:00

Accessing an Elasticsearch Cluster Through the Rest Low Level Client

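The High Level Client is a wrapper built on top of the Low Level Client. If the High Level Client's method calls (such as ".search" and ".bulk") do not meet your requirements or cause compatibility issues, you can use the Low Level Client instead, or even obtain it directly through "HighLevelClient.getLowLevelClient()". With the Low Level Client you construct the request structure yourself, which is more flexible and covers every request format that Elasticsearch supports, such as GET, POST, DELETE, and HEAD.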

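This document describes how to configure access to a CSS cluster through the Rest Low Level Client. There are three connection scenarios (a non-security cluster, a security cluster without a security certificate, and a security cluster with a security certificate). In each scenario the client can either be created directly as a Rest Client (that is, the Rest Low Level Client) or be obtained by creating a High Level Client and then calling getLowLevelClient().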

Precautions

Use a Rest Low Level Client version that matches the Elasticsearch version. For example, to access an Elasticsearch cluster of version 7.6.2, use Rest Low Level Client 7.6.2.
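
The version recommendation above can be checked mechanically before a client is created. The following is a minimal sketch, not part of the CSS or Elasticsearch APIs: the class name VersionCheck and its methods are hypothetical, and it assumes you already have both version strings at hand (for example, the client version from your Maven dependency and the cluster version from the cluster details).

```java
public class VersionCheck {

    /** Returns true when both version strings are identical after trimming whitespace. */
    public static boolean matches(String clientVersion, String clusterVersion) {
        return clientVersion.trim().equals(clusterVersion.trim());
    }

    /** Returns true when the major version (the part before the first dot) is the same. */
    public static boolean sameMajor(String clientVersion, String clusterVersion) {
        return major(clientVersion) == major(clusterVersion);
    }

    // Parses the leading numeric component of a version string such as "7.6.2".
    private static int major(String version) {
        String v = version.trim();
        int dot = v.indexOf('.');
        return Integer.parseInt(dot < 0 ? v : v.substring(0, dot));
    }

    public static void main(String[] args) {
        // Exact match, as recommended.
        System.out.println(matches("7.6.2", "7.6.2"));
        // Same major version but not an exact match.
        System.out.println(sameMajor("7.6.2", "7.10.2"));
    }
}
```

An exact match is the recommended setup. The looser sameMajor check is only a convenience for spotting obviously mismatched clients and does not guarantee compatibility.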

Prerequisites

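  • The CSS cluster is available.
  • The server that runs the Java code can communicate with the CSS cluster over the network.
  • You have obtained the cluster access address based on the network configuration mode selected for the cluster. For details, see Network Configuration.
  • JDK 1.8 is installed on the server. It can be downloaded from the official website: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
  • Add the Apache-based client dependencies through Maven. The code examples use version 7.6.2.

    Here, 7.6.2 is the version of the Elasticsearch Java client.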

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>7.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.6.2</version>
    </dependency>

Connecting to a Non-Security Cluster Through the Rest Low Level Client

  • Method 1: Create the Rest Low Level Client directly
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    
    public class Main {
    
        public static void main(String[] args) throws IOException {
            List<String> host = Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx");
            RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http"));
            // Create the Rest Low Level Client. try-with-resources closes it even if the request fails.
            try (RestClient lowLevelClient = builder.build()) {
                // Check whether the "test" index exists: returns 200 if it exists and 404 if it does not.
                Request request = new Request("HEAD", "/test");
                Response response = lowLevelClient.performRequest(request);
                System.out.println(response.getStatusLine().getStatusCode());
            }
        }
    
        /**
         * constructHttpHosts converts the list of cluster node IP addresses into an HttpHost array.
         */
        public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
            return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
        }
    }
    
  • Method 2: Create a High Level Client, then call getLowLevelClient() to obtain the Low Level Client
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    
    public class Main {
    
        public static void main(String[] args) throws IOException {
            List<String> host = Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx");
            RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http"));
            // Create a High Level Client, then call getLowLevelClient() to obtain the Low Level Client.
            // Compared with method 1, only the way the client is created differs.
            // Closing the High Level Client also closes the Low Level Client it wraps.
            try (RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder)) {
                final RestClient lowLevelClient = restHighLevelClient.getLowLevelClient();
                // Check whether the "test" index exists: returns 200 if it exists and 404 if it does not.
                Request request = new Request("HEAD", "/test");
                Response response = lowLevelClient.performRequest(request);
                System.out.println(response.getStatusLine().getStatusCode());
            }
        }
    
        /**
         * constructHttpHosts converts the list of cluster node IP addresses into an HttpHost array.
         */
        public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
            return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
        }
    }
    

Here, host is the list of cluster access addresses (separate multiple IP addresses with commas), and test is the name of the index to query.
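
If the access address is available as a single comma-separated string rather than as separate values, it can be split into the List<String> that the examples pass to constructHttpHosts. The following is a minimal sketch under that assumption. The class name HostListParser and the sample addresses are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;

public class HostListParser {

    /** Splits a comma-separated address string into trimmed, non-empty entries. */
    public static List<String> parse(String hosts) {
        List<String> result = new ArrayList<>();
        for (String part : hosts.split(",")) {
            String trimmed = part.trim();
            // Skip empty entries caused by stray or trailing commas.
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical addresses used only for illustration.
        List<String> host = parse("192.168.0.10, 192.168.0.11,192.168.0.12");
        System.out.println(host);
    }
}
```

The resulting list can then be used in place of the Arrays.asList(...) call in the examples.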

Connecting to a Security Cluster Through the Rest Low Level Client (Without a Security Certificate)

  • Method 1: Create the Rest Low Level Client directly
    import org.apache.http.HttpHost;
    import org.apache.http.HttpResponse;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
    import org.apache.http.protocol.HttpContext;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.common.Nullable;
    
    import java.io.IOException;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.security.SecureRandom;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.TimeUnit;
    
    import javax.net.ssl.HostnameVerifier;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLSession;
    import javax.net.ssl.TrustManager;
    import javax.net.ssl.X509TrustManager;
    
    public class Main {
    
        /**
         * Defines the create function, which creates the client.
         */
        public static RestClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout,  String username, String password) throws IOException {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            SSLContext sc;
            try {
                sc = SSLContext.getInstance("SSL");
                sc.init(null, trustAllCerts, new SecureRandom());
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                // Fail fast instead of continuing with a null SSLContext.
                throw new IOException("Failed to initialize SSLContext", e);
            }
            SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
            SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
                credentialsProvider);
    
            RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
                .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                    .setConnectionRequestTimeout(connectionRequestTimeout)
                    .setSocketTimeout(socketTimeout))
                .setHttpClientConfigCallback(httpClientConfigCallback);
            final RestClient client = builder.build();
            logger.info("es rest client build success {} ", client);
            return client;
        }
    
        /**
         * constructHttpHosts converts the list of cluster node IP addresses into an HttpHost array.
         */
        public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
            return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
        }
    
        /**
         * trustAllCerts skips certificate verification by trusting every certificate.
         */
        public static TrustManager[] trustAllCerts = new TrustManager[] {
            new X509TrustManager() {
                @Override
                public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                }
    
                @Override
                public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                }
    
                @Override
                public X509Certificate[] getAcceptedIssuers() {
                    return null;
                }
            }
        };
    
        /**
         * CustomConnectionKeepAliveStrategy sets the connection keep-alive time. It mainly targets scenarios with many short-lived connections or a low request volume.
         */
        public static class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
            public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();
    
            private CustomConnectionKeepAliveStrategy() {
                super();
            }
    
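            /**
             * Maximum keep-alive time, in minutes.
             * The default here is 10 minutes; adjust it as needed. Check the number of TCP connections in the TIME_WAIT state on the client machine, and increase this value if there are too many.
             */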
            private final long MAX_KEEP_ALIVE_MINUTES = 10;
    
            @Override
            public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
                long keepAliveDuration = super.getKeepAliveDuration(response, context);
                // A value below 0 means keep alive indefinitely.
                // Replace the indefinite duration with a default duration.
                if (keepAliveDuration < 0) {
                    return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
                }
                return keepAliveDuration;
            }
        }
    
        private static final Logger logger = LogManager.getLogger(Main.class);
    
        static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
            @Nullable
            private final CredentialsProvider credentialsProvider;
            /**
             * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption.
             */
            private final SSLIOSessionStrategy sslStrategy;
            /**
             * Create a new {@link SecuredHttpClientConfigCallback}.
             *
             * @param credentialsProvider The credential provider, if a username/password have been supplied
             * @param sslStrategy         The SSL strategy, if SSL / TLS have been supplied
             * @throws NullPointerException if {@code sslStrategy} is {@code null}
             */
            SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
                @Nullable final CredentialsProvider credentialsProvider) {
                this.sslStrategy = Objects.requireNonNull(sslStrategy);
                this.credentialsProvider = credentialsProvider;
            }
            /**
             * Get the {@link CredentialsProvider} that will be added to the HTTP client.
             *
             * @return Can be {@code null}.
             */
            @Nullable
            CredentialsProvider getCredentialsProvider() {
                return credentialsProvider;
            }
            /**
             * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client.
             *
             * @return Never {@code null}.
             */
            SSLIOSessionStrategy getSSLStrategy() {
                return sslStrategy;
            }
            /**
             * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider},
             *
             * @param httpClientBuilder The client to configure.
             * @return Always {@code httpClientBuilder}.
             */
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
                // enable SSL / TLS
                httpClientBuilder.setSSLStrategy(sslStrategy);
                // enable user authentication
                if (credentialsProvider != null) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                return httpClientBuilder;
            }
        }
    
        public static class NullHostNameVerifier implements HostnameVerifier {
            @Override
            public boolean verify(String arg0, SSLSession arg1) {
                return true;
            }
        }
    
        /**
         * Sample main function: calls create to build the Rest Low Level Client and checks whether the "test" index exists.
         */
        public static void main(String[] args) throws IOException {
            RestClient lowLevelClient = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "http", 1000, 1000, 1000, "username", "password");
            Request request = new Request("HEAD", "/test");
            Response response = lowLevelClient.performRequest(request);
            System.out.println(response.getStatusLine().getStatusCode());
            lowLevelClient.close();
        }
    }
    
  • Method 2: Create a High Level Client, then call getLowLevelClient() to obtain the Low Level Client
    import org.apache.http.HttpHost;
    import org.apache.http.HttpResponse;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
    import org.apache.http.protocol.HttpContext;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.common.Nullable;
    
    import java.io.IOException;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.security.SecureRandom;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.TimeUnit;
    
    import javax.net.ssl.HostnameVerifier;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLSession;
    import javax.net.ssl.TrustManager;
    import javax.net.ssl.X509TrustManager;
    
    import org.elasticsearch.client.RestHighLevelClient;
    
    public class Main13 {
    
        /**
         * Defines the create function, which creates the client.
         */
        public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout,  String username, String password) throws IOException {
    
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            SSLContext sc;
            try {
                sc = SSLContext.getInstance("SSL");
                sc.init(null, trustAllCerts, new SecureRandom());
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                // Fail fast instead of continuing with a null SSLContext.
                throw new IOException("Failed to initialize SSLContext", e);
            }
            SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
            SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
                credentialsProvider);
    
            RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
                .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                    .setConnectionRequestTimeout(connectionRequestTimeout)
                    .setSocketTimeout(socketTimeout))
                .setHttpClientConfigCallback(httpClientConfigCallback);
            final RestHighLevelClient client = new RestHighLevelClient(builder);
            logger.info("es rest client build success {} ", client);
            return client;
        }
    
        /**
         * constructHttpHosts converts the list of cluster node IP addresses into an HttpHost array.
         */
        public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
            return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
        }
    
        /**
         * trustAllCerts skips certificate verification by trusting every certificate.
         */
        public static TrustManager[] trustAllCerts = new TrustManager[] {
            new X509TrustManager() {
                @Override
                public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                }
    
                @Override
                public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                }
    
                @Override
                public X509Certificate[] getAcceptedIssuers() {
                    return null;
                }
            }
        };
    
        /**
         * CustomConnectionKeepAliveStrategy sets the connection keep-alive time. It mainly targets scenarios with many short-lived connections or a low request volume.
         */
        public static class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
            public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();
    
            private CustomConnectionKeepAliveStrategy() {
                super();
            }
    
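            /**
             * Maximum keep-alive time, in minutes.
             * The default here is 10 minutes; adjust it as needed. Check the number of TCP connections in the TIME_WAIT state on the client machine, and increase this value if there are too many.
             */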
            private final long MAX_KEEP_ALIVE_MINUTES = 10;
    
            @Override
            public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
                long keepAliveDuration = super.getKeepAliveDuration(response, context);
                // A value below 0 means keep alive indefinitely.
                // Replace the indefinite duration with a default duration.
                if (keepAliveDuration < 0) {
                    return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
                }
                return keepAliveDuration;
            }
        }
    
        private static final Logger logger = LogManager.getLogger(Main13.class);
    
        static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
            @Nullable
            private final CredentialsProvider credentialsProvider;
            /**
             * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption.
             */
            private final SSLIOSessionStrategy sslStrategy;
            /**
             * Create a new {@link SecuredHttpClientConfigCallback}.
             *
             * @param credentialsProvider The credential provider, if a username/password have been supplied
             * @param sslStrategy         The SSL strategy, if SSL / TLS have been supplied
             * @throws NullPointerException if {@code sslStrategy} is {@code null}
             */
            SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
                @Nullable final CredentialsProvider credentialsProvider) {
                this.sslStrategy = Objects.requireNonNull(sslStrategy);
                this.credentialsProvider = credentialsProvider;
            }
            /**
             * Get the {@link CredentialsProvider} that will be added to the HTTP client.
             *
             * @return Can be {@code null}.
             */
            @Nullable
            CredentialsProvider getCredentialsProvider() {
                return credentialsProvider;
            }
            /**
             * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client.
             *
             * @return Never {@code null}.
             */
            SSLIOSessionStrategy getSSLStrategy() {
                return sslStrategy;
            }
            /**
             * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider},
             *
             * @param httpClientBuilder The client to configure.
             * @return Always {@code httpClientBuilder}.
             */
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
                // enable SSL / TLS
                httpClientBuilder.setSSLStrategy(sslStrategy);
                // enable user authentication
                if (credentialsProvider != null) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                return httpClientBuilder;
            }
        }
    
        public static class NullHostNameVerifier implements HostnameVerifier {
            @Override
            public boolean verify(String arg0, SSLSession arg1) {
                return true;
            }
        }
    
        /**
         * Sample main function: calls create to build a High Level Client, calls getLowLevelClient() to obtain the Low Level Client, and checks whether the "test" index exists.
         */
        public static void main(String[] args) throws IOException {
            RestHighLevelClient client = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "http", 1000, 1000, 1000, "username", "password");
            RestClient lowLevelClient = client.getLowLevelClient();
            Request request = new Request("HEAD", "/test");
            Response response = lowLevelClient.performRequest(request);
            System.out.println(response.getStatusLine().getStatusCode());
            lowLevelClient.close();
        }
    }
    
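Table 1 Variables in the functions

Parameter                  Description
-------------------------  ------------------------------------------------------------
host                       Access addresses of the Elasticsearch cluster. Separate multiple IP addresses with commas (,).
port                       Connection port of the Elasticsearch cluster. The default value is 9200.
protocol                   Connection protocol, either http or https, depending on the actual cluster configuration.
connectTimeout             Timeout for establishing the socket connection.
connectionRequestTimeout   Timeout for a socket connection request.
socketTimeout              Timeout for a socket request.
username                   Username used to access the cluster.
password                   Password of the user.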
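
Before passing these values to create, it can help to reject obviously invalid ones early. The following is a minimal sketch of such a check and is not part of the client library: the class name ConnectionParamsCheck, the message texts, and the sample address are hypothetical. The timeouts end up in the Apache HttpClient request configuration, which takes them in milliseconds. Rejecting values of 0 or less is a conservative policy of this sketch rather than a requirement of the client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConnectionParamsCheck {

    /** Returns a list of problems found in the parameters; an empty list means none were found. */
    public static List<String> validate(List<String> host, int port, String protocol,
            int connectTimeout, int connectionRequestTimeout, int socketTimeout) {
        List<String> problems = new ArrayList<>();
        if (host == null || host.isEmpty()) {
            problems.add("host: at least one address is required");
        }
        if (port < 1 || port > 65535) {
            problems.add("port: must be between 1 and 65535");
        }
        if (!"http".equals(protocol) && !"https".equals(protocol)) {
            problems.add("protocol: must be http or https");
        }
        // Policy of this sketch: require strictly positive timeouts (milliseconds).
        if (connectTimeout <= 0) {
            problems.add("connectTimeout: must be greater than 0");
        }
        if (connectionRequestTimeout <= 0) {
            problems.add("connectionRequestTimeout: must be greater than 0");
        }
        if (socketTimeout <= 0) {
            problems.add("socketTimeout: must be greater than 0");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Hypothetical address; the other values mirror the sample main functions.
        System.out.println(validate(Arrays.asList("192.168.0.10"), 9200, "https", 1000, 1000, 1000));
        // An invalid port, protocol, and socket timeout are each reported.
        System.out.println(validate(Arrays.asList("192.168.0.10"), 0, "ftp", 1000, 1000, -1));
    }
}
```

Collecting every problem in a list, instead of throwing on the first one, lets a caller report all misconfigured values at once.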

Connecting to a Security Cluster Through the Rest Low Level Client (With a Security Certificate)

  • Method 1: Create the Rest Low Level Client directly
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.conn.ssl.NoopHostnameVerifier;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.common.Nullable;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.security.KeyStore;
    import java.security.SecureRandom;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManager;
    import javax.net.ssl.TrustManagerFactory;
    import javax.net.ssl.X509TrustManager;
    
    public class Main13 {
    
        private static final Logger logger = LogManager.getLogger(Main13.class);
    
        /**
         * Defines the create function, which creates the client.
         */
        public static RestClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String cerFilePath, String cerPassword) throws IOException {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            SSLContext sc;
            try {
                TrustManager[] tm = {new MyX509TrustManager(cerFilePath, cerPassword)};
                sc = SSLContext.getInstance("SSL", "SunJSSE");
                // Alternatively, use SSLContext.getInstance("TLSv1.2").
                sc.init(null, tm, new SecureRandom());
            } catch (Exception e) {
                // Fail fast instead of continuing with a null SSLContext.
                throw new IOException("Failed to initialize SSLContext", e);
            }
    
            SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
            SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
                credentialsProvider);
    
            RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
                .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                    .setConnectionRequestTimeout(connectionRequestTimeout)
                    .setSocketTimeout(socketTimeout))
                .setHttpClientConfigCallback(httpClientConfigCallback);
            final RestClient client = builder.build();
            logger.info("es rest client build success {} ", client);
            return client;
        }
    
        /**
         * constructHttpHosts converts the list of cluster node IP addresses into an HttpHost array.
         */
        public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
            return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
        }
    
        static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
            @Nullable
            private final CredentialsProvider credentialsProvider;
    
            private final SSLIOSessionStrategy sslStrategy;
    
            SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
                @Nullable final CredentialsProvider credentialsProvider) {
                this.sslStrategy = Objects.requireNonNull(sslStrategy);
                this.credentialsProvider = credentialsProvider;
            }
    
            @Nullable
            CredentialsProvider getCredentialsProvider() {
                return credentialsProvider;
            }
    
            SSLIOSessionStrategy getSSLStrategy() {
                return sslStrategy;
            }
    
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setSSLStrategy(sslStrategy);
                if (credentialsProvider != null) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                return httpClientBuilder;
            }
        }
    
        public static class MyX509TrustManager implements X509TrustManager {
            // Trust manager built from the loaded keystore; the checks below delegate to it.
            X509TrustManager sunJSSEX509TrustManager;
    
            MyX509TrustManager(String cerFilePath, String cerPassword) throws Exception {
                File file = new File(cerFilePath);
                if (!file.isFile()) {
                    throw new Exception("Wrong Certification Path");
                }
                System.out.println("Loading KeyStore " + file + "...");
                KeyStore ks = KeyStore.getInstance("JKS");
                // try-with-resources closes the keystore file once it has been read.
                try (InputStream in = new FileInputStream(file)) {
                    ks.load(in, cerPassword.toCharArray());
                }
                TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
                tmf.init(ks);
                TrustManager[] tms = tmf.getTrustManagers();
                for (TrustManager tm : tms) {
                    if (tm instanceof X509TrustManager) {
                        sunJSSEX509TrustManager = (X509TrustManager) tm;
                        return;
                    }
                }
                throw new Exception("Couldn't initialize");
            }
    
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                sunJSSEX509TrustManager.checkClientTrusted(chain, authType);
            }
    
            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                // Validate the server certificate chain against the loaded keystore.
                sunJSSEX509TrustManager.checkServerTrusted(chain, authType);
            }
    
            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return sunJSSEX509TrustManager.getAcceptedIssuers();
            }
        }
    
        /**
         * Sample main function: calls create to build the Rest Low Level Client and checks whether the "test" index exists.
         */
        public static void main(String[] args) throws IOException {
            RestClient lowLevelClient = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "https", 1000, 1000, 1000, "username", "password", "cerFilePath", "cerPassword");
            Request request = new Request("HEAD", "/test");
            Response response = lowLevelClient.performRequest(request);
            // 200 means the index exists; 404 means it does not.
            System.out.println(response.getStatusLine().getStatusCode());
            lowLevelClient.close();
        }
    }
    
  • Method 2: Create a High Level Client, then call getLowLevelClient() to obtain the Low Level Client
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.conn.ssl.NoopHostnameVerifier;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
    import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.Nullable;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.security.KeyStore;
    import java.security.SecureRandom;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManager;
    import javax.net.ssl.TrustManagerFactory;
    import javax.net.ssl.X509TrustManager;
    
    public class Main {
    
        private static final Logger logger = LogManager.getLogger(Main.class);
    
        /**
         * Client factory class; the create function builds the client.
         */
        public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String cerFilePath, String cerPassword) throws IOException {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            SSLContext sc;
            try {
                TrustManager[] tm = {new MyX509TrustManager(cerFilePath, cerPassword)};
                sc = SSLContext.getInstance("SSL", "SunJSSE");
                // Alternatively, use SSLContext.getInstance("TLSv1.2").
                sc.init(null, tm, new SecureRandom());
            } catch (Exception e) {
                // Fail fast: an uninitialized SSLContext would only cause a less helpful error later.
                throw new IOException("Failed to initialize the SSL context", e);
            }
    
            SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
            SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
                credentialsProvider);
    
            RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
                .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                    .setConnectionRequestTimeout(connectionRequestTimeout)
                    .setSocketTimeout(socketTimeout))
                .setHttpClientConfigCallback(httpClientConfigCallback);
            final RestHighLevelClient client = new RestHighLevelClient(builder);
            logger.info("es rest client build success {} ", client);
    
            ClusterHealthRequest request = new ClusterHealthRequest();
            ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
            logger.info("es rest client health response {} ", response);
            return client;
        }
    
        /**
         * Converts the list of cluster node IP addresses into an HttpHost array.
         */
        public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
            return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
        }
    
        static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
            @Nullable
            private final CredentialsProvider credentialsProvider;
    
            private final SSLIOSessionStrategy sslStrategy;
    
            SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
                @Nullable final CredentialsProvider credentialsProvider) {
                this.sslStrategy = Objects.requireNonNull(sslStrategy);
                this.credentialsProvider = credentialsProvider;
            }
    
            @Nullable
            CredentialsProvider getCredentialsProvider() {
                return credentialsProvider;
            }
    
            SSLIOSessionStrategy getSSLStrategy() {
                return sslStrategy;
            }
    
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setSSLStrategy(sslStrategy);
                if (credentialsProvider != null) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                return httpClientBuilder;
            }
        }
    
        public static class MyX509TrustManager implements X509TrustManager {
            // Trust manager built from the loaded keystore; the checks below delegate to it.
            X509TrustManager sunJSSEX509TrustManager;
    
            MyX509TrustManager(String cerFilePath, String cerPassword) throws Exception {
                File file = new File(cerFilePath);
                if (!file.isFile()) {
                    throw new Exception("Wrong Certification Path");
                }
                System.out.println("Loading KeyStore " + file + "...");
                KeyStore ks = KeyStore.getInstance("JKS");
                // try-with-resources closes the keystore file once it has been read.
                try (InputStream in = new FileInputStream(file)) {
                    ks.load(in, cerPassword.toCharArray());
                }
                TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
                tmf.init(ks);
                TrustManager[] tms = tmf.getTrustManagers();
                for (TrustManager tm : tms) {
                    if (tm instanceof X509TrustManager) {
                        sunJSSEX509TrustManager = (X509TrustManager) tm;
                        return;
                    }
                }
                throw new Exception("Couldn't initialize");
            }
    
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                sunJSSEX509TrustManager.checkClientTrusted(chain, authType);
            }
    
            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                // Validate the server certificate chain against the loaded keystore.
                sunJSSEX509TrustManager.checkServerTrusted(chain, authType);
            }
    
            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return sunJSSEX509TrustManager.getAcceptedIssuers();
            }
        }
    
        /**
         * Sample main function: calls create to build the High Level Client, obtains the Low Level Client
         * through getLowLevelClient(), and checks whether the "test" index exists.
         */
        public static void main(String[] args) throws IOException {
            RestHighLevelClient client = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "https", 1000, 1000, 1000, "username", "password", "cerFilePath", "cerPassword");
            RestClient lowLevelClient = client.getLowLevelClient();
            Request request = new Request("HEAD", "/test");
            Response response = lowLevelClient.performRequest(request);
            // 200 means the index exists; 404 means it does not.
            System.out.println(response.getStatusLine().getStatusCode());
            // Closing the High Level Client also closes the underlying Low Level Client.
            client.close();
        }
    }
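
The sample main functions above print only the raw HTTP status code. As a rough sketch of how that value could be interpreted, the helper below maps the codes that a HEAD request on an index normally returns to a readable message. The class and method names (IndexExistence, describe) are hypothetical and not part of the Elasticsearch client. The mapping assumes standard Elasticsearch behavior, where 200 means the index exists and 404 means it does not.

```java
// Hypothetical helper for illustration only; not part of the Elasticsearch client API.
final class IndexExistence {

    /** Maps the status code of a HEAD request on an index to a readable message. */
    static String describe(int statusCode) {
        switch (statusCode) {
            case 200:
                return "index exists";
            case 404:
                return "index does not exist";
            default:
                return "unexpected status code: " + statusCode;
        }
    }

    public static void main(String[] args) {
        // In the samples above, this value would come from
        // response.getStatusLine().getStatusCode().
        System.out.println(describe(200));
        System.out.println(describe(404));
    }
}
```

With the Low Level Client, a 404 returned for a HEAD request should not be raised as a ResponseException, so both outcomes can be read from the status code in this way.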
    
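Table 2 Function parameters

Parameter                   Description
host                        Access address of the Elasticsearch cluster. Separate multiple IP addresses with commas (,).
port                        Connection port of the Elasticsearch cluster. The default is 9200.
protocol                    Connection protocol. Set this to https.
connectTimeout              Timeout for establishing the socket connection, in milliseconds.
connectionRequestTimeout    Timeout for obtaining a connection from the connection pool, in milliseconds.
socketTimeout               Timeout for waiting for data on an established socket connection, in milliseconds.
username                    Username used to access the cluster.
password                    Password of the user.
cerFilePath                 Path of the certificate file.
cerPassword                 Password of the certificate.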
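
The host parameter is described as a comma-separated set of addresses, while the create functions in the samples take a List<String>. As a minimal sketch, assuming the address arrives as one comma-separated string (for example from a configuration file), the hypothetical helper below converts it into the list that create expects. The names HostListParser and parseHosts are illustrative only, and the IP addresses are documentation placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the Elasticsearch client API.
final class HostListParser {

    /** Splits a comma-separated address string into a list, trimming blanks and dropping empty entries. */
    static List<String> parseHosts(String hosts) {
        return Arrays.stream(hosts.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Placeholder addresses; replace them with the access addresses of the cluster.
        List<String> hostList = parseHosts("192.0.2.10, 192.0.2.11");
        System.out.println(hostList);
    }
}
```

The resulting list can be passed as the first argument of create in place of the Arrays.asList(...) call used in the samples.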

Related documents