提交 043cca1d authored 作者: zhanglibo's avatar zhanglibo

Merge remote-tracking branch 'origin/master'

...@@ -2,7 +2,6 @@ package logs ...@@ -2,7 +2,6 @@ package logs
import ( import (
"context" "context"
"fmt"
"github.com/gogf/gf/errors/gerror" "github.com/gogf/gf/errors/gerror"
"github.com/gogf/gf/frame/g" "github.com/gogf/gf/frame/g"
) )
...@@ -36,12 +35,12 @@ func RequestLog(req RequestLogReq) { ...@@ -36,12 +35,12 @@ func RequestLog(req RequestLogReq) {
//记录info日志 //记录info日志
func Info(path string, format string, v ...interface{}) { func Info(path string, format string, v ...interface{}) {
g.Log().Async().Cat(path).Println(fmt.Sprintf(format, v...)) g.Log().Async().Cat(path).Infof(format, v...)
} }
//记录error日志 //记录error日志
func Error(format string, v ...interface{}) { func Error(format string, v ...interface{}) {
g.Log().Async().Cat("error").Println(fmt.Sprintf(format, v...)) g.Log().Async().Cat("error").Infof(format, v...)
} }
//检查错误 //检查错误
......
...@@ -40,9 +40,10 @@ func InitBatchConsumer(consumer *NsqConfig, clientNum int) { ...@@ -40,9 +40,10 @@ func InitBatchConsumer(consumer *NsqConfig, clientNum int) {
if clientNum == 0 { if clientNum == 0 {
clientNum = 1 clientNum = 1
} }
for i := 0; i < clientNum; i++ {
c.AddHandler(consumer.Handler) // 批量添加消费者接口 c.ChangeMaxInFlight(clientNum) //可以根据nsqds数量来配置
} c.AddHandler(consumer.Handler) // 批量添加消费者接口
//建立NSQLookupd连接 //建立NSQLookupd连接
err = c.ConnectToNSQD(consumer.Addr) err = c.ConnectToNSQD(consumer.Addr)
if logs.CheckErr(err, "ConnectToNSQD") { if logs.CheckErr(err, "ConnectToNSQD") {
......
...@@ -24,7 +24,6 @@ const ( ...@@ -24,7 +24,6 @@ const (
//API请求日志 //API请求日志
ApiRequestTopic = "apiRequest" ApiRequestTopic = "apiRequest"
//消息类型 //消息类型
//商品消息 //商品消息
...@@ -38,10 +37,14 @@ const ( ...@@ -38,10 +37,14 @@ const (
OrderComfirmReceiveGoods = 202 //确认收货 OrderComfirmReceiveGoods = 202 //确认收货
OrderSuccess = 203 //交易成功 OrderSuccess = 203 //交易成功
OrderCancel = 204 //订单取消 OrderCancel = 204 //订单取消
RefundApply = 300 //申请售后 OrderPay = 205 //延迟支付
RefundAgree = 301 //商家同意售后 OrderRejection = 206 //已拒收
RefundRefuse = 302 //商家拒绝售后 OrderReturn = 207 //拒收已入库
RefundSuccess = 303 //售后成功
RefundApply = 300 //申请售后
RefundAgree = 301 //商家同意售后
RefundRefuse = 302 //商家拒绝售后
RefundSuccess = 303 //售后成功
//标签消息 //标签消息
TagsImport = 401 //导入标签 TagsImport = 401 //导入标签
...@@ -52,7 +55,7 @@ const ( ...@@ -52,7 +55,7 @@ const (
ErrMsgParamEmpty = "该类型消息对应字段不能为空" ErrMsgParamEmpty = "该类型消息对应字段不能为空"
ErrMsgAppIDEmpty = "该类型消息appid字段不能为空" ErrMsgAppIDEmpty = "该类型消息appid字段不能为空"
ErrMsgTpye = "错误的消息类型" ErrMsgTpye = "错误的消息类型"
) )
//推送下游对应字段 //推送下游对应字段
...@@ -207,9 +210,8 @@ func (p *NsqProducer) NotifyServer(notifyServer *NotifyServer) (err error) { ...@@ -207,9 +210,8 @@ func (p *NsqProducer) NotifyServer(notifyServer *NotifyServer) (err error) {
return return
} }
/* /*
api请求日志 20220310 gk api请求日志 20220310 gk
*/ */
func (p *NsqProducer) NotifyApiLog(notifyApiLog *NotifyApiLog) (err error) { func (p *NsqProducer) NotifyApiLog(notifyApiLog *NotifyApiLog) (err error) {
...@@ -219,4 +221,4 @@ func (p *NsqProducer) NotifyApiLog(notifyApiLog *NotifyApiLog) (err error) { ...@@ -219,4 +221,4 @@ func (p *NsqProducer) NotifyApiLog(notifyApiLog *NotifyApiLog) (err error) {
logs.Info("NotifyApiLog", "消息内容:【%v】", string(jsonBytes)) logs.Info("NotifyApiLog", "消息内容:【%v】", string(jsonBytes))
} }
return return
} }
\ No newline at end of file
...@@ -11,9 +11,13 @@ type NsqProducer struct { ...@@ -11,9 +11,13 @@ type NsqProducer struct {
producer *nsq.Producer producer *nsq.Producer
} }
var NsqProducers *NsqProducer var (
NsqProducers *NsqProducer
nsqConfig *NsqConfig //重试
)
func New(config *NsqConfig) { func New(config *NsqConfig) {
nsqConfig = config
NsqProducers = InitProducer(config) NsqProducers = InitProducer(config)
} }
...@@ -38,9 +42,16 @@ func InitProducer(config *NsqConfig) *NsqProducer { ...@@ -38,9 +42,16 @@ func InitProducer(config *NsqConfig) *NsqProducer {
//发布消息 //发布消息
func (p *NsqProducer) Publish(topic string, message string) error { func (p *NsqProducer) Publish(topic string, message string) error {
logs.Info("topic","topic:%v,message:%v",topic,message) //logs.Info("topic","topic:%v,message:%v",topic,message)
var err error var err error
defer func() {
if err != nil {
//重试连接
InitProducer(nsqConfig)
}
}()
if p.producer != nil { if p.producer != nil {
if message == "" { //不能发布空串,否则会导致error if message == "" { //不能发布空串,否则会导致error
return nil return nil
...@@ -56,7 +67,7 @@ func (p *NsqProducer) Publish(topic string, message string) error { ...@@ -56,7 +67,7 @@ func (p *NsqProducer) Publish(topic string, message string) error {
//发布延迟消息 //发布延迟消息
func (p *NsqProducer) DeferredPublish(topic string,delay time.Duration, message string) error { func (p *NsqProducer) DeferredPublish(topic string,delay time.Duration, message string) error {
logs.Info("topic",message) //logs.Infof(context.Background(),"topic",message)
var err error var err error
if p.producer != nil { if p.producer != nil {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论