/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.transformations;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.UserCodeClassLoader;

@Internal
public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT>
implements org.apache.flink.api.connector.sink2.Sink<InputT> {
    /**
     * The wrapped Sink V1 instance. Every adapter produced by this class forwards its calls to
     * this object. The type is spelled out in full because the simple name {@code Sink} exists
     * in both the V1 ({@code connector.sink}) and V2 ({@code connector.sink2}) packages.
     */
    private final org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT>
            sink;

    /**
     * Creates an adapter around the given V1 sink. The reference is stored as-is; no validation
     * is performed here.
     *
     * @param sink the Sink V1 instance to delegate to
     */
    private SinkV1Adapter(
            org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT>
                    sink) {
        this.sink = sink;
    }

    /**
     * Wraps a Sink V1 instance so that it can be used wherever a Sink V2 is expected.
     *
     * <p>The returned object is whatever {@link #asSpecializedSink()} selects for the given
     * sink, so it may be the adapter itself or one of its inner specialized adapters.
     *
     * @param sink the Sink V1 instance to wrap
     * @param <InputT> the element type consumed by the sink
     * @return a Sink V2 view of {@code sink}
     * @throws IllegalStateException if probing the sink's committer fails with an
     *     {@link java.io.IOException} (raised by {@link #asSpecializedSink()})
     */
    public static <InputT> org.apache.flink.api.connector.sink2.Sink<InputT> wrap(
            org.apache.flink.api.connector.sink.Sink<InputT, ?, ?, ?> sink) {
        // The diamond lets the compiler capture the three wildcard arguments instead of
        // falling back to a raw SinkV1Adapter, which would erase InputT from the result.
        return new SinkV1Adapter<>(sink).asSpecializedSink();
    }

    /**
     * Creates a writer without any recovered state.
     *
     * <p>The V2 init context is wrapped in an {@code InitContextAdapter} and handed to the
     * wrapped V1 sink together with an empty state list; the resulting V1 writer is then
     * wrapped so that it exposes the V2 writer interfaces.
     *
     * @param context the Sink V2 initialization context
     * @return a V2 writer backed by a freshly created V1 writer
     * @throws IOException if the wrapped sink fails to create its writer
     */
    @Override
    public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(
            org.apache.flink.api.connector.sink2.Sink.InitContext context) throws IOException {
        org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> v1Writer =
                this.sink.createWriter(new InitContextAdapter(context), Collections.emptyList());
        return new SinkWriterV1Adapter<>(v1Writer);
    }

    /**
     * Selects the Sink V2 adapter that matches the capabilities of the wrapped V1 sink.
     *
     * <p>Three capabilities are probed, in this order: presence of a writer state serializer
     * ("stateful"), presence of a global committable serializer ("global committer"), and
     * whether {@code createCommitter()} yields a committer. The selection is:
     *
     * <ul>
     *   <li>global committer + committer + stateful: stateful global two-phase adapter
     *   <li>global committer (any other combination): global committing adapter
     *   <li>committer + stateful: stateful two-phase adapter
     *   <li>committer only: two-phase adapter
     *   <li>stateful only: stateful adapter
     *   <li>none: this adapter itself
     * </ul>
     *
     * @return the most specific Sink V2 view for the wrapped sink
     * @throws IllegalStateException if {@code createCommitter()} throws an {@link IOException}
     */
    public org.apache.flink.api.connector.sink2.Sink<InputT> asSpecializedSink() {
        boolean stateful = this.sink.getWriterStateSerializer().isPresent();
        boolean globalCommitter = this.sink.getGlobalCommittableSerializer().isPresent();

        boolean committer;
        try {
            // NOTE(review): the committer created here is used only for the presence check
            // and is not closed by this method.
            committer = this.sink.createCommitter().isPresent();
        } catch (IOException e) {
            throw new IllegalStateException("Failed to instantiate committer.", e);
        }

        if (globalCommitter) {
            if (committer && stateful) {
                return new StatefulGlobalTwoPhaseCommittingSinkAdapter();
            }
            return new GlobalCommittingSinkAdapter();
        }
        if (committer) {
            if (stateful) {
                return new StatefulTwoPhaseCommittingSinkAdapter();
            }
            return new TwoPhaseCommittingSinkAdapter();
        }
        if (stateful) {
            return new StatefulSinkAdapter();
        }
        return this;
    }

    /**
     * Exposes the wrapped sink's V1 {@link GlobalCommitter} through the Sink V2 committer
     * interface.
     *
     * <p>All committables of one {@code commit} call are combined into a single global
     * committable, which is then committed through the V1 global committer.
     */
    @Internal
    public class GlobalCommitterAdapter
            implements org.apache.flink.api.connector.sink2.Committer<CommT> {
        final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
        final SimpleVersionedSerializer<GlobalCommT> globalCommittableSerializer;

        /**
         * Creates the global committer and fetches the global committable serializer from the
         * wrapped sink. Both values are unwrapped with {@code get()}, so they must be present.
         *
         * @throws UncheckedIOException if creating the global committer fails with an
         *     {@link IOException}
         */
        GlobalCommitterAdapter() {
            try {
                this.globalCommitter = SinkV1Adapter.this.sink.createGlobalCommitter().get();
                this.globalCommittableSerializer =
                        SinkV1Adapter.this.sink.getGlobalCommittableSerializer().get();
            } catch (IOException e) {
                throw new UncheckedIOException("Cannot create global committer", e);
            }
        }

        /** Closes the wrapped V1 global committer. */
        @Override
        public void close() throws Exception {
            this.globalCommitter.close();
        }

        /**
         * Combines all committables of the given requests into one global committable and
         * commits it. An empty request collection is a no-op.
         *
         * <p>If the global committer reports any failure, every request of this call is marked
         * for retry: the failures are global committables, and this method has no mapping from
         * a global committable back to the individual requests it was built from.
         *
         * @param committables the commit requests of this round
         * @throws IOException if combining or committing fails
         * @throws InterruptedException if the commit is interrupted
         */
        @Override
        public void commit(Collection<CommitRequest<CommT>> committables)
                throws IOException, InterruptedException {
            if (committables.isEmpty()) {
                return;
            }
            List<CommT> rawCommittables =
                    committables.stream()
                            .map(CommitRequest::getCommittable)
                            .collect(Collectors.toList());
            List<GlobalCommT> globalCommittables =
                    Collections.singletonList(this.globalCommitter.combine(rawCommittables));
            List<GlobalCommT> failures = this.globalCommitter.commit(globalCommittables);
            if (!failures.isEmpty()) {
                committables.forEach(CommitRequest::retryLater);
            }
        }

        /** Returns the wrapped V1 global committer. */
        public GlobalCommitter<CommT, GlobalCommT> getGlobalCommitter() {
            return this.globalCommitter;
        }

        /** Returns the serializer for global committables obtained from the wrapped sink. */
        public SimpleVersionedSerializer<GlobalCommT> getGlobalCommittableSerializer() {
            return this.globalCommittableSerializer;
        }
    }

    private static class NoopCommitter<CommT>
    implements Committer<CommT> {
        private NoopCommitter() {
        }

        @Override
        public List<CommT> commit(List<CommT> committables) {
            return Collections.emptyList();
        }

        @Override
        public void close() throws Exception {
        }
    }

    /**
     * Adapter for a V1 sink that is stateful and has both a committer and a global committer.
     * Writer state and committing are inherited from the stateful two-phase adapter; the
     * post-commit topology is delegated to a {@code GlobalCommittingSinkAdapter}.
     */
    private class StatefulGlobalTwoPhaseCommittingSinkAdapter
            extends StatefulTwoPhaseCommittingSinkAdapter
            implements WithPostCommitTopology<InputT, CommT> {

        /** Delegate that knows how to attach the global committer to the committable stream. */
        GlobalCommittingSinkAdapter globalCommittingSinkAdapter = new GlobalCommittingSinkAdapter();

        private StatefulGlobalTwoPhaseCommittingSinkAdapter() {}

        /**
         * Forwards the committable stream to the delegate's post-commit topology.
         *
         * @param committables stream of committable messages
         */
        @Override
        public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
            globalCommittingSinkAdapter.addPostCommitTopology(committables);
        }
    }

    /**
     * Adapter for a V1 sink that is stateful and has a committer. The stateful part is
     * inherited from {@code StatefulSinkAdapter}; everything related to committing is
     * forwarded to an internal {@code TwoPhaseCommittingSinkAdapter}.
     */
    private class StatefulTwoPhaseCommittingSinkAdapter extends StatefulSinkAdapter
            implements TwoPhaseCommittingSink<InputT, CommT>, StatefulSink.WithCompatibleState {

        /** Delegate providing the committer, committable serializer and compatible state names. */
        TwoPhaseCommittingSinkAdapter adapter = new TwoPhaseCommittingSinkAdapter();

        private StatefulTwoPhaseCommittingSinkAdapter() {}

        /** Returns the committer created by the two-phase delegate. */
        @Override
        public org.apache.flink.api.connector.sink2.Committer<CommT> createCommitter()
                throws IOException {
            return adapter.createCommitter();
        }

        /** Returns the committable serializer supplied by the two-phase delegate. */
        @Override
        public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
            return adapter.getCommittableSerializer();
        }

        /** Returns the compatible writer state names supplied by the two-phase delegate. */
        @Override
        public Collection<String> getCompatibleWriterStateNames() {
            return adapter.getCompatibleWriterStateNames();
        }
    }

    /**
     * Adapter for a V1 sink with a global committer. On top of the two-phase adapter it
     * registers the global committer as a post-commit topology.
     */
    private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
            implements WithPostCommitTopology<InputT, CommT> {

        private GlobalCommittingSinkAdapter() {}

        /**
         * Attaches a global committer to the committable stream through
         * {@code StandardSinkTopologies.addGlobalCommitter}. The committer is supplied as a
         * new {@code GlobalCommitterAdapter}; the serializer is read from the wrapped sink's
         * committable serializer via {@code get()}.
         *
         * @param committables stream of committable messages
         */
        @Override
        public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
            StandardSinkTopologies.addGlobalCommitter(
                    committables,
                    GlobalCommitterAdapter::new,
                    () -> SinkV1Adapter.this.sink.getCommittableSerializer().get());
        }
    }

    /**
     * Adapter for a V1 sink that has a committer. It exposes the V1 committer, the committable
     * serializer and the compatible state names through the Sink V2 interfaces.
     */
    private class TwoPhaseCommittingSinkAdapter extends PlainSinkAdapter
            implements TwoPhaseCommittingSink<InputT, CommT>, StatefulSink.WithCompatibleState {

        private TwoPhaseCommittingSinkAdapter() {}

        /**
         * Creates a V2 committer backed by the wrapped sink's V1 committer. If the sink
         * returns no committer, a {@code NoopCommitter} is used instead.
         *
         * @return the adapted committer
         * @throws IOException if the wrapped sink fails to create its committer
         */
        @Override
        public org.apache.flink.api.connector.sink2.Committer<CommT> createCommitter()
                throws IOException {
            // Diamonds keep CommT attached to both wrappers instead of using raw types.
            return new CommitterAdapter<>(
                    SinkV1Adapter.this.sink.createCommitter().orElse(new NoopCommitter<>()));
        }

        /**
         * Returns the wrapped sink's committable serializer.
         *
         * @throws IllegalStateException if the wrapped sink provides no committable serializer
         */
        @Override
        public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
            return SinkV1Adapter.this
                    .sink
                    .getCommittableSerializer()
                    .orElseThrow(
                            () ->
                                    new IllegalStateException(
                                            "This method should only be called after adapter established that the result is non-empty."));
        }

        /** Returns the compatible state names reported by the wrapped sink. */
        @Override
        public Collection<String> getCompatibleWriterStateNames() {
            return SinkV1Adapter.this.sink.getCompatibleStateNames();
        }
    }

    private class StatefulSinkAdapter
    extends PlainSinkAdapter
    implements StatefulSink<InputT, WriterStateT> {
        private StatefulSinkAdapter() {
        }

        @Override
        public StatefulSink.StatefulSinkWriter<InputT, WriterStateT> restoreWriter(Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException {
            SinkWriter writer = SinkV1Adapter.this.sink.createWriter(new InitContextAdapter(context), new ArrayList(recoveredState));
            return new SinkWriterV1Adapter(writer);
        }

        @Override
        public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
            return SinkV1Adapter.this.sink.getWriterStateSerializer().orElseThrow(() -> new IllegalStateException("This method should only be called after adapter established that the result is non-empty."));
        }
    }

    class PlainSinkAdapter
    implements org.apache.flink.api.connector.sink2.Sink<InputT> {
        PlainSinkAdapter() {
        }

        public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(Sink.InitContext context) throws IOException {
            return SinkV1Adapter.this.createWriter(context);
        }

        public Sink<InputT, CommT, WriterStateT, GlobalCommT> getSink() {
            return SinkV1Adapter.this.sink;
        }
    }

    private static class CommitterAdapter<CommT>
    implements org.apache.flink.api.connector.sink2.Committer<CommT> {
        private final Committer<CommT> committer;

        public CommitterAdapter(Committer<CommT> committer) {
            this.committer = committer;
        }

        @Override
        public void commit(Collection<Committer.CommitRequest<CommT>> commitRequests) throws IOException, InterruptedException {
            List<CommT> failed = this.committer.commit(commitRequests.stream().map(Committer.CommitRequest::getCommittable).collect(Collectors.toList()));
            if (!failed.isEmpty()) {
                Set indexed = Collections.newSetFromMap(new IdentityHashMap());
                indexed.addAll(failed);
                commitRequests.stream().filter(request -> indexed.contains(request.getCommittable())).forEach(Committer.CommitRequest::retryLater);
            }
        }

        @Override
        public void close() throws Exception {
            this.committer.close();
        }
    }

    private static class ProcessingTimeServiceAdapter
    implements Sink.ProcessingTimeService {
        private final ProcessingTimeService processingTimeService;

        public ProcessingTimeServiceAdapter(ProcessingTimeService processingTimeService) {
            this.processingTimeService = processingTimeService;
        }

        @Override
        public long getCurrentProcessingTime() {
            return this.processingTimeService.getCurrentProcessingTime();
        }

        @Override
        public void registerProcessingTimer(long time, Sink.ProcessingTimeService.ProcessingTimeCallback processingTimerCallback) {
            this.processingTimeService.registerTimer(time, new ProcessingTimeCallbackAdapter(processingTimerCallback));
        }
    }

    private static class ProcessingTimeCallbackAdapter
    implements ProcessingTimeService.ProcessingTimeCallback {
        private final Sink.ProcessingTimeService.ProcessingTimeCallback processingTimerCallback;

        public ProcessingTimeCallbackAdapter(Sink.ProcessingTimeService.ProcessingTimeCallback processingTimerCallback) {
            this.processingTimerCallback = processingTimerCallback;
        }

        @Override
        public void onProcessingTime(long time) throws IOException, InterruptedException {
            this.processingTimerCallback.onProcessingTime(time);
        }
    }

    private static class InitContextAdapter
    implements Sink.InitContext {
        private final Sink.InitContext context;

        public InitContextAdapter(Sink.InitContext context) {
            this.context = context;
        }

        @Override
        public UserCodeClassLoader getUserCodeClassLoader() {
            return this.context.getUserCodeClassLoader();
        }

        @Override
        public MailboxExecutor getMailboxExecutor() {
            return this.context.getMailboxExecutor();
        }

        @Override
        public Sink.ProcessingTimeService getProcessingTimeService() {
            return new ProcessingTimeServiceAdapter(this.context.getProcessingTimeService());
        }

        @Override
        public int getSubtaskId() {
            return this.context.getSubtaskId();
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return this.context.getNumberOfParallelSubtasks();
        }

        @Override
        public SinkWriterMetricGroup metricGroup() {
            return this.context.metricGroup();
        }

        @Override
        public OptionalLong getRestoredCheckpointId() {
            return this.context.getRestoredCheckpointId();
        }

        public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
            return this.context.asSerializationSchemaInitializationContext();
        }
    }

    private static class WriterContextAdapter
    implements SinkWriter.Context {
        private SinkWriter.Context context;

        private WriterContextAdapter() {
        }

        public void setContext(SinkWriter.Context context) {
            this.context = context;
        }

        @Override
        public long currentWatermark() {
            return this.context.currentWatermark();
        }

        @Override
        public Long timestamp() {
            return this.context.timestamp();
        }
    }

    private static class SinkWriterV1Adapter<InputT, CommT, WriterStateT>
    implements StatefulSink.StatefulSinkWriter<InputT, WriterStateT>,
    TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, CommT> {
        private final SinkWriter<InputT, CommT, WriterStateT> writer;
        private boolean endOfInput = false;
        private final WriterContextAdapter contextAdapter = new WriterContextAdapter();

        public SinkWriterV1Adapter(SinkWriter<InputT, CommT, WriterStateT> writer) {
            this.writer = writer;
        }

        @Override
        public void write(InputT element, SinkWriter.Context context) throws IOException, InterruptedException {
            this.contextAdapter.setContext(context);
            this.writer.write(element, this.contextAdapter);
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
            this.endOfInput = endOfInput;
        }

        @Override
        public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
            return this.writer.snapshotState(checkpointId);
        }

        @Override
        public Collection<CommT> prepareCommit() throws IOException, InterruptedException {
            return this.writer.prepareCommit(this.endOfInput);
        }

        @Override
        public void close() throws Exception {
            this.writer.close();
        }

        @Override
        public void writeWatermark(Watermark watermark) throws IOException, InterruptedException {
            this.writer.writeWatermark(watermark);
        }
    }
}

