/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

public class WindowOperatorBuilder<T, K, W extends Window> {
    private static final String WINDOW_STATE_NAME = "window-contents";
    private final ExecutionConfig config;
    private final WindowAssigner<? super T, W> windowAssigner;
    private final TypeInformation<T> inputType;
    private final KeySelector<T, K> keySelector;
    private final TypeInformation<K> keyType;
    private Trigger<? super T, ? super W> trigger;
    @Nullable
    private Evictor<? super T, ? super W> evictor;
    private long allowedLateness = 0L;
    @Nullable
    private OutputTag<T> lateDataOutputTag;

    public WindowOperatorBuilder(WindowAssigner<? super T, W> windowAssigner, Trigger<? super T, ? super W> trigger, ExecutionConfig config, TypeInformation<T> inputType, KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
        this.windowAssigner = windowAssigner;
        this.config = config;
        this.inputType = inputType;
        this.keySelector = keySelector;
        this.keyType = keyType;
        this.trigger = trigger;
    }

    public void trigger(Trigger<? super T, ? super W> trigger) {
        Preconditions.checkNotNull(trigger, "Window triggers cannot be null");
        if (this.windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
            throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
        }
        this.trigger = trigger;
    }

    public void allowedLateness(Time lateness) {
        Preconditions.checkNotNull(lateness, "Allowed lateness cannot be null");
        long millis = lateness.toMilliseconds();
        Preconditions.checkArgument(millis >= 0L, "The allowed lateness cannot be negative.");
        this.allowedLateness = millis;
    }

    public void sideOutputLateData(OutputTag<T> outputTag) {
        Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
        this.lateDataOutputTag = outputTag;
    }

    public void evictor(Evictor<? super T, ? super W> evictor) {
        Preconditions.checkNotNull(evictor, "Evictor cannot be null");
        this.evictor = evictor;
    }

    public <R> WindowOperator<K, T, ?, R, W> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(function, "WindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        if (this.evictor != null) {
            return this.buildEvictingWindowOperator(new InternalIterableWindowFunction<K, W, T, R>(new ReduceApplyWindowFunction<K, W, T, R>(reduceFunction, function)));
        }
        ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<T>(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config));
        return this.buildWindowOperator(stateDesc, new InternalSingleValueWindowFunction<T, R, K, W>(function));
    }

    public <R> WindowOperator<K, T, ?, R, W> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(function, "ProcessWindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        if (this.evictor != null) {
            return this.buildEvictingWindowOperator(new InternalIterableProcessWindowFunction<K, W, T, R>(new ReduceApplyProcessWindowFunction<K, W, T, R>(reduceFunction, function)));
        }
        ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<T>(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config));
        return this.buildWindowOperator(stateDesc, new InternalSingleValueProcessWindowFunction<T, R, K, W>(function));
    }

    public <ACC, V, R> WindowOperator<K, T, ?, R, W> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        if (this.evictor != null) {
            return this.buildEvictingWindowOperator(new InternalIterableWindowFunction(new AggregateApplyWindowFunction<K, W, T, ACC, V, R>(aggregateFunction, windowFunction)));
        }
        AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<T, ACC, V>(WINDOW_STATE_NAME, aggregateFunction, accumulatorType.createSerializer(this.config));
        return this.buildWindowOperator(stateDesc, new InternalSingleValueWindowFunction<V, R, K, W>(windowFunction));
    }

    public <ACC, V, R> WindowOperator<K, T, ?, R, W> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, ProcessWindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "ProcessWindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        if (this.evictor != null) {
            return this.buildEvictingWindowOperator(new InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W>(aggregateFunction, windowFunction));
        }
        AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<T, ACC, V>(WINDOW_STATE_NAME, aggregateFunction, accumulatorType.createSerializer(this.config));
        return this.buildWindowOperator(stateDesc, new InternalSingleValueProcessWindowFunction<V, R, K, W>(windowFunction));
    }

    public <R> WindowOperator<K, T, ?, R, W> apply(WindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(function, "WindowFunction cannot be null");
        return this.apply(new InternalIterableWindowFunction<T, R, K, W>(function));
    }

    public <R> WindowOperator<K, T, ?, R, W> process(ProcessWindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(function, "ProcessWindowFunction cannot be null");
        return this.apply(new InternalIterableProcessWindowFunction<T, R, K, W>(function));
    }

    private <R> WindowOperator<K, T, ?, R, W> apply(InternalWindowFunction<Iterable<T>, R, K, W> function) {
        if (this.evictor != null) {
            return this.buildEvictingWindowOperator(function);
        }
        ListStateDescriptor<T> stateDesc = new ListStateDescriptor<T>(WINDOW_STATE_NAME, this.inputType.createSerializer(this.config));
        return this.buildWindowOperator(stateDesc, function);
    }

    private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDesc, InternalWindowFunction<ACC, R, K, W> function) {
        return new WindowOperator<K, T, ACC, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config), stateDesc, function, this.trigger, this.allowedLateness, this.lateDataOutputTag);
    }

    private <R> WindowOperator<K, T, Iterable<T>, R, W> buildEvictingWindowOperator(InternalWindowFunction<Iterable<T>, R, K, W> function) {
        StreamElementSerializer<T> streamRecordSerializer = new StreamElementSerializer<T>(this.inputType.createSerializer(this.config));
        ListStateDescriptor<StreamElement> stateDesc = new ListStateDescriptor<StreamElement>(WINDOW_STATE_NAME, streamRecordSerializer);
        return new EvictingWindowOperator<K, T, R, W>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config), stateDesc, function, this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
    }

    private static String generateFunctionName(Function function) {
        Class<?> functionClass = function.getClass();
        if (functionClass.isAnonymousClass()) {
            Class<?>[] interfaces = functionClass.getInterfaces();
            if (interfaces.length == 0) {
                Class<?> functionSuperClass = functionClass.getSuperclass();
                return functionSuperClass.getSimpleName() + functionClass.getName().substring(functionClass.getEnclosingClass().getName().length());
            }
            Class<?> functionInterface = functionClass.getInterfaces()[0];
            return functionInterface.getSimpleName() + functionClass.getName().substring(functionClass.getEnclosingClass().getName().length());
        }
        return functionClass.getSimpleName();
    }

    public String generateOperatorName() {
        return this.windowAssigner.getClass().getSimpleName();
    }

    public String generateOperatorDescription(Function function1, @Nullable Function function2) {
        return "Window(" + this.windowAssigner + ", " + this.trigger.getClass().getSimpleName() + ", " + (this.evictor == null ? "" : this.evictor.getClass().getSimpleName() + ", ") + WindowOperatorBuilder.generateFunctionName(function1) + (function2 == null ? "" : ", " + WindowOperatorBuilder.generateFunctionName(function2)) + ")";
    }

    @VisibleForTesting
    public long getAllowedLateness() {
        return this.allowedLateness;
    }
}

