阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

OGG实现Oracle数据同步到Kafka

254次阅读
没有评论

共计 5551 个字符,预计需要花费 14 分钟才能阅读完成。

环境:
源端:Oracle12.2 ogg for Oracle 12.3
目标端:Kafka ogg for bigdata 12.3
将 Oracle 中的数据通过 OGG 同步到 Kafka

源端配置:
1、为要同步的表添加附加日志
dblogin USERID ogg@orclpdb, PASSWORD ogg
add trandata scott.tab1
add trandata scott.tab2

2、添加抽取进程
GGSCI>add extract EXT_KAF1,integrated tranlog, begin now
GGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200

编辑抽取进程参数:
GGSCI> edit params EXT_KAF1

extract EXT_KAF1
userid c##ggadmin,PASSWORD ggadmin
LOGALLSUPCOLS
UPDATERECORDFORMAT COMPACT
exttrail ./dirdat/k1,FORMAT RELEASE 12.3
SOURCECATALOG orclpdb –(指定 pdb)
table scott.tab1;
table scott.tab2;

注册进程
GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin
GGSCI> register extract EXT_KAF1 database container (orclpdb)

3、添加投递进程:
GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1
GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200

编辑投递进程参数:
GGSCI>edit param PMP_KAF1

EXTRACT PMP_KAF1
USERID c##ggadmin,PASSWORD ggadmin
PASSTHRU
RMTHOST 10.1.1.247, MGRPORT 9178
RMTTRAIL ./dirdat/f1,format release 12.3
SOURCECATALOG orclpdb
TABLE scott.tab1;
table scott.tab2;

4、添加数据初始化进程(Oracle initial load) 可以多个表分开初始化也可以一起初始化,此处选择分开初始化
GGSCI> add extract ek_01, sourceistable

编辑参数:
GGSCI> EDIT PARAMS ek_01

EXTRACT ek_01
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;

GGSCI> add extract ek_02, sourceistable

EDIT PARAMS ek_02
EXTRACT ek_02
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab2;

5、生成 def 文件:
GGSCI> edit param defgen1

USERID c##ggadmin,PASSWORD ggadmin
defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;
table scott.tab2;

在 OGG_HOME 下执行如下命令生成 def 文件
defgen paramfile dirprm/defgen1.prm

将生成的 def 文件传到目标端 $OGG_HOME/dirdef 下

目标端配置:
1、将 $OGG_HOME/AdapterExamples/big-data/kafka 下的所有文件 copy 到 $OGG_HOME/dirprm 下
cd $OGG_HOME/AdapterExamples/big-data/kafka
cp * $OGG_HOME/dirprm

2、将 $ORACLE_HOME/AdapterExamples/trail 下的文件 tr000000000 copy 到 $OGG_HOME/dirdat 下
cd $ORACLE_HOME/AdapterExamples/trail
cp tr000000000 $OGG_HOME/dirdat

3、添加初始化进程:(可以多表一起初始化也可以分开初始化,此处选择单独初始化)
GGSCI> ADD replicat rp_01, specialrun

GGSCI> EDIT PARAMS rp_01
SPECIALRUN
end runtime
setenv(NLS_LANG=”AMERICAN_AMERICA.ZHS16GBK”)
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/ka
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> ADD replicat rp_02, specialrun
GGSCI> EDIT PARAMS rp_02

SPECIALRUN
end runtime
setenv(NLS_LANG=”AMERICAN_AMERICA.ZHS16GBK”)
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/kb
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

4、添加恢复进程:
GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1
GGSCI>edit params r_kaf1

REPLICAT r_kaf1
setenv(NLS_LANG=”AMERICAN_AMERICA.ZHS16GBK”)
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2
GGSCI> edit params r_kaf2

REPLICAT r_kaf2
setenv(NLS_LANG=”AMERICAN_AMERICA.ZHS16GBK”)
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

5、参数配置:
custom_kafka_producer.properties 文件内容如下:

bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 – 只需要改动这一行就行,指定 kafka 的地址和端口号
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=10000

kafka1.props 文件内容如下:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= topic1
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json – 这里做了改动,指定格式为 json 格式
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true – 包含主键
gg.handler.kafkahandler.SchemaTopicName= topic1 – 此处指定为要同步到的目标 topic 名字
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ – 指定 classpath,这里很重要,必须有 kafka 安装文件的类库。
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

启动进程进程恢复:
1、启动源端抓取进程
GGSCI> start EXT_KAF1
2、启动源端投递进程
GGSCI> start PMP_KAF1
3、启动源端初始化进程
GGSCI> start ek_01
4、启动目标端初始化进程
在 $OGG_HOME 下执行如下命令:
./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD
5、启动目标端恢复进程
GGSCI> start R_KAF1

遇到的错误:
1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)

OGG 实现 Oracle 数据同步到 Kafka

原因:找不到类库 (配置好环境变量之后,OGG 的 mgr 进程没有重启,导致的)
解决:重启 MGR 进程

2、ERROR OG-15051 Java or JNI exception

OGG 实现 Oracle 数据同步到 Kafka

原因:没有使用 ogg12.3.1.1.1 自带的 kafka.props,而是 copy 了 ogg12.2 的 kafka.props,导致出现异常。
解决:使用 ogg12.3.1.1.1 自带的 kafka.props,并指定相关的属性,解决。

更多 Oracle 相关信息见Oracle 专题页面 https://www.linuxidc.com/topicnews.aspx?tid=12

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计5551字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中