Elasticsearch向量检索的性能测试和比较
应用场景
云搜索服务的向量检索引擎提供了全托管、高性能的分布式向量数据库服务。为了方便用户在业务场景进行向量搜索的性能压力测试,为产品选择和资源配置提供准确的参考依据,本文提供了基于开源数据集和开源压力测试工具的Elasticsearch向量检索的性能测试方案。
测试前准备
- 创建Elasticsearch向量数据库,参考创建Elasticsearch集群。
“节点数量”选择“3”,“节点规格”选择“通用计算型”的“4vCPUs | 16GB”(由于测试的数据量不大,且为了和第三方的基线测试保持相同的CPU规格),“节点存储”选择“超高I/O”,不启用安全模式。
- 获取测试数据集。
- sift-128-euclidean:维度128,base数据100万条,使用欧式距离度量。
- cohere-768-cosine:维度768,base数据100万条,使用余弦距离度量。
- gist-960-euclidean:维度960,base数据100万条,使用欧式距离度量。
“sift-128-euclidean”和“gist-960-euclidean”数据的下载地址是https://github.com/erikbern/ann-benchmarks。如需使用“cohere-768-cosine”数据,请提交工单获取。
图1 下载“sift-128-euclidean”和“gist-960-euclidean”数据
- 准备测试工具。
- 准备数据写入和召回率测试脚本,参考脚本base_test_example.py。
- 下载性能测试使用的开源压测工具Wrk,获取地址https://github.com/wg/wrk/tree/master。
性能测试的操作步骤
- 创建一个弹性云服务器ECS,用于安装压测工具和执行测试脚本。操作指导请参见快速购买和使用Linux ECS。
- ECS必须和Elasticsearch集群在同一个虚拟私有云和安全组中。
- 也可以使用其他客户端服务器,但是必须保证服务器和Elasticsearch集群在同一VPC。
- 将测试数据集上传到ECS上。
- 将数据写入和召回率测试脚本上传到ECS上,并执行如下命令。
pip install h5py pip install elasticsearch==7.6 python3 base_test_example.py
执行完成后,会创建测试的向量索引,写入测试数据,并返回平均查询召回率Recall。
- 在ECS上安装开源压测工具Wrk。
- 在ECS上准备压测的查询请求文件,用于模拟真实业务场景。参考脚本prepare_query.py。
pip install h5py python3 prepare_query.py
- 在ECS上准备Wrk的压测配置脚本。参考脚本perf.lua,脚本中的查询请求文件名称、集群访问地址和索引名称需要根据实际环境修改。
- 在ECS执行如下命令进行向量检索的性能压测。
wrk -c60 -t60 -d10m -s perf.lua http://x.x.x.x:9200
- “t”表示压测线程数。
- “c”表示与服务端的连接数。
- “d”表示压测时间,“10m”表示10分钟 。
- “s”表示Wrk的压测配置脚本。
- “x.x.x.x”表示Elasticsearch集群的访问地址。
在回显中获得测试数据,其中“Requests/sec”即查询吞吐量QPS。
图2 测试结果示例
性能测试比较
以下测试方案只列出了对比项的参数值,未列举的参数建议保持默认值。
- GRAPH类索引
百万规模的场景推荐使用GRAPH索引类型。
- 测试方案一:使用不同维度的数据集,在Top10召回率均达到99%的情况下,测试向量数据库能支撑的最大QPS。每个数据集均基于默认参数和调优参数分别进行测试,通过调整构建参数可以使得图索引结构更优,在同等召回精度下能取得更高的查询性能。
测试结果:
表1 GRAPH类索引测试结果1 数据集
构建参数
查询参数
性能指标
efc
shrink
ef
max_scan_num
QPS
Recall
sift-128-euclidean
200
1.0
84
10000
15562
0.99
500
0.8
50
10000
17332
0.99
cohere-768-cosine
200
1.0
154
10000
3232
0.99
500
0.95
106
10000
3821
0.99
gist-960-euclidean
200
1.0
800
19000
860
0.99
500
0.9
400
15000
1236
0.99
结论:对于不同的数据集,使用默认参数均能达到99%以上的召回率。在进一步调整构建参数和查询参数后,增加了一定的索引构建开销,同时也达到更高的查询性能。
- 测试方案二:使用同一数据集,通过调整索引参数,测试不同召回率下的查询性能。本方案用COHERE数据集,分别测试了Top10召回率为99%、98%及95%时的集群最大QPS。
测试结果:
表2 GRAPH类索引测试结果1 数据集
构建参数
查询参数
性能指标
efc
ef
QPS
Recall
cohere-768-cosine
500
128
3687
0.99
500
80
5320
0.98
500
36
9028
0.95
结论:同一集群在统一索引构建参数的情况下,通过调整ef参数可以获得不同的查询精度,在略微损失召回率的场景下可以获得成倍的性能提升。
- 测试方案一:使用不同维度的数据集,在Top10召回率均达到99%的情况下,测试向量数据库能支撑的最大QPS。每个数据集均基于默认参数和调优参数分别进行测试,通过调整构建参数可以使得图索引结构更优,在同等召回精度下能取得更高的查询性能。
- GRAPH_PQ类索引
基于图算法的索引为了保证查询性能通常需要常驻内存,因此当向量维度较高或数据量较大时,内存资源成为影响成本及性能的关键因素。具体来说,高维度的向量和大规模的数据集对内存的需求显著增加,这不仅关系到存储成本,还直接影响到索引算法的运行效率和响应速度。该场景推荐使用GRAPH_PQ索引类型。
测试方案:使用维度较高的COHERE与GIST数据集,测试在Top10召回率达到95%时的集群最大QPS,并与GRAPH索引对比常驻内存开销。
测试结果:表3 GRAPH_PQ类索引测试结果 数据集
构建参数
查询参数
性能指标
内存开销
efc
fragment_num
ef
topk
QPS
Recall
GRAPH_PQ
GRAPH
cohere-768-cosine
200
64
85
130
8723
0.95
332MB
3.3GB
gist-960-euclidean
200
120
200
360
4267
0.95
387MB
4.0GB
结论:结果显示使用GRAPH_PQ类索引能够在节约10倍+内存开销的情况下,取得与GRAPH索引差不多的精度和性能。因此,CSS向量索引的GRAPH_PQ算法融合了图索引与量化算法,能够大幅降低内存的开销,提升单机的数据容量。
测试数据中涉及的索引参数说明请参见表4,关于构建参数的详细说明请参见在Elasticsearch集群创建向量索引,关于查询参数的详细说明请参见在Elasticsearch集群使用向量索引搜索数据。
脚本“base_test_example.py”
# -*- coding: UTF-8 -*-
import json
import time
import h5py
from elasticsearch import Elasticsearch
from elasticsearch import helpers
def get_client(hosts: list, user: str = None, password: str = None):
if user and password:
return Elasticsearch(hosts, http_auth=(user, password), verify_certs=False, ssl_show_warn=False)
else:
return Elasticsearch(hosts)
# 索引参数说明请参见在Elasticsearch集群创建向量索引。
def create(es_client, index_name, shards, replicas, dim, algorithm="GRAPH",
metric="euclidean", neighbors=64, efc=200, shrink=1.0):
index_mapping = {
"settings": {
"index": {
"vector": True
},
"number_of_shards": shards,
"number_of_replicas": replicas,
},
"mappings": {
"properties": {
"id": {
"type": "integer"
},
"vec": {
"type": "vector",
"indexing": True,
"dimension": dim,
"algorithm": algorithm,
"metric": metric,
"neighbors": neighbors,
"efc": efc,
"shrink": shrink,
}
}
}
}
es_client.indices.create(index=index_name, body=index_mapping)
print(f"Create index success! Index name: {index_name}")
def write(es_client, index_name, vectors, bulk_size=1000):
print("Start write! Index name: " + index_name)
start = time.time()
for i in range(0, len(vectors), bulk_size):
actions = [{
"_index": index_name,
"id": i + j,
"vec": v.tolist()
} for j, v in enumerate(vectors[i: i + bulk_size])]
helpers.bulk(es_client, actions, request_timeout=180)
print(f"Write success! Docs count: {len(vectors)}, total cost: {time.time() - start:.2f} seconds")
merge(es_client, index_name)
def merge(es_client, index_name, seg_cnt=1):
print(f"Start merge! Index name: {index_name}")
start = time.time()
es_client.indices.forcemerge(index=index_name, max_num_segments=seg_cnt, request_timeout=7200)
print(f"Merge success! Total cost: {time.time() - start:.2f} seconds")
# 查询参数说明请参考见在Elasticsearch集群使用向量索引搜索数据。
def query(es_client, index_name, queries, gts, size=10, k=10, ef=200, msn=10000):
print("Start query! Index name: " + index_name)
i = 0
precision = []
for vec in queries:
hits = set()
dsl = {
"size": size,
"stored_fields": ["_none_"],
"docvalue_fields": ["id"],
"query": {
"vector": {
"vec": {
"vector": vec.tolist(),
"topk": k,
"ef": ef,
"max_scan_num": msn
}
}
}
}
res = es_client.search(index=index_name, body=json.dumps(dsl))
for hit in res['hits']['hits']:
hits.add(int(hit['fields']['id'][0]))
precision.append(len(hits.intersection(set(gts[i, :size]))) / size)
i += 1
print(f"Query complete! Average precision: {sum(precision) / len(precision)}")
def load_test_data(src):
hdf5_file = h5py.File(src, "r")
base_vectors = hdf5_file["train"]
query_vectors = hdf5_file["test"]
ground_truths = hdf5_file["neighbors"]
return base_vectors, query_vectors, ground_truths
def test_sift(es_client):
index_name = "index_sift_graph"
vectors, queries, gts = load_test_data(r"sift-128-euclidean.hdf5")
# 根据实际测试需求调整分片和副本数、索引算法、索引参数等。本文性能测试均配置的是1个分片、2个副本。
create(es_client, index_name, shards=1, replicas=2, dim=128)
write(es_client, index_name, vectors)
query(es_client, index_name, queries, gts)
if __name__ == "__main__":
# 此处修改为CSS集群的实际访问地址。
client = get_client(['http://x.x.x.x:9200'])
test_sift(client)
脚本“prepare_query.py”
import base64
import json
import struct
import h5py
def prepare_query(src, dst, size=10, k=10, ef=200, msn=10000, metric="euclidean", rescore=False, use_base64=True):
"""
使用该函数从hdf5格式的源数据文件读取出query向量,并生成完整的query请求体,用于性能压测。
:param src: hdf5格式的源数据文件路径。
:param dst: 目标文件路径。
:param size: 指定查询返回结果数。
:param k: 指定Segment级别索引查询topk条相似结果。
:param ef: 索引查询参数,用于指定查询过程使用的队列大小。
:param msn: 索引查询参数,用于指定max_scan_num。
:param metric: 使用何种度量进行rescore精排,euclidean、cosine、inner_product等。
:param rescore: 是否使用rescore精排,对于GRAPH_PQ索引可以开启精排。
:param use_base64: 是否使用base64编码的向量数据。
"""
hdf5_file = h5py.File(src, "r")
query_vectors = hdf5_file["test"]
with open(dst, "w", encoding="utf8") as fw:
for vec in query_vectors:
query_template = {
"size": size,
"stored_fields": ["_none_"],
"docvalue_fields": ["id"],
"query": {
"vector": {
"vec": {
"vector": vec.tolist() if not use_base64 else floats2base64(vec),
"topk": k,
"ef": ef,
"max_scan_num": msn,
}
}
}
}
if rescore:
query_template["query"]["rescore"] = {
"window_size": k,
"vector_rescore": {
"field": "vec",
"vector": vec.tolist() if not use_base64 else floats2base64(vec),
"metric": metric
}
}
fw.write(json.dumps(query_template))
fw.write("\n")
def floats2base64(vector):
data = struct.pack('<{}f'.format(len(vector)), *vector)
return base64.b64encode(data).decode()
if __name__ == "__main__":
# 修改为数据文件实际地址。
prepare_query(r"/path/to/sift-128-euclidean.hdf5", r"requests.txt")
脚本“perf.lua”
local random = math.random
local reqs = {}
local cnt = 0
-- 压测的查询请求文件名称根据需要调整。
for line in io.lines("requests.txt") do
table.insert(reqs, line)
cnt = cnt + 1
end
local addrs = {}
local counter = 0
function setup(thread)
local append = function(host, port)
for i, addr in ipairs(wrk.lookup(host, port)) do
if wrk.connect(addr) then
addrs[#addrs+1] = addr
end
end
end
if #addrs == 0 then
-- 根据集群的实际地址进行修改。
append("x.x.x.x", 9200)
append("x.x.x.x", 9200)
append("x.x.x.x", 9200)
end
local index = counter % #addrs + 1
counter = counter + 1
thread.addr = addrs[index]
end
-- 索引名称根据需要调整。
wrk.path = "/index_sift_graph/_search?request_cache=false&preference=_local"
wrk.method = "GET"
wrk.headers["Content-Type"] = "application/json"
function request()
return wrk.format(wrk.method, wrk.path, wrk.headers, reqs[random(cnt)])
end