阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Redis应用—-消息传递

178次阅读
没有评论

共计 12051 个字符,预计需要花费 31 分钟才能阅读完成。

1、摘要

消息传递这一应用广泛存在于各个网站中,这个功能也是一个网站必不可少的。常见的消息传递应用有,新浪微博中的 @我呀、给你评论然后的提示呀、赞赞赞提示、私信呀、甚至是发微博分享的新鲜事;知乎中的私信呀、live 发送过来的消息、知乎团队消息呀等等。

2、实现方法

消息传递即两个或者多个客户端在相互发送和接收消息。

通常有两种方法实现:

第一种为消息推送。Redis 内置有这种机制,publish 往频道推送消息、subscribe 订阅频道。这种方法有一个缺点就是必须保证接收者时刻在线(即是此时程序不能停下来,一直保持监控状态,假若断线后就会出现客户端丢失信息)

第二种为消息拉取。所谓消息拉取,就是客户端自主去获取存储在服务器中的数据。Redis 内部没有实现消息拉取这种机制。因此我们需要自己手动编写代码去实现这个功能。

在这里我们,我们进一步将消息传递再细分为一对一的消息传递,多对多的消息传递(群组消息传递)。

【注:两个类的代码相对较多,因此将其折叠起来了】

 

3、一对一消息传递

例子 1 :一对一消息发送与获取

模块要求:

1、提示有多少个联系人发来新消息

2、信息包含发送人、时间、信息内容

3、能够获取之前的旧消息

4、并且消息能够保持 7 天,过期将会被动触发删除

Redis 实现思路:

1、新消息与旧消息分别采用两个链表来存储

2、原始消息的结构采用数组的形式存放,并且含有发送人、时间戳、信息内容

3、在推入 redis 的链表前,需要将数据转换为 json 类型然后再进行存储

4、在取出新信息时应该使用 rpoplpush 来实现,将已读的新消息推入旧消息链表中

5、取出旧消息时,应该用旧消息的时间与现在的时间进行对比,若超时,则直接删除后面的全部数据(因为数据是按时间一个一个压进链表中的,所以对于时间是有序排列的)

数据存储结构图:

 Redis 应用 ---- 消息传递

PHP 的实现代码:

#SinglePullMessage.class.php

Redis 应用 ---- 消息传递  

  1 <?php
  2 # 单接接收者接收消息 
  3 class SinglePullMessage
  4 {  5     private $redis='';  # 存储 redis 对象 
  6     /**
  7     * @desc 构造函数
  8     * 
  9     * @param $host string | redis 主机
 10     * @param $port int    | 端口
 11     */
 12     public function __construct($host,$port=6379)
 13     { 14         $this->redis=new Redis();
 15         $this->redis->connect($host,$port);
 16     } 
 17 
 18     /**
 19     * @desc 发送消息(一个人) 20     * 
 21     * @param $toUser   string    | 接收人
 22     * @param $messageArr array   | 发送的消息数组,包含 sender、message、time 
 23     *
 24     * @return bool
 25     */
 26     public function sendSingle($toUser,$messageArr)
 27     { 28         $json_message=json_encode($messageArr);    # 编码成 json 数据 
 29         return $this->redis->lpush($toUser,$json_message);      # 将数据推入链表 
 30     }
 31 
 32     /**
 33     * @desc 用户获取新消息
 34     *
 35     * @param $user string | 用户名
 36     *
 37     * @return array 返回数组,包含多少个用户发来新消息,以及具体消息
 38     */
 39     public function getNewMessage($user)
 40     { 41         # 接收新信息数据,并且将数据推入旧信息数据链表中,并且在原链表中删除 
 42         $messageArr=array();
 43         while($json_message=$this->redis->rpoplpush($user, 'preMessage_'.$user))
 44         { 45             $temp=json_decode($json_message);   # 将 json 数据变成对象 
 46             $messageArr[$temp->sender][]=$temp;        # 转换成数组信息 
 47         }
 48         if($messageArr)
 49         { 50             $arr['count']=count($messageArr);   # 统计有多少个用户发来信息 
 51             $arr['messageArr']=$messageArr;
 52             return $arr;
 53         }
 54         return false;
 55     }
 56 
 57     public function getPreMessage($user)
 58     { 59         ## 取出旧消息 
 60         $messageArr=array();
 61         $json_pre=$this->redis->lrange('preMessage_'.$user, 0, -1);    # 一次性将全部旧消息取出来 
 62         foreach ($json_pre as $k => $v) 
 63         { 64             $temp=json_decode($v);            #json 反编码 
 65             $timeout=$temp->time+60*60*24*7;  # 数据过期时间  七天过期 
 66             if($timeout<time())               # 判断数据是否过期 
 67             { 68                 if($k==0)                     # 若是最迟插入的数据都过期了,则将所有数据删除 
 69                 { 70                     $this->redis->del('preMessage_'.$user);
 71                     break;
 72                 }
 73                 $this->redis->ltrim('preMessage_'.$user, 0, $k);  # 若检测出有过期的,则将比它之前插入的所有数据删除 
 74                 break;
 75             }
 76             $messageArr[$temp->sender][]=$temp;
 77         }
 78         return $messageArr;
 79     }
 80 
 81     /**
 82     * @desc 消息处理,没什么特别的作用。在这里这是用来处理数组信息,然后将其输出。 83     *
 84     * @param $arr array | 需要处理的信息数组
 85     *
 86     * @return 返回打印输出
 87     */
 88     public function dealArr($arr)
 89     { 90         foreach ($arr as $k => $v) 
 91         { 92             foreach ($v as $k1 => $v2) 
 93             { 94                 echo '发送人:'.$v2->sender.'发送时间:'.date('Y-m-d h:i:s',$v2->time).'<br/>';
 95                 echo '消息内容:'.$v2->message.'<br/>';
 96             }
 97             echo "<hr/>";
 98         }
 99     }
100 
101 
102 }

 

 测试:

