Updated: 2026-01-09 GMT+08:00

Accessing an OpenSearch Cluster Through ElasticsearchClient

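OpenSearch clusters of Cloud Search Service (CSS) support data queries and management operations through the Elasticsearch Java API Client. The Java API Client is the official Java client provided with Elasticsearch 8.x. It wraps the native Elasticsearch APIs, so you only need to construct the corresponding request structures to access an OpenSearch cluster. For details about how to use the Elasticsearch Java API Client, see Java API Client.

Although CSS OpenSearch clusters are compatible with connections from the Elasticsearch Java API Client, some features of later client versions may not be supported. To ensure compatibility, confirm before use that the target API is implemented in the CSS cluster and has been verified.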

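Prerequisites

  • The OpenSearch cluster is available, its cluster version is 2.19.0, and its image version is 2.19.0_25.6.1_xxx or later.
  • The server that runs the Java code can communicate with the OpenSearch cluster over the network.
  • You have obtained the cluster access address based on the cluster's network configuration. For details, see Network Configuration.
  • Java is installed on the server, with JDK 11 or later. JDK 11 can be downloaded from the official site: Java Archive Downloads.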
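
The network prerequisite can be checked before any client code is written. The following is a minimal sketch that uses only the JDK; the class name ClusterReachability and the 3-second timeout are illustrative assumptions, not part of CSS or the Java API Client. It only confirms that a TCP connection can be opened; it does not verify authentication or TLS.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not a CSS or Elasticsearch API): probes TCP reachability of a cluster node. */
final class ClusterReachability {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable: treat as unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed usage: java ClusterReachability.java <cluster-ip> [port]; the port defaults to 9200
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9200;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

If the probe reports that the address is unreachable, check the cluster's network configuration before debugging the client code.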

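Adding Dependencies

Add the Java API Client dependency on the server that runs the Java code in either of the following ways:

  • Maven:

    Replace 8.19.0 with the actual version of your Java client.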

    <dependencies>
    	<dependency>
    		<groupId>co.elastic.clients</groupId>
    		<artifactId>elasticsearch-java</artifactId>
    		<version>8.19.0</version>
    	</dependency>
    	<dependency>
    		<groupId>com.fasterxml.jackson.core</groupId>
    		<artifactId>jackson-databind</artifactId>
    		<version>2.17.0</version>
    	</dependency>
    </dependencies>
  • Gradle:

    Replace 8.19.0 with the actual version of your Java client.

    dependencies {
        implementation 'co.elastic.clients:elasticsearch-java:8.19.0'
        implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.0'
    }

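Connecting to a Cluster

The sample code for accessing an OpenSearch cluster with the Java API Client differs depending on the cluster's security type. Select the reference that matches your scenario.

Table 1 Cluster access scenarios

OpenSearch cluster security type | Load a security certificate? | Reference
--- | --- | ---
Non-security mode | - | Connecting to a Non-Security Cluster Through ElasticsearchClient
Security mode + HTTP, or security mode + HTTPS | No | Connecting to a Security Cluster Through ElasticsearchClient (Without a Certificate)
Security mode + HTTPS | Yes | Connecting to a Security Cluster Through ElasticsearchClient (With a Certificate)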

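Connecting to a Non-Security Cluster Through ElasticsearchClient

The following sample uses the Elasticsearch Java API Client to connect to a non-security OpenSearch cluster, and then creates, queries, and deletes the index "test" to verify the connection: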

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.elasticsearch.client.RestClient;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

public class Main {

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    private static ElasticsearchClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password, String certFilePath,
        String certPassword) throws IOException {
        // Create the low-level client
        RestClient restClient = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // enable user authentication
                if (username != null && password != null) {
                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(username, password));
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                // set keepalive: the strategy returns a duration in milliseconds (here 10 minutes)
                httpClientBuilder.setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.MINUTES.toMillis(10));
                // enable SSL / TLS
                SSLContext sc = (certFilePath != null && certPassword != null) ?
                    createContextFromCaCert(certFilePath, certPassword) : createTrustAllCertsContext();

                SSLIOSessionStrategy sslStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
                httpClientBuilder.setSSLStrategy(sslStrategy);

                httpClientBuilder.setMaxConnTotal(500);
                httpClientBuilder.setMaxConnPerRoute(300);
                return httpClientBuilder;
            })
            .build();

        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());

        // And create the API client
        return new ElasticsearchClient(transport);
    }

    private static SSLContext createTrustAllCertsContext() {
        try {
            SSLContext sslContext = SSLContext.getInstance("SSL");
            sslContext.init(null, trustAllCerts, new SecureRandom());
            return sslContext;
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            throw new RuntimeException("Can not create the SSLContext", e);
        }
    }

    private static SSLContext createContextFromCaCert(String certFilePath, String certPassword) {
        try {
            // enable SSL / TLS
            TrustManager[] tm = {new MyX509TrustManager(certFilePath, certPassword)};
            SSLContext sc = SSLContext.getInstance("SSL", "SunJSSE");
            // Alternatively: SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(null, tm, new SecureRandom());
            return sc;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * trustAllCerts skips certificate verification and trusts every certificate.
     */
    private static TrustManager[] trustAllCerts = new TrustManager[] {
        new X509TrustManager() {
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return null;
            }
        }
    };

    /**
     * Trust manager backed by the certificates in the loaded truststore.
     */
    private static class MyX509TrustManager implements X509TrustManager {
        X509TrustManager sunJSSEX509TrustManager;

        MyX509TrustManager(String certFilePath, String certPassword) throws Exception {
            File file = new File(certFilePath);
            if (!file.isFile()) {
                throw new Exception("Wrong Certification Path");
            }
            System.out.println("Loading KeyStore " + file + "...");
            KeyStore ks = KeyStore.getInstance("JKS");
            // Close the truststore file as soon as it has been read
            try (InputStream in = new FileInputStream(file)) {
                ks.load(in, certPassword.toCharArray());
            }
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
            tmf.init(ks);
            TrustManager[] tms = tmf.getTrustManagers();
            for (TrustManager tm : tms) {
                if (tm instanceof X509TrustManager) {
                    sunJSSEX509TrustManager = (X509TrustManager) tm;
                    return;
                }
            }
            throw new Exception("Couldn't initialize");
        }

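        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            sunJSSEX509TrustManager.checkClientTrusted(chain, authType);
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            // Validate the server certificate chain against the loaded truststore
            sunJSSEX509TrustManager.checkServerTrusted(chain, authType);
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return sunJSSEX509TrustManager.getAcceptedIssuers();
        }
    }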

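    /**
     * Sample main function: calls create() above to build the client, then creates, queries, and deletes the index "test".
     */
    public static void main(String[] args) throws IOException {
        // A non-security cluster is accessed over HTTP without a username, password, or certificate
        ElasticsearchClient esClient = create(Arrays.asList("{cluster IP address}"), 9200, "http", 1000, 1000, 1000, null, null, null, null);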

        CreateIndexResponse indexRequest = esClient.indices()
            .create(createIndexBuilder -> createIndexBuilder.index("test"));
        boolean acknowledged = indexRequest.acknowledged();
        System.out.println("Create index successfully! " + acknowledged);

        GetIndexResponse getIndexResponse = esClient.indices().get(getIndexRequest -> getIndexRequest.index("test"));
        System.out.println("Query index successfully! \n" + getIndexResponse.toString());

        DeleteIndexResponse deleteResponse = esClient.indices()
            .delete(createIndexBuilder -> createIndexBuilder.index("test"));
        System.out.println("Delete index successfully! \n" + deleteResponse.toString());
    }
}

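The sample creates the index "test", queries it, and then deletes it. If the three success messages ("Create index successfully!", "Query index successfully!", and "Delete index successfully!") are printed, the requests returned normally and the cluster is connected.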

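Connecting to a Security Cluster Through ElasticsearchClient (Without a Certificate)

The following sample uses the Elasticsearch Java API Client to connect to a security-mode OpenSearch cluster (over HTTP or HTTPS) without loading a security certificate, and then creates, queries, and deletes the index "test" to verify the connection: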

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.elasticsearch.client.RestClient;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

public class Main {

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    private static ElasticsearchClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password, String certFilePath,
        String certPassword) throws IOException {
        // Create the low-level client
        RestClient restClient = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // enable user authentication
                if (username != null && password != null) {
                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(username, password));
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                // set keepalive: the strategy returns a duration in milliseconds (here 10 minutes)
                httpClientBuilder.setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.MINUTES.toMillis(10));
                // enable SSL / TLS
                SSLContext sc = (certFilePath != null && certPassword != null) ?
                    createContextFromCaCert(certFilePath, certPassword) : createTrustAllCertsContext();

                SSLIOSessionStrategy sslStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
                httpClientBuilder.setSSLStrategy(sslStrategy);

                httpClientBuilder.setMaxConnTotal(500);
                httpClientBuilder.setMaxConnPerRoute(300);
                return httpClientBuilder;
            })
            .build();

        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());

        // And create the API client
        return new ElasticsearchClient(transport);
    }

    private static SSLContext createTrustAllCertsContext() {
        try {
            SSLContext sslContext = SSLContext.getInstance("SSL");
            sslContext.init(null, trustAllCerts, new SecureRandom());
            return sslContext;
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            throw new RuntimeException("Can not create the SSLContext", e);
        }
    }

    private static SSLContext createContextFromCaCert(String certFilePath, String certPassword) {
        try {
            // enable SSL / TLS
            TrustManager[] tm = {new MyX509TrustManager(certFilePath, certPassword)};
            SSLContext sc = SSLContext.getInstance("SSL", "SunJSSE");
            // Alternatively: SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(null, tm, new SecureRandom());
            return sc;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * trustAllCerts skips certificate verification and trusts every certificate.
     */
    private static TrustManager[] trustAllCerts = new TrustManager[] {
        new X509TrustManager() {
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return null;
            }
        }
    };

    /**
     * Trust manager backed by the certificates in the loaded truststore.
     */
    private static class MyX509TrustManager implements X509TrustManager {
        X509TrustManager sunJSSEX509TrustManager;

        MyX509TrustManager(String certFilePath, String certPassword) throws Exception {
            File file = new File(certFilePath);
            if (!file.isFile()) {
                throw new Exception("Wrong Certification Path");
            }
            System.out.println("Loading KeyStore " + file + "...");
            KeyStore ks = KeyStore.getInstance("JKS");
            // Close the truststore file as soon as it has been read
            try (InputStream in = new FileInputStream(file)) {
                ks.load(in, certPassword.toCharArray());
            }
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
            tmf.init(ks);
            TrustManager[] tms = tmf.getTrustManagers();
            for (TrustManager tm : tms) {
                if (tm instanceof X509TrustManager) {
                    sunJSSEX509TrustManager = (X509TrustManager) tm;
                    return;
                }
            }
            throw new Exception("Couldn't initialize");
        }

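        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            sunJSSEX509TrustManager.checkClientTrusted(chain, authType);
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            // Validate the server certificate chain against the loaded truststore
            sunJSSEX509TrustManager.checkServerTrusted(chain, authType);
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return sunJSSEX509TrustManager.getAcceptedIssuers();
        }
    }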

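    /**
     * Sample main function: calls create() above to build the client, then creates, queries, and deletes the index "test".
     */
    public static void main(String[] args) throws IOException {
        // Set the protocol to "http" or "https" to match the cluster; no certificate is loaded
        ElasticsearchClient esClient = create(Arrays.asList("{cluster IP address}"), 9200, "https", 1000, 1000, 1000, "{username}", "{password}", null, null);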

        CreateIndexResponse indexRequest = esClient.indices()
            .create(createIndexBuilder -> createIndexBuilder.index("test"));
        boolean acknowledged = indexRequest.acknowledged();
        System.out.println("Create index successfully! " + acknowledged);

        GetIndexResponse getIndexResponse = esClient.indices().get(getIndexRequest -> getIndexRequest.index("test"));
        System.out.println("Query index successfully! \n" + getIndexResponse.toString());

        DeleteIndexResponse deleteResponse = esClient.indices()
            .delete(createIndexBuilder -> createIndexBuilder.index("test"));
        System.out.println("Delete index successfully! \n" + deleteResponse.toString());
    }
}
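
Table 2 Parameters of the create function

Parameter | Description
--- | ---
host | Cluster access addresses. Multiple IP addresses are separated by commas (,) and passed as separate list elements.
port | Connection port of the cluster. The default value is 9200.
protocol | Connection protocol, "http" or "https", depending on the cluster.
connectTimeout | Timeout for establishing a socket connection, in milliseconds.
connectionRequestTimeout | Timeout for requesting a connection, in milliseconds.
socketTimeout | Timeout for a socket request, in milliseconds.
username | Username for accessing the cluster.
password | Password of the user.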
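
The create function takes host as a List<String>, while access addresses are often stored as one comma-separated string (for example, in a configuration file). The following is a minimal sketch of the conversion using only the JDK; the class name HostListParser and the sample IP addresses are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: turns "ip1, ip2" into the List<String> expected by create(). */
final class HostListParser {

    /** Splits on commas, trims whitespace, and drops empty entries. */
    static List<String> parseHosts(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [192.168.0.10, 192.168.0.11, 192.168.0.12]
        System.out.println(parseHosts("192.168.0.10, 192.168.0.11,192.168.0.12"));
    }
}
```

The returned list can be passed as the first argument of create in place of Arrays.asList(...).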

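The sample creates the index "test", queries it, and then deletes it. If the three success messages ("Create index successfully!", "Query index successfully!", and "Delete index successfully!") are printed, the requests returned normally and the cluster is connected.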

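Connecting to a Security Cluster Through ElasticsearchClient (With a Certificate)

Before connecting to a security-mode + HTTPS OpenSearch cluster with a certificate, prepare the security certificate. For details, see Obtaining and Uploading a Security Certificate.

The following sample uses the Elasticsearch Java API Client to connect to a security-mode + HTTPS OpenSearch cluster with the security certificate loaded, and then creates, queries, and deletes the index "test" to verify the connection: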

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.elasticsearch.client.RestClient;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

public class Main {

    /**
     * Converts the list of cluster node IP addresses into an HttpHost array.
     */
    public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) {
        return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new);
    }

    private static ElasticsearchClient create(List<String> host, int port, String protocol, int connectTimeout,
        int connectionRequestTimeout, int socketTimeout, String username, String password, String certFilePath,
        String certPassword) throws IOException {
        // Create the low-level client
        RestClient restClient = RestClient.builder(constructHttpHosts(host, port, protocol))
            .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setSocketTimeout(socketTimeout))
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // enable user authentication
                if (username != null && password != null) {
                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(username, password));
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                // set keepalive: the strategy returns a duration in milliseconds (here 10 minutes)
                httpClientBuilder.setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.MINUTES.toMillis(10));
                // enable SSL / TLS
                SSLContext sc = (certFilePath != null && certPassword != null) ?
                    createContextFromCaCert(certFilePath, certPassword) : createTrustAllCertsContext();

                SSLIOSessionStrategy sslStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier());
                httpClientBuilder.setSSLStrategy(sslStrategy);

                httpClientBuilder.setMaxConnTotal(500);
                httpClientBuilder.setMaxConnPerRoute(300);
                return httpClientBuilder;
            })
            .build();

        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());

        // And create the API client
        return new ElasticsearchClient(transport);
    }

    private static SSLContext createTrustAllCertsContext() {
        try {
            SSLContext sslContext = SSLContext.getInstance("SSL");
            sslContext.init(null, trustAllCerts, new SecureRandom());
            return sslContext;
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            throw new RuntimeException("Can not create the SSLContext", e);
        }
    }

    private static SSLContext createContextFromCaCert(String certFilePath, String certPassword) {
        try {
            // enable SSL / TLS
            TrustManager[] tm = {new MyX509TrustManager(certFilePath, certPassword)};
            SSLContext sc = SSLContext.getInstance("SSL", "SunJSSE");
            // Alternatively: SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(null, tm, new SecureRandom());
            return sc;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * trustAllCerts skips certificate verification and trusts every certificate.
     */
    private static TrustManager[] trustAllCerts = new TrustManager[] {
        new X509TrustManager() {
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return null;
            }
        }
    };

    /**
     * Trust manager backed by the certificates in the loaded truststore.
     */
    private static class MyX509TrustManager implements X509TrustManager {
        X509TrustManager sunJSSEX509TrustManager;

        MyX509TrustManager(String certFilePath, String certPassword) throws Exception {
            File file = new File(certFilePath);
            if (!file.isFile()) {
                throw new Exception("Wrong Certification Path");
            }
            System.out.println("Loading KeyStore " + file + "...");
            KeyStore ks = KeyStore.getInstance("JKS");
            // Close the truststore file as soon as it has been read
            try (InputStream in = new FileInputStream(file)) {
                ks.load(in, certPassword.toCharArray());
            }
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
            tmf.init(ks);
            TrustManager[] tms = tmf.getTrustManagers();
            for (TrustManager tm : tms) {
                if (tm instanceof X509TrustManager) {
                    sunJSSEX509TrustManager = (X509TrustManager) tm;
                    return;
                }
            }
            throw new Exception("Couldn't initialize");
        }

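        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            sunJSSEX509TrustManager.checkClientTrusted(chain, authType);
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            // Validate the server certificate chain against the loaded truststore
            sunJSSEX509TrustManager.checkServerTrusted(chain, authType);
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return sunJSSEX509TrustManager.getAcceptedIssuers();
        }
    }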

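    /**
     * Sample main function: calls create() above to build the client, then creates, queries, and deletes the index "test".
     */
    public static void main(String[] args) throws IOException {
        // The certificate path points to the .jks truststore; the password is the one set during conversion with keytool
        ElasticsearchClient esClient = create(Arrays.asList("{cluster IP address}"), 9200, "https", 1000, 1000, 1000, "{username}", "{password}", "{certificate path}", "{certificate password}");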

        CreateIndexResponse indexRequest = esClient.indices()
            .create(createIndexBuilder -> createIndexBuilder.index("test"));
        boolean acknowledged = indexRequest.acknowledged();
        System.out.println("Create index successfully! " + acknowledged);

        GetIndexResponse getIndexResponse = esClient.indices().get(getIndexRequest -> getIndexRequest.index("test"));
        System.out.println("Query index successfully! \n" + getIndexResponse.toString());

        DeleteIndexResponse deleteResponse = esClient.indices()
            .delete(createIndexBuilder -> createIndexBuilder.index("test"));
        System.out.println("Delete index successfully! \n" + deleteResponse.toString());
    }
}
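
Table 3 Parameters of the create function

Parameter | Description
--- | ---
host | Cluster access addresses. Multiple IP addresses are separated by commas (,) and passed as separate list elements.
port | Connection port of the cluster. The default value is 9200.
protocol | Connection protocol. Set it to "https" here.
connectTimeout | Timeout for establishing a socket connection, in milliseconds.
connectionRequestTimeout | Timeout for requesting a connection, in milliseconds.
socketTimeout | Timeout for a socket request, in milliseconds.
username | Username for accessing the cluster.
password | Password of the user.
certFilePath | Path of the security certificate (the converted .jks file).
certPassword | Password of the security certificate (set when the certificate is converted).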

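The sample creates the index "test", queries it, and then deletes it. If the three success messages ("Create index successfully!", "Query index successfully!", and "Delete index successfully!") are printed, the requests returned normally and the cluster is connected.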

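Obtaining and Uploading a Security Certificate

To load a security certificate when connecting to a security-mode + HTTPS OpenSearch cluster, obtain the certificate and upload it to the client as follows.

  1. Obtain the security certificate (CloudSearchService.cer).
    1. Log in to the CSS management console.
    2. In the navigation pane on the left, choose "Clusters > OpenSearch".
    3. In the cluster list, click the name of the target cluster to go to the cluster details page.
    4. On the "Overview" tab, under "Network Information", click "Download Certificate" next to "HTTPS Access" to obtain the security certificate.
      Figure 1 Downloading the security certificate
  2. Convert the security certificate (CloudSearchService.cer). Upload the downloaded certificate to the client, and use keytool to convert the ".cer" certificate into the ".jks" format that Java can read.
    • On Linux, run the following command to convert the certificate:
      keytool -import -alias newname -keystore ./truststore.jks -file ./CloudSearchService.cer
    • On Windows, run the following command to convert the certificate:
      keytool -import -alias newname -keystore .\truststore.jks -file .\CloudSearchService.cer

    In the command, newname is a user-defined name (alias) for the certificate.

    After you run the command, you are prompted to set and confirm a certificate password. Keep this password; it is required later when you connect to the cluster.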
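
To confirm that the converted truststore and its password are usable before wiring them into the client, the file can be loaded with JDK APIs alone. The following is a minimal sketch, not the method used by the samples above: the class name TrustStoreSslContext is hypothetical, and it relies on the JDK's default trust manager rather than a custom one.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper: builds an SSLContext that trusts the certificates in a JKS truststore. */
final class TrustStoreSslContext {

    /** Loads the .jks file produced by keytool; fails if the path or password is wrong. */
    static KeyStore loadTrustStore(Path jksPath, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(jksPath)) {
            trustStore.load(in, password);
        }
        return trustStore;
    }

    /** Creates a TLS 1.2 context whose trust managers validate against the given truststore. */
    static SSLContext buildSslContext(KeyStore trustStore) throws GeneralSecurityException {
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(null, tmf.getTrustManagers(), null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        // Assumed usage: java TrustStoreSslContext.java <path to truststore.jks> <truststore password>
        if (args.length < 2) {
            System.out.println("usage: TrustStoreSslContext <truststore.jks> <password>");
            return;
        }
        KeyStore trustStore = loadTrustStore(Paths.get(args[0]), args[1].toCharArray());
        System.out.println("Certificates in truststore: " + trustStore.size());
        System.out.println("SSLContext protocol: " + buildSslContext(trustStore).getProtocol());
    }
}
```

A wrong password or a corrupted file makes loadTrustStore throw, so a problem with the conversion step shows up before any connection to the cluster is attempted.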

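FAQ: How Do I Increase the Number of Client Connections in High-Concurrency Scenarios?

When you access a cluster through the Elasticsearch Java API Client under high request concurrency, the client may run out of connections, causing request delays or failures.

When initializing the Java API Client, you can raise the client's maximum number of connections by configuring the HttpClientBuilder through a custom HttpClientConfigCallback, which improves concurrent processing capacity.

The settings are as follows:

// Set the maximum total number of connections to 500; evaluate this value against your actual concurrency
httpClientBuilder.setMaxConnTotal(500);
// Set the maximum number of connections per route to 300; about 60% of the total is the usual recommendation
httpClientBuilder.setMaxConnPerRoute(300);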
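
The 60% guideline can be expressed as a small calculation. The following is a minimal sketch that uses only the JDK; the class name ConnectionPoolSizing and the round-up rule are illustrative assumptions, and the result is meant to be passed to setMaxConnPerRoute.

```java
/** Hypothetical helper: derives the per-route limit from the total using the 60% guideline. */
final class ConnectionPoolSizing {

    /** Returns ceil(0.6 * maxConnTotal), computed with integer arithmetic. */
    static int maxConnPerRoute(int maxConnTotal) {
        if (maxConnTotal < 1) {
            throw new IllegalArgumentException("maxConnTotal must be at least 1");
        }
        return (int) ((maxConnTotal * 3L + 4) / 5);
    }

    public static void main(String[] args) {
        int total = 500;
        // Prints: maxConnTotal=500, maxConnPerRoute=300
        System.out.println("maxConnTotal=" + total + ", maxConnPerRoute=" + maxConnPerRoute(total));
    }
}
```

Treat the result as a starting point and adjust it based on the concurrency you observe.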
