PostgreSQL内核源码分析 逻辑复制基本流程,发布订阅创建背后的故事

06-01 1179阅读

逻辑复制代码框架

​专栏内容:

  • postgresql使用入门基础
  • 手写数据库toadb
  • 并发编程

    个人主页:我的主页

    管理社区:开源数据库

    座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.

    ✅ 🔥🔥🔥重大消息🔥🔥🔥 ❤️❤️❤️❤️ 关注公众号【开源无限】可免费领取《手写数据库内核toadb》源代码一份 ❤️❤️❤️❤️

    一、概述


    在我们使用数据库时,往往需要感知数据库中某些数据库对象的变化,比如表中insert/update操作使数据发生了变化。

    为了及时得到数据的变化,我们常常会开启一个循环和定时器,不断的查询比较,这个工作非常耗时和容易出错,还不是很准确,令人非常头疼。

    在Postgresql中有两种实时的复制模式:

    • 一种对文件内容的复制,也就是文件中二进制数据直接复制,也称为物理复制;
    • 另一种是对数据库对象的复制,数据库对象比如table, database都是逻辑概念,所以也称为逻辑复制;

      逻辑复制本身就是基于数据库对象的变化,当有变化时就需要产生复制事件,这一特性刚好就可以用来作为数据库对象的变化事件,这样只需要订阅这一事件即可,由数据库来检查数据库对象的变化,并通知我们,即省力就及时。

      PostgreSQL中如何实现逻辑复制功能呢?我们从几个方面来逐层展开介绍,首先介绍一下逻辑复制的代码结构,再来看一下产生通知的流程,以及如何应用到备份。

      本文就来分享一下逻辑复制的代码框架结构,在整体上对逻辑复制有初步的认识。

      二、 创建发布与订阅


      在create publication与create subscription之后,整个逻辑复制就建立起来了,那么我们首先来看一下这两个命令处理中做了什么事情。

      2.1 发布

      在主库创建发布,它是一个命令处理,一般在src/backend/commands/路径下就有对应的命令处理,每个命令会对应一个或多个源代码文件,可以看到src/backend/commands/publicationcmds.c,就是发布命令的处理代码了。

      在这个源文件中,可以看到创建create,删除 remove, 修改 alter等几个接口。

      我们重点来看创建发布者中主要逻辑,下面摘选了代码中的部分内容来分析。

      PostgreSQL内核源码分析 逻辑复制基本流程,发布订阅创建背后的故事
      (图片来源网络,侵删)
      /*
       * Create new publication.
       */
      ObjectAddress
      CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
      
      • 检查是否已经存在
        	puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
        							 CStringGetDatum(stmt->pubname));
        	if (OidIsValid(puboid))
        		ereport(ERROR,
        				(errcode(ERRCODE_DUPLICATE_OBJECT),
        				 errmsg("publication \"%s\" already exists",
        						stmt->pubname)));
        
        • 生成新发布者

          一个新的发布者,新产生一行数据,插入到系统表中,以及相关系统表的处理。

              /*  */
          	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
          	recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
          	ObjectAddressSet(myself, PublicationRelationId, puboid);
          
          • 数据库对象的处理

            下面是对于发布者关联数据库对象的处理,数据库对象为:所有表,某个表或某个字段,或者某个schema等等

            PostgreSQL内核源码分析 逻辑复制基本流程,发布订阅创建背后的故事
            (图片来源网络,侵删)
            	if (stmt->for_all_tables)
            	{
            		/* Invalidate relcache so that publication info is rebuilt. */
            		CacheInvalidateRelcacheAll();
            	}
            	else
            	{
            		/* FOR TABLES IN SCHEMA requires superuser */
            		if (schemaidlist != NIL && !superuser())
            			ereport(ERROR,
            					errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
            					errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
            		if (relations != NIL)
            		{
            			PublicationAddTables(puboid, rels, true, NULL);
            		}
            		if (schemaidlist != NIL)
            		{
            			PublicationAddSchemas(puboid, schemaidlist, true, NULL);
            		}
            	}
            
            • 配置参数

              最后对于配置参数的检查,逻辑复制时WAL_LEVEL必须为logical

                  /*  */
              	if (wal_level != WAL_LEVEL_LOGICAL)
              		ereport(WARNING,
              				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
              				 errmsg("wal_level is insufficient to publish logical changes"),
              				 errhint("Set wal_level to \"logical\" before creating subscriptions.")));
              

              如果在搭建逻辑复制时,没有修改配置参数,这里就会报错,创建发布失败。

              PostgreSQL内核源码分析 逻辑复制基本流程,发布订阅创建背后的故事
              (图片来源网络,侵删)

              好了,至此发布源就创建好了,就等待订阅任务来触发了。

              2.2 订阅

              通过在备库创建订阅,来启动对数据库对象进行逻辑复制。

              创建订阅主要会进行如下几步:

              • 系统表中创建订阅记录

                同样create subscription命令的执行对应的源代码在src/backend/commands/subscriptioncmds.c中,下面我们摘取重点代码进行分享。

                创建订阅的函数如下,同样还有删除和修改的函数。

                /*
                 * Create new subscription.
                 */
                ObjectAddress
                CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                				   bool isTopLevel);
                

                首先是命令检查,权限检查,同时还会在系统表中检查是否已经存在。

                这里使用了动态库libpqwalreceiver来处理连接和WAL接收,它是一个公共组件,在这里会用,同样在流复制的工具中也会用到。

                	load_file("libpqwalreceiver", false);
                	walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
                

                在系统表pg_subscription中插入一条新的数据,同时处理与之依赖的其它系统表的数据。

                tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
                	/* Insert tuple into catalog. */
                	CatalogTupleInsert(rel, tup);
                
                • 与发布者校验信息

                  通过动态库,创建与发布者的连接。

                  wrconn = walrcv_connect(conninfo, true, true, must_use_password,
                  								stmt->subname, &err);
                  

                  从发布者获取订阅的表的信息,还有数据库的版本信息等,进行检查。

                  • 启动 worker

                    当上面订阅相关工作处理完成后,此时会注册一个在事务提交时执行的任务,这个任务就是通知logical replication launcher进程。

                    	if (opts.enabled)
                    		ApplyLauncherWakeupAtCommit();
                    

                    此后create subscription命令主体就结束了,最后事务提交后,在logical replication launcher进程中会收到信号通知。

                    当收到信号之后,会检查当前集簇中的订阅列表,如果订阅是启用状态,给每个订阅对应的启动一个logical replication apply worker进程,

                    用于从主库接收逻辑复制数据,并应用于备库。

                    这里查询备库对应的后台进程,可以看到多了一个postgres: logical replication apply worker for subscription 16397进程。

                    [senllang@hatch bin]$ ps -ef|grep postgres |grep 2477570
                    senllang 2477570       1  0 Nov19 ?        00:00:00 /opt/postgres/bin/postgres -p 5433
                    senllang 2477571 2477570  0 Nov19 ?        00:00:00 postgres: checkpointer
                    senllang 2477572 2477570  0 Nov19 ?        00:00:00 postgres: background writer
                    senllang 2477574 2477570  0 Nov19 ?        00:00:00 postgres: walwriter
                    senllang 2477575 2477570  0 Nov19 ?        00:00:00 postgres: autovacuum launcher
                    senllang 2477576 2477570  0 Nov19 ?        00:00:00 postgres: logical replication launcher
                    senllang 2515681 2477570  0 08:26 ?        00:00:00 postgres: logical replication apply worker for subscription 16397
                    
                    • 建立主备间的逻辑复制通信

                      当备库的apply worker进程进行工作之后,会主动连接主库,主库也会创建一个walsender进程,来专门处理此订阅的WAL数据发送和逻辑复制的流程控制。

                      [senllang@hatch bin]$ ps -ef|grep postgres |grep 2477547
                      senllang 2477547       1  0 Nov19 ?        00:00:00 /opt/postgres/bin/postgres -D pgA
                      senllang 2477548 2477547  0 Nov19 ?        00:00:00 postgres: checkpointer
                      senllang 2477549 2477547  0 Nov19 ?        00:00:00 postgres: background writer
                      senllang 2477551 2477547  0 Nov19 ?        00:00:00 postgres: walwriter
                      senllang 2477552 2477547  0 Nov19 ?        00:00:00 postgres: autovacuum launcher
                      senllang 2477553 2477547  0 Nov19 ?        00:00:00 postgres: logical replication launcher
                      senllang 2515019 2477547  0 08:21 ?        00:00:00 postgres: senllang postgres ::1(48026) idle
                      senllang 2515682 2477547  0 08:26 ?        00:00:00 postgres: walsender senllang postgres ::1(57106) START_REPLICATION
                      

                      查询主数据库的后台服务进程,可以看到也会多出来一个postgres: walsender senllang postgres ::1(57106) START_REPLICATION进程,

                      START_REPLICATION是逻辑复制的状态信息。

                      至此,整个逻辑复制从主库到备库的通信链路就建立起来了,当主库有事务提交时,就会有WAL日志的落盘,此时会触发walsender来检查,当有符合当前订阅的WAL数据时,就会发送到订阅端,订阅端会进行应用到当前备库中。

                      五、总结


                      本文主要分享了逻辑复制搭建时的内核框架代码逻辑,在主库创建发布者,在备库创建订阅时,会主动与发布者建立连接,此时发布者才会准备WAL进行发布,整个流程概览就是这样,内部细节步骤还有很多,后面会分多篇进行分类介绍。

                      结尾


                      非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!

                      作者邮箱:study@senllang.onaliyun.com

                      如有错误或者疏漏欢迎指出,互相学习。

                      注:未经同意,不得转载!

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码