note.wcoder.com
wcoder GitHub

Table of Contents

保证消息交付至少一次

当客户端获取消息时,会触发memoryMsgChan事件,在这个事件中服务器会把消息放入inFlightMessages中(关键一步)。

github.com/nsqio/nsq/nsqd/protocol_v2.go

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    for {
        ......
    	select {
        ......
    	case msg := <-memoryMsgChan:
            ......
            //消息担保的关键一步
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            ......
    	}
    }
    ......
}

github.com/nsqio/nsq/nsqd/channel.go #409

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    now := time.Now()
    msg.clientID = clientID
    msg.deliveryTS = now
    ## 默认 60s
    msg.pri = now.Add(timeout).UnixNano()
    // 以messageId为下标把数据放到 inFlightMessages内存里
    err := c.pushInFlightMessage(msg)
    if err != nil {
    	return err
    }
    // 为msg增加一个索引,把消息放入 inFlightPqueue,并以msg.pri 从小到大排序。 
    c.addToInFlightPQ(msg)
    return nil
}

服务端把消息发送给客户端,等待客户端应答:

1:如果客户端返回“FIN”说明客户端已正常接受消息,服务端根据客户端返回的MessageId删除inFlightMessages的数据。

2:如果客户端返回“REQ”说明客户端接收异常,服务器根据返回的MessageId,把数据从inFlightMessages删除,并重新写回memoryMsgChan。

github.com/nsqio/nsq/nsqd/channel.go

func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
	// remove from inflight first
	msg, err := c.popInFlightMessage(clientID, id)
	if err != nil {
		return err
	}
	c.removeFromInFlightPQ(msg)
	atomic.AddUint64(&c.requeueCount, 1)

	if timeout == 0 {
		c.exitMutex.RLock()
		if c.Exiting() {
			c.exitMutex.RUnlock()
			return errors.New("exiting")
		}
		err := c.put(msg)
		c.exitMutex.RUnlock()
		return err
	}

	// deferred requeue
	return c.StartDeferredTimeout(msg, timeout)
}

3:如果因网络问题没有收到客户端应答,queueScanLoop 将开始工作(运行nsqd时启动的服务)。在queueScanLoop中有一个定时事件refreshTicker.C,每5秒执行一次,从inFlightPqueue获取超时消息,并把它放回 channel的 memoryMsgChan里。这条消息将会被重新推送给客户端。

func (n *NSQD) queueScanLoop() {
    ......
    // 5秒
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
    
    channels := n.channels()
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
    	select {
        ......
    	case <-refreshTicker.C:
    		channels = n.channels()
    		n.resizePool(len(channels), workCh, responseCh, closeCh)
    		continue
    	case <-n.exitChan:
    		goto exit
    	}
        // 每5秒最多只处理4个channel
    	num := n.getOpts().QueueScanSelectionCount
    	if num > len(channels) {
    		num = len(channels)
    	}
    
    loop:
    	for _, i := range util.UniqRands(num, len(channels)) {
    		workCh <- channels[i]
    	}
        ......
    }
    ......
}

github.com/nsqio/nsq/nsqd/channel.go #544

func (c *Channel) processInFlightQueue(t int64) bool {
    ......
    dirty := false
    for {
        c.inFlightMutex.Lock()
        // 根据当前时间戳,从inFlightPqueue获取已超时的msg
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()
        
        if msg == nil {
        	goto exit
        }
        dirty = true
        // 根据msg.Id从inFlightMessages弹出消息
        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
        	goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
        	client.TimedOutMessage()
        }
        // 将消息重新放入 channel的 memoryMsgChan 里
        c.put(msg)
    }
    ......
}

在这里可以看出,消息还是不能百分百成功投送,在消息从队列(memoryMsgChan)取出,放入缓存区(inFlightMessages),在这步之后nsq非正常关闭,就会导致该消息的丢失。

← Previous Next →
Less
More