共计 15308 个字符,预计需要花费 39 分钟才能阅读完成。
Oracle 里存储的结构化数据导出到 Hadoop 体系做离线计算是一种常见数据处置手段。近期有场景需要做 Oracle 到 Hadoop 体系的实时导入,这里以此案例做以介绍。
Oracle 作为商业化的数据库解决方案,自发性的获取数据库事务日志等比较困难,故选择官方提供的同步工具 OGG(Oracle GoldenGate)来解决。
安装与基本配置
环境说明
软件配置
角色 | 数据存储服务及版本 | OGG 版本 | IP |
---|---|---|---|
源服务器 | OracleRelease11.2.0.1 | Oracle GoldenGate 11.2.1.0 for Oracle on Linux x86-64 | 10.0.0.25 |
目标服务器 | CDH5.7 | Oracle GoldenGate for Big Data 12.2.0.1 on Linux x86-64 | 10.0.0.2 |
以上源服务器上 OGG 安装在 Oracle 用户下,目标服务器上 OGG 安装在 root 用户下。
注意
Oracle 导出到异构的存储系统,如 MySQL,DB2,PG 等以及对应的不同平台,如 AIX,Windows,Linux 等官方都有提供对应的 Oracle GoldenGate 版本,可在这里或者在旧版本查询下载安装。
Oracle 源端基础配置
将下载到的对应 OGG 版本放在方便的位置并解压,本示例 Oracle 源端最终的解压目录为 /u01/gg。
-
配置环境变量
这里的环境变量主要是对执行 OGG 的用户添加 OGG 相关的环境变量,本示例为 Oracle 用户添加的环境变量如下:(/home/oracle/.bash_profile
文件)export OGG_HOME=/u01/gg/ export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$OGG_HOME:/lib:/usr/lib export CLASSPATH=$ORACLE_HOME/jdk/jre:$ORACLE_HOME/jlib:$ORACLE_HOME/rdbms/jlib
-
Oracle 打开归档模式
使用如下命令查看当前是否为归档模式(archive)SQL> archive log list Database log mode Archive Mode Automatic archival Enabled Archive destination /u01/arch_log Oldest online log sequence 6 Next log sequence to archive 8 Current log sequence 8
如非以上状态,手动调整即可
SQL> conn / as sysdba(以 DBA 身份连接数据库) SQL> shutdown immediate(立即关闭数据库) SQL> startup mount(启动实例并加载数据库,但不打开) SQL> alter database archivelog(更改数据库为归档模式) SQL> alter database open(打开数据库) SQL> alter system archive log start(启用自动归档)
-
Oracle 打开日志相关
OGG 基于辅助日志等进行实时传输,故需要打开相关日志确保可获取事务内容。通过一下命令查看当前状态:SQL> select force_logging, supplemental_log_data_min from v$database; FOR SUPPLEME--- -------- YES YES
如果以上查询结果非 YES,可通过以下命令修改状态:
SQL> alter database force logging; SQL> alter database add supplemental log data;
-
Oracle 创建复制用户
为了使 Oracle 里用户的复制权限更加单纯,故专门创建复制用户,并赋予 dba 权限SQL> create tablespaceoggtbsdatafile '/u01/app/oracle/oradata/orcl/oggtbs01.dbf' size 1000M autoextend on; SQL> create user ggs identified by ggs default tablespaceoggtbs; User created. SQL> grant dba to ggs; Grant succeeded.
最终这个 ggs 帐号的权限如下所示:
SQL> select * from dba_sys_privs where GRANTEE='GGS'; GRANTEE PRIVILEGE ADM GGS DROP ANY DIRECTORY NO GGS ALTER ANY TABLE NO GGS ALTER SESSION NO GGS SELECT ANY DICTIONARY NO GGS CREATE ANY DIRECTORY NO GGS RESTRICTED SESSION NO GGS FLASHBACK ANY TABLE NO GGS UPDATE ANY TABLE NO GGS DELETE ANY TABLE NO GGS CREATE TABLE NO GGS INSERT ANY TABLE NO GRANTEE PRIVILEGE ADM GGS UNLIMITED TABLESPACE NO GGS CREATE SESSION NO GGS SELECT ANY TABLE NO
-
OGG 初始化
进入 OGG 的主目录执行./ggsci,进入 OGG 命令行[oracle@VM_0_25_CentOS gg]$ ./ggsci Oracle GoldenGate Command Interpreter for Oracle Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21 Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved. GGSCI (VM_0_25_centos) 1> 执行 create subdirs 进行目录创建 GGSCI (VM_0_25_centos) 4> create subdirs Creating subdirectories under current directory /u01/gg Parameter files /u01/gg/dirprm: already exists Report files /u01/gg/dirrpt: already exists Checkpoint files /u01/gg/dirchk: already exists Process status files /u01/gg/dirpcs: already exists SQL script files /u01/gg/dirsql: already exists Database definitions files /u01/gg/dirdef: already exists Extract data files /u01/gg/dirdat: already exists Temporary files /u01/gg/dirtmp: already exists Stdout files /u01/gg/dirout: already exists
-
Oracle 创建模拟复制库表
模拟建一个用户叫 tcloud,密码 tcloud,同时基于这个用户建一张表,叫t_ogg
。SQL> create user tcloud identified by tcloud default tablespace users; User created. SQL> grant dba to tcloud; Grant succeeded. SQL> conn tcloud/tcloud; Connected. SQL> create table t_ogg(id int ,text_name varchar(20),primary key(id)); Table created.
目标端基础配置
将下载到的对应 OGG 版本放在方便的位置并解压,本示例 Oracle 目标端最终的解压目录为/data/gg
。
-
配置环境变量
这里需要用到 HDFS 相关的库,故需要配置 Java 环境变量以及 OGG 相关,并引入 HDFS 的相关库文件(jdk 必须是 1.7, 否则会报错在后面启动 replicatjinc),参考配置如下:export JAVA_HOME=/usr/java/jdk1.7.0_75/ export LD_LIBRARY_PATH=/usr/java/jdk1.7.0_75/jre/lib/amd64:/usr/java/jdk1.7.0_75/jre/lib/amd64/server:/usr/java/jdk1.7.0_75/jre/lib/amd64/libjsig.so:/usr/java/jdk1.7.0_75/jre/lib/amd64/server/libjvm.so:$OGG_HOME:/lib export OGG_HOME=/data/gg
-
OGG 初始化
目标端的 OGG 初始化和源端类似进入 OGG 的主目录执行./ggsci
,进入 OGG 命令行GGSCI (10.0.0.2) 2> create subdirs Creating subdirectories under current directory /data/gg Parameter files /data/gg/dirprm: already exists Report files /data/gg/dirrpt: already exists Checkpoint files /data/gg/dirchk: already exists Process status files /data/gg/dirpcs: already exists SQL script files /data/gg/dirsql: already exists Database definitions files /data/gg/dirdef: already exists Extract data files /data/gg/dirdat: already exists Temporary files /data/gg/dirtmp: already exists Credential store files /data/gg/dircrd: already exists Masterkey wallet files /data/gg/dirwlt: already exists Dump files /data/gg/dirdmp: already exists
Oracle 源配置
Oracle 实时传输到 Hadoop 集群(HDFS,Hive,Kafka 等)的基本原理如图:
根据如上原理,配置大概分为如下步骤:源端目标端配置 ogg 管理器(mgr);源端配置 extract 进程进行 Oracle 日志抓取;源端配置 pump 进程传输抓取内容到目标端;目标端配置 replicate 进程复制日志到 Hadoop 集群或者复制到用户自定义的解析器将最终结果落入到 Hadoop 集群。
配置全局变量
在源端服务器 OGG 主目录下,执行 ./ggsci
到 OGG 命令行下,执行如下命令:
GGSCI (VM_0_25_centos) 1> dblogin userid ggs password ggs
Successfully logged into database.
GGSCI (VM_0_25_centos) 3> view params ./globals
ggschema ggs
其中 ./globals
变量没有的话可以用 edit params ./globals
来编辑添加即可(编辑器默认使用的 vim)
配置管理器 mgr
在 OGG 命令行下执行如下命令:
GGSCI (VM_0_25_centos) 4> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
说明:PORT 即 mgr 的默认监听端口;DYNAMICPORTLIST 动态端口列表,当指定的 mgr 端口不可用时,会在这个端口列表中选择一个,最大指定范围为 256 个;AUTORESTART 重启参数设置表示重启所有 EXTRACT 进程,最多 5 次,每次间隔 3 分钟;PURGEOLDEXTRACTS 即 TRAIL 文件的定期清理
在命令行下执行 start mgr 即可启动管理进程,通过 info mgr 可查看 mgr 状态
GGSCI (VM_0_25_centos) 5> info mgr
Manager is running (IP port VM_0_25_centos.7809).
添加复制表
在 OGG 命令行下执行添加需要复制的表的操作,如下:
GGSCI (VM_0_25_centos) 7> add trandata tcloud.t_ogg
Logging of supplemental redo data enabled for table TCLOUD.T_OGG.
GGSCI (VM_0_25_centos) 8> info trandata tcloud.t_ogg
Logging of supplemental redo log data is enabled for table TCLOUD.T_OGG.
Columns supplementally logged for table TCLOUD.T_OGG: ID.
配置 extract 进程
配置 extract 进程 OGG 命令行下执行如下命令:
GGSCI (VM_0_25_centos) 10> edit params ext2hd
extract ext2hd
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggs,password ggs
exttrail /u01/gg/dirdat/tc
table tcloud.t_ogg;
说明:第一行指定 extract 进程名称;dynamicresolution 动态解析;SETENV 设置环境变量,这里分别设置了 Oracle 数据库以及字符集;userid ggs,password ggs 即 OGG 连接 Oracle 数据库的帐号密码,这里使用 2.3.4 中特意创建的复制帐号;exttrail 定义 trail 文件的保存位置以及文件名,注意这里文件名只能是 2 个字母,其余部分 OGG 会补齐;table 即复制表的表明,支持 *
通配,必须以; 结尾
接下来在 OGG 命令行执行如下命令添加 extract 进程:
GGSCI (VM_0_25_centos) 11> add extract ext2hd,tranlog,begin now
EXTRACT added.
最后添加 trail 文件的定义与 extract 进程绑定:
GGSCI (VM_0_25_centos) 12> add exttrail /u01/gg/dirdat/tc,extract ext2hd
EXTTRAIL added
可在 OGG 命令行下通过 info 命令查看状态:
GGSCI (VM_0_25_centos) 14> info ext2hd
EXTRACT EXT2HD Initialized 2016-11-09 15:37 Status STOPPED
Checkpoint Lag 00:00:00 (updated 00:02:32 ago)
Log Read Checkpoint Oracle Redo Logs
2016-11-09 15:37:14 Seqno 0, RBA 0
SCN 0.0 (0)
配置 pump 进程
pump 进程本质上来说也是一个 extract,只不过他的作用仅仅是把 trail 文件传递到目标端,配置过程和 extract 进程类似,只是逻辑上称之为 pump 进程
在 OGG 命令行下执行:
GGSCI (VM_0_25_centos) 16> edit params push2hd
extract push2hd
passthru
dynamicresolution
userid ggs,password ggs
rmthost 10.0.0.2 mgrport 7809
rmttrail /data/gg/dirdat/tc
table tcloud.t_ogg;
说明:第一行指定 extract 进程名称;passthru 即禁止 OGG 与 Oracle 交互,我们这里使用 pump 逻辑传输,故禁止即可;dynamicresolution 动态解析;userid ggs,password ggs 即 OGG 连接 Oracle 数据库的帐号密码,这里使用 2.3.4 中特意创建的复制帐号;rmthost 和 mgrhost 即目标端 OGG 的 mgr 服务的地址以及监听端口;rmttrail 即目标端 trail 文件存储位置以及名称
分别将本地 trail 文件和目标端的 trail 文件绑定到 extract 进程:
GGSCI (VM_0_25_centos) 17> add extract push2hd,exttrailsource /u01/gg/dirdat/tc
EXTRACT added.
GGSCI (VM_0_25_centos) 18> add rmttrail /data/gg/dirdat/tc,extract push2hd
RMTTRAIL added.
同样可以在 OGG 命令行下使用 info 查看进程状态:
GGSCI (VM_0_25_centos) 19> info push2hd
EXTRACT PUSH2HD Initialized 2016-11-09 15:52 Status STOPPED
Checkpoint Lag 00:00:00 (updated 00:01:04 ago)
Log Read Checkpoint File /u01/gg/dirdat/tc000000
First Record RBA 0
配置 define 文件
Oracle 与 MySQL,Hadoop 集群(HDFS,Hive,kafka 等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,在 OGG 命令行执行:
GGSCI (VM_0_25_centos) 20> edit params tcloud
defsfile /u01/gg/dirdef/tcloud.t_ogg
userid ggs,password ggs
table tcloud.t_ogg;
在 OGG 主目录下执行:./defgen paramfile dirprm/tcloud.prm
完成之后会生成这样的文件 /u01/gg/dirdef/tcloud.t_ogg,将这个文件拷贝到目标端的 OGG 主目录下的 dirdef 目录即可。
目标端的配置
创建目标表(目录)
这里主要是当目标端为 HDFS 目录或者 Hive 表或者 MySQL 数据库时需要手动先在目标端创建好目录或者表,创建方法都类似,这里我们模拟实时传入到 HDFS 目录,故手动创建一个接收目录即可hadoop –fs mkdir /gg/replication/hive/
配置管理器 mgr
目标端的 OGG 管理器(mgr)和源端的配置类似,在 OGG 命令行下执行:
GGSCI (10.0.0.2) 2> edit params mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
配置 checkpoint
checkpoint 即复制可追溯的一个偏移量记录,在全局配置里添加 checkpoint 表即可
GGSCI (10.0.0.2) 5> edit params ./GLOBALS
CHECKPOINTTABLE tcloud.checkpoint
保存即可
配置 replicate 进程
在 OGG 的命令行下执行:
GGSCI (10.0.0.2) 8> edit params r2hdfs
REPLICAT r2hdfs
sourcedefs /data/gg/dirdef/tcloud.t_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/hdfs.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP tcloud.t_ogg, TARGET tcloud.t_ogg;
说明:REPLICATE r2hdfs 定义 rep 进程名称;sourcedefs 即在 3.6 中在源服务器上做的表映射文件;TARGETDB LIBFILE 即定义 HDFS 一些适配性的库文件以及配置文件,配置文件位于 OGG 主目录下的 dirprm/hdfs.props
;REPORTCOUNT 即复制任务的报告生成频率;GROUPTRANSOPS 为以事务传输时,事务合并的单位,减少 IO 操作;MAP 即源端与目标端的映射关系
其中 property=dirprm/hdfs.props
的配置中,最主要的几项配置及注释如下:
gg.handlerlist=hdfs //OGG for Big Data 中 handle 类型
gg.handler.hdfs.type=hdfs //OGG for Big Data 中 HDFS 目标
gg.handler.hdfs.rootFilePath=/gg/replication/hive/ //OGG for Big Data 中 HDFS 存储主目录
gg.handler.hdfs.mode=op //OGG for Big Data 中传输模式,即 op 为一次 SQL 传输一次,tx 为一次事务传输一次
gg.handler.hdfs.includeTokens=false
gg.handler.hdfs.maxFileSize=1g
gg.handler.hdfs.fileRollInterval=0
gg.handler.hdfs.inactivityRollInterval=0
gg.handler.hdfs.fileSuffix=.txt
gg.handler.hdfs.partitionByTable=true
gg.handler.hdfs.rollOnMetadataChange=true
gg.handler.hdfs.authType=none
goldengate.userexit.writers=javawriter
javawriter.stats.display=true
javawriter.stat.full=true
gg.handler.hdfs.format=delimitedtext //OGG for Big Data 中文件传输格式
#()gg.classpath=/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/common/*:/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/common/lib/*:/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/hdfs/*:/usr/hdp/2.2.0.0-2041/hadoop/etc/hadoop/:/data/gg/:/data/gg/lib/*:/usr/hdp/2.2.0.0-2041/hadoop/client/*
gg.classpath=/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/hadoop/*:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/hadoop/lib/*:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/hadoop-hdfs/*:/opt/cloudera/parcels/CDH-5.7.0-1/cdh5.7.0.p0.45/lib/hadoop/etc/hadoop/:/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.po.45/lib/hadoop/lib/native:/usr/local/ogg/:/usr/local/ogg/lib/*:/etc/hadoop/conf
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
###Big Data 中使用到的 HDFS 库的定义
具体的 OGG for Big Data 支持参数以及定义可参考地址
http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/toc.htm
最后在 OGG 的命令行下执行:
GGSCI (10.0.0.2) 9> add replicat r2hdfs exttrail /data/gg/dirdat/tc,checkpointtable tcloud.checkpointtab
REPLICAT added.
将文件与复制进程绑定即可
测试
启动进程
在源端和目标端的 OGG 命令行下使用 start [进程名]的形式启动所有进程。
启动顺序按照源 mgr——目标 mgr——源 extract——源 pump——目标 replicate 来完成。
检查进程状态
以上启动完成之后,可在源端与目标端的 OGG 命令行下使用 info [进程名]来查看所有进程状态,如下:
源端:
GGSCI (VM_0_25_centos) 7> info mgr
Manager is running (IP port VM_0_25_centos.7809).
GGSCI (VM_0_25_centos) 9> info ext2hd
EXTRACT EXT2HD Last Started 2016-11-09 16:05 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:09 ago)
Log Read Checkpoint Oracle Redo Logs
2016-11-09 16:45:51 Seqno 8, RBA 132864000
SCN 0.1452333 (1452333)
GGSCI (VM_0_25_centos) 10> info push2hd
EXTRACT PUSH2HD Last Started 2016-11-09 16:05 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:01 ago)
Log Read Checkpoint File /u01/gg/dirdat/tc000000
First Record RBA 1043
目标端:
GGSCI (10.0.0.2) 13> info mgr
Manager is running (IP port 10.0.0.2.7809, Process ID 8242).
GGSCI (10.0.0.2) 14> info r2hdfs
REPLICAT R2HDFS Last Started 2016-11-09 16:45 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:02 ago)
Process ID 4733
Log Read Checkpoint File /data/gg/dirdat/tc000000
First Record RBA 0
所有的状态均是 RUNNING 即可。(当然也可以使用 info all 来查看所有进程状态)
测试同步更新效果
测试方法比较简单,直接在源端的数据表中 insert,update,delete 操作即可。由于 Oracle 到 Hadoop 集群的同步是异构形式,目前尚不支持 truncate 操作。
源��进行 insert 操作
SQL> conn tcloud/tcloud
Connected.
SQL> select * from t_ogg;
no rows selected
SQL> desc t_ogg;
Name Null? Type
----------------------------------------- -------- ----------------------------
ID NOT NULL NUMBER(38)
TEXT_NAME VARCHAR2(20)
SQL> insert into t_ogg values(1,'test');
1 row created.
SQL> commit;
Commit complete.
查看源端 trail 文件状态
[oracle@VM_0_25_centos dirdat]$ ls -l /u01/gg/dirdat/tc*
-rw-rw-rw- 1 oracle oinstall 1180 Nov 9 17:05 /u01/gg/dirdat/tc000000
查看目标端 trail 文件状态
[root@10 dirdat]# ls -l /data/gg/dirdat/tc*
-rw-r----- 1 root root 1217 Nov 9 17:05 /data/gg/dirdat/tc000000
查看 HDFS 中是否有写入
hadoop fs -ls /gg/replication/hive/tcloud.t_ogg
-rw-rw-r-- 3 root hdfs 110 2016-11-09 17:05
/gg/replication/hive/tcloud.t_ogg/tcloud.t_ogg_2016-11-09_17-05-30.514.txt
注意:从写入到 HDFS 的文件内容看,文件的格式如下:
ITCLOUD.T_OGG2016-11-09 09:05:25.0670822016-11-09T17:05:30.51200000000000000000001080ID1TEXT_NAMEtest
很明显 Oracle 的数据已准实时导入到 HDFS 了。导入的内容实际是一条条的类似流水日志(具体日志格式不同的传输格式,内容略有差异,本例使用的 delimitedtext。格式为操作符 数据库. 表名 操作时间戳 (GMT+0) 当前时间戳(GMT+8) 偏移量 字段 1 名称 字段 1 内容 字段 2 名称 字段 2 内容),如果要和 Oracle 的表内容完全一致,需要客户手动实现解析日志并写入到 Hive 的功能,这里官方并没有提供适配器。目前腾讯侧已实现该功能的开发。
当然你可以直接把这个 HDFS 的路径通过 LOCATION 的方式在 Hive 上建外表(external table)达到实时导入 Hive 的目的。
总结
OGG for Big Data 实现了 Oracle 实时同步到 Hadoop 体系的接口,但得到的日志目前仍需应用层来解析(关系型数据库如 MySQL 时 OGG 对应版本已实现应用层的解析,无需人工解析)。
OGG 的几个主要进程 mgr,extract,pump,replicate 配置方便,可快速配置 OGG 与异构关系存储结构的实时同步。后续如果有新增表,修改对应的 extract,pump 和 replicate 进程即可,当然如果是一整个库,在配置上述 2 个进程时,使用通配的方式即可。
附录
OGG 到 Hadoop 体系的实时同步时,可在源端 extract 和 pump 进程配置不变的情况下,直接在目标端增加 replicate 进程的方式,增加同步目标,以下简单介绍本示例中增加同步到 Kafka 的配置方法。
本示例中 extract,pump 进程都是现成的,无需再添加。只需要在目标端增加同步到 Kafka 的 replicate 进程即可。
在 OGG 的命令行下执行:
GGSCI (10.0.0.2) 4> edit params r2kafka
REPLICAT r2kafka
sourcedefs /data/gg/dirdef/tcloud.t_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/r2kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP tcloud.t_ogg, TARGET tcloud.t_ogg;
replicate 进程和导入到 HDFS 的配置类似,差异是调用不同的配置 dirprm/r2kafka.props。这个配置的主要配置如下:
gg.handlerlist = kafkahandler //handler类型
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka 相关配置
gg.handler.kafkahandler.TopicName =ggtopic //kafka 的 topic 名称,无需手动创建
gg.handler.kafkahandler.format =json // 传输文件的格式,支持 json,xml 等
gg.handler.kafkahandler.mode =op //OGG for Big Data中传输模式,即 op 为一次 SQL 传输一次,tx 为一次事务传输一次
gg.classpath=dirprm/:/usr/hdp/2.2.0.0-2041/kafka/libs/*:/data/gg/:/data/gg/lib/* // 相关库文件的引用
r2kafka.props
引用的 custom_kafka_producer.properties
定义了 Kafka 的相关配置如下:
bootstrap.servers=10.0.0.62:6667 //kafkabroker 的地址
acks=1
compression.type=gzip // 压缩类型
reconnect.backoff.ms=1000 // 重连延时
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
以上配置以及其他可配置项可参考地址:
以上配置完成后,在 OGG 命令行下添加 trail 文件到 replicate 进程并启动导入到 Kafka 的 replicate 进程
GGSCI (10.0.0.2) 5> add replicat r2kafka exttrail
/data/gg/dirdat/tc,checkpointtable tcloud.checkpoint
REPLICAT added.
GGSCI (10.0.0.2) 6> start r2kafka
Sending START request to MANAGER ...
REPLICAT R2KAFKA starting
GGSCI (10.0.0.2) 10> info r2kafka
REPLICAT R2KAFKA Last Started 2016-11-09 17:59 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:09 ago)
Process ID 5236
Log Read Checkpoint File /data/gg/dirdat/tc000000
2016-11-09 17:05:25.067082 RBA 1217
检查实时同步到 kafka 的效果,在 Oracle 源端更新表的同时,使用 kafka 客户端自带的脚本去查看这里配置的 ggtopic 这个 kafkatopic 下的消息:
SQL> insert into t_ogg values(2,'test2');
1 row created.
SQL> commit;
Commit complete.
目标端 Kafka 的同步情况:
[root@10 kafka]# bin/kafka-console-consumer.sh --zookeeper 10.0.0.223:2181 --
from-beginning --topic ggtopic
{"table":"TCLOUD.T_OGG","op_type":"I","op_ts":"2016-11-09
09:05:25.067082","current_ts":"2016-11-
09T17:59:20.943000","pos":"00000000000000001080","after":
{"ID":"1","TEXT_NAME":"test"}}
{"table":"TCLOUD.T_OGG","op_type":"I","op_ts":"2016-11-09
10:02:06.827204","current_ts":"2016-11-
09T18:02:12.323000","pos":"00000000000000001217","after":
{"ID":"2","TEXT_NAME":"test2"}}
显然,Oracle 的数据已准实时同步到 Kafka。从头开始消费这个 topic 发现之前的同步信息也存在。架构上可以直接接 Storm,SparkStreaming 等直接消费 kafka 消息进行业务逻辑的处理。
从 Oracle 实时同步到其他的 Hadoop 集群中,官方最新版本提供了 HDFS,Hbase,Flume 和 Kafka,相关配置可参考官网给出的例子配置即可。
参考文档:
http://www.oracle.com/webfolder/technetwork/tutorials/obe/fmw/goldengate/12c/OGG12c_Integrated_Replicat/index.html
用文永久更新链接地址:http://www.linuxidc.com/Linux/2016-12/138061.htm