加载中...

redis接口的二次封装(发布订阅)


redis接口的二次封装(发布订阅)

其实这一小节完全可以放到上一个小结,只是这里用了完全不同的玩法,所以我还是决定单拿出来分享一下这个小细节。

上一小结有关订阅部分的代码,请看:

  1. function _M.subscribe( self, channel )
  2. local redis, err = redis_c:new()
  3. if not redis then
  4. return nil, err
  5. end
  6. local ok, err = self:connect_mod(redis)
  7. if not ok or err then
  8. return nil, err
  9. end
  10. local res, err = redis:subscribe(channel)
  11. if not res then
  12. return nil, err
  13. end
  14. res, err = redis:read_reply()
  15. if not res then
  16. return nil, err
  17. end
  18. redis:unsubscribe(channel)
  19. self.set_keepalive_mod(redis)
  20. return res, err
  21. end

其实这里的实现是有问题的,各位看官,你能发现这段代码的问题么?给个提示,在高并发订阅场景下,极有可能存在漏掉部分订阅信息。原因在与每次订阅到内容后,都会把redis对象进行释放,处理完订阅信息后再次去连接redis,在这个时间差里面,很可能有消息已经漏掉了。

正确的代码应该是这样的:

  1. function _M.subscribe( self, channel )
  2. local redis, err = redis_c:new()
  3. if not redis then
  4. return nil, err
  5. end
  6. local ok, err = self:connect_mod(redis)
  7. if not ok or err then
  8. return nil, err
  9. end
  10. local res, err = redis:subscribe(channel)
  11. if not res then
  12. return nil, err
  13. end
  14. local function do_read_func ( do_read )
  15. if do_read == nil or do_read == true then
  16. res, err = redis:read_reply()
  17. if not res then
  18. return nil, err
  19. end
  20. return res
  21. end
  22. redis:unsubscribe(channel)
  23. self.set_keepalive_mod(redis)
  24. return
  25. end
  26. return do_read_func
  27. end

调用示例代码:

  1. local red = redis:new({timeout=1000})
  2. local func = red:subscribe( "channel" )
  3. if not func then
  4. return nil
  5. end
  6. while true do
  7. local res, err = func()
  8. if err then
  9. func(false)
  10. end
  11. ... ...
  12. end
  13. return cbfunc

还没有评论.