文档首页/ 云搜索服务 CSS/ 最佳实践/ Elasticsearch向量检索的性能测试和比较
更新时间:2024-12-19 GMT+08:00
分享

Elasticsearch向量检索的性能测试和比较

应用场景

云搜索服务的向量检索引擎提供了全托管、高性能的分布式向量数据库服务。为了方便用户在业务场景进行向量搜索的性能压力测试,为产品选择和资源配置提供准确的参考依据,本文提供了基于开源数据集和开源压力测试工具的Elasticsearch向量检索的性能测试方案。

配置测试环境

  • 创建Elasticsearch向量数据库。
    1. 参考创建Elasticsearch集群,创建Elasticsearch集群。

      “节点数量”选择“3”“节点规格”选择“通用计算型”“4vCPUs | 16GB”(由于测试的数据量不大,且为了和第三方的基线测试保持相同的CPU规格)。

    2. 创建集群索引,分片设置为1,副本设置为2。
  • 获取测试数据集。
    • sift-128-euclidean:维度128,base数据100万条,使用欧式距离度量。
    • cohere-768-cosine:维度768,base数据100万条,使用余弦距离度量。
    • gist-960-euclidean:维度960,base数据100万条,使用欧式距离度量。

    “sift-128-euclidean”“gist-960-euclidean”数据的下载地址是https://github.com/erikbern/ann-benchmarks。如需使用“cohere-768-cosine”数据,请提交工单获取。

    图1 下载“sift-128-euclidean”“gist-960-euclidean”数据
  • 准备测试工具。

性能测试的操作步骤

  1. 创建一个弹性云服务器ECS,用于安装压测工具和执行测试脚本。操作指导请参见快速购买和使用Linux ECS
    • ECS必须和Elasticsearch集群在同一个虚拟私有云和安全组中。
    • 也可以使用其他客户端服务器,但是必须保证服务器和Elasticsearch集群在同一VPC。
  2. 将数据写入和召回率测试脚本上传到ECS上。
  3. 在ECS上安装开源压测工具Wrk。
  4. 在ECS上准备压测的查询请求文件,用于模拟真实业务场景。参考脚本prepare_query.py
  5. 在ECS上准备Wrk的压测配置脚本。参考脚本perf.lua
  6. 在ECS执行如下命令进行向量检索的性能压测。
    wrk -c60 -t60 -d10m -s perf.lua http://x.x.x.x:9200 
    • “t”表示压测线程数。
    • “c”表示与服务端的连接数。
    • “d”表示压测时间,“10m”表示10分钟 。
    • “s”表示Wrk的压测配置脚本。
    • “x.x.x.x”表示Elasticsearch集群的访问地址。

    在回显中获得测试数据。

性能测试比较

  • GRAPH类索引

    百万规模的场景推荐使用GRAPH索引类型。

    • 测试方案一:使用不同维度的数据集,在Top10召回率均达到99%的情况下,测试向量数据库能支撑的最大QPS。每个数据集均基于默认参数和调优参数分别进行测试,通过调整构建参数可以使得图索引结构更优,在同等召回精度下能取得更高的查询性能。
      测试结果
      表1 GRAPH类索引测试结果1

      数据集

      构建参数

      查询参数

      性能指标

      efc

      shrink

      ef

      max_scan_num

      QPS

      Recall

      sift-128-euclidean

      200

      1.0

      84

      10000

      15562

      0.99

      500

      0.8

      50

      10000

      17332

      0.99

      cohere-768-cosine

      200

      1.0

      154

      10000

      3232

      0.99

      500

      0.95

      106

      10000

      3821

      0.99

      gist-960-euclidean

      200

      1.0

      800

      19000

      860

      0.99

      500

      0.9

      400

      15000

      1236

      0.99

      结论:对于不同的数据集,使用默认参数均能达到99%以上的召回率。在进一步调整构建参数和查询参数后,增加了一定的索引构建开销,同时也达到更高的查询性能。

    • 测试方案二:使用同一数据集,通过调整索引参数,测试不同召回率下的查询性能。本方案用COHERE数据集,分别测试了Top10召回率为99%、98%及95%时的集群最大QPS。
      测试结果:
      表2 GRAPH类索引测试结果1

      数据集

      构建参数

      查询参数

      性能指标

      efc

      ef

      QPS

      Recall

      cohere-768-cosine

      500

      128

      3687

      0.99

      500

      80

      5320

      0.98

      500

      36

      9028

      0.95

      结论:同一集群在统一索引构建参数的情况下,通过调整ef参数可以获得不同的查询精度,在略微损失召回率的场景下可以获得成倍的性能提升。

  • GRAPH_PQ类索引

    基于图算法的索引为了保证查询性能通常需要常驻内存,因此当向量维度较高或数据量较大时,内存资源成为影响成本及性能的关键因素。具体来说,高维度的向量和大规模的数据集对内存的需求显著增加,这不仅关系到存储成本,还直接影响到索引算法的运行效率和响应速度。该场景推荐使用GRAPH_PQ索引类型。

    测试方案:使用维度较高的COHERE与GIST数据集,测试在Top10召回率达到95%时的集群最大QPS,并与GRAPH索引对比常驻内存开销

    测试结果:
    表3 GRAPH_PQ类索引测试结果

    数据集

    构建参数

    查询参数

    性能指标

    内存开销

    efc

    fragment_num

    ef

    topk

    QPS

    Recall

    GRAPH_PQ

    GRAPH

    cohere-768-cosine

    200

    64

    85

    130

    8723

    0.95

    332MB

    3.3GB

    gist-960-euclidean

    200

    120

    200

    360

    4267

    0.95

    387MB

    4.0GB

    结论:结果显示使用GRAPH_PQ类索引能够在节约10倍+内存开销的情况下,取得与GRAPH索引差不多的精度和性能。因此,CSS向量索引的GRAPH_PQ算法融合了图索引与量化算法,能够大幅降低内存的开销,提升单机的数据容量。

