更新时间:2024-08-15 GMT+08:00
分享

通过Rest High Level Client接入Elasticsearch集群

Elasticsearch官方提供了SDK(Rest High level Client)方式连接集群,Rest Client客户端对Elasticsearch的API进行了封装,用户只需要构造对应的结构即可对ES集群进行访问。Rest Client的详细使用请参考官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/master/index.html

本文介绍通过Rest High level Client访问CSS集群的配置说明。Rest High level Client接入集群有3种方式:

约束限制

建议Rest High Level Client的版本和Elasticsearch的版本保持一致,例如需要访问的ES集群版本是7.6.2,则使用的Rest High Level Client客户端版本建议也是7.6.2。如果您使用相比Elasticsearch集群更高版本的Java Rest High Level Client且存在少量请求的兼容性问题,您可以使用“RestHighLevelClient.getLowLevelClient()”方式直接获取Low Level Client,实现自定义的Elasticsearch请求内容。

前提条件

  • CSS集群处于可用状态。
  • 确保运行Java代码的服务器与CSS集群的网络是互通的。
  • 根据集群选择的网络配置方式,获取集群的访问地址,具体操作请参见网络配置
  • 确认服务器已安装JDK1.8,JDK1.8官网下载地址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
  • 服务器已引入Java依赖。

    其中7.6.2为Elasticsearch Java客户端的版本号。

    • Maven方式引入:
      <dependency>
          <groupId>org.elasticsearch.client</groupId>
          <artifactId>elasticsearch-rest-high-level-client</artifactId>
          <version>7.6.2</version>
      </dependency>
      <dependency>
          <groupId>org.elasticsearch</groupId>
          <artifactId>elasticsearch</artifactId>
          <version>7.6.2</version>
      </dependency>
    • Gradle方式引入:
      compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.6.2'

通过Rest High Level Client连接非安全集群

通过Rest High Level Client连接非安全集群,并查询test索引是否存在。代码示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Rest Hive Level 连接非安全集群
 */
public class Main {
    public static void main(String[] args) throws IOException {
        List<String> host = Arrays.asList("x.x.x.x", "x.x.x.x");
        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http"));
        final RestHighLevelClient client = new RestHighLevelClient(builder);
        GetIndexRequest indexRequest = new GetIndexRequest("test");
        boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
        System.out.println(exists);
        client.close();
    }

    /**
     * constructHttpHosts函数转换host集群节点ip列表。
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }
}

其中,host为集群访问地址,当存在多个IP地址时,中间用“,”隔开;test为查询的索引名称。

通过Rest High Level Client连接安全集群(不使用安全证书)

该场景适用于连接2种集群:安全模式+HTTP协议的集群、安全模式+HTTPS协议的集群(忽略证书)。

代码示例如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Nullable;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

/**
 * Rest High Level连接安全集群(不使用证书)
 */
public class Main {
    /**
     * 创建客户端的类,定义create函数用于创建客户端。
     */
    public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException{
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        SSLContext sc = null;
        try {
            sc = SSLContext.getInstance("SSL");
            sc.init(null, trustAllCerts, new SecureRandom());
        } catch (KeyManagementException | NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
        SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
        SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
            credentialsProvider);


        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientConfigCallback);
        final RestHighLevelClient client = new RestHighLevelClient(builder);
        logger.info("es rest client build success {} ", client);

        ClusterHealthRequest request = new ClusterHealthRequest();
        ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
        logger.info("es rest client health response {} ", response);
        return client;
    }

