当客户端获取消息时,会触发memoryMsgChan事件,在这个事件中服务器会把消息放入inFlightMessages中(关键一步)。
github.com/nsqio/nsq/nsqd/protocol_v2.go
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
for {
......
select {
......
case msg := <-memoryMsgChan:
......
//消息担保的关键一步
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
......
}
}
......
}
github.com/nsqio/nsq/nsqd/channel.go #409
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
now := time.Now()
msg.clientID = clientID
msg.deliveryTS = now
## 默认 60s
msg.pri = now.Add(timeout).UnixNano()
// 以messageId为下标把数据放到 inFlightMessages内存里
err := c.pushInFlightMessage(msg)
if err != nil {
return err
}
// 为msg增加一个索引,把消息放入 inFlightPqueue,并以msg.pri 从小到大排序。
c.addToInFlightPQ(msg)
return nil
}
服务端把消息发送给客户端,等待客户端应答:
1:如果客户端返回“FIN”说明客户端已正常接受消息,服务端根据客户端返回的MessageId删除inFlightMessages的数据。
2:如果客户端返回“REQ”说明客户端接收异常,服务器根据返回的MessageId,把数据从inFlightMessages删除,并重新写回memoryMsgChan。
github.com/nsqio/nsq/nsqd/channel.go
func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
// remove from inflight first
msg, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
c.removeFromInFlightPQ(msg)
atomic.AddUint64(&c.requeueCount, 1)
if timeout == 0 {
c.exitMutex.RLock()
if c.Exiting() {
c.exitMutex.RUnlock()
return errors.New("exiting")
}
err := c.put(msg)
c.exitMutex.RUnlock()
return err
}
// deferred requeue
return c.StartDeferredTimeout(msg, timeout)
}
3:如果因网络问题没有收到客户端应答,queueScanLoop 将开始工作(运行nsqd时启动的服务)。在queueScanLoop中有一个定时事件refreshTicker.C,每5秒执行一次,从inFlightPqueue获取超时消息,并把它放回 channel的 memoryMsgChan里。这条消息将会被重新推送给客户端。
func (n *NSQD) queueScanLoop() {
......
// 5秒
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
channels := n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
for {
select {
......
case <-refreshTicker.C:
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
goto exit
}
// 每5秒最多只处理4个channel
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)
}
loop:
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i]
}
......
}
......
}
github.com/nsqio/nsq/nsqd/channel.go #544
func (c *Channel) processInFlightQueue(t int64) bool {
......
dirty := false
for {
c.inFlightMutex.Lock()
// 根据当前时间戳,从inFlightPqueue获取已超时的msg
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()
if msg == nil {
goto exit
}
dirty = true
// 根据msg.Id从inFlightMessages弹出消息
_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
goto exit
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
client.TimedOutMessage()
}
// 将消息重新放入 channel的 memoryMsgChan 里
c.put(msg)
}
......
}