/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.source;

import java.util.ArrayList;
import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.store.connector.source.ContinuousFileSplitEnumerator;
import org.apache.flink.table.store.connector.source.FileStoreSourceSplit;
import org.apache.flink.table.store.connector.source.FlinkSource;
import org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;

/**
 * A {@link FlinkSource} that reports itself as {@link Boundedness#CONTINUOUS_UNBOUNDED} and
 * hands split discovery to a {@link ContinuousFileSplitEnumerator}.
 *
 * <p>The snapshot enumerator used for discovery is produced by a pluggable
 * {@link ContinuousDataFileSnapshotEnumerator.Factory}; the four-argument constructor wires in
 * {@code ContinuousDataFileSnapshotEnumerator::create}.
 */
public class ContinuousFileStoreSource
extends FlinkSource {
    private static final long serialVersionUID = 3L;

    /** Table that scans are created from and whose options supply the discovery interval. */
    private final DataTable table;

    /** Builds the snapshot enumerator passed to the split enumerator. */
    private final ContinuousDataFileSnapshotEnumerator.Factory enumeratorFactory;

    /**
     * Creates a source that uses {@code ContinuousDataFileSnapshotEnumerator::create} as its
     * enumerator factory.
     *
     * @param table the table to read from
     * @param projectedFields optional projection, forwarded to the superclass
     * @param predicate optional filter, forwarded to the superclass
     * @param limit optional limit, forwarded to the superclass
     */
    public ContinuousFileStoreSource(DataTable table, @Nullable int[][] projectedFields, @Nullable Predicate predicate, @Nullable Long limit) {
        this(table, projectedFields, predicate, limit, ContinuousDataFileSnapshotEnumerator::create);
    }

    /**
     * Creates a source with an explicit snapshot enumerator factory.
     *
     * @param table the table to read from
     * @param projectedFields optional projection, forwarded to the superclass
     * @param predicate optional filter, forwarded to the superclass
     * @param limit optional limit, forwarded to the superclass
     * @param enumeratorFactory factory invoked on every {@link #restoreEnumerator} call
     */
    public ContinuousFileStoreSource(DataTable table, @Nullable int[][] projectedFields, @Nullable Predicate predicate, @Nullable Long limit, ContinuousDataFileSnapshotEnumerator.Factory enumeratorFactory) {
        super(table, projectedFields, predicate, limit);
        this.table = table;
        this.enumeratorFactory = enumeratorFactory;
    }

    /**
     * @return always {@link Boundedness#CONTINUOUS_UNBOUNDED}
     */
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /**
     * Builds a {@link ContinuousFileSplitEnumerator}, seeding it from {@code checkpoint} when one
     * is supplied.
     *
     * @param context the enumerator context handed to the new enumerator
     * @param checkpoint previously stored state; when {@code null} the enumerator starts with an
     *     empty split collection and a {@code null} snapshot id
     * @return the newly constructed split enumerator
     */
    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> context, PendingSplitsCheckpoint checkpoint) {
        DataTableScan tableScan = this.newFilteredScan();

        final Long restoredSnapshotId;
        final Collection<FileStoreSourceSplit> pendingSplits;
        if (checkpoint == null) {
            // No prior state: nothing pending and no snapshot position yet.
            restoredSnapshotId = null;
            pendingSplits = new ArrayList<>();
        } else {
            restoredSnapshotId = checkpoint.currentSnapshotId();
            pendingSplits = checkpoint.splits();
        }

        long discoveryIntervalMillis = this.table.options().continuousDiscoveryInterval().toMillis();
        return new ContinuousFileSplitEnumerator(
                context,
                pendingSplits,
                restoredSnapshotId,
                discoveryIntervalMillis,
                this.enumeratorFactory.create(this.table, tableScan, restoredSnapshotId));
    }

    /** Creates a new scan on the table and applies the predicate to it when one is set. */
    private DataTableScan newFilteredScan() {
        DataTableScan tableScan = this.table.newScan();
        if (this.predicate != null) {
            tableScan.withFilter(this.predicate);
        }
        return tableScan;
    }
}

