Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式,这个功能提供两种信息机制,分别是订阅/发布到频道和订阅/发布到模式,下文先讨论订阅/发布到频道的实现,再讨论订阅/发布到模式的实现。
Redis 的 SUBSCRIBE 命令可以让客户端订阅任意数量的频道,每当有新信息发送到被订阅的频道时,信息就会被发送给所有订阅指定频道的客户端。
作为例子,下图展示了频道 channel1
,以及订阅这个频道的三个客户端 —— client2
、 client5
和 client1
之间的关系:
channel1 [label = "subscribe"]; client5 -> channel1 [label = "subscribe"]; client1 -> channel1 [label = "subscribe"];}" />
当有新消息通过 PUBLISH 命令发送给频道 channel1
时,这个消息就会被发送给订阅它的三个客户端:
![digraph send_message_to_subscriber { node [style = filled]; edge [style = "dashed, bold"]; message [label = "PUBLISH channel1 message", shape = plaintext, fillcolor = "#FADCAD"]; message -> channel1 [color = "#B22222]"]; channel1 [label = "channel1", fillcolor = "#A8E270"]; node [shape = box]; client2 [label = "client2", fillcolor = "#95BBE3"]; client5 [label = "client5", fillcolor = "#95BBE3"]; client1 [label = "client1", fillcolor = "#95BBE3"]; / client2 -> channel1 [label = "subscribe"]; client5 -> channel1 [label = "subscribe"]; client1 -> channel1 [label = "subscribe"]; / channel1 -> client2 [label = "message", color = "#B22222"]; channel1 -> client5 [label = "message", color = "#B22222"]; channel1 -> client1 [label = "message", color = "#B22222"];}](http://box.kancloud.cn/2015-09-13_55f4effd69833.svg)
在后面的内容中,我们将探讨 SUBSCRIBE 和 PUBLISH 命令的实现,以及这套订阅与发布机制的运作原理。
每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer
结构,结构的 pubsub_channels
属性是一个字典,这个字典就用于保存订阅频道的信息:
struct redisServer { // ... dict *pubsub_channels; // ... };
其中,字典的键为正在被订阅的频道,而字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。
比如说,在下图展示的这个 pubsub_channels
示例中, client2
、 client5
和 client1
就订阅了 channel1
,而其他频道也分别被别的客户端所订阅:
channel1 | channel2 | channel3 | ... | channelN", fillcolor = "#A8E270"]; // clients blocking for channel1 client1 [label = "client1", fillcolor = "#95BBE3"]; client5 [label = "client5", fillcolor = "#95BBE3"]; client2 [label = "client2", fillcolor = "#95BBE3"]; null_1 [label = "NULL", shape = plaintext]; pubsub:channel1 -> client2; client2 -> client5; client5 -> client1; client1 -> null_1; // clients blocking for channel2 client7 [label = "client7", fillcolor = "#95BBE3"]; null_2 [label = "NULL", shape = plaintext]; pubsub:channel2 -> client7; client7 -> null_2; // channel client3 [label = "client3", fillcolor = "#95BBE3"]; client4 [label = "client4", fillcolor = "#95BBE3"]; client6 [label = "client6", fillcolor = "#95BBE3"]; null_3 [label = "NULL", shape = plaintext]; pubsub:channel3 -> client3; client3 -> client4; client4 -> client6; client6 -> null_3;}" />
当客户端调用 SUBSCRIBE 命令时,程序就将客户端和要订阅的频道在 pubsub_channels
字典中关联起来。
举个例子,如果客户端 client10086
执行命令 SUBSCRIBE channel1 channel2 channel3
,那么前面展示的 pubsub_channels
将变成下面这个样子:
channel1 | channel2 | channel3 | ... | channelN", fillcolor = "#A8E270"]; // clients blocking for channel1 client1 [label = "client1", fillcolor = "#95BBE3"]; client5 [label = "client5", fillcolor = "#95BBE3"]; client2 [label = "client2", fillcolor = "#95BBE3"]; client10086 [label = "client10086", fillcolor = "#FFC1C1"]; client10086_1 [label = "client10086", fillcolor = "#FFC1C1"]; client10086_2 [label = "client10086", fillcolor = "#FFC1C1"]; null_1 [label = "NULL", shape = plaintext]; null_2 [label = "NULL", shape = plaintext]; null_3 [label = "NULL", shape = plaintext]; pubsub:channel1 -> client2; client2 -> client5; client5 -> client1; client1 -> client10086; client10086 -> null_1; // clients blocking for channel2 client7 [label = "client7", fillcolor = "#95BBE3"]; pubsub:channel2 -> client7; client7 -> client10086_1; client10086_1 -> null_2; // channel client3 [label = "client3", fillcolor = "#95BBE3"]; client4 [label = "client4", fillcolor = "#95BBE3"]; client6 [label = "client6", fillcolor = "#95BBE3"]; pubsub:channel3 -> client3; client3 -> client4; client4 -> client6; client6 -> client10086_2; client10086_2 -> null_3;}" />
SUBSCRIBE 命令的行为可以用伪代码表示如下:
def SUBSCRIBE(client, channels): # 遍历所有输入频道 for channel in channels: # 将客户端添加到链表的末尾 redisServer.pubsub_channels[channel].append(client)
通过 pubsub_channels
字典,程序只要检查某个频道是否为字典的键,就可以知道该频道是否正在被客户端订阅;只要取出某个键的值,就可以得到所有订阅该频道的客户端的信息。
了解了 pubsub_channels
字典的结构之后,解释 PUBLISH 命令的实现就非常简单了:当调用 PUBLISH channel message
命令,程序首先根据 channel
定位到字典的键,然后将信息发送给字典值链表中的所有客户端。
比如说,对于以下这个 pubsub_channels
实例,如果某个客户端执行命令 PUBLISH channel1 "hello moto"
,那么 client2
、 client5
和 client1
三个客户端都将接收到 "hello moto"
信息:
channel1 | channel2 | channel3 | ... | channelN", fillcolor = "#A8E270"]; // clients blocking for channel1 client1 [label = "client1", fillcolor = "#95BBE3"]; client5 [label = "client5", fillcolor = "#95BBE3"]; client2 [label = "client2", fillcolor = "#95BBE3"]; null_1 [label = "NULL", shape = plaintext]; pubsub:channel1 -> client2; client2 -> client5; client5 -> client1; client1 -> null_1; // clients blocking for channel2 client7 [label = "client7", fillcolor = "#95BBE3"]; null_2 [label = "NULL", shape = plaintext]; pubsub:channel2 -> client7; client7 -> null_2; // channel client3 [label = "client3", fillcolor = "#95BBE3"]; client4 [label = "client4", fillcolor = "#95BBE3"]; client6 [label = "client6", fillcolor = "#95BBE3"]; null_3 [label = "NULL", shape = plaintext]; pubsub:channel3 -> client3; client3 -> client4; client4 -> client6; client6 -> null_3;}" />
PUBLISH 命令的实现可以用以下伪代码来描述:
def PUBLISH(channel, message): # 遍历所有订阅频道 channel 的客户端 for client in server.pubsub_channels[channel]: # 将信息发送给它们 send_message(client, message)
使用 UNSUBSCRIBE 命令可以退订指定的频道,这个命令执行的是订阅的反操作:它从 pubsub_channels
字典的给定频道(键)中,删除关于当前客户端的信息,这样被退订频道的信息就不会再发送给这个客户端。
当使用 PUBLISH 命令发送信息到某个频道时,不仅所有订阅该频道的客户端会收到信息,如果有某个/某些模式和这个频道匹配的话,那么所有订阅这个/这些频道的客户端也同样会收到信息。
下图展示了一个带有频道和模式的例子,其中 tweet.shop.*
模式匹配了 tweet.shop.kindle
频道和 tweet.shop.ipad
频道,并且有不同的客户端分别订阅它们三个:
kindle [label = "match"]; pattern -> ipad [label = "match"]; node [shape = box]; client123 [fillcolor = "#95BBE3"]; client256 [fillcolor = "#95BBE3"]; clientX [fillcolor = "#95BBE3"]; clientY [fillcolor = "#95BBE3"]; client3333 [fillcolor = "#95BBE3"]; client4444 [fillcolor = "#95BBE3"]; client5555 [fillcolor = "#95BBE3"]; client123 -> pattern [label = "subscribe"]; client256 -> pattern [label = "subscribe"]; clientX -> kindle [label = "subscribe"]; clientY -> kindle [label = "subscribe"]; client3333 -> ipad [label = "subscribe"]; client4444 -> ipad [label = "subscribe"]; client5555 -> ipad [label = "subscribe"];}" />
当有信息发送到 tweet.shop.kindle
频道时,信息除了发送给 clientX
和 clientY
之外,还会发送给订阅 tweet.shop.*
模式的 client123
和 client256
:
pattern [label = "match", dir = back]; node [shape = box]; ipad -> client3333 [label = "subscribe", dir = back]; ipad -> client4444 [label = "subscribe", dir = back]; ipad -> client5555 [label = "subscribe", dir = back]; node [shape = plaintext]; message [label = "PUBLISH tweet.shop.kindle message", fillcolor = "#FADCAD"]; kindle [label = "tweet.shop.kindle", shape = ellipse, fillcolor = "#A8E270"]; pattern [label = "tweet.shop.*", shape = octagon]; message -> kindle [style = "bold, dashed", color = "#B22222"]; kindle -> pattern [style = "bold, dashed", color = "#B22222"]; node [shape = box]; kindle -> clientX [style = "bold, dashed", color = "#B22222", label = "message"]; kindle -> clientY [style = "bold, dashed", color = "#B22222", label = "message"]; pattern -> client123 [label = "message", style = "bold, dashed", color = "#B22222"]; pattern -> client256 [label = "message", style = "bold, dashed", color = "#B22222"]; // client color client123 [fillcolor = "#95BBE3"]; client256 [fillcolor = "#95BBE3"]; clientX [fillcolor = "#95BBE3"]; clientY [fillcolor = "#95BBE3"]; client3333 [fillcolor = "#95BBE3"]; client4444 [fillcolor = "#95BBE3"]; client5555 [fillcolor = "#95BBE3"];}" />
另一方面,如果接收到信息的是频道 tweet.shop.ipad
,那么 client123
和 client256
同样会收到信息:
kindle [label = "match"]; pattern -> ipad [style = "bold, dashed", color = "#B22222", dir = back]; node [shape = box]; client123 -> pattern [label = "message", dir = back, style= "bold, dashed", color = "#B22222"]; client256 -> pattern [label = "message", dir = back, style= "bold, dashed", color = "#B22222"]; clientX -> kindle [label = "subscribe"]; clientY -> kindle [label = "subscribe"]; client3333 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back]; client4444 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back]; client5555 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back]; // new publish [label = "PUBLISH tweet.shop.ipad message", shape = plaintext, fillcolor = "#FADCAD"]; ipad -> publish [style = "bold, dashed", color = "#B22222", dir = back]; // client color client123 [fillcolor = "#95BBE3"]; client256 [fillcolor = "#95BBE3"]; clientX [fillcolor = "#95BBE3"]; clientY [fillcolor = "#95BBE3"]; client3333 [fillcolor = "#95BBE3"]; client4444 [fillcolor = "#95BBE3"]; client5555 [fillcolor = "#95BBE3"];}" />
redisServer.pubsub_patterns
属性是一个链表,链表中保存着所有和模式相关的信息:
struct redisServer { // ... list *pubsub_patterns; // ... };
链表中的每个节点都包含一个 redis.h/pubsubPattern
结构:
typedef struct pubsubPattern { redisClient *client; robj *pattern; } pubsubPattern;
client
属性保存着订阅模式的客户端,而 pattern
属性则保存着被订阅的模式。
每当调用 PSUBSCRIBE
命令订阅一个模式时,程序就创建一个包含客户端信息和被订阅模式的 pubsubPattern
结构,并将该结构添加到 redisServer.pubsub_patterns
链表中。
作为例子,下图展示了一个包含两个模式的 pubsub_patterns
链表,其中 client123
和 client256
都正在订阅 tweet.shop.*
模式:
pubsub_patterns | ...", fillcolor = "#A8E270"]; pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"]; pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"]; redisServer:pubsub_patterns -> pubsubPattern_1; pubsubPattern_1 -> pubsubPattern_2;}" />
如果这时客户端 client10086
执行 PSUBSCRIBE broadcast.list.*
,那么 pubsub_patterns
链表将被更新成这样:
pubsub_patterns | ...", fillcolor = "#A8E270"]; pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"]; pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"]; pubsubPattern_3 [label = "pubsubPattern | client \n client10086 | pattern \n broadcast.live.*", fillcolor = "#FFC1C1"]; redisServer:pubsub_patterns -> pubsubPattern_1; pubsubPattern_1 -> pubsubPattern_2; pubsubPattern_2 -> pubsubPattern_3;}" />
通过遍历整个 pubsub_patterns
链表,程序可以检查所有正在被订阅的模式,以及订阅这些模式的客户端。
发送信息到模式的工作也是由 PUBLISH 命令进行的,在前面讲解频道的时候,我们给出了这样一段伪代码,说它定义了 PUBLISH 命令的行为:
def PUBLISH(channel, message): # 遍历所有订阅频道 channel 的客户端 for client in server.pubsub_channels[channel]: # 将信息发送给它们 send_message(client, message)
但是,这段伪代码并没有完整描述 PUBLISH 命令的行为,因为 PUBLISH 除了将 message
发送到所有订阅 channel
的客户端之外,它还会将 channel
和 pubsub_patterns
中的模式进行对比,如果 channel
和某个模式匹配的话,那么也将 message
发送到订阅那个模式的客户端。
完整描述 PUBLISH 功能的伪代码定于如下:
def PUBLISH(channel, message): # 遍历所有订阅频道 channel 的客户端 for client in server.pubsub_channels[channel]: # 将信息发送给它们 send_message(client, message) # 取出所有模式,以及订阅模式的客户端 for pattern, client in server.pubsub_patterns: # 如果 channel 和模式匹配 if match(channel, pattern): # 那么也将信息发给订阅这个模式的客户端 send_message(client, message)
举个例子,如果 Redis 服务器的 pubsub_patterns
状态如下:
pubsub_patterns | ...", fillcolor = "#A8E270"]; pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"]; pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"]; pubsubPattern_3 [label = "pubsubPattern | client \n client10086 | pattern \n broadcast.live.*", fillcolor = "#FFC1C1"]; redisServer:pubsub_patterns -> pubsubPattern_1; pubsubPattern_1 -> pubsubPattern_2; pubsubPattern_2 -> pubsubPattern_3;}" />
那么当某个客户端发送信息 "Amazon Kindle, $69."
到 tweet.shop.kindle
频道时,除了所有订阅了 tweet.shop.kindle
频道的客户端会收到信息之外,客户端 client123
和 client256
也同样会收到信息,因为这两个客户端订阅的 tweet.shop.*
模式和 tweet.shop.kindle
频道匹配。
使用 PUNSUBSCRIBE 命令可以退订指定的模式,这个命令执行的是订阅模式的反操作:程序会删除 redisServer.pubsub_patterns
链表中,所有和被退订模式相关联的 pubsubPattern
结构,这样客户端就不会再收到和模式相匹配的频道发来的信息。
redisServer.pubsub_channels
字典保存,字典的键为被订阅的频道,字典的值为订阅频道的所有客户端。redisServer.pubsub_patterns
链表保存,链表的每个节点都保存着一个 pubsubPattern
结构,结构中保存着被订阅的模式,以及订阅该模式的客户端。程序通过遍历链表来查找某个频道是否和某个模式匹配。