Vearch 是对大规模深度学习向量进行高性能相似搜索的弹性分布式系统。可以做一个和mysql类比的抽象理解,vearch就是一个分布式数据库,只不过存的数据的某些属性可能是向量。下面对vearch里数据相关一些名词做解释:
table
space
上的数据分布在多个partition上,每个partition只能存在一台机器上,一个机器可以有多个partitiontype
,如果type是vector
,表明这个field是一个向量在verach运行过程中,机器分为三种角色:
ps
db、space、partition
的增删,还有对集群的维护操作partition
,ps的工作就是接收router
的请求并调用c++ faiss库
负责具体的向量运算了,是真正的worker
向量召回的步骤简单理解为这样:
如果有了vearch,上述步骤[3]和步骤[5]都可以变成一条rpc请求
插入&批量插入
接口存在vearch里查询(search)
接口找到topn相近的向量这么做的好处是:
但也存在一些担忧:
内积
即可,但是如果有的业务需要返回物料的向量,比如我要top3000
物料的向量,那体积过大,明显不现实,只有把物料向量存在本地才能这么玩在上述过程中,业务方主要需要使用的这几个接口:
单条插入&批量插入
查询
下面对vearch接口工作流程进行一个梳理
Vearch对用户http请求调用步骤大致都是:
RouterRequest
RouterRequest
物料相关的信息RouterRequest
给partition所在的ps机器这里提出几个问题,最后回答
单条插入的请求如下
curl -XPOST -H "content-type: application/json" -d' { "field1": "value1", "field2": "value2", "field3": { "feature": [0.1, 0.2] } } ' http://router_server/$db_name/$space_name 复制代码
请求中的field
与space的field
一一对应,如果field类型是向量,通过feature:[xxx, xxx]
写入,并且维数要与space中该field对应
对应调用handleUpdateDoc()
方法,这个方法通过传入的http请求初始化一个UpdateRequest
,这里需要注意的是为UpdateRequest
设置PKey这里
URLParamID = "_id" args.Doc.PKey = params.ByName(URLParamID) 复制代码
如果传入参数有 _id
,那么PKey就等于传入的_id
,否则为""(空)。我们这里是插入,不需要传入_id
,所以_id
为空。而当调用查询之类接口时,会传入_id
。插入新物料时,后续的SetDocsField(docs)
方法中会为插入的物料自动生成一个id,方法大概就是自增,这里不深究,只要知道router会为新插入的物料生成唯一id就行了。
完成后,调用updateDoc()
函数处理初始化的UpdateRequest
。
updateDoc
函数通过传入的 pb UpdateRequest
构建一个RouterRequest
并发送给partition
,步骤分为:
RouterRequest
RouterRequest
结构如下
head
里是请求的基本信息:包括用户名,密码,目标dbname以及spacenamemd
是一个map,记录了请求的方法和id
HandlerType
时,value表示该请求对应的方法(增删改查)MessageID
时,value表示本条请求的唯一iddocs
是本条请求的物料信息space
是本条请求对应的space的信息sendMap
的key是partitionID,value是要发给这个partition的信息,其中items
包含了doc信息,其他别的借口时候再补充updateDoc
关键的步骤代码如下:
request := client.NewRouterRequest(ctx, docService.client) request.SetMsgID() .SetMethod(client.ReplaceDocHandler) .SetHead(args.Head) .SetSpace() .SetDocs(docs) .SetDocsField() .PartitionDocs() items := request.Execute() 复制代码
首先通过一串函数装填RouterRequest
SetMsgID()
为本条request生成唯一id,填入md[MessageID]
。SetMethod()
填写md[HandlerType]
,表明本request是一条更新请求SetSpace()
填写request的space
字段,获取方式是先从router本地缓存找,找不到就去etcd里拿SetDocs()
填写request的docs
数组SetDocsField()
为docs
数组里每一个doc
填写PKey
和Fields
字段,
generateUUID
为doc自动生成一个idPartitionDocs()
填充sendMap
字段,就是把docs
字段里的doc都加进sendmap[id]
的items
数组里
RouterRequest
装填完毕,下一步就是发送了
发送是通过RouterRequest
的Execute()
函数
所有要给partition
发送请求的接口最后都会落在这个函数上,这个函数分为两步
rpcClient
先来看构建rpcClient
上一步中,我们已经确认了要发往每个partition
的数据,存在RouterRequest
的sendMap
成员里
首先通过partitionID
获取对应partition的信息,包括机器地址等信息。获取的方式就是先从本地缓存中取,如果没有就从ETCD
里拿,这里介绍一下router的本地缓存,router本地缓存如下图所示:
缓存相关都在router.client.masterclient.cliCache
下面
router.client.masterclient.cliCache
本身继承了sync.map
,存储了nodeID
对应的rpcClient
,避免多次创建partitionCache
里存放了partitionID
对应的partition相关信息,包括机器节点ID(没有地址)serverCache
里存放了NodeID
对应的机器信息,包括IP、端口等etcd
获取相关数据,router.client.masterclient.store
就是etcd相关继续回到构建rpcClient
,构建它的关键就是填写ip和端口。从缓存和etcd拿到nodeID
后,调用GetOrCreateRPCClient(ctx, nodeID)
,同样从缓存和etcd拿到nodeID对应的具体机器信息(地址、端口),并构造一个rpcClient
,完毕
构建好以后,发送就完事了,远程调用的方法是UnaryHandler
方法入口是UnaryHandler.Execute
,根据请求是插入,调用update(ctx, store, req.Items)
这里贴一段代码,后面进入gamma引擎了,这里不做研究
func update(ctx context.Context, store PartitionStore, items []*vearchpb.Item) { item := items[0] docGamma := &gamma.Doc{Fields: item.Doc.Fields} docBytes := docGamma.Serialize() docCmd := &vearchpb.DocCmd{Type: vearchpb.OpType_REPLACE, Doc: docBytes} if err := store.Write(ctx, docCmd, nil, nil); err != nil { log.Error("Add doc failed, err: [%s]", err.Error()) item.Err = vearchpb.NewError(vearchpb.ErrorEnum_INTERNAL_ERROR, err).GetError() } else { item.Doc.Fields = nil } } 复制代码
批量插入的请求如下,每一个插入的物料要两行
curl -H "content-type: application/json" -XPOST -d' {"index": {"_id": "v1"}}\n {"field1": "value", "field2": {"feature": []}}\n {"index": {"_id": "v2"}}\n {"field1": "value", "field2": {"feature": []}}\n ' http://router_server/$db_name/$space_name/_bulk 复制代码
router中对应的处理方法是handleBulk()
,该方法通过http请求初始化一个BulkRequest
,主要就是解析请求中每一个doc
,把他们填入BulkRequest.docs
里,BulkRequest
结构如下:
message BulkRequest{ RequestHead head = 1; repeated Document docs = 4; } 复制代码
填充完后,调用bulk()
方法填充一个RouterRequest
并发送,步骤和单条插入里的updateDoc()
方法类似。
reply := handler.docService.bulk(ctx, args) 复制代码
与单条插入不同的是,批量插入的rpc请求中call的方法是BatchHandler
,ps接到router批量插入请求,调用对应的处理方法是bulk()
查询接口示例如下
curl -H "content-type: application/json" -XPOST -d' { "query": { "sum": [{ "field": "field_name", "feature": [0.1, 0.2, 0.3, 0.4, 0.5], "min_score": 0.9, "boost": 0.5 }], "filter": [{ "range": { "field_name": { "gte": 160, "lte": 180 } } }, { "term": { "field_name": ["100", "200", "300"], "operator": "or" } }] }, "retrieval_param": { "nprobe": 20 }, "fields": ["field1", "field2"], "is_brute_search": 0, "online_log_level": "debug", "quick": false, "vector_value": false, "client_type": "leader", "l2_sqrt": false, "sort": [{"field1":{"order": "asc"}}], "size": 10 } ' http://router_server/$db_name/$space_name/_search 复制代码
工作的方法与前面大同小异,无非是构造请求然后发送,这里mark一下重要的参数
sum:
跟需要查询的特征,下面又有几个参数:
select col from table
中的 col
size
指定最多返回的结果数量,通过这个参数指定topN
。若请求url中设置了size值http://router_server/$db_name/$space_name/_search?size=20优先使用url中指定的size值。quick
搜索结果默认将PQ召回向量进行计算和精排,为了加快服务端处理速度设置成true可以指定只召回,不做计算和精排。(这个不是很理解)router.client.masterclient.cliCache
下面
router.client.masterclient.cliCache
本身继承了sync.map
,存储了nodeID
对应的rpcClient
,避免多次创建partitionCache
里存放了partitionID
对应的partition相关信息,包括机器节点ID(没有地址)serverCache
里存放了NodeID
对应的机器信息,包括IP、端口等etcd
获取相关数据,router.client.masterclient.store
就是etcd相关partitionID := r.space.PartitionId(murmur3.Sum32WithSeed(cbbytes.StringToByte(doc.PKey), 0)) 复制代码