提交 408cac8c authored 作者: gukai's avatar gukai

新增nsq、upstream

上级 f06524a7
package nsq
import (
"github.com/nsqio/go-nsq"
"gitlab.jxhh.com/stbz/library.git/logs"
"time"
)
type NsqConfig struct {
Topic string
Channel string
Addr string
Handler nsq.Handler
}
func InitConsumer(consumer *NsqConfig) {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second * 30 //设置重连时间
c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, cfg) // 新建一个消费者
if logs.CheckErr(err, "InitConsumer") {
return
}
c.AddHandler(consumer.Handler) // 添加消费者接口
//建立NSQLookupd连接
err = c.ConnectToNSQD(consumer.Addr)
if logs.CheckErr(err, "ConnectToNSQD") {
return
}
}
package nsq
type CommonReq struct {
MsgType int `json:"msgType"`
MsgData string `json:"msgData"`
Source int `json:"source"`
MsgSendTime int64 `json:"msgSendTime"`
}
type CommonReply struct {
Success interface{} `json:"success"`
Msg string `json:"msg"`
Data interface{} `json:"data"`
Code int `json:"code"`
}
type NotifyReqData struct {
RealSource int `json:"real_source"`
Source int `json:"source"`
Type int `json:"type"`
CreatedTime int `json:"created_time"`
Result string `json:"result"`
}
type NotifyReq struct {
Id int `json:"id"`
Type string `json:"type"`
Data NotifyReqData `json:"data"`
}
type NotifyRes struct {
Code int `json:"code"`
Msg string `json:"msg"`
}
type NotifyResNew struct {
Msg string `json:"message"`
}
type NotifyRefundData struct {
AfsServiceId string `json:"afsServiceId"`
State int `json:"state"`
AfterSrvStatus string `json:"after_srv_status"`
}
package nsq
import (
"fmt"
"github.com/nsqio/go-nsq"
"gitlab.jxhh.com/stbz/library.git/logs"
)
type NsqProducer struct {
producer *nsq.Producer
}
var NsqProducers *NsqProducer
func New(config *NsqConfig) {
NsqProducers = InitProducer(config)
}
func InitProducer(config *NsqConfig) *NsqProducer {
producer, err := nsq.NewProducer(config.Addr, nsq.NewConfig())
if logs.CheckErr(err,"InitProducer") {
//panic(err)
producer.Stop()
}
err = producer.Ping()
if logs.CheckErr(err,"InitProducer") {
producer.Stop()
}
return &NsqProducer{
producer: producer,
}
}
//发布消息
func (p *NsqProducer) Publish(topic string, message string) error {
logs.Info("topic",message)
var err error
if p.producer != nil {
if message == "" { //不能发布空串,否则会导致error
return nil
}
err = p.producer.Publish(topic, []byte(message)) // 发布消息
logs.CheckErr(err, "Nsq Publish")
return err
}
return fmt.Errorf("producer is nil", err)
}
package upstream
import (
"errors"
)
type AliMsgType string
type AliRefundStatus string
const (
//产品下架
AliProductExpire AliMsgType = "PRODUCT_RELATION_VIEW_PRODUCT_EXPIRE"
// 产品删除
AliProductExpire2 AliMsgType = "PRODUCT_RELATION_VIEW_PRODUCT_DELETE"
// 产品审核下架
AliProductExpire3 AliMsgType = "PRODUCT_RELATION_VIEW_PRODUCT_AUDIT"
// 产品新增或修改
AliProductModify AliMsgType = "PRODUCT_RELATION_VIEW_PRODUCT_NEW_OR_MODIFY"
// 产品上架
AliProductRepost AliMsgType = "PRODUCT_RELATION_VIEW_PRODUCT_REPOST"
// 商品池&超买价变更消息
AliProductPrice AliMsgType = "PRODUCT_RELATION_VIEW_EXIT_SUPERBUYER"
// 订单发货
AliOrderSendGoods AliMsgType = "ORDER_BUYER_VIEW_ANNOUNCE_SENDGOODS"
// 订单确认收货
AliOrderComfirm AliMsgType = "ORDER_BUYER_VIEW_ORDER_COMFIRM_RECEIVEGOODS"
// 售中退款
AliOrderRefund AliMsgType = "ORDER_BUYER_VIEW_ORDER_BUYER_REFUND_IN_SALES"
// 售后退款
AliOrderRefund2 AliMsgType = "ORDER_BUYER_VIEW_ORDER_REFUND_AFTER_SALES"
// AliAfterSaleStateAgree AliAfterSaleStateAgree
AliAfterSaleStateAgree AliRefundStatus = "SELLER_AGREE_REFUND_PROCOTOL"
// AliAfterSaleStateAgree2 AliAfterSaleStateAgree2
AliAfterSaleStateAgree2 AliRefundStatus = "SYSTEM_AGREE_REFUND_PROTOCOL"
// AliAfterSaleStateRefuse AliAfterSaleStateRefuse
AliAfterSaleStateRefuse AliRefundStatus = "SELLER_REJECT_REFUND"
// AliAfterSaleStateRefuse2 AliAfterSaleStateRefuse2
AliAfterSaleStateRefuse2 AliRefundStatus = "SELLER_REJECT_REFUND_PROCOTOL"
// AliAfterSaleStateSuccess AliAfterSaleStateSuccess
AliAfterSaleStateSuccess AliRefundStatus = "SELLER_AGREE_REFUND"
// AliAfterSaleStateSuccess2 AliAfterSaleStateSuccess2
AliAfterSaleStateSuccess2 AliRefundStatus = "SYSTEM_AGREE_REFUND"
)
// ToType ToType
func (c AliMsgType) ToType() (int, error) {
switch c {
case AliProductExpire, AliProductExpire2,AliProductExpire3:
return ProductExpire, nil
case AliProductModify:
return ProductModify, nil
case AliProductRepost:
return ProductRepost, nil
case AliProductPrice:
return ProductPrice, nil
case AliOrderSendGoods:
return OrderSendgoods, nil
case AliOrderComfirm:
return OrderComfirmReceivegoods, nil
case AliOrderRefund, AliOrderRefund2:
return RefundApply, nil
default:
return 0, errors.New("not allow type")
}
}
// ToStatus ToStatus
func (c AliRefundStatus) ToStatus() (int, error) {
switch c {
case AliAfterSaleStateAgree, AliAfterSaleStateAgree2:
return RefundAgree, nil
case AliAfterSaleStateRefuse, AliAfterSaleStateRefuse2:
return RefundRefuse, nil
case AliAfterSaleStateSuccess, AliAfterSaleStateSuccess2:
return RefundSuccess, nil
default:
return 0, errors.New("not allow type")
}
}
//"refundsuccess" == Sockedata.Data.CurrentStatus && Sockedata.Data.RefundAction == "SYSTEM_AGREE_REFUND_PROTOCOL"
// IsSuccess IsSuccess
func (c AliRefundStatus) IsSuccess() bool {
return c == AliAfterSaleStateSuccess || c == AliAfterSaleStateSuccess2 || c == AliAfterSaleStateAgree2
}
package upstream
import (
"errors"
)
type CloudMsgType int
const (
//产品下架
CloudProductExpire CloudMsgType = 102
// 产品新增或修改
CloudProductModify CloudMsgType = 104
// 产品上架
CloudProductRepost CloudMsgType = 101
// 商品池&超买价变更消息
CloudProductPrice CloudMsgType = 103
//发货
CloudOrderSendGoods CloudMsgType = 201
//确认收货
CloudOrderReceive CloudMsgType = 202
// 售后同意
CloudRefundAgree CloudMsgType = 301
// 售后拒绝
CloudRefundRefuse CloudMsgType = 302
//售后成功
CloudRefundSuccess CloudMsgType = 303
//售后关闭
CloudRefundClose CloudMsgType = 304
)
// ToType ToType
func (c CloudMsgType) ToType() (int, error) {
switch c {
case CloudProductExpire:
return ProductExpire, nil
case CloudProductModify:
return ProductModify, nil
case CloudProductRepost:
return ProductRepost, nil
case CloudProductPrice:
return ProductPrice, nil
case CloudOrderSendGoods:
return OrderSendgoods, nil
case CloudOrderReceive:
return OrderComfirmReceivegoods, nil
case CloudRefundAgree:
return RefundAgree, nil
case CloudRefundRefuse:
return RefundApply, nil
case CloudRefundSuccess:
return RefundSuccess, nil
default:
return 0, errors.New("not allow type")
}
}
package upstream
//推送类型 101 商品下架 102 商品修改 103 价格变更 104 商品上架 201 订单发货 202 确认收货 301 同意售后退款 302 拒绝售后退款 303 售后退款成功',
const (
ProductExpire = 101
ProductModify = 102
ProductPrice = 103
ProductRepost = 104
OrderSendgoods = 201
OrderComfirmReceivegoods = 202
OrderSuccess = 203
RefundApply = 300
RefundAgree = 301
RefundRefuse = 302
RefundSuccess = 303
)
const (
jd = 2
ali = 6
tm = 7
cloud = 1
suning = 8
)
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论