本实验以DataLeap on Las为例,完成数据从datagen随机生成到mysql的数据同步。 由于现阶段DataLeap与Las服务以华北2(北京)-可用区A(cn-beijing-a)为主,以下相关的私有网络等产品都指此地域&可用区。
当前现有LAS Flink 支持的Connector见:https://www.volcengine.com/docs/6492/130252
- 预计部署时间:40分钟
- 级别:中级
- 相关产品:大数据开发套件、湖仓一体分析服务LAS
- 受众: 通用
环境说明
- 已购买开通私有网络服务
- 已购买开通DataLeap产品
- 已购买开通湖仓一体LAS服务
- 子账户具备DataLeap相关权限(参考:https://www.volcengine.com/docs/6260/65408)
- 已购买并开通云数据库MYSQL版本服务
步骤1:创建MYSQL实例
创建mysql实例可参考:https://www.volcengine.com/docs/6313/75366
步骤2:配置MYSQL访问白名单
本实验案例中,白名单主要设置私有网络的IPv4 CIDR
具体的白名单设置方法,可参考:https://www.volcengine.com/docs/6357/96144
步骤3:创建库表
在mysql中,通过登录控制台,创建本案例中Flink CDC Sink相关的库表
创建MYSQL库
CREATE DATABASE demo;
创建MYSQL表
CREATE TABLE `student_sink` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` text, `subject` text,
`score` int(20) DEFAULT NULL,
PRIMARY KEY (`id`) )
ENGINE=InnoDB AUTO_INCREMENT=48796 DEFAULT CHARSET=utf8
步骤4:创建Flink-SQL任务
如果没有DataLeap项目,需要先新建项目。新建项目后,进入项目中的“数据开发”,进入开发页面。 在本实验案例中,LAS Flink VPC与MYSQL VPC属不同VPC,因此需要利用Connector跨VPC访问方式,实现Flink跨VPC访问的数据访问。具体可参考:https://www.volcengine.com/docs/6492/146363
-- 开启跨VPC访问
set las.cross.vpc.access.enabled=true;
-- 指定私有 VPC ID
set las.cross.vpc.vpc.id=替换自己的VPC ID;
-- 指定子网 ID
set las.cross.vpc.subnet.id=替换自己的子网ID;
-- 指定安全组 ID
set las.cross.vpc.security.group.id=替换自己的安全组;
CREATE TEMPORARY TABLE student_source (
id INT,
name STRING,
subject STRING,
score INT,
primary key (id) NOT enforced
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '5',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '50'
);
CREATE TEMPORARY TABLE student_sink (
id INT,
name STRING,
subject STRING,
score INT,
primary key (id) NOT enforced
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://实例:端口/demo?serverTimezone=UTC&useSSL=false',
'username' = '账户',
'password' = '密码',
'table-name' = 'student_sink'
);
INSERT INTO student_sink
SELECT *
FROM student_source
在本实验中,用到了Flink提供的随机数据生成Connector:datagen,该Connector 选项参数详情如下: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/datagen/
选项 | 要求 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
connector | 必填 | String | 指定要使用的连接器,本例子中指定datagen | |
rows-per-second | 选填 | 10000 | Long | 每秒发送数据的速度 |
number-of-rows | 选填 | Long | 要发送的行总数,默认情况下,是无界的 | |
fields.#.kind | 选填 | random | String | sequence或random |
fields.#.min | 选填 | 随机生成器的最小值,适用于数字类型 | ||
fields.#.max | 选填 | 随机生成器的最大值,适用于数字类型 | ||
fields.#.max-past | 选填 | 0 | Duration | Maximum past of timestamp random generator, only works for timestamp types. |
fields.#.length | 选填 | 100 | Integer | 用于生成char/varchar/string/array/map/multiset类型的集合的大小或长度 |
fields.#.start | 选填 | 序列生成器的起始值 | ||
fields.#.end | 选填 | 序列生成器的最终值 |
步骤5:作业调试并提交
填写运行参数
调试并提交
提交记录在LAS的“作业管理”标签中可以查询
作业提交后,需检查作业的运行状态是否正常,提交日志、执行日志是否有异常日志打印。如发现异常无法处理,请联系客服获取帮助。
登录MYSQ客户端,检查数据写入情况
关于LAS的作业编写与调试,可参考:https://www.volcengine.com/docs/6260/80007
如果您有其他问题,欢迎您联系火山引擎技术支持服务