Flink SQL 在奇虎 360 的实践

1.Flink SQL在奇虎 360 的落地

与阿里、字节不同,我们刚开始将 Flink 投入生成时,主要推的还是 Java/Scala 作业,而非 SQL

当时的考虑是 Flink SQL 的发展还比较年轻,相比 Spark SQL、Presto 等引擎,其功能还比较弱。

Flink 1.7 版本

在 1.7 的时代,我们的 SQL 作业主要以 ETL 为主。这类作业没有复杂的计算逻辑,例如:将数据从 Kafka 读取、清洗后写入 HDFS 中。

用户在前端页面编写 SQL 语句,并使用 GRPC 协议将作业提交信息发送到我们内部实现的 Gateway 中。 Gateway 使用 Table API 去构建 Flink 作业,并提交到 Yarn 集群。

2020 年 Flink 1.11 发版后,我们的平台也从 Flink 1.7 升级到 Flink 1.11

直至 2021 年 4 月,我们停止了在 Flink 1.7 版本上的新功能开发。自此,平台不再支持用户提交新的 Flink 1.7 作业。

虽然 Flink 1.7 不再更新,但是线上仍有不少 Flink 1.7 的作业,这些作业的迁移存在一定的困难。因为可能由于 API 的变更,导致用户需要修改代码,这对用户是很不友好的。

这个痛点也是我们鼓励用户使用 SQL 的原因,因为 SQL 作业不需要编译、打包,用起来太方便了。

Flink 1.11 升级

而在 Flink 1.11.1 版本发布之后,我们转而鼓励用户使用 SQL。之所以这样做,主要有以下几个原因(包括上面说的升级):

  • SQL 属于高层 API ,SQL 使得作业的开发过程变得简单,降低学习成本;
  • Java/Scala 作业的迁移不方便,特别是版本跨度比较大的时候;
  • 使用 SQL 开发作业,我们内部对 Flink 的功能改动,用户能第一时间使用上。

我们在 2020 年 10 月正式上线了 Flink 1.11.1 ,当时也弃用了原来自定义的 Gateway 而改用 SQL Client

就目前而言,业内 Flink SQL Gateway 的版本各异、质量参差不齐,这也是我们选择 SQL Client 而非 Gateway 的原因。

如果社区在不久的将来推出 Gateway ,我们能直接使用官方版本,而无沉重的历史包袱。

2.Flink SQL 在奇虎 360 的业务场景

目前我们平台的 Flink SQL 业务大部分来自游戏和信息流

像花椒直播、金融等部门还是主要使用 Java/Scala 进行 Flink 作业的开发。

WeGame ETL 作业

读取 Kakfa 数据抽取、转换、清洗再写到 Hive 表,这大概是业内 Flink SQL 用的最多的场景了。

在 1.11.1 版本的使用上,最让人头疼的大概是就是小文件过多的问题了。但社区的也非常的给力,在 Flink 1.12 版本,社区对小文件合并的方案进行了实现。

实时指标与大盘

我们还可以使用 Flink SQL 统计服务的实时运行指标,并将统计信息输送给大盘展示。

例如,我们系统部的使用 SQL 对每分钟云盘的请求进行分组统计,最后产出实时的请求曲线。当然,这里只是举了一个简单的例子,线上的实时报表业务可能非常复杂(因为维度会更多)。实时的报表数据能够及时把握市场动向,及时调整运营策略。

3.Flink SQL 改进

SQL 作业执行多 INSERT

基于 Flink 1.11 的 Blink Planner,开发支持 SQL 作业多 INSERT

Blink Planner 相比 Old Planner 有分段优化和子计划复用的特性,使得上游数据不用重复计算,极大地提高了计算资源的利用。

Segment Optimize 分段优化

假如我们的 SQL 语句如下:

sql CREATE VIEW my_view AS SELECT word, COUNT(1) AS freq FROM T1 GROUP BY word; INSERT INTO T2 SELECT * FROM MyView WHERE freq >10; INSERT INTO T3 SELECT count(word) AS freq2, freq FROM my_view GROUP BY freq;

如果使用 Old Planner 执行计划会被翻译成左边的,相比之下, Blink Planner 能够进行分段优化。

Sub-Plan Reuse 子计划复用

假如我们的 SQL 语句如下:

sql INSERT INTO T2 SELECT freq FROM (SELECT word, COUNT(1) AS freq FROM T1 GROUP BY word) t WHERE word LIKE 'T%' UNION ALL SELECT COUNT(word) AS freq2 FROM (SELECT word, COUNT(1) AS freq FROM T1 GROUP BY word) t GROUP BY freq;

其生成的执行计划如下,经过 Blink Planner 优化后,可以看到上游的子计划被复用。也就是说,这部分数据不用重复计算,可以节省资源。

分段优化的特性在新版 Table API 中是可用的,但是在 SQL Client 中是不可用的。

因为 SQL Client 遇到 DML 语句,会立即提交作业(Flink Version < 1.12)。

我们结合奇麟大数据平台,支持了将若干个 DML 语句合并提交。

