/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.hybrid;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.hybrid.HybridSourceSplit;
import org.apache.flink.connector.base.source.hybrid.SourceReaderFinishedEvent;
import org.apache.flink.connector.base.source.hybrid.SwitchSourceEvent;
import org.apache.flink.connector.base.source.hybrid.SwitchedSources;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SourceReader} that delegates to the reader of whichever underlying source is
 * currently active and swaps that delegate when a {@link SwitchSourceEvent} arrives.
 *
 * <p>Splits handed to this reader are {@link HybridSourceSplit}s tagged with the index of the
 * source they belong to. Splits received before any delegate exists are buffered and replayed
 * to the matching delegate once it is created.
 *
 * <p>The reader reports progress through {@link SourceReaderFinishedEvent}s sent via the
 * {@link SourceReaderContext}: one from {@link #start()} and one each time the delegate
 * returns {@link InputStatus#END_OF_INPUT}.
 *
 * @param <T> type of the records emitted by the underlying readers
 */
public class HybridSourceReader<T>
implements SourceReader<T, HybridSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class);

    private final SourceReaderContext readerContext;

    /** Sources received through switch events, keyed by source index. */
    private final SwitchedSources switchedSources = new SwitchedSources();

    /** Index of the active source; negative until the first switch event has been handled. */
    private int currentSourceIndex = -1;

    /** Whether the active source was flagged as the last one by the switch event. */
    private boolean isFinalSource;

    /** Delegate for the active source; {@code null} until the first switch event. */
    private SourceReader<T, ? extends SourceSplit> currentReader;

    /**
     * Hybrid-level availability signal. It is incomplete while there is nothing to poll
     * (before the first switch, and between the end of a non-final source and the next
     * switch) and completed once a delegate has been activated.
     */
    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    /** Splits received while no delegate existed yet, waiting for their source to activate. */
    private List<HybridSourceSplit> restoredSplits = new ArrayList<>();

    /**
     * Creates a reader without an active delegate.
     *
     * @param readerContext context used to talk to the coordinator and to create delegates
     */
    public HybridSourceReader(SourceReaderContext readerContext) {
        this.readerContext = readerContext;
    }

    /**
     * Sends a {@link SourceReaderFinishedEvent} to the coordinator.
     *
     * <p>The event carries the current source index, or, when buffered splits exist, the
     * index preceding the source of the first buffered split.
     */
    @Override
    public void start() {
        int initialSourceIndex = this.currentSourceIndex;
        if (!this.restoredSplits.isEmpty()) {
            initialSourceIndex = this.restoredSplits.get(0).sourceIndex() - 1;
        }
        this.readerContext.sendSourceEventToCoordinator(new SourceReaderFinishedEvent(initialSourceIndex));
    }

    /**
     * Polls the active delegate.
     *
     * <p>{@link InputStatus#END_OF_INPUT} from the delegate is always reported to the
     * coordinator, but it is only passed on to the caller when the active source is the
     * final one; otherwise {@link InputStatus#NOTHING_AVAILABLE} is returned.
     *
     * @param output sink for the records produced by the delegate
     * @return {@link InputStatus#NOTHING_AVAILABLE} when there is no delegate or a non-final
     *     delegate has finished, otherwise the delegate's status
     * @throws Exception if the delegate fails while polling
     */
    @Override
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        if (this.currentReader == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        InputStatus status = this.currentReader.pollNext(output);
        if (status == InputStatus.END_OF_INPUT) {
            LOG.info("End of input subtask={} sourceIndex={} {}", this.readerContext.getIndexOfSubtask(), this.currentSourceIndex, this.currentReader);
            this.readerContext.sendSourceEventToCoordinator(new SourceReaderFinishedEvent(this.currentSourceIndex));
            if (!this.isFinalSource) {
                // Replace a completed future with a fresh one so that isAvailable() stays
                // incomplete until the next switch event completes it.
                if (this.availabilityFuture.isDone()) {
                    this.availabilityFuture = new CompletableFuture<>();
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
        return status;
    }

    /**
     * Snapshots the delegate's splits and wraps them with the current source index.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the wrapped splits; built from an empty list when there is no delegate
     */
    // Raw List: the element type is the delegate's wildcard split type and the exact
    // parameter type of HybridSourceSplit.wrapSplits is not visible in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public List<HybridSourceSplit> snapshotState(long checkpointId) {
        List state = this.currentReader != null ? this.currentReader.snapshotState(checkpointId) : Collections.emptyList();
        return HybridSourceSplit.wrapSplits(state, this.currentSourceIndex, this.switchedSources);
    }

    /**
     * Forwards the checkpoint-complete notification to the delegate, if any.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the delegate fails while handling the notification
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.currentReader != null) {
            this.currentReader.notifyCheckpointComplete(checkpointId);
        }
    }

    /**
     * Forwards the checkpoint-aborted notification to the delegate, if any.
     *
     * @param checkpointId id of the aborted checkpoint
     * @throws Exception if the delegate fails while handling the notification
     */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (this.currentReader != null) {
            this.currentReader.notifyCheckpointAborted(checkpointId);
        }
    }

    /**
     * Returns the future that signals when polling may yield data.
     *
     * <p>While the hybrid-level future is incomplete it is returned as is. Once it has
     * completed and a delegate exists, the delegate's own availability future is returned
     * instead. Returning the permanently completed hybrid-level future would report the
     * reader as available even when the delegate has nothing to emit.
     *
     * @return the hybrid-level future, or the delegate's future once a delegate is active
     */
    @Override
    public CompletableFuture<Void> isAvailable() {
        if (this.availabilityFuture.isDone() && this.currentReader != null) {
            return this.currentReader.isAvailable();
        }
        return this.availabilityFuture;
    }

    /**
     * Buffers the splits when no source is active yet, otherwise unwraps them and hands
     * them to the delegate.
     *
     * @param splits hybrid splits; with an active source each must carry its index
     * @throws IllegalStateException if a split belongs to a source other than the active one
     */
    // The delegate's split type is a wildcard, so the unwrapped list can only be passed
    // through a raw cast.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void addSplits(List<HybridSourceSplit> splits) {
        LOG.info("Adding splits subtask={} sourceIndex={} currentReader={} {}", this.readerContext.getIndexOfSubtask(), this.currentSourceIndex, this.currentReader, splits);
        if (this.currentSourceIndex < 0) {
            this.restoredSplits.addAll(splits);
            return;
        }
        List<SourceSplit> realSplits = new ArrayList<>(splits.size());
        for (HybridSourceSplit split : splits) {
            Preconditions.checkState(split.sourceIndex() == this.currentSourceIndex, "Split %s while current source is %s", split, this.currentSourceIndex);
            realSplits.add(HybridSourceSplit.unwrapSplit(split, this.switchedSources));
        }
        this.currentReader.addSplits((List) realSplits);
    }

    /** Forwards the no-more-splits notification to the delegate, if any. */
    @Override
    public void notifyNoMoreSplits() {
        if (this.currentReader != null) {
            this.currentReader.notifyNoMoreSplits();
        }
        LOG.debug("No more splits for subtask={} sourceIndex={} currentReader={}", this.readerContext.getIndexOfSubtask(), this.currentSourceIndex, this.currentReader);
    }

    /**
     * Handles a {@link SwitchSourceEvent} by activating the announced source; any other
     * event is forwarded to the delegate.
     *
     * @param sourceEvent event received from the coordinator
     * @throws IllegalStateException if a non-switch event arrives while no delegate exists
     */
    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof SwitchSourceEvent) {
            SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent;
            LOG.info("Switch source event: subtask={} sourceIndex={} source={}", this.readerContext.getIndexOfSubtask(), sse.sourceIndex(), sse.source());
            this.switchedSources.put(sse.sourceIndex(), sse.source());
            this.setCurrentReader(sse.sourceIndex());
            this.isFinalSource = sse.isFinalSource();
            // Wake up a caller that is waiting on the hybrid-level future.
            if (!this.availabilityFuture.isDone()) {
                this.availabilityFuture.complete(null);
            }
        } else {
            // Fail with a descriptive message instead of a bare NullPointerException.
            Preconditions.checkState(this.currentReader != null, "Received source event %s before any reader was activated", sourceEvent);
            this.currentReader.handleSourceEvents(sourceEvent);
        }
    }

    /**
     * Closes the delegate, if any.
     *
     * @throws Exception if the delegate fails to close
     */
    @Override
    public void close() throws Exception {
        if (this.currentReader != null) {
            this.currentReader.close();
        }
        LOG.debug("Reader closed: subtask={} sourceIndex={} currentReader={}", this.readerContext.getIndexOfSubtask(), this.currentSourceIndex, this.currentReader);
    }

    /**
     * Closes the current delegate (if any), creates and starts the reader of the source
     * registered under {@code index}, and replays buffered splits that belong to it.
     *
     * @param index index of the source to activate; must differ from the active index
     * @throws IllegalArgumentException if {@code index} equals the active source index
     * @throws RuntimeException if closing the old reader or creating the new one fails
     */
    // Raw Source/SourceReader: the concrete type arguments of the switched source are not
    // known here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void setCurrentReader(int index) {
        Preconditions.checkArgument(index != this.currentSourceIndex);
        if (this.currentReader != null) {
            try {
                this.currentReader.close();
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to close current reader", e);
            }
            LOG.debug("Reader closed: subtask={} sourceIndex={} currentReader={}", this.readerContext.getIndexOfSubtask(), this.currentSourceIndex, this.currentReader);
        }
        Source source = this.switchedSources.sourceOf(index);
        SourceReader<T, ? extends SourceSplit> reader;
        try {
            reader = source.createReader(this.readerContext);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create reader", e);
        }
        reader.start();
        this.currentSourceIndex = index;
        this.currentReader = reader;
        // Propagate the new delegate's first availability signal, or its failure, to
        // whichever hybrid-level future is installed when the signal fires.
        this.currentReader.isAvailable().whenComplete((result, ex) -> {
            if (ex == null) {
                this.availabilityFuture.complete(result);
            } else {
                this.availabilityFuture.completeExceptionally(ex);
            }
        });
        LOG.debug("Reader started: subtask={} sourceIndex={} {}", this.readerContext.getIndexOfSubtask(), this.currentSourceIndex, reader);
        if (!this.restoredSplits.isEmpty()) {
            // Move the buffered splits of the activated source over to the new delegate;
            // splits of other sources stay buffered.
            List<HybridSourceSplit> splits = new ArrayList<>(this.restoredSplits.size());
            Iterator<HybridSourceSplit> it = this.restoredSplits.iterator();
            while (it.hasNext()) {
                HybridSourceSplit hybridSplit = it.next();
                if (hybridSplit.sourceIndex() == index) {
                    splits.add(hybridSplit);
                    it.remove();
                }
            }
            this.addSplits(splits);
        }
    }
}