1、发送消息

#建立 test1.php

1 include './SinglePullMessage.class.php';
2 $object=new SinglePullMessage('192.168.95.11');
3 # 发送消息 
4 $sender='boss';     # 发送者 
5 $to='jane';         # 接收者 
6 $message='How are you';    # 信息 
7 $time=time();
8 $arr=array('sender'=>$sender,'message'=>$message,'time'=>$time);
9 echo $object->sendSingle($to,$arr);

 

2、获取新消息

#建立 test2.php

 1 include './SinglePullMessage.class.php';
 2 $object=new SinglePullMessage('192.168.95.11');
 3 # 获取新消息 
 4 $arr=$object->getNewMessage('jane');
 5 if($arr)
 6 { 7     echo $arr['count']."个联系人发来新消息 <br/><hr/>";
 8     $object->dealArr($arr['messageArr']);   
 9 }
10 else
11     echo "无新消息";

 

  访问结果:

Redis 应用 ---- 消息传递

3、获取旧消息

#建立 test3.php

 1 include './SinglePullMessage.class.php';
 2 $object=new SinglePullMessage('192.168.95.11');
 3 # 获取旧消息 
 4 $arr=$object->getPreMessage('jane');
 5 if($arr)
 6 { 7     $object->dealArr($arr);
 8 }
 9 else
10     echo "无旧数据";

 

4、多对多消息传递

例子 2 :多对多消息发送与获取(即是群组)

模块要求:

1、用户能够自行创建群组,并成为群主

2、群主可以拉人进来作为群组成员、并且可以踢人

3、用户可以直接退出群组

4、可以发送消息,每一位成员都可以拉取消息

5、群组的消息最大容纳量为 5000 条

6、成员可以拉取新消息,并提示有多少新消息

7、成员可以分页获取之前已读的旧消息

。。。。。功能就写这几个吧,有需要或者想练习的同学们可以增加其他功能,例如禁言、匿名消息发送、文件发送等等。

Redis 实现思路:

1、群组的消息以及群组的成员组成采用有序集合进行存储。群组消息有序集合的 member 存储用户发送的 json 数据消息,score 存储唯一值,将采用原子操作 incr 获取 string 中的自增长值进行存储;群组成员有序集合的 member 存储 user,score 存储非零数字(在这里这个 score 意义不大,我的例子代码中使用数字 1 为群主的 score,其他的存储为 2。当然这使用这个数据还可以扩展别的功能,例如群组中成员等级)可参考下面数据存储结构简图。

2、用户所加入的群组也是采用有序集合进行存储。其中,member 存储群组 ID,score 存储用户已经获取该群组的最大消息分值(对应群组消息的 score 值)

3、用户创建群组的时候,通过原子操作 incr 从而获取一个唯一 ID

4、用户在群中发送消息时,也是通过原子操作 incr 获取一个唯一自增长有序 ID