sql INSERT INTO T2 SELECT mu_func(a) FROM T1 WHERE mu_func(a) > 1000; INSERT INTO T3 SELECT mu_func(a) FROM T1 WHERE mu_func(a) <= 1000; EXECUTE;

可以看到,我们在末尾使用 EXECUTE 关键字去触发真正的提交。

后来,这个功能在社区高版本中也有相应的实现,其用法如下:

sql BEGIN STATEMENT SET; INSERT INTO T2 SELECT mu_func(a) FROM T1 WHERE mu_func(a) > 1000; INSERT INTO T3 SELECT mu_func(a) FROM T1 WHERE mu_func(a) <= 1000; END;

Text 格式数据解析

对于纯文本格式的数据,Flink SQL 在 1.11.1 时还没有很好的方案能读取数据。

因为现有的 Format 支持的都是 JSON、AVRO 等有格式化的数据,像 Nginx 日志这些的处理就受限了。

对此,我们在 Flink SQL 1.11.1 支持了 Text Format 。

sql CREATE TABLE nginx_log ( line VARCHAR ) WITH ( ... 'format' = 'text', 'text.charset' = 'UTF-8' )

后来,社区的高版本也对纯文本格式进行了支持:

sql CREATE TABLE nginx_log ( line VARCHAR ) WITH ( ... 'format' = 'raw', 'raw.charset' = 'UTF-8', 'raw.endianness' = 'big-endian' )

对于这类纯文本这时候,我们可以自定义 UDF 去处理每一行数据,如:

“`java import org.apache.flink.table.functions.ScalarFunction;

public class StringUDF extends ScalarFunction { public String eval(String s) { return s.toLowerCase(); } } “`

讲到 SQL UDF ,我们还对 SQL UDF 结果复用进行了优化。

SQL UDF 结果复用

我们指定, SQL 作业会通过 Code Gen + Hard Code Operator 生成一系列的 Transformation,然后翻译成 Flink 的算子执行。

这时你会发现,对每一条数据,用户自定义函数每执行了两次。实际上,我们这里可以对用户自定义函数的计算结果进行复用,使得这个函数只被调用一次。

如果用户自定义函数会执行复杂的逻辑,这个作业的性能将会有极大提升。

所以我们在 Code Gen 阶段对 UDF 的调用进行了优化,使得相同的计算过程只执行一次。

4.Flink SQL 的多语言 UDF

多语言 UDF 的应用场景

Flink SQL 是上层的 API ,极大地降低了 Flink 的开发门槛。

但,目前 Flink SQL 的 UDF 语言只支持 Java/Scala 和 Python,这就会对会增加编程语言的学习成本。

“`java import org.apache.flink.table.functions.ScalarFunction;

public class StringUDF extends ScalarFunction { public String eval(String s) { return s.toLowerCase(); } } “`

例如,算法开发的同事可能对 Python 熟悉,运维的同事可能更了解 Shell,又或者更熟悉 Go 、C++ 的等等。

也正因如此,我们对 Flink SQL 中的 UDF 进行扩展,使其支持其他语言的能力。

下面是 Flink 对 Python UDF 支持的 FLIP

FLIP-106: Support Python UDF in SQL Function DDL

FLIP-114: Support Python UDF in SQL Client

多语言 UDF 的设计

类似 Py4J ,我们使用进程通信来完成用户 UDF 与 Flink 的数据交互。

在使用上,我们新增 COMMAND 关键字,支持指定 UDF 执行命令。

sql -- Table T1 CREATE TABLE T1(...); -- Create UDF CREATE FUNCTION my_func AS 'net.qihoo.flink.StreamingFunction' COMMAND 'python3 ./demo.py'; -- Create UDTF that convert column to row CREATE FUNCTION my_func AS 'net.qihoo.flink.StreamingExplodeFunction' COMMAND 'python3 ./demo.py'; -- Create UDTF that convert row to column CREATE FUNCTION my_func AS 'net.qihoo.flink.StreamingPivotFunction' COMMAND 'python3 ./demo.py'; -- Apply function SELECT my_func(a) FROM T1;

当然,我们会在用户定义开发 SQL 作业时,要求用户上传 UDF 文件到 HDFS 中。

但随着平台在线开发能力(Online Develop)的完善,后期所有的代码开发都会转移为在线模式。

5.Flink SQL 的作业调优

SQL 作业调优

SQL 作为更高层的 API ,用户能够非常容易地上手使用 Flink SQL 开发一个作业。但也正是如此,使其不像 Stream API 一样拥有设置并发资源等等的能力。

这就可能造成资源分配不均,导致资源浪费。

对于小规模的作业来说,可能资源用多一些也无关紧要。但是一旦作业规模上去了,某些计算节点就可能成为这个作业的执行瓶颈。

常见的有,Window 状态太大、Sink 并发设置不合理,这都会对整个作业造成影响。

由此可见,SQL 作业的调优是有必要的。

业内常见的调优

定义表的并发

