flink设置watermark以及事件时间字段源码分析
背景
1.1、提取时间戳字段,用于事件事件语义处理数据
1.2、设置水位线(水印)watermark
TimestampAssigner 核心接口介绍
TimestampAssigner 时间分配器接口 实现类关系图:
提取时间戳字段方法:
TimestampAssigner 时间戳分配器, 提取数据流中的时间戳字段,
AssignerWithPeriodicWatermarks //周期性的生成水印
AssignerWithPunctuatedWatermarks //打断式的生成,也就是可以每一条数据都生成
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor //进入flink系统时间分配器
TimestampAssigner 实现类
AssignerWithPeriodicWatermarks //周期性的生成水印
AssignerWithPunctuatedWatermarks //打断式的生成,也就是可以每一条数据都生成
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor //进入flink系统时间分配器
设置时间戳、水印方法
DataStream类设置时间戳的方法:assignTimestampsAndWatermarks,指定watermark
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
final int inputParallelism = getTransformation().getParallelism();
final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
TimestampsAndPeriodicWatermarksOperator<T> operator =
new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}
1、AssignerWithPeriodicWatermarks接口:
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
@Nullable
Watermark getCurrentWatermark();
}
官方实现类:
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成 AscendingTimestampExtractor //升序数据周期性生成 IngestionTimeExtractor //进入flink系统时间分配器
因此我们一般选择使用实现类即可
2、AscendingTimestampExtractor 周期性生成watermark,升序数据
//实现类提取时间戳字段方法,调用者实现
public abstract long extractAscendingTimestamp(T element);
//根据数据流时间戳,计算watermark的时间戳 --升序处理数据
@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
//数据流中获取的时间戳
final long newTimestamp = extractAscendingTimestamp(element);
//如果当前数据的时俱戳大于当前已知的时间戳中的,则更新watermark中的时间戳
if (newTimestamp >= this.currentTimestamp) {
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
//否则打印日志
violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}
打印日志处理方法:
public static final class LoggingHandler implements MonotonyViolationHandler {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);
@Override
public void handleViolation(long elementTimestamp, long lastTimestamp) {
LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);
}
}
获取当前watermark的方法
@Override
public final Watermark getCurrentWatermark() {
//默认延迟1毫秒
//如果当前时间戳等于Long.MIN_VALUE 则返回Long.MIN_VALUE,否则返回最大时间戳-1
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
AscendingTimestampExtractor 继承 AscendingTimestampExtractor
@PublicEvolving
@Deprecated
public abstract class AscendingTimestampExtractor<T>
extends org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor<T> {
}
3、BoundedOutOfOrdernessTimestampExtractor 周期性的乱序数据
1、在创建对象时,默认给了一个最大的时间戳, Long.MIN_VALUE + this.maxOutOfOrderness;
2、来一条数据,判断当前时间戳和最大时间戳的大小,如果当前时间戳大于最大时间戳,则更新
3、生成watermark,用最大时间戳减去最大延迟,也就是watermark中的时间戳调慢的时间,比如原本是3点结束的窗口,延迟为1分钟,那么watermark中的时间应该为2分59秒
4、默认是 Long.MIN_VALUE是防止出现最大的时间戳减去最大延迟为负数,watermark中的时间戳为负数,出现时间倒转
BoundedOutOfOrdernessTimestampExtractor 有参构造函数:
/** The current maximum timestamp seen so far. */
private long currentMaxTimestamp; //截至目前最大的时间戳
/** The timestamp of the last emitted watermark. */
private long lastEmittedWatermark = Long.MIN_VALUE; //上次watermark中时间戳
/**
* The (fixed) interval between the maximum seen timestamp seen in the records
* and that of the watermark to be emitted.
*/
private final long maxOutOfOrderness; //最大延迟时间
//构造函数 maxOutOfOrderness为乱序可容忍的最大程度,单位可以为milliseconds seconds minutes等等
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
//如果延迟时间小于0,抛出异常
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
//最大延迟转换为毫秒数
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
//计算最大的默认的时间戳 防止数据溢出,这里要要加上最大延迟
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
实现类重写提取时间戳字段的方法: 调用者使用,提取出指定字段的数据并返回当前时间戳的大小
public abstract long extractTimestamp(T element);
extractTimestamp重载方法,用于更新最大的时间戳,每来一条数据进行判断
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
//获取当前数据的时间戳大小
long timestamp = extractTimestamp(element);
//如果当前数据的时间戳大小大于目前最大的时间戳,则赋值
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
//如果当前数据的时间戳小于目前最大的时间戳,则不变
return timestamp;
}
获取watermark中的时间戳:
@Override
public final Watermark getCurrentWatermark() {
// this guarantees that the watermark never goes backwards.
//当前时间的最大时间戳 - 最大延迟时间 =watermark中的时间戳
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
// 如果当前的最大时间戳延迟后的时间戳大于上次的watermark中的时间戳,则更新watermark
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
4、接口AssignerWithPunctuatedWatermarks
每一条数据都生成watermark的接口
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
@Nullable
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
没有实现类,需我们自己实现
public static class Test implements AssignerWithPunctuatedWatermarks {
/**
* 生成Watermark
*
* @param lastElement 上一条数据
* @param extractedTimestamp 水印的时间戳
* @return
*/
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Object lastElement, long extractedTimestamp) {
return null;
}
//提取时间戳字段
@Override
public long extractTimestamp(Object element, long previousElementTimestamp) {
return 0;
}
}
Watermark类介绍
@PublicEvolving
public final class Watermark extends StreamElement {
/** The watermark that signifies end-of-event-time. */
public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
// ------------------------------------------------------------------------
/** The timestamp of the watermark in milliseconds. */
private final long timestamp;
/**
* Creates a new watermark with the given timestamp in milliseconds.
*/
//构造函数,传入时间戳
public Watermark(long timestamp) {
this.timestamp = timestamp;
}
/**
* Returns the timestamp associated with this {@link Watermark} in milliseconds.
*/
//获取当前水印的时间戳大小
public long getTimestamp() {
return timestamp;
}
// ------------------------------------------------------------------------
@Override
public boolean equals(Object o) {
return this == o ||
o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;
}
@Override
public int hashCode() {
return (int) (timestamp ^ (timestamp >>> 32));
}
@Override
public String toString() {
return "Watermark @ " + timestamp;
}
}
总结:
1、Watermark可以理解为一个带着时间戳的空数据或者带着时间戳的标志数据,和其他数据一样,一条一条的处理
2、Watermark只能一直递增
3、Watermark计算方式为当前时间戳减去延迟时间 ,实现窗口延迟
4、window的执行由watermark触发,watermark机制结合window实现
5、升序数据-AscendingTimestampExtractor
乱序数据-BoundedOutOfOrdernessTimestampExtractor
6、BoundedOutOfOrdernessTimestampExtractor比AscendingTimestampExtractor区别就在于,使用了一个最大的时间戳的值,
来对每个数据进行判断,大于则更新,不大于则不更新。而AscendingTimestampExtractor后面的数据如果小于则会出现预警日志
以上仅为个人学习时的理解,如果不确定,麻烦大佬指正!