Commit f06524a7 authored by limeng

Test examples

Parent 00965c9e
@@ -3,7 +3,6 @@ package elastic
import (
"context"
"errors"
"fmt"
"github.com/olivere/elastic/v7"
"gitlab.jxhh.com/stbz/library.git/logs"
"sync/atomic"
@@ -11,7 +10,6 @@ import (
)
const (
docType = "_doc"
refresh = "true"
)
@@ -28,6 +26,7 @@ func GetDoc(req GetDocReq) (docContent *elastic.GetResult,err error){
docContent, err = client.Get().Index(req.IndexName).Id(req.DocId).Do(ctx)
if err != nil {
err = errors.New("failed to get document: "+err.Error())
logs.Error("failed to get document: [%s]",err.Error())
return
}
return
@@ -47,6 +46,7 @@ func GetCount(req GetCountReq) (count int64,err error) {
count, err = countService.Query(req.Condition).Do(ctx)
if err != nil {
err = errors.New("failed to get count: "+err.Error())
logs.Error("GetCount failed to get count: [%s]",err.Error())
return
}
return
@@ -100,7 +100,8 @@ func checkParam(req GetListReq) (isSearchAfter bool, from int, size int) {
}
//Bulk prepares data for bulk processing: 1. add 2. update 3. delete
func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) {
func Bulk(req BulkReq) (bulkIndex BulkIndex, err error) {
bulkIndexAdd := new(elastic.BulkIndexRequest)
bulkIndexUpdate := new(elastic.BulkUpdateRequest)
bulkIndexDel := new(elastic.BulkDeleteRequest)
@@ -114,6 +115,9 @@ func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) {
bulkIndexDel = BulkDel(req)
}
bulkIndex.BulkIndexRequest = new(elastic.BulkIndexRequest)
bulkIndex.BulkUpdateRequest = new(elastic.BulkUpdateRequest)
bulkIndex.BulkDeleteRequest = new(elastic.BulkDeleteRequest)
if bulkIndexAdd != nil {
bulkIndex.BulkIndexRequest = bulkIndexAdd
}
@@ -128,24 +132,24 @@ func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) {
//BulkAdd prepares bulk data: add
func BulkAdd(req BulkReq) (indexReq *elastic.BulkIndexRequest) {
indexReq = elastic.NewBulkIndexRequest().Index(req.IndexName).Type(docType).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
indexReq = elastic.NewBulkIndexRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
return
}
//BulkDel prepares bulk data: delete
func BulkDel(req BulkReq) (indexReq *elastic.BulkDeleteRequest) {
indexReq = elastic.NewBulkDeleteRequest().Index(req.IndexName).Type(docType).Id(req.DocId).Routing(req.RoutingId)
indexReq = elastic.NewBulkDeleteRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId)
return
}
//BulkUpdate prepares bulk data: update
func BulkUpdate(req BulkReq) (indexReq *elastic.BulkUpdateRequest) {
indexReq = elastic.NewBulkUpdateRequest().Index(req.IndexName).Type(docType).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
indexReq = elastic.NewBulkUpdateRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
return
}
//BulkDo executes the prepared bulk operations
func BulkDo(esRequest []elastic.BulkableRequest, ctx context.Context) (bulkResponse *elastic.BulkResponse, err error) {
func BulkDo(esRequest []elastic.BulkableRequest) (bulkResponse *elastic.BulkResponse, err error) {
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error())
@@ -155,11 +159,15 @@ func BulkDo(esRequest []elastic.BulkableRequest, ctx context.Context) (bulkRespo
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
bulkResponse, err = client.Bulk().Add(esRequest...).Refresh(refresh).Do(ctx)
if err!=nil{
err = errors.New("BulkRestApi Bulk操作错误"+err.Error())
logs.Error("BulkRestApi Bulk操作错误:【%s】",err.Error())
}
return
}
//UpdateByScript updates a single field of a document via a script
func UpdateByScript(req UpdateByScriptReq) (err error) {
func UpdateByScript(req UpdateByScriptReq) (updateResponse *elastic.UpdateResponse,err error) {
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error())
@@ -168,16 +176,17 @@ func UpdateByScript(req UpdateByScriptReq) (err error) {
defer client.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.Update().Index("1").Id(req.DocId).Routing(req.RoutingId).Script(elastic.NewScript(req.ScriptCtx)).Do(ctx)
updateResponse, err = client.Update().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Script(elastic.NewScript(req.ScriptCtx)).Do(ctx)
if err != nil {
err = errors.New("script更新文档失败"+err.Error())
logs.Error("script更新文档失败:【%s】",err.Error())
return
}
return
}
//Same purpose as BulkDo: BulkDo goes over HTTP, while BulkProcessor works over TCP and is relatively safer
func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context,numDocs int) (err error){
func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){
if numDocs>30{
err = errors.New("请合理输入参数,批量处理最大为30")
return
@@ -198,7 +207,7 @@ func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context,numD
beforeFn := func(executionId int64, requests []elastic.BulkableRequest) {
atomic.AddInt64(&beforeRequests, int64(len(requests)))
atomic.AddInt64(&before, 1)
logs.Info("elastic","序号:{} 开始执行 {} 条数据批量操作。", executionId,len(requests))
logs.Info("elastic","序号:{%d} 开始执行 {%d} 条数据批量操作。", executionId,len(requests))
}
afterFn := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
// 在每次执行BulkRequest后调用,通过此方法可以获取BulkResponse是否包含错误
......@@ -207,10 +216,11 @@ func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context,numD
atomic.AddInt64(&failures, 1)
}
atomic.AddInt64(&afterRequests, int64(len(requests)))
if response.Errors{
logs.Error("%s",response.Failed())
logs.Error("BulkProcessor afterFn错误 %s",response.Failed())
}else{
logs.Info("log","序号:{%s} ,执行 {%s} 条数据批量操作成功,共耗费{%d}毫秒",executionId,len(requests),response.Took)
logs.Info("log","序号:{%d} ,执行 {%d} 条数据批量操作成功,共耗费{%d}毫秒",executionId,len(requests),response.Took)
}
}
@@ -218,28 +228,19 @@ func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context,numD
//a bulk operation is executed for every 30 requests added
p, err := client.BulkProcessor().Name("Worker-1").Before(beforeFn).After(afterFn).Stats(true).BulkActions(numDocs).Do(ctx)
if err != nil {
fmt.Println(err)
logs.Error("BulkProcessor Bulk操作错误:【%s】",err.Error())
}
for _,v:=range esRequest{
p.Add(v)
}
err = p.Close()
err =p.Flush()
if err != nil {
fmt.Println(err)
logs.Error("BulkProcessor Flush错误:【%s】",err.Error())
}
stats := p.Stats()
err =p.Flush()
fmt.Println(stats)
fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
fmt.Printf("Number of requests indexed : %d\n", stats.Indexed)
fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
for i, w := range stats.Workers {
fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
fmt.Printf(" Last response time : %v\n", w.LastDuration)
err = p.Close()
if err != nil {
logs.Error("BulkProcessor Close错误:【%s】",err.Error())
}
return
}
package elastic
import (
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"time"
)
//struct that the document source is unmarshalled into
type JdRegion struct {
Title string `json:"title"`
Id int `json:"id"`
Cover string `json:"cover"`
}
//fetch a document by its document id
func testDocId() {
res, err := GetDoc(GetDocReq{
DocId: "goods_3281484", //document id
IndexName: "goods_app_index_test", //index name
})
if err != nil { //check the error before touching res to avoid a nil dereference
fmt.Println(err)
return
}
var jdRegion JdRegion
err = json.Unmarshal(res.Source, &jdRegion) //unmarshal the document source into the struct
fmt.Println(jdRegion, err)
}
//get the total number of documents matching a query condition
func testGetCount() {
//query condition, adjust to your own business logic
q := elastic.NewBoolQuery()
q = q.Filter(elastic.NewTermQuery("status", 1))
//get the count for the condition
res, err := GetCount(GetCountReq{
Condition: q, //query condition
IndexName: "goods_app_index_test", //index name
})
fmt.Println(res, err)
}
//get data matching a query condition, using from/size pagination
func testGetListByFromSize() {
//query condition, adjust to your own business logic
q := elastic.NewBoolQuery()
q = q.Filter(elastic.NewTermQuery("status", 1))
//get the list of documents matching the condition
res, err := GetList(GetListReq{
Condition: q, //query condition
IndexName: "goods_app_index_test", //index name
Page: 1, //page number
Limit: 10, //page size
})
if err != nil { //bail out before iterating over a possibly nil result
fmt.Println(err)
return
}
//iterate over the hits
for _, hit := range res.Hits.Hits {
var jdRegion JdRegion
err := json.Unmarshal(hit.Source, &jdRegion) //unmarshal the hit source into the struct
fmt.Println(err)
fmt.Println(jdRegion)
}
}
//get data matching a query condition with search_after (cursor-style scrolling, no page numbers)
func testGetListSearchAfter() {
//query condition, adjust to your own business logic
q := elastic.NewBoolQuery()
q = q.Filter(elastic.NewTermQuery("status", 1))
//get the list of documents matching the condition
res, err := GetList(GetListReq{
Condition: q, //query condition
IndexName: "goods_app_index_test", //index name
SearchAfter: &SearchAfter{
SortField: struct { //search_after requires sorted data
Field string `json:"field"` //sort field name, must be a numeric field such as the goods id
Sort bool `json:"sort" ` //true = ascending, false = descending
}{
"id", //goods id
true, //ascending
},
Value: 0, //start from goods id greater than 0
},
})
if err != nil { //check the error before iterating over the hits
fmt.Println(err)
return
}
//iterate over the hits
for _, hit := range res.Hits.Hits {
var jdRegion JdRegion
err := json.Unmarshal(hit.Source, &jdRegion) //unmarshal the hit source into the struct
fmt.Println(err)
fmt.Println(jdRegion)
}
}
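//A minimal sketch (hypothetical, not part of the original commit) of how a search_after
//scroll would typically continue: feed the last returned goods id back in as Value until
//a page comes back empty. It reuses the GetList/SearchAfter API shown above and assumes
//SearchAfter.Value accepts a plain int, matching the Value: 0 usage in the example.
func testGetListSearchAfterLoop() {
	q := elastic.NewBoolQuery()
	q = q.Filter(elastic.NewTermQuery("status", 1))
	lastId := 0 //start from goods id greater than 0
	for {
		res, err := GetList(GetListReq{
			Condition: q,
			IndexName: "goods_app_index_test",
			SearchAfter: &SearchAfter{
				SortField: struct {
					Field string `json:"field"`
					Sort  bool   `json:"sort" `
				}{"id", true},
				Value: lastId, //resume after the last id seen so far
			},
		})
		if err != nil {
			fmt.Println(err)
			return
		}
		if len(res.Hits.Hits) == 0 { //no more pages
			return
		}
		for _, hit := range res.Hits.Hits {
			var jdRegion JdRegion
			if err := json.Unmarshal(hit.Source, &jdRegion); err == nil {
				fmt.Println(jdRegion)
				lastId = jdRegion.Id //remember the last sort value for the next page
			}
		}
	}
}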
type AddChooseGoodESParam struct {
SellerId int `json:"seller_id"`
MyJoinField MyJoinField `json:"my_join_field"`
AddTimeChooseGood int64 `json:"add_time_choose_good"`
ChannelId int `json:"channel_id"`
GroupIds string `json:"groups_ids"`
}
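//MyJoinField appears to map to the Elasticsearch join field ("my_join_field") that ties the
//child seller document to its parent goods document; the parent id is also used as the routing
//in the bulk examples below.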
type MyJoinField struct {
Name string `json:"name"`
Parent string `json:"parent"`
}
//bulk-process data over the HTTP REST API: 1. add 2. update 3. delete
func testBulk() {
//initialise the request slice
esRequest := make([]elastic.BulkableRequest, 0)
//define the document id and routing id; for parent-child documents also define the parent document id
var i []int
i = []int{2885234, 2885240, 2885235}
for _, v := range i {
docId := fmt.Sprintf("%s_%d_%d", "seller", 1, v)
routingId := fmt.Sprintf("%s_%d", "goods", v)
parent := fmt.Sprintf("%s_%d", "goods", v)
//custom struct holding the data to be processed
esParam := AddChooseGoodESParam{
SellerId: 1,
MyJoinField: MyJoinField{Name: "seller", Parent: parent},
AddTimeChooseGood: time.Now().UnixNano(),
ChannelId: 0,
GroupIds: "0",
}
//prepare the bulk request
res, err := Bulk(BulkReq{
Type: "add", //operation type: 1. add 2. update 3. delete
IndexName: "goods_app_index_test", //index name
DocId: docId, //document id
RoutingId: routingId, //routing id; specifying the routing locates the shard faster and speeds up processing
Doc: esParam, //payload, interface type
})
fmt.Println(err, res)
//append to the slice
esRequest = append(esRequest, res.BulkIndexRequest) //BulkIndexRequest, BulkUpdateRequest or BulkDeleteRequest depending on Type
}
//execute the bulk operations against the index
bulkResponse, err := BulkDo(esRequest)
if err != nil {
fmt.Println(err)
return
}
//per-item status: 201 means an add succeeded, 200 means an update or delete succeeded; see the Elasticsearch docs for other codes
for _, val := range bulkResponse.Indexed() { //Indexed() lists the index (add) items; Updated() and Deleted() work the same way
fmt.Println("bulk item status", val.Status)
}
}
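//A short sketch (hypothetical helper, not part of the original commit): BulkResponse also
//exposes Failed(), the same helper the library logs in its BulkProcessor afterFn, which
//lists the items whose per-item status was not a success code.
func checkBulkFailures(bulkResponse *elastic.BulkResponse) {
	if bulkResponse == nil {
		return
	}
	for _, val := range bulkResponse.Failed() {
		fmt.Println("bulk item failed with status", val.Status)
	}
}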
//update part of a document's fields via a script
func testScript() {
//script is the update statement, here: ctx._source.group_ids = '0-1'
script := fmt.Sprintf("%s'%s'", "ctx._source.group_ids = ", "0-1")
updateResponse, err := UpdateByScript(UpdateByScriptReq{
DocId: "goods_3281484", //document id
RoutingId: "goods_3281484", //routing id
ScriptCtx: script, //script statement
IndexName: "goods_app_index_test", //index name
})
fmt.Println(err,updateResponse)
return
}
//bulk-process data over TCP via the bulk processor, safer and faster: 1. add 2. update 3. delete
func testBulkProcessor() {
//initialise the request slice
esRequest := make([]elastic.BulkableRequest, 0)
var i []int
i = []int{2885234, 2885240, 2885235}
for _, v := range i {
//define the document id and routing id; for parent-child documents also define the parent document id
docId := fmt.Sprintf("%s_%d_%d", "seller", 2, v)
routingId := fmt.Sprintf("%s_%d", "goods", v)
parent := fmt.Sprintf("%s_%d", "goods", v)
//custom struct holding the data to be processed
esParam := AddChooseGoodESParam{
SellerId: 1,
MyJoinField: MyJoinField{Name: "seller", Parent: parent},
AddTimeChooseGood: time.Now().UnixNano(),
ChannelId: 0,
GroupIds: "0",
}
//prepare the bulk request
res, err := Bulk(BulkReq{
Type: "add", //operation type: 1. add 2. update 3. delete
IndexName: "goods_app_index_test", //index name
DocId: docId, //document id
RoutingId: routingId, //routing id; specifying the routing locates the shard faster and speeds up processing
Doc: esParam, //payload, interface type
})
fmt.Println("------", err, res)
//append to the slice
esRequest = append(esRequest, res.BulkIndexRequest) //BulkIndexRequest, BulkUpdateRequest or BulkDeleteRequest depending on Type
}
//execute the operations against the index; numDocs caps a single batch at 30 requests
err := BulkProcessor(esRequest, 3)
fmt.Println(err)
return
}
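//A minimal sketch (hypothetical, not part of the original commit) of the delete path the Bulk
//helper also supports: Type "delete" fills BulkDeleteRequest instead of BulkIndexRequest, and
//the request is then submitted through BulkDo exactly as in testBulk above.
func testBulkDelete() {
	esRequest := make([]elastic.BulkableRequest, 0)
	docId := fmt.Sprintf("%s_%d_%d", "seller", 1, 2885234)
	routingId := fmt.Sprintf("%s_%d", "goods", 2885234)
	res, err := Bulk(BulkReq{
		Type:      "delete",               //delete by document id, no Doc payload needed
		IndexName: "goods_app_index_test", //index name
		DocId:     docId,                  //document id
		RoutingId: routingId,              //routing id
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	esRequest = append(esRequest, res.BulkDeleteRequest)
	bulkResponse, err := BulkDo(esRequest)
	fmt.Println(bulkResponse, err)
}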
@@ -18,6 +18,7 @@ type UpdateByScriptReq struct {
DocId string `json:"doc_id"`
RoutingId string `json:"routing_id"`
ScriptCtx string `json:"script_ctx"`
IndexName string `json:"index_name"`
}
type GetDocReq struct {
DocId string `json:"doc_id"`
......
@@ -5,6 +5,6 @@
require (
github.com/gogf/gf v1.16.6
github.com/kjk/betterguid v0.0.0-20170621091430-c442874ba63a
github.com/olivere/elastic/v7 v7.0.30 // indirect
github.com/olivere/elastic/v7 v7.0.30
)
One file's diff is collapsed and not shown.