5、在执行 incr 时,为防止并发导致竞争关系,因此需要进行加锁操作【redis 详细锁的讲解可以参考:Redis 构建分布式锁 http://www.linuxidc.com/Linux/2017-09/146927.htm】

6、创建群组方法简要思路,任何一个用户都可以创建群组聊天,在创建的同时,可以选择时是否添加群组成员(参数通过数组的形式)。创建过程将会为这个群组建立一个群组成员有序集合(群组信息有序集合暂时不创建),接着将群主添加进去,再将群 ID 添加用户所参加的群组有序集合中。

数据存储结构图:

Redis 应用 ---- 消息传递

 

Redis 应用 ---- 消息传递

PHP 的代码实现:

#ManyPullMessage.class.php

Redis 应用 ---- 消息传递

  1 <?php
  2 class ManyPullMessage
  3 {  4     private $redis='';  # 存储 redis 对象 
  5     /**
  6     * @desc 构造函数
  7     * 
  8     * @param $host string | redis 主机
  9     * @param $port int    | 端口
 10     */
 11     public function __construct($host,$port=6379)
 12     { 13         $this->redis=new Redis();
 14         $this->redis->connect($host,$port);
 15     } 
 16 
 17     /**
 18     * @desc 用于创建群组的方法,在创建的同时还可以拉人进群组
 19     * 
 20     * @param $user   string   | 用户名,创建群组的主人
 21     * @param $addUser array   | 其他用户构成的数组
 22     *
 23     * @param $lockName string | 锁的名字,用于获取群组 ID 的时候用
 24     * @return int 返回群组 ID
 25     */
 26     public function createGroupChat($user, $addUser=array(), $lockName='chatIdLock')
 27     { 28         $identifier=$this->getLock($lockName);  # 获取锁 
 29         if($identifier)
 30         { 31             $id=$this->redis->incr('groupChatID');       # 获取群组 ID
 32             $this->releaseLock($lockName,$identifier);   # 释放锁 
 33         }
 34         else
 35             return false;
 36         $messageCount=$this->redis->set('countMessage_'.$id, 0);  # 初始化这个群组消息计数器 
 37         # 开启非事务型流水线,一次性将所有 redis 命令传给 redis,减少与 redis 的连接 
 38         $pipe=$this->redis->pipeline();   
 39         $this->redis->zadd('groupChat_'.$id, 1, $user);  # 创建群组成员有序集合,并添加群主 
 40         # 将这个群组添加到 user 所参加的群组有序集合中 
 41         $this->redis->zadd('hasGroupChat_'.$user, 0, $id);  
 42         foreach ($addUser as $v)    # 创建群组的同时需要添加的用户成员 
 43         { 44             $this->redis->zadd('groupChat_'.$id, 2, $v);
 45             $this->redis->zadd('hasGroupChat_'.$v, 0, $id);
 46         }
 47         $pipe->exec();
 48         return $id;    # 返回群组 ID
 49     }
 50 
 51     /**
 52     * @desc 群主主动拉人进群
 53     *
 54     * @param $user       string | 群主名
 55     * @param $groupChatID   int | 群组 ID
 56     * @param $addMembers array  | 需要拉进群的用户
 57     *
 58     * @return bool
 59     */
 60     public function addMembers($user, $groupChatID, $addMembers=array())
 61     { 62         $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user);  # 将 groupChatName 的群主取出来 
 63         if($groupMasterScore==1)     # 判断 user 是否是群主 
 64         { 65             $pipe=$this->redis->pipeline(); # 开启非事务流水线 
 66             foreach ($addMembers as $v) 
 67             { 68                 $this->redis->zadd('groupChat_'.$groupChatID, 2, $v);                 # 添加进群 
 69                 $this->redis->zadd('hasGroupChat_'.$v, 0, $groupChatID); # 添加群名到用户的有序集合中 
 70             }
 71             $pipe->exec();
 72             return true;
 73         }
 74         return false;
 75     }
 76 
 77     /**
 78     * @desc 群主删除成员
 79     *
 80     * @param $user       string | 群主名
 81     * @param $groupChatID   int | 群组 ID
 82     * @param $delMembers  array | 需要删除的成员名字
 83     *
 84     * @return bool
 85     */
 86     public function delMembers($user, $groupChatID, $delMembers=array())
 87     { 88         $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); 
 89         if($groupMasterScore==1)     # 判断 user 是否是群主 
 90         { 91             $pipe=$this->redis->pipeline(); # 开启非事务流水线 
 92             foreach ($delMembers as $v) 
 93             { 94                 $this->redis->zrem('groupChat_'.$groupChatID, $v);                 
 95                 $this->redis->zrem('hasGroupChat_'.$v, $groupChatID); 
 96             }
 97             $pipe->exec();
 98             return true;
 99         }
