共计 6471 个字符,预计需要花费 17 分钟才能阅读完成。
如果用户量增加后为了解决吞吐量问题,需要引入集群,在 openfire 中提供了集群的支持,另外也实现了两个集群插件:hazelcast 和 clustering。为了了解情况集群的工作原理,我就沿着 openfire 的源代码进行了分析,也是一次学习的过程。
数据库因为对于 openfire 来说基本上是透明的,所以这块就交给数据库本身来实现。缓存数据缓存是存在内存里的,所以这部分是要同步的sessionsession 在 openfire 并不需要所有实例同步,但是需要做用户路由缓存,否则发消息时找不到对应的会话。由此用户路由还是要同步的。
- 缓存接口
public interface Cache<K,V> extends java.util.Map<K,V>
如果不开启集群时缓存的默认缓存容器类是:public class DefaultCache<K, V>,实际上 DefaultCache 就是用一个 Hashmap 来存数据的。
- 缓存工厂类
public class CacheFactory
/**
* Returns the named cache, creating it as necessary.
*
* @param name the name of the cache to create.
* @return the named cache, creating it as necessary.
*/
@SuppressWarnings("unchecked")
public static synchronized <T extends Cache> T createCache(String name) {T cache = (T) caches.get(name);
if (cache != null) {return cache;
}
cache = (T) cacheFactoryStrategy.createCache(name);
log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for" + name);
return wrapCache(cache, name);
}
上面代码中会通过缓存工厂策略对象来创建一个缓存容器,最后 warpCache 方法会将此容器放入到 caches 中。
- 缓存工厂类的策略
- startup
public static synchronized void startup() {if (isClusteringEnabled() && !isClusteringStarted()) {initEventDispatcher();
CacheFactory.startClustering();}
}
- 会使用集群的缓存工厂策略来启动,同时使自己加入到集群中。
- 开启一个线程用于同步缓存的状态
- shutdown
public static void doClusterTask(final ClusterTask<?> task) {cacheFactoryStrategy.doClusterTask(task);
}
这里有个限定就是必须是 ClusterTask 派生的类才行,看看它的定义:
public interface ClusterTask<V> extends Runnable, Externalizable {V getResult();
}
主要是为了异步执行和序列化,异步是因为不能阻塞,而序列化当然就是为了能在集群中传送。
- 缓存策略工厂类(ClusteredCacheFactory)
public class ClusteredCacheFactory implements CacheFactoryStrategy {
首先是 startCluster 方法用于启动集群,主要完成几件事情:
-
- 设置缓存序列化工具类,ClusterExternalizableUtil。这个是用于集群间数据复制时的序列化工具
- 设置远程 session 定位器,RemoteSessionLocator,因为 session 不同步,所以它主要是用于多实例间的 session 读取
- 设置远程包路由器 ClusterPacketRouter,这样就可以在集群中发送消息了
- 加载 Hazelcast 的实例设置 NodeID,以及设置 ClusterListener
/**
* Notification message indicating that this JVM has joined a cluster.
*/
@SuppressWarnings("unchecked")
public static synchronized void joinedCluster() {cacheFactoryStrategy = clusteredCacheFactoryStrategy;
// Loop through local caches and switch them to clustered cache (copy content)
for (Cache cache : getAllCaches()) {// skip local-only caches
if (localOnly.contains(cache.getName())) continue;
CacheWrapper cacheWrapper = ((CacheWrapper) cache);
Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
clusteredCache.putAll(cache);
cacheWrapper.setWrappedCache(clusteredCache);
}
clusteringStarting = false;
clusteringStarted = true;
log.info("Clustering started; cache migration complete");
}
这里可以看到会读取所有的缓存容器并一个个的使用 Wrapper 包装一下,然后用同样的缓存名称去 createCache 一个新的 Cache,这步使用的是切换后的集群缓存策略工厂,也就是说会使用 ClusteredCacheFactory 去创建新的缓存容器。最后再将 cache 写入到新的 clusteredCache 里,这样就完成了缓存的切换。
public Cache createCache(String name) {// Check if cluster is being started up
while (state == State.starting) {// Wait until cluster is fully started (or failed)
try {Thread.sleep(250);
}
catch (InterruptedException e) {// Ignore
}
}
if (state == State.stopped) {throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
}
return new ClusteredCache(name, hazelcast.getMap(name));
}
这里使用的是 ClusteredCache,而且最重要的是传入的第二个 map 参数换成了 hazelcast 的了,这样之后再访问这个缓存容器时已经不再是原先的本地 Cache 了,已经是 hazelcast 的 map 对象。hazelcast 会自动对 map 的数据进行同步管理,这也就完成了缓存同步的功能。
- 集群计算
那就看 hazelcast 的实现吧,在 ClusteredCacheFactory 中 doClusterTask 举个例子吧:
public void doClusterTask(final ClusterTask task) {if (cluster == null) {return; }
Set<Member> members = new HashSet<Member>();
Member current = cluster.getLocalMember();
for(Member member : cluster.getMembers()) {if (!member.getUuid().equals(current.getUuid())) {members.add(member);
}
}
if (members.size() > 0) {// Asynchronously execute the task on the other cluster members
logger.debug("Executing asynchronous MultiTask:" + task.getClass().getName());
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(new CallableTask<Object>(task), members);
} else {logger.warn("No cluster members selected for cluster task" + task.getClass().getName());
}
}
过程就是,先获取到集群中的实例成员,当然要排除自己。然后 hazelcast 提供了 ExecutorService 来执行这个 task,方法就是 submiteToMembers。这样就提交了一个运算任务。只不过具体是如何分配计算并汇集结果倒真不太清楚。
总结
CentOS 下 Openfire 详细安装过程 http://www.linuxidc.com/Linux/2012-09/69539.htm
CentOS 5.4 下基于 Jabber/XMPP 协议的 Openfire 服务器配置笔记 http://www.linuxidc.com/Linux/2012-02/55497.htm
Ubuntu 12.04 安装 Openfire http://www.linuxidc.com/Linux/2012-07/64945.htm
Openfire 在使用 MySQL 数据库后的中文乱码问题解决 http://www.linuxidc.com/Linux/2014-03/97989.htm
通过 Nginx 实现 Openfire 集群的负载均衡 http://www.linuxidc.com/Linux/2015-09/122943.htm
Openfire 的详细介绍:请点这里
Openfire 的下载地址:请点这里
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-07/133531.htm