/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.table.store.connector.sink.CommittableStateManager;
import org.apache.flink.table.store.connector.sink.Committer;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.util.function.SerializableSupplier;

public class RestoreAndFailCommittableStateManager
implements CommittableStateManager {
    private static final long serialVersionUID = 1L;
    private final SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>> committableSerializer;
    private ListState<ManifestCommittable> streamingCommitterState;

    public RestoreAndFailCommittableStateManager(SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>> committableSerializer) {
        this.committableSerializer = committableSerializer;
    }

    @Override
    public void initializeState(StateInitializationContext context, Committer committer) throws Exception {
        this.streamingCommitterState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(new ListStateDescriptor("streaming_committer_raw_states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE)), (SimpleVersionedSerializer)this.committableSerializer.get());
        ArrayList<ManifestCommittable> restored = new ArrayList<ManifestCommittable>();
        ((Iterable)this.streamingCommitterState.get()).forEach(restored::add);
        this.streamingCommitterState.clear();
        this.recover(restored, committer);
    }

    private void recover(List<ManifestCommittable> committables, Committer committer) throws Exception {
        if (!(committables = committer.filterRecoveredCommittables(committables)).isEmpty()) {
            committer.commit(committables);
            throw new RuntimeException("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context, List<ManifestCommittable> committables) throws Exception {
        this.streamingCommitterState.update(committables);
    }
}

