阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Openfire集群源码分析

206次阅读
没有评论

共计 6471 个字符,预计需要花费 17 分钟才能阅读完成。

如果用户量增加后为了解决吞吐量问题,需要引入集群,在 openfire 中提供了集群的支持,另外也实现了两个集群插件:hazelcast 和 clustering。为了了解情况集群的工作原理,我就沿着 openfire 的源代码进行了分析,也是一次学习的过程。

首先理解集群的一些简单概念
集群的目的是让多个实例像一个实例一样运行,这样就可以通过增长实例来增长计算能力。也就是所谓的分布式计算问题,这其中最为关注的一个特性就是——CAP 理论,也就是所谓的一致性、可用性、分区容错性。集群中最核心解决的问题就是 CAP。
CAP 综合理解就是我上面写的,多个实例像一个实例一样运行。
 
所以所谓集群就是把一些数据共享或者同步到不同的实例上,这样系统使用同样的算法,取的结果当然应该是相同啦。所以一些数据库的主从复制,缓存数据集群都是类似这种解决方法。只是代码实现质量和处理规模的问题。
 
有了这个基础我们再来看看 openfire 是怎么解决这个问题的。
 
openfire 的集群设计
 
1、哪些需要进行集群间的同步
 对于 openfire 而言,有这几方面的数据需要进行保证集群间的同步:数据库存的数据、缓存数据、session。貌似就这些吧?
数据库
因为对于 openfire 来说基本上是透明的,所以这块就交给数据库本身来实现。
缓存数据
缓存是存在内存里的,所以这部分是要同步的
session
session 在 openfire 并不需要所有实例同步,但是需要做用户路由缓存,否则发消息时找不到对应的会话。由此用户路由还是要同步的。
 
2、缓存的设计
  • 缓存接口
openfire 里对缓存的数据容器提供了一个包装接口,这个接口提供了缓存数据的基本方法,用于统一数据操作。
public interface Cache<K,V> extends java.util.Map<K,V>

如果不开启集群时缓存的默认缓存容器类是:public class DefaultCache<K, V>,实际上 DefaultCache 就是用一个 Hashmap 来存数据的。

  • 缓存工厂类
为了保证缓存是可以扩展的,提供了一个工厂类:
public class CacheFactory
CacheFactory 类中会管理所有的缓存容器,如下代码:
/**
     * Returns the named cache, creating it as necessary.
     *
     * @param name         the name of the cache to create.
     * @return the named cache, creating it as necessary.
     */
    @SuppressWarnings("unchecked")
    public static synchronized <T extends Cache> T createCache(String name) {T cache = (T) caches.get(name);
        if (cache != null) {return cache;
        }
        cache = (T) cacheFactoryStrategy.createCache(name);
 
        log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for" + name);
 
        return wrapCache(cache, name);
    }

上面代码中会通过缓存工厂策略对象来创建一个缓存容器,最后 warpCache 方法会将此容器放入到 caches 中。

  • 缓存工厂类的策略
在 CacheFactory 中默认是使用一个 DefaultLocalCacheStrategy 来完成缓存创建的。另外还提供了在集群条件下的缓存策略接入。也就是通过实例化不同的策略来切换缓存管理方案。比如后面要提到的 hazelcast 就是通过这个来替换了本地缓存策略的。从接口的设计上来看,openfire 的缓存策略也就是为了集群与非集群的实现。
 
3、集群的设计
在 openfire 中的集群主要包括:集群管理、数据同步管理、集群计算任务。
 
集群管理者
在 openfire 中主要是一个类来实现:ClusterManager,在 ClusterManager 中实现了集群实例的加入、退出管理,因为没有使用主从结构,所以 ClusterManager 实现了一个无中心管理,不知道我理解的对不对。因为只要当前实实例启用了集群,ClusterManager 就会主动的加载集群管理并与其他的集群进行同步。
 
  • startup
startup 是启动集群的方法,代码:
 