    /**
     * constructHttpHosts函数转换host集群节点ip列表。
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    /**
     * trustAllCerts忽略证书配置。
     */
    public static TrustManager[] trustAllCerts = new TrustManager[] {
        new X509TrustManager() {
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return null;
            }
        }
    };

    private static final Logger logger = LogManager.getLogger(Main.class);

    static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
        @Nullable
        private final CredentialsProvider credentialsProvider;
        /**
         * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption.
         */
        private final SSLIOSessionStrategy sslStrategy;
        /**
         * Create a new {@link SecuredHttpClientConfigCallback}.
         *
         * @param credentialsProvider The credential provider, if a username/password have been supplied
         * @param sslStrategy         The SSL strategy, if SSL / TLS have been supplied
         * @throws NullPointerException if {@code sslStrategy} is {@code null}
         */
        SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
            @Nullable final CredentialsProvider credentialsProvider) {
            this.sslStrategy = Objects.requireNonNull(sslStrategy);
            this.credentialsProvider = credentialsProvider;
        }
        /**
         * Get the {@link CredentialsProvider} that will be added to the HTTP client.
         *
         * @return Can be {@code null}.
         */
        @Nullable
        CredentialsProvider getCredentialsProvider() {
            return credentialsProvider;
        }
        /**
         * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client.
         *
         * @return Never {@code null}.
         */
        SSLIOSessionStrategy getSSLStrategy() {
            return sslStrategy;
        }
        /**
         * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider},
         *
         * @param httpClientBuilder The client to configure.
         * @return Always {@code httpClientBuilder}.
         */
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
            // enable SSL / TLS
            httpClientBuilder.setSSLStrategy(sslStrategy);
            // enable user authentication
            if (credentialsProvider != null) {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            return httpClientBuilder;
        }
    }

    public static class NullHostNameVerifier implements HostnameVerifier {
        @Override
        public boolean verify(String arg0, SSLSession arg1) {
            return true;
        }
    }

    /**
     * main函数参考如下,调用上面的create函数创建客户端,查询“test”索引是否存在。
     */
    public static void main(String[] args) throws IOException {
        RestHighLevelClient client = create(Arrays.asList("x.x.x.x", "x.x.x.x"), 9200, "https", 1000, 1000, 1000,  "username", "password");
        GetIndexRequest indexRequest = new GetIndexRequest("test");
        boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
        System.out.println(exists);
        client.close();
    }
}
表1 函数中的变量说明

参数

描述

host

Elasticsearch集群访问地址,当存在多个IP地址时,中间用“,”隔开。

port

Elasticsearch集群的连接端口,默认是“9200”

protocol

连接协议,“http”或者“https”,根据集群实际情况填写。

connectTimeout

socket连接超时时间。

connectionRequestTimeout

socket连接请求超时时间。

socketTimeout

socket请求超时时间。

username

访问集群的用户名。

password

用户名对应的密码。

通过Rest High Level Client连接安全集群(使用安全证书)

