/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.ipc.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.avro.Protocol;
import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.netty.NettyTransportCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyTransceiver
extends Transceiver {
    /** Connect timeout, in milliseconds, applied when the caller passes no timeout. */
    public static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60000;
    /** Name of the Netty connect-timeout option. */
    public static final String NETTY_CONNECT_TIMEOUT_OPTION = "connectTimeoutMillis";
    /** Name of the Netty TCP_NODELAY option. */
    public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
    /** Name of the Netty keep-alive option. */
    public static final String NETTY_KEEPALIVE_OPTION = "keepAlive";
    /** Default TCP_NODELAY setting. */
    public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;
    private static final Logger LOG = LoggerFactory.getLogger((String)NettyTransceiver.class.getName());
    /** Source of the serial numbers stamped on outgoing data packs. */
    private final AtomicInteger serialGenerator = new AtomicInteger(0);
    /** Callbacks of requests awaiting a response, keyed by data-pack serial. */
    private final Map<Integer, Callback<List<ByteBuffer>>> requests = new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
    /** Timeout, in milliseconds, used when waiting for a connect or a close to complete. */
    private final Integer connectTimeoutMillis;
    private final Bootstrap bootstrap;
    private final InetSocketAddress remoteAddr;
    private final EventLoopGroup workerGroup;
    /**
     * Connect attempt currently in progress, or null. Assigned in getChannel(), cleared there on
     * success, and cleared by disconnect() once {@link #stopping} is set.
     */
    volatile ChannelFuture channelFuture;
    /** Set to true by close(boolean); getChannel() starts no new connect attempt once set. */
    volatile boolean stopping;
    /** Monitor held while {@link #channelFuture} is assigned or cleared. */
    private final Object channelFutureLock = new Object();
    /** Read/write lock taken around accesses to {@link #channel} and {@link #remote}. */
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    /** Currently connected channel, or null when disconnected. */
    private Channel channel;
    /** Remote protocol; reset to null on disconnect and on a failed connect. */
    private Protocol remote;

    /**
     * Package-private constructor producing an unconnected instance: no worker group, bootstrap,
     * remote address or pending connect, and a connect timeout of zero. No connection is attempted.
     */
    NettyTransceiver() {
        this.workerGroup = null;
        this.bootstrap = null;
        this.remoteAddr = null;
        this.channelFuture = null;
        this.connectTimeoutMillis = 0;
    }

    /**
     * Connects to {@code addr} using the default connect timeout and no custom initializers.
     *
     * @param addr remote address to connect to
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr) throws IOException {
        this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS, null, null);
    }

    /**
     * Connects to {@code addr} with the given connect timeout and no custom initializers.
     *
     * @param addr remote address to connect to
     * @param connectTimeoutMillis connect timeout in milliseconds; {@code null} selects the default
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis) throws IOException {
        this(addr, connectTimeoutMillis, null, null);
    }

    /**
     * Connects to {@code addr} using the default connect timeout and a channel initializer.
     *
     * @param addr remote address to connect to
     * @param initializer invoked on each new channel before the Avro handlers are added; may be null
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr, Consumer<SocketChannel> initializer) throws IOException {
        this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS, initializer, null);
    }

    /**
     * Connects to {@code addr} with the given connect timeout and a channel initializer.
     *
     * @param addr remote address to connect to
     * @param connectTimeoutMillis connect timeout in milliseconds; {@code null} selects the default
     * @param initializer invoked on each new channel before the Avro handlers are added; may be null
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis, Consumer<SocketChannel> initializer) throws IOException {
        this(addr, connectTimeoutMillis, initializer, null);
    }

    /**
     * Creates the worker group and bootstrap, then eagerly connects to {@code addr}.
     *
     * <p>Each new channel gets, in order: whatever {@code initializer} installs, the frame decoder,
     * the frame encoder, and the handler returned by {@link #createNettyClientAvroHandler()}.
     *
     * @param addr remote address to connect to
     * @param connectTimeoutMillis connect timeout in milliseconds; {@code null} selects
     *        {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}
     * @param initializer invoked on each new channel before the Avro handlers are added; may be null
     * @param bootStrapInitialzier invoked once on the configured bootstrap before connecting; may be
     *        null
     * @throws IOException if the initial connection cannot be established; the worker group is shut
     *         down before the exception propagates
     */
    public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis, final Consumer<SocketChannel> initializer, Consumer<Bootstrap> bootStrapInitialzier) throws IOException {
        if (connectTimeoutMillis == null) {
            connectTimeoutMillis = DEFAULT_CONNECTION_TIMEOUT_MILLIS;
        }
        this.connectTimeoutMillis = connectTimeoutMillis;
        this.workerGroup = new NioEventLoopGroup((ThreadFactory)new NettyTransceiverThreadFactory("avro"));
        // Option values are passed with their real types so ChannelOption<T> is type-checked.
        this.bootstrap = new Bootstrap()
            .group(this.workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis)
            .option(ChannelOption.TCP_NODELAY, DEFAULT_TCP_NODELAY_VALUE)
            .handler(new ChannelInitializer<SocketChannel>(){

                public void initChannel(SocketChannel ch) throws Exception {
                    if (initializer != null) {
                        initializer.accept(ch);
                    }
                    ch.pipeline()
                        .addLast("frameDecoder", new NettyTransportCodec.NettyFrameDecoder())
                        .addLast("frameEncoder", new NettyTransportCodec.NettyFrameEncoder())
                        .addLast("handler", NettyTransceiver.this.createNettyClientAvroHandler());
                }
            });
        if (bootStrapInitialzier != null) {
            bootStrapInitialzier.accept(this.bootstrap);
        }
        this.remoteAddr = addr;
        // getChannel() expects the caller to hold the read lock.
        this.stateLock.readLock().lock();
        try {
            this.getChannel();
        }
        catch (IOException | RuntimeException | Error e) {
            // Release what was allocated so far; the instance never becomes visible to the caller.
            if (this.channelFuture != null) {
                this.channelFuture.channel().close();
            }
            this.workerGroup.shutdownGracefully();
            throw e;
        }
        finally {
            this.stateLock.readLock().unlock();
        }
    }

    /**
     * Creates the inbound handler that is added under the name "handler" to each new channel's
     * pipeline. Subclasses may override this to supply a different handler.
     *
     * @return a new {@link NettyClientAvroHandler}
     */
    protected ChannelInboundHandler createNettyClientAvroHandler() {
        final NettyClientAvroHandler avroHandler = new NettyClientAvroHandler();
        return avroHandler;
    }

    /**
     * Tells whether {@code channel} can be used for I/O.
     *
     * @param channel channel to test; may be null
     * @return true only if the channel is non-null, open and active
     */
    private static boolean isChannelReady(Channel channel) {
        if (channel == null) {
            return false;
        }
        return channel.isOpen() && channel.isActive();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the current channel, connecting (or reconnecting) first when it is not ready.
     *
     * <p>The caller must hold the state read lock. When a connect is needed, the read lock is
     * released, the write lock is taken for the duration of the connect, and the read lock is
     * re-acquired before the write lock is released, so the caller still holds the read lock on
     * return, including when an exception is thrown.
     *
     * <p>No new connect attempt is started once {@link #stopping} is set; in that case the result
     * may be null.
     *
     * @return the connected channel, or whatever {@link #channel} currently holds when no connect
     *         was attempted
     * @throws IOException if the connect does not succeed within the timeout, is cancelled, or the
     *         waiting thread is interrupted
     */
    private Channel getChannel() throws IOException {
        if (!NettyTransceiver.isChannelReady(this.channel)) {
            // Upgrade read lock -> write lock.
            this.stateLock.readLock().unlock();
            this.stateLock.writeLock().lock();
            try {
                // Re-check: another thread may have connected while the lock was being upgraded.
                if (!NettyTransceiver.isChannelReady(this.channel)) {
                    ChannelFuture pendingConnect;
                    synchronized (this.channelFutureLock) {
                        if (!this.stopping) {
                            LOG.debug("Connecting to {}", this.remoteAddr);
                            this.channelFuture = this.bootstrap.connect((SocketAddress)this.remoteAddr);
                        }
                        // Work on a local copy from here on: disconnect() may null the field
                        // concurrently while cancelling the attempt during close().
                        pendingConnect = this.channelFuture;
                    }
                    if (pendingConnect != null) {
                        try {
                            pendingConnect.await(this.connectTimeoutMillis.longValue());
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new IOException("Interrupted while connecting to " + this.remoteAddr, e);
                        }
                        synchronized (this.channelFutureLock) {
                            if (!pendingConnect.isSuccess()) {
                                this.remote = null;
                                throw new IOException("Error connecting to " + this.remoteAddr, pendingConnect.cause());
                            }
                            this.channel = pendingConnect.channel();
                            this.channelFuture = null;
                        }
                    }
                }
            }
            finally {
                // Downgrade write lock -> read lock.
                this.stateLock.readLock().lock();
                this.stateLock.writeLock().unlock();
            }
        }
        return this.channel;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the current channel, if any, and optionally fails all pending requests.
     *
     * <p>Steps, in order: cancel an in-flight connect attempt (only when {@link #stopping} is
     * set); under the write lock, detach the channel, reset the remote protocol and snapshot the
     * pending requests; outside the lock, notify the snapshot's callbacks and close the channel.
     * If the calling thread holds the state read lock, it is released around the write-locked
     * section and re-acquired before returning.
     *
     * @param awaitCompletion whether to wait (up to the connect timeout) for the close to finish
     * @param cancelPendingRequests whether to remove all pending requests and report an error to
     *        their callbacks
     * @param cause error reported to cancelled callbacks; when null an {@link IOException} is
     *        created for each callback
     */
    private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests, Throwable cause) {
        Channel channelToClose = null;
        Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
        boolean stateReadLockHeld = this.stateLock.getReadHoldCount() != 0;
        ChannelFuture channelFutureToCancel = null;
        synchronized (this.channelFutureLock) {
            if (this.stopping && this.channelFuture != null) {
                channelFutureToCancel = this.channelFuture;
                this.channelFuture = null;
            }
        }
        if (channelFutureToCancel != null) {
            channelFutureToCancel.cancel(true);
        }
        // A read lock cannot be upgraded in place, so release it before taking the write lock.
        if (stateReadLockHeld) {
            this.stateLock.readLock().unlock();
        }
        this.stateLock.writeLock().lock();
        try {
            if (this.channel != null) {
                if (cause != null) {
                    LOG.debug("Disconnecting from {}", this.remoteAddr, cause);
                } else {
                    LOG.debug("Disconnecting from {}", this.remoteAddr);
                }
                channelToClose = this.channel;
                this.channel = null;
                this.remote = null;
                if (cancelPendingRequests) {
                    // Snapshot so the callbacks can be invoked after the lock is released.
                    requestsToCancel = new ConcurrentHashMap<>(this.requests);
                    this.requests.clear();
                }
            }
        }
        finally {
            // Re-acquire the caller's read lock before giving up the write lock.
            if (stateReadLockHeld) {
                this.stateLock.readLock().lock();
            }
            this.stateLock.writeLock().unlock();
        }
        if (requestsToCancel != null && !requestsToCancel.isEmpty()) {
            LOG.debug("Removing {} pending request(s)", requestsToCancel.size());
            for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) {
                request.handleError(cause != null ? cause : new IOException(this.getClass().getSimpleName() + " closed"));
            }
        }
        if (channelToClose != null) {
            ChannelFuture closeFuture = channelToClose.close();
            if (awaitCompletion && closeFuture != null) {
                try {
                    closeFuture.await(this.connectTimeoutMillis.longValue());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while disconnecting", e);
                }
            }
        }
    }

    /** Does nothing in this implementation. */
    public void lockChannel() {
    }

    /** Does nothing in this implementation. */
    public void unlockChannel() {
    }

    /** Closes this transceiver, waiting for the channel close to complete. */
    public void close() {
        final boolean awaitCompletion = true;
        this.close(awaitCompletion);
    }

    /**
     * Marks this transceiver as stopping, disconnects (failing every pending request) and then
     * starts a graceful shutdown of the worker group, if there is one.
     *
     * @param awaitCompletion whether to wait for the channel close to complete
     */
    public void close(boolean awaitCompletion) {
        try {
            this.stopping = true;
            this.disconnect(awaitCompletion, true, null);
        }
        finally {
            // Shut the event loop down even if disconnecting failed.
            final EventLoopGroup group = this.workerGroup;
            if (group != null) {
                group.shutdownGracefully();
            }
        }
    }

    /**
     * Returns the string form of the channel's remote address, connecting first if necessary.
     *
     * @return the remote address as text
     * @throws IOException if a required connect fails
     */
    public String getRemoteName() throws IOException {
        // getChannel() must be called with the read lock held.
        this.stateLock.readLock().lock();
        try {
            return this.getChannel().remoteAddress().toString();
        }
        finally {
            this.stateLock.readLock().unlock();
        }
    }

    /**
     * Sends {@code request} and blocks until its response arrives.
     *
     * @param request buffers making up the request
     * @return the response buffers, or {@code null} if waiting was interrupted or the call
     *         completed with an error (the failure is logged at debug level)
     * @throws IOException if the request cannot be written
     */
    public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException {
        CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<>();
        try {
            this.transceive(request, transceiverFuture);
            return transceiverFuture.get();
        }
        catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.debug("failed to get the response", e);
            return null;
        }
        catch (ExecutionException e) {
            LOG.debug("failed to get the response", e);
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends {@code request} asynchronously; {@code callback} is invoked when the matching response
     * arrives or the connection is torn down.
     *
     * <p>The callback is registered before the write so a fast response cannot be missed. If the
     * write cannot even be started (for example because connecting fails), the registration is
     * removed again so the callback is neither leaked nor notified later for a request the caller
     * already saw fail.
     *
     * @param request buffers making up the request
     * @param callback receiver of the response or error
     * @throws IOException if a required connect fails
     */
    public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException {
        this.stateLock.readLock().lock();
        try {
            int serial = this.serialGenerator.incrementAndGet();
            NettyTransportCodec.NettyDataPack dataPack = new NettyTransportCodec.NettyDataPack(serial, request);
            this.requests.put(serial, callback);
            boolean handedOff = false;
            try {
                this.writeDataPack(dataPack);
                handedOff = true;
            }
            finally {
                if (!handedOff) {
                    this.requests.remove(serial);
                }
            }
        }
        finally {
            this.stateLock.readLock().unlock();
        }
    }

    /**
     * Writes {@code buffers} as a single data pack and blocks until the write has completed.
     *
     * @param buffers buffers to send
     * @throws IOException if a required connect fails, the thread is interrupted while waiting, or
     *         the write does not succeed
     */
    public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
        ChannelFuture pendingWrite;
        this.stateLock.readLock().lock();
        try {
            NettyTransportCodec.NettyDataPack dataPack = new NettyTransportCodec.NettyDataPack(this.serialGenerator.incrementAndGet(), buffers);
            pendingWrite = this.writeDataPack(dataPack);
        }
        finally {
            this.stateLock.readLock().unlock();
        }
        // Wait for completion outside the lock.
        try {
            if (!pendingWrite.isDone()) {
                pendingWrite.await();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while writing Netty data pack", e);
        }
        if (pendingWrite.isSuccess()) {
            return;
        }
        throw new IOException("Error writing buffers", pendingWrite.cause());
    }

    /**
     * Writes and flushes {@code dataPack} on the current channel, connecting first if necessary.
     * The caller must hold the state read lock, as required by {@code getChannel()}.
     *
     * @param dataPack data pack to send
     * @return the future of the write
     * @throws IOException if a required connect fails
     */
    private ChannelFuture writeDataPack(NettyTransportCodec.NettyDataPack dataPack) throws IOException {
        final Channel target = this.getChannel();
        return target.writeAndFlush(dataPack);
    }

    /**
     * Not supported by this transceiver.
     *
     * @throws UnsupportedOperationException always
     */
    public List<ByteBuffer> readBuffers() throws IOException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the remote protocol, read under the state read lock.
     *
     * @return the remote protocol, or null if none is set
     */
    public Protocol getRemote() {
        final ReentrantReadWriteLock.ReadLock shared = this.stateLock.readLock();
        shared.lock();
        try {
            return this.remote;
        }
        finally {
            shared.unlock();
        }
    }

    /**
     * Reports whether a remote protocol is currently set, read under the state read lock.
     *
     * @return true if the remote protocol is non-null
     */
    public boolean isConnected() {
        final ReentrantReadWriteLock.ReadLock shared = this.stateLock.readLock();
        shared.lock();
        try {
            return this.remote != null;
        }
        finally {
            shared.unlock();
        }
    }

    /**
     * Stores the remote protocol under the state write lock.
     *
     * @param protocol protocol to record; may be null
     */
    public void setRemote(Protocol protocol) {
        final ReentrantReadWriteLock.WriteLock exclusive = this.stateLock.writeLock();
        exclusive.lock();
        try {
            this.remote = protocol;
        }
        finally {
            exclusive.unlock();
        }
    }

    /**
     * Thread factory that names its threads {@code "<prefix> <n>"}, where {@code n} counts up
     * from 1 per factory instance.
     */
    protected static class NettyTransceiverThreadFactory
    implements ThreadFactory {
        /** Number of threads handed out so far by this factory. */
        private final AtomicInteger threadId = new AtomicInteger(0);
        /** Text placed in front of the sequence number in every thread name. */
        private final String prefix;

        /**
         * @param prefix text placed in front of the sequence number in every thread name
         */
        public NettyTransceiverThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        /**
         * Creates a thread for {@code r} and assigns it the next sequential name.
         *
         * @param r task the new thread will run
         * @return the newly created, not yet started thread
         */
        @Override
        public Thread newThread(Runnable r) {
            final Thread worker = new Thread(r);
            final int sequence = this.threadId.incrementAndGet();
            worker.setName(this.prefix + " " + sequence);
            return worker;
        }
    }

    /**
     * Inbound handler that routes each received data pack to the callback registered under the
     * pack's serial, and disconnects the enclosing transceiver on errors or channel closure.
     */
    protected class NettyClientAvroHandler
    extends SimpleChannelInboundHandler<NettyTransportCodec.NettyDataPack> {
        protected NettyClientAvroHandler() {
        }

        /**
         * Delivers the payload of {@code dataPack} to the callback registered for its serial; the
         * registration is removed afterwards whether or not the callback throws.
         *
         * @throws RuntimeException if no callback is registered for the serial
         */
        protected void channelRead0(ChannelHandlerContext ctx, NettyTransportCodec.NettyDataPack dataPack) throws Exception {
            final Callback<List<ByteBuffer>> pending = NettyTransceiver.this.requests.get(dataPack.getSerial());
            if (pending != null) {
                try {
                    pending.handleResult(dataPack.getDatas());
                }
                finally {
                    NettyTransceiver.this.requests.remove(dataPack.getSerial());
                }
                return;
            }
            throw new RuntimeException("Missing previous call info");
        }

        /** Disconnects without waiting, failing all pending requests with {@code e}. */
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
            NettyTransceiver.this.disconnect(false, true, e);
        }

        /**
         * If the channel is no longer open, logs the disconnect and fails all pending requests;
         * the event is then passed on to the superclass in every case.
         */
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final boolean stillOpen = ctx.channel().isOpen();
            if (!stillOpen) {
                LOG.info("Connection to {} disconnected.", (Object)ctx.channel().remoteAddress());
                NettyTransceiver.this.disconnect(false, true, null);
            }
            super.channelInactive(ctx);
        }
    }

    /**
     * Channel-future listener that reports a failed operation to a callback as an
     * {@link IOException} carrying the future's cause.
     */
    protected static class WriteFutureListener
    implements ChannelFutureListener {
        /** Callback notified on failure; may be null, in which case failures are ignored. */
        protected final Callback<List<ByteBuffer>> callback;

        /**
         * @param callback callback to notify on failure; may be null
         */
        public WriteFutureListener(Callback<List<ByteBuffer>> callback) {
            this.callback = callback;
        }

        /**
         * Forwards a failure of {@code future} to the callback; does nothing on success or when
         * there is no callback.
         */
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess() || this.callback == null) {
                return;
            }
            final IOException failure = new IOException("Error writing buffers", future.cause());
            this.callback.handleError((Throwable)failure);
        }
    }
}

