Flink与PyFlink流处理(一)

2023-09-04 16:37:55

1概述

Apache Flink是一个分布式计算引擎,用于在无边界和有边界数据流上进行有状态的计算。它能够在所有常见的集群环境中运行,且能够以内存速度和任意规模进行计算。

1.1无界流与有界流
  • 无界流:有定义流的开始,但没有定义流的结束,数据会无休止地产生。比如采集MySQL Binlog数据和读取MQ中的数据。持续/实时处理
  • 有界流:既有定义流的开始,也有定义流的结束。比如读取RDB中表数据和读取HDFS上的文件。批/离线处理
1.2有状态计算
  • 状态state(一个变量)是流处理特有的。由于流处理一般要进行增量计算——数据逐条处理,每次计算都是建立在上一次计算结果之上的,而Flink本身就能够提供状态管理以支撑增量计算。

2架构

Flink Runtime采用了标准的主从架构

2.1JobManager
  • Dispatcher:向客户端提供了REST接口,用以提交作业。并将提交的作业传递给一个新的JobMaster。
  • JobMaster:管理一个作业的执行。在Flink中可以存在多个。
  • ResourceManager:负责Flink中资源的分配与回收(比如:task slot),在Flink中只能存在一个。
2.2TaskManager

一个Flink Job中包含多个task。任务管理器负责执行作业流中的task任务,且在Flink中必须至少有一个TaskManager(JVM进程)。

  • task slot:资源调度的基本单位。一个槽位可以执行一个task。
  • state backend:状态管理。
1
2
1. MemoryStateBackend(默认配置)——内存操作:状态`state`在TaskManager内存中维护,checkpoint快照存储在JobManager内存中。强调高性能,适用于本地开发和调试。
2. RocksDBStateBackend——外存操作:状态`state`在RocksDB数据库中维护,checkpoint快照存储到指定的文件系统目录中。强调高可靠,适用于状态数据较大的场景。

架构

3分层API

要使用Flink进行流处理,前提是能够开发出可以在Flink上运行的应用程序,这就需要API接口作为支持。

Flink根据抽象程度分层,提供了三种不同的API。每一层级API在简洁性和表达力上有着不同的侧重。

3.1ProcessFunction

用户可以在此抽象层中对状态和时间进行细粒度控制(比如:任意地修改状态数据、注册定时器来触发回调操作等),允许程序实现复杂的计算。

大多数应用程序不需要使用上述最底层的抽象API。

3.2DataStream API/DataSet API
  • DataStream API :应用于实时的流处理。
  • DataSet API :应用于离线的批处理。

所写程序未经过优化器优化。

3.3SQL/Table API

Flink支持两种关系型的API:Table API和SQL。它们集成在一个抽象层,构成批流统一的API。

  • 为了在流上使用SQL查询,数据流会记录到Table表(输入流<—>Source Table,Sink Table<—>输出流)
  • 在Table表上进行查询,将产生一个新的动态表(编程角度:SQL支持链式调用)

SQL/Table API统一了DataStream API和DataSet API,一般情况下,建议使用该层API开发应用程序。

分层API

4连接器(SQL/Table API Connector)

上述接口仅仅提供了应用程序与Flink的交互,而Flink作为一个流计算引擎,必然需要与第三方/外部系统交互数据,交互的工具就是Connector连接器。Flink目前支持的连接器有:Kafka、JDBC、Elasticsearch等。

Flink默认是没有打包Connector的,所以在程序开发之前,我们需要下载各个Connector所依赖的Jar包。

4.1 Kafka Connector使用示例

Flink提供了一套与表连接器(table connector)一起使用的表格式,定义了数据和表结构的映射关系,支持JSON、CSV等类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 以下示例展示了如何创建连接Kafka数据的表,同时指定了数据映射格式
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTRAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json' # 'format' = 'csv'
......
)
4.2小结

下图描述了一个流处理场景:源源不断的数据流向Kafka。应用程序会从Kafka获取数据源,经过Flink转换处理,最终持久化到MySQL中。

分层API

PyFlink即Flink on Python,旨在将Flink生态系统的全部功能输出给Python用户

5.1环境安装
  • Python版本要求:3.6+
  • 安装命令:pip install apache-flink==1.10.0
5.2TableEnvironment
1
# TableEnvironment是SQL/Table API编程的核心,我们总是使用它创建source/sink table表、处理SQL查询、执行Job作业等
  • 流处理的编程主体结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 1. 导入相关包
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# 2. 创建TableEnvironment实例对象
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# 3. 注册/创建source/sink表
table_env.sql_update("""CREATE TABLE source_table (...) WITH (...)""")
table_env.sql_update("""CREATE TABLE sink_table (...) WITH (...)""")

