/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.io.Serializable;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Preconditions;

/**
 * A {@link KeyedCoProcessOperator} that forwards each incoming watermark downstream with its
 * timestamp reduced by a fixed, non-negative delay.
 *
 * <p>The time service manager (when present) is always advanced with the original, undelayed
 * watermark; only the watermark written to {@code output} is shifted.
 *
 * @param <K> type of the key
 * @param <IN1> type of the first input
 * @param <IN2> type of the second input
 * @param <OUT> type of the output
 */
public class KeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT>
        extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {
    private static final long serialVersionUID = -7435774708099223442L;

    /** Strategy used to forward a watermark downstream; chosen once in the constructor. */
    private final Consumer<Watermark> emitter;

    /**
     * Creates the operator.
     *
     * @param flatMapper the user function handed to the parent operator
     * @param watermarkDelay amount subtracted from every forwarded watermark timestamp; {@code 0}
     *     forwards watermarks unchanged
     * @throws IllegalArgumentException if {@code watermarkDelay} is negative
     */
    public KeyedCoProcessOperatorWithWatermarkDelay(KeyedCoProcessFunction<K, IN1, IN2, OUT> flatMapper, long watermarkDelay) {
        super(flatMapper);
        Preconditions.checkArgument(watermarkDelay >= 0L, "The watermark delay should be non-negative.");
        if (watermarkDelay == 0L) {
            // No delay: pass the incoming watermark instance through untouched.
            this.emitter = (Consumer<Watermark> & Serializable) mark -> this.output.emitWatermark(mark);
        } else {
            // Delay: emit a new watermark whose timestamp is held back by the configured amount.
            this.emitter = (Consumer<Watermark> & Serializable) mark ->
                    this.output.emitWatermark(new Watermark(delayedTimestamp(mark.getTimestamp(), watermarkDelay)));
        }
    }

    /**
     * Advances the time service manager (if one is present) with the incoming watermark and then
     * forwards the watermark downstream through the configured emitter.
     *
     * @param mark the incoming watermark
     * @throws Exception propagated from advancing the time service or emitting the watermark
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        Optional<InternalTimeServiceManager<?>> timeServiceManager = this.getTimeServiceManager();
        if (timeServiceManager.isPresent()) {
            // Internal timers see the original (undelayed) watermark.
            timeServiceManager.get().advanceWatermark(mark);
        }
        this.emitter.accept(mark);
    }

    /**
     * Computes {@code timestamp - delay}, saturating at {@link Long#MIN_VALUE} instead of wrapping
     * around. A plain subtraction would underflow for timestamps within {@code delay} of
     * {@code Long.MIN_VALUE} and yield a large positive value, i.e. a watermark far ahead of the
     * input rather than behind it.
     *
     * @param timestamp the incoming watermark timestamp
     * @param delay the non-negative delay (validated in the constructor)
     * @return the delayed timestamp, never greater than {@code timestamp}
     */
    private static long delayedTimestamp(long timestamp, long delay) {
        // delay >= 0, so Long.MIN_VALUE + delay itself cannot overflow.
        return timestamp < Long.MIN_VALUE + delay ? Long.MIN_VALUE : timestamp - delay;
    }
}

