优秀的编程知识分享平台

网站首页 > 技术文章 正文

算子?在flink中这个概念到底是什么?

nanyue 2024-09-01 20:39:06 技术文章 8 ℃

在前几篇文章的介绍中,我们知道当通过DataStreamAPI开发flink作业时,几乎每一步的API调用都会创建一个Transformation,与此同时,也会创建一个叫做Operator的东西,那么这个东西是什么呢?为什么需要它?它在作业执行时起到什么作用呢?希望这篇文章能帮你找到想要的答案。

本文主要介绍的是DataStream的算子体系,关于Blink SQL的算子体系将放在flink table API文章中讲解。

Operator的职责主要是生命周期管理、状态与容错管理、数据处理3个方面,在作业运行时,不同的Task会创建不同的Operator,Task负责数据的读取与反序列化操作,然后将反序列化后的数据交给相应的Operator去处理,Operator调用UDF处理完数据后就会将数据发往下游的Task或者是OperatorChain中的下一个Operator进行处理。

一、生命周期管理

1、setup:初始化函数执行环境(StreamingRuntimeContext)、时间服务、注册指标监控服务、数据输出计数器(CountingOutput)等;

2、open:该行为由各个具体的算子(如StreamFlatMap)负责实现,负责算子执行前的一系列初始化操作,算子执行该方法之后,才会执行Function进行进行数据的处理。

3、close:所有的数据处理完毕之后关闭算子,需要确保将所有的缓存数据向下游发送。

4、生命周期相关接口和类如下

SetupableStreamOperator.java

public interface SetupableStreamOperator<OUT> {

    /** Initializes the operator. Sets access to the context and the output. */
    void setup(
            StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);

}

StreamOperator.java

public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
  void open() throws Exception;
  void close() throws Exception;
}

AbstractStreamOperator.java

public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>,
                SetupableStreamOperator<OUT>,
                CheckpointedStreamOperator,
                Serializable {
                  
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        final Environment environment = containingTask.getEnvironment();
        this.container = containingTask;
        this.config = config;
        try {
            InternalOperatorMetricGroup operatorMetricGroup =
                    environment
                            .getMetricGroup()
                            .getOrAddOperator(config.getOperatorID(), config.getOperatorName());
            this.output =
                    new CountingOutput<>(
                            output,
                            operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
            if (config.isChainEnd()) {
                operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
            }
            this.metrics = operatorMetricGroup;
        } catch (Exception e) {
            LOG.warn("An error occurred while instantiating task metrics.", e);
            this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
            this.output = output;
        }

        this.combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(2);
        try {
            Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
            int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
            if (historySize <= 0) {
                LOG.warn(
                        "{} has been set to a value equal or below 0: {}. Using default.",
                        MetricOptions.LATENCY_HISTORY_SIZE,
                        historySize);
                historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
            }

            final String configuredGranularity =
                    taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
            LatencyStats.Granularity granularity;
            try {
                granularity =
                        LatencyStats.Granularity.valueOf(
                                configuredGranularity.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException iae) {
                granularity = LatencyStats.Granularity.OPERATOR;
                LOG.warn(
                        "Configured value {} option for {} is invalid. Defaulting to {}.",
                        configuredGranularity,
                        MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
                        granularity);
            }
            MetricGroup jobMetricGroup = this.metrics.getJobMetricGroup();
            this.latencyStats =
                    new LatencyStats(
                            jobMetricGroup.addGroup("latency"),
                            historySize,
                            container.getIndexInSubtaskGroup(),
                            getOperatorID(),
                            granularity);
        } catch (Exception e) {
            LOG.warn("An error occurred while instantiating latency metrics.", e);
            this.latencyStats =
                    new LatencyStats(
                            UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()
                                    .addGroup("latency"),
                            1,
                            0,
                            new OperatorID(),
                            LatencyStats.Granularity.SINGLE);
        }

        this.runtimeContext =
                new StreamingRuntimeContext(
                        environment,
                        environment.getAccumulatorRegistry().getUserMap(),
                        getMetricGroup(),
                        getOperatorID(),
                        getProcessingTimeService(),
                        null,
                        environment.getExternalResourceInfoProvider());

        stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
        stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
    }
}



二、状态与容错管理

对于我们在开发UDF时所需要的状态,Operator负责对状态进行初始化操作并且在触发检查点的时候对状态进行快照处理(存储到本地文件或者远程存储中)。

状态相关接口:CheckpointedStreamOperator

public interface CheckpointedStreamOperator {
    void initializeState(StateInitializationContext context) throws Exception;

    void snapshotState(StateSnapshotContext context) throws Exception;
}

这两个方法分别是状态初始化和状态快照的方法,由具体的Operator实现,下面的initializeState是AbstractUdfStreamOperator中的一个实现,可以看到,Operator在执行状态初始化时会调用UDF(实现了RichFunction接口的函数)的initializeState方法(这个方法的实现是我们在开发UDF时编写的),从而实现状态初始化,在做数据处理的时候,我们就可以使用相关的状态。

AbstractUdfStreamOperator.java

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

StreamingFunctionUtils.java

public static void restoreFunctionState(
        StateInitializationContext context, Function userFunction) throws Exception {

    Preconditions.checkNotNull(context);

    while (true) {

        if (tryRestoreFunction(context, userFunction)) {
            break;
        }

        // inspect if the user function is wrapped, then unwrap and try again if we can restore
        // the inner function
        if (userFunction instanceof WrappingFunction) {
            userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
        } else {
            break;
        }
    }
}

    private static boolean tryRestoreFunction(
            StateInitializationContext context, Function userFunction) throws Exception {

        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).initializeState(context);

            return true;
        }

    }

