概述

Key Groups机制决定Flink中带有key的数据由哪个subtask处理。类似Spark中的Hash Shuffle。Key-Group数量就是最大并行度的值。即算子并行度不能超过Key-Group个数。初次设置最大并行度后,如果调整必须丢弃Checkpoint/Savepoint。

最大并行度取值

设置代码如下

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
executionConfig.setMaxParallelism(4);

如果没有手动设置则按照以下规则:默认值第一次启动时设置:

  • 当任务并行度小于 128 时,最大并行度默认是 128。
  • 任务并行度大于等于 128 时,最大并行度取值为:parallelism + (parallelism / 2) 不会大于 2^15 = 32768

key分配算法

决定key属于哪个Key-Group。哈希取模法。key计算hashCode()在和maxParallelism取模。举例:key的hashCode为11,最大并行度是10,那么Key Group内会包含KG-0到KG-9。 11 % 10 = 1。那么这个key会分配到KG-1中。

Key-Group分配算法

决定Key-Group属于哪个并行Operator(subtask里的)。原则是尽可能均匀将Key-Group分给Operator。Flink采用最简单粗暴的方式。除以算子并行度,整除部分直接可以均匀分配,余数部分逐一分给前N个算子。举例:假如有8个key-group,算子并行度为3,8 / 3 = 2 余 2。前2个Operator分配3个Key-Group,剩下的2个分配Key-Group。Key-Group和Operator是对齐的,即编号小的KG在编号小的Operator里。

最大并行度存在原因

算子在未来可以提升并发能力的上限。如果硬要调高最大并行度参数,作业已经保存过的状态或者 checkpoint/savepoint 只能丢弃。调高最大并行度产生过多key-groups,使状态增大,state snapshots也随之增大,降低性能。