提交 e28bc12d authored 作者: 张立波's avatar 张立波

nsq

上级 7c1296b1
...@@ -23,22 +23,25 @@ type ConsumerPool struct { ...@@ -23,22 +23,25 @@ type ConsumerPool struct {
// InitConsumer 初始化单个消费者 // InitConsumer 初始化单个消费者
func InitConsumer(consumer *NsqConfig) { func InitConsumer(consumer *NsqConfig) {
cfg := nsq.NewConfig() config := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second * 30 config.HeartbeatInterval = 5 * time.Second
config.MsgTimeout = 30 * time.Second
config.DialTimeout = 5 * time.Second
config.LookupdPollInterval = time.Second * 30
// 优化内存:限制MaxInFlight,避免过多消息堆积在内存中 // 优化内存:限制MaxInFlight,避免过多消息堆积在内存中
if consumer.MaxInFlight > 0 { if consumer.MaxInFlight > 0 {
cfg.MaxInFlight = consumer.MaxInFlight config.MaxInFlight = consumer.MaxInFlight
} else { } else {
cfg.MaxInFlight = 1 // 默认最小值,减少内存占用 config.MaxInFlight = 1 // 默认最小值,减少内存占用
} }
// 设置其他内存优化参数 // 设置其他内存优化参数
cfg.DefaultRequeueDelay = time.Second * 5 config.DefaultRequeueDelay = time.Second * 5
cfg.MaxRequeueDelay = time.Minute config.MaxRequeueDelay = time.Minute
cfg.ReadTimeout = time.Second * 30 config.ReadTimeout = time.Second * 30
cfg.WriteTimeout = time.Second * 10 config.WriteTimeout = time.Second * 5
c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, cfg) // 新建一个消费者 c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, config) // 新建一个消费者
if logs.CheckErr(err, "InitConsumer") { if logs.CheckErr(err, "InitConsumer") {
return return
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论