通过Python客户端接入OpenSearch集群
通过Python客户端连接CSS服务的OpenSearch集群,进行数据查询和管理。
OpenSearch Python Client或Elasticsearch Python Client是官方提供的Python库,用于与OpenSearch集群进行交互。
前提条件
- OpenSearch集群处于可用状态。
- 运行Python代码的服务器与OpenSearch集群之间网络互通。
- 根据集群的网络配置方式,获取集群的访问地址,操作指导请参见网络配置。
- 确认服务器已安装Python3。
安装Python客户端库
在运行Python代码的服务器中安装Python客户端库。
CSS服务支持使用Elasticsearch 7.10 Python客户端连接OpenSearch集群,但是为确保更好的兼容性,建议使用与OpenSearch集群同版本的Python客户端库。
可以根据实际使用的Python客户端类型选择参考示例。
- 场景一(推荐):使用OpenSearch Python客户端访问OpenSearch集群,安装OpenSearch Python客户端库。
pip install opensearch-py
- 场景二:使用Elasticsearch 7.10 Python客户端访问OpenSearch集群,安装Elasticsearch 7.10 Python客户端库。
pip install Elasticsearch==7.10
接入集群
当使用不同类型的Python客户端访问不同安全类型的OpenSearch集群时,示例代码会有差异,请根据实际业务场景选择参考文档。
Python客户端类型 |
OpenSearch集群安全类型 |
参考文档 |
---|---|---|
OpenSearch |
非安全模式 |
|
安全模式+HTTP协议 |
||
安全模式+HTTPS协议 |
||
Elasticsearch 7.10.2 |
非安全模式 |
|
安全模式+HTTP协议 |
||
安全模式+HTTPS协议 |
OpenSearch Python连接非安全集群
使用OpenSearch Python客户端连接非安全模式的OpenSearch集群,并查询索引“test”是否存在。
使用集群连接信息创建客户端实例的代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
from opensearchpy import OpenSearch class OpenSearchFactory(object): def __init__(self, host: list, port: str, username: str, password: str): self.port = port self.host = host self.username = username self.password = password def create(self) -> OpenSearch: addrs = [] for host in self.host: addr = {'host': host, 'port': self.port} addrs.append(addr) if self.username and self.password: opensearch = OpenSearch(addrs, http_auth=(self.username, self.password)) else: opensearch = OpenSearch(addrs) return opensearch os = OpenSearchFactory(["{集群访问地址}"], "9200", None, None).create() print(os.indices.exists(index='test')) |
该程序为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
OpenSearch Python连接HTTP协议的安全集群
使用OpenSearch Python客户端连接安全模式+HTTP协议的OpenSearch集群,并查询索引“test”是否存在。
使用集群连接信息创建客户端实例的代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
from opensearchpy import OpenSearch class OpenSearchFactory(object): def __init__(self, host: list, port: str, username: str, password: str): self.port = port self.host = host self.username = username self.password = password def create(self) -> OpenSearch: addrs = [] for host in self.host: addr = {'host': host, 'port': self.port} addrs.append(addr) if self.username and self.password: opensearch = OpenSearch(addrs, http_auth=(self.username, self.password)) else: opensearch = OpenSearch(addrs) return opensearch os = OpenSearchFactory(["xxx.xxx.xxx.xxx"], "9200", "username", "password").create() print(os.indices.exists(index='test')) |
参数 |
描述 |
---|---|
host |
集群的访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
集群的连接端口,填写“9200”。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
该程序为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
OpenSearch Python连接HTTPS协议的安全集群
使用OpenSearch Python客户端连接安全模式+HTTPS协议的OpenSearch集群,并查询索引“test”是否存在。
使用集群连接信息创建客户端实例的代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
from opensearchpy import OpenSearch import ssl class OpenSearchFactory(object): def __init__(self, host: list, port: str, username: str, password: str): self.port = port self.host = host self.username = username self.password = password def create(self) -> OpenSearch: context = ssl._create_unverified_context() addrs = [] for host in self.host: addr = {'host': host, 'port': self.port} addrs.append(addr) if self.username and self.password: opensearch = OpenSearch(addrs, http_auth=(self.username, self.password), scheme="https", ssl_context=context) else: opensearch = OpenSearch(addrs) return opensearch os = OpenSearchFactory(["xxx.xxx.xxx.xxx"], "9200", "username", "password").create() print(os.indices.exists(index='test')) |
参数 |
描述 |
---|---|
host |
集群的访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
集群的连接端口,填写“9200”。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
该程序为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch Python连接非安全集群
使用Elasticsearch 7.10.2 Python客户端连接非安全模式的OpenSearch集群,并查询索引“test”是否存在。
使用集群连接信息创建客户端实例的代码示例如下:
from elasticsearch import Elasticsearch class ElasticFactory(object): def __init__(self, host: list, port: str, username: str, password: str): self.port = port self.host = host self.username = username self.password = password def create(self) -> Elasticsearch: addrs = [] for host in self.host: addr = {'host': host, 'port': self.port} addrs.append(addr) if self.username and self.password: elasticsearch = Elasticsearch(addrs, http_auth=(self.username, self.password)) else: elasticsearch = Elasticsearch(addrs) return elasticsearch es = ElasticFactory(["{集群访问地址}"], "9200", None, None).create() print(es.indices.exists(index='test'))
该程序为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch Python连接HTTP协议的安全集群
使用Elasticsearch 7.10.2 Python客户端连接安全模式+HTTP协议的OpenSearch集群,并查询索引“test”是否存在。
使用集群连接信息创建客户端实例的代码示例如下:
from elasticsearch import Elasticsearch class ElasticFactory(object): def __init__(self, host: list, port: str, username: str, password: str): self.port = port self.host = host self.username = username self.password = password def create(self) -> Elasticsearch: addrs = [] for host in self.host: addr = {'host': host, 'port': self.port} addrs.append(addr) if self.username and self.password: elasticsearch = Elasticsearch(addrs, http_auth=(self.username, self.password)) else: elasticsearch = Elasticsearch(addrs) return elasticsearch es = ElasticFactory(["xxx.xxx.xxx.xxx"], "9200", "username", "password").create() print(es.indices.exists(index='test'))
参数 |
描述 |
---|---|
host |
集群的访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
集群的连接端口,填写“9200”。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
该程序为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。
Elasticsearch Python连接HTTPS协议的安全集群
使用Elasticsearch 7.10.2 Python客户端连接安全模式+HTTPS协议的OpenSearch集群,并查询索引“test”是否存在。
使用集群连接信息创建客户端实例的代码示例如下:
from elasticsearch import Elasticsearch import ssl class ElasticFactory(object): def __init__(self, host: list, port: str, username: str, password: str): self.port = port self.host = host self.username = username self.password = password def create(self) -> Elasticsearch: context = ssl._create_unverified_context() addrs = [] for host in self.host: addr = {'host': host, 'port': self.port} addrs.append(addr) if self.username and self.password: elasticsearch = Elasticsearch(addrs, http_auth=(self.username, self.password), scheme="https", ssl_context=context) else: elasticsearch = Elasticsearch(addrs) return elasticsearch es = ElasticFactory(["xxx.xxx.xxx.xxx"], "9200", "username", "password").create() print(es.indices.exists(index='test'))
参数 |
描述 |
---|---|
host |
集群的访问地址,当存在多个IP地址时,中间用“,”隔开。 |
port |
集群的连接端口,填写“9200”。 |
username |
访问集群的用户名。 |
password |
用户名对应的密码。 |
该程序为判断集群是否存在test索引,当返回“true”或“false”时,表示正常返回查询结果,集群连接成功。