public static synchronized void startup() {if (isClusteringEnabled() && !isClusteringStarted()) {initEventDispatcher();
            CacheFactory.startClustering();}
    }
首先要判断是否开启了集群并且当前集群实例未运行时才去启动。
先是初始化了事件分发器,用于处理集群的同步事情。
 
然后就是调用 CacheFactory 的 startClustering 来运行集群。在 startClustering 方法中主要是这几个事情:
    • 会使用集群的缓存工厂策略来启动,同时使自己加入到集群中。
    • 开启一个线程用于同步缓存的状态
 
在前面 startup 中的 initEventDispatcher 方法, 在这里会注册一个分发线程监听到集群事件,收到事件后会执行 joinedCluster 或者 leftCluster 的操作,joinedCluster 就是加入到集群中的意思。
 
在 joinedCluster 时会将本地的缓存容器都转换为集群缓存。由此便完成了集群的初始化并加入到集群中了。
 
  • shutdown
shutdown 相对简单点就是退出集群,并且将缓存工厂恢复为本地缓存。
 
同步管理
上面主要是讲了如何管理集群,接着比较重要的就是如何在集群间同步数据呢?这部分主要是看具体的分布式计算系统的实现了,从 openfire 来说就是将数据放到集群缓存中,然后通过集群组件来完成的,比如使用 hazelcast。
 
因为使用缓存来解决,所以在 CacheFactory 中才会有这些么多关于集群的处理代码,特别是对于缓存策略的切换,以及集群任务处理都在 CacheFactory 作为接口方法向外公开。这样也把集群的实现透明了。
 
集群计算任务 
在这之前一直没有提到集群中的计算问题,因为既然有了集群是不是可以利用集群的优势进行一些并行计算呢?这部分我倒没有太过确定,只是看到相关的代码所以简单列一下。
 
在 CacheFactory 类中有几个方法:doClusterTask、doSynchronousClusterTask,这两个都是 overload 方法,参数有所不同而已。这几个方法就是用于执行一些计算任务的。就看一下 doClusterTask:
public static void doClusterTask(final ClusterTask<?> task) {cacheFactoryStrategy.doClusterTask(task);
    }

这里有个限定就是必须是 ClusterTask 派生的类才行,看看它的定义:

public interface ClusterTask<V> extends Runnable, Externalizable {V getResult();
 
}

主要是为了异步执行和序列化,异步是因为不能阻塞,而序列化当然就是为了能在集群中传送。

再看 CacheFactory 的 doClusterTask 方法可以发现,它只不过是代理了缓存策略工厂的 doClusterTask,具体的实现还是要看集群实现的。
 
看一看 hazelcast 的实现简单理解 openfire 集群
在 openfire 中有集群的插件实现,这里就以 hazelcast 为例子简单的做一下分析与学习。
 
  • 缓存策略工厂类(ClusteredCacheFactory)
 
ClusteredCacheFactory 实现了 CacheFactoryStrategy,代码如下:
public class ClusteredCacheFactory implements CacheFactoryStrategy {

首先是 startCluster 方法用于启动集群,主要完成几件事情:

    • 设置缓存序列化工具类,ClusterExternalizableUtil。这个是用于集群间数据复制时的序列化工具
    • 设置远程 session 定位器,RemoteSessionLocator,因为 session 不同步,所以它主要是用于多实例间的 session 读取
    • 设置远程包路由器 ClusterPacketRouter,这样就可以在集群中发送消息了
    • 加载 Hazelcast 的实例设置 NodeID,以及设置 ClusterListener
 
在前面说起集群启动时提到了缓存切换,那具体实现时是如何做的呢?
 
因为集群启动后就要是 CacheFactory.joinedCluster 方法来加入集群的。看一下加入的代码:
/**
     * Notification message indicating that this JVM has joined a cluster.
     */
    @SuppressWarnings("unchecked")
    public static synchronized void joinedCluster() {cacheFactoryStrategy = clusteredCacheFactoryStrategy;
        // Loop through local caches and switch them to clustered cache (copy content)
        for (Cache cache : getAllCaches()) {// skip local-only caches
            if (localOnly.contains(cache.getName())) continue;
            CacheWrapper cacheWrapper = ((CacheWrapper) cache);
            Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
            clusteredCache.putAll(cache);
            cacheWrapper.setWrappedCache(clusteredCache);
        }
        clusteringStarting = false;
        clusteringStarted = true;
        log.info("Clustering started; cache migration complete");
    }

这里可以看到会读取所有的缓存容器并一个个的使用 Wrapper 包装一下,然后用同样的缓存名称去 createCache 一个新的 Cache,这步使用的是切换后的集群缓存策略工厂,也就是说会使用 ClusteredCacheFactory 去创建新的缓存容器。最后再将 cache 写入到新的 clusteredCache 里,这样就完成了缓存的切换。

当然这里还是要看一下 ClusteredCacheFactory 的 createCache 实现:
public Cache createCache(String name) {// Check if cluster is being started up
        while (state == State.starting) {// Wait until cluster is fully started (or failed)
            try {Thread.sleep(250);
            }
            catch (InterruptedException e) {// Ignore
            }
        }
        if (state == State.stopped) {throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
        }
        return new ClusteredCache(name, hazelcast.getMap(name));
    }
 

这里使用的是 ClusteredCache,而且最重要的是传入的第二个 map 参数换成了 hazelcast 的了,这样之后再访问这个缓存容器时已经不再是原先的本地 Cache 了,已经是 hazelcast 的 map 对象。hazelcast 会自动对 map 的数据进行同步管理,这也就完成了缓存同步的功能。

  • 集群计算

那就看 hazelcast 的实现吧,在 ClusteredCacheFactory 中 doClusterTask 举个例子吧:

public void doClusterTask(final ClusterTask task) {if (cluster == null) {return; }
        Set<Member> members = new HashSet<Member>();
        Member current = cluster.getLocalMember();
        for(Member member : cluster.getMembers()) {if (!member.getUuid().equals(current.getUuid())) {members.add(member);
            }
        }
        if (members.size() > 0) {// Asynchronously execute the task on the other cluster members
            logger.debug("Executing asynchronous MultiTask:" + task.getClass().getName());
            hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(new CallableTask<Object>(task), members);
        } else {logger.warn("No cluster members selected for cluster task" + task.getClass().getName());
        }
    }

过程就是,先获取到集群中的实例成员,当然要排除自己。然后 hazelcast 提供了 ExecutorService 来执行这个 task,方法就是 submiteToMembers。这样就提交了一个运算任务。只不过具体是如何分配计算并汇集结果倒真不太清楚。

总结

花了一天时间看了一下 openfire 的集群,顺手就写了一篇文章,确实也到了一些东西。和一些网友沟通中好像目前大家更愿意使用 redies 来完成缓存共享,以及通过代理来实现集群,而不愿意使用 openfire 的集群方案。这部分我没有遇到如何大的并发量需求确实不知道区别在哪里。以后有机会还是动手试试写一个 redies 的插件。

CentOS 下 Openfire 详细安装过程 http://www.linuxidc.com/Linux/2012-09/69539.htm

CentOS 5.4 下基于 Jabber/XMPP 协议的 Openfire 服务器配置笔记 http://www.linuxidc.com/Linux/2012-02/55497.htm

Ubuntu 12.04 安装 Openfire http://www.linuxidc.com/Linux/2012-07/64945.htm

Openfire 在使用 MySQL 数据库后的中文乱码问题解决 http://www.linuxidc.com/Linux/2014-03/97989.htm

通过 Nginx 实现 Openfire 集群的负载均衡  http://www.linuxidc.com/Linux/2015-09/122943.htm

Openfire 的详细介绍:请点这里
Openfire 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-07/133531.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计6471字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中