100         return false;
101     }
102 
103     /**
104     * @desc 退出群组
105     *
106     * @param $user string     | 用户名
107     * @param $groupChatID int | 群组名
108     */
109     public function quitGroupChat($user, $groupChatID)
110     {111         $this->redis->zrem('groupChat_'.$groupChatID, $user);
112         $this->redis->zrem('hasGroupChat_'.$user, $groupChatID);
113         return true;
114     }
115 
116     /**
117     * @desc 发送消息
118     *
119     * @param $user string        | 用户名
120     * @param $groupChatID int    | 群组 ID
121     * @param $messageArr array   | 包含发送消息的数组
122     * @param $preLockName string | 群消息锁前缀,群消息锁全名为 countLock_群 ID
123     *
124     * @return bool
125     */
126     public function sendMessage($user, $groupChatID, $messageArr, $preLockName='countLock_')
127     {128         $memberScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); # 成员 score
129         if($memberScore)
130         {131             $identifier=$this->getLock($preLockName.$groupChatID);  # 获取锁 
132             if($identifier)     # 判断获取锁是否成功 
133             {134                 $messageCount=$this->redis->incr('countMessage_'.$groupChatID);
135                 $this->releaseLock($preLockName.$groupChatID,$identifier);  # 释放锁 
136             }
137             else
138                 return false;
139             $json_message=json_encode($messageArr);
140             $this->redis->zadd('groupChatMessage_'.$groupChatID, $messageCount, $json_message);
141             $count=$this->redis->zcard('groupChatMessage_'.$groupChatID);   # 查看信息量大小 
142             if($count>5000) # 判断数据量有没有达到 5000 条 
143             {# 数据量超 5000,则需要清除旧数据 
144                 $start=5000-$count;
145                 $this->redis->zremrangebyrank('groupChatMessage_'.$groupChatID, $start, $count);
146             }
147             return true;
148         }
149         return false;
150     }
151 
152     /**
153     * @desc 获取新信息
154     *
155     * @param $user string | 用户名
156     *
157     * @return 成功则放回 json 数据数组,无新信息返回 false
158     */
159     public function getNewMessage($user)
160     {161         $arrID=$this->redis->zrange('hasGroupChat_'.$user, 0, -1, 'withscores');    # 获取用户拥有的群组 ID
162         $json_message=array();  # 初始化 
163         foreach ($arrID as $k => $v)    # 遍历循环所有群组,查看是否有新消息 
164         {165             $messageCount=$this->redis->get('countMessage_'.$k);    # 群组最大信息分值数 
166             if($messageCount>$v)    # 判断用户是否存在未读新消息 
167             {168                 $json_message[$k]['message']=$this->redis->zrangebyscore('groupChatMessage_'.$k, $v+1, $messageCount);
169                 $json_message[$k]['count']=count($json_message[$k]['message']);  # 统计新消息数量 
170                 $this->redis->zadd('hasGroupChat_'.$user, $messageCount, $k);    # 更新已获取消息 
171             }   
172         }
173         if($json_message)
174             return $json_message;
175         return false;
176     }
177 
178     /**
179     * @desc 分页获取群组信息
180     *
181     * @param $user    string  | 用户名 
182     * @param $groupChatID int | 群组 ID
183     * @param $page        int | 第几页
184     * @param $size        int | 每页多少条数据
185     *
186     * @return 成功返回 json 数据,失败返回 false
187     */
188     public function getPartMessage($user, $groupChatID, $page=1, $size=10)
189     {190         $start=$page*$size-$size;   # 开始截取数据位置 
191         $stop=$page*$size-1;        # 结束截取数据位置 
192         $json_message=$this->redis->zrevrange('groupChatMessage_'.$groupChatID, $start, $stop);
193         if($json_message)
194             return $json_message;
195         return false;
196     }
197 
198 
199     /**
200     * @desc 加锁方法
201     *
202     * @param $lockName string | 锁的名字
203     * @param $timeout int | 锁的过期时间
204     *
205     * @return 成功返回 identifier/ 失败返回 false
206     */
207     public function getLock($lockName, $timeout=2)
208     {209         $identifier=uniqid();       # 获取唯一标识符 
210         $timeout=ceil($timeout);    # 确保是整数 
211         $end=time()+$timeout;
212         while(time()<$end)          # 循环获取锁 
213         {214             /*
215             #这里的 set 操作可以等同于下面那个 if 操作,并且可以减少一次与 redis 通讯
216             if($this->redis->set($lockName, $identifier array('nx', 'ex'=>$timeout)))
217                 return $identifier;
218             */
219             if($this->redis->setnx($lockName, $identifier))    # 查看 $lockName 是否被上锁 
220             {221                 $this->redis->expire($lockName, $timeout);     # 为 $lockName 设置过期时间 
222                 return $identifier;                             # 返回一维标识符 
223             }
224             elseif ($this->redis->ttl($lockName)===-1) 
225             {226                 $this->redis->expire($lockName, $timeout);     # 检测是否有设置过期时间,没有则加上 
227             }
228             usleep(0.001);         # 停止 0.001ms
229         }
230         return false;
231     }
232 
233     /**
234     * @desc 释放锁
235     *
236     * @param $lockName string   | 锁名
237     * @param $identifier string | 锁的唯一值
238     *
239     * @param bool
240     */
241     public function releaseLock($lockName,$identifier)
242     {243         if($this->redis->get($lockName)==$identifier)   # 判断是锁有没有被其他客户端修改 
244         {245             $this->redis->multi();
246             $this->redis->del($lockName);   # 释放锁 
247             $this->redis->exec();
248             return true;
249         }
250         else
251         {252             return false;   # 其他客户端修改了锁,不能删除别人的锁 
253         }
254     }
255 
256 
257 }
258 
259 ?>

