PostgreSQL教程

PostgreSQL/lightdb逻辑复制详解

本文主要是介绍PostgreSQL/lightdb逻辑复制详解,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

  之所以有逻辑复制,是因为物理复制是基于数据块的复制,每个实例的数据块是自己维护的,无法做到全局,所以只能借助逻辑块复制,即使是内核集成的HTAP,在行存和列存之间同步时,也采用的是逻辑块复制。逻辑复制可用于很多场景,例如部分数据同步、DW集成、同步到大数据、ES、做流式计算、缓存更新等等,在这些场景中,CDC是非常重要的。

  Postgres 10开始原生支持逻辑复制。

  逻辑复制也称为行级复制或CDC,所以vacuum、index update这些都不会包含。主要用于双主、同步到kafka/redis/gp等场景,因为基于复制协议,理论上也可以做到半同步,性能上可达到流复制的2/3,默认不支持DDL、序列(pglogical可配置)。是否支持多主?(是否可以源端不decode?直接到目标库,技术上可以的。但是因为要基于当时的catalog进行decode以便精确解析出定义,所以会比较麻烦。比如oracle logminer/OGG/xstream就支持在三个地方进行解码,也是为了需要数据字典同步)是否支持可以支持DDL?(BDR支持)

逻辑复制的架构

 

  逻辑复制涉及的组件包括:复制槽(pg_replication_slots)、订阅(pg_subscription)、复制源(pg_replication_origin)、解码插件(plugin)、发布(pg_publication、pg_publication_tables、pg_publication_rel)。其中逻辑复制的消费者不一定要是subscription,可以是其他比如pg_recvlogical。subscription和pg_subscription的存在是为了pg实例之间逻辑复制可以开箱即用。

  在PG的架构上,逻辑解码是发生在wal_sender上的,而不是消费者负责解码(oracle则支持在主机或其它实例,其他实例要求包含catalog,和维护replslot(replication slots 是从 postgresql 9.4 引入的,主要是提供了一种自动化的方法来确保主控机在所有的后备机收到 WAL 段之前不会移除它们,并且主控机也不会移除可能导致恢复冲突的行,即使后备机断开也是如此。 为了防止 WAL 被删,SLOT restart_lsn 之后的WAL文件都不会删除。(wal_keep_segments 则是一个人为设定的边界,slot 是自动设定的边界(无上限),所以使用 slot 并且下游断开后,可能导致数据库的 WAL 堆积爆满)。)中的min_catalog是一样的,否则就解码不出了,wal中包含了oid和blockid,同时pg的wal是full record的,所以apply和解析时都可以非常快),如下:

  对于任何一个有订阅或消费者的复制槽,都有一个对应的walsender(这一点和流复制是一样的)进程,实时等待被wal writer或bgwriter唤醒去读取刚刚提交的xlog,通过信号通知。 

  启动一个pg_recvlogical消费者,

[zjh@hs-10-20-30-193 ~]$ nohup lt_recvlogical -p25432 -Uzjh -d postgres --slot test_for_recvlogical --start -f - &
[1] 237063
[zjh@hs-10-20-30-193 ~]$ nohup: ignoring input and appending output to ??ohup.out?

