提交 00965c9e authored 作者: limeng's avatar limeng

修改批量

上级 c8d92aa2
...@@ -53,7 +53,7 @@ func GetCount(req GetCountReq) (count int64,err error) { ...@@ -53,7 +53,7 @@ func GetCount(req GetCountReq) (count int64,err error) {
} }
//根据搜索条件 获取数据 //根据搜索条件 获取数据
func GetList(req GetListReq) (searchResult *elastic.SearchResult) { func GetList(req GetListReq) (searchResult *elastic.SearchResult,err error) {
//检测是from_size查询还是search_after滚动式查询 //检测是from_size查询还是search_after滚动式查询
isSearchAfter, from, size := checkParam(req) isSearchAfter, from, size := checkParam(req)
//连接Es //连接Es
...@@ -82,7 +82,7 @@ func GetList(req GetListReq) (searchResult *elastic.SearchResult) { ...@@ -82,7 +82,7 @@ func GetList(req GetListReq) (searchResult *elastic.SearchResult) {
Do(ctx) Do(ctx)
} }
return searchResult return
} }
//检验参数 区分 分页查询 滚动式查询 //检验参数 区分 分页查询 滚动式查询
...@@ -107,11 +107,11 @@ func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) { ...@@ -107,11 +107,11 @@ func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) {
switch req.Type { switch req.Type {
case "add": case "add":
bulkIndexAdd = BulkAdd(req.IndexName) bulkIndexAdd = BulkAdd(req)
case "update": case "update":
bulkIndexUpdate = BulkUpdate(req.IndexName) bulkIndexUpdate = BulkUpdate(req)
case "del": case "del":
bulkIndexDel = BulkDel(req.IndexName) bulkIndexDel = BulkDel(req)
} }
if bulkIndexAdd != nil { if bulkIndexAdd != nil {
...@@ -127,20 +127,20 @@ func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) { ...@@ -127,20 +127,20 @@ func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) {
} }
//批量处理 准备数据 1.添加 //批量处理 准备数据 1.添加
func BulkAdd(indexName string) (indexReq *elastic.BulkIndexRequest) { func BulkAdd(req BulkReq) (indexReq *elastic.BulkIndexRequest) {
indexReq = elastic.NewBulkIndexRequest().Index(indexName).Type(docType) indexReq = elastic.NewBulkIndexRequest().Index(req.IndexName).Type(docType).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
return return
} }
//批量处理 准备数据 1.删除 //批量处理 准备数据 1.删除
func BulkDel(indexName string) (indexReq *elastic.BulkDeleteRequest) { func BulkDel(req BulkReq) (indexReq *elastic.BulkDeleteRequest) {
indexReq = elastic.NewBulkDeleteRequest().Index(indexName).Type(docType) indexReq = elastic.NewBulkDeleteRequest().Index(req.IndexName).Type(docType).Id(req.DocId).Routing(req.RoutingId)
return return
} }
//批量处理 准备数据 1.更新 //批量处理 准备数据 1.更新
func BulkUpdate(indexName string) (indexReq *elastic.BulkUpdateRequest) { func BulkUpdate(req BulkReq) (indexReq *elastic.BulkUpdateRequest) {
indexReq = elastic.NewBulkUpdateRequest().Index(indexName).Type(docType) indexReq = elastic.NewBulkUpdateRequest().Index(req.IndexName).Type(docType).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
return return
} }
......
...@@ -5,6 +5,9 @@ import "github.com/olivere/elastic/v7" ...@@ -5,6 +5,9 @@ import "github.com/olivere/elastic/v7"
type BulkReq struct { type BulkReq struct {
Type string `json:"type"` Type string `json:"type"`
IndexName string `json:"index_name"` IndexName string `json:"index_name"`
DocId string `json:"doc_id"`
RoutingId string `json:"routing_id"`
Doc interface{} `json:"doc"`
} }
type BulkIndex struct { type BulkIndex struct {
*elastic.BulkIndexRequest *elastic.BulkIndexRequest
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论