计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive
文档首页/ 云搜索服务 CSS/ 最佳实践/ Elasticsearch向量检索的性能测试和比较

Elasticsearch向量检索的性能测试和比较

更新时间:2025-01-06 GMT+08:00

应用场景

云搜索服务的向量检索引擎提供了全托管、高性能的分布式向量数据库服务。为了方便用户在业务场景进行向量搜索的性能压力测试,为产品选择和资源配置提供准确的参考依据,本文提供了基于开源数据集和开源压力测试工具的Elasticsearch向量检索的性能测试方案。

测试前准备

  • 创建Elasticsearch向量数据库,参考创建Elasticsearch集群

    “节点数量”选择“3”“节点规格”选择“通用计算型”“4vCPUs | 16GB”(由于测试的数据量不大,且为了和第三方的基线测试保持相同的CPU规格),“节点存储”选择“超高I/O”,不启用安全模式。

  • 获取测试数据集。
    • sift-128-euclidean:维度128,base数据100万条,使用欧式距离度量。
    • cohere-768-cosine:维度768,base数据100万条,使用余弦距离度量。
    • gist-960-euclidean:维度960,base数据100万条,使用欧式距离度量。

    “sift-128-euclidean”“gist-960-euclidean”数据的下载地址是https://github.com/erikbern/ann-benchmarks。如需使用“cohere-768-cosine”数据,请提交工单获取。

    图1 下载“sift-128-euclidean”“gist-960-euclidean”数据
  • 准备测试工具。

性能测试的操作步骤

  1. 创建一个弹性云服务器ECS,用于安装压测工具和执行测试脚本。操作指导请参见快速购买和使用Linux ECS
    • ECS必须和Elasticsearch集群在同一个虚拟私有云和安全组中。
    • 也可以使用其他客户端服务器,但是必须保证服务器和Elasticsearch集群在同一VPC。
  2. 将测试数据集上传到ECS上。
  3. 将数据写入和召回率测试脚本上传到ECS上,并执行如下命令。
    pip install h5py
    pip install elasticsearch==7.10
    
    python3 base_test_example.py

    执行完成后,会创建测试的向量索引,写入测试数据,并返回平均查询召回率Recall。

  4. 在ECS上安装开源压测工具Wrk。
  5. 在ECS上准备压测的查询请求文件,用于模拟真实业务场景。参考脚本prepare_query.py
    pip install h5py
    
    python3 prepare_query.py
  6. 在ECS上准备Wrk的压测配置脚本。参考脚本perf.lua,脚本中的查询请求文件名称、集群访问地址和索引名称需要根据实际环境修改。
  7. 在ECS执行如下命令进行向量检索的性能压测。
    wrk -c60 -t60 -d10m -s perf.lua http://x.x.x.x:9200 
    • “t”表示压测线程数。
    • “c”表示与服务端的连接数。
    • “d”表示压测时间,“10m”表示10分钟 。
    • “s”表示Wrk的压测配置脚本。
    • “x.x.x.x”表示Elasticsearch集群的访问地址。

    在回显中获得测试数据,其中“Requests/sec”即查询吞吐量QPS。

    图2 测试结果示例

性能测试比较

  • GRAPH类索引

    百万规模的场景推荐使用GRAPH索引类型。

    • 测试方案一:使用不同维度的数据集,在Top10召回率均达到99%的情况下,测试向量数据库能支撑的最大QPS。每个数据集均基于默认参数和调优参数分别进行测试,通过调整构建参数可以使得图索引结构更优,在同等召回精度下能取得更高的查询性能。
      测试结果
      表1 GRAPH类索引测试结果1

      数据集

      构建参数

      查询参数

      性能指标

      efc

      shrink

      ef

      max_scan_num

      QPS

      Recall

      sift-128-euclidean

      200

      1.0

      84

      10000

      15562

      0.99

      500

      0.8

      50

      10000

      17332

      0.99

      cohere-768-cosine

      200

      1.0

      154

      10000

      3232

      0.99

      500

      0.95

      106

      10000

      3821

      0.99

      gist-960-euclidean

      200

      1.0

      800

      19000

      860

      0.99

      500

      0.9

      400

      15000

      1236

      0.99

      结论:对于不同的数据集,使用默认参数均能达到99%以上的召回率。在进一步调整构建参数和查询参数后,增加了一定的索引构建开销,同时也达到更高的查询性能。

    • 测试方案二:使用同一数据集,通过调整索引参数,测试不同召回率下的查询性能。本方案用COHERE数据集,分别测试了Top10召回率为99%、98%及95%时的集群最大QPS。
      测试结果:
      表2 GRAPH类索引测试结果1

      数据集

      构建参数

      查询参数

      性能指标

      efc

      ef

      QPS

      Recall

      cohere-768-cosine

      500

      128

      3687

      0.99

      500

      80

      5320

      0.98

      500

      36

      9028

      0.95

      结论:同一集群在统一索引构建参数的情况下,通过调整ef参数可以获得不同的查询精度,在略微损失召回率的场景下可以获得成倍的性能提升。

  • GRAPH_PQ类索引

    基于图算法的索引为了保证查询性能通常需要常驻内存,因此当向量维度较高或数据量较大时,内存资源成为影响成本及性能的关键因素。具体来说,高维度的向量和大规模的数据集对内存的需求显著增加,这不仅关系到存储成本,还直接影响到索引算法的运行效率和响应速度。该场景推荐使用GRAPH_PQ索引类型。

    测试方案:使用维度较高的COHERE与GIST数据集,测试在Top10召回率达到95%时的集群最大QPS,并与GRAPH索引对比常驻内存开销

    测试结果:
    表3 GRAPH_PQ类索引测试结果

    数据集

    构建参数

    查询参数

    性能指标

    内存开销

    efc

    fragment_num

    ef

    topk

    QPS

    Recall

    GRAPH_PQ

    GRAPH

    cohere-768-cosine

    200

    64

    85

    130

    8723

    0.95

    332MB

    3.3GB

    gist-960-euclidean

    200

    120

    200

    360

    4267

    0.95

    387MB

    4.0GB

    结论:结果显示使用GRAPH_PQ类索引能够在节约10倍+内存开销的情况下,取得与GRAPH索引差不多的精度和性能。因此,CSS向量索引的GRAPH_PQ算法融合了图索引与量化算法,能够大幅降低内存的开销,提升单机的数据容量。

