更新时间:2026-01-09 GMT+08:00
分享

通过RestHighLevelClient接入Elasticsearch集群

在使用Elasticsearch进行数据查询和管理时,如果直接调用API,会因为API的复杂性和多样性导致开发效率低下。CSS服务的Elasticsearch集群支持通过High Level REST Client进行数据查询和管理。High Level REST Client对Elasticsearch的API进行了封装,用户只需要构造对应的结构即可对Elasticsearch集群进行访问,简化了开发流程。REST Client的详细使用指导请参见Java High Level REST Client

前提条件

  • Elasticsearch集群处于可用状态。
  • 运行Java代码的服务器与Elasticsearch集群之间网络互通。
  • 根据集群的网络配置方式,获取集群的访问地址,操作指导请参见网络配置
  • 确认服务器已安装Java,要求JDK版本为1.8及以上,JDK1.8官网下载地址:Java Downloads
  • 确认High Level REST Client版本。CSS服务支持使用高于Elasticsearch集群版本的Java客户端连接Elasticsearch集群,但是为确保更好的兼容性,建议使用与Elasticsearch集群同版本的Java客户端连接Elasticsearch集群。

    当使用比Elasticsearch集群更高版本的High Level REST Client,并且某些请求存在兼容性问题时,可以使用“RestHighLevelClient.getLowLevelClient()”方式直接获取Low Level REST Client,实现自定义的Elasticsearch请求。操作指导请参见通过RestLowLevelClient接入Elasticsearch集群

引入依赖

在运行Java代码的服务器中引入Java依赖,有如下两种方式:

  • Maven方式:

    使用时需要将7.10.2替换为实际Java客户端的版本号。

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.10.2</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.10.2</version>
    </dependency>
  • Gradle方式:

    使用时需要将7.10.2替换为实际Java客户端的版本号。

    compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.10.2'

接入集群

当使用Java客户端库访问不同安全类型的Elasticsearch集群时,示例代码会有差异,请根据实际业务场景选择参考文档。

表1 接入集群的场景介绍

Elasticsearch集群安全类型

是否加载安全证书

参考文档

非安全模式

-

RestHighLevelClient连接非安全集群

安全模式+HTTP协议

安全模式+HTTPS协议

RestHighLevelClient连接安全集群(无证书)

安全模式+HTTPS协议

RestHighLevelClient连接安全集群(加载证书)

RestHighLevelClient连接非安全集群

使用Elasticsearch RestHighLevelClient连接非安全模式的Elasticsearch集群,并查询索引“test”是否存在。代码示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Rest High Level Client客户端连接非安全集群
 */
public class Main {
    public static void main(String[] args) throws IOException {
        List<String> host = Arrays.asList("{集群访问地址}");
        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http"));
        final RestHighLevelClient client = new RestHighLevelClient(builder);
        GetIndexRequest indexRequest = new GetIndexRequest("test");
        boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
        System.out.println(exists);
        client.close();
    }

    /**
     * constructHttpHosts函数转换host集群节点IP列表。
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }
}

该示例代码为判断集群是否存在test索引,当返回“true”(索引存在)或“false”(索引不存在)时,表示正常返回查询结果,集群连接成功。

RestHighLevelClient连接安全集群(无证书)

使用Elasticsearch RestHighLevelClient在不加载安全证书的情况下连接安全模式的Elasticsearch集群(支持HTTP协议或HTTPS协议),查询索引“test”是否存在。代码示例如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

/**
 * Rest High Level Client客户端连接安全集群(不使用证书)
 */
public class Main {

    private static final Logger logger = LogManager.getLogger(Main.class);

    /**
     * 创建客户端的类,定义create函数用于创建客户端
     */
    public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException {

        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // enable user authentication
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                // set keepalive
                httpClientBuilder.setKeepAliveStrategy(((httpResponse, httpContext) -> TimeUnit.MINUTES.toMinutes(10)));
                // enable SSL / TLS
                SSLContext sc = null;
                try {
                    sc = SSLContext.getInstance("SSL");
                    sc.init(null, trustAllCerts, new SecureRandom());
                } catch (KeyManagementException | NoSuchAlgorithmException e) {
                    e.printStackTrace();
                }
                SSLIOSessionStrategy sslStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
                httpClientBuilder.setSSLStrategy(sslStrategy);

                return httpClientBuilder;
            });
        final RestHighLevelClient client = new RestHighLevelClient(builder);
        logger.info("es rest client build success {} ", client);

        ClusterHealthRequest request = new ClusterHealthRequest();
        ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
        System.out.println("es rest client health response {} " + response);
        return client;
    }

    /**
     * constructHttpHosts函数转换host集群节点IP列表
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    /**
     * trustAllCerts忽略证书配置
     */
    public static TrustManager[] trustAllCerts = new TrustManager[] {
        new X509TrustManager() {
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return null;
            }
        }
    };

    /**
     * main函数参考如下,调用上面的create函数创建客户端,查询“test”索引是否存在
     */
    public static void main(String[] args) throws IOException {
        RestHighLevelClient client = create(Arrays.asList("{host}"), 9200, "https", 1000, 1000, 1000, "username", "password");
        GetIndexRequest indexRequest = new GetIndexRequest("test");
        boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
        System.out.println(exists);
        client.close();
    }
}
表2 函数中的变量说明

参数

描述

host

集群访问地址,当存在多个IP地址时,中间用“,”隔开。

port

