/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.security.token;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.DelegationTokenProvider;
import org.apache.flink.runtime.security.token.KerberosLoginProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Delegation token manager backed by Kerberos.
 *
 * <p>On construction it discovers {@link DelegationTokenProvider} implementations through
 * {@link ServiceLoader} and initializes the ones that are enabled in the configuration. It can
 * obtain tokens once via {@link #obtainDelegationTokens(Credentials)}, or, after {@link #start()},
 * periodically re-login from keytab and re-obtain tokens until {@link #stop()} is called.
 */
public class KerberosDelegationTokenManager implements DelegationTokenManager {
    private static final Logger LOG = LoggerFactory.getLogger(KerberosDelegationTokenManager.class);

    private final Configuration configuration;
    /** Fraction of the time until the next renewal date after which tokens are re-obtained. */
    private final double tokensRenewalTimeRatio;
    /** Delay in milliseconds before retrying a failed token update. */
    private final long renewalRetryBackoffPeriod;
    private final KerberosLoginProvider kerberosLoginProvider;
    /** Enabled and initialized providers, keyed by their service name. */
    @VisibleForTesting
    final Map<String, DelegationTokenProvider> delegationTokenProviders;
    @Nullable
    private final ScheduledExecutor scheduledExecutor;
    @Nullable
    private final ExecutorService ioExecutor;
    @Nullable
    private ScheduledFuture<?> tgtRenewalFuture;
    private final Object tokensUpdateFutureLock = new Object();
    @Nullable
    @GuardedBy(value="tokensUpdateFutureLock")
    private ScheduledFuture<?> tokensUpdateFuture;

    /**
     * Creates a manager that uses a {@link KerberosLoginProvider} built from the given
     * configuration.
     *
     * @param configuration Flink configuration, must not be null
     * @param scheduledExecutor executor used to time renewal tasks; may be null when
     *     {@link #start()} is never called
     * @param ioExecutor executor the renewal work is handed to; may be null when
     *     {@link #start()} is never called
     */
    public KerberosDelegationTokenManager(Configuration configuration, @Nullable ScheduledExecutor scheduledExecutor, @Nullable ExecutorService ioExecutor) {
        this(configuration, scheduledExecutor, ioExecutor, new KerberosLoginProvider(configuration));
    }

    /**
     * Creates a manager with an explicitly supplied login provider.
     *
     * @param configuration Flink configuration, must not be null
     * @param scheduledExecutor executor used to time renewal tasks; may be null when
     *     {@link #start()} is never called
     * @param ioExecutor executor the renewal work is handed to; may be null when
     *     {@link #start()} is never called
     * @param kerberosLoginProvider provider used to check for and perform the Kerberos login
     */
    public KerberosDelegationTokenManager(Configuration configuration, @Nullable ScheduledExecutor scheduledExecutor, @Nullable ExecutorService ioExecutor, KerberosLoginProvider kerberosLoginProvider) {
        this.configuration = Preconditions.checkNotNull(configuration, "Flink configuration must not be null");
        // NOTE(review): the instance is never read; it is still constructed here because the
        // constructor may validate the security settings - confirm before removing.
        SecurityConfiguration securityConfiguration = new SecurityConfiguration(configuration);
        this.tokensRenewalTimeRatio = configuration.get(SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
        this.renewalRetryBackoffPeriod = configuration.get(SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
        this.kerberosLoginProvider = kerberosLoginProvider;
        this.delegationTokenProviders = loadProviders();
        this.scheduledExecutor = scheduledExecutor;
        this.ioExecutor = ioExecutor;
    }

    /**
     * Discovers providers via {@link ServiceLoader} and initializes every enabled one.
     *
     * <p>A {@link NoClassDefFoundError} raised for a provider is logged and that provider is
     * skipped; any other exception aborts loading.
     *
     * @return the initialized providers keyed by service name
     * @throws FlinkRuntimeException if a provider fails with an exception
     */
    private Map<String, DelegationTokenProvider> loadProviders() {
        LOG.info("Loading delegation token providers");
        Map<String, DelegationTokenProvider> providers = new HashMap<>();
        for (DelegationTokenProvider provider : ServiceLoader.load(DelegationTokenProvider.class)) {
            try {
                if (isProviderEnabled(provider.serviceName())) {
                    provider.init(configuration);
                    LOG.info("Delegation token provider {} loaded and initialized", provider.serviceName());
                    providers.put(provider.serviceName(), provider);
                } else {
                    LOG.info("Delegation token provider {} is disabled so not loaded", provider.serviceName());
                }
            } catch (Exception | NoClassDefFoundError e) {
                LOG.error("Failed to initialize delegation token provider {}", provider.serviceName(), e);
                // Missing classes only disable the affected provider; everything else is fatal.
                if (!(e instanceof NoClassDefFoundError)) {
                    throw new FlinkRuntimeException(e);
                }
            }
        }
        LOG.info("Delegation token providers loaded successfully");
        return providers;
    }

    /**
     * Tells whether a provider is enabled; providers are enabled unless the option
     * {@code security.kerberos.token.provider.<serviceName>.enabled} says otherwise.
     *
     * @param serviceName service name of the provider
     * @return {@code true} if the provider should be loaded
     */
    @VisibleForTesting
    boolean isProviderEnabled(String serviceName) {
        String enabledKey = String.format("security.kerberos.token.provider.%s.enabled", serviceName);
        return configuration.getBoolean(enabledKey, true);
    }

    /**
     * @param serviceName service name of the provider
     * @return {@code true} if a provider with this service name was loaded
     */
    @VisibleForTesting
    boolean isProviderLoaded(String serviceName) {
        return delegationTokenProviders.containsKey(serviceName);
    }

    /**
     * Obtains delegation tokens from all loaded providers into the given credentials. Does
     * nothing except logging when no Kerberos login is possible.
     *
     * @param credentials container the obtained tokens are added to
     * @throws Exception if the login or obtaining the tokens fails
     */
    @Override
    public void obtainDelegationTokens(Credentials credentials) throws Exception {
        LOG.info("Obtaining delegation tokens");
        if (!kerberosLoginProvider.isLoginPossible()) {
            LOG.info("Real user has no kerberos credentials so no tokens obtained");
            return;
        }
        UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
        // The cast selects the exception-throwing doAs overload; a bare lambda is ambiguous.
        freshUGI.doAs((PrivilegedExceptionAction<Void>) () -> {
            obtainDelegationTokensAndGetNextRenewal(credentials);
            return null;
        });
        LOG.info("Delegation tokens obtained successfully");
    }

    /**
     * Asks every loaded provider for tokens and determines the earliest renewal date.
     *
     * @param credentials container the obtained tokens are added to
     * @return the smallest renewal date reported by any provider, or empty if none reported one
     * @throws FlinkRuntimeException if any provider fails
     */
    protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
        Optional<Long> nextRenewal = delegationTokenProviders.values().stream()
                .map(provider -> obtainTokensFromProvider(provider, credentials))
                // Drop providers without a renewal date (Java 8 compatible Optional -> Stream).
                .flatMap(renewal -> renewal.map(Stream::of).orElseGet(Stream::empty))
                .min(Long::compare);
        credentials.getAllTokens().forEach(token -> LOG.debug("Token Service:{} Identifier:{}", token.getService(), token.getIdentifier()));
        return nextRenewal;
    }

    /**
     * Obtains tokens from a single provider if that provider requires them.
     *
     * @return the renewal date reported by the provider, or empty if none was reported or no
     *     tokens were required
     * @throws FlinkRuntimeException if the provider fails
     */
    private Optional<Long> obtainTokensFromProvider(DelegationTokenProvider provider, Credentials credentials) {
        try {
            Optional<Long> renewal = Optional.empty();
            if (provider.delegationTokensRequired()) {
                LOG.debug("Obtaining delegation token for service {}", provider.serviceName());
                renewal = provider.obtainDelegationTokens(credentials);
                LOG.debug("Obtained delegation token for service {} successfully", provider.serviceName());
            } else {
                LOG.debug("Service {} does not need to obtain delegation token", provider.serviceName());
            }
            return renewal;
        } catch (Exception e) {
            LOG.error("Failed to obtain delegation token for provider {}", provider.serviceName(), e);
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Starts the TGT renewal and tokens update tasks. Returns without starting anything when no
     * Kerberos login is possible.
     *
     * @throws NullPointerException if the manager was created without executors
     * @throws IllegalStateException if the manager is already started
     * @throws Exception if starting the TGT renewal fails
     */
    @Override
    public void start() throws Exception {
        Preconditions.checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
        Preconditions.checkNotNull(ioExecutor, "IO executor must not be null");
        synchronized (tokensUpdateFutureLock) {
            Preconditions.checkState(tgtRenewalFuture == null && tokensUpdateFuture == null, "Manager is already started");
        }
        if (!kerberosLoginProvider.isLoginPossible()) {
            LOG.info("Renewal is NOT possible, skipping to start renewal task");
            return;
        }
        startTGTRenewal();
        startTokensUpdate();
    }

    /**
     * Schedules a fixed-rate keytab re-login for the current user. Nothing is scheduled when the
     * current user was not logged in from a keytab.
     *
     * @throws IOException if the current user cannot be determined
     */
    @VisibleForTesting
    void startTGTRenewal() throws IOException {
        LOG.info("Starting TGT renewal task");
        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        if (!currentUser.isFromKeytab()) {
            LOG.info("TGT renewal task not started");
            return;
        }
        long tgtRenewalPeriod = configuration.get(SecurityOptions.KERBEROS_RELOGIN_PERIOD).toMillis();
        Runnable renewTgt = () -> {
            try {
                LOG.debug("Renewing TGT");
                currentUser.checkTGTAndReloginFromKeytab();
                LOG.debug("TGT renewed successfully");
            } catch (Exception e) {
                // Best effort: the next scheduled run tries again.
                LOG.warn("Error while renewing TGT", e);
            }
        };
        // The scheduler only triggers; the actual re-login runs on the IO executor.
        tgtRenewalFuture = scheduledExecutor.scheduleAtFixedRate(() -> ioExecutor.execute(renewTgt), 0L, tgtRenewalPeriod, TimeUnit.MILLISECONDS);
        LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
    }

    /** Cancels the TGT renewal task if one is scheduled. */
    @VisibleForTesting
    void stopTGTRenewal() {
        if (tgtRenewalFuture != null) {
            tgtRenewalFuture.cancel(true);
            tgtRenewalFuture = null;
        }
    }

    /**
     * Logs in, obtains fresh tokens and schedules the next run of this method.
     *
     * <p>The next run is scheduled from the earliest renewal date reported by the providers. If
     * no renewal date is available nothing is rescheduled. On failure a retry is scheduled after
     * the configured backoff period. An interrupt ends the cycle without rescheduling.
     */
    @VisibleForTesting
    void startTokensUpdate() {
        try {
            LOG.info("Starting tokens update task");
            Credentials credentials = new Credentials();
            UserGroupInformation freshUGI = kerberosLoginProvider.doLogin();
            // The cast selects the exception-throwing doAs overload; a bare lambda is ambiguous.
            Optional<Long> nextRenewal = freshUGI.doAs((PrivilegedExceptionAction<Optional<Long>>) () -> obtainDelegationTokensAndGetNextRenewal(credentials));
            if (nextRenewal.isPresent()) {
                long renewalDelay = calculateRenewalDelay(Clock.systemDefaultZone(), nextRenewal.get());
                scheduleTokensUpdate(renewalDelay);
                LOG.info("Tokens update task started with {} ms delay", renewalDelay);
            } else {
                LOG.warn("Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date");
            }
        } catch (InterruptedException e) {
            LOG.debug("Interrupted", e);
            // Restore the flag so the interrupt stays visible to the code running this task.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            scheduleTokensUpdate(renewalRetryBackoffPeriod);
            LOG.warn("Failed to update tokens, will try again in {} ms", renewalRetryBackoffPeriod, e);
        }
    }

    /**
     * Schedules the next tokens update and records its future under the lock.
     *
     * @param delayMillis delay before the update is handed to the IO executor
     */
    private void scheduleTokensUpdate(long delayMillis) {
        synchronized (tokensUpdateFutureLock) {
            tokensUpdateFuture = scheduledExecutor.schedule(() -> ioExecutor.execute(this::startTokensUpdate), delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Cancels the pending tokens update if one is scheduled. */
    @VisibleForTesting
    void stopTokensUpdate() {
        synchronized (tokensUpdateFutureLock) {
            if (tokensUpdateFuture != null) {
                tokensUpdateFuture.cancel(true);
                tokensUpdateFuture = null;
            }
        }
    }

    /**
     * Computes how long to wait before re-obtaining tokens: the time remaining until the next
     * renewal date multiplied by the renewal time ratio, rounded to the nearest millisecond.
     *
     * @param clock source of the current time
     * @param nextRenewal next renewal date in epoch milliseconds
     * @return the delay in milliseconds; negative if the scaled renewal date is in the past
     */
    @VisibleForTesting
    long calculateRenewalDelay(Clock clock, long nextRenewal) {
        long now = clock.millis();
        long renewalDelay = Math.round(tokensRenewalTimeRatio * (double) (nextRenewal - now));
        LOG.debug("Calculated delay on renewal is {}, based on next renewal {} and the ratio {}, and current time {}", renewalDelay, nextRenewal, tokensRenewalTimeRatio, now);
        return renewalDelay;
    }

    /** Stops the tokens update task and the TGT renewal task. */
    @Override
    public void stop() {
        LOG.info("Stopping credential renewal");
        stopTokensUpdate();
        stopTGTRenewal();
        LOG.info("Stopped credential renewal");
    }
}

