加载中...

redis接口的二次封装(发布订阅)


redis接口的二次封装(发布订阅)

其实这一小节完全可以放到上一个小结,只是这里用了完全不同的玩法,所以我还是决定单拿出来分享一下这个小细节。

上一小结有关订阅部分的代码,请看:

function _M.subscribe( self, channel )
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local res, err = redis:subscribe(channel)
    if not res then
        return nil, err
    end

    res, err = redis:read_reply()
    if not res then
        return nil, err
    end

    redis:unsubscribe(channel)
    self.set_keepalive_mod(redis)

    return res, err
end

其实这里的实现是有问题的,各位看官,你能发现这段代码的问题么?给个提示,在高并发订阅场景下,极有可能存在漏掉部分订阅信息。原因在与每次订阅到内容后,都会把redis对象进行释放,处理完订阅信息后再次去连接redis,在这个时间差里面,很可能有消息已经漏掉了。

正确的代码应该是这样的:

function _M.subscribe( self, channel )
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local res, err = redis:subscribe(channel)
    if not res then
        return nil, err
    end

    local function do_read_func ( do_read )
        if do_read == nil or do_read == true then
            res, err = redis:read_reply()
            if not res then
                return nil, err
            end
            return res
        end

        redis:unsubscribe(channel)
        self.set_keepalive_mod(redis)
        return 
    end

    return do_read_func
end

调用示例代码:

local red     = redis:new({timeout=1000})  
local func  = red:subscribe( "channel" )
if not func then
  return nil
end

while true do
    local res, err = func()
    if err then
        func(false)
    end
    ... ...
end

return cbfunc

还没有评论.