共计 23138 个字符,预计需要花费 58 分钟才能阅读完成。
一、测试环境说明
本次测试 Linux 端和 Windows 端主要参数配置如下:
Linux:
操作系统版本:SUSE Linux Enterprise Server 10 SP4 32bit
MQ 版本:7.1.0.3
Ip 地址:192.168.0.151
Windows:
操作系统版本:Windows 7 旗舰版 SP1 32bit
MQ 版本:7.1.0.3
Ip 地址:192.168.0.111
二、MQ 简介
1)消息中间件概述
消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上, 队列存储消息直到它们被应用程序读走。通过消息队列,应用程序可独立地执行,它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。
在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。
2)消息队列 (Message Queue)
消息队列为构造以同步或异步方式实现的分布式应用提供了松耦合方法。消息队列的 API 调用被嵌入到新的或现存的应用中,通过消息发送到内存或基于磁盘的队列或从它读出而提供信息交换。消息队列可用在应用中以执行多种功能,比如要求服务、交换信息或异步处理等。
中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不同的技术之间共享资源,管理计算资源和网络通讯。它在计算机系统中是一个关键软件,它能实现应用的互连和互操作性,能保证系统的安全、可靠、高效的运行。中间件位于用户应用和操作系统及网络软件之间,它为应用提供了公用的通信手段,并且独立于网络和操作系统。中间件为开发者提供了公用于所有环境的应用程序接口,当应用程序中嵌入其函数调用,它便可利用其运行的特定操作系统和网络环境的功能,为应用执行通信功能。
如果没有消息中间件完成信息交换,应用开发者为了传输数据,必须要学会如何用网络和操作系统软件的功能,编写相应的应用程序来发送和接收信息,且交换信息没有标准方法,每个应用必须进行特定的编程从而和多平台、不同环境下的一个或多个应用通信。例如,为了实现网络上不同主机系统间的通信,将要求具备在网络上如何交换信息的知识(比如用 TCP/IP 的 socket 程序设计);为了实现同一主机内不同进程之间的通讯,将要求具备操作系统的消息队列或命名管道 (Pipes) 等知识。
MQ 具有强大的跨平台性,它支持的平台数多达 35 种。它支持各种主流 Unix 操作系统平台, 如:HP-UX、AIX、SUN Solaris、Digital UNIX、Open VMX、SUNOS、NCR UNIX;支持各种主机平台,如:OS/390、MVS/ESA、VSE/ESA;同样支持 Windows NT 服务器。在 PC 平台上支持 Windows9X/Windows NT/Windows 2000 和 UNIX (UnixWare、Solaris)以及主要的 Linux 版本(RedHat、TurboLinux 等)。此外,MQ 还支持其他各种操作系统平台,如:OS/2、AS/400、Sequent DYNIX、SCO OpenServer、SCO UnixWare、Tandem 等。
三、基本概念
1)队列管理器
队列管理器是 MQ 系统中最上层的一个概念,由它为我们提供基于队列的消息服务。
2)消息
在 MQ 中,我们把应用程序交由 MQ 传输的数据定义为消息,我们可以定义消息的内容并对消息进行广义的理解,比如:用户的各种类型的数据文件,某个应用向其它应用发出的处理请求等都可以作为消息。消息有两部分组成:消息描述符(Message Discription 或 Message Header),描述消息的特征,如:消息的优先级、生命周期、消息 Id 等;
消息体 (Message Body),即用户数据部分。在 MQ 中,消息分为两种类型,非永久性(non-persistent) 消息和永久性 (persistent) 消息,非永久性消息是存储在内存中的,它是为了提高性能而设计的,当系统掉电或 MQ 队列管理器重新启动时,将不可恢复。当用户对消息的可靠性要求不高,而侧重系统的性能表现时,可以采用该种类型的消息,如:当发布股票信息时,由于股票信息是不断更新的,我们可能每若干秒就会发布一次,新的消息会不断覆盖旧的消息。永久性消息是存储在硬盘上,并且纪录数据日志的,它具有高可靠性,在网络和系统发生故障等情况下都能确保消息不丢、不重。
此外,在 MQ 中,还有逻辑消息和物理消息的概念。利用逻辑消息和物理消息,我们可以将大消息进行分段处理,也可以将若干个本身完整的消息在应用逻辑上归为一组进行处理。
3)消息队列
队列是消息的安全存放地,队列存储消息直到它被应用程序处理。
消息队列以下述方式工作:
a) 程序 A 形成对消息队列系统的调用,此调用告知消息队列系统,消息准备好了投向程序 B;
b) 消息队列系统发送此消息到程序 B 驻留处的系统,并将它放到程序 B 的队列中;
c) 适当时间后,程序 B 从它的队列中读此消息,并处理此信息。
由于采用了先进的程序设计思想以及内部工作机制,MQ 能够在各种网络条件下保证消息的可靠传递,可以克服网络线路质量差或不稳定的现状,在传输过程中,如果通信线路出现故障或远端的主机发生故障,本地的应用程序都不会受到影响,可以继续发送数据,而无需等待网络故障恢复或远端主机正常后再重新运行。
在 MQ 中,队列分为很多种类型,其中包括:本地队列、远程队列、模板队列、动态队列、别名队列等。
本地队列又分为普通本地队列和传输队列,普通本地队列是应用程序通过 API 对其进行读写操作的队列;传输队列可以理解为存储 - 转发队列,比如:我们将某个消息交给 MQ 系统发送到远程主机,而此时网络发生故障,MQ 将把消息放在传输队列中暂存,当网络恢复时,再发往远端目的地。
远程队列是目的队列在本地的定义,它类似一个地址指针,指向远程主机上的某个目的队列,它仅仅是个定义,不真正占用磁盘存储空间。
模板队列和动态队列是 MQ 的一个特色,它的一个典型用途是用作系统的可扩展性考虑。我们可以创建一个模板队列,当今后需要新增队列时,每打开一个模板队列,MQ 便会自动生成一个动态队列,我们还可以指定该动态队列为临时队列或者是永久队列,若为临时队列我们可以在关闭它的同时将它删除,相反,若为永久队列,我们可以将它永久保留,为我所用。
4)通道
通道是 MQ 系统中队列管理器之间传递消息的管道,它是建立在物理的网络连接之上的一个逻辑概念,也是 MQ 产品的精华。
在 MQ 中,主要有三大类通道类型,即消息通道,MQI 通道和 Cluster 通道。消息通道是用于在 MQ 的服务器和服务器之间传输消息的,需要强调指出的是,该通道是单向的,它又有发送 (sender), 接收(receive), 请求者(requestor), 服务者(server) 等不同类型,供用户在不同情况下使用。MQI 通道是 MQ Client 和 MQ Server 之间通讯和传输消息用的,与消息通道不同,它的传输是双向的。群集 (Cluster) 通道是位于同一个 MQ 群集内部的队列管理器之间通讯使用的。
工作原理:
如图所示:
首先来看本地通讯的情况,应用程序 A 和应用程序 B 运行于同一系统 A,它们之间可以借助消息队列技术进行彼此的通讯:应用程序 A 向队列 1 发送一条信息,而当应用程序 B 需要时就可以得到该信息。
其次是远程通讯的情况,如果信息传输的目标改为在系统 B 上的应用程序 C,这种变化不会对应用程序 A 产生影响,应用程序 A 向队列 2 发送一条信息,系统 A 的 MQ 发现 Q2 所指向的目的队列实际上位于系统 B,它将信息放到本地的一个特殊队列-传输队列 (Transmission Queue)。我们建立一条从系统 A 到系统 B 的消息通道,消息通道代理将从传输队列中读取消息,并传递这条信息到系统 B,然后等待确认。只有 MQ 接到系统 B 成功收到信息的确认之后,它才从传输队列中真正将该信息删除。如果通讯线路不通,或系统 B 不在运行,信息会留在传输队列中,直到被成功地传送到目的地。这是 MQ 最基本而最重要的技术 – 确保信息传输,并且是一次且仅一次(once-and-only-once) 的传递。
MQ 提供了用于应用集成的松耦合的连接方法,因为共享信息的应用不需要知道彼此物理位置(网络地址);不需要知道彼此间怎样建立通信;不需要同时处于运行状态;不需要在同样的操作系统或网络环境下运行。
四、搭建过程
MQ 软件下载链接:
http://www-03.ibm.com/software/products/en/ibm-mq
2)Linux 版搭建
1. 在 /tmp 目录下新建 mq 目录,将介质 WMQ_7.1.0.3_TRIAL_LNX_X86_32_ML.tar 上传至 /tmp/mq 目录并解压
2. 运行许可证程序:
mq01:~/mq # ./mqlicense.sh –accept
根据提示,输入数字 1 接受协议
3. 安装 WebSphere MQ for Linux 服务器,即分别安装 Runtime、SDK 和 Server 软件包:
mq01:~/mq # rpm -ivh MQSeriesRuntime-7.1.0-3.i386.rpm
mq01:~/mq # rpm -ivh MQSeriesSDK-7.1.0-3.i386.rpm
mq01:~/mq # rpm -ivh MQSeriesServer-7.1.0-3.i386.rpm
进行 MQSeriesRuntime-7.1.0-3.i386.rpm 安装的时候, 系统自动创建了一个 mqm 用户和 mqm 组,安装完毕后,需要使用该用户来进行 MQ 的配置。
4. 更改用户 mqm 属性,设置密码和环境变量
4.1 使用 root 用户将 mqm:x:104:1000::/var/mqm:/bin/false 更改为 mqm:x:104:1000::/var/mqm:/bin/bash
4.2 使用 root 用户修改 mqm 密码:
4.3 设置 mqm 环境变量,mqm 用户下新建.bash_profile 文件,写入如下内容:
export PATH=/opt/mqm/samp/bin:/opt/mqm/bin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/X11R6/bin:$PATH
5. 运行 /opt/mqm/bin/mqconfig,检查系统参数设置是否满足软件要求
将所有标为“FAIL”的检测项调整到“PASS”;针对 System V Semaphores、System V Shared Memory 和 System Settings,修改 /etc/sysctl.conf 文件,添加或修改其中的一些值:
kernel.shmall = 2097152
kernel.shmmax = 2147483648
kernel.shmmni = 4096
kernel.sem = 250 32000 100 128
fs.file-max = 524288
net.ipv4.ip_local_port_range = 1024 65000
kernel.msgmni = 1024
net.ipv4.tcp_keepalive_time = 300
kernel.sem =500 256000 250 1024
输入命令 sysctl–p,使设置生效
针对 Current User Limits (root)参数:打开 /etc/security/limits.conf 文件,添加或修改如下配置,设置 mqm 用户的最大文件打开数限制:
mqm hard nofile 10240
mqm soft nofile 10240
6. 修改完毕之后,重新运行 /opt/mqm/bin/mqconfig 确认我们的修改已经生效,然后就可以进行下一步了;否则重新修改
7. 安装 WebSphere MQ for Linux 客户端
mq01:~/mq # rpm -ivh MQSeriesClient-7.1.0-3.i386.rpm
8. 安装 WebSphere MQ 样本程序
mq01:~/mq # rpm -ivh MQSeriesSamples-7.1.0-3.i386.rpm
9. 安装 MQ 其他软件包
mq01:~/mq # rpm -ivh MQSeriesMan-7.1.0-3.i386.rpm
mq01:~/mq # rpm -ivh MQSeriesJava-7.1.0-3.i386.rpm
mq01:~/mq # rpm -ivh MQSeriesMsg_Zh_CN-7.1.0-3.i386.rpm
至此完成 MQ 在 linux 上安装工作
更多详情见请继续阅读下一页的精彩内容:https://www.linuxidc.com/Linux/2018-09/154220p2.htm
1)Windows 版搭建
解压 WS_MQ_V7.1.0.3_TRIAL_FOR_WINDOWS_ML.zip,双击 Setup.exe 安装运行,选择 WebSphere MQ 安装(I)
选择安装语言为“简体中文”,单击“启动 IBM WebSphere MQ 安装程序”开始安装
安装截图 1
安装截图 2, 选择接受,下一步
安装类型选择“定制”,下一步
选择程序安装目录,下一步
下一步
勾选所有功能选项,下一步
安装
安装状态截图
安装完成
运行安装向导
运行截图
选择“否”,下一步
安装完成
启动界面
一、测试环境说明
本次测试 Linux 端和 Windows 端主要参数配置如下:
Linux:
操作系统版本:SUSE Linux Enterprise Server 10 SP4 32bit
MQ 版本:7.1.0.3
Ip 地址:192.168.0.151
Windows:
操作系统版本:Windows 7 旗舰版 SP1 32bit
MQ 版本:7.1.0.3
Ip 地址:192.168.0.111
二、MQ 简介
1)消息中间件概述
消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上, 队列存储消息直到它们被应用程序读走。通过消息队列,应用程序可独立地执行,它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。
在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。
2)消息队列 (Message Queue)
消息队列为构造以同步或异步方式实现的分布式应用提供了松耦合方法。消息队列的 API 调用被嵌入到新的或现存的应用中,通过消息发送到内存或基于磁盘的队列或从它读出而提供信息交换。消息队列可用在应用中以执行多种功能,比如要求服务、交换信息或异步处理等。
中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不同的技术之间共享资源,管理计算资源和网络通讯。它在计算机系统中是一个关键软件,它能实现应用的互连和互操作性,能保证系统的安全、可靠、高效的运行。中间件位于用户应用和操作系统及网络软件之间,它为应用提供了公用的通信手段,并且独立于网络和操作系统。中间件为开发者提供了公用于所有环境的应用程序接口,当应用程序中嵌入其函数调用,它便可利用其运行的特定操作系统和网络环境的功能,为应用执行通信功能。
如果没有消息中间件完成信息交换,应用开发者为了传输数据,必须要学会如何用网络和操作系统软件的功能,编写相应的应用程序来发送和接收信息,且交换信息没有标准方法,每个应用必须进行特定的编程从而和多平台、不同环境下的一个或多个应用通信。例如,为了实现网络上不同主机系统间的通信,将要求具备在网络上如何交换信息的知识(比如用 TCP/IP 的 socket 程序设计);为了实现同一主机内不同进程之间的通讯,将要求具备操作系统的消息队列或命名管道 (Pipes) 等知识。
MQ 具有强大的跨平台性,它支持的平台数多达 35 种。它支持各种主流 Unix 操作系统平台, 如:HP-UX、AIX、SUN Solaris、Digital UNIX、Open VMX、SUNOS、NCR UNIX;支持各种主机平台,如:OS/390、MVS/ESA、VSE/ESA;同样支持 Windows NT 服务器。在 PC 平台上支持 Windows9X/Windows NT/Windows 2000 和 UNIX (UnixWare、Solaris)以及主要的 Linux 版本(RedHat、TurboLinux 等)。此外,MQ 还支持其他各种操作系统平台,如:OS/2、AS/400、Sequent DYNIX、SCO OpenServer、SCO UnixWare、Tandem 等。
三、基本概念
1)队列管理器
队列管理器是 MQ 系统中最上层的一个概念,由它为我们提供基于队列的消息服务。
2)消息
在 MQ 中,我们把应用程序交由 MQ 传输的数据定义为消息,我们可以定义消息的内容并对消息进行广义的理解,比如:用户的各种类型的数据文件,某个应用向其它应用发出的处理请求等都可以作为消息。消息有两部分组成:消息描述符(Message Discription 或 Message Header),描述消息的特征,如:消息的优先级、生命周期、消息 Id 等;
消息体 (Message Body),即用户数据部分。在 MQ 中,消息分为两种类型,非永久性(non-persistent) 消息和永久性 (persistent) 消息,非永久性消息是存储在内存中的,它是为了提高性能而设计的,当系统掉电或 MQ 队列管理器重新启动时,将不可恢复。当用户对消息的可靠性要求不高,而侧重系统的性能表现时,可以采用该种类型的消息,如:当发布股票信息时,由于股票信息是不断更新的,我们可能每若干秒就会发布一次,新的消息会不断覆盖旧的消息。永久性消息是存储在硬盘上,并且纪录数据日志的,它具有高可靠性,在网络和系统发生故障等情况下都能确保消息不丢、不重。
此外,在 MQ 中,还有逻辑消息和物理消息的概念。利用逻辑消息和物理消息,我们可以将大消息进行分段处理,也可以将若干个本身完整的消息在应用逻辑上归为一组进行处理。
3)消息队列
队列是消息的安全存放地,队列存储消息直到它被应用程序处理。
消息队列以下述方式工作:
a) 程序 A 形成对消息队列系统的调用,此调用告知消息队列系统,消息准备好了投向程序 B;
b) 消息队列系统发送此消息到程序 B 驻留处的系统,并将它放到程序 B 的队列中;
c) 适当时间后,程序 B 从它的队列中读此消息,并处理此信息。
由于采用了先进的程序设计思想以及内部工作机制,MQ 能够在各种网络条件下保证消息的可靠传递,可以克服网络线路质量差或不稳定的现状,在传输过程中,如果通信线路出现故障或远端的主机发生故障,本地的应用程序都不会受到影响,可以继续发送数据,而无需等待网络故障恢复或远端主机正常后再重新运行。
在 MQ 中,队列分为很多种类型,其中包括:本地队列、远程队列、模板队列、动态队列、别名队列等。
本地队列又分为普通本地队列和传输队列,普通本地队列是应用程序通过 API 对其进行读写操作的队列;传输队列可以理解为存储 - 转发队列,比如:我们将某个消息交给 MQ 系统发送到远程主机,而此时网络发生故障,MQ 将把消息放在传输队列中暂存,当网络恢复时,再发往远端目的地。
远程队列是目的队列在本地的定义,它类似一个地址指针,指向远程主机上的某个目的队列,它仅仅是个定义,不真正占用磁盘存储空间。
模板队列和动态队列是 MQ 的一个特色,它的一个典型用途是用作系统的可扩展性考虑。我们可以创建一个模板队列,当今后需要新增队列时,每打开一个模板队列,MQ 便会自动生成一个动态队列,我们还可以指定该动态队列为临时队列或者是永久队列,若为临时队列我们可以在关闭它的同时将它删除,相反,若为永久队列,我们可以将它永久保留,为我所用。
4)通道
通道是 MQ 系统中队列管理器之间传递消息的管道,它是建立在物理的网络连接之上的一个逻辑概念,也是 MQ 产品的精华。
在 MQ 中,主要有三大类通道类型,即消息通道,MQI 通道和 Cluster 通道。消息通道是用于在 MQ 的服务器和服务器之间传输消息的,需要强调指出的是,该通道是单向的,它又有发送 (sender), 接收(receive), 请求者(requestor), 服务者(server) 等不同类型,供用户在不同情况下使用。MQI 通道是 MQ Client 和 MQ Server 之间通讯和传输消息用的,与消息通道不同,它的传输是双向的。群集 (Cluster) 通道是位于同一个 MQ 群集内部的队列管理器之间通讯使用的。
工作原理:
如图所示:
首先来看本地通讯的情况,应用程序 A 和应用程序 B 运行于同一系统 A,它们之间可以借助消息队列技术进行彼此的通讯:应用程序 A 向队列 1 发送一条信息,而当应用程序 B 需要时就可以得到该信息。
其次是远程通讯的情况,如果信息传输的目标改为在系统 B 上的应用程序 C,这种变化不会对应用程序 A 产生影响,应用程序 A 向队列 2 发送一条信息,系统 A 的 MQ 发现 Q2 所指向的目的队列实际上位于系统 B,它将信息放到本地的一个特殊队列-传输队列 (Transmission Queue)。我们建立一条从系统 A 到系统 B 的消息通道,消息通道代理将从传输队列中读取消息,并传递这条信息到系统 B,然后等待确认。只有 MQ 接到系统 B 成功收到信息的确认之后,它才从传输队列中真正将该信息删除。如果通讯线路不通,或系统 B 不在运行,信息会留在传输队列中,直到被成功地传送到目的地。这是 MQ 最基本而最重要的技术 – 确保信息传输,并且是一次且仅一次(once-and-only-once) 的传递。
MQ 提供了用于应用集成的松耦合的连接方法,因为共享信息的应用不需要知道彼此物理位置(网络地址);不需要知道彼此间怎样建立通信;不需要同时处于运行状态;不需要在同样的操作系统或网络环境下运行。
四、搭建过程
MQ 软件下载链接:
http://www-03.ibm.com/software/products/en/ibm-mq
2)Linux 版搭建
1. 在 /tmp 目录下新建 mq 目录,将介质 WMQ_7.1.0.3_TRIAL_LNX_X86_32_ML.tar 上传至 /tmp/mq 目录并解压
2. 运行许可证程序:
mq01:~/mq # ./mqlicense.sh –accept
根据提示,输入数字 1 接受协议
3. 安装 WebSphere MQ for Linux 服务器,即分别安装 Runtime、SDK 和 Server 软件包:
mq01:~/mq # rpm -ivh MQSeriesRuntime-7.1.0-3.i386.rpm
mq01:~/mq # rpm -ivh MQSeriesSDK-7.1.0-3.i386.rpm
mq01:~/mq # rpm -ivh MQSeriesServer-7.1.0-3.i386.rpm
进行 MQSeriesRuntime-7.1.0-3.i386.rpm 安装的时候, 系统自动创建了一个 mqm 用户和 mqm 组,安装完毕后,需要使用该用户来进行 MQ 的配置。
4. 更改用户 mqm 属性,设置密码和环境变量
4.1 使用 root 用户将 mqm:x:104:1000::/var/mqm:/bin/false 更改为 mqm:x:104:1000::/var/mqm:/bin/bash
4.2 使用 root 用户修改 mqm 密码:
4.3 设置 mqm 环境变量,mqm 用户下新建.bash_profile 文件,写入如下内容:
export PATH=/opt/mqm/samp/bin:/opt/mqm/bin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/X11R6/bin:$PATH
5. 运行 /opt/mqm/bin/mqconfig,检查系统参数设置是否满足软件要求
将所有标为“FAIL”的检测项调整到“PASS”;针对 System V Semaphores、System V Shared Memory 和 System Settings,修改 /etc/sysctl.conf 文件,添加或修改其中的一些值:
kernel.shmall = 2097152
kernel.shmmax = 2147483648
kernel.shmmni = 4096
kernel.sem = 250 32000 100 128
fs.file-max = 524288
net.ipv4.ip_local_port_range = 1024 65000
kernel.msgmni = 1024
net.ipv4.tcp_keepalive_time = 300
kernel.sem =500 256000 250 1024
输入命令 sysctl–p,使设置生效
针对 Current User Limits (root)参数:打开 /etc/security/limits.conf 文件,添加或修改如下配置,设置 mqm 用户的最大文件打开数限制:
mqm hard nofile 10240
mqm soft nofile 10240
6. 修改完毕之后,重新运行 /opt/mqm/bin/mqconfig 确认我们的修改已经生效,然后就可以进行下一步了;否则重新修改
7. 安装 WebSphere MQ for Linux 客户端
mq01:~/mq # rpm -ivh MQSeriesClient-7.1.0-3.i386.rpm
8. 安装 WebSphere MQ 样本程序
mq01:~/mq # rpm -ivh MQSeriesSamples-7.1.0-3.i386.rpm
9. 安装 MQ 其他软件包
mq01:~/mq # rpm -ivh MQSeriesMan-7.1.0-3.i386.rpm
mq01:~/mq # rpm -ivh MQSeriesJava-7.1.0-3.i386.rpm
mq01:~/mq # rpm -ivh MQSeriesMsg_Zh_CN-7.1.0-3.i386.rpm
至此完成 MQ 在 linux 上安装工作
更多详情见请继续阅读下一页的精彩内容:https://www.linuxidc.com/Linux/2018-09/154220p2.htm
五、测试
本次测试分三个场景,场景一为 linux 服务器上新建两个队列管理器 QM1 和 QM2,QM1 向 QM2 发送消息,观察消息是否正常送达 QM2;场景二为 windows 服务器新建队列管理器 QM3,向 linux 服务器上的 QM2 发送消息,观察消息是否正常送达 QM2;场景三为编写 java 程序,通过调用 QM1 的相关参数向 QM2 发送消息,观察消息是否正常送达 QM2。
1) 测试场景一
概述:向队列管理器 QM1 中的远程队列 QR 发送消息,通过传送队列 QX 和传输通道 C 将消息发送至队列管理器 QM2 中的本地队列 QL。
1. 在 linux 服务器 192.168.0.151 上新建两个队列管理器 QM1 和 QM2:
mqm@mq:~> crtmqm QM1
mqm@mq:~> crtmqm QM2
2. 启动 QM1 和 QM2
mqm@mq:~> strmqm QM1
mqm@mq:~> strmqm QM2
3.建立队列和通道
定义 QM1 的队列和通道创建脚本(/var/mqm 下新建 sh 目录,在 sh 目录下创建)
/var/mqm/sh$vi define_qm1.tst
DEFINE QREMOTE (QR) RNAME (QL) RQMNAME (QM2) XMITQ (QX) REPLACE
DEFINE QLOCAL (QX) USAGE (XMITQ) REPLACE
DEFINE CHANNEL (C) CHLTYPE (SDR) TRPTYPE (TCP) CONNAME (‘127.0.0.1 (1502)’) XMITQ (QX) REPLACE
4. 创建 QM1 的队列和通道:
mqm@mq:~/sh> runmqsc QM1 < define_qm1.tst > out
查看 out 文件,确认没有错误;
5. 定义 qm2 的队列和通道创建脚本
/var/mqm/sh$vi define_qm1.tst
DEFINE QLOCAL (QL) REPLACE
DEFINE CHANNEL (C) CHLTYPE (RCVR) TRPTYPE (TCP) REPLACE
6. 创建 QM2 的队列和通道
mqm@mq:~/sh> runmqsc QM2 < define_qm2.tst > out
查看 out 文件,确认没有错误;
7. 进入 mqsc 命令模式,在 QM2 上新建并启动监听
DEFINE LISTENER(L2) TRPTYPE(TCP) PORT(1502) CONTROL(QMGR) REPLACE
START LISTENER(L2)
8. 进入 mqsc 命令模式,在 QM1 上运行通道
START CHANNLE(C)
9. 发送报文测试
运行
mqm@mq:~/sh> amqsput QR QM1
输入“this is a test!”,双击回车结束
10. 接收报文测试
运行
mqm@mq:~/sh> amqsget QL QM2
ctrl+ c 结束
测试成功。
2) 测试场景二
概述:向 windows 下的队列管理器 QM3 中的远程队列 QR 发送消息,通过传送队列 QX 和传输通道 C 将消息发送至 linux 下的队列管理器 QM2 中的本地队列 QL。QM2 及其队列的创建参照场景一。
1. 在 windows 上新建队列管理器 QM3
创建方式有两种,一种为命令模式,可在 dos 下输入命令,类似 linux;一种为图形化界面,这里主要介绍图形化创建模式。
如图,“队列管理器”——“新建”——“队列管理器”
队列名为 QM3,下一步
监听端口自定义,这里取默认值 1414,单击完成
2. 新建本地队列 QX
“队列”——“新建”——“本地队列”
队列名为 QX
使用情况改成“传输”,单击完成,完成队列创建。
3. 新建远程队列 QR
“队列”——“新建”——“远程队列定义”
队列名为 QR,下一步
远程队列管理器填写 linux 服务器上的 QM2;远程队列为 QM2 下的本地队列 QL,传输队列为 windows 下 QM3 的本地队列 QX。单击完成,完成队列创建。
4. 创建发送方通道 C
“通道”——“新建”——“发送方通道”
通道名为 C,下一步
连接名为 linux 服务器 ip 地址 + 队列管理器 QM2 监听端口,传输队列为本地队列 QX,单击完成,完成发送通道创建
5. 启动发送通道 C
6. 放入消息至远程队列 QR
右键队列 QR,选择放入测试消息
写入 test,回车
7. 队列管理器 QM2 中浏览消息
查看队列管理器中消息方式有三种命令,分别是:amqsgbr、amqsbcg 和提取命令 amqsget,在 linux 服务器上执行浏览命令,如图,成功接收消息“test”
3) 测试场景三
概述:通过 java 程序调用 linux 下队列管理器 QM1 相关参数向 QM2 发送消息,观察 QM2 下的本地队列是否成功接收消息。QM1 和 QM2 及其队列等的创建参照测试场景一。
1. 导入相关 jar 包
下载链接:
所有文件已上传,可以到 Linux 公社资源站下载:
—————————————— 分割线 ——————————————
免费下载地址在 http://linux.linuxidc.com/
用户名与密码都是www.linuxidc.com
具体下载目录在 /2018 年资料 / 8 月 /19 日 /IBM MQ 运维使用手册 /
下载方法见 http://www.linuxidc.com/Linux/2013-07/87684.htm
—————————————— 分割线 ——————————————
2. 配置相关参数和测试消息
参数配置
写入测试消息“Hello 123”
3. 程序模板
package mq;
import java.io.IOException;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
public class MessageByMQ{
// 定义队列管理器和队列的名称
private static String qmName;
private static String qName;
private static MQQueueManager qMgr;
static{
// 设置环境:
//MQEnvironment 中包含控制 MQQueueManager 对象中的环境的构成的静态变量,MQEnvironment 的值的设定会在 MQQueueManager 的构造函数加载的时候起作用,
// 因此必须在建立 MQQueueManager 对象之前设定 MQEnvironment 中的值.
MQEnvironment.hostname=”192.168.0.151″; //MQ 服务器的 IP 地址
MQEnvironment.channel=”C1″; // 服务器连接的通道
MQEnvironment.CCSID=1208; // 服务器 MQ 服务使用的编码 1381 代表 GBK、1208 代表 UTF(Coded Character Set Identifier:CCSID)
MQEnvironment.port=1501; //MQ 端口
qmName = “QM1”; //MQ 的队列管理器名称
qName = “QR”; //MQ 远程队列的名称
try {
// 定义并初始化队列管理器对象并连接
//MQQueueManager 可以被多线程共享,但是从 MQ 获取信息的时候是同步的,任何时候只有一个线程可以和 MQ 通信。
qMgr = new MQQueueManager(qmName);
} catch (MQException e) {
// TODO Auto-generated catch block
System.out.println(“ 初使化 MQ 出错 ”);
e.printStackTrace();
}
}
/**
* 往 MQ 发送消息
* @param message
* @return
*/
public static int sendMessage(String message){
int result=0;
try{
// 设置将要连接的队列属性
// Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
//(except for completion code constants and error code constants).
//MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
//MQOO_OUTPUT:Open the queue to put messages.
/* 目标为远程队列,所有这里不可以用 MQOO_INPUT_AS_Q_DEF 属性 */
//int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
/* 以下选��可适合远程队列与本地队列 */
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
// 连接队列
//MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.
//The inquire and set capabilities are inherited from MQManagedObject.
/* 关闭了就重新打开 */
if(qMgr==null || !qMgr.isConnected()){
qMgr = new MQQueueManager(qmName);
}
MQQueue queue = qMgr.accessQueue(qName, openOptions);
// 定义一个简单的消息
MQMessage putMessage = new MQMessage();
// 将数据放入消息缓冲区
putMessage.writeUTF(message);
// 设置写入消息的属性(默认属性)
//MQMessage message = new MQMessage();
//message.format = “MQSTR”;
putMessage.format = “MQSTR”; // 设置消息格式 zl01
MQPutMessageOptions pmo = new MQPutMessageOptions();
// 将消息写入队列
queue.put(putMessage,pmo);
queue.close();
}catch (MQException ex) {
System.out.println(“A WebSphere MQ error occurred : Completion code ”
+ ex.completionCode + ” Reason code ” + ex.reasonCode);
ex.printStackTrace();
}catch (IOException ex) {
System.out.println(“An error occurred whilst writing to the message buffer: ” + ex);
}catch(Exception ex){
ex.printStackTrace();
}finally{
try {
qMgr.disconnect();
} catch (MQException e) {
e.printStackTrace();
}
}
return result;
}
/**
* 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有 TRY…CATCH,如果是第三方程序调用方法,如果无返回则说明无消息
* 第三方可以将该方法放于一个无限循环的 while(true){…} 之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。
* @return
*/
public static String getMessage(){
String message=null;
try{
// 设置将要连接的队列属性
// Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
//(except for completion code constants and error code constants).
//MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
//MQOO_OUTPUT:Open the queue to put messages.
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
MQMessage retrieve = new MQMessage();
// 设置取出消息的属性(默认属性)
//Set the put message options.(设置放置消息选项)
//MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步点控制下获取消息)
gmo.options = gmo.options + MQC.MQGMO_WAIT; // Wait if no messages on the Queue(如果在队列上没有消息则等待)
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败)
gmo.waitInterval = 1000 ; // Sets the time limit for the wait.(设置等待的毫秒时间限制)
/* 关闭了就重新打开 */
if(qMgr==null || !qMgr.isConnected()){
qMgr = new MQQueueManager(qmName);
}
MQQueue queue = qMgr.accessQueue(qName, openOptions);
// 从队列中取出消息
queue.get(retrieve, gmo);
message = retrieve.readUTF();
System.out.println(“The message is: ” + message);
queue.close();
}catch (MQException ex) {
System.out.println(“A WebSphere MQ error occurred : Completion code ”
+ ex.completionCode + ” Reason code ” + ex.reasonCode);
}catch (IOException ex) {
System.out.println(“An error occurred whilst writing to the message buffer: ” + ex);
}catch(Exception ex){
ex.printStackTrace();
}finally{
try {
qMgr.disconnect();
} catch (MQException e) {
e.printStackTrace();
}
}
return message;
}
public static void main(String args[]) {
/* 下面两个方法可同时使用,也可以单独使用 */
sendMessage(“Hello 123”);
//getMessage();
}
}
4. 队列管理器 QM2 中浏览消息
参照测试场景二中的第 7 步,队列 QL 中查看测试消息“Hello 123”
六、客户端配置
为方便对部署在 linux 上的 MQ 程序进行管理,可以通过在 windows 端添加远程远程队列管理器的方式进行图形化管理。以为 QM1 新建远程队列管理器进行图形化管理为例,步骤如下:
1. 修改队列管理器 QM1 访问权限
mqm@mq:~> strmqm QM1
mqm@mq:~> runmqsc QM1
alter qmgr chlauth(disabled)
2. 在 linux 端队列管理器 QM1 下新建监听
mqsc 命令模式下新建监听 L1,端口为 1501
DEFINE LISTENER(L1) TRPTYPE(TCP) PORT(1501) CONTROL(QMGR) REPLACE
START LISTENER(L1)
3. 新建服务器连接通道 C1 并启动
define channel(C1) chltype(SVRCONN) trptype(TCP) mcauser(‘mqm’) replace
start channel(C1)
当 windows 端没有连接时,C1 状态为“通道状态未找到”,此状态为正常。
4. 在 windows 端启动 MQ,新建远程队列管理器 QM1
“队列管理器”——“添加远程队列管理器”
队列名为 QM1
Ip 为 192.168.0.151,端口为 L1 的 1501,服务器连接通道为 C1,单击完成
如图,具体操作可参见目录五的测试场景二
附件:生产创建队列管理器及通道和监听器语句
– 创建队列管理器
crtmqm QM_VACT
– 启动队列管理器
strmqm QM_VACT
– 创建服务器连接通道
DEFINE CHANNEL (‘VACT.SVR.CONN’) CHLTYPE(SVRCONN) +
TRPTYPE(TCP) +
DESCR(‘ ‘) +
HBINT(300) +
MAXMSGL(4194304) +
MCAUSER(‘mqm’) +
RCVDATA(‘ ‘) +
RCVEXIT(‘ ‘) +
SCYDATA(‘ ‘) +
SCYEXIT(‘ ‘) +
SENDDATA(‘ ‘) +
SENDEXIT(‘ ‘) +
SSLCAUTH(REQUIRED) +
SSLCIPH(‘ ‘) +
SSLPEER(‘ ‘) +
KAINT(AUTO) +
MONCHL(QMGR) +
COMPMSG(NONE) +
COMPHDR(NONE) +
SHARECNV(10) +
MAXINST(999999999) +
MAXINSTC(999999999) +
REPLACE
– 创建侦听器
DEFINE LISTENER (‘LISTENER.TCP’) +
TRPTYPE(TCP) +
IPADDR(‘ ‘) +
PORT(10010) +
BACKLOG(0) +
DESCR(‘ ‘) +
CONTROL(QMGR) +
REPLACE
– 创建死信队列
DEFINE QLOCAL (‘DEADQ’) +
DESCR(‘ ‘) +
PUT(ENABLED) +
DEFPRTY(0) +
DEFPSIST(YES) +
DEFPRESP(SYNC) +
* CURDEPTH(0) +
CLWLUSEQ(QMGR) +
SCOPE(QMGR) +
GET(ENABLED) +
PROPCTL(COMPAT) +
DEFREADA(NO) +
MAXDEPTH(20000) +
MAXMSGL(4194304) +
SHARE +
DEFSOPT(SHARED) +
MSGDLVSQ(PRIORITY) +
HARDENBO +
USAGE(NORMAL) +
NOTRIGGER +
TRIGTYPE(FIRST) +
TRIGDPTH(1) +
TRIGMPRI(0) +
TRIGDATA(‘ ‘) +
PROCESS(‘ ‘) +
INITQ(‘ ‘) +
RETINTVL(999999999) +
BOTHRESH(0) +
BOQNAME(‘ ‘) +
QDEPTHHI(80) +
QDEPTHLO(20) +
QDPMAXEV(ENABLED) +
QDPHIEV(DISABLED) +
QDPLOEV(DISABLED) +
QSVCINT(999999999) +
QSVCIEV(NONE) +
DISTL(NO) +
NPMCLASS(NORMAL) +
STATQ(QMGR) +
MONQ(QMGR) +
ACCTQ(QMGR) +
CLUSTER(‘ ‘) +
CLUSNL(‘ ‘) +
DEFBIND(OPEN) +
CLWLRANK(0) +
CLWLPRTY(0) +
REPLACE
– 创建本地队列
DEFINE QLOCAL (‘LQ.EAM.SD.100100’) +
DESCR(‘ ‘) +
PUT(ENABLED) +
DEFPRTY(0) +
DEFPSIST(YES) +
DEFPRESP(SYNC) +
* CURDEPTH(0) +
CLWLUSEQ(QMGR) +
SCOPE(QMGR) +
GET(ENABLED) +
PROPCTL(COMPAT) +
DEFREADA(NO) +
MAXDEPTH(20000) +
MAXMSGL(4194304) +
SHARE +
DEFSOPT(SHARED) +
MSGDLVSQ(PRIORITY) +
HARDENBO +
USAGE(NORMAL) +
NOTRIGGER +
TRIGTYPE(FIRST) +
TRIGDPTH(1) +
TRIGMPRI(0) +
TRIGDATA(‘ ‘) +
PROCESS(‘ ‘) +
INITQ(‘ ‘) +
RETINTVL(999999999) +
BOTHRESH(0) +
BOQNAME(‘ ‘) +
QDEPTHHI(80) +
QDEPTHLO(20) +
QDPMAXEV(ENABLED) +
QDPHIEV(DISABLED) +
QDPLOEV(DISABLED) +
QSVCINT(999999999) +
QSVCIEV(NONE) +
DISTL(NO) +
NPMCLASS(NORMAL) +
STATQ(QMGR) +
MONQ(QMGR) +
ACCTQ(QMGR) +
CLUSTER(‘ ‘) +
CLUSNL(‘ ‘) +
DEFBIND(OPEN) +
CLWLRANK(0) +
CLWLPRTY(0) +
REPLACE
– 创建远程队列
DEFINE QREMOTE (‘RQ.TSC.VACT.BMS’) +
* ALTDATE (2013-07-03) +
* ALTTIME (16.42.30) +
DESCR(‘ ‘) +
PUT(ENABLED) +
DEFPRTY(0) +
DEFPSIST(NO) +
DEFPRESP(SYNC) +
SCOPE(QMGR) +
XMITQ(‘TQ.TSC.VACT.BMS’) +
RNAME(‘LQ.VACT.BMS’) +
RQMNAME(‘QM_BMS’) +
CLUSTER(‘ ‘) +
CLUSNL(‘ ‘) +
DEFBIND(OPEN) +
CLWLRANK(0) +
CLWLPRTY(0) +
REPLACE
– 创建传输队列
DEFINE QLOCAL (‘TQ.TSC.VACT.BMS’) +
DESCR(‘ ‘) +
PUT(ENABLED) +
DEFPRTY(0) +
DEFPSIST(YES) +
DEFPRESP(SYNC) +
* CURDEPTH(0) +
CLWLUSEQ(QMGR) +
SCOPE(QMGR) +
GET(ENABLED) +
PROPCTL(COMPAT) +
DEFREADA(NO) +
MAXDEPTH(20000) +
MAXMSGL(4194304) +
SHARE +
DEFSOPT(SHARED) +
MSGDLVSQ(PRIORITY) +
HARDENBO +
USAGE(XMITQ) +
TRIGGER +
TRIGTYPE(FIRST) +
TRIGDPTH(1) +
TRIGMPRI(0) +
TRIGDATA(‘VACT.TO.BMS’) +
PROCESS(‘ ‘) +
INITQ(‘SYSTEM.CHANNEL.INITQ’) +
RETINTVL(999999999) +
BOTHRESH(0) +
BOQNAME(‘ ‘) +
QDEPTHHI(80) +
QDEPTHLO(20) +
QDPMAXEV(ENABLED) +
QDPHIEV(DISABLED) +
QDPLOEV(DISABLED) +
QSVCINT(999999999) +
QSVCIEV(NONE) +
DISTL(YES) +
NPMCLASS(NORMAL) +
STATQ(QMGR) +
MONQ(QMGR) +
ACCTQ(QMGR) +
CLUSTER(‘ ‘) +
CLUSNL(‘ ‘) +
DEFBIND(OPEN) +
CLWLRANK(0) +
CLWLPRTY(0) +
REPLACE
– 创建发送通道
DEFINE CHANNEL (‘BMS.TO.VACT’) CHLTYPE(SDR) +
* ALTDATE (2013-05-27) +
* ALTTIME (20.34.39) +
TRPTYPE(TCP) +
BATCHINT(0) +
BATCHHB(0) +
BATCHSZ(50) +
CONNAME(‘localhost(10010)’) +
LOCLADDR(‘ ‘) +
CONVERT(NO) +
DESCR(‘ ‘) +
DISCINT(6000) +
HBINT(300) +
LONGRTY(999999999) +
LONGTMR(1200) +
SHORTRTY(10) +
SHORTTMR(60) +
MAXMSGL(4194304) +
MCATYPE(PROCESS) +
MCAUSER(‘ ‘) +
MSGDATA(‘ ‘) +
MSGEXIT(‘ ‘) +
NPMSPEED(FAST) +
RCVDATA(‘ ‘) +
RCVEXIT(‘ ‘) +
SCYDATA(‘ ‘) +
SCYEXIT(‘ ‘) +
SENDDATA(‘ ‘) +
SENDEXIT(‘ ‘) +
SEQWRAP(999999999) +
USERID(‘ ‘) +
XMITQ(‘TQ.TSC.BMS.VACT’) +
SSLCIPH(‘ ‘) +
SSLPEER(‘ ‘) +
KAINT(AUTO) +
MONCHL(QMGR) +
STATCHL(QMGR) +
COMPMSG(NONE) +
COMPHDR(NONE) +
PROPCTL(COMPAT) +
REPLACE
– 创建接收通道
DEFINE CHANNEL (‘VACT.TO.BMS’) CHLTYPE(RCVR) +
* ALTDATE (2013-05-28) +
* ALTTIME (14.17.24) +
TRPTYPE(TCP) +
BATCHSZ(50) +
DESCR(‘ ‘) +
HBINT(300) +
MAXMSGL(4194304) +
MCAUSER(‘ ‘) +
MRDATA(‘ ‘) +
MREXIT(‘ ‘) +
MRRTY(10) +
MRTMR(1000) +
MSGDATA(‘ ‘) +
MSGEXIT(‘ ‘) +
NPMSPEED(FAST) +
PUTAUT(DEF) +
RCVDATA(‘ ‘) +
RCVEXIT(‘ ‘) +
SCYDATA(‘ ‘) +
SCYEXIT(‘ ‘) +
SENDDATA(‘ ‘) +
SENDEXIT(‘ ‘) +
SEQWRAP(999999999) +
SSLCAUTH(REQUIRED) +
SSLCIPH(‘ ‘) +
SSLPEER(‘ ‘) +
KAINT(AUTO) +
MONCHL(QMGR) +
STATCHL(QMGR) +
COMPMSG(NONE) +
COMPHDR(NONE) +
REPLACE
: