云搜索服务 CSS
云搜索服务 CSS
- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- CSS服务权限管理
-
使用Elasticsearch搜索数据
- Elasticsearch使用流程
- Elasticsearch集群规划建议
- 创建Elasticsearch集群
- 访问Elasticsearch集群
- 导入数据至Elasticsearch集群
- 使用Elasticsearch集群搜索数据
- 增强Elasticsearch集群搜索能力
- 配置Elasticsearch集群网络
- 备份与恢复Elasticsearch集群数据
- 扩缩容Elasticsearch集群
- 升级Elasticsearch集群版本
- 管理Elasticsearch集群
- 管理Elasticsearch集群索引策略
- Elasticsearch集群监控与日志管理
- 查看Elasticsearch集群审计日志
- 使用OpenSearch搜索数据
- 使用Logstash迁移数据
- CSS服务资源监控
- 最佳实践
- API参考
- SDK参考
-
常见问题
- 产品咨询
- CSS集群访问
- CSS集群迁移
-
CSS集群搜索引擎使用
- CSS服务中为什么新创建的索引分片集中分配到单节点上?
- CSS服务中Elasticsearch 7.x集群如何在index下创建type?
- CSS服务中如何配置Elasticsearch索引副本数量?
- CSS服务中Elasticsearch集群分片过多会有哪些影响?
- 如何查看CSS集群的分片数以及副本数?
- CSS服务中Elasticsearch集群的节点node.roles为i表示什么意思?
- CSS服务中如何设置Elasticsearch集群的默认分页返回最大条数?
- CSS服务中如何更新Elasticsearch生命周期策略?
- CSS服务中如何设置Elasticsearch集群慢查询日志的阈值?
- CSS服务中如何清理Elasticsearch索引数据?
- CSS服务中如何清理Elasticsearch缓存?
- 使用delete_by_query命令删除Elasticsearch集群数据后,为什么磁盘使用率反而增加?
- CSS服务的Elasticsearch集群是否支持script dotProduct?
-
CSS集群管理
- 如何查看CSS集群所分布的可用区?
- CSS服务中Filebeat版本与集群版本的关系是什么?
- 如何获取CSS服务的安全证书?
- CSS服务中如何转换CER安全证书的格式?
- CSS服务中Elasticsearch和OpenSearch集群支持修改安全组吗?
- CSS服务中Elasticsearch集群如何设置search.max_buckets参数?
- CSS服务中如何修改Elasticsearch和OpenSearch集群的TLS算法?
- CSS服务中如何开启Elasticsearch和OpenSearch集群的安全审计日志?
- CSS服务中是否支持停止集群?
- CSS集群冻结索引后如何查询OBS上的索引占用量?
- 如何查看Elasticsearch和OpenSearch集群的系统默认插件列表
- CSS集群备份与恢复
- CSS集群监控与运维
-
故障排除
-
访问集群类
- 无法正常打开Kibana
- Elasticsearch针对filebeat配置调优
- Spring Boot使用Elasticsearch出现Connection reset by peer问题
- 为什么集群创建失败
- Elasticsearch集群出现写入拒绝“Bulk Reject”,如何解决?
- Elasticsearch集群创建index pattern卡住,如何解决?
- 云搜索控制台页面提示系统繁忙
- Elasticsearch集群报错:unassigned shards all indices
- es-head插件连接Elasticsearch集群报跨域错误
- 单节点集群打开Cerebro界面显示告警
- ECS无法连接到集群
- 集群不可用
- 数据导入导出类
-
功能使用类
- 无法备份索引
- 无法使用自定义词库功能
- 快照仓库找不到
- 集群一直处于快照中
- 数据量很大,如何进行快照备份?
- 集群突现load高的故障排查
- 使用ElasticSearch的HLRC(High Level Rest Client)时,报出I/O Reactor STOPPED
- Elasticsearch集群最大堆内存持续过高(超过90%)
- Elasticsearch集群更改规格失败
- 安全集群索引只读状态修改报错
- Elasticsearch集群某一节点分配不到shard
- 集群索引插入数据失败
- CSS创建索引报错“maximum shards open”
- 删除索引报错“403 Forbidden”是什么原因?
- Kibana中删除index pattern报错Forbidden
- 执行命令update-by-query报错“Trying to create too many scroll contexts”
- Elasticsearch集群无法创建pattern
- 端口访问类
-
访问集群类
- 视频帮助
- 产品术语
-
更多文档
-
用户指南(阿布扎比区域)
- 产品介绍
- 快速入门
- 权限管理
- 创建并接入集群
- 集群形态变更
- 导入数据到Elasticsearch
- 管理Elasticsearch类型集群
- 向量检索
- 使用Kibana相关操作
- 查询Elasticsearch SQL
- 增强特性
- 监控
- 审计
-
常见问题
- 产品咨询类
-
功能使用相关
- Elasticsearch是否支持不同VPC之间的数据迁移?
- 如何跨Region迁移CSS集群?
- 如何设置云搜索服务的慢查询日志的阈值?
- 如何更新CSS生命周期策略?
- 如何批量设置索引副本数为0?
- 为什么新创建的索引分片全部被分配到一个node节点上?
- 如何查询快照信息?
- 购买的低版本集群是否可以升级为高版本集群
- 集群被删除后是否还能恢复?
- 如何修改Elasticsearch集群的TLS算法?
- ES集群如何设置search.max_buckets参数?
- Elasticsearch集群中某个客户端节点的node.roles为i表示该节点是ingest节点吗?
- Elasticsearch 7.x集群如何在index下创建type?
- 安全模式集群相关
- 资源使用和更改相关
- 组件使用
- Kibana使用相关
- 访问集群相关
- 端口使用
- 修订记录
- API参考(阿布扎比区域)
-
用户指南(巴黎、阿姆斯特丹区域)
- 产品介绍
- 快速入门
- 权限管理
- 创建并接入集群
- 集群形态变更
- 导入数据到Elasticsearch
- 管理Elasticsearch类型集群
- 向量检索
- 使用Kibana相关操作
- 查询Elasticsearch SQL
- 增强特性
- 监控
- 审计
- 最佳实践
-
常见问题
- 产品咨询类
-
功能使用相关
- Elasticsearch是否支持不同VPC之间的数据迁移?
- 如何跨Region迁移CSS集群?
- 如何设置云搜索服务的慢查询日志的阈值?
- 如何更新CSS生命周期策略?
- 如何批量设置索引副本数为0?
- 为什么新创建的索引分片全部被分配到一个node节点上?
- 如何查询快照信息?
- 购买的低版本集群是否可以升级为高版本集群
- 集群被删除后是否还能恢复?
- 如何修改Elasticsearch集群的TLS算法?
- Elasticsearch集群如何设置search.max_buckets参数?
- Elasticsearch集群中某个客户端节点的node.roles为i表示该节点是ingest节点吗?
- Elasticsearch 7.x集群如何在index下创建type?
- 安全模式集群相关
- 资源使用和更改相关
- 组件使用
- Kibana使用相关
- 访问集群相关
- 端口使用
- 修订记录
- API参考 (巴黎、阿姆斯特丹区域)
-
用户指南(吉隆坡区域)
- 产品介绍
- 快速入门
- 权限管理
- 创建并接入集群
- 集群形态变更
- 导入数据到Elasticsearch
- 管理Elasticsearch类型集群
- 向量检索
- 使用Kibana相关操作
- 查询Elasticsearch SQL
- 增强特性
- 监控
- 审计
-
常见问题
- 产品咨询类
-
功能使用相关
- Elasticsearch是否支持不同VPC之间的数据迁移?
- 如何跨Region迁移CSS集群?
- 如何设置云搜索服务的慢查询日志的阈值?
- 如何更新CSS生命周期策略?
- 如何批量设置索引副本数为0?
- 为什么新创建的索引分片全部被分配到一个node节点上?
- 如何查询快照信息?
- 购买的低版本集群是否可以升级为高版本集群
- 集群被删除后是否还能恢复?
- 如何修改Elasticsearch集群的TLS算法?
- ES集群如何设置search.max_buckets参数?
- Elasticsearch集群中某个客户端节点的node.roles为i表示该节点是ingest节点吗?
- Elasticsearch 7.x集群如何在index下创建type?
- 安全模式集群相关
- 资源使用和更改相关
- 组件使用
- Kibana使用相关
- 访问集群相关
- 端口使用
- 修订记录
- API参考(吉隆坡区域)
-
用户指南(安卡拉区域)
- 产品介绍
- 快速入门
- 权限管理
- 查看集群运行状态和存储容量状态
- 集群列表概览
- 部署跨AZ集群
- Elasticsearch
- 导入数据到Elasticsearch
- 监控
- 审计
-
常见问题
- 产品咨询类
-
功能使用相关
- Elasticsearch是否支持不同VPC之间的数据迁移?
- 如何跨Region迁移CSS集群?
- 如何设置云搜索服务的慢查询日志的阈值?
- 如何更新CSS生命周期策略?
- 如何批量设置索引副本数为0?
- 为什么新创建的索引分片全部被分配到一个node节点上?
- 如何查询快照信息?
- 购买的低版本集群是否可以升级为高版本集群
- 集群被删除后是否还能恢复?
- 如何修改Elasticsearch集群的TLS算法?
- ES集群如何设置search.max_buckets参数?
- Elasticsearch集群中某个客户端节点的node.roles为i表示该节点是ingest节点吗?
- Elasticsearch 7.x集群如何在index下创建type?
- 安全模式集群相关
- 资源使用和更改相关
- 组件使用
- Kibana使用相关
- 访问集群相关
- 端口使用
- 修订记录
- API参考(安卡拉区域)
-
用户指南(阿布扎比区域)
- 通用参考
链接复制成功!
向量检索的客户端代码示例
Elasticsearch提供了标准的REST接口,以及Java、Python、Go等语言编写的客户端。
基于开源数据集SIFT1M(http://corpus-texmex.irisa.fr/)和Python Elasticsearch Client,本节提供一份创建向量索引、导入向量数据和查询向量数据的代码示例,介绍如何使用客户端实现向量检索。
前提条件
客户端已经安装python依赖包。如果未安装可以执行如下命令安装:
pip install numpy pip install elasticsearch==7.6.0
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
import numpy as np import time import json from concurrent.futures import ThreadPoolExecutor, wait from elasticsearch import Elasticsearch from elasticsearch import helpers endpoint = 'http://xxx.xxx.xxx.xxx:9200/' # 构建es客户端对象 es = Elasticsearch(endpoint) # 索引mapping信息 index_mapping = ''' { "settings": { "index": { "vector": "true" } }, "mappings": { "properties": { "my_vector": { "type": "vector", "dimension": 128, "indexing": true, "algorithm": "GRAPH", "metric": "euclidean" } } } } ''' # 创建索引 def create_index(index_name, mapping): res = es.indices.create(index=index_name, ignore=400, body=mapping) print(res) # 删除索引 def delete_index(index_name): res = es.indices.delete(index=index_name) print(res) # 刷新索引 def refresh_index(index_name): res = es.indices.refresh(index=index_name) print(res) # 索引段合并 def merge_index(index_name, seg_cnt=1): start = time.time() es.indices.forcemerge(index=index_name, max_num_segments=seg_cnt, request_timeout=36000) print(f"在{time.time() - start}秒内完成merge") # 加载向量数据 def load_vectors(file_name): fv = np.fromfile(file_name, dtype=np.float32) dim = fv.view(np.int32)[0] vectors = fv.reshape(-1, 1 + dim)[:, 1:] return vectors # 加载ground_truth数据 def load_gts(file_name): fv = np.fromfile(file_name, dtype=np.int32) dim = fv.view(np.int32)[0] gts = fv.reshape(-1, 1 + dim)[:, 1:] return gts def partition(ls, size): return [ls[i:i + size] for i in range(0, len(ls), size)] # 写入向量数据 def write_index(index_name, vec_file): pool = ThreadPoolExecutor(max_workers=8) tasks = [] vectors = load_vectors(vec_file) bulk_size = 1000 partitions = partition(vectors, bulk_size) start = time.time() start_id = 0 for vecs in partitions: tasks.append(pool.submit(write_bulk, index_name, vecs, start_id)) start_id += len(vecs) wait(tasks) print(f"在{time.time() - start}秒内完成写入") def write_bulk(index_name, vecs, start_id): actions = [ { "_index": index_name, "my_vector": vecs[j].tolist(), "_id": str(j + start_id) } for j in range(len(vecs)) ] helpers.bulk(es, actions, request_timeout=3600) # 查询索引 def search_index(index_name, query_file, gt_file, k): print("Start query! Index name: " + index_name) queries = load_vectors(query_file) gt = load_gts(gt_file) took = 0 precision = [] for idx, query in enumerate(queries): hits = set() query_json = { "size": k, "_source": False, "query": { "vector": { "my_vector": { "vector": query.tolist(), "topk": k } } } } res = es.search(index=index_name, body=json.dumps(query_json)) for hit in res['hits']['hits']: hits.add(int(hit['_id'])) precision.append(len(hits.intersection(set(gt[idx, :k]))) / k) took += res['took'] print("precision: " + str(sum(precision) / len(precision))) print(f"在{took / 1000:.2f}秒内完成检索,平均took大小为{took / len(queries):.2f}毫秒") if __name__ == "__main__": vec_file = r"./data/sift/sift_base.fvecs" qry_file = r"./data/sift/sift_query.fvecs" gt_file = r"./data/sift/sift_groundtruth.ivecs" index = "test" create_index(index, index_mapping) write_index(index, vec_file) merge_index(index) refresh_index(index) search_index(index, qry_file, gt_file, 10) |
父主题: 向量检索