github.com/nsqio/nsq/nsqd/topic.go #215
func (t *Topic) messagePump() {
......
for {
select {
case msg = <-memoryMsgChan:
......
case <-t.exitChan:
goto exit
}
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
......
}
}
......
}
github.com/nsqio/nsq/nsqd/topic.go #44
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t := &Topic{
......
}
......
t.waitGroup.Wrap(func() { t.messagePump() })
t.ctx.nsqd.Notify(t)
return t
}