三、数据处理

Operator处理数据的种类分别是:数据记录、水印和延迟标记,Operator有单输入和多输入类型,分别有不同的接口进行定义,下面以常用的StreamMap为例来分析Operator处理数据的过程:

StreamMap是单输入Operator,定义单输入Operator行为的接口如下,可以看到几种不同数据的处理行为方法;

public interface Input<IN> {
    void processElement(StreamRecord<IN> element) throws Exception;
    void processWatermark(Watermark mark) throws Exception;
    void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;

}

在StreamMap中实现了数据记录处理的方法processElement,这里我们看到调用了我们编写的UDF中的map方法实现,调用完成后将UDF处理之后的数据发往了下游;

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

StreamMap实现了OneInputStreamOperator接口,这个接口的定义如下,这个接口就是继承了上面列出的Input接口,但是到这里我们并没看到Operator处理其他类型数据(水印和延迟数据)的方法实现,那么它在哪实现的呢?我们发现StreamMap类继承了AbstractUdfStreamOperator,而AbstractUdfStreamOperator继承了AbstractStreamOperator类,显然,其他类型数据的方法实现在AbstractStreamOperator类中,代码如下:

public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {
    @Override
    default void setKeyContextElement(StreamRecord<IN> record) throws Exception {
        setKeyContextElement1(record);
    }
}

AbstractStreamOperator.java

public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
    reportOrForwardLatencyMarker(latencyMarker);
}
public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
}


四、DataStream中的Operator体系

DataStream中所有的Operator都是接口StreamOperator的实现类,往下又分为了两个算子体系:AbstractUDFStreamOperator和其它一些Operator。AbstractUDFStreamOperator常见的实现有 StreamSource 、 StreamSink 、 StreamFilter、StreamMap、StreamFlatMap等。

结语:关于DataStream的Operator实现,本篇只是抛砖引玉,它的实现细节会涉及到作业执行时,后续介绍完任务执行后,会继续介绍任务执行时Operator是如何创建和运行的。

最后,如果大家对该系列文章有什么要求和意见,或者是相对Flink源代码的哪部分感兴趣,请写在评论区,我会和大家一起交流分享。

Tags:

最近发表
标签列表