集群的连接端口,默认是“9200”

protocol

连接协议,“http”或者“https”,根据集群实际情况填写。

connectTimeout

socket连接超时时间(毫秒)。

connectionRequestTimeout

socket连接请求超时时间(毫秒)。

socketTimeout

socket请求超时时间(毫秒)。

username

访问集群的用户名。

password

用户名对应的密码。

该示例代码为判断集群是否存在test索引,当返回“true”(索引存在)或“false”(索引不存在)时,表示正常返回查询结果,集群连接成功。

RestHighLevelClient连接安全集群(加载证书)

使用Elasticsearch RestHighLevelClient在加载安全证书的情况下连接安全模式+HTTPS协议的Elasticsearch集群,查询索引“test”是否存在。代码示例如下:

连接安全模式+HTTPS协议的Elasticsearch集群需要先提前准备好安全证书,操作指导请参见获取并上传安全证书

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

/**
 * Rest High Level Client客户端连接安全集群(使用https证书)
 */
public class Main {
    public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password, String certFilePath,
        String certPassword) throws IOException {

        RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // enable user authentication
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                // set keepalive
                httpClientBuilder.setKeepAliveStrategy(((httpResponse, httpContext) -> TimeUnit.MINUTES.toMinutes(10)));
                // enable SSL / TLS
                SSLContext sc = null;
                try {
                    TrustManager[] tm = {new MyX509TrustManager(certFilePath, certPassword)};
                    sc = SSLContext.getInstance("SSL", "SunJSSE");
                    //也可以使用SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
                    sc.init(null, tm, new SecureRandom());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                SSLIOSessionStrategy sslStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
                httpClientBuilder.setSSLStrategy(sslStrategy);

                return httpClientBuilder;
            });
        final RestHighLevelClient client = new RestHighLevelClient(builder);
        logger.info("es rest client build success {} ", client);

        ClusterHealthRequest request = new ClusterHealthRequest();
        ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
        logger.info("es rest client health response {} ", response);
        return client;
    }

    /**
     * constructHttpHosts函数转换host集群节点IP列表。
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    private static final Logger logger = LogManager.getLogger(Main.class);

    public static class MyX509TrustManager implements X509TrustManager {
        X509TrustManager sunJSSEX509TrustManager;

        MyX509TrustManager(String certFilePath, String certPassword) throws Exception {
            File file = new File(certFilePath);
            if (!file.isFile()) {
                throw new Exception("Wrong Certification Path");
            }
            System.out.println("Loading KeyStore " + file + "...");
            InputStream in = new FileInputStream(file);
            KeyStore ks = KeyStore.getInstance("JKS");
            ks.load(in, certPassword.toCharArray());
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
            tmf.init(ks);
            TrustManager[] tms = tmf.getTrustManagers();
            for (TrustManager tm : tms) {
                if (tm instanceof X509TrustManager) {
                    sunJSSEX509TrustManager = (X509TrustManager) tm;
                    return;
                }
            }
            throw new Exception("Couldn't initialize");
        }

        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {

        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {

        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }
    }

    /**
     * main函数参考如下,调用上面的create函数创建客户端,查询“test”索引是否存在。
     */
    public static void main(String[] args) throws IOException {
        RestHighLevelClient client = create(Arrays.asList({host}), 9200, "https", 1000, 1000, 1000, "username", "password", "certFilePath", "certPassword");
        GetIndexRequest indexRequest = new GetIndexRequest("test");
        boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);
        System.out.println(exists);
        client.close();
    }	
}
表3 函数中的参数说明

参数

描述

host

集群访问地址,当存在多个IP地址时,中间用“,”隔开。

port

集群的连接端口,默认是“9200”

protocol

连接协议,此处填写“https”

connectTimeout

socket连接超时时间(毫秒)。

connectionRequestTimeout

socket连接请求超时时间(毫秒)。

socketTimeout

socket请求超时时间(毫秒)。

username

访问集群的用户名。

password

用户名对应的密码。

certFilePath

安全证书路径。

certPassword

安全证书密码。

该示例代码为判断集群是否存在test索引,当返回“true”(索引存在)或“false”(索引不存在)时,表示正常返回查询结果,集群连接成功。

获取并上传安全证书

当接入安全模式+HTTPS协议的Elasticsearch集群时,如需加载安全证书则可以参考如下步骤获取安全证书,并上传至客户端。

  1. 获取安全证书(CloudSearchService.cer)。
    1. 登录云搜索服务管理控制台
    2. 在左侧导航栏,选择“集群管理 > Elasticsearch”
    3. 在集群列表,单击目标集群名称,进入集群详情页。
    4. 选择“概览”页签,在“网络信息”下方,单击“HTTPS访问”“下载证书”获取安全证书。
      图1 下载安全证书
  2. 转换安全证书(CloudSearchService.cer)。将下载的安全证书上传到客户端机器上,使用keytool工具将“.cer”证书转换成Java可以读取的“.jks”证书格式。
    • 在Linux系统中,执行如下命令转换证书。
      keytool -import -alias newname -keystore ./truststore.jks -file ./CloudSearchService.cer 
    • 在Windows系统中,执行如下命令转换证书。
      keytool -import -alias newname -keystore .\truststore.jks -file .\CloudSearchService.cer

    其中,newname是由用户自定义的证书名称。

    该命令执行后,会提示设置证书密码,并确认密码。请保存该密码,后续接入集群会使用。

相关文档