/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalPriorityQueueSet;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InternalTimerService} that buffers the timers registered for one key at a time and
 * fires every buffered timer when the key changes.
 *
 * <p>Timers are collected in two in-memory queues (event time and processing time) tagged with
 * the key that was current when they were registered. Nothing fires while a key stays current;
 * {@link #setCurrentKey(Object)} drains both queues into the {@link Triggerable} target as soon
 * as a different key is set.
 *
 * <p>The {@code currentWatermark} field doubles as a state flag: it is {@link Long#MIN_VALUE}
 * while timers are being collected and {@link Long#MAX_VALUE} while they are being fired. During
 * firing, new registrations are rejected with a warning, so a callback that re-registers a timer
 * cannot keep the drain loop running forever.
 *
 * @param <K> type of the keys timers are scoped to
 * @param <N> type of the timer namespace
 */
public class BatchExecutionInternalTimeService<K, N> implements InternalTimerService<N> {
    private static final Logger LOG =
            LoggerFactory.getLogger(BatchExecutionInternalTimeService.class);

    /** Source of the value returned by {@link #currentProcessingTime()}. */
    private final ProcessingTimeService processingTimeService;

    /** Processing-time timers registered for the current key that have not fired yet. */
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
            processingTimeTimersQueue;

    /** Event-time timers registered for the current key that have not fired yet. */
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
            eventTimeTimersQueue;

    /**
     * {@link Long#MIN_VALUE} while collecting timers, {@link Long#MAX_VALUE} while firing them in
     * {@link #setCurrentKey(Object)}. No other value is ever assigned in this class.
     */
    private long currentWatermark = Long.MIN_VALUE;

    /** Receiver of the timer callbacks. */
    private final Triggerable<K, N> triggerTarget;

    /** Key attached to newly registered timers; {@code null} until the first key is set. */
    private K currentKey;

    /**
     * Creates a timer service with two empty timer queues.
     *
     * @param processingTimeService provider of the current processing time; must not be null
     * @param triggerTarget receiver of the timer callbacks; must not be null
     */
    BatchExecutionInternalTimeService(
            ProcessingTimeService processingTimeService, Triggerable<K, N> triggerTarget) {
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.triggerTarget = Preconditions.checkNotNull(triggerTarget);
        // NOTE(review): 128 is presumably the queues' minimum/initial capacity — confirm against
        // BatchExecutionInternalPriorityQueueSet.
        this.processingTimeTimersQueue =
                new BatchExecutionInternalPriorityQueueSet<>(
                        PriorityComparator.forPriorityComparableObjects(), 128);
        this.eventTimeTimersQueue =
                new BatchExecutionInternalPriorityQueueSet<>(
                        PriorityComparator.forPriorityComparableObjects(), 128);
    }

    /** Returns the current processing time as reported by the {@link ProcessingTimeService}. */
    @Override
    public long currentProcessingTime() {
        return this.processingTimeService.getCurrentProcessingTime();
    }

    /**
     * Returns {@link Long#MAX_VALUE} while timers are being fired from {@link
     * #setCurrentKey(Object)} and {@link Long#MIN_VALUE} at any other time.
     */
    @Override
    public long currentWatermark() {
        return this.currentWatermark;
    }

    /**
     * Buffers a processing-time timer for the current key.
     *
     * <p>If called while timers are being fired (i.e. from a timer callback), the request is
     * logged at WARN level and ignored.
     *
     * @param namespace namespace of the timer
     * @param time timestamp of the timer
     */
    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        if (isQuiesced()) {
            LOG.warn(
                    "Timer service is quiesced. Processing time timer for timestamp '{}' will be ignored.",
                    time);
            return;
        }
        this.processingTimeTimersQueue.add(
                new TimerHeapInternalTimer<>(time, this.currentKey, namespace));
    }

    /**
     * Buffers an event-time timer for the current key.
     *
     * <p>If called while timers are being fired (i.e. from a timer callback), the request is
     * logged at WARN level and ignored.
     *
     * @param namespace namespace of the timer
     * @param time timestamp of the timer
     */
    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        if (isQuiesced()) {
            LOG.warn(
                    "Timer service is quiesced. Event time timer for timestamp '{}' will be ignored.",
                    time);
            return;
        }
        this.eventTimeTimersQueue.add(
                new TimerHeapInternalTimer<>(time, this.currentKey, namespace));
    }

    /**
     * Removes a buffered processing-time timer of the current key, if present.
     *
     * @param namespace namespace of the timer
     * @param time timestamp of the timer
     */
    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        this.processingTimeTimersQueue.remove(
                new TimerHeapInternalTimer<>(time, this.currentKey, namespace));
    }

    /**
     * Removes a buffered event-time timer of the current key, if present.
     *
     * @param namespace namespace of the timer
     * @param time timestamp of the timer
     */
    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        this.eventTimeTimersQueue.remove(
                new TimerHeapInternalTimer<>(time, this.currentKey, namespace));
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) {
        throw new UnsupportedOperationException(
                "The BatchExecutionInternalTimeService should not be used in State Processor API.");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) {
        throw new UnsupportedOperationException(
                "The BatchExecutionInternalTimeService should not be used in State Processor API.");
    }

    /**
     * Switches to a new key, first firing every timer buffered for the previous key.
     *
     * <p>The call is a no-op when {@code currentKey} is non-null and equal to the key already
     * set. A {@code null} argument never takes that shortcut, so passing {@code null} always
     * drains the queues. All event-time timers are fired before any processing-time timer.
     *
     * <p>NOTE: if a callback throws, the exception propagates immediately; the service is left
     * in the firing state ({@link #currentWatermark()} stays {@link Long#MAX_VALUE}) and the
     * stored key is not updated.
     *
     * @param currentKey the key subsequent timer registrations belong to
     * @throws Exception if a timer callback of the trigger target fails
     */
    public void setCurrentKey(K currentKey) throws Exception {
        if (currentKey != null && currentKey.equals(this.currentKey)) {
            return;
        }

        // Enter the firing state: registrations made by the callbacks below are rejected.
        this.currentWatermark = Long.MAX_VALUE;

        InternalTimer<K, N> timer;
        while ((timer = this.eventTimeTimersQueue.poll()) != null) {
            this.triggerTarget.onEventTime(timer);
        }
        while ((timer = this.processingTimeTimersQueue.poll()) != null) {
            this.triggerTarget.onProcessingTime(timer);
        }

        // Back to the collecting state, now on behalf of the new key.
        this.currentWatermark = Long.MIN_VALUE;
        this.currentKey = currentKey;
    }

    /** Returns whether timers are currently being fired, in which case registrations are dropped. */
    private boolean isQuiesced() {
        return this.currentWatermark == Long.MAX_VALUE;
    }
}

