1. OGG简介
2. 应用场景
3. 基本原理
基于日志捕获技术的实时增量数据集成
Oracle GoldenGate 数据复制过程如下:
4. 基本架构
Oracle GoldenGate主要由如下组件组成
组件 | 说明 |
---|---|
Manager | 不管是源端还是目标端必须并且只能有一个Manager进程,可以启动、关闭、监控其他进程的健康状态,报告错误事件、分配数据存储空间,发布阀值报告等,其作用: 1:监控与启动 GoldenGate 的其它进程 2:管理 trail 文件及 Reporting |
Extract | Extract 进程运行在数据库源端上,它是Golden Gate的捕获机制,可以配置Extract 进程来做如下工作: 1:初始数据装载:对于初始数据装载,Extract 进程直接从源对象中提取数据 2:同步变化捕获:保持源数据与其它数据集的同步。初始数据同步完成后,Extract 进程捕获源数据的变化;如DML变化、 DDL变化等 |
Replicat | Replicat 进程是运行在目标端系统的一个进程,负责读取 Extract 进程提取到的数据(变更的事务或 DDL 变化)并应用到目标数据库,就像 Extract 进程一样,也可以配置 Replicat 进程来完成如下工作: 1:初始化数据装载:对于初始化数据装载,Replicat 进程应用数据到目标对象或者路由它们到一个高速的 Bulk-load 工具上; 2:数据同步,将 Extract 进程捕获到的提交了的事务应用到目标数据库中; |
Collector | Collector 是运行在目标端的一个后台进程,接收从 TCP/IP 网络传输过来的数据库变化,并写到 Trail 文件里 |
Trails | 为了持续地提取与复制数据库变化,GoldenGate 将捕获到的数据变化临时存放在磁盘上的一系列文件中,这些文件就叫做 Trail 文件 |
Data Pumps | Data Pump 是一个配置在源端的辅助的 Extract 机制,Data Pump 是一个可选组件,如果不配置 Data Pump,那么由 Extract 主进程将数据发送到目标端的 Remote Trail 文件中;如果配置了 Data Pump,会由 Data Pump将Extract 主进程写好的本地 Trail 文件通过网络发送到目标端的 Remote Trail 文件中 |
5. 常用的拓扑结构
由此可见,GoldenGate TDM的复制模式非常灵活,用户可以根据自己的需求选择特定的复制方式,并根据系统扩展对复制进行扩展。
6. 支持的环境
源和目标的操作系统和数据库可以进行任意的组合
7. OGG安装部署
注:在Docker环境下,整合Oracle,
主机名 | IP | OGG |
---|---|---|
node1 | 192.168.88.10 | 源端 |
node2 | 192.168.88.20 | 目标端 |
需要切换到oracle用户操作:
1 | su - oracle |
因为配置数据库需要在sqlplus中执行,所以使用sysdba用户登录:
1 | sqlplus / as sysdba |
1 | archive log list |
Automatic archival是Disabled状态,因为Oracle默认是不开启自动归档的
1 | conn /as sysdba |
关闭数据库,执行命令:
1 | shutdown immediate |
启动并装载数据库,但没有打开数据文件,该命令常用来修改数据库运行模式或恢复数据库。执行命令:
1 | startup mount |
执行开启归档命令:
1 | alter database archivelog; |
执行打开数据库命令:
1 | alter database open; |
执行自动归档命令:
1 | alter system archive log start; |
1 | archive log list |
Automatic archival变成了Enabled状态,表示已经开启自动归档成功
1 | select force_logging,supplemental_log_data_min from v$database; |
当显示NO的时候表示没有开启,需要调整
1 | alter database force logging; |
开启辅助日志命令:
1 | alter database add supplemental log data; |
开启主键附加日志命令:
1 | alter database add supplemental log data (primary key) columns; |
开启全列附加日志命令:
1 | alter database add supplemental log data (all) columns; |
1 | select force_logging,supplemental_log_data_min from v$database; |
当显示为YES的时候表示开启成功。
1 | mkdir /u01/app/ogg/src |
1 2 3 4 5 6 7 8 9 |
su - oracle vim ~/.bash_profile export OGG_SRC_HOME=/u01/app/ogg/src export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib source ~/.bash_profile # 退出oracle用户shell命令: exit |
1 | cd /export/softwares/oracle/ogg |
创建src文件夹是用来存放解压后的OGG源端软件
1 | mkdir /export/softwares/oracle/ogg/src/ |
解压OGG源端软件到src文件夹下
1 | unzip /export/softwares/oracle/ogg/V34339-01.zip -d /export/softwares/oracle/ogg/src/ |
1 | cd /export/softwares/oracle/ogg/src/ |
fbo_ggs_Linux_x64_ora11g_64bit.tar文件才是OGG源端的软件包,解压该文件到/u01/app/ogg/src目录下,执行命令:
1 | tar -xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /u01/app/ogg/src |
1 | chown -R oracle:oinstall /u01/app/ogg/src |
可以看到/u01/app/ogg/目录下的src属于oracle用户和oinstall组
可以看到/u01/app/ogg/src目录下的所有文件都属于oracle用户和oinstall组
创建表空间在磁盘中的物理路径(需要到root用户操作)
1 2 |
mkdir -p /u01/app/oracle/oggdata/orcl/ chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl |
进入sqlplus
切换到oracle用户:
1 | su - oracle |
登录sqlplus:sqlplus “/as sysdba”
创建oggtbs表空间
1 | create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs.dbf' size 500M autoextend on; |
创建ogg用户(用户名和密码都是ogg)
1 | create user ogg identified by ogg default tablespace oggtbs; |
赋予ogg用户dba权限
1 | grant dba to ogg; |
使用oracle用户登录源端OGG的命令行中
1 2 3 |
su – oracle cd $OGG_SRC_HOME ./ggsci |
初始化源端OGG目录
注意:如果不在OGG_SRC_HOME下,初始化OGG目录时会报错
1 | create subdirs |
退出OGG命令行客户端:exit
检查源端OGG初始化后的目录
初始化完成后,可以查询在$OGG_SRC_HOME下是否存在dirchk、dirdat、dirdef、dirjar、dirout、dirpcs、dirprm、dirrpt、dirsql、dirtmp共11个目录。
1 2 3 4 |
#切换到oracle用户: su – oracle #登录sqlplus: sqlplus "/as sysdba" |
1 2 3 4 5 6 7 8 |
#在oracle中创建test_ogg用户: create user test_ogg identified by test_ogg default tablespace users; #为test_ogg用户授权: grant dba to test_ogg; #使用test_ogg用户登录: conn test_ogg/test_ogg; #创建test_ogg表: create table test_ogg(id int ,name varchar(20),primary key(id)); |
1 2 3 4 5 6 |
#切换到oracle用户下: su – oracle #打印源端OGG_SRC_HOME: echo $OGG_SRC_HOME #进入OGG_SRC_HOME: cd $OGG_SRC_HOME |
1 | ./ggsci |
1 | dblogin userid ogg password ogg |
1 | edit param ./globals |
oggschema ogg
然后跟使用vi一样,在新窗口中添加oggschema ogg后保存退出编辑
1 | ./ggsci |
创建mgr进程:
1 | edit param mgr |
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT ,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3
参数名称 | 参数说明 |
---|---|
PORT | mgr的默认监听端口 |
DYNAMICPORTLIST | 当指定的mgr端口不可用时,会在这个端口列表中选择一个,最大指定范围为256个 |
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 | 重启EXTRACT进程的参数,最多5次,每次间隔3分钟 |
PURGEOLDEXTRACTS | TRAIL文件的定期清理 |
1 2 3 4 |
./ggsci add trandata test_ogg.test_ogg info trandata test_ogg.test_ogg |
配置Extract进程:
1 | edit param extkafka |
新增内容:
1 | extract extkafka |
dynamicresolution
SETENV (ORACLE_SID = “orcl”)
SETENV (NLS_LANG = “american_america.AL32UTF8”)
userid ogg,password ogg
exttrail /u01/app/ogg/src/dirdat/to
table test_ogg.test_ogg;
参数名称 | 参数说明 |
---|---|
extract extkafka | 定义extract进程名称 |
dynamicresolution | 启用动态解析 |
SETENV (ORACLE_SID = “orcl”) | 设置Oracle数据库 |
SETENV (NLS_LANG = “american_america.AL32UTF8”) | 设置字符集 |
userid ogg,password ogg | OGG连接Oracle数据库的帐号密码 |
exttrail /u01/app/ogg/src/dirdat/to | 定义trail文件的保存位置以及文件名,文件字母最多2个,否则会报错 |
table test_ogg.test_ogg; | 复制表的表名,支持*通配,必须以;结尾 |
添加Extract进程:
1 | add extract extkafka,tranlog,begin now |
将trail文件配置与extract进程绑定:
1 | add exttrail /u01/app/ogg/src/dirdat/to,extract extkafka |
配置Pump进程:
1 | edit param pukafka |
新增内容:
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost localhost mgrport 7809
rmttrail /u01/app/ogg/tgr/dirdat/to
table test_ogg.test_ogg;
?extract进程名称;passthru即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可;dynamicresolution动态解析;userid ogg,password ogg即OGG连接Oracle数据库的帐号密码rmthost和mgrhost即目标端(kafka)OGG的mgr服务的地址以及监听端口;rmttrail即目标端trail文件存储位置以及名称。
参数名称 | 参数说明 |
---|---|
extract pukafka | 定义pump进程名称 |
passthru | 因使用了pump逻辑传输,所以禁止OGG与Oracle交互 |
dynamicresolution | 配置动态解析 |
userid ogg,password ogg | OGG连接Oracle数据库的帐号密码 |
rmthost localhost mgrport 7809 | 目标端OGG的mgr服务的地址以及监听端口 |
rmttrail /u01/app/ogg/tgr/dirdat/to | 目标端OGG的trail文件存储位置以及名称 |
table test_ogg.test_ogg; | 要采集的表,必须使用;结尾 |
将源端trail文件绑定到Extract进程:
1 | add extract pukafka,exttrailsource /u01/app/ogg/src/dirdat/to |
将目标端trail文件绑定到Extract进程:
1 | add rmttrail /u01/app/ogg/tgr/dirdat/to,extract pukafka |
?注意:该文件用来在异构数据源之间传输时,需明确知道表之间的映射关系,比如:Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,在OGG命令行执行:
配置define文件:
1 | edit param test_ogg |
defsfile /u01/app/ogg/src/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;
生成表schema文件:(在OGG_SRC_HOME目录下执行(oracle用户))
1 | ./defgen paramfile dirprm/test_ogg.prm |
将生成的/u01/app/ogg/src/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:
1 | scp -r /u01/app/ogg/src/dirdef/test_ogg.test_ogg /u01/app/ogg/tgr/dirdef/ |
因为目标端目录还没有创建,因此发送文件可能会失败,所以执行完目标端配置后发送即可
1 | mkdir /u01/app/ogg/tgr |
1 | su oracle |
注意【非常重要】:在这里,需要调整oracle用户的.bash_profile,主要是更新LD_LIBRARY_PATH的值,来确保源端和目标端的ggsci命令都可以正常使用。如果不更新的话,目标端ggsci命令可以使用,但源端的ggsci命令无法使用,会报错./ggsci: error while loading shared libraries: libclntsh.so.11.1: cannot open shared object file: No such file or directory
1 2 3 4 5 6 7 8 9 |
vim ~/.bash_profile export OGG_TGR_HOME=/u01/app/ogg/tgr export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$ORACLE_HOME/lib:/usr/lib source ~/.bash_profile #退出oracle用户shell命令: exit |
1 | cd /export/softwares/oracle/ogg |
创建tgr文件夹是用来存放解压后的OGG目标端软件
1 | mkdir -p /export/softwares/oracle/ogg/tgr/ |
解压OGG目标端软件到tgr文件夹下
1 | unzip /export/softwares/oracle/ogg/V971332-01.zip -d /export/softwares/oracle/ogg/tgr/ |
1 | cd /export/softwares/oracle/ogg/tgr/ |
ggs_Adapters_Linux_x64.tar文件是真正的OGG目标端软件包,解压该文件到/u01/app/ogg/tgr目录下,执行命令:
1 | tar -xf ggs_Adapters_Linux_x64.tar -C /u01/app/ogg/tgr/ |
1 | chown -R oracle:oinstall /u01/app/ogg/tgr |
可以看到/u01/app/ogg/目录下的tgr属于oracle用户和oinstall组。
1 | ll /u01/app/ogg/tgr |
可以看到/u01/app/ogg/tgr目录下的所有文件都属于oracle用户和oinstall组。
1 | su oracle |
切换oracle用户时需要重新加载环境变量:
1 2 3 |
source ~/.bash_profile cd $OGG_TGR_HOME ./ggsci |
1 | create subdirs |
退出OGG命令行客户端:exit
将生成的/u01/app/ogg/src/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:
1 | scp -r $OGG_SRC_HOME/dirdef/test_ogg.test_ogg $OGG_TGR_HOME/dirdef/ |
1 | tar -zxf /export/softwares/zookeeper-3.4.14.tar.gz -C /export/services/ |
创建软连接:
1 | ln -s /export/services/zookeeper-3.4.14 /export/services/zookeeper |
创建zoo.cfg:
1 | cp /export/services/zookeeper/conf/zoo_sample.cfg /export/services/zookeeper/conf/zoo.cfg |
配置zoo.cfg:
1 | vim /export/services/zookeeper/conf/zoo.cfg |
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/export/datas/zookeeper/data
dataLogDir=/export/datas/zookeeper/log
clientPort=2181
创建ZooKeeper的数据路径:
1 2 |
mkdir -p /export/datas/zookeeper/data mkdir -p /export/datas/zookeeper/log |
添加到环境变量:
1 | vim /etc/profile |
export ZOOKEEPER_HOME=/export/services/zookeeper
export PATH=.:Z
O
O
K
E
E
P
E
R
H
O
M
E
/
b
i
n
:
ZOOKEEPER_HOME/bin:
ZOOKEEPERH?OME/bin:JAVA_HOME/bin:
J
A
V
A
H
O
M
E
/
j
r
e
/
b
i
n
:
JAVA_HOME/jre/bin:
JAVAH?OME/jre/bin:PATH
1 | source /etc/profile |
启动ZooKeeper:
1 2 |
zkServer.sh start zkServer.sh status |
1 | tar -zxf /export/softwares/kafka_2.11-2.2.0.tgz -C /export/services/ |
创建软连接:
1 | ln -s /export/services/kafka_2.11-2.2.0 /export/services/kafka |
配置server.prperties:
1 | vim /export/services/kafka/config/server.properties |
listeners=PLAINTEXT://server01:9092
broker.id=0
zookeeper.connect=server01:2181
添加环境变量:
1 | vim /etc/profile |
1 2 |
export KAFKA_HOME=/export/services/kafka export PATH=.:\$KAFKA_HOME/bin:\$ZOOKEEPER_HOME/bin:\$JAVA_HOME/bin:\$JAVA_HOME/jre/bin:\$PATH |
1 | source /etc/profile |
启动Kafka:
1 | kafka-server-start.sh -daemon /export/services/kafka/config/server.properties |
创建主题:
1 | kafka-topics.sh --create --zookeeper server01:2181 --replication-factor 1 --partitions 1 --topic test_ogg |
查看主题:
1 | kafka-topics.sh --list --zookeeper server01:2181 |
使用oracle用户进入OGG_SRC_HOME目录下
切换到oracle用户下:
1 | su – oracle |
打印目标端OGG_TGR_HOME:
1 | echo $OGG_TGR_HOME |
进入OGG_TGR_HOME:
1 | cd $OGG_TGR_HOME |
启动ggsci:
1 | ./ggsci |
配置目标端MRG进程
配置MGR进程:
1 | edit param mgr |
新增内容:
PORT 7810
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
1 | edit param ./GLOBALS |
新增内容:
CHECKPOINTTABLE test_ogg.checkpoint
配置目标端Replicate进程
配置replicate进程:
1 | edit param rekafka |
REPLICAT rekafka
sourcedefs /u01/app/ogg/tgr/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;
添加trail文件到Replicate进程
1 | add replicat rekafka exttrail /u01/app/ogg/tgr/dirdat/to,checkpointtable test_ogg.checkpoint |
1 2 |
cd $OGG_TGR_HOME vim dirprm/kafka.props |
新增内容:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/export/services/kafka/libs/*:/u01/app/ogg/tgr/:/u01/app/ogg/tgr/lib/*
1 2 |
cd $OGG_TGR_HOME vim dirprm/custom_kafka_producer.properties |
新增内容:
1 2 3 4 5 6 7 8 |
bootstrap.servers=server01:9092 acks=1 compression.type=gzip reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000 |
在目标端,主要做了4个操作,共包括2个进程,分别是MANAGER和REPLICAT。
前提:切换到 oracle 账号且启动了 Oracle
注意:要严格按照启动顺序执行:
1 2 3 4 5 6 7 8 |
cd $OGG_SRC_HOME ./ggsci #查看所有进程状态: info all #启动MANAGER进程: start mgr #检查所有进程状态: info all |
1 2 3 4 5 6 7 |
cd $OGG_TGR_HOME ./ggsci #启动MANAGER进程: start mgr #查看所有进程状态: info all |
3. 启动源端extract进程
1 2 3 4 5 6 |
cd $OGG_SRC_HOME ./ggsci #启动EXTRACT进程: start extkafka #查看所有进程状态: info all |
1 | start pukafka |
查看所有进程状态:
1 | info all |
1 2 |
cd $OGG_TGR_HOME ./ggsci |
启动replicat进程:
1 | start rekafka |
查看所有进程状态:
1 | info all |
1 2 3 |
su – oracle sqlplus "/as sysdba" conn test_ogg/test_ogg |
1 | conn test_ogg/test_ogg |
查看该用户拥有的表:
1 | select table_name from user_tables; |
查看TEST_OGG表的字段信息:
1 | select column_name,data_type from user_tab_columns where table_name = upper('TEST_OGG'); |
插入数据:
1 | insert into test_ogg values(1,'beijing'); |
执行Commit:
1 | commit; |
查看kafka是否多出了一个叫做test_ogg的主题:
1 | kafka-topics.sh --list --zookeeper server01:2181 |
然后启动kafka的消费者来消费test_ogg主题的数据:
1 | kafka-console-consumer.sh --bootstrap-server server01:9092 --topic test_ogg --from-beginning |
再查看kafka的test_ogg主题下是否有了数据:
执行修改数据(修改id=2的name为china-shanghai):
1 | update test_ogg set name='china-shanghai' where id=2; |
查看kafka中是否有了id为2的china-shanghai的这条记录:
删除id为2的数据:
1 | delete test_ogg where id=2; |
检查kafka是否多个一条被标记为删除的数据:
1 2 |
:{<!-- -->"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-05-28 09:22:18.000129", {<!-- -->"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2020-05-28 09:25:17.000140","current_ts":"2020-05-28T09:25:22.085000","pos":"00000000000000001227","before":{<!-- -->},"after":{<!-- -->"ID":1,"NAME":"china-shanghai"}} |
1 | ll /u01/app/ogg/src/dirdat/ |
目标端会接收到源端pump进程传过来的数据文件:
1 | ll /u01/app/ogg/tgr/dirdat/ |
源端错误日志路径
/u01/app/ogg/src/ggserr.log
目标端错误日志路径
/u01/app/ogg/tgr/ggserr.log