测试数据中涉及的索引参数说明请参见表4,关于构建参数的详细说明请参见在Elasticsearch集群创建向量索引,关于查询参数的详细说明请参见在Elasticsearch集群使用向量索引搜索数据

表4 索引参数说明

类型

参数名称

说明

构建参数

efc

构建hnsw时考察邻居节点的队列大小,默认值为200,值越大精度越高,构建速度将会变慢。

shrink

构建hnsw时的裁边系数,默认值为1.0f。

fragment_num

段数,默认值为0,插件自动根据向量长度设置合适的段数。

查询参数

ef

查询时考察邻居节点的队列大小。值越大查询精度越高,查询速度会变慢。默认值为200。

max_scan_num

扫描节点上限。值越大精度越高,查询速度变慢。默认值为10000。

topk

查询时返回top k条数据。

脚本“base_test_example.py”

# -*- coding: UTF-8 -*-
import json
import time

import h5py
from elasticsearch import Elasticsearch
from elasticsearch import helpers

def get_client(hosts: list, user: str = None, password: str = None):
    if user and password:
        return Elasticsearch(hosts, http_auth=(user, password), verify_certs=False, ssl_show_warn=False)
    else:
        return Elasticsearch(hosts)

# 索引参数说明请参见在Elasticsearch集群创建向量索引。
def create(es_client, index_name, shards, replicas, dim, algorithm="GRAPH",
           metric="euclidean", neighbors=64, efc=200, shrink=1.0):
    index_mapping = {
        "settings": {
            "index": {
                "vector": True
            },
            "number_of_shards": shards,
            "number_of_replicas": replicas,
        },
        "mappings": {
            "properties": {
                "id": {
                    "type": "integer"
                },
                "vec": {
                    "type": "vector",
                    "indexing": True,
                    "dimension": dim,
                    "algorithm": algorithm,
                    "metric": metric,
                    "neighbors": neighbors,
                    "efc": efc,
                    "shrink": shrink,
                }
            }
        }
    }
    es_client.indices.create(index=index_name, body=index_mapping)
    print(f"Create index success! Index name: {index_name}")

def write(es_client, index_name, vectors, bulk_size=1000):
    print("Start write! Index name: " + index_name)
    start = time.time()
    for i in range(0, len(vectors), bulk_size):
        actions = [{
            "_index": index_name,
            "id": i + j,
            "vec": v.tolist()
        } for j, v in enumerate(vectors[i: i + bulk_size])]
        helpers.bulk(es_client, actions, request_timeout=180)
    print(f"Write success! Docs count: {len(vectors)}, total cost: {time.time() - start:.2f} seconds")
    merge(es_client, index_name)

def merge(es_client, index_name, seg_cnt=1):
    print(f"Start merge! Index name: {index_name}")
    start = time.time()
    es_client.indices.forcemerge(index=index_name, max_num_segments=seg_cnt, request_timeout=7200)
    print(f"Merge success! Total cost: {time.time() - start:.2f} seconds")

