【Flink】Trying out different Flink versions with Docker Compose

Environment

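I wanted to try a newer version of Flink, but it conflicts with my existing environment: JDK 8 vs. 11, Scala 2.11 vs. 2.12, plus the risk of mixing up Kafka and ZooKeeper versions and their data. This is exactly where Docker helps. Environments, versions and data can all be managed and isolated cleanly, and with Compose / Swarm / Kubernetes on top, managing them becomes much more efficient.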

Official example - flink-playgrounds/table-walkthrough

The Flink project provides a Docker Compose setup for playing with Flink:
https://github.com/apache/flink-playgrounds

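Prerequisites:

  • Java 8 or 11 (for developing and building the Flink program locally)
  • Maven (for developing and building the Flink program locally)
  • Docker (to run the JobManager, TaskManager, Kafka, ZooKeeper, MySQL, Grafana and other containers)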

Preparing Docker and Compose

Check that Docker is running:
systemctl status docker.service

If Docker Compose is not installed yet, install it as a CLI plugin:

DOCKER_CONFIG=${DOCKER_CONFIG:-$HOME/.docker}
mkdir -p $DOCKER_CONFIG/cli-plugins
curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o $DOCKER_CONFIG/cli-plugins/docker-compose
chmod +x $DOCKER_CONFIG/cli-plugins/docker-compose
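# To install for all users instead, download the binary into
# /usr/local/lib/docker/cli-plugins/ (using sudo), then:
# sudo chmod +x /usr/local/lib/docker/cli-plugins/docker-compose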
docker compose version

Building the images

Next, clone the repository and change into the example directory:
git clone -b release-1.14 git@github.com:apache/flink-playgrounds.git
cd flink-playgrounds/table-walkthrough

The repository contains table-walkthrough/docker-compose.yml (partially shown below), which defines the services we need (JobManager, TaskManager, Kafka, ZooKeeper, MySQL, Grafana, etc.) together with their images, ports, environment variables, dependencies and so on. For details, see the Compose file version 2 reference.

version: '2.1'
services:
  jobmanager:
    image: apache/flink-table-walkthrough:1-FLINK-1.14-scala_2.12
    build: .
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8082:8081"
    command: standalone-job
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    depends_on:
      - kafka
      - mysql
  ...
  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: "kafka"
      KAFKA_ADVERTISED_PORT: "9092"
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "kafka:1:1"
  ....
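The KAFKA_CREATE_TOPICS value above follows the wurstmeister/kafka image's topic:partitions:replicas convention (comma-separated for several topics, with an optional fourth cleanup-policy field), so "kafka:1:1" asks for a topic named kafka with 1 partition and replication factor 1. As an illustration only, the hypothetical TopicSpecs class below parses such a value; it is a sketch of the format, not the image's own code, which handles this in its startup scripts.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicSpecs {
    // One entry of KAFKA_CREATE_TOPICS: name:partitions:replicas[:cleanupPolicy]
    static final class TopicSpec {
        final String name;
        final int partitions;
        final int replicas;
        final String cleanupPolicy; // null when the optional fourth field is absent

        TopicSpec(String name, int partitions, int replicas, String cleanupPolicy) {
            this.name = name;
            this.partitions = partitions;
            this.replicas = replicas;
            this.cleanupPolicy = cleanupPolicy;
        }

        @Override
        public String toString() {
            return name + " (partitions=" + partitions + ", replicas=" + replicas
                    + (cleanupPolicy == null ? "" : ", cleanup.policy=" + cleanupPolicy) + ")";
        }
    }

    // Parses a comma-separated value such as "kafka:1:1" or "a:3:1,b:1:1:compact"
    static List<TopicSpec> parse(String value) {
        List<TopicSpec> specs = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            String[] parts = trimmed.split(":");
            if (parts.length < 3 || parts.length > 4) {
                throw new IllegalArgumentException("expected name:partitions:replicas[:policy], got " + trimmed);
            }
            String policy = parts.length == 4 ? parts[3] : null;
            specs.add(new TopicSpec(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), policy));
        }
        return specs;
    }

    public static void main(String[] args) {
        // Value copied from the docker-compose.yml excerpt above
        for (TopicSpec spec : parse("kafka:1:1")) {
            System.out.println(spec);
        }
    }
}
```

Running it prints kafka (partitions=1, replicas=1). A single partition with one replica is enough for a one-broker playground; a multi-broker setup would raise the replica count.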

Note the build: . entry of jobmanager above: Compose builds its image from table-walkthrough/Dockerfile, which compiles the Maven project in the table-walkthrough directory and packages the result together with the Flink libraries into a Docker image.

The Dockerfile:

FROM maven:3.8-jdk-8-slim AS builder

COPY ./pom.xml /opt/pom.xml
COPY ./src /opt/src
RUN cd /opt; mvn clean install -Dmaven.test.skip

FROM apache/flink:1.14.4-scala_2.12-java8

# Download connector libraries
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.4/flink-sql-connector-kafka_2.12-1.14.4.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.4/flink-connector-jdbc_2.12-1.14.4.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.14.4/flink-csv-1.14.4.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;

COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar

RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;
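The four wget lines above follow Maven Central's standard repository layout: the groupId with dots turned into slashes, then the artifactId, the version, and finally artifactId-version.jar. When trying another Flink release, generating these URLs from coordinates is less error-prone than editing each one by hand. A minimal sketch with a hypothetical MavenCentralUrl helper (not part of the playground); note that artifact names, such as the _2.12 Scala suffix, are not guaranteed to stay the same across Flink releases, so check the coordinates on Maven Central for the version you pick.

```java
import java.util.Arrays;
import java.util.List;

public class MavenCentralUrl {
    private static final String CENTRAL = "https://repo.maven.apache.org/maven2";

    // Builds a jar download URL from Maven coordinates using the standard layout:
    // group/path/artifact/version/artifact-version.jar
    static String jarUrl(String groupId, String artifactId, String version) {
        return CENTRAL + "/" + groupId.replace('.', '/') + "/" + artifactId + "/" + version
                + "/" + artifactId + "-" + version + ".jar";
    }

    public static void main(String[] args) {
        String flinkVersion = "1.14.4"; // change this to try another release
        String scalaSuffix = "_2.12";   // assumption: still valid for the chosen release
        List<String> urls = Arrays.asList(
                jarUrl("org.apache.flink", "flink-sql-connector-kafka" + scalaSuffix, flinkVersion),
                jarUrl("org.apache.flink", "flink-connector-jdbc" + scalaSuffix, flinkVersion),
                jarUrl("org.apache.flink", "flink-csv", flinkVersion),
                jarUrl("mysql", "mysql-connector-java", "8.0.19"));
        // Print ready-to-paste wget commands for the Dockerfile
        for (String url : urls) {
            System.out.println("wget -P /opt/flink/lib/ " + url);
        }
    }
}
```

With the values shown, the printed URLs are exactly the ones in the Dockerfile above.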

Before building, however, we need to implement the report() method in table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java, which initially looks like this:

public static Table report(Table transactions) {
    throw new UnimplementedException();
}

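Put the business logic there. Here we do a simple aggregation: floor transaction_time to the hour as log_ts, group by account_id and log_ts, and sum the amounts: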

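// Imports this method relies on (add them if SpendReport.java does not have them yet):
// import static org.apache.flink.table.api.Expressions.$;
// import org.apache.flink.table.expressions.TimeIntervalUnit;
public static Table report(Table transactions) {
    return transactions.select(
            $("account_id"),
            $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
            $("amount"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts"),
            $("amount").sum().as("amount"));
}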
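To see what report() computes without starting the cluster, here is a plain-Java sketch of the same logic over an in-memory list: floor each transaction_time to the hour, group by (account_id, log_ts), and sum amount. The Transaction class and the sample rows are made up for illustration; the sketch mirrors the query's semantics only, not how Flink executes it as a continuously updating aggregation over the Kafka stream.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SpendReportSketch {
    // Hypothetical in-memory stand-in for one row of the transactions table
    static final class Transaction {
        final long accountId;
        final LocalDateTime transactionTime;
        final long amount;

        Transaction(long accountId, LocalDateTime transactionTime, long amount) {
            this.accountId = accountId;
            this.transactionTime = transactionTime;
            this.amount = amount;
        }
    }

    // Mirrors report(): log_ts = floor(transaction_time TO HOUR),
    // then GROUP BY (account_id, log_ts) with SUM(amount).
    // The key is "accountId|logTs" to keep the sketch small.
    static Map<String, Long> report(List<Transaction> transactions) {
        Map<String, Long> totals = new TreeMap<>();
        for (Transaction t : transactions) {
            LocalDateTime logTs = t.transactionTime.truncatedTo(ChronoUnit.HOURS);
            totals.merge(t.accountId + "|" + logTs, t.amount, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up sample rows, not the playground's generated data
        List<Transaction> sample = Arrays.asList(
                new Transaction(1, LocalDateTime.of(2001, 4, 13, 1, 5), 300),
                new Transaction(1, LocalDateTime.of(2001, 4, 13, 1, 40), 200),
                new Transaction(1, LocalDateTime.of(2001, 4, 13, 2, 10), 150),
                new Transaction(2, LocalDateTime.of(2001, 4, 13, 1, 59), 75));
        report(sample).forEach((key, sum) -> System.out.println(key + " -> " + sum));
    }
}
```

This prints 1|2001-04-13T01:00 -> 500, 1|2001-04-13T02:00 -> 150 and 2|2001-04-13T01:00 -> 75: the two 01:xx transactions of account 1 fall into the same hourly bucket, which is the shape of the rows that later appear in the spend_report table.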

Now run the build:

➜  docker compose build

..
=> [internal] load build context 0.0s
=> => transferring context: 1.18kB 0.0s
=> [stage-1 1/4] FROM docker.io/apache/flink:1.14.4-scala_2.12-java8@sha256:07cd20e018e2180c90ec5cf8342f208d408fe28d871a0727c7e52099bbf80f 0.0s
=> [builder 2/4] COPY ./pom.xml /opt/pom.xml 0.0s
=> [builder 3/4] COPY ./src /opt/src 0.1s
=> [builder 4/4] RUN cd /opt; mvn clean install -Dmaven.test.skip 225.9s
=> CACHED [stage-1 2/4] RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1 0.0s
=> [stage-1 3/4] COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar 0.0s
=> [stage-1 4/4] RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; echo "pipeline.object-reuse: tru 0.5s
=> exporting to image 0.4s
=> => exporting layers 0.4s
=> => writing image sha256:5fa7a43cfb5aefabb6d86928bf265fd3f21382457658cc182c84a307cc0671cd 0.0s
=> => naming to docker.io/apache/flink-table-walkthrough:1-FLINK-1.14-scala_2.12 0.0s

When it finishes, the new images show up:

➜  docker compose images
CONTAINER                            REPOSITORY                       TAG                       IMAGE ID       SIZE
table-walkthrough-data-generator-1   apache/data-generator            1                         4f1a70cd1617   283MB
table-walkthrough-grafana-1          grafana/grafana                  7.5.8                     6f31769d2f32   202MB
table-walkthrough-jobmanager-1       apache/flink-table-walkthrough   1-FLINK-1.14-scala_2.12   e7e4a6cfc1bd   664MB
table-walkthrough-kafka-1            wurstmeister/kafka               2.13-2.8.1                a692873757c0   468MB
table-walkthrough-mysql-1            mysql                            8.0.19                    0c27e8e5fcfa   546MB
table-walkthrough-taskmanager-1      apache/flink-table-walkthrough   1-FLINK-1.14-scala_2.12   e7e4a6cfc1bd   664MB
table-walkthrough-zookeeper-1        wurstmeister/zookeeper           3.4.6                     6fe5551964f5   451MB

Starting the services

Then start all services in the background:
docker compose up -d

Check the running services:

➜  docker compose ps
NAME                                 IMAGE                                                    COMMAND                  SERVICE          CREATED          STATUS           PORTS
table-walkthrough-data-generator-1   apache/data-generator:1                                  "/docker-entrypoint.…"   data-generator   33 minutes ago   Up 33 minutes
table-walkthrough-grafana-1          grafana/grafana:7.5.8                                    "/run.sh"                grafana          33 minutes ago   Up 33 minutes   0.0.0.0:3000->3000/tcp, :::3000->3000/tcp
table-walkthrough-jobmanager-1       apache/flink-table-walkthrough:1-FLINK-1.14-scala_2.12   "/docker-entrypoint.…"   jobmanager       8 minutes ago    Up 8 minutes    6123/tcp, 0.0.0.0:8082->8081/tcp, :::8082->8081/tcp
table-walkthrough-kafka-1            wurstmeister/kafka:2.13-2.8.1                            "start-kafka.sh"         kafka            33 minutes ago   Up 33 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp
table-walkthrough-mysql-1            mysql:8.0.19                                             "docker-entrypoint.s…"   mysql            33 minutes ago   Up 33 minutes   3306/tcp, 33060/tcp
table-walkthrough-taskmanager-1      apache/flink-table-walkthrough:1-FLINK-1.14-scala_2.12   "/docker-entrypoint.…"   taskmanager      8 minutes ago    Up 8 minutes    6121-6123/tcp, 8081/tcp
table-walkthrough-zookeeper-1        wurstmeister/zookeeper:3.4.6                             "/bin/sh -c '/usr/sb…"   zookeeper        33 minutes ago   Up 33 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp

Open the Flink WebUI to see the running jobs: http://localhost:8082/ (host port 8082 is mapped to the JobManager's port 8081 in docker-compose.yml)
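The WebUI is served by the JobManager's REST endpoint on the same port, so the job list can also be fetched programmatically. A minimal sketch using the JDK's java.net.http.HttpClient (Java 11 or later, so it will not compile on Java 8) against Flink's /jobs/overview endpoint; the FlinkJobsOverview class name is hypothetical, and it assumes the compose setup above is running on localhost with the 8082 port mapping.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class FlinkJobsOverview {
    // Host port 8082 maps to the JobManager's REST/WebUI port 8081 (see docker-compose.yml)
    static URI jobsOverviewUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/jobs/overview");
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(jobsOverviewUri("localhost", 8082))
                .GET()
                .build();
        try {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            // The body is a JSON document listing the jobs and their states
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Typically means the containers are not up yet or the port mapping differs
            System.err.println("JobManager not reachable: " + e.getMessage());
        }
    }
}
```

The same request is handy in scripts that wait for the job to reach RUNNING before opening Grafana.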

Open Grafana to view the live statistics: http://localhost:3000/

Query the data written to MySQL:

mysql> select * from spend_report limit 10;
+------------+-------------------------+--------+
| account_id | log_ts                  | amount |
+------------+-------------------------+--------+
|          1 | 2001-04-13 01:00:00.000 |   1056 |
|          1 | 2001-04-13 02:00:00.000 |   1606 |
|          1 | 2001-04-13 03:00:00.000 |   1475 |
|          1 | 2001-04-13 04:00:00.000 |   1338 |
|          1 | 2001-04-13 05:00:00.000 |    741 |
|          1 | 2001-04-13 06:00:00.000 |   1407 |
|          1 | 2001-04-13 07:00:00.000 |    613 |
|          1 | 2001-04-13 08:00:00.000 |   1286 |
|          1 | 2001-04-13 09:00:00.000 |    627 |
|          1 | 2001-04-13 10:00:00.000 |    732 |
+------------+-------------------------+--------+

To stop everything: docker compose stop

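The example Dockerfile builds rather slowly (the Maven step alone took about 226 s in the build log above), and every code change repeats that work. The build steps can be restructured to avoid this, for example by splitting out the mvn step: build the jar locally with Maven and only COPY it into the image.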

References


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/table_api/