/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.lookup;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.util.Collector;

/**
 * Async lookup join runner that additionally runs a generated calc
 * ({@link FlatMapFunction}) over the rows returned by the fetcher before they
 * are handed to the join-condition result future created by the parent runner.
 */
public class AsyncLookupJoinWithCalcRunner
extends AsyncLookupJoinRunner {
    private static final long serialVersionUID = 8758670006385551407L;

    /** Generated calc function applied to every fetched row. */
    private final GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc;

    /**
     * Creates the runner. All arguments except {@code generatedCalc} are passed
     * straight through to {@link AsyncLookupJoinRunner}.
     *
     * @param generatedFetcher generated async fetcher, forwarded to the parent
     * @param fetcherConverter converter for fetcher results, forwarded to the parent
     * @param generatedCalc generated calc applied to the fetched rows
     * @param generatedResultFuture generated result future, forwarded to the parent
     * @param rightRowSerializer serializer used to copy rows emitted by the calc
     * @param isLeftOuterJoin forwarded to the parent
     * @param asyncBufferCapacity forwarded to the parent
     */
    public AsyncLookupJoinWithCalcRunner(GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher, DataStructureConverter<RowData, Object> fetcherConverter, GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc, GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture, RowDataSerializer rightRowSerializer, boolean isLeftOuterJoin, int asyncBufferCapacity) {
        super(generatedFetcher, fetcherConverter, generatedResultFuture, rightRowSerializer, isLeftOuterJoin, asyncBufferCapacity);
        this.generatedCalc = generatedCalc;
    }

    /**
     * Opens the parent runner and then compiles the generated calc with the
     * user-code class loader.
     *
     * @param parameters configuration passed on to the parent
     * @throws Exception if the parent fails to open or compilation fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.generatedCalc.compile(this.getRuntimeContext().getUserCodeClassLoader());
    }

    /**
     * Builds a result future that applies a freshly instantiated and opened
     * calc to the fetched rows and delegates to the parent's result future.
     *
     * @param parameters configuration used to open the calc function
     * @return a result future wrapping the parent's result future
     * @throws Exception if the calc cannot be instantiated or opened
     */
    @Override
    public TableFunctionResultFuture<RowData> createFetcherResultFuture(Configuration parameters) throws Exception {
        TableFunctionResultFuture<RowData> joinConditionCollector = super.createFetcherResultFuture(parameters);
        // newInstance already returns the parameterized type; no raw type or cast needed.
        FlatMapFunction<RowData, RowData> calc = this.generatedCalc.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(calc, this.getRuntimeContext());
        FunctionUtils.openFunction(calc, parameters);
        return new TemporalTableCalcResultFuture(calc, joinConditionCollector);
    }

    /**
     * Closes the parent runner.
     *
     * @throws Exception if the parent fails to close
     */
    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * {@link Collector} that gathers the calc output rows into a collection.
     * {@link #reset()} must be called before the first {@link #collect(RowData)}.
     */
    private class CalcCollectionCollector
    implements Collector<RowData> {
        /** Rows collected since the last {@link #reset()}; {@code null} until then. */
        Collection<RowData> collection;

        private CalcCollectionCollector() {
        }

        /** Replaces the backing collection with a new empty list. */
        public void reset() {
            this.collection = new ArrayList<RowData>();
        }

        /**
         * Stores a copy of the record made with the right-row serializer.
         * NOTE(review): presumably the calc reuses its output row object, hence the copy - confirm.
         */
        @Override
        public void collect(RowData record) {
            this.collection.add(AsyncLookupJoinWithCalcRunner.this.rightRowSerializer.copy(record));
        }

        @Override
        public void close() {
        }
    }

    /**
     * Result future that runs the calc on every fetched row and completes the
     * wrapped join-condition result future with the calc output.
     */
    private class TemporalTableCalcResultFuture
    extends TableFunctionResultFuture<RowData> {
        private static final long serialVersionUID = -6360673852888872924L;
        private final FlatMapFunction<RowData, RowData> calc;
        private final TableFunctionResultFuture<RowData> joinConditionResultFuture;
        private final CalcCollectionCollector calcCollector;

        private TemporalTableCalcResultFuture(FlatMapFunction<RowData, RowData> calc, TableFunctionResultFuture<RowData> joinConditionResultFuture) {
            this.calcCollector = new CalcCollectionCollector();
            this.calc = calc;
            this.joinConditionResultFuture = joinConditionResultFuture;
        }

        /** Forwards the input to the wrapped future and starts a fresh output collection. */
        @Override
        public void setInput(Object input) {
            this.joinConditionResultFuture.setInput(input);
            this.calcCollector.reset();
        }

        /** Forwards the downstream result future to the wrapped future. */
        @Override
        public void setResultFuture(ResultFuture<?> resultFuture) {
            this.joinConditionResultFuture.setResultFuture(resultFuture);
        }

        /**
         * Applies the calc to each fetched row and completes the wrapped future
         * with the collected output. A {@code null} or empty result is passed
         * through unchanged. If the calc throws, the wrapped future is completed
         * exceptionally and processing stops.
         *
         * @param result rows returned by the fetcher, possibly {@code null}
         */
        @Override
        public void complete(Collection<RowData> result) {
            if (result == null || result.isEmpty()) {
                this.joinConditionResultFuture.complete(result);
                return;
            }
            for (RowData row : result) {
                try {
                    this.calc.flatMap(row, this.calcCollector);
                }
                catch (Exception e) {
                    // Stop at the first failure so the wrapped future is failed exactly
                    // once and is not completed with a partial result afterwards.
                    this.joinConditionResultFuture.completeExceptionally(e);
                    return;
                }
            }
            this.joinConditionResultFuture.complete(this.calcCollector.collection);
        }

        /**
         * Closes this future, the wrapped future and the calc function, in that order.
         *
         * @throws Exception if any of the close calls fails
         */
        @Override
        public void close() throws Exception {
            super.close();
            this.joinConditionResultFuture.close();
            FunctionUtils.closeFunction(this.calc);
        }
    }
}

