Redis队列的实现

关于redis队列的实现方式有两种:

  • 生产者消费者模式
  • 发布者订阅者模式

生产者消费者模式

普通版本

比如一个队列里面,生产者A push了一个数据进去,消费者B pop了这个数据,那个这个队列依旧为空。所以是一对一的。

至于是先进先出还是先进后出等,可以依照函数lpush(从队列左边,也就是队首push一个数据)、rpush(从队列右边也就是队尾push一个数据)、lpop(同理)、rpop等来控制。

插入数据:

1
2
3
4
5
6
127.0.0.1:6379> lpush mylist 2
(integer) 1
127.0.0.1:6379> lpush mylist 1
(integer) 2
127.0.0.1:6379> rpush mylist 3
(integer) 3

显示数据:

1
2
3
4
127.0.0.1:6379> lrange mylist 0 -1
1) "1"
2) "2"
3) "3"

取出数据:

1
2
3
4
127.0.0.1:6379> lpop mylist
"1"
127.0.0.1:6379> rpop mylist
"3"

阻塞版本

但是上面的命令都是立即返回的,无论数据有无,关于取数据lpop有个增强版本,blpop(block left pop)阻塞版本

使用方法:blpop key1 key2 ... keyn 10

同时预获取多个key的值,并设置超时时间为10s,如果所有key,有些key有value就立即返回,如果所有key都没有value就阻塞10秒返回

1
2
3
4
5
6
127.0.0.1:6379> blpop mylist 5
1) "mylist"
2) "2"
127.0.0.1:6379> blpop mylist 5
(nil)
(5.01s)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
127.0.0.1:6379> rpush mylist one
(integer) 1
127.0.0.1:6379> rpush mylist two
(integer) 2
127.0.0.1:6379> rpush mylist2 one2
(integer) 1
127.0.0.1:6379> blpop mylist mylist2 5
1) "mylist"
2) "one"
127.0.0.1:6379> blpop mylist mylist2 5
1) "mylist"
2) "two"
127.0.0.1:6379> blpop mylist mylist2 5
1) "mylist2"
2) "one2"
127.0.0.1:6379> blpop mylist mylist2 5
(nil)
(5.18s)

关于blpop多个key返回数据的顺序,比如blpop mylist mylist2 5这个命令,先检查mylist有数据就返回,如果没有数据,就检查mylist2依次。。。。直到所有key检查完如果都没有数据就阻塞。

这种从多个队列里面取数据的方式可以用来做优先级的队列,比如mylist队列的优先级高于mylist2,push的时候,高优先级就push到mylist里面,普通优先级就push到mylist2里面,这样就会先取mylist里面的高优先级的数据来处理。

但是,如果遇到队列的优先级等级过多,比如有(0-9999)个优先级,上面就不行了。解决思路是插入的时候先把数据取出来自己实现二分查找找出该插入的位置,用lset命令插入。

如果数据过多,比如队列有几十万,可以把队列分成几十个或几百个小队列,比如0号队列存优先级为(0-1000),1号队列存优先级为(1001-2000)的数据,依次。。。。。

由于这种队列模式pop出来一个后就返回了,所以处理业务的时候最好把pop写在一个while(true){pop.....do logic}循环里面。

发布者订阅者模式

概念:三个用户A,B,C同时都订阅了一个channel名字叫msg,然后发布者往msg的channel里面发布了一个数据,那么A,B,C三个用户都会收到该数据。

注意:

1、很明显,三个用户ABC需要阻塞。怎么收到订阅的数据呢,肯定是依靠注册在redis里面的回调函数
2、发布的数据不会在redis里面复现,意思就是发布了以后,A,B,C由于种种原因没收到就没收到

直接上代码:

(1)发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
$redis   = new Redis();
$redis->connect('127.0.0.1', '6379');

$type = 'msg';
$msg = "https://www.appblog.cn";

$result = $redis->publish($type, $msg); //同步操作,第一个参数是channel,第二个参数是数据

if (empty($result)) {
echo 'publish failed';
} else {
echo 'publish success';
}

(2)订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$redis    = new Redis();
$redis->connect('127.0.0.1', '6379');

$redis->setOption(\Redis::OPT_READ_TIMEOUT, - 1);
//重点!!!得设置不timeout,否则60内没收到发布者的消息就会自动断开

$type = 'msg';

$result = $redis->subscribe(array($type), 'callback'); //异步阻塞,有消息来自动调用callback函数

function callback($redis, $type, $msg) {
//这里处理逻辑
echo $type. "==>" . $msg . "\r\n";
}

两种方式比较

1、生产者消费者模式需要消费者主动去拉数据,如果写成死循环并且阻塞模式,就和第二种方式差不多了
2、发布者订阅者模式的数据并不存在于某个key里面,如果订阅者没收到则该数据就丢失了

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2021 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :