提交 7c1296b1 authored 作者: 张立波's avatar 张立波

nsq

上级 3f17abb3
...@@ -15,13 +15,29 @@ type NsqConfig struct { ...@@ -15,13 +15,29 @@ type NsqConfig struct {
Handler nsq.Handler Handler nsq.Handler
} }
// ConsumerPool 消费者池,用于管理多个消费者实例
type ConsumerPool struct {
consumers []*nsq.Consumer
config *NsqConfig
}
// InitConsumer 初始化单个消费者
func InitConsumer(consumer *NsqConfig) { func InitConsumer(consumer *NsqConfig) {
cfg := nsq.NewConfig() cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second * 30 cfg.LookupdPollInterval = time.Second * 30
// 优化内存:限制MaxInFlight,避免过多消息堆积在内存中
if consumer.MaxInFlight > 0 { if consumer.MaxInFlight > 0 {
cfg.MaxInFlight = consumer.MaxInFlight cfg.MaxInFlight = consumer.MaxInFlight
} else {
cfg.MaxInFlight = 1 // 默认最小值,减少内存占用
} }
//设置重连时间
// 设置其他内存优化参数
cfg.DefaultRequeueDelay = time.Second * 5
cfg.MaxRequeueDelay = time.Minute
cfg.ReadTimeout = time.Second * 30
cfg.WriteTimeout = time.Second * 10
c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, cfg) // 新建一个消费者 c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, cfg) // 新建一个消费者
if logs.CheckErr(err, "InitConsumer") { if logs.CheckErr(err, "InitConsumer") {
return return
...@@ -38,28 +54,71 @@ func InitConsumer(consumer *NsqConfig) { ...@@ -38,28 +54,71 @@ func InitConsumer(consumer *NsqConfig) {
} }
} }
func InitBatchConsumer(consumer *NsqConfig, clientNum int) { // InitBatchConsumer 批量初始化消费者,优化内存使用
cfg := nsq.NewConfig() // consumerCount: 消费者实例数量
if consumer.MaxInFlight > 0 { // totalWorkers: 总并发处理数
cfg.MaxInFlight = consumer.MaxInFlight func InitBatchConsumer(consumer *NsqConfig, consumerCount int, totalWorkers int) {
if consumerCount <= 0 {
consumerCount = 1
}
if totalWorkers <= 0 {
totalWorkers = consumerCount
} }
cfg.LookupdPollInterval = time.Second * 30 //设置重连时间
c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, cfg) // 新建一个消费者
if logs.CheckErr(err, "InitConsumer") { // 计算每个Consumer的并发数
return workersPerConsumer := totalWorkers / consumerCount
if workersPerConsumer < 1 {
workersPerConsumer = 1
} }
if clientNum == 0 { // 共享基础配置,减少内存占用
clientNum = 1 baseCfg := nsq.NewConfig()
baseCfg.LookupdPollInterval = time.Second * 30
baseCfg.DefaultRequeueDelay = time.Second * 5
baseCfg.MaxRequeueDelay = time.Minute
baseCfg.ReadTimeout = time.Second * 30
baseCfg.WriteTimeout = time.Second * 10
// 根据消费者数量分配MaxInFlight,避免内存过度使用
maxInFlightPerConsumer := 1
if consumer.MaxInFlight > 0 {
maxInFlightPerConsumer = consumer.MaxInFlight / consumerCount
if maxInFlightPerConsumer < 1 {
maxInFlightPerConsumer = 1
}
} }
baseCfg.MaxInFlight = maxInFlightPerConsumer
c.ChangeMaxInFlight(clientNum) //可以根据nsqds数量来配置 pool := &ConsumerPool{
c.AddHandler(consumer.Handler) // 批量添加消费者接口 consumers: make([]*nsq.Consumer, 0, consumerCount),
config: consumer,
}
//建立NSQLookupd连接 for i := 0; i < consumerCount; i++ {
err = c.ConnectToNSQD(consumer.Addr) c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, baseCfg)
if logs.CheckErr(err, "ConnectToNSQD") { if logs.CheckErr(err, "InitBatchConsumer") {
return continue
}
// 使用并发handlers,减少Consumer实例数量的同时保持高吞吐
if workersPerConsumer > 1 {
c.AddConcurrentHandlers(consumer.Handler, workersPerConsumer)
} else {
c.AddHandler(consumer.Handler)
}
err = c.ConnectToNSQD(consumer.Addr)
if logs.CheckErr(err, "ConnectToNSQD") {
continue
}
pool.consumers = append(pool.consumers, c)
}
}
// Stop 停止所有消费者
func (p *ConsumerPool) Stop() {
for _, c := range p.consumers {
c.Stop()
} }
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论