通过Rest High Level Client接入Elasticsearch集群
Elasticsearch官方提供了SDK(Rest High level Client)方式连接集群,Rest Client客户端对Elasticsearch的API进行了封装,用户只需要构造对应的结构即可对ES集群进行访问。Rest Client的详细使用请参考官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/master/index.html
本文介绍通过Rest High level Client访问CSS集群的配置说明。Rest High level Client接入集群有3种方式:
- 通过Rest High Level Client连接非安全集群:适用于非安全模式的集群
- 通过Rest High Level Client连接安全集群(不使用安全证书):适用于安全模式+HTTP协议的集群、安全模式+HTTPS协议的集群(忽略证书)
- 通过Rest High Level Client连接安全集群(使用安全证书):适用于安全模式+HTTPS协议的集群
约束限制
建议Rest High Level Client的版本和Elasticsearch的版本保持一致,例如需要访问的ES集群版本是7.6.2,则使用的Rest High Level Client客户端版本建议也是7.6.2。如果您使用相比Elasticsearch集群更高版本的Java Rest High Level Client且存在少量请求的兼容性问题,您可以使用“RestHighLevelClient.getLowLevelClient()”方式直接获取Low Level Client,实现自定义的Elasticsearch请求内容。
前提条件
- CSS集群处于可用状态。
- 确保运行Java代码的服务器与CSS集群的网络是互通的。
- 根据集群选择的网络配置方式,获取集群的访问地址,具体操作请参见网络配置。
- 确认服务器已安装JDK1.8,JDK1.8官网下载地址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html。
- 服务器已引入Java依赖。
其中7.6.2为Elasticsearch Java客户端的版本号。
- Maven方式引入:
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.6.2</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.6.2</version> </dependency>
- Gradle方式引入:
compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.6.2'
- Maven方式引入:
通过Rest High Level Client连接非安全集群
通过Rest High Level Client连接非安全集群,并查询test索引是否存在。代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
import org.apache.http.HttpHost; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import java.io.IOException; import java.util.Arrays; import java.util.List; /** * Rest Hive Level 连接非安全集群 */ public class Main { public static void main(String[] args) throws IOException { List<String> host = Arrays.asList("x.x.x.x", "x.x.x.x"); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http")); final RestHighLevelClient client = new RestHighLevelClient(builder); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } /** * constructHttpHosts函数转换host集群节点ip列表。 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } } |
其中,host为集群访问地址,当存在多个IP地址时,中间用“,”隔开;test为查询的索引名称。
通过Rest High Level Client连接安全集群(不使用安全证书)
该场景适用于连接2种集群:安全模式+HTTP协议的集群、安全模式+HTTPS协议的集群(忽略证书)。
代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.Nullable; import java.io.IOException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; /** * Rest High Level连接安全集群(不使用证书) */ public class Main { /** * 创建客户端的类,定义create函数用于创建客户端。 */ public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException{ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); SSLContext sc = null; try { sc = SSLContext.getInstance("SSL"); sc.init(null, trustAllCerts, new SecureRandom()); } catch (KeyManagementException | NoSuchAlgorithmException e) { e.printStackTrace(); } SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier()); SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol)) .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout)) .setHttpClientConfigCallback(httpClientConfigCallback); final RestHighLevelClient client = new RestHighLevelClient(builder); logger.info("es rest client build success {} ", client); ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); logger.info("es rest client health response {} ", response); return client; } /** * constructHttpHosts函数转换host集群节点ip列表。 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } /** * trustAllCerts忽略证书配置。 */ public static TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return null; } } }; private static final Logger logger = LogManager.getLogger(Main.class); static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback { @Nullable private final CredentialsProvider credentialsProvider; /** * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption. */ private final SSLIOSessionStrategy sslStrategy; /** * Create a new {@link SecuredHttpClientConfigCallback}. * * @param credentialsProvider The credential provider, if a username/password have been supplied * @param sslStrategy The SSL strategy, if SSL / TLS have been supplied * @throws NullPointerException if {@code sslStrategy} is {@code null} */ SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy, @Nullable final CredentialsProvider credentialsProvider) { this.sslStrategy = Objects.requireNonNull(sslStrategy); this.credentialsProvider = credentialsProvider; } /** * Get the {@link CredentialsProvider} that will be added to the HTTP client. * * @return Can be {@code null}. */ @Nullable CredentialsProvider getCredentialsProvider() { return credentialsProvider; } /** * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client. * * @return Never {@code null}. */ SSLIOSessionStrategy getSSLStrategy() { return sslStrategy; } /** * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider}, * * @param httpClientBuilder The client to configure. * @return Always {@code httpClientBuilder}. */ @Override public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { // enable SSL / TLS httpClientBuilder.setSSLStrategy(sslStrategy); // enable user authentication if (credentialsProvider != null) { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder; } } public static class NullHostNameVerifier implements HostnameVerifier { @Override public boolean verify(String arg0, SSLSession arg1) { return true; } } /** * main函数参考如下,调用上面的create函数创建客户端,查询“test”索引是否存在。 */ public static void main(String[] args) throws IOException { RestHighLevelClient client = create(Arrays.asList("x.x.x.x", "x.x.x.x"), 9200, "https", 1000, 1000, 1000, "username", "password"); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } } |
参数 |
描述 |
---|---|
host |
Elasticsearch集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
Elasticsearch集群的连接端口,默认是“9200”。 |
protocol |
连接协议,“http”或者“https”,根据集群实际情况填写。 |
connectTimeout |
socket连接超时时间。 |
connectionRequestTimeout |
socket连接请求超时时间。 |
socketTimeout |
socket请求超时时间。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
通过Rest High Level Client连接安全集群(使用安全证书)
该场景适用于使用安全证书连接安全模式+HTTPS协议的集群。
- 获取安全证书(CloudSearchService.cer)。
- 登录云搜索服务控制台。
- 选择“集群管理”进入集群列表。
- 单击对应集群的名称,进入集群基本信息页面。
- 在“基本信息”页面,单击“HTTPS访问”后面的“下载证书”。
图1 下载证书
- 转换安全证书(CloudSearchService.cer)。将下载的安全证书上传到客户端机器上,使用keytool工具将“.cer”证书转换成Java可以读取的“.jks”证书格式。
- 在Linux系统中,执行如下命令转换证书。
keytool -import -alias newname -keystore ./truststore.jks -file ./CloudSearchService.cer
- 在Windows系统中,执行如下命令转换证书。
keytool -import -alias newname -keystore .\truststore.jks -file .\CloudSearchService.cer
其中,newname是由用户自定义的证书名称。
该命令执行后,会提示设置证书密码,并确认密码。请保存该密码,后续接入集群会使用。
- 在Linux系统中,执行如下命令转换证书。
- 接入集群。代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.Nullable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.security.KeyStore; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; /** * Rest Hive Level连接安全集群(使用https证书) */ public class Main { public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String cerFilePath, String cerPassword) throws IOException { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); SSLContext sc = null; try { TrustManager[] tm = {new MyX509TrustManager(cerFilePath, cerPassword)}; sc = SSLContext.getInstance("SSL", "SunJSSE"); //也可以使用SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); sc.init(null, tm, new SecureRandom()); } catch (Exception e) { e.printStackTrace(); } SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier()); SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol)) .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout)) .setHttpClientConfigCallback(httpClientConfigCallback); final RestHighLevelClient client = new RestHighLevelClient(builder); logger.info("es rest client build success {} ", client); ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); logger.info("es rest client health response {} ", response); return client; } /** * constructHttpHosts函数转换host集群节点ip列表。 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } /** * SecuredHttpClientConfigCallback类定义。 */ static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback { @Nullable private final CredentialsProvider credentialsProvider; private final SSLIOSessionStrategy sslStrategy; SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy, @Nullable final CredentialsProvider credentialsProvider) { this.sslStrategy = Objects.requireNonNull(sslStrategy); this.credentialsProvider = credentialsProvider; } @Nullable CredentialsProvider getCredentialsProvider() { return credentialsProvider; } SSLIOSessionStrategy getSSLStrategy() { return sslStrategy; } @Override public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { httpClientBuilder.setSSLStrategy(sslStrategy); if (credentialsProvider != null) { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder; } } private static final Logger logger = LogManager.getLogger(Main.class); public static class MyX509TrustManager implements X509TrustManager { X509TrustManager sunJSSEX509TrustManager; MyX509TrustManager(String cerFilePath, String cerPassword) throws Exception { File file = new File(cerFilePath); if (!file.isFile()) { throw new Exception("Wrong Certification Path"); } System.out.println("Loading KeyStore " + file + "..."); InputStream in = new FileInputStream(file); KeyStore ks = KeyStore.getInstance("JKS"); ks.load(in, cerPassword.toCharArray()); TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE"); tmf.init(ks); TrustManager[] tms = tmf.getTrustManagers(); for (TrustManager tm : tms) { if (tm instanceof X509TrustManager) { sunJSSEX509TrustManager = (X509TrustManager) tm; return; } } throw new Exception("Couldn't initialize"); } @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } } /** * main函数参考如下,调用上面的create函数创建客户端,查询“test”索引是否存在。 */ public static void main(String[] args) throws IOException { RestHighLevelClient client = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "https", 1000, 1000, 1000, "username", "password", "cerFilePath", "cerPassword"); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } }
表2 函数中的参数说明 参数
描述
host
Elasticsearch集群访问地址,当存在多个IP地址时,中间用“,”隔开。
port
Elasticsearch集群的连接端口,默认是“9200”。
protocol
连接协议,此处填写“https”。
connectTimeout
socket连接超时时间。
connectionRequestTimeout
socket连接请求超时时间。
socketTimeout
socket请求超时时间。
username
访问集群的用户名。
password
用户名对应的密码。
cerFilePath
证书路径。
cerPassword
证书密码。