除了使用List实现简单的消息队列功能以外,Redis还提供了发布订阅的消息机制。在这种机制下,消息发布者向指定频道(channel)发布消息,消息订阅者可以收到指定频道的消息,同一个频道可以有多个消息订阅者,如下图:
Redis也提供了一些命令支持这个机制,接下来我们详细介绍一下这些命令。
在Redis中,发布订阅相关命令有:
发布消息的命令是publish
,语法是:
publish 频道名称 消息
比如,要向channel:one-more-study:demo频道发布一条消息“I am One More Study.”,命令如下:
> publish channel:one-more-study:demo "I am One More Study." (integer) 0
返回的结果是订阅者的个数,上例中没有订阅者,所以返回结果为0。
订阅消息的命令是subscribe
,订阅者可以订阅一个或者多个频道,语法是:
subscribe 频道名称 [频道名称 ...]
比如,订阅一个channel:one-more-study:demo频道,命令如下:
> subscribe channel:one-more-study:demo Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "channel:one-more-study:demo" 3) (integer) 1
返回结果中有3条,分别表示:返回值的类型(订阅成功)、订阅的频道名称、目前已订阅的频道数量。当订阅者接受到消息时,就会显示:
1) "message" 2) "channel:one-more-study:demo" 3) "I am One More Study."
同样也是3条结果,分别表示:返回值的类型(信息)、消息来源的频道名称、消息内容。
新开启的订阅者,是无法收到该频道之前的历史消息的,因为Redis没有对发布的消息做持久化。
取消订阅的命令是unsubscribe
,可以取消一个或者多个频道的订阅,语法是:
unsubscribe [频道名称 [频道名称 ...]]
比如,取消订阅channel:one-more-study:demo频道,命令如下:
> unsubscribe channel:one-more-study:demo 1) "unsubscribe" 2) "channel:one-more-study:demo" 3) (integer) 0
返回结果中有3条,分别表示:返回值的类型(取消订阅成功)、取消订阅的频道名称、目前已订阅的频道数量。
按模式订阅消息的命令是psubscribe
,订阅一个或多个符合给定模式的频道,语法是:
psubscribe 模式 [模式 ...]
每个模式以 * 作为匹配符,比如 channel* 匹配所有以 channel 开头的频道,命令如下:
> psubscribe channel:* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "channel*" 3) (integer) 1
返回结果中有3条,分别表示:返回值的类型(按模式订阅成功)、订阅的模式、目前已订阅的模式数量。当订阅者接受到消息时,就会显示:
1) "pmessage" 2) "channel*" 3) "channel:one-more-study:demo" 4) "I am One More Study."
返回结果中有4条,分别表示:返回值的类型(信息)、消息匹配的模式、消息来源的频道名称、消息内容。
按模式取消订阅的命令是punsubscribe
,可以取消一个或者多个模式的订阅,语法是:
punsubscribe [模式 [模式 ...]]
每个模式以 * 作为匹配符,比如 channel:* 匹配所有以 channel 开头的频道,命令如下:
1> punsubscribe channel:* 1) "punsubscribe" 2) "channel:*" 3) (integer) 0
返回结果中有3条,分别表示:返回值的类型(按模式取消订阅成功)、取消订阅的模式、目前已订阅的模式数量。
活跃频道指的是至少有一个订阅者的频道,语法是:
pubsub channels [模式]
比如:
> pubsub channels 1) "channel:one-more-study:test" 2) "channel:one-more-study:demo" 3) "channel:demo" > pubsub channels *demo 1) "channel:one-more-study:demo" 2) "channel:demo" > pubsub channels *one-more-study* 1) "channel:one-more-study:test" 2) "channel:one-more-study:demo"
pubsub numsub [频道名称 ...]
比如:
> pubsub numsub channel:one-more-study:demo 1) "channel:one-more-study:demo" 2) (integer) 1
> pubsub numpat (integer) 1
光说不练假把式,我们使用Java语言写一个简单的发布订阅示例。
Jedis是Redis官方推荐的Java连接开发工具,我们使用Jedis写一个简单的集群示例。
package onemore.study; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import java.util.HashSet; import java.util.Set; /** * Jedis集群 * * @author 万猫学社 */ public enum Cluster { INSTANCE; //为了简单,把IP和端口直接写在这里,实际开发中写在配置文件会更好。 private final String hostAndPorts = "192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379"; private JedisCluster jedisCluster; Cluster() { JedisPoolConfig poolConfig = new JedisPoolConfig(); //最大连接数 poolConfig.setMaxTotal(20); //最大空闲数 poolConfig.setMaxIdle(10); //最小空闲数 poolConfig.setMinIdle(2); //从jedis连接池获取连接时,校验并返回可用的连接 poolConfig.setTestOnBorrow(true); //把连接放回jedis连接池时,校验并返回可用的连接 poolConfig.setTestOnReturn(true); Set<HostAndPort> nodes = new HashSet<>(); String[] hosts = hostAndPorts.split(";"); for (String hostport : hosts) { String[] ipport = hostport.split(":"); String ip = ipport[0]; int port = Integer.parseInt(ipport[1]); nodes.add(new HostAndPort(ip, port)); } jedisCluster = new JedisCluster(nodes, 1000, poolConfig); } public JedisCluster getJedisCluster() { return jedisCluster; } }
package onemore.study; import redis.clients.jedis.JedisCluster; /** * 发布者 * * @author 万猫学社 */ public class Publisher implements Runnable { private final String CHANNEL_NAME = "channel:one-more-study:demo"; private final String QUIT_COMMAND = "quit"; @Override public void run() { JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster(); for (int i = 1; i <= 3; i++) { String message = "第" + i + "消息"; System.out.println(Thread.currentThread().getName() + " 发布:" + message); jedisCluster.publish(CHANNEL_NAME, message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------------------"); } jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND); } }
package onemore.study; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPubSub; /** * 订阅者 * * @author 万猫学社 */ public class Subscriber implements Runnable { private final String CHANNEL_NAME = "channel:one-more-study:demo"; private final String QUIT_COMMAND = "quit"; private final JedisPubSub jedisPubSub = new JedisPubSub() { @Override public void onMessage(String channel, String message) { System.out.println(Thread.currentThread().getName() + " 接收:" + message); if (QUIT_COMMAND.equals(message)) { unsubscribe(CHANNEL_NAME); } } }; @Override public void run() { JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster(); jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME); } }
package onemore.study; public class App { public static void main(String[] args) throws InterruptedException { //创建3个订阅者 new Thread(new Subscriber()).start(); new Thread(new Subscriber()).start(); new Thread(new Subscriber()).start(); Thread.sleep(1000); //创建发布者 new Thread(new Publisher()).start(); } }
运行结果如下:
Thread-6 发布:第1消息 Thread-0 接收:第1消息 Thread-1 接收:第1消息 Thread-2 接收:第1消息 ------------------ Thread-6 发布:第2消息 Thread-0 接收:第2消息 Thread-1 接收:第2消息 Thread-2 接收:第2消息 ------------------ Thread-6 发布:第3消息 Thread-0 接收:第3消息 Thread-2 接收:第3消息 Thread-1 接收:第3消息 ------------------ Thread-0 接收:quit Thread-1 接收:quit Thread-2 接收:quit