更新时间:2024-10-25 GMT+08:00

向量检索的客户端代码示例(Python)

OpenSearch提供了标准的REST接口,以及Java、Python等语言编写的客户端。

本节提供一份创建向量索引、导入向量数据和查询向量数据的Python代码示例,介绍如何使用客户端实现向量检索。

前提条件

客户端已经安装Python依赖包opensearch-py(示例代码使用了list[float]类型注解,需要Python 3.9及以上版本)。如果未安装,可以执行如下命令安装:

pip install opensearch-py==1.1.0

代码示例
from opensearchpy import OpenSearch

# Build the client
def get_client(hosts: list, user: str = None, password: str = None):
    """Return an ``OpenSearch`` client for ``hosts``.

    Basic authentication is configured only when both ``user`` and
    ``password`` are truthy; otherwise the client is built from ``hosts``
    alone.

    NOTE: on the authenticated path certificate verification is switched off
    (``verify_certs=False``) and the matching warning is silenced
    (``ssl_show_warn=False``).
    """
    if not (user and password):
        return OpenSearch(hosts)
    return OpenSearch(
        hosts,
        http_auth=(user, password),
        verify_certs=False,
        ssl_show_warn=False,
    )

# Create the index
def create(client: OpenSearch, index: str):
    """Create ``index`` with the vector feature enabled and print the response.

    The index is created with one mapped field, ``my_vector``: a
    2-dimensional vector using the ``GRAPH`` algorithm and the ``euclidean``
    metric.
    """
    # Index-level settings.
    index_settings = {
        "vector": "true",  # enable the vector feature
        "number_of_shards": 1,  # shard count; set according to actual needs
        "number_of_replicas": 0,  # replica count; set according to actual needs
    }
    # Field mappings; add further fields here as required.
    field_properties = {
        "my_vector": {
            "type": "vector",
            "dimension": 2,
            "indexing": True,
            "algorithm": "GRAPH",
            "metric": "euclidean",
        },
    }
    request_body = {
        "settings": {"index": index_settings},
        "mappings": {"properties": field_properties},
    }
    response = client.indices.create(index=index, body=request_body)
    print("create index result: ", response)

# Write data
def write(client: OpenSearch, index: str, vecs: list, bulk_size=500):
    """Bulk-index ``vecs`` into ``index``, then refresh the index.

    Args:
        client: Client used to send the bulk and refresh requests.
        index: Name of the target index.
        vecs: Vectors to write; each one is stored in the ``my_vector`` field.
        bulk_size: Maximum number of vectors sent per bulk request.

    Raises:
        ValueError: If ``bulk_size`` is smaller than 1.
        RuntimeError: If a bulk response reports ``errors``.
    """
    import json  # local import keeps this block self-contained

    if bulk_size < 1:
        # range() raises on a zero step and yields nothing on a negative one,
        # which would make the call crash obscurely or silently write nothing.
        raise ValueError("bulk_size must be a positive integer")

    # The action line is identical for every document, so serialize it once.
    action_line = json.dumps({"index": {"_index": index}})
    for start in range(0, len(vecs), bulk_size):
        lines = []
        for vec in vecs[start:start + bulk_size]:
            lines.append(action_line)
            # json.dumps rather than str(): str() emits Python repr, which is
            # not valid JSON for tuples, None, booleans, etc.
            lines.append(json.dumps({"my_vector": vec}))
        # Every line of the bulk body, including the last, ends with a newline.
        body = "\n".join(lines) + "\n"
        response = client.bulk(body=body, request_timeout=3600)
        # Surface per-document failures instead of reporting success anyway.
        if isinstance(response, dict) and response.get("errors"):
            first_failure = next(
                (
                    item
                    for item in response.get("items", ())
                    if any("error" in result for result in item.values())
                ),
                None,
            )
            raise RuntimeError(
                f"bulk write to index {index!r} reported errors: {first_failure}"
            )
    client.indices.refresh(index=index, request_timeout=3600)
    print("write index success!")

# Query the vector index
def search(client: OpenSearch, index: str, query: list[float], size: int):
    """Run a vector query against ``index`` and print the response.

    ``size`` is used both as the request's result size and as the ``topk``
    value of the vector clause.
    """
    # Vector clause on the ``my_vector`` field; choose another query form if
    # it fits the use case better.
    vector_clause = {"vector": query, "topk": size}
    request_body = {
        "size": size,
        "query": {"vector": {"my_vector": vector_clause}},
    }
    response = client.search(index=index, body=request_body)
    print("search index result: ", response)

# Delete the index
def delete(client: OpenSearch, index: str):
    """Delete ``index`` and print the response."""
    outcome = client.indices.delete(index=index)
    print("delete index result: ", outcome)

if __name__ == '__main__':
    os_client = get_client(hosts=['http://x.x.x.x:9200'])

    # For a security-mode cluster with HTTPS enabled, use:
    # os_client = get_client(hosts=['https://x.x.x.x:9200', 'https://x.x.x.x:9200'], user='xxxxx', password='xxxxx')

    # For a security-mode cluster without HTTPS enabled, use:
    # os_client = get_client(hosts=['http://x.x.x.x:9200', 'http://x.x.x.x:9200'], user='xxxxx', password='xxxxx')

    # Name of the test index
    index_name = "my_index"

    # Create the index. This stays outside the try block so that a failed
    # creation (for example, the index already exists) never deletes an index
    # this run did not create.
    create(os_client, index=index_name)

    try:
        # Write data
        data = [[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]]
        write(os_client, index=index_name, vecs=data)

        # Query the index
        query_vector = [1.0, 1.0]
        search(os_client, index=index_name, query=query_vector, size=3)
    finally:
        # Delete the index even when writing or searching fails, so the test
        # index is not left behind to break the next run's create step.
        delete(os_client, index=index_name)