/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.sort.BaseTemporalSortOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sort operator that orders rows by an event-time (row-time) field and, when a comparator is
 * supplied, by that comparator within each timestamp.
 *
 * <p>Incoming rows are buffered in map state under their row-time value. For each distinct
 * row-time value an event-time timer is registered; when the timer fires, all rows buffered for
 * that timestamp are sorted with the comparator (if any), emitted through the collector, and
 * removed from state. Rows whose row-time is not greater than the last fired timestamp are
 * dropped without being emitted.
 */
public class RowTimeSortOperator
extends BaseTemporalSortOperator {
    private static final long serialVersionUID = 2085278292749212811L;
    private static final Logger LOG = LoggerFactory.getLogger(RowTimeSortOperator.class);

    /** Type information of the input rows; used to build the value type of the buffer state. */
    private final InternalTypeInfo<RowData> inputRowType;

    /** Index of the field that is read as the {@code long} row-time value of each input row. */
    private final int rowTimeIdx;

    /** Generated comparator code; instantiated in {@link #open()} and then set to null. */
    private GeneratedRecordComparator gComparator;

    /** Comparator applied to the rows of one timestamp; stays null if none was supplied. */
    private transient RecordComparator comparator;

    /** Buffered rows, grouped by their row-time value. */
    private transient MapState<Long, List<RowData>> dataState;

    /** Timestamp of the most recent timer that emitted rows; null until the first emission. */
    private transient ValueState<Long> lastTriggeringTsState;

    /**
     * Creates the operator.
     *
     * @param inputRowType type information of the input rows
     * @param rowTimeIdx index of the row-time field; must satisfy
     *     {@code 0 <= rowTimeIdx < inputRowType.toRowSize()}
     * @param gComparator generated comparator for ordering rows that share a timestamp; may be
     *     null, in which case rows of one timestamp are emitted in buffered order
     * @throws IllegalArgumentException if {@code rowTimeIdx} is out of range
     */
    public RowTimeSortOperator(InternalTypeInfo<RowData> inputRowType, int rowTimeIdx, GeneratedRecordComparator gComparator) {
        this.inputRowType = inputRowType;
        Preconditions.checkArgument(rowTimeIdx >= 0 && rowTimeIdx < inputRowType.toRowSize(), "RowTimeIdx must be 0 or positive number and smaller than input row arity!");
        this.rowTimeIdx = rowTimeIdx;
        this.gComparator = gComparator;
    }

    /**
     * Instantiates the comparator (if one was supplied) and acquires the buffer state and the
     * last-triggering-timestamp state from the runtime context.
     */
    @Override
    public void open() throws Exception {
        super.open();
        LOG.info("Opening RowTimeSortOperator");
        if (this.gComparator != null) {
            this.comparator = (RecordComparator)this.gComparator.newInstance(this.getContainingTask().getUserCodeClassLoader());
            // The generated holder is not referenced again once the instance exists.
            this.gComparator = null;
        }

        // The state maps a timestamp to the LIST of rows carrying it, so the descriptor's value
        // type must be List<RowData> to agree with the dataState field and the ListTypeInfo.
        TypeInformation<Long> keyTypeInfo = BasicTypeInfo.LONG_TYPE_INFO;
        TypeInformation<List<RowData>> valueTypeInfo = new ListTypeInfo<RowData>(this.inputRowType);
        MapStateDescriptor<Long, List<RowData>> mapStateDescriptor =
                new MapStateDescriptor<>("dataState", keyTypeInfo, valueTypeInfo);
        this.dataState = this.getRuntimeContext().getMapState(mapStateDescriptor);

        ValueStateDescriptor<Long> lastTriggeringTsDescriptor =
                new ValueStateDescriptor<>("lastTriggeringTsState", Long.class);
        this.lastTriggeringTsState = this.getRuntimeContext().getState(lastTriggeringTsDescriptor);
    }

    /**
     * Buffers the incoming row under its row-time value, registering an event-time timer the
     * first time a given row-time value is seen. Rows that are not newer than the last fired
     * timestamp are dropped.
     *
     * @param element stream record wrapping the input row
     */
    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData input = element.getValue();
        long rowTime = input.getLong(this.rowTimeIdx);

        Long lastTriggeringTs = this.lastTriggeringTsState.value();
        if (lastTriggeringTs != null && rowTime <= lastTriggeringTs) {
            // Late row: its timestamp has already been emitted, so it is discarded.
            return;
        }

        List<RowData> rows = this.dataState.get(rowTime);
        if (rows != null) {
            rows.add(input);
            // Write the list back explicitly; presumably the state may hand out a copy, so the
            // in-place add alone is not relied upon.
            this.dataState.put(rowTime, rows);
        } else {
            List<RowData> newRows = new ArrayList<>();
            newRows.add(input);
            this.dataState.put(rowTime, newRows);
            // First row for this timestamp: one timer per distinct row-time value.
            this.timerService.registerEventTimeTimer(rowTime);
        }
    }

    /**
     * Emits all rows buffered for the timer's timestamp, sorted by the comparator when present,
     * then removes them from state and records the timestamp as the last triggering one.
     *
     * @param timer the fired event-time timer
     */
    @Override
    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
        long timestamp = timer.getTimestamp();
        List<RowData> inputs = this.dataState.get(timestamp);
        if (inputs == null) {
            return;
        }

        if (this.comparator != null) {
            inputs.sort(this.comparator);
        }
        for (RowData row : inputs) {
            this.collector.collect(row);
        }

        this.dataState.remove(timestamp);
        this.lastTriggeringTsState.update(timestamp);
    }

    /**
     * Processing-time timers are not supported by this operator.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
        throw new UnsupportedOperationException("Now Sort only is supported based event time here!");
    }
}

