优秀的编程知识分享平台

网站首页 > 技术文章 正文

flink设置watermark以及事件时间字段源码分析

nanyue 2024-09-01 20:39:36 技术文章 11 ℃

flink设置watermark以及事件时间字段源码分析

背景

1.1、提取时间戳字段,用于事件事件语义处理数据

1.2、设置水位线(水印)watermark


TimestampAssigner 核心接口介绍

TimestampAssigner 时间分配器接口 实现类关系图:



提取时间戳字段方法

TimestampAssigner 时间戳分配器, 提取数据流中的时间戳字段,

AssignerWithPeriodicWatermarks  //周期性的生成水印
AssignerWithPunctuatedWatermarks //打断式的生成,也就是可以每一条数据都生成
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor  //进入flink系统时间分配器


TimestampAssigner 实现类

AssignerWithPeriodicWatermarks  //周期性的生成水印
AssignerWithPunctuatedWatermarks //打断式的生成,也就是可以每一条数据都生成
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor  //进入flink系统时间分配器


设置时间戳、水印方法

DataStream类设置时间戳的方法:assignTimestampsAndWatermarks,指定watermark

    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

        // match parallelism to input, otherwise dop=1 sources could lead to some strange
        // behaviour: the watermark will creep along very slowly because the elements
        // from the source go to each extraction operator round robin.
        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

        TimestampsAndPeriodicWatermarksOperator<T> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }



1、AssignerWithPeriodicWatermarks接口:

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    @Nullable
    Watermark getCurrentWatermark();
}
    

官方实现类:

BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成 AscendingTimestampExtractor //升序数据周期性生成 IngestionTimeExtractor //进入flink系统时间分配器

因此我们一般选择使用实现类即可


2、AscendingTimestampExtractor 周期性生成watermark,升序数据

//实现类提取时间戳字段方法,调用者实现

  public abstract long extractAscendingTimestamp(T element);

//根据数据流时间戳,计算watermark的时间戳 --升序处理数据


@Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
    //数据流中获取的时间戳
        final long newTimestamp = extractAscendingTimestamp(element);
        //如果当前数据的时俱戳大于当前已知的时间戳中的,则更新watermark中的时间戳
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
        //否则打印日志
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }

打印日志处理方法:

public static final class LoggingHandler implements MonotonyViolationHandler {
        private static final long serialVersionUID = 1L;

        private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {
            LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);
        }
    }


获取当前watermark的方法

    @Override
    public final Watermark getCurrentWatermark() {
        //默认延迟1毫秒
        //如果当前时间戳等于Long.MIN_VALUE 则返回Long.MIN_VALUE,否则返回最大时间戳-1
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }

AscendingTimestampExtractor 继承 AscendingTimestampExtractor

@PublicEvolving
@Deprecated
public abstract class AscendingTimestampExtractor<T>
    extends org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor<T> {

}


3、BoundedOutOfOrdernessTimestampExtractor 周期性的乱序数据

1、在创建对象时,默认给了一个最大的时间戳, Long.MIN_VALUE + this.maxOutOfOrderness;

2、来一条数据,判断当前时间戳和最大时间戳的大小,如果当前时间戳大于最大时间戳,则更新

3、生成watermark,用最大时间戳减去最大延迟,也就是watermark中的时间戳调慢的时间,比如原本是3点结束的窗口,延迟为1分钟,那么watermark中的时间应该为2分59秒

4、默认是 Long.MIN_VALUE是防止出现最大的时间戳减去最大延迟为负数,watermark中的时间戳为负数,出现时间倒转


BoundedOutOfOrdernessTimestampExtractor 有参构造函数:



    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp; //截至目前最大的时间戳

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE; //上次watermark中时间戳
    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness; //最大延迟时间



    //构造函数  maxOutOfOrderness为乱序可容忍的最大程度,单位可以为milliseconds  seconds minutes等等
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        //如果延迟时间小于0,抛出异常
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        //最大延迟转换为毫秒数
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        //计算最大的默认的时间戳 防止数据溢出,这里要要加上最大延迟
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }


实现类重写提取时间戳字段的方法: 调用者使用,提取出指定字段的数据并返回当前时间戳的大小

public abstract long extractTimestamp(T element);


extractTimestamp重载方法,用于更新最大的时间戳,每来一条数据进行判断

@Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        //获取当前数据的时间戳大小
        long timestamp = extractTimestamp(element);
        //如果当前数据的时间戳大小大于目前最大的时间戳,则赋值
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        //如果当前数据的时间戳小于目前最大的时间戳,则不变
        return timestamp;
    }


获取watermark中的时间戳:

@Override
public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    //当前时间的最大时间戳 - 最大延迟时间 =watermark中的时间戳
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    // 如果当前的最大时间戳延迟后的时间戳大于上次的watermark中的时间戳,则更新watermark
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}


4、接口AssignerWithPunctuatedWatermarks

每一条数据都生成watermark的接口

    public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

没有实现类,需我们自己实现

 public static class Test implements AssignerWithPunctuatedWatermarks {

        /**
         * 生成Watermark
         *
         * @param lastElement        上一条数据
         * @param extractedTimestamp 水印的时间戳
         * @return
         */
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Object lastElement, long extractedTimestamp) {

            return null;
        }

        //提取时间戳字段
        @Override
        public long extractTimestamp(Object element, long previousElementTimestamp) {
            return 0;
        }
    }




Watermark类介绍

@PublicEvolving
public final class Watermark extends StreamElement {

    /** The watermark that signifies end-of-event-time. */
    public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

    // ------------------------------------------------------------------------

    /** The timestamp of the watermark in milliseconds. */
    private final long timestamp;

    /**
     * Creates a new watermark with the given timestamp in milliseconds.
     */
    //构造函数,传入时间戳
    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * Returns the timestamp associated with this {@link Watermark} in milliseconds.
     */
    //获取当前水印的时间戳大小
    public long getTimestamp() {
        return timestamp;
    }

    // ------------------------------------------------------------------------

    @Override
    public boolean equals(Object o) {
        return this == o ||
                o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;
    }

    @Override
    public int hashCode() {
        return (int) (timestamp ^ (timestamp >>> 32));
    }

    @Override
    public String toString() {
        return "Watermark @ " + timestamp;
    }
}


总结

1、Watermark可以理解为一个带着时间戳的空数据或者带着时间戳的标志数据,和其他数据一样,一条一条的处理

2、Watermark只能一直递增

3、Watermark计算方式为当前时间戳减去延迟时间 ,实现窗口延迟

4、window的执行由watermark触发,watermark机制结合window实现

5、升序数据-AscendingTimestampExtractor

乱序数据-BoundedOutOfOrdernessTimestampExtractor

6、BoundedOutOfOrdernessTimestampExtractor比AscendingTimestampExtractor区别就在于,使用了一个最大的时间戳的值,

来对每个数据进行判断,大于则更新,不大于则不更新。而AscendingTimestampExtractor后面的数据如果小于则会出现预警日志


以上仅为个人学习时的理解,如果不确定,麻烦大佬指正!

Tags:

最近发表
标签列表