json { "nodes": [{ "slot_group": "default", "chain_strategy": "NEVER", "parallelism": 1, "name": "Sink: Sink(table=[default_catalog.default_database.Resources1], fields=[username, ip_addr, collect_time, pid, memory])", "index": 3, "id": "ea632d67b7d595e5b851708ae9ad79d6" }, ... }

使用 SQL Hint

sql insert into T2 /*+ OPTIONS('parallelism'='2') */ select * from T1;

使用 Java API

“`java final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SourceFunction source = …; MapFunction<String, String> mapFunction = …; SinkFunction sink = …; // Update parallelism to 2 env.addSource(source).uid(“source”).setParallelism(2) .map(mapFunction).uid(“map”).setParallelism(1) .addSink(sink).uid(“sink”).setParallelism(1); env.execute(); “`

带来的问题

并发的调整,可能会影响最终 DAG 的生成,最终导致状态恢复受影响。

FFA 2020 字节跳动公司也谈到这个问题,李本超老师说我们可以将 Source 和 Sink 与其上下游不 Chain 在一起。

这种做法在一定程度上解决了作业恢复的难题,但是也有其中的局限性,例如,只能修改 Source 和 Sink 的并发而不能修改中间的算子、不能充分利用 Operator Chain 的优化。

调优的本质

调优的本质有 2 点:

  • 提高作业的性能,使作业在相同的时间处理更多数据
  • 提高资源使用率,在完成相同任务同时使用更少资源

我们的实践

StreamGraph 可视化

我们的功能暂时基于司内的奇麟大数据平台开发,对语法提示、语法检测、StreamGraph 可视化、作业状态恢复等功能进行了实现。

基于 JSON 的配置文件

json { "nodes": [{ "slot_group": "default", "chain_strategy": "NEVER", "parallelism": 1, "name": "Sink: Sink(table=[default_catalog.default_database.Resources1], fields=[username, ip_addr, collect_time, pid, memory])", "index": 3, "id": "ea632d67b7d595e5b851708ae9ad79d6" }, { "slot_group": "default", "chain_strategy": "NEVER", "parallelism": 1, "name": "Calc(select=[username, ip_addr, collect_time, pid, memory], where=[(memory <= 100)])", "index": 2, "id": "0a448493b4782967b150582570326227" }, { "slot_group": "default", "chain_strategy": "NEVER", "parallelism": 1, "name": "Source: TableSourceScan(table=[[default_catalog, default_database, KafkaSource]], fields=[username, ip_addr, collect_time, process_name, hostname, pid, memory])", "index": 1, "id": "bc764cd8ddf7a0cff126f51c16239658" }], "edges": [{ "name": "FORWARD", "source": 1, "target": 2 }, { "name": "FORWARD", "source": 2, "target": 3 }], "version": "1.11" }

在提交时,我们可以指定自定义文件。

兼容恢复算子状态

关于状态恢复

下图是作业恢复的简要图。

显然,我们这里说的作业恢复是外部的作业恢复,而非 Flink 的内部重试。

因为 Flink 内部重试情况下,作业的逻辑和资源使用都是不变的,而下面是将 Source 的并发度从 1 改成 2

算子状态恢复过程,在 JobManager 端进行。

下面分析两种典型的场景

例如,我们在开启 Local-Global 后,会新增算子。作业就可能因为新增了算子后,无法再恢复原来的状态了,这对流作业来说是极为不友好的。

新增算子还好处理,因为新的算子本身就没有数据,我们不做状态恢复即可,但减少算子就可能造成数据丢失了。

当然理想的情况下,我们是希望数据能够保持强一致性的,但是这很难达到。

我们能做的就是兼容地恢复算子的状态,下面对几种场景进行说明:

  • 增删 Source/Sink

新增的 Source 我们不恢复,我们希望其余算子的匹配程度达到最大。

  • 中间新增/减少了算子

对于这种情况,我们希望达到前一个作业的算子与新作业算子的匹配程度达到最大。

我们的做法也比较朴素,就是希望前后两个拓扑图节点能完成匹配。

目前,我们采用了贪心的思想,做的实现。

因为拓扑结构的变动,往往不会非常巨大,所以我们还是能 Cover 住大多数场景的。

调优功能展望

近期展望

我们的调优,主要是对算子的并发数量和 Slot 分组情况进行调整。,目前还没有实现对资源(CPU、内存)更细粒度的设置。

在未来,我们希望在调优上有以下几点实现:

  • 实现 Operator 级别 CPU、内存 的设置
  • 更优的资源分配算法,实现计算密集型算子和内存密集型算子的混布

Auto Scale

我们也非常期待作业能根据作业运行的统计信息,实现内存使用、并发数的自动调优。

6.Flink SQL 在奇虎 360 的未来发展

平台元数据管理

平台构建更加完善的元数据管理体系,帮助用户快速开发 Flink SQL 作业。

支持更多数据源

支持 Doris Connector、

在 Flink 1.11.1 支持 Doris Connector

支持 Hudi、Iceberg 数据湖

在 Flink 1.11.1 支持,接入 Flink SQL

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