提交 86ff6ece authored 作者: limeng's avatar limeng

es

上级 a79433b5
package elastic
import (
"github.com/gogf/gf/frame/g"
"github.com/olivere/elastic/v7"
"gitlab.jxhh.com/stbz/library.git/logs"
"log"
"os"
)
// connection builds a new Elasticsearch client from the "Es.*" config keys
// (Url, UserName, Password). Sniffing is disabled so the configured URL is
// used as-is. The caller is responsible for calling client.Stop().
func connection() (search *elastic.Client, err error) {
opts := []elastic.ClientOptionFunc{
// GetString avoids the panic a failed .(string) assertion would cause
// when a config key is missing or holds a non-string value.
elastic.SetURL("http://" + g.Cfg().GetString("Es.Url")),
elastic.SetSniff(false),
elastic.SetBasicAuth(g.Cfg().GetString("Es.UserName"), g.Cfg().GetString("Es.Password")),
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
}
client, err := elastic.NewClient(opts...)
if err != nil {
logs.Error("Es 链接失败 error:【%v】", err)
}
return client, err
}
package elastic
import (
"context"
"errors"
"fmt"
"github.com/olivere/elastic/v7"
"gitlab.jxhh.com/stbz/library.git/logs"
"sync/atomic"
"time"
)
// Shared Elasticsearch request constants.
const (
// docType is the document type attached to bulk requests; "_doc" is the
// only document type supported by Elasticsearch 7.x.
docType = "_doc"
// refresh ("true") forces an index refresh after bulk writes so the
// changes become immediately searchable.
refresh = "true"
)
//根据文档Id 快速获取内容
// GetDoc fetches a single document by id from the index named in req.
//
// The original discarded both the document (only fmt.Println'd it) and any
// error; it now returns them so callers can use the result. Existing call
// sites that invoke GetDoc(req) as a statement remain valid Go, since a
// call expression may discard all of its results.
func GetDoc(req GetDocReq) (docContent *elastic.GetResult, err error) {
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
return
}
defer client.Stop()
// Bound the request so a slow cluster cannot hang the caller.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
docContent, err = client.Get().Index(req.IndexName).Id(req.DocId).Do(ctx)
if err != nil {
logs.Error("Es GetDoc error:【%v】", err)
}
return
}
//根据搜索条件 获取总数
// GetCount returns the number of documents matching req.Condition.
// On connection failure or query error it logs the problem and returns 0.
func GetCount(req GetCountReq) (count int64) {
client, err := connection()
if err != nil || client == nil {
// Previously an error value was built and silently discarded (the
// function has no error result); log it instead.
logs.Error("elastic连接失败 error:【%v】", err)
return
}
defer client.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// NOTE(review): the index name is hard-coded to "index_name" and
// GetCountReq carries no IndexName field — confirm this placeholder is
// intentional.
countService := client.Count("index_name").Pretty(true)
count, err = countService.Query(req.Condition).Do(ctx)
if err != nil {
// Previously dropped; surface query failures in the logs.
logs.Error("Es GetCount error:【%v】", err)
}
return count
}
//根据搜索条件 获取数据
// GetList runs a search with req.Condition, either as a classic from/size
// page or as a search_after scroll when req.SearchAfter is set.
// On connection failure or query error it logs the problem and returns nil.
func GetList(req GetListReq) (searchResult *elastic.SearchResult) {
// Decide between from/size paging and search_after scrolling.
isSearchAfter, from, size := checkParam(req)
// 连接Es
client, err := connection()
if err != nil || client == nil {
// Previously an error value was built and silently discarded (the
// function has no error result); log it instead.
logs.Error("elastic连接失败 error:【%v】", err)
return
}
defer client.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// NOTE(review): the index name is hard-coded to "index_name" and
// GetListReq carries no IndexName field — confirm this placeholder is
// intentional.
searchService := client.Search().Index("index_name").Pretty(true).Query(req.Condition)
if isSearchAfter {
// 滚动式查询Es — search_after requires a sort to define the cursor.
searchResult, err = searchService.
SearchAfter(req.SearchAfter.Value).
From(from).Size(size).
Sort(req.SearchAfter.SortField.Field, req.SearchAfter.SortField.Sort).
Do(ctx)
} else {
// 分页查询Es
searchResult, err = searchService.From(from).Size(size).Do(ctx)
}
if err != nil {
// Previously dropped; surface query failures in the logs.
logs.Error("Es GetList error:【%v】", err)
}
return searchResult
}
//检验参数 区分 分页查询 滚动式查询
// checkParam derives the paging parameters for a list query: a (from, size)
// window computed from Page/Limit for classic paging, or a fixed window of
// 50 with isSearchAfter=true when a search_after cursor is supplied (the
// cursor takes precedence over Page/Limit).
func checkParam(req GetListReq) (isSearchAfter bool, from int, size int) {
// A search_after cursor overrides any page/limit settings.
if req.SearchAfter != nil {
return true, 0, 50
}
if req.Page != 0 && req.Limit != 0 {
from = (req.Page - 1) * req.Limit
size = req.Limit
}
return isSearchAfter, from, size
}
//批量处理 准备数据 1.添加 2.更新 3.删除
// Bulk prepares a BulkIndex wrapper holding the bulk request matching
// req.Type ("add", "update" or "del"); only the selected field is set.
//
// The original assigned into a nil *BulkIndex named return (guaranteed
// panic) and pre-allocated all three request kinds with new(...), making
// every nil-check always true. bulkIndex is now allocated, only the
// requested kind is populated, and an unknown type yields an error.
func Bulk(req BulkReq) (bulkIndex *BulkIndex, err error) {
bulkIndex = new(BulkIndex)
switch req.Type {
case "add":
bulkIndex.BulkIndexRequest = BulkAdd(req.IndexName)
case "update":
bulkIndex.BulkUpdateRequest = BulkUpdate(req.IndexName)
case "del":
bulkIndex.BulkDeleteRequest = BulkDel(req.IndexName)
default:
err = fmt.Errorf("unknown bulk type %q", req.Type)
}
return
}
//批量处理 准备数据 1.添加
// BulkAdd prepares an empty bulk insert request bound to indexName.
func BulkAdd(indexName string) (indexReq *elastic.BulkIndexRequest) {
return elastic.NewBulkIndexRequest().Index(indexName).Type(docType)
}
//批量处理 准备数据 1.删除
// BulkDel prepares an empty bulk delete request bound to indexName.
func BulkDel(indexName string) (indexReq *elastic.BulkDeleteRequest) {
return elastic.NewBulkDeleteRequest().Index(indexName).Type(docType)
}
//批量处理 准备数据 1.更新
// BulkUpdate prepares an empty bulk update request bound to indexName.
func BulkUpdate(indexName string) (indexReq *elastic.BulkUpdateRequest) {
return elastic.NewBulkUpdateRequest().Index(indexName).Type(docType)
}
//批量处理数据
// BulkDo executes the given bulk requests in a single HTTP bulk call,
// refreshing the index so the changes become immediately searchable.
//
// The 5s timeout is now derived from the caller's ctx; previously the ctx
// parameter was shadowed by context.WithTimeout(context.Background(), …)
// and caller cancellation was silently ignored.
func BulkDo(esRequest []elastic.BulkableRequest, ctx context.Context) (bulkResponse *elastic.BulkResponse, err error) {
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
return
}
defer client.Stop()
if ctx == nil {
// Defensive: never pass a nil parent to WithTimeout.
ctx = context.Background()
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
bulkResponse, err = client.Bulk().Add(esRequest...).Refresh(refresh).Do(ctx)
return
}
//通过script更新文档 更新文档中某一字段
// UpdateByScript updates a single document in place by running the script
// in req.ScriptCtx against it, addressed by document id and routing id.
// It returns any connection or update error.
func UpdateByScript(req UpdateByScriptReq) (err error) {
client, err := connection()
if err != nil || client == nil {
err = errors.New("elastic连接失败")
return
}
defer client.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// NOTE(review): the index name is hard-coded to "1" and
// UpdateByScriptReq carries no IndexName field — confirm this is
// intentional.
_, err = client.Update().
Index("1").
Id(req.DocId).
Routing(req.RoutingId).
Script(elastic.NewScript(req.ScriptCtx)).
Do(ctx)
// The original's `if err != nil { return }; return` collapsed to one return.
return
}
//同BulkDo BulkDo http处理 BulkProcessor tcp处理 相对更安全些
// BulkProcessor executes the given bulk requests through elastic's
// BulkProcessor service (worker-based, connection-reusing — safer than the
// one-shot HTTP path of BulkDo), committing every 30 queued requests, and
// prints the resulting stats.
//
// Fixes over the original: the caller's ctx is honoured (it was shadowed
// by a fresh 5s context and ignored); p is no longer dereferenced when the
// processor failed to start (nil-pointer panic); Flush now runs before
// Close instead of on an already-stopped processor; the connection error
// is logged rather than built and discarded.
//
// numDocs is currently unused — kept for interface compatibility.
func BulkProcessor(esRequest []elastic.BulkableRequest, ctx context.Context, numDocs int) {
client, err := connection()
if err != nil || client == nil {
logs.Error("elastic连接失败 error:【%v】", err)
return
}
defer client.Stop()
if ctx == nil {
// Defensive: the processor needs a non-nil context.
ctx = context.Background()
}
// Counters updated from the processor's worker goroutines.
var beforeRequests int64
var before int64
var afters int64
var failures int64
var afterRequests int64
beforeFn := func(executionId int64, requests []elastic.BulkableRequest) {
atomic.AddInt64(&beforeRequests, int64(len(requests)))
atomic.AddInt64(&before, 1)
logs.Info("elastic","序号:{} 开始执行 {} 条数据批量操作。", executionId,len(requests))
}
afterFn := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
// 在每次执行BulkRequest后调用,通过此方法可以获取BulkResponse是否包含错误
atomic.AddInt64(&afters, 1)
if err != nil {
atomic.AddInt64(&failures, 1)
}
atomic.AddInt64(&afterRequests, int64(len(requests)))
if response.Errors{
logs.Error("1")
}else{
logs.Info("log","序号:{} ,执行 {} 条数据批量操作成功,共耗费{}毫秒",executionId,len(requests),response.Took)
}
}
//每添加30个request,执行一次bulk操作
p, err := client.BulkProcessor().Name("Worker-1").Before(beforeFn).After(afterFn).Stats(true).BulkActions(30).Do(ctx)
if err != nil || p == nil {
fmt.Println(err)
return
}
for _, v := range esRequest {
p.Add(v)
}
// Flush pending requests and snapshot stats while the processor is still
// running, then shut it down.
if err = p.Flush(); err != nil {
fmt.Println(err)
}
stats := p.Stats()
if err = p.Close(); err != nil {
fmt.Println(err)
}
fmt.Println(stats)
fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
fmt.Printf("Number of requests indexed : %d\n", stats.Indexed)
fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
for i, w := range stats.Workers {
fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
fmt.Printf(" Last response time : %v\n", w.LastDuration)
}
}
package elastic
import "github.com/olivere/elastic/v7"
// BulkReq selects which kind of bulk request Bulk should prepare.
type BulkReq struct {
// Type chooses the operation: "add", "update" or "del".
Type string `json:"type"`
// IndexName is the target Elasticsearch index.
IndexName string `json:"index_name"`
}
// BulkIndex bundles the three possible bulk request kinds; Bulk populates
// the one matching the requested operation type.
type BulkIndex struct {
*elastic.BulkIndexRequest
*elastic.BulkDeleteRequest
*elastic.BulkUpdateRequest
}
// UpdateByScriptReq identifies a document and the script to run against it
// in UpdateByScript.
// NOTE(review): there is no IndexName field; UpdateByScript hard-codes the
// index — confirm this is intentional.
type UpdateByScriptReq struct {
// DocId is the Elasticsearch document id.
DocId string `json:"doc_id"`
// RoutingId is the routing value used to locate the document's shard.
RoutingId string `json:"routing_id"`
// ScriptCtx is the script source executed against the document.
ScriptCtx string `json:"script_ctx"`
}
// GetDocReq identifies a single document for GetDoc.
type GetDocReq struct {
// DocId is the Elasticsearch document id.
DocId string `json:"doc_id"`
// IndexName is the index to read from.
IndexName string `json:"index_name"`
}
// GetCountReq carries the query for GetCount.
// NOTE(review): there is no IndexName field; GetCount hard-codes the index
// name — confirm this is intentional.
type GetCountReq struct {
// Condition is the bool query whose matches are counted.
Condition *elastic.BoolQuery
}
// GetListReq carries the query and paging options for GetList.
// Set Page/Limit for classic from/size paging, or SearchAfter for
// cursor-style scrolling (checkParam gives SearchAfter precedence).
type GetListReq struct {
// Condition is the bool query to search with.
Condition *elastic.BoolQuery
// Page is the 1-based page number (0 disables paging).
Page int `json:"page"`
// Limit is the page size (0 disables paging).
Limit int `json:"limit"`
// SearchAfter, when non-nil, switches to search_after scrolling.
SearchAfter *SearchAfter `json:"search_after"`
}
// SearchAfter is the cursor for search_after scrolling: the sort field that
// defines the ordering and the cursor value from the previous page.
type SearchAfter struct {
SortField struct{
// Field is the field to sort on.
Field string `json:"field"`
// Sort is the sort direction flag passed to elastic's Sort
// (true = ascending).
Sort bool `json:"sort" `
} `json:"sort_field"`
// Value is the sort value of the last hit from the previous page.
Value int `json:"value"`
}
@@ -5,4 +5,6 @@ go 1.16
require (
github.com/gogf/gf v1.16.6
github.com/kjk/betterguid v0.0.0-20170621091430-c442874ba63a
github.com/olivere/elastic/v7 v7.0.30 // indirect
)
差异被折叠。
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论