提交 c8d92aa2 authored 作者: limeng's avatar limeng

整理代码

上级 86ff6ece
......@@ -16,37 +16,40 @@ const (
)
//根据文档Id 快速获取内容
func GetDoc(req GetDocReq) {
func GetDoc(req GetDocReq) (docContent *elastic.GetResult,err error){
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
err = errors.New("elastic连接失败:"+err.Error())
return
}
defer client.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
docContent, err := client.Get().Index(req.IndexName).Id(req.DocId).Do(ctx)
fmt.Println(docContent)
docContent, err = client.Get().Index(req.IndexName).Id(req.DocId).Do(ctx)
if err != nil {
err = errors.New("获取文档失败,错误原因:"+err.Error())
return
}
return
}
//根据搜索条件 获取总数
func GetCount(req GetCountReq) (count int64) {
func GetCount(req GetCountReq) (count int64,err error) {
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
err = errors.New("elastic连接失败"+err.Error())
return
}
defer client.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
countService := client.Count("index_name").Pretty(true)
countService := client.Count(req.IndexName).Pretty(true)
count, err = countService.Query(req.Condition).Do(ctx)
if err != nil {
err = errors.New("获取总数失败,错误原因:"+err.Error())
return
}
return count
return
}
//根据搜索条件 获取数据
......@@ -56,14 +59,14 @@ func GetList(req GetListReq) (searchResult *elastic.SearchResult) {
//连接Es
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
err = errors.New("elastic连接失败"+err.Error())
return
}
defer client.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
//准备查询条件
searchService := client.Search().Index("index_name").Pretty(true).Query(req.Condition)
searchService := client.Search().Index(req.IndexName).Pretty(true).Query(req.Condition)
if isSearchAfter == false {
//分页查询Es
......@@ -101,7 +104,7 @@ func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) {
bulkIndexAdd := new(elastic.BulkIndexRequest)
bulkIndexUpdate := new(elastic.BulkUpdateRequest)
bulkIndexDel := new(elastic.BulkDeleteRequest)
fmt.Println(bulkIndexUpdate, bulkIndexDel)
switch req.Type {
case "add":
bulkIndexAdd = BulkAdd(req.IndexName)
......@@ -145,7 +148,7 @@ func BulkUpdate(indexName string) (indexReq *elastic.BulkUpdateRequest) {
func BulkDo(esRequest []elastic.BulkableRequest, ctx context.Context) (bulkResponse *elastic.BulkResponse, err error) {
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
err = errors.New("elastic连接失败"+err.Error())
return
}
defer client.Stop()
......@@ -159,7 +162,7 @@ func BulkDo(esRequest []elastic.BulkableRequest, ctx context.Context) (bulkRespo
func UpdateByScript(req UpdateByScriptReq) (err error) {
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
err = errors.New("elastic连接失败"+err.Error())
return
}
defer client.Stop()
......@@ -167,16 +170,21 @@ func UpdateByScript(req UpdateByScriptReq) (err error) {
defer cancel()
_, err = client.Update().Index("1").Id(req.DocId).Routing(req.RoutingId).Script(elastic.NewScript(req.ScriptCtx)).Do(ctx)
if err != nil {
err = errors.New("script更新文档失败"+err.Error())
return
}
return
}
//同BulkDo BulkDo http处理 BulkProcessor tcp处理 相对更安全些
func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context,numDocs int) {
func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context,numDocs int) (err error){
if numDocs>30{
err = errors.New("请合理输入参数,批量处理最大为30")
return
}
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
err = errors.New("elastic连接失败"+err.Error())
return
}
defer client.Stop()
......@@ -200,14 +208,15 @@ func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context,numD
}
atomic.AddInt64(&afterRequests, int64(len(requests)))
if response.Errors{
logs.Error("1")
logs.Error("%s",response.Failed())
}else{
logs.Info("log","序号:{} ,执行 {} 条数据批量操作成功,共耗费{}毫秒",executionId,len(requests),response.Took)
logs.Info("log","序号:{%s} ,执行 {%s} 条数据批量操作成功,共耗费{%d}毫秒",executionId,len(requests),response.Took)
}
}
//每添加30个request,执行一次bulk操作
p, err := client.BulkProcessor().Name("Worker-1").Before(beforeFn).After(afterFn).Stats(true).BulkActions(30).Do(ctx)
p, err := client.BulkProcessor().Name("Worker-1").Before(beforeFn).After(afterFn).Stats(true).BulkActions(numDocs).Do(ctx)
if err != nil {
fmt.Println(err)
}
......@@ -232,5 +241,5 @@ func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context,numD
fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
fmt.Printf(" Last response time : %v\n", w.LastDuration)
}
return
}
......@@ -22,12 +22,14 @@ type GetDocReq struct {
}
type GetCountReq struct {
Condition *elastic.BoolQuery
IndexName string `json:"index_name"`
}
type GetListReq struct {
Condition *elastic.BoolQuery
Page int `json:"page"`
Limit int `json:"limit"`
SearchAfter *SearchAfter `json:"search_after"`
IndexName string `json:"index_name"`
}
type SearchAfter struct {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论