测试:

1、建立 createGroupChat.php(测试创建群组功能)

执行代码并创建 568、569 群组(群主为 jack)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 # 创建群组 
4 $user='jack';
5 $arr=array('jane1','jane2');
6 $a=$object->createGroupChat($user,$arr);
7 echo "<pre>";
8 print_r($a);
9 echo "</pre>";die

Redis 应用 ---- 消息传递 

Redis 应用 ---- 消息传递

 

2、建立 addMembers.php(测试添加成员功能)

执行代码并添加新成员

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 $b=$object->addMembers('jack','568',array('jane1','jane2','jane3','jane4'));
4 echo "<pre>";
5 print_r($b);
6 echo "</pre>";die;

Redis 应用 ---- 消息传递

3、建立 delete.php(测试群主删除成员功能)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 # 群主删除成员 
4 $c=$object->delMembers('jack', '568', array('jane1','jane4'));
5 echo "<pre>";
6 print_r($c);
7 echo "</pre>";die;

Redis 应用 ---- 消息传递

4、建立 sendMessage.php(测试发送消息功能)

多执行几遍,568、569 都发几条

 1 include './ManyPullMessage.class.php';
 2 $object=new ManyPullMessage('192.168.95.11');
 3 # 发送消息 
 4 $user='jane2';
 5 $message='go go go';
 6 $groupChatID=568;
 7 $arr=array('sender'=>$user, 'message'=>$message, 'time'=>time());
 8 $d=$object->sendMessage($user,$groupChatID,$arr);
 9 echo "<pre>";
10 print_r($d);
11 echo "</pre>";die;

 

Redis 应用 ---- 消息传递

Redis 应用 ---- 消息传递

5、建立 getNewMessage.php(测试用户获取新消息功能)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 # 用户获取新消息 
4 $e=$object->getNewMessage('jane2');
5 echo "<pre>";
6 print_r($e);
7 echo "</pre>";die;

 

Redis 应用 ---- 消息传递

6、建立 getPartMessage.php(测试用户获取某个群组部分消息)

(多发送几条消息,用于测试。568 中共 18 条数据)

 
1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 # 用户获取某个群组部分消息 
4 $f=$object->getPartMessage('jane2', 568, 1, 10); 
5 echo "<pre>";
6 print_r($f);
7 echo "</pre>";die;

 

page=1,size=10

Redis 应用 ---- 消息传递

page=2,size=10

Redis 应用 ---- 消息传递

测试完毕,还需要别的功能可以自己进行修改添加测试。

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2017-09/146928.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计12051字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中