SpringBoot整合Flink CDC,实时追踪mysql数据变动

06-01 1069阅读
❃博主首页 : 「码到三十五」 ,同名公众号 :「码到三十五」,wx号 : 「liwu0213」

☠博主专栏 :

♝博主的话 : 搬的每块砖,皆为峰峦之基;公众号搜索「码到三十五」关注这个爱发技术干货的coder,一起筑基


我们将整合Spring Boot和Apache Flink CDC(Change Data Capture)来实现实时数据追踪。下面是一个基本的实践流程代码,包括搭建Spring Boot项目、整合Flink CDC以及实现数据变动的实时追踪。

文章目录

      • 前言
      • 1. MySQL开启Binlog
      • 2. 创建Spring Boot项目
      • 3. 添加依赖
      • 4. 配置Flink和MySQL CDC
      • 5. 实现数据实时追踪
      • 6. 启动Spring Boot应用
      • 7. 运行并测试

        前言

        Flink CDC(Flink Change Data Capture)是一种基于数据库日志的CDC技术,它实现了一个全增量一体化的数据集成框架。与Flink计算框架相结合,Flink CDC能够高效地实现海量数据的实时集成。其核心功能在于实时监视数据库或数据流中的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。借助Flink CDC,用户可以轻松地构建实时数据管道,实时响应和处理数据变动,为实时分析、实时报表和实时决策等场景提供有力支持。

        Flink CDC的应用场景广泛,包括但不限于实时数据仓库更新、实时数据同步和迁移以及实时数据处理等。它还能确保数据一致性,并在数据发生变更时准确地进行捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,便于数据的捕获和处理。

        接下来,将详细介绍MySQL CDC的使用。MySQL CDC连接器允许从MySQL数据库中读取快照数据和增量数据。

        1. MySQL开启Binlog

        MySQL中开启binlog功能,需要修改配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini)的[mysqld]部分设置相关参数:

        [mysqld]
        server-id=1
        # 设置日志格式为行级格式
        binlog-format=Row
        # 设置binlog日志文件的前缀
        log-bin=mysql-bin
        # 指定需要记录二进制日志的数据库
        binlog_do_db=testjpa
        

        除了开启binlog功能外,还需要为Flink CDC配置相应的权限,以确保其能够正常连接到MySQL并读取数据。这包括授予Flink CDC连接MySQL的用户必要的权限,如SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。

        检查是否已开启binlog功能:

        mysql> SHOW VARIABLES LIKE 'log_bin';
        +---------------+-------+
        | Variable_name | Value |
        +---------------+-------+
        | log_bin       | ON    |
        +---------------+-------+
        

        至此,MySQL的相关配置已完成。

        2. 创建Spring Boot项目

        首先,你需要创建一个Spring Boot项目。可以使用Spring Initializr(https://start.spring.io/)来快速生成项目。

        3. 添加依赖

        在pom.xml中添加Apache Flink和Flink CDC的依赖。以下是必要的依赖:

            
            
                org.apache.flink
                flink-java
                1.14.0
            
            
                org.apache.flink
                flink-streaming-java_2.12
                1.14.0
            
            
                org.apache.flink
                flink-connector-mysql-cdc
                2.0.0
            
            
            
                org.springframework.boot
                spring-boot-starter
            
        
        

        4. 配置Flink和MySQL CDC

        在Spring Boot的application.yml或application.properties文件中配置Flink和MySQL数据库连接:

        flink:
          checkpoint:
            interval: 10000
          parallelism: 1
        spring:
          datasource:
            url: jdbc:mysql://localhost:3306/your_database
            username: your_username
            password: your_password
        

        5. 实现数据实时追踪

        创建一个服务类来实现数据的实时追踪:

        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.springframework.stereotype.Service;
        @Service
        public class FlinkCdcService {
            public void startDataStreaming() {
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
                // 使用Flink CDC连接MySQL
                String name = "inventory";
                tableEnv.executeSql("CREATE TABLE " + name + " (" +
                    "  id INT," +
                    "  name STRING," +
                    "  description STRING," +
                    "  weight DECIMAL(10, 3)" +
                    ") WITH (" +
                    "  'connector' = 'mysql-cdc'," +
                    "  'hostname' = 'localhost'," +
                    "  'port' = '3306'," +
                    "  'username' = 'your_username'," +
                    "  'password' = 'your_password'," +
                    "  'database-name' = 'your_database'," +
                    "  'table-name' = 'your_table'" +
                    ")");
                // 查询并打印结果
                DataStream dataStream = tableEnv.sqlQuery("SELECT * FROM " + name).execute().print();
                try {
                    env.execute("Flink CDC Demo");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        

        6. 启动Spring Boot应用

        在你的Spring Boot应用的启动类中调用FlinkCdcService的startDataStreaming方法来启动数据追踪:

        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.boot.CommandLineRunner;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        @SpringBootApplication
        public class FlinkCdcApplication implements CommandLineRunner {
            @Autowired
            private FlinkCdcService flinkCdcService;
            public static void main(String[] args) {
                SpringApplication.run(FlinkCdcApplication.class, args);
            }
            @Override
            public void run(String... args) throws Exception {
                flinkCdcService.startDataStreaming();
            }
        }
        

        7. 运行并测试

        运行Spring Boot应用,并在MySQL数据库中做出一些数据变动。你应该能在控制台看到实时打印的数据变动。


        关注公众号[码到三十五]获取更多技术干货 !

        SpringBoot整合Flink CDC,实时追踪mysql数据变动

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码