[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ tail -fn 100 nohup.out 
BEGIN 2594732
table public.users: INSERT: user_id[integer]:5 user_name[character varying]:'data5' gender[integer]:null salary[numeric]:null dept_id[integer]:null create_date[timestamp without time zone]:null update_date[timestamp without time zone]:'2022-04-10 08:30:42.870449'
COMMIT 2594732

  再对应的walsender进程状态。如下:

     libc.so.6!__epoll_wait_nocancel    
>    WaitEventSetWaitBlock(set = 0x1841828, cur_timeout = 30000, occurred_events = 0x7ffd43ffa980, nevents = 1)    C++ (gdb)
     WaitEventSetWait(set = 0x1841828, timeout = 30000, occurred_events = 0x7ffd43ffa980, nevents = 1, wait_event_info = 100663303)    C++ (gdb)
     WaitLatchOrSocket(latch = 0x7f9fcafdb2ec, wakeEvents = 43, sock = 11, timeout = 30000, wait_event_info = 100663303)    C++ (gdb)
     WalSndWaitForWal(loc = 1882311624)    C++ (gdb)
     logical_read_xlog_page(state = 0x18f2030, targetPagePtr = 1882308608, reqLen = 3016, targetRecPtr = 1882311600, cur_page = 0x18fdc18 "\006\321\005")    C++ (gdb)
     ReadPageInternal(state = 0x18f2030, pageptr = 1882308608, reqLen = 3016)    C++ (gdb)
     XLogReadRecord(state = 0x18f2030, errormsg = 0x7ffd43ffab58)    C++ (gdb)
     XLogSendLogical    C++ (gdb)
     WalSndLoop(send_data = 0x891d26 <XLogSendLogical>)    C++ (gdb)   ## 一直等待,直到有wal后调用send_data函数指针指向的XLogSendLogical函数开始写逻辑解码数据
     StartLogicalReplication(cmd = 0x18c1b58)    C++ (gdb)
     exec_replication_command(cmd_string = 0x1812418 "START_REPLICATION SLOT \"test_for_recvlogical\" LOGICAL 0/0")    C++ (gdb)
     PostgresMain(argc = 1, argv = 0x18407b0, dbname = 0x1840720 "postgres", username = 0x1840708 "zjh")    C++ (gdb)
     BackendRun(port = 0x183c740)    C++ (gdb)
     BackendStartup(port = 0x183c740)    C++ (gdb)
     ServerLoop    C++ (gdb)
     PostmasterMain(argc = 3, argv = 0x180cf00)    C++ (gdb)
     main(argc = 3, argv = 0x180cf00)    C++ (gdb)

  收到信号后,会调用信号处理器,如procsignal_sigusr1_handler。这一点上和流复制基本一致。

postgresql wal中的origin

 

  origin主要用于记录逻辑复制中记录来源于哪个源,在逻辑分析插件中使用,用于过滤掉不需要的数据来源。

    维护在pg_replication_origin中。既可以由订阅自动创建,也可以人工创建。

--publication
postgres=# \d
       List of relations
 Schema | Name | Type  | Owner  
--------+------+-------+--------
 public | t1   | table | movead
(1 row)
postgres=# create publication pub1 for all tables ;
CREATE PUBLICATION

--------------------------------------------------
--subscription
postgres=# create subscription sub1 connection 'host=xxxxxxxx port=5432 dbname=postgres user=movead' publication pub1;
NOTICE:  created replication slot "sub1" on publisher
CREATE SUBSCRIPTION
postgres=#

postgres=# select * from pg_replication_origin;
 roident |   roname    
---------+-------------
       1 | pg_16389

postgres=# select pg_replication_origin_create('test_origin');   -- 用给定的外部名称创建一个复制源,并且返回分配给它的内部ID。
 pg_replication_origin_create 
------------------------------
                            2  -- 返回的是origin id
(1 row)

postgres=# select * from pg_replication_origin;
 roident |   roname    
---------+-------------
       1 | pg_16389
       2 | test_origin
(2 rows)
postgres=#

  对于订阅产生的origin,可以通过在pub端插入数据,然后分析wal,就可以看出wal打标记了。

rmgr: Transaction len (rec/tot):     65/    65, tx:        519, lsn: 0/03000068, prev 0/03000028, desc: COMMIT 2020-04-16 17:09:01.989257 CST; origin: node 1, lsn 0/0, at 2020-04-16 17:05:49.767511 CST


rmgr: Transaction len (rec/tot):     65/    65, tx:        520, lsn: 0/03000128, prev 0/030000E8, desc: COMMIT 2020-04-16 17:09:09.327941 CST; origin: node 2, lsn 0/156BB28, at 2020-04-16 17:09:09.268948 CST

  originid通过roident标识:

postgres=#  \d pg_replication_origin
    Table "pg_catalog.pg_replication_origin"
 Column  | Type | Collation | Nullable | Default 
---------+------+-----------+----------+---------
 roident | oid  |           | not null | 
 roname  | text | C         | not null | 

  对于手工创建的origin,需要调用pg_replication_origin_session_setup () API绑定会话到origin。

postgres=# select pg_replication_origin_session_setup('test_origin'); -- 将当前会话标记为从给定的原点回放,从而允许跟踪回放进度。 只能在当前没有选择原点时使用。使用pg_replication_origin_session_reset 命令来撤销。
 pg_replication_origin_session_setup 
-------------------------------------
(1 row)
postgres=# insert into t1 values(100);select pg_current_wal_lsn();   -- 会话必须绑定到origin会话,wal中才会标记origin
INSERT 0 1
 pg_current_wal_lsn 
--------------------
 0/4000230
(1 row)

   自带插件test_decoding可以实现Origin的解析使用。

   就一般使用而言,解码器可以考虑wal2json、pglogical。但是他们俩都属于组件级别,算不到产品级。如果没有自研能力,基本使用可以考虑Debezium。

http://www.postgres.cn/docs/13/functions-admin.html 

https://www.highgo.ca/2020/04/18/the-origin-in-postgresql-logical-decoding/

https://www.postgresql.org/docs/14/replication-origins.html 

https://www.postgresql.org/docs/14/logicaldecoding.html 

https://www.postgresql.fastware.com/blog/logical-replication-tablesync-workers   双进程体系

https://wiki.postgresql.org/wiki/Logical_replication_and_physical_standby_failover 

逻辑复制入门系列

https://blog.anayrat.info/en/2017/07/29/postgresql-10-and-logical-replication-overview/

https://blog.anayrat.info/en/2017/08/05/postgresql-10-and-logical-replication-setup/

https://blog.anayrat.info/2017/08/27/postgresql-10-et-la-r%C3%A9plication-logique-restrictions/

https://blog.anayrat.info/en/2018/03/10/logical-replication-internals/ 

 

https://www.slideshare.net/PetrJelinek1/logical-replication-in-postgresql-flossuk-2016

https://www.slideshare.net/UmairShahid16/logical-replication-with-pglogical

https://www.slideshare.net/AlexanderShulgin3/streaming-huge-databases-using-logical-decoding  每行记录都包含的元数据,非常耗费资源,需要考虑下其他序列化机制。

avro(相当于带schema定义的JSONB/BSON),kafka中的应用

https://docs.oracle.com/database/121/XSTRM/xstrm_pt_concepts.htm#XSTRM72454  oracle逻辑复制

https://zhuanlan.zhihu.com/p/163204827

 

这篇关于PostgreSQL/lightdb逻辑复制详解的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!