通过Rest High Level Client接入Elasticsearch集群
Elasticsearch官方提供了SDK(Rest High level Client)方式连接集群,Rest Client客户端对Elasticsearch的API进行了封装,用户只需要构造对应的结构即可对ES集群进行访问。Rest Client的详细使用请参考官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/master/index.html
本文介绍通过Rest High level Client访问CSS集群的配置说明。Rest High level Client接入集群有3种方式:
- 通过Rest High Level Client连接非安全集群:适用于非安全模式的集群
- 通过Rest High Level Client连接安全集群(不使用安全证书):适用于安全模式+HTTP协议的集群、安全模式+HTTPS协议的集群(忽略证书)
- 通过Rest High Level Client连接安全集群(使用安全证书):适用于安全模式+HTTPS协议的集群
建议Rest High Level Client的版本和Elasticsearch的版本保持一致,例如需要访问的ES集群版本是7.6.2,则使用的Rest High Level Client客户端版本建议也是7.6.2。如果您使用相比Elasticsearch集群更高版本的Java Rest High Level Client且存在少量请求的兼容性问题,您可以使用“RestHighLevelClient.getLowLevelClient()”方式直接获取Low Level Client,实现自定义的Elasticsearch请求内容。
- CSS集群处于可用状态。
- 确保运行Java代码的服务器与CSS集群的网络是互通的。
- 根据集群选择的网络配置方式,获取集群的访问地址,具体操作请参见网络配置。
- 确认服务器已安装JDK1.8,JDK1.8官网下载地址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html。
- 服务器已引入Java依赖。
其中7.6.2为Elasticsearch Java客户端的版本号。
- Maven方式引入:
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.6.2</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.6.2</version> </dependency>
- Gradle方式引入:
compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.6.2'
- Maven方式引入:
通过Rest High Level Client连接非安全集群
通过Rest High Level Client连接非安全集群,并查询test索引是否存在。代码示例如下:
import org.apache.http.HttpHost; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import java.io.IOException; import java.util.Arrays; import java.util.List; /** * Rest Hive Level 连接非安全集群 */ public class Main { public static void main(String[] args) throws IOException { List<String> host = Arrays.asList("x.x.x.x", "x.x.x.x"); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, 9200, "http")); final RestHighLevelClient client = new RestHighLevelClient(builder); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } /** * constructHttpHosts函数转换host集群节点ip列表。 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } } |
通过Rest High Level Client连接安全集群(不使用安全证书)
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.Nullable; import java.io.IOException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; /** * Rest High Level连接安全集群(不使用证书) */ public class Main { /** * 创建客户端的类,定义create函数用于创建客户端。 */ public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password) throws IOException{ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); SSLContext sc = null; try { sc = SSLContext.getInstance("SSL"); sc.init(null, trustAllCerts, new SecureRandom()); } catch (KeyManagementException | NoSuchAlgorithmException e) { e.printStackTrace(); } SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier()); SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol)) .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout)) .setHttpClientConfigCallback(httpClientConfigCallback); final RestHighLevelClient client = new RestHighLevelClient(builder); logger.info("es rest client build success {} ", client); ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); logger.info("es rest client health response {} ", response); return client; } /** * constructHttpHosts函数转换host集群节点ip列表。 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } /** * trustAllCerts忽略证书配置。 */ public static TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return null; } } }; private static final Logger logger = LogManager.getLogger(Main.class); static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback { @Nullable private final CredentialsProvider credentialsProvider; /** * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption. */ private final SSLIOSessionStrategy sslStrategy; /** * Create a new {@link SecuredHttpClientConfigCallback}. * * @param credentialsProvider The credential provider, if a username/password have been supplied * @param sslStrategy The SSL strategy, if SSL / TLS have been supplied * @throws NullPointerException if {@code sslStrategy} is {@code null} */ SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy, @Nullable final CredentialsProvider credentialsProvider) { this.sslStrategy = Objects.requireNonNull(sslStrategy); this.credentialsProvider = credentialsProvider; } /** * Get the {@link CredentialsProvider} that will be added to the HTTP client. * * @return Can be {@code null}. */ @Nullable CredentialsProvider getCredentialsProvider() { return credentialsProvider; } /** * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client. * * @return Never {@code null}. */ SSLIOSessionStrategy getSSLStrategy() { return sslStrategy; } /** * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider}, * * @param httpClientBuilder The client to configure. * @return Always {@code httpClientBuilder}. */ @Override public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { // enable SSL / TLS httpClientBuilder.setSSLStrategy(sslStrategy); // enable user authentication if (credentialsProvider != null) { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder; } } public static class NullHostNameVerifier implements HostnameVerifier { @Override public boolean verify(String arg0, SSLSession arg1) { return true; } } /** * main函数参考如下,调用上面的create函数创建客户端,查询“test”索引是否存在。 */ public static void main(String[] args) throws IOException { RestHighLevelClient client = create(Arrays.asList("x.x.x.x", "x.x.x.x"), 9200, "https", 1000, 1000, 1000, "username", "password"); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } } |
参数 |
描述 |
host |
Elasticsearch集群访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
Elasticsearch集群的连接端口,默认是“9200”。 |
protocol |
连接协议,“http”或者“https”,根据集群实际情况填写。 |
connectTimeout |
socket连接超时时间。 |
connectionRequestTimeout |
socket连接请求超时时间。 |
socketTimeout |
socket请求超时时间。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
通过Rest High Level Client连接安全集群(使用安全证书)
- 获取安全证书(CloudSearchService.cer)。
- 登录云搜索服务控制台。
- 选择“集群管理”进入集群列表。
- 单击对应集群的名称,进入集群基本信息页面。
- 在“基本信息”页面,单击“HTTPS访问”后面的“下载证书”。
图1 下载证书
- 转换安全证书(CloudSearchService.cer)。将下载的安全证书上传到客户端机器上,使用keytool工具将“.cer”证书转换成Java可以读取的“.jks”证书格式。
- 在Linux系统中,执行如下命令转换证书。
keytool -import -alias newname -keystore ./truststore.jks -file ./CloudSearchService.cer
- 在Windows系统中,执行如下命令转换证书。
keytool -import -alias newname -keystore .\truststore.jks -file .\CloudSearchService.cer
- 在Linux系统中,执行如下命令转换证书。
- 接入集群。代码示例如下:
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.Nullable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.security.KeyStore; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; /** * Rest Hive Level连接安全集群(使用https证书) */ public class Main { public static RestHighLevelClient create(List<String> host, int port, String protocol, int connectTimeout, int connectionRequestTimeout, int socketTimeout, String username, String password, String cerFilePath, String cerPassword) throws IOException { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); SSLContext sc = null; try { TrustManager[] tm = {new MyX509TrustManager(cerFilePath, cerPassword)}; sc = SSLContext.getInstance("SSL", "SunJSSE"); //也可以使用SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); sc.init(null, tm, new SecureRandom()); } catch (Exception e) { e.printStackTrace(); } SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NoopHostnameVerifier()); SecuredHttpClientConfigCallback httpClientConfigCallback = new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider); RestClientBuilder builder = RestClient.builder(constructHttpHosts(host, port, protocol)) .setRequestConfigCallback(requestConfig -> requestConfig.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout)) .setHttpClientConfigCallback(httpClientConfigCallback); final RestHighLevelClient client = new RestHighLevelClient(builder); logger.info("es rest client build success {} ", client); ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); logger.info("es rest client health response {} ", response); return client; } /** * constructHttpHosts函数转换host集群节点ip列表。 */ public static HttpHost[] constructHttpHosts(List<String> host, int port, String protocol) { return host.stream().map(p -> new HttpHost(p, port, protocol)).toArray(HttpHost[]::new); } /** * SecuredHttpClientConfigCallback类定义。 */ static class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback { @Nullable private final CredentialsProvider credentialsProvider; private final SSLIOSessionStrategy sslStrategy; SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy, @Nullable final CredentialsProvider credentialsProvider) { this.sslStrategy = Objects.requireNonNull(sslStrategy); this.credentialsProvider = credentialsProvider; } @Nullable CredentialsProvider getCredentialsProvider() { return credentialsProvider; } SSLIOSessionStrategy getSSLStrategy() { return sslStrategy; } @Override public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { httpClientBuilder.setSSLStrategy(sslStrategy); if (credentialsProvider != null) { httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder; } } private static final Logger logger = LogManager.getLogger(Main.class); public static class MyX509TrustManager implements X509TrustManager { X509TrustManager sunJSSEX509TrustManager; MyX509TrustManager(String cerFilePath, String cerPassword) throws Exception { File file = new File(cerFilePath); if (!file.isFile()) { throw new Exception("Wrong Certification Path"); } System.out.println("Loading KeyStore " + file + "..."); InputStream in = new FileInputStream(file); KeyStore ks = KeyStore.getInstance("JKS"); ks.load(in, cerPassword.toCharArray()); TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE"); tmf.init(ks); TrustManager[] tms = tmf.getTrustManagers(); for (TrustManager tm : tms) { if (tm instanceof X509TrustManager) { sunJSSEX509TrustManager = (X509TrustManager) tm; return; } } throw new Exception("Couldn't initialize"); } @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } } /** * main函数参考如下,调用上面的create函数创建客户端,查询“test”索引是否存在。 */ public static void main(String[] args) throws IOException { RestHighLevelClient client = create(Arrays.asList("xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"), 9200, "https", 1000, 1000, 1000, "username", "password", "cerFilePath", "cerPassword"); GetIndexRequest indexRequest = new GetIndexRequest("test"); boolean exists = client.indices().exists(indexRequest, RequestOptions.DEFAULT); System.out.println(exists); client.close(); } }
表2 函数中的参数说明 参数