/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

@Internal
public final class OperatorEventDispatcherImpl
implements OperatorEventDispatcher {
    private final Map<OperatorID, OperatorEventHandler> handlers;
    private final ClassLoader classLoader;
    private final TaskOperatorEventGateway toCoordinator;

    public OperatorEventDispatcherImpl(ClassLoader classLoader, TaskOperatorEventGateway toCoordinator) {
        this.classLoader = Preconditions.checkNotNull(classLoader);
        this.toCoordinator = Preconditions.checkNotNull(toCoordinator);
        this.handlers = new HashMap<OperatorID, OperatorEventHandler>();
    }

    void dispatchEventToHandlers(OperatorID operatorID, SerializedValue<OperatorEvent> serializedEvent) throws FlinkException {
        OperatorEvent evt;
        try {
            evt = serializedEvent.deserializeValue(this.classLoader);
        }
        catch (IOException | ClassNotFoundException e) {
            throw new FlinkException("Could not deserialize operator event", e);
        }
        OperatorEventHandler handler = this.handlers.get(operatorID);
        if (handler == null) {
            throw new FlinkException("Operator not registered for operator events");
        }
        handler.handleOperatorEvent(evt);
    }

    @Override
    public void registerEventHandler(OperatorID operator, OperatorEventHandler handler) {
        OperatorEventHandler prior = this.handlers.putIfAbsent(operator, handler);
        if (prior != null) {
            throw new IllegalStateException("already a handler registered for this operatorId");
        }
    }

    @Override
    public OperatorEventGateway getOperatorEventGateway(OperatorID operatorId) {
        return new OperatorEventGatewayImpl(this.toCoordinator, operatorId);
    }

    private static final class OperatorEventGatewayImpl
    implements OperatorEventGateway {
        private final TaskOperatorEventGateway toCoordinator;
        private final OperatorID operatorId;

        private OperatorEventGatewayImpl(TaskOperatorEventGateway toCoordinator, OperatorID operatorId) {
            this.toCoordinator = toCoordinator;
            this.operatorId = operatorId;
        }

        @Override
        public void sendEventToCoordinator(OperatorEvent event) {
            SerializedValue<OperatorEvent> serializedEvent;
            try {
                serializedEvent = new SerializedValue<OperatorEvent>(event);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Cannot serialize operator event", e);
            }
            this.toCoordinator.sendOperatorEventToCoordinator(this.operatorId, serializedEvent);
        }
    }
}

