DataX:数据同步json配置详解|mysql同步到clickhouse案例

06-01 1580阅读

文章目录

  • 0. 引言
  • 1. datax配置详解
    • 1.1 setting
      • 1.1.1 speed
      • 1.1.2 errorLimit
      • 1.2 content
        • 1.2.1 reader
        • 1.2.2 writer
        • 2. mysql同步至clickhouse案例
          • 2.1 全量同步
            • 补充:出现报错`在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数`
            • 2.2 增量同步

              0. 引言

              上节我们讲解了datax的安装和基本工作流程,本期我们具体讲解datax同步的配置项含义,以及如何利用datax实现全量、增量数据同步

              1. datax配置详解

              datax中的json同步配置文件在官方文档中都可以找到示例

              DataX:数据同步json配置详解|mysql同步到clickhouse案例

              其配置文件主要分为以下结构:

              |-- setting
                 |-- speed
                 |-- errorLimit
              |-- content
                 |-- reader
                 |-- writer
              

              以下我们以mysql的为例,具体讲解其参数含义

              1.1 setting

              1.1.1 speed

              DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,用以灵活控制作业速度,让同步作业在库可以在承受的范围内达到最佳的同步速度

              "speed": {
                 "channel": 5,
                 "byte": 1048576,
                 "record": 10000
              }
              
              • channel:管道数,即并行数,即DataX会使用多少个并行通道进行数据传输。默认值为1,需与splitPk一同使用,否则无效果,也就是说要配置并行数,同时还要指定拆分字段splitPk,否则并行数会退化为1个,该字段具体如何选择,我们在下文讲解
              • record:每次同步多少条数据,取record和byte中的最小值
              • byte:每次同步多少字节数据,取record和byte中的最小值

                1.1.2 errorLimit

                errorLimit为错误数据限制,这里有两个参数record和percentage,指当异常数据达到多少时同步取消,取record和percentage的最小值

                数据同步时,如果数据中包含格式不正确的字段(如日期、数字等),可能会导致解析失败,这时就会判定为错误数据。

                "errorLimit": {
                        "record": 0,
                        "percentage": 0.02
                      }
                
                • record: 达到错误的条数
                • percentage:达到错误的百分比

                  1.2 content

                  1.2.1 reader

                  reader模块的作用是从数据源读取数据,它是整个数据同步任务的数据来源端

                  "reader": {
                            "name": "mysqlreader",
                            "parameter": {
                              "username": "yRjwDFuoPKlqya9h9H2Amg==",
                              "password": "yiAxzw1jSbbutLBTMTLrAA==",
                              "column": [
                                "`id`",
                                "`name`",
                                "`sex`",
                                "`number`"
                              ],
                              "splitPk": "",
                              "connection": [
                                {
                                  "table": [
                                    "src_zhang"
                                  ],
                                  "jdbcUrl": [
                                    "jdbc:mysql://192.168.6.23:3308/2024122710082_default"
                                  ]
                                }
                              ]
                            }
                          }
                  
                  • name:用于指定reader的类型,如mysqlreader、oraclereader
                  • parameter.username:来源数据库连接账号
                  • parameter.password:来源数据库连接账号密码
                  • parameter.column:要同步的字段,*表示所有字段,也可支持常量配置,但是要遵循对应数据库的sql与法,示例:["id", "table", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]
                  • parameter.connection.table: 要同步的表,*表示所有表
                  • parameter.connection.jdbcUrl:来源数据库的jdbc链接,支持书写多个,如果多个,datax会进行探测,选择一个可以正常链接的。
                  • parameter.splitPk:用于数据分片的字段名,与speed.channel连用,推荐使用表主键(目前splitPk仅支持整形数据切分)。其字段选取遵循以下原则:

                    (1)splitPk应选择一个数据分布相对均匀的字段,以避免某个子任务处理过多数据而导致性能瓶颈,比如一张表中,主键id为自增id, 那么splitPk就可以设置为id;

                    (2)另外要确保splitPk字段在源数据库和目标数据库中都存在,并且类型一致

                    • parameter.where:指定数据过滤条件,只同步满足条件的数据,示例:"where": "id > 10"
                    • parameter.querySql:当不指定table时,可以指定一个sql来查询数据,主要用于需要关联查询的场景,示例:"querySql": ["select * from sys_user"]

                      1.2.2 writer

                      writer模块的作用是将reader模块读取并处理后的数据写入到目标数据源中。它是整个数据同步任务的数据目标端

                      "writer": {
                                          "name": "mysqlwriter",
                                          "parameter": {
                                              "writeMode": "insert",
                                              "username": "root",
                                              "password": "root",
                                              "column": [
                                                  "id",
                                                  "name"
                                              ],
                                              "session": [
                                              	"set session sql_mode='ANSI'"
                                              ],
                                              "preSql": [
                                                  "delete from test"
                                              ],
                                              "connection": [
                                                  {
                                                      "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                                      "table": [
                                                          "test"
                                                      ]
                                                  }
                                              ]
                                          }
                                      }
                      
                      • name: 指定writer的类型,例如mysqlwriter、oraclewriter、txtfilewriter等。
                      • parameter.connection: 目标数据库的连接信息,用于与reader类似
                      • parameter.column:目标数据表的字段,与reader中相对应,但是不能指定常量值
                      • parameter.writeMode:写入模式,如insert、update、replace等,用于处理数据冲突。分别代表写入数据时采用的sql语句为insert into 或者 ON DUPLICATE KEY UPDATE或者 replace into
                      • parameter.session:DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
                      • parameter.preSql:写入数据到目的表前,会先执行这个sql。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, … datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:“preSql”:[“delete from @table”],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称
                      • parameter.postSql:写入后执行的sql,用法同preSql
                      • parameter.batchSize:次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况,默认值1024

                        2. mysql同步至clickhouse案例

                        2.1 全量同步

                        1、这里我先单独讲解利用datax创建同步配置文件,然后进行手动调用的场景,后续我们再结合datax-web带大家进行可视化配置

                        根据上文的讲解,查询官方针对mysql reader和clickhouse writer的文档说明,书写配置文件

                        {
                          "job": {
                            "setting": {
                              "speed": {
                                "channel": 3,
                                "byte": 1048576
                              },
                              "errorLimit": {
                                "record": 0,
                                "percentage": 0.02
                              }
                            },
                            "content": [
                              {
                                "reader": {
                                  "name": "mysqlreader",
                                  "parameter": {
                                    "username": "root",
                                    "password": "yiAxzw1jSbbutLBTMTLrAA==",
                                    "column": [
                                      "`id`",
                                      "`name`",
                                      "`sex`",
                                      "`number`"
                                    ],
                                    "splitPk": "id",
                                    "connection": [
                                      {
                                        "table": [
                                          "src_zhang"
                                        ],
                                        "jdbcUrl": [
                                          "jdbc:mysql://192.168.6.23:3308/2024122710082_default"
                                        ]
                                      }
                                    ]
                                  }
                                },
                                "writer": {
                                  "name": "clickhousewriter",
                                  "parameter": {
                                    "username": "default",
                                    "password": "kFBucg+xHfmPLLcml8cg6w==",
                                    "column": [
                                      "id",
                                      "name",
                                      "sex",
                                      "number"
                                    ],
                                    "connection": [
                                      {
                                        "table": [
                                          "src_zhang_a"
                                        ],
                                        "jdbcUrl": "jdbc:clickhouse://192.168.6.23:8123/2025010713174_default"
                                      }
                                    ]
                                  }
                                }
                              }
                            ]
                          }
                        }
                        

                        2、将该json文件mysql2ck.json放到datax安装目录的job目录下

                        3、进入datax安装目录,然后执行同步任务指令

                        python bin/datax.py job/mysql2ck.json
                        

                        DataX:数据同步json配置详解|mysql同步到clickhouse案例

                        4、可以看到执行成功,我们去clickhouse数据库查询,咱们的数据已经同步过来了

                        DataX:数据同步json配置详解|mysql同步到clickhouse案例

                        补充:出现报错在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数

                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] 经DataX智能分析,该任务最可能的错误原因是:
                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] com.alibaba.datax.common.exception.DataXException: Code:[Framework-03], Description:[DataX引擎配置错误,该问题通常是由于DataX安装错误引起,请联系您的运维解决 .].  - 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数
                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] 	at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:30)
                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] 	at com.alibaba.datax.core.job.JobContainer.adjustChannelNumber(JobContainer.java:430)
                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] 	at com.alibaba.datax.core.job.JobContainer.split(JobContainer.java:387)
                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] 	at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:117)
                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] 	at com.alibaba.datax.core.Engine.start(Engine.java:86)
                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] 	at com.alibaba.datax.core.Engine.entry(Engine.java:168)
                        2025-01-15 14:45:14 [AnalysisStatistics.analysisStatisticsLog-53] 	at com.alibaba.datax.core.Engine.main(Engine.java:201)
                        

                        这是因为新版本中单个channel的bps不能为非负数,这个我们调整下datax下的/conf/core.json配置文件,调整其中的core.transport.channel.speed.byte为非负数

                        DataX:数据同步json配置详解|mysql同步到clickhouse案例

                        2.2 增量同步

                        上述我们演示的是全量同步的配置,但是在实际运行时,我们更多需要的是增量定时同步,于是我们就需要额外配置每次同步的限制条件,

                        如果我们的数据是T+1的,并且有创建时间字段,那么可以直接通过where配置项来实现

                        "where": "create_time >= CURDATE()"
                        

                        DataX:数据同步json配置详解|mysql同步到clickhouse案例

                        这里就是通过自带的日期函数来实现,但如果没有日期字段,或者不是按天更新的,那么就需要我们有地方记录上一次更新的偏移量,然后在更新脚本里通过${}占位符声明,如

                        "where": "id >= ${last_max_id}"
                        

                        或者也可以使用querySql参数来定义

                        "connection": [
                                      {
                                        "table": [
                                          "src_zhang"
                                        ],
                                        "jdbcUrl": [
                                          "jdbc:mysql://192.168.6.23:3308/2024122710082_default"
                                        ],
                                        "querySql": ["select id,name,sex,number from src_zhang where id >= ${last_max_id}"]
                                      }
                                    ]
                                  }
                        

                        然后通过datax的-p参数来定义自定义参数,如下,定义自定义参数last_max_id(参数名前加-D配置),datax中会通过该配置将参数值传递到配置文件中并进行替换

                        python bin/datax.py -p "-Dlast_max_id=4" job/mysql2ck_delta.json
                        

                        执行脚本,查看结果,可以看到新同步的数据就是从id>=4开始同步的

                        DataX:数据同步json配置详解|mysql同步到clickhouse案例

                        那么这个-Dlast_max_id=4中的4在哪里维护呢,不能每次都手动修改吧,这里提供两种思路给大家:

                        1、每次执行前查询一下最大的id,然后保存到数据库中,二次运行时,将该值取出,执行完后再次更新新的id到数据库

                        2、通过datax-web来实现增量同步,该组件已经帮我们实现了上述的步骤,我们只需要配置即可,下文我们将讲解如何在datax-web中配置定时增量同步。

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码