该场景适用于使用安全证书连接安全模式+HTTPS协议的集群。

  1. 获取安全证书(CloudSearchService.cer)。
    1. 登录云搜索服务控制台。
    2. 选择“集群管理”进入集群列表。
    3. 单击对应集群的名称,进入集群基本信息页面。
    4. “基本信息”页面,单击“HTTPS访问”后面的“下载证书”
      图1 下载证书
  2. 转换安全证书(CloudSearchService.cer)。将下载的安全证书上传到客户端机器上,使用keytool工具将“.cer”证书转换成Java可以读取的“.jks”证书格式。
    • 在Linux系统中,执行如下命令转换证书。
      keytool -import -alias newname -keystore ./truststore.jks -file ./CloudSearchService.cer 
    • 在Windows系统中,执行如下命令转换证书。
      keytool -import -alias newname -keystore .\truststore.jks -file .\CloudSearchService.cer

    其中,newname是由用户自定义的证书名称。

    该命令执行后,会提示设置证书密码,并确认密码。请保存该密码,后续接入集群会使用。

  3. 接入集群。代码示例如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.conn.ssl.NoopHostnameVerifier;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
    import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.indices.GetIndexRequest;
    import org.elasticsearch.common.Nullable;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.security.KeyStore;
    import java.security.SecureRandom;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManager;
    import javax.net.ssl.TrustManagerFactory;
    import javax.net.ssl.X509TrustManager;
    
    /**
     * Rest Hive Level连接安全集群(使用https证书)
     */
    public class Main {
        public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String cerFilePath,
            String cerPassword) throws IOException {
    
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            SSLContext sc = null;
            try {
                TrustManager[] tm = {new MyX509TrustManager(cerFilePath, cerPassword)};
                sc = SSLContext.getInstance("SSL", "SunJSSE");
                //也可以使用SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
                sc.init(null, tm, new SecureRandom());
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
            SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy,
                credentialsProvider);
    
            RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
                .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                    .setConnectionRequestTimeout(connectionRequestTimeout)
                    .setSocketTimeout(socketTimeout))
                .setHttpClientConfigCallback(httpClientConfigCallback);
            final RestHighLevelClient client = new RestHighLevelClient(builder);
            logger.info("es rest client build success {} ", client);
    
            ClusterHealthRequest request = new ClusterHealthRequest();
            ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
            logger.info("es rest client health response {} ", response);
            return client;
        }
    
        /**
         * constructHttpHosts函数转换host集群节点ip列表。
         */
        public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
            return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
        }
    
        /**
         * SecuredHttpClientConfigCallback类定义。
         */
        static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
            @Nullable
            private final CredentialsProvider credentialsProvider;
    
            private final SSLIOSessionStrategy sslStrategy;
    
            SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
                @Nullable final CredentialsProvider credentialsProvider) {
                this.sslStrategy = Objects.requireNonNull(sslStrategy);
                this.credentialsProvider = credentialsProvider;
            }
    
            @Nullable
            CredentialsProvider getCredentialsProvider() {
                return credentialsProvider;
            }
    
            SSLIOSessionStrategy getSSLStrategy() {
                return sslStrategy;
            }
    
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setSSLStrategy(sslStrategy);
                if (credentialsProvider != null) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                return httpClientBuilder;
            }
        }
    
        private static final Logger logger = LogManager.getLogger(Main.class);
    
        public static class MyX509TrustManager implements X509TrustManager {
            X509TrustManager sunJSSEX509TrustManager;
    
            MyX509TrustManager(String cerFilePath, String cerPassword) throws Exception {
                File file = new File(cerFilePath);
                if (!file.isFile()) {
                    throw new Exception("Wrong Certification Path");
                }
                System.out.println("Loading KeyStore " + file + "...");
                InputStream in = new FileInputStream(file);
                KeyStore ks = KeyStore.getInstance("JKS");
                ks.load(in, cerPassword.toCharArray());
                TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
                tmf.init(ks);
                TrustManager[] tms = tmf.getTrustManagers();
                for (TrustManager tm : tms) {
                    if (tm instanceof X509TrustManager) {
                        sunJSSEX509TrustManager = (X509TrustManager) tm;
                        return;
                    }
                }
                throw new Exception("Couldn't initialize");
            }
    
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
    
            }
    
            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
    
            }
    
            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return new X509Certificate[0];
            }
        }
    
        /**
         * main函数参考如下,调用上面的create函数创建客户端,查询“test”索引是否存在。
         */
        public static void main(String[] args) throws IOException {
            RestHighLevelClient client = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "https", 1000, 1000, 1000, "username", "password", "cerFilePath", "cerPassword");
            GetIndexRequest indexRequest = new GetIndexRequest("test");
            boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
            System.out.println(exists);
            client.close();
        }
    }
    
    表2 函数中的参数说明

    参数

    描述

    host

    Elasticsearch集群访问地址,当存在多个IP地址时,中间用“,”隔开。

    port

    Elasticsearch集群的连接端口,默认是“9200”

    protocol

    连接协议,此处填写“https”

    connectTimeout

    socket连接超时时间。

    connectionRequestTimeout

    socket连接请求超时时间。

    socketTimeout

    socket请求超时时间。

    username

    访问集群的用户名。

    password

    用户名对应的密码。

    cerFilePath

    证书路径。

    cerPassword

    证书密码。

相关文档