# 4. 执行SQL查询
table_env.from_path("source_table")\
.select("...")\
.insert_into("sink_table")

# 5. 执行Job作业
table_env.execute("job_name")
  • PyFlink1.10版使用文档

https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/python/index.html

6某CDN日志实时分析案例

6.1架构

CDN日志的解析一般有一个通用的架构模式,就是首先要将各个边缘节点的日志数据进行采集,一般会采集到消息队列,然后将消息队列和实时计算集群进行集成进行实时的日志分析,最后将分析的结果写到存储系统里面。将架构实例化,消息队列采用Kafka,实时计算采用Flink,最终将数据存储到MySQL中。如下图所示:

架构2

1
2
3
4
# CDN 假数据 "uuid,client_ip,request_time,response_size,uri"
8a613f0a-b8a3-40d0-8288-688b724d3546,14.104.149.183,460,637121,https://www.aliyun.com/solution/all?spm=5176.141278.h2v3icoap.4.427b70feEbSUkR&aly_as=Pmz98PMp
86a8193a-2b75-4a1a-8f16-69c211c58119,1.69.127.101,604,670506,https://www.aliyun.com/ss/?spm=5176.7920199.1kquk9v2l.3.610951f4HI0mNh
6745b486-8df1-4971-aac6-17302433900f,36.24.191.86,477,10857,https://www.aliyun.com/activity?spm=5176.12825654.h2v3icoap.1.54212c4a3UoADs#/promotionArea

本案例将从CDN访问日志中,获取如下统计指标:

  • 按IP统计资源访问量
  • 按IP统计资源下载总数
  • 按IP统计资源平均下载速度
6.2Kafka数据读取DDL定义
字段名 类型 描述
uuid VARCHAR 日志的标识符
client_ip VARCHAR 请求CDN资源的客户端IP
request_time BIGINT 处理请求花费的时间
response_size BIGINT 返回的资源体量
uri VARCHAR 请求的网址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
uuid VARCHAR,
client_ip VARCHAR,
request_time BIGINT,
response_size BIGINT,
uri VARCHAR
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal', # 不必使用具体的版本号
'connector.topic' = 'access_log',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'format.ignore-parse-errors' = 'true' # 解析异常时,跳过当前字段数据(字段将置为null)
)
"""
6.3MySQL数据写入DDL定义
字段名 类型 描述
client_ip VARCHAR(255) PRIMARY KEY 请求CDN资源的客户端IP
access_count BIGINT 当前的访问量
total_download BIGINT 下载总量
download_speed DOUBLE 下载速度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
client_ip VARCHAR,
access_count BIGINT,
total_download BIGINT,
download_speed DOUBLE
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://10.8.50.191:3306/Flink',
'connector.table' = 'access_statistic',
'connector.username' = 'zer0py2c',
'connector.password' = '123456'
)
"""
6.4核心统计逻辑

在核心统计逻辑定义好之后开始执行作业:

1
2
3
4
5
6
7
8
9
10
11
12
13
# t_env为TableEnvironment实例对象
t_env.from_path("cdn_access_log")\
.select("uuid, client_ip, response_size, request_time")\
.group_by("client_ip")\
.select( # 计算访问量
"client_ip, count(uuid) as access_count, "
# 计算下载总量
"sum(response_size) as total_download, "
# 计算下载速度
"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
.insert_into("cdn_access_statistic")
# 执行作业
t_env.execute("pyflink_parse_cdn_log")

统计情况示例:

mysql

6.6环境配置
  • MySQL库表创建
1
2
3
4
5
6
7
8
CREATE DATABASE flink DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

CREATE TABLE cdn_access_statistic(
client_ip VARCHAR(255) PRIMARY KEY,
access_count BIGINT,
total_download BIGINT,
download_speed DOUBLE
) ENGINE=InnoDB CHARSET=utf8mb4;
  • Kafka主题创建
1
$ ./bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic cdn_access_log --zookeeper localhost:2181
  • Connector的Jar包下载
1
2
3
4
5
$ cd $PYFLINK_LIB
$ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
$ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar
$ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar
$ curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar
  • 命令行方式提交作业
1
$ $PYFLINK_LIB/../bin/flink run -m localhost:4000 -pyfs cdn_connector_ddl.py -py cdn_demo.py