/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over;

import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;

/**
 * Over-window operator that does not buffer its input: every incoming row is fed to each
 * aggregate handler and emitted immediately, joined with the current value of every handler.
 *
 * <p>A partition change is detected by comparing the incoming row against a retained copy of the
 * first row seen for the current partition. On a partition change all accumulators are re-created;
 * handlers whose {@code resetAccumulators} flag is {@code true} are re-created for every row.
 */
public class NonBufferOverWindowOperator
extends TableStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData> {
    /** Generated handler definitions; instantiated in {@link #open()} and then released. */
    private GeneratedAggsHandleFunction[] aggsHandlers;
    /** Generated partition comparator; instantiated in {@link #open()} and then released. */
    private GeneratedRecordComparator genComparator;
    /** Per-handler flag: when {@code true} the handler's accumulators are re-created for every row. */
    private final boolean[] resetAccumulators;
    private RecordComparator partitionComparator;
    /** Copy of the first row of the current partition, or {@code null} before the first row. */
    private RowData lastInput;
    private AggsHandleFunction[] processors;
    /** One reusable joined row per handler, used to append that handler's value to the output. */
    private JoinedRowData[] joinedRows;
    private StreamRecordCollector<RowData> collector;
    private AbstractRowDataSerializer<RowData> serializer;

    /**
     * Creates the operator.
     *
     * @param aggsHandlers generated aggregate handlers, evaluated in array order
     * @param genComparator generated comparator used to detect a change of partition
     * @param resetAccumulators per-handler flags, indexed like {@code aggsHandlers}
     */
    public NonBufferOverWindowOperator(GeneratedAggsHandleFunction[] aggsHandlers, GeneratedRecordComparator genComparator, boolean[] resetAccumulators) {
        this.aggsHandlers = aggsHandlers;
        this.genComparator = genComparator;
        this.resetAccumulators = resetAccumulators;
    }

    /**
     * Instantiates the comparator and the aggregate handlers with the user-code class loader and
     * opens every handler.
     *
     * @throws Exception if the parent operator or any handler fails to open
     */
    @Override
    public void open() throws Exception {
        super.open();
        ClassLoader cl = getUserCodeClassloader();
        // Raw cast kept on purpose: the configured input serializer is narrowed to the row serializer.
        serializer = (AbstractRowDataSerializer)getOperatorConfig().getTypeSerializerIn1(cl);
        partitionComparator = genComparator.newInstance(cl);
        // The generated definition is no longer needed once instantiated.
        genComparator = null;
        collector = new StreamRecordCollector<>(output);

        int handlerCount = aggsHandlers.length;
        processors = new AggsHandleFunction[handlerCount];
        joinedRows = new JoinedRowData[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            AggsHandleFunction func = aggsHandlers[i].newInstance(cl);
            func.open(new PerKeyStateDataViewStore(getRuntimeContext()));
            processors[i] = func;
            joinedRows[i] = new JoinedRowData();
        }
        aggsHandlers = null;
    }

    /**
     * Accumulates the row into every handler and emits the row joined with each handler's value.
     *
     * @param element the record carrying the input row
     * @throws Exception if a handler fails while accumulating or producing its value
     */
    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData input = element.getValue();
        boolean changePartition = lastInput == null || partitionComparator.compare(lastInput, input) != 0;

        // Start from the input row and wrap it once per handler with that handler's value.
        RowData output = input;
        for (int i = 0; i < processors.length; i++) {
            AggsHandleFunction processor = processors[i];
            if (changePartition || resetAccumulators[i]) {
                processor.setAccumulators(processor.createAccumulators());
            }
            processor.accumulate(input);
            RowData value = processor.getValue();
            output = joinedRows[i].replace(output, value);
        }
        collector.collect(output);

        if (changePartition) {
            // Keep a copy as the partition's reference row for later comparisons.
            lastInput = serializer.copy(input);
        }
    }

    /**
     * Closes the parent operator and then every aggregate handler that was opened in
     * {@link #open()}.
     *
     * @throws Exception if the parent operator or a handler fails to close
     */
    @Override
    public void close() throws Exception {
        super.close();
        // open() may not have run, or may have failed partway, so guard against missing handlers.
        if (processors != null) {
            for (AggsHandleFunction processor : processors) {
                if (processor != null) {
                    processor.close();
                }
            }
        }
    }
}

