operator chain条件

一共5块包含7条原则,全部满足则可以chain到一起,具体如下。

1.单映射又是满映射

  1. 上下游的并行度一致
  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)

2.SlotSharingGroup

  1. 上下游节点都在同一个SlotSharingGroup中

3.ChainingStrategy

  1. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
  2. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)

4.ForwardPartitioner

  1. 两个节点间数据分区方式是 forward

5.其他

  1. 用户没有禁用 chain

Chain规则源码

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

    return downStreamVertex.getInEdges().size() == 1
        && outOperator != null
        && headOperator != null
        && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
        && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
        && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
            headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        && edge.getShuffleMode() != ShuffleMode.BATCH
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        && streamGraph.isChainingEnabled();
}

Chain策略(ChainingStrategy)

一共就三种ALWAYS、NEVER、HEAD

  • ALWAYS:可以与上下游链接,map、flatmap、filter等默认是ALWAYS。
  • HEAD:只能与下游链接,不能与上游链接,Source默认是HEAD。
  • NEVER:上下游都不链接,算子自己独享一个Task。
public enum ChainingStrategy {

	/**
	 * Operators will be eagerly chained whenever possible.
	 *
	 * <p>To optimize performance, it is generally a good practice to allow maximal
	 * chaining and increase operator parallelism.
	 */
	ALWAYS,

	/**
	 * The operator will not be chained to the preceding or succeeding operators.
	 */
	NEVER,

	/**
	 * The operator will not be chained to the predecessor, but successors may chain to this
	 * operator.
	 */
	HEAD
}

设置Chain

通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。

map().startNewChain().map
// 独占Task
map().disableChaining()
// 
map().slotSharingGroup("group-name")

方法设置Chain的基本原理是通过改变Chain策略,具体对应如下。

startNewChain() //对应的策略是后面算子 ChainingStrategy.HEAD
disableChaining() //对应的策略是 ChainingStrategy.NEVER

注意

Chained 的 Operators 之间数据默认序列化后拷贝传递。通过下面代码可以开启对象重用,这样就关闭了这层 copy 操作,可以减少对象序列化开销和 GC 压力等。

env.getConfig().enableObjectReuse();

具体源码可阅读 org.apache.flink.streaming.runtime.tasks.OperatorChain 与 org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput。官方建议开发人员在完全了解 reuse 内部机制后才使用该功能,冒然使用可能会给程序带来 bug。