Flink CDC实时数据同步

大数据数据中台技术服务知识库
前言

本实验以DataLeap on Las为例,完成数据从datagen随机生成到mysql的数据同步。 由于现阶段DataLeap与Las服务以华北2(北京)-可用区A(cn-beijing-a)为主,以下相关的私有网络等产品都指此地域&可用区。

当前现有LAS Flink 支持的Connector见:https://www.volcengine.com/docs/6492/130252

关于实验
  • 预计部署时间:40分钟
  • 级别:中级
  • 相关产品:大数据开发套件、湖仓一体分析服务LAS
  • 受众: 通用

环境说明

  1. 已购买开通私有网络服务
  2. 已购买开通DataLeap产品
  3. 已购买开通湖仓一体LAS服务
  4. 子账户具备DataLeap相关权限(参考:https://www.volcengine.com/docs/6260/65408)
  5. 已购买并开通云数据库MYSQL版本服务
实验说明

步骤1:创建MYSQL实例

图片 创建mysql实例可参考:https://www.volcengine.com/docs/6313/75366

步骤2:配置MYSQL访问白名单

本实验案例中,白名单主要设置私有网络的IPv4 CIDR 图片 具体的白名单设置方法,可参考:https://www.volcengine.com/docs/6357/96144

步骤3:创建库表

在mysql中,通过登录控制台,创建本案例中Flink CDC Sink相关的库表 图片

创建MYSQL库

CREATE DATABASE demo;

创建MYSQL表

CREATE TABLE `student_sink` (
 `id` int(11) NOT NULL AUTO_INCREMENT, 
 `name` text, `subject` text, 
 `score` int(20) DEFAULT NULL, 
 PRIMARY KEY (`id`) ) 
 ENGINE=InnoDB AUTO_INCREMENT=48796 DEFAULT CHARSET=utf8

步骤4:创建Flink-SQL任务

如果没有DataLeap项目,需要先新建项目。新建项目后,进入项目中的“数据开发”,进入开发页面。 在本实验案例中,LAS Flink VPC与MYSQL VPC属不同VPC,因此需要利用Connector跨VPC访问方式,实现Flink跨VPC访问的数据访问。具体可参考:https://www.volcengine.com/docs/6492/146363

-- 开启跨VPC访问
set las.cross.vpc.access.enabled=true;

-- 指定私有 VPC ID
set las.cross.vpc.vpc.id=替换自己的VPC ID;

-- 指定子网 ID
set las.cross.vpc.subnet.id=替换自己的子网ID;

-- 指定安全组 ID
set las.cross.vpc.security.group.id=替换自己的安全组;

CREATE  TEMPORARY TABLE student_source (
            id      INT,
            name    STRING,
            subject STRING,
            score   INT,
            primary key (id) NOT enforced
        )
        WITH (
            'connector' = 'datagen',
            'rows-per-second' = '5',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '50'
        );

CREATE  TEMPORARY TABLE student_sink (
            id      INT,
            name    STRING,
            subject STRING,
            score   INT,
            primary key (id) NOT enforced
        )
        WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://实例:端口/demo?serverTimezone=UTC&useSSL=false',
            'username' = '账户',
            'password' = '密码',
            'table-name' = 'student_sink'
        );

INSERT INTO student_sink
SELECT  *
FROM    student_source

在本实验中,用到了Flink提供的随机数据生成Connector:datagen,该Connector 选项参数详情如下: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/datagen/

选项要求默认值类型描述
connector必填String指定要使用的连接器,本例子中指定datagen
rows-per-second选填10000Long每秒发送数据的速度
number-of-rows选填Long要发送的行总数,默认情况下,是无界的
fields.#.kind选填randomStringsequence或random
fields.#.min选填随机生成器的最小值,适用于数字类型
fields.#.max选填随机生成器的最大值,适用于数字类型
fields.#.max-past选填0DurationMaximum past of timestamp random generator, only works for timestamp types.
fields.#.length选填100Integer用于生成char/varchar/string/array/map/multiset类型的集合的大小或长度
fields.#.start选填序列生成器的起始值
fields.#.end选填序列生成器的最终值

步骤5:作业调试并提交

填写运行参数 图片

调试并提交 图片

提交记录在LAS的“作业管理”标签中可以查询 作业提交后,需检查作业的运行状态是否正常,提交日志、执行日志是否有异常日志打印。如发现异常无法处理,请联系客服获取帮助。 图片

登录MYSQ客户端,检查数据写入情况 图片 关于LAS的作业编写与调试,可参考:https://www.volcengine.com/docs/6260/80007

如果您有其他问题,欢迎您联系火山引擎技术支持服务

21
0
0
0
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论