# 查询参数说明请参考见在Elasticsearch集群使用向量索引搜索数据。
def query(es_client, index_name, queries, gts, size=10, k=10, ef=200, msn=10000):
    print("Start query! Index name: " + index_name)
    i = 0
    precision = []
    for vec in queries:
        hits = set()
        dsl = {
            "size": size,
            "stored_fields": ["_none_"],
            "docvalue_fields": ["id"],
            "query": {
                "vector": {
                    "vec": {
                        "vector": vec.tolist(),
                        "topk": k,
                        "ef": ef,
                        "max_scan_num": msn
                    }
                }
            }
        }
        res = es_client.search(index=index_name, body=json.dumps(dsl))
        for hit in res['hits']['hits']:
            hits.add(int(hit['fields']['id'][0]))
        precision.append(len(hits.intersection(set(gts[i, :size]))) / size)
        i += 1
    print(f"Query complete! Average precision: {sum(precision) / len(precision)}")

def load_test_data(src):
    hdf5_file = h5py.File(src, "r")
    base_vectors = hdf5_file["train"]
    query_vectors = hdf5_file["test"]
    ground_truths = hdf5_file["neighbors"]
    return base_vectors, query_vectors, ground_truths

def test_sift(es_client):
    index_name = "index_sift_graph"
    vectors, queries, gts = load_test_data(r"sift-128-euclidean.hdf5")
    # 根据实际测试需求调整分片和副本数、索引算法、索引参数等。
    create(es_client, index_name, shards=1, replicas=2, dim=128)
    write(es_client, index_name, vectors)
    query(es_client, index_name, queries, gts)

if __name__ == "__main__":
    # 此处修改为CSS集群的实际访问地址。
    client = get_client(['http://x.x.x.x:9200'])
    test_sift(client)

脚本“prepare_query.py”

import base64
import json
import struct

import h5py

def prepare_query(src, dst, size=10, k=10, ef=200, msn=10000, metric="euclidean", rescore=False, use_base64=True):
    """
    使用该函数从hdf5格式的源数据文件读取出query向量,并生成完整的query请求体,用于性能压测。
    :param src: hdf5格式的源数据文件路径。
    :param dst: 目标文件路径。
    :param size: 指定查询返回结果数。
    :param k: 指定Segment级别索引查询topk条相似结果。
    :param ef: 索引查询参数,用于指定查询过程使用的队列大小。
    :param msn: 索引查询参数,用于指定max_scan_num。
    :param metric: 使用何种度量进行rescore精排,euclidean、cosine、inner_product等。
    :param rescore: 是否使用rescore精排,对于GRAPH_PQ索引可以开启精排。
    :param use_base64: 是否使用base64编码的向量数据。
    """
    hdf5_file = h5py.File(src, "r")
    query_vectors = hdf5_file["test"]
    with open(dst, "w", encoding="utf8") as fw:
        for vec in query_vectors:
            query_template = {
                "size": size,
                "stored_fields": ["_none_"],
                "docvalue_fields": ["id"],
                "query": {
                    "vector": {
                        "vec": {
                            "vector": vec.tolist() if not use_base64 else floats2base64(vec),
                            "topk": k,
                            "ef": ef,
                            "max_scan_num": msn,
                        }
                    }
                }
            }
            if rescore:
                query_template["query"]["rescore"] = {
                    "window_size": k,
                    "vector_rescore": {
                        "field": "vec",
                        "vector": vec.tolist() if not use_base64 else floats2base64(vec),
                        "metric": metric
                    }
                }
            fw.write(json.dumps(query_template))
            fw.write("\n")

def floats2base64(vector):
    data = struct.pack('<{}f'.format(len(vector)), *vector)
    return base64.b64encode(data).decode()

if __name__ == "__main__":
    # 修改为数据文件实际地址。
    prepare_query(r"/path/to/sift-128-euclidean.hdf5", r"requests.txt")

脚本“perf.lua”

local random = math.random

local reqs = {}
local cnt = 0

-- 压测数据文件名称根据需要调整。
for line in io.lines("sift_requests.txt") do
    table.insert(reqs, line)
    cnt = cnt + 1
end


local addrs = {}
local counter = 0
function setup(thread)
   local append = function(host, port)
      for i, addr in ipairs(wrk.lookup(host, port)) do
         if wrk.connect(addr) then
            addrs[#addrs+1] = addr
         end
      end
   end

   if #addrs == 0 then
      -- 根据集群的实际地址进行修改。
      append("192.168.0.84", 9200)
      append("192.168.0.32", 9200)
      append("192.168.0.49", 9200)
   end

   local index = counter % #addrs + 1
   counter = counter + 1
   thread.addr = addrs[index]
end

-- 索引名称根据需要调整。
wrk.path = "/index_sift_graph/_search?request_cache=false&preference=_local"
wrk.method = "GET"
wrk.headers["Content-Type"] = "application/json"


function request()
    return wrk.format(wrk.method, wrk.path, wrk.headers, reqs[random(cnt)])
end

相关文档