提交 523c3ce8 authored 作者: 张立波's avatar 张立波

日志

上级 d136aa40
......@@ -29,7 +29,6 @@ const (
//内部消息通知
MsgGoodsUpdateTopic = "goodsUpdate"
//消息类型
//商品消息
......@@ -69,7 +68,7 @@ const (
GoodsStorageRemove = 502 //移除选品库
//内部修改商品消息
GoodsSaleUpdate = 10000 //修1改销量
GoodsSaleUpdate = 10000 //修1改销量
ErrMsgParamEmpty = "该类型消息对应字段不能为空"
ErrMsgAppIDEmpty = "该类型消息appid字段不能为空"
......@@ -196,9 +195,12 @@ func (p *NsqProducer) NotifyMessage(notifyMessage *NotifyMessage) (err error) {
}
err = NsqProducers.Publish(notifyMsg, gjson.New(notifyMessage).MustToJsonString())
if !logs.CheckErr(err, "NotifyMessage") {
logs.Info("NotifyMessage", "消息内容:【%v】", gjson.New(notifyMessage).MustToJsonString())
if err != nil {
g.Log().Cat("NotifyMessage").Cat("error").Infof(`消息内容【%v】错误【%v】`, gjson.New(notifyMessage).MustToJsonString(), err.Error())
} else {
g.Log().Cat("NotifyMessage").Infof(`消息内容【%v】`, gjson.New(notifyMessage).MustToJsonString())
}
return
}
......@@ -209,8 +211,10 @@ func (p *NsqProducer) NotifyMessage(notifyMessage *NotifyMessage) (err error) {
func (p *NsqProducer) NotifyApiLog(notifyApiLog *NotifyApiLog) (err error) {
jsonBytes, _ := json.Marshal(notifyApiLog)
err = NsqProducers.Publish(ApiRequestTopic, string(jsonBytes))
if !logs.CheckErr(err, "NotifyApiLog") {
logs.Info("NotifyApiLog", "消息内容:【%v】", string(jsonBytes))
if err != nil {
g.Log().Cat("NotifyApiLog").Cat("error").Infof(`消息内容【%v】错误【%v】`, gjson.New(notifyApiLog).MustToJsonString(), err.Error())
} else {
g.Log().Cat("NotifyApiLog").Infof(`消息内容【%v】`, gjson.New(notifyApiLog).MustToJsonString())
}
return
}
......@@ -2,6 +2,7 @@ package notify
import (
"fmt"
"github.com/gogf/gf/frame/g"
"github.com/nsqio/go-nsq"
"gitlab.jxhh.com/stbz/library.git/logs"
"time"
......@@ -13,7 +14,7 @@ type NsqProducer struct {
var (
NsqProducers *NsqProducer
nsqConfig *NsqConfig //重试
nsqConfig *NsqConfig //重试
)
func New(config *NsqConfig) {
......@@ -21,16 +22,15 @@ func New(config *NsqConfig) {
NsqProducers = InitProducer(config)
}
func InitProducer(config *NsqConfig) *NsqProducer {
producer, err := nsq.NewProducer(config.Addr, nsq.NewConfig())
if logs.CheckErr(err,"InitProducer") {
if logs.CheckErr(err, "InitProducer") {
//panic(err)
producer.Stop()
}
err = producer.Ping()
if logs.CheckErr(err,"InitProducer") {
if logs.CheckErr(err, "InitProducer") {
producer.Stop()
}
......@@ -40,10 +40,7 @@ func InitProducer(config *NsqConfig) *NsqProducer {
}
//发布消息
func (p *NsqProducer) Publish(topic string, message string) error {
//logs.Info("topic","topic:%v,message:%v",topic,message)
var err error
func (p *NsqProducer) Publish(topic string, message string) (err error) {
defer func() {
if err != nil {
//重试连接
......@@ -51,21 +48,24 @@ func (p *NsqProducer) Publish(topic string, message string) error {
}
}()
if p.producer != nil {
if message == "" { //不能发布空串,否则会导致error
return nil
}
err = p.producer.Publish(topic, []byte(message)) // 发布消息
logs.CheckErr(err, "Nsq Publish")
return err
if err != nil {
g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error())
} else {
g.Log().Cat("Producer").Infof(`消息内容【%v】`, message)
}
return
}
return fmt.Errorf("producer is nil", err)
}
//发布延迟消息
func (p *NsqProducer) DeferredPublish(topic string,delay time.Duration, message string) error {
func (p *NsqProducer) DeferredPublish(topic string, delay time.Duration, message string) error {
//logs.Infof(context.Background(),"topic",message)
......@@ -74,8 +74,12 @@ func (p *NsqProducer) DeferredPublish(topic string,delay time.Duration, message
if message == "" { //不能发布空串,否则会导致error
return nil
}
err = p.producer.DeferredPublish(topic,delay,[]byte(message)) // 发布消息
logs.CheckErr(err, "Nsq Publish")
err = p.producer.DeferredPublish(topic, delay, []byte(message)) // 发布消息
if err != nil {
g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error())
} else {
g.Log().Cat("Producer").Infof(`消息内容【%v】`, message)
}
return err
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论