测试数据中涉及的索引参数说明请参见表4,关于构建参数的详细说明请参见在Elasticsearch集群创建向量索引,关于查询参数的详细说明请参见在Elasticsearch集群使用向量索引搜索数据

表4 索引参数说明

类型

参数名称

说明

构建参数

efc

构建hnsw时考察邻居节点的队列大小,默认值为200,值越大精度越高,构建速度将会变慢。

shrink

构建hnsw时的裁边系数,默认值为1.0f。

fragment_num

段数,默认值为0,插件自动根据向量长度设置合适的段数。

查询参数

ef

查询时考察邻居节点的队列大小。值越大查询精度越高,查询速度会变慢。默认值为200。

max_scan_num

扫描节点上限。值越大精度越高,查询速度变慢。默认值为10000。

topk

查询时返回top k条数据。

脚本“base_test_example.py”

# -*- coding: UTF-8 -*-
import json
import time

import h5py
from elasticsearch import Elasticsearch
from elasticsearch import helpers

def get_client(hosts: list, user: str = None, password: str = None):
    if user and password:
        return Elasticsearch(hosts, http_auth=(user, password), verify_certs=False, ssl_show_warn=False)
    else:
        return Elasticsearch(hosts)

# 索引参数说明请参见在Elasticsearch集群创建向量索引。
def create(es_client, index_name, shards, replicas, dim, algorithm="GRAPH",
           metric="euclidean", neighbors=64, efc=200, shrink=1.0):
    index_mapping = {
        "settings": {
            "index": {
                "vector": True
            },
            "number_of_shards": shards,
            "number_of_replicas": replicas,
        },
        "mappings": {
            "properties": {
                "id": {
                    "type": "integer"
                },
                "vec": {
                    "type": "vector",
                    "indexing": True,
                    "dimension": dim,
                    "algorithm": algorithm,
                    "metric": metric,
                    "neighbors": neighbors,
                    "efc": efc,
                    "shrink": shrink,
                }
            }
        }
    }
    es_client.indices.create(index=index_name, body=index_mapping)
    print(f"Create index success! Index name: {index_name}")

def write(es_client, index_name, vectors, bulk_size=1000):
    print("Start write! Index name: " + index_name)
    start = time.time()
    for i in range(0, len(vectors), bulk_size):
        actions = [{
            "_index": index_name,
            "id": i + j,
            "vec": v.tolist()
        } for j, v in enumerate(vectors[i: i + bulk_size])]
        helpers.bulk(es_client, actions, request_timeout=180)
    print(f"Write success! Docs count: {len(vectors)}, total cost: {time.time() - start:.2f} seconds")
    merge(es_client, index_name)

def merge(es_client, index_name, seg_cnt=1):
    print(f"Start merge! Index name: {index_name}")
    start = time.time()
    es_client.indices.forcemerge(index=index_name, max_num_segments=seg_cnt, request_timeout=7200)
    print(f"Merge success! Total cost: {time.time() - start:.2f} seconds")

# 查询参数说明请参考见在Elasticsearch集群使用向量索引搜索数据。
def query(es_client, index_name, queries, gts, size=10, k=10, ef=200, msn=10000):
    print("Start query! Index name: " + index_name)
    i = 0
    precision = []
    for vec in queries:
        hits = set()
        dsl = {
            "size": size,
            "stored_fields": ["_none_"],
            "docvalue_fields": ["id"],
            "query": {
                "vector": {
                    "vec": {
                        "vector": vec.tolist(),
                        "topk": k,
                        "ef": ef,
                        "max_scan_num": msn
                    }
                }
            }
        }
        res = es_client.search(index=index_name, body=json.dumps(dsl))
        for hit in res['hits']['hits']:
            hits.add(int(hit['fields']['id'][0]))
        precision.append(len(hits.intersection(set(gts[i, :size]))) / size)
        i += 1
    print(f"Query complete! Average precision: {sum(precision) / len(precision)}")

def load_test_data(src):
    hdf5_file = h5py.File(src, "r")
    base_vectors = hdf5_file["train"]
    query_vectors = hdf5_file["test"]
    ground_truths = hdf5_file["neighbors"]
    return base_vectors, query_vectors, ground_truths

def test_sift(es_client):
    index_name = "index_sift_graph"
    vectors, queries, gts = load_test_data(r"sift-128-euclidean.hdf5")
    # 根据实际测试需求调整分片和副本数、索引算法、索引参数等。本文性能测试均配置的是1个分片、2个副本。
    create(es_client, index_name, shards=1, replicas=2, dim=128)
    write(es_client, index_name, vectors)
    query(es_client, index_name, queries, gts)

if __name__ == "__main__":
    # 此处修改为CSS集群的实际访问地址。
    client = get_client(['http://x.x.x.x:9200'])
    test_sift(client)

脚本“prepare_query.py”

import base64
import json
import struct

import h5py

def prepare_query(src, dst, size=10, k=10, ef=200, msn=10000, metric="euclidean", rescore=False, use_base64=True):
    """
    使用该函数从hdf5格式的源数据文件读取出query向量,并生成完整的query请求体,用于性能压测。
    :param src: hdf5格式的源数据文件路径。
    :param dst: 目标文件路径。
    :param size: 指定查询返回结果数。
    :param k: 指定Segment级别索引查询topk条相似结果。
    :param ef: 索引查询参数,用于指定查询过程使用的队列大小。
    :param msn: 索引查询参数,用于指定max_scan_num。
    :param metric: 使用何种度量进行rescore精排,euclidean、cosine、inner_product等。
    :param rescore: 是否使用rescore精排,对于GRAPH_PQ索引可以开启精排。
    :param use_base64: 是否使用base64编码的向量数据。
    """
    hdf5_file = h5py.File(src, "r")
    query_vectors = hdf5_file["test"]
    with open(dst, "w", encoding="utf8") as fw:
        for vec in query_vectors:
            query_template = {
                "size": size,
                "stored_fields": ["_none_"],
                "docvalue_fields": ["id"],
                "query": {
                    "vector": {
                        "vec": {
                            "vector": vec.tolist() if not use_base64 else floats2base64(vec),
                            "topk": k,
                            "ef": ef,
                            "max_scan_num": msn,
                        }
                    }
                }
            }
            if rescore:
                query_template["query"]["rescore"] = {
                    "window_size": k,
                    "vector_rescore": {
                        "field": "vec",
                        "vector": vec.tolist() if not use_base64 else floats2base64(vec),
                        "metric": metric
                    }
                }
            fw.write(json.dumps(query_template))
            fw.write("\n")

def floats2base64(vector):
    data = struct.pack('<{}f'.format(len(vector)), *vector)
    return base64.b64encode(data).decode()

if __name__ == "__main__":
    # 修改为数据文件实际地址。
    prepare_query(r"/path/to/sift-128-euclidean.hdf5", r"requests.txt")

脚本“perf.lua”

local random = math.random

local reqs = {}
local cnt = 0

-- 压测的查询请求文件名称根据需要调整。
for line in io.lines("requests.txt") do
    table.insert(reqs, line)
    cnt = cnt + 1
end


local addrs = {}
local counter = 0
function setup(thread)
   local append = function(host, port)
      for i, addr in ipairs(wrk.lookup(host, port)) do
         if wrk.connect(addr) then
            addrs[#addrs+1] = addr
         end
      end
   end

   if #addrs == 0 then
      -- 根据集群的实际地址进行修改。
      append("x.x.x.x", 9200)
      append("x.x.x.x", 9200)
      append("x.x.x.x", 9200)
   end

   local index = counter % #addrs + 1
   counter = counter + 1
   thread.addr = addrs[index]
end

-- 索引名称根据需要调整。
wrk.path = "/index_sift_graph/_search?request_cache=false&preference=_local"
wrk.method = "GET"
wrk.headers["Content-Type"] = "application/json"


function request()
    return wrk.format(wrk.method, wrk.path, wrk.headers, reqs[random(cnt)])
end

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容