Elasticsearch向量检索的性能测试和比较
应用场景
云搜索服务的向量检索引擎提供了全托管、高性能的分布式向量数据库服务。为了方便用户在业务场景进行向量搜索的性能压力测试,为产品选择和资源配置提供准确的参考依据,本文提供了基于开源数据集和开源压力测试工具的Elasticsearch向量检索的性能测试方案。
配置测试环境
- 创建Elasticsearch向量数据库。
- 参考创建Elasticsearch集群,创建Elasticsearch集群。
“节点数量”选择“3”,“节点规格”选择“通用计算型”的“4vCPUs | 16GB”(由于测试的数据量不大,且为了和第三方的基线测试保持相同的CPU规格)。
- 创建集群索引,分片设置为1,副本设置为2。
- 参考创建Elasticsearch集群,创建Elasticsearch集群。
- 获取测试数据集。
- sift-128-euclidean:维度128,base数据100万条,使用欧式距离度量。
- cohere-768-cosine:维度768,base数据100万条,使用余弦距离度量。
- gist-960-euclidean:维度960,base数据100万条,使用欧式距离度量。
“sift-128-euclidean”和“gist-960-euclidean”数据的下载地址是https://github.com/erikbern/ann-benchmarks。如需使用“cohere-768-cosine”数据,请提交工单获取。
图1 下载“sift-128-euclidean”和“gist-960-euclidean”数据
- 准备测试工具。
- 准备数据写入和召回率测试脚本,参考脚本base_test_example.py。
- 下载性能测试使用的开源压测工具Wrk,获取地址https://github.com/wg/wrk/tree/master。
性能测试的操作步骤
- 创建一个弹性云服务器ECS,用于安装压测工具和执行测试脚本。操作指导请参见快速购买和使用Linux ECS。
- ECS必须和Elasticsearch集群在同一个虚拟私有云和安全组中。
- 也可以使用其他客户端服务器,但是必须保证服务器和Elasticsearch集群在同一VPC。
- 将数据写入和召回率测试脚本上传到ECS上。
- 在ECS上安装开源压测工具Wrk。
- 在ECS上准备压测的查询请求文件,用于模拟真实业务场景。参考脚本prepare_query.py。
- 在ECS上准备Wrk的压测配置脚本。参考脚本perf.lua。
- 在ECS执行如下命令进行向量检索的性能压测。
wrk -c60 -t60 -d10m -s perf.lua http://x.x.x.x:9200
- “t”表示压测线程数。
- “c”表示与服务端的连接数。
- “d”表示压测时间,“10m”表示10分钟 。
- “s”表示Wrk的压测配置脚本。
- “x.x.x.x”表示Elasticsearch集群的访问地址。
在回显中获得测试数据。
性能测试比较
- GRAPH类索引
百万规模的场景推荐使用GRAPH索引类型。
- 测试方案一:使用不同维度的数据集,在Top10召回率均达到99%的情况下,测试向量数据库能支撑的最大QPS。每个数据集均基于默认参数和调优参数分别进行测试,通过调整构建参数可以使得图索引结构更优,在同等召回精度下能取得更高的查询性能。
测试结果:
表1 GRAPH类索引测试结果1 数据集
构建参数
查询参数
性能指标
efc
shrink
ef
max_scan_num
QPS
Recall
sift-128-euclidean
200
1.0
84
10000
15562
0.99
500
0.8
50
10000
17332
0.99
cohere-768-cosine
200
1.0
154
10000
3232
0.99
500
0.95
106
10000
3821
0.99
gist-960-euclidean
200
1.0
800
19000
860
0.99
500
0.9
400
15000
1236
0.99
结论:对于不同的数据集,使用默认参数均能达到99%以上的召回率。在进一步调整构建参数和查询参数后,增加了一定的索引构建开销,同时也达到更高的查询性能。
- 测试方案二:使用同一数据集,通过调整索引参数,测试不同召回率下的查询性能。本方案用COHERE数据集,分别测试了Top10召回率为99%、98%及95%时的集群最大QPS。
测试结果:
表2 GRAPH类索引测试结果1 数据集
构建参数
查询参数
性能指标
efc
ef
QPS
Recall
cohere-768-cosine
500
128
3687
0.99
500
80
5320
0.98
500
36
9028
0.95
结论:同一集群在统一索引构建参数的情况下,通过调整ef参数可以获得不同的查询精度,在略微损失召回率的场景下可以获得成倍的性能提升。
- 测试方案一:使用不同维度的数据集,在Top10召回率均达到99%的情况下,测试向量数据库能支撑的最大QPS。每个数据集均基于默认参数和调优参数分别进行测试,通过调整构建参数可以使得图索引结构更优,在同等召回精度下能取得更高的查询性能。
- GRAPH_PQ类索引
基于图算法的索引为了保证查询性能通常需要常驻内存,因此当向量维度较高或数据量较大时,内存资源成为影响成本及性能的关键因素。具体来说,高维度的向量和大规模的数据集对内存的需求显著增加,这不仅关系到存储成本,还直接影响到索引算法的运行效率和响应速度。该场景推荐使用GRAPH_PQ索引类型。
测试方案:使用维度较高的COHERE与GIST数据集,测试在Top10召回率达到95%时的集群最大QPS,并与GRAPH索引对比常驻内存开销。
测试结果:表3 GRAPH_PQ类索引测试结果 数据集
构建参数
查询参数
性能指标
内存开销
efc
fragment_num
ef
topk
QPS
Recall
GRAPH_PQ
GRAPH
cohere-768-cosine
200
64
85
130
8723
0.95
332MB
3.3GB
gist-960-euclidean
200
120
200
360
4267
0.95
387MB
4.0GB
结论:结果显示使用GRAPH_PQ类索引能够在节约10倍+内存开销的情况下,取得与GRAPH索引差不多的精度和性能。因此,CSS向量索引的GRAPH_PQ算法融合了图索引与量化算法,能够大幅降低内存的开销,提升单机的数据容量。
测试数据中涉及的索引参数说明请参见表4,关于构建参数的详细说明请参见在Elasticsearch集群创建向量索引,关于查询参数的详细说明请参见在Elasticsearch集群使用向量索引搜索数据。
脚本“base_test_example.py”
# -*- coding: UTF-8 -*- import json import time import h5py from elasticsearch import Elasticsearch from elasticsearch import helpers def get_client(hosts: list, user: str = None, password: str = None): if user and password: return Elasticsearch(hosts, http_auth=(user, password), verify_certs=False, ssl_show_warn=False) else: return Elasticsearch(hosts) # 索引参数说明请参见在Elasticsearch集群创建向量索引。 def create(es_client, index_name, shards, replicas, dim, algorithm="GRAPH", metric="euclidean", neighbors=64, efc=200, shrink=1.0): index_mapping = { "settings": { "index": { "vector": True }, "number_of_shards": shards, "number_of_replicas": replicas, }, "mappings": { "properties": { "id": { "type": "integer" }, "vec": { "type": "vector", "indexing": True, "dimension": dim, "algorithm": algorithm, "metric": metric, "neighbors": neighbors, "efc": efc, "shrink": shrink, } } } } es_client.indices.create(index=index_name, body=index_mapping) print(f"Create index success! Index name: {index_name}") def write(es_client, index_name, vectors, bulk_size=1000): print("Start write! Index name: " + index_name) start = time.time() for i in range(0, len(vectors), bulk_size): actions = [{ "_index": index_name, "id": i + j, "vec": v.tolist() } for j, v in enumerate(vectors[i: i + bulk_size])] helpers.bulk(es_client, actions, request_timeout=180) print(f"Write success! Docs count: {len(vectors)}, total cost: {time.time() - start:.2f} seconds") merge(es_client, index_name) def merge(es_client, index_name, seg_cnt=1): print(f"Start merge! Index name: {index_name}") start = time.time() es_client.indices.forcemerge(index=index_name, max_num_segments=seg_cnt, request_timeout=7200) print(f"Merge success! Total cost: {time.time() - start:.2f} seconds") # 查询参数说明请参考见在Elasticsearch集群使用向量索引搜索数据。 def query(es_client, index_name, queries, gts, size=10, k=10, ef=200, msn=10000): print("Start query! Index name: " + index_name) i = 0 precision = [] for vec in queries: hits = set() dsl = { "size": size, "stored_fields": ["_none_"], "docvalue_fields": ["id"], "query": { "vector": { "vec": { "vector": vec.tolist(), "topk": k, "ef": ef, "max_scan_num": msn } } } } res = es_client.search(index=index_name, body=json.dumps(dsl)) for hit in res['hits']['hits']: hits.add(int(hit['fields']['id'][0])) precision.append(len(hits.intersection(set(gts[i, :size]))) / size) i += 1 print(f"Query complete! Average precision: {sum(precision) / len(precision)}") def load_test_data(src): hdf5_file = h5py.File(src, "r") base_vectors = hdf5_file["train"] query_vectors = hdf5_file["test"] ground_truths = hdf5_file["neighbors"] return base_vectors, query_vectors, ground_truths def test_sift(es_client): index_name = "index_sift_graph" vectors, queries, gts = load_test_data(r"sift-128-euclidean.hdf5") # 根据实际测试需求调整分片和副本数、索引算法、索引参数等。 create(es_client, index_name, shards=1, replicas=2, dim=128) write(es_client, index_name, vectors) query(es_client, index_name, queries, gts) if __name__ == "__main__": # 此处修改为CSS集群的实际访问地址。 client = get_client(['http://x.x.x.x:9200']) test_sift(client)
脚本“prepare_query.py”
import base64 import json import struct import h5py def prepare_query(src, dst, size=10, k=10, ef=200, msn=10000, metric="euclidean", rescore=False, use_base64=True): """ 使用该函数从hdf5格式的源数据文件读取出query向量,并生成完整的query请求体,用于性能压测。 :param src: hdf5格式的源数据文件路径。 :param dst: 目标文件路径。 :param size: 指定查询返回结果数。 :param k: 指定Segment级别索引查询topk条相似结果。 :param ef: 索引查询参数,用于指定查询过程使用的队列大小。 :param msn: 索引查询参数,用于指定max_scan_num。 :param metric: 使用何种度量进行rescore精排,euclidean、cosine、inner_product等。 :param rescore: 是否使用rescore精排,对于GRAPH_PQ索引可以开启精排。 :param use_base64: 是否使用base64编码的向量数据。 """ hdf5_file = h5py.File(src, "r") query_vectors = hdf5_file["test"] with open(dst, "w", encoding="utf8") as fw: for vec in query_vectors: query_template = { "size": size, "stored_fields": ["_none_"], "docvalue_fields": ["id"], "query": { "vector": { "vec": { "vector": vec.tolist() if not use_base64 else floats2base64(vec), "topk": k, "ef": ef, "max_scan_num": msn, } } } } if rescore: query_template["query"]["rescore"] = { "window_size": k, "vector_rescore": { "field": "vec", "vector": vec.tolist() if not use_base64 else floats2base64(vec), "metric": metric } } fw.write(json.dumps(query_template)) fw.write("\n") def floats2base64(vector): data = struct.pack('<{}f'.format(len(vector)), *vector) return base64.b64encode(data).decode() if __name__ == "__main__": # 修改为数据文件实际地址。 prepare_query(r"/path/to/sift-128-euclidean.hdf5", r"requests.txt")
脚本“perf.lua”
local random = math.random local reqs = {} local cnt = 0 -- 压测数据文件名称根据需要调整。 for line in io.lines("sift_requests.txt") do table.insert(reqs, line) cnt = cnt + 1 end local addrs = {} local counter = 0 function setup(thread) local append = function(host, port) for i, addr in ipairs(wrk.lookup(host, port)) do if wrk.connect(addr) then addrs[#addrs+1] = addr end end end if #addrs == 0 then -- 根据集群的实际地址进行修改。 append("192.168.0.84", 9200) append("192.168.0.32", 9200) append("192.168.0.49", 9200) end local index = counter % #addrs + 1 counter = counter + 1 thread.addr = addrs[index] end -- 索引名称根据需要调整。 wrk.path = "/index_sift_graph/_search?request_cache=false&preference=_local" wrk.method = "GET" wrk.headers["Content-Type"] = "application/json" function request() return wrk.format(wrk.method, wrk.path, wrk.headers, reqs[random(cnt)]) end