Flink_CDC构建流式ETL

作者: 罗宇侠(宇侠)

本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。

Flink-CDC 项目地址:

https://github.com/ververica/flink-cdc-connectors

 

本教程的演示基于 Docker 环境,都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。

假设我们正在经营电子商务业务,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。

对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。

接下来的内容将介绍如何使用 Flink Mysql/Postgres CDC 来实现这个需求,系统的整体架构如下图所示:

null

一、准备阶段

准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

该 Docker Compose 中包含的容器有:

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

注:本教程接下来用到的容器相关的命令也都需要在 docker-compose.yml 所在目录下执行。

  1. 下载 Flink 1.13.2 [1] 并将其解压至目录 flink-1.13.2

  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.13.2/lib/

[1] https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz

该链接无法下载,可以在这里下载: https://archive.apache.org/dist/flink/flink-1.13.2/

1.3 准备数据

1.3.1 在 MySQL 数据库中准备数据

  1. 进入 MySQL 容器:

  2. 创建数据库和表 productsorders,并插入数据:

1.3.2 在 Postgres 数据库中准备数据

  1. 进入 Postgres 容器:

  2. 创建表 shipments,并插入数据:

  1. 使用下面的命令跳转至 Flink 目录下:

  2. 使用下面的命令启动 Flink 集群:

    启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

    null

  3. 使用下面的命令启动 Flink SQL CLI

    启动成功后,可以看到如下的页面:

null

首先,开启 checkpoint,每隔 3 秒做一次 checkpoint。

然后, 对于数据库中的表 products, orders, shipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中。

四、关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中。

启动成功后,可以访问 http://localhost:8081/#/job/running 在 Flink Web UI 上看到正在运行的 Flink Streaming Job,如下图所示:

null

现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders

null

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了。

null

接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana 中显示的订单数据也将实时更新。

  1. 在 MySQL 的 orders 表中插入一条数据:

  2. 在 Postgres 的 shipment 表中插入一条数据:

  3. 在 MySQL 的 orders 表中更新订单的状态:

  4. 在 Postgres 的 shipment 表中更新物流的状态:

  5. 在 MYSQL 的 orders 表中删除一条数据:

    每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:

 

五、环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

在 Flink 所在目录 flink-1.13.2 下执行如下命令停止 Flink 集群:

六、总结

在本文中,我们以一个简单的业务场景展示了如何使用 Flink CDC 快速构建 Streaming ETL。希望通过本文,能够帮助读者快速上手 Flink CDC ,也希望 Flink CDC 能满足你的业务需求。