/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.source;

import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
import org.apache.flink.table.store.connector.lookup.FileStoreLookupFunction;
import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
import org.apache.flink.table.store.connector.source.FlinkTableSource;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.utils.Projection;

public class TableStoreSource
extends FlinkTableSource
implements LookupTableSource,
SupportsWatermarkPushDown {
    private final ObjectIdentifier tableIdentifier;
    private final FileStoreTable table;
    private final boolean streaming;
    private final DynamicTableFactory.Context context;
    @Nullable
    private final LogStoreTableFactory logStoreTableFactory;
    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;

    public TableStoreSource(ObjectIdentifier tableIdentifier, FileStoreTable table, boolean streaming, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) {
        this(tableIdentifier, table, streaming, context, logStoreTableFactory, null, null, null, null);
    }

    private TableStoreSource(ObjectIdentifier tableIdentifier, FileStoreTable table, boolean streaming, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        super(table, predicate, projectFields, limit);
        this.tableIdentifier = tableIdentifier;
        this.table = table;
        this.streaming = streaming;
        this.context = context;
        this.logStoreTableFactory = logStoreTableFactory;
        this.predicate = predicate;
        this.projectFields = projectFields;
        this.limit = limit;
        this.watermarkStrategy = watermarkStrategy;
    }

    public ChangelogMode getChangelogMode() {
        if (!this.streaming) {
            return ChangelogMode.insertOnly();
        }
        if (this.table instanceof AppendOnlyFileStoreTable) {
            return ChangelogMode.insertOnly();
        }
        if (this.table instanceof ChangelogValueCountFileStoreTable) {
            return ChangelogMode.all();
        }
        if (this.table instanceof ChangelogWithKeyFileStoreTable) {
            Configuration options = Configuration.fromMap(this.table.schema().options());
            if (((Boolean)options.get(CoreOptions.LOG_SCAN_REMOVE_NORMALIZE)).booleanValue()) {
                return ChangelogMode.all();
            }
            if (this.logStoreTableFactory == null && options.get(CoreOptions.CHANGELOG_PRODUCER) != CoreOptions.ChangelogProducer.NONE) {
                return ChangelogMode.all();
            }
            return options.get(CoreOptions.LOG_CONSISTENCY) == CoreOptions.LogConsistency.TRANSACTIONAL && options.get(CoreOptions.LOG_CHANGELOG_MODE) == CoreOptions.LogChangelogMode.ALL ? ChangelogMode.all() : ChangelogMode.upsert();
        }
        throw new UnsupportedOperationException("Unsupported Table subclass " + this.table.getClass().getName() + " for streaming mode.");
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        LogSourceProvider logSourceProvider = null;
        if (this.logStoreTableFactory != null) {
            logSourceProvider = this.logStoreTableFactory.createSourceProvider(this.context, (DynamicTableSource.Context)scanContext, this.projectFields);
        }
        FlinkSourceBuilder sourceBuilder = new FlinkSourceBuilder(this.tableIdentifier, this.table).withContinuousMode(this.streaming).withLogSourceProvider(logSourceProvider).withProjection(this.projectFields).withPredicate(this.predicate).withLimit(this.limit).withParallelism((Integer)Configuration.fromMap(this.table.schema().options()).get(FlinkConnectorOptions.SCAN_PARALLELISM)).withWatermarkStrategy(this.watermarkStrategy);
        return new TableStoreDataStreamScanProvider(!this.streaming, env -> sourceBuilder.withEnv((StreamExecutionEnvironment)env).build());
    }

    public DynamicTableSource copy() {
        return new TableStoreSource(this.tableIdentifier, this.table, this.streaming, this.context, this.logStoreTableFactory, this.predicate, this.projectFields, this.limit, this.watermarkStrategy);
    }

    public String asSummaryString() {
        return "TableStore-DataSource";
    }

    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
        if (this.limit != null) {
            throw new RuntimeException("Limit push down should not happen in Lookup source, but it is " + this.limit);
        }
        int[] projection = this.projectFields == null ? IntStream.range(0, this.table.schema().fields().size()).toArray() : Projection.of(this.projectFields).toTopLevelIndexes();
        int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
        return TableFunctionProvider.of((TableFunction)new FileStoreLookupFunction(this.table, projection, joinKey, this.predicate));
    }
}

