/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileSegmentReader;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriterWithCallback;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileSegmentReader;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.FileSegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.io.disk.iomanager.RequestQueue;
import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

public class IOManagerAsync
extends IOManager
implements Thread.UncaughtExceptionHandler {
    private final WriterThread[] writers;
    private final ReaderThread[] readers;
    private final AtomicBoolean isShutdown = new AtomicBoolean();
    private final Thread shutdownHook;

    public IOManagerAsync() {
        this(EnvironmentInformation.getTemporaryFileDirectory());
    }

    public IOManagerAsync(String tempDir) {
        this(new String[]{tempDir});
    }

    public IOManagerAsync(String[] tempDirs) {
        super(tempDirs);
        Thread t;
        int i;
        this.writers = new WriterThread[tempDirs.length];
        for (i = 0; i < this.writers.length; ++i) {
            t = new WriterThread();
            this.writers[i] = t;
            t.setName("IOManager writer thread #" + (i + 1));
            t.setDaemon(true);
            t.setUncaughtExceptionHandler(this);
            t.start();
        }
        this.readers = new ReaderThread[tempDirs.length];
        for (i = 0; i < this.readers.length; ++i) {
            t = new ReaderThread();
            this.readers[i] = t;
            t.setName("IOManager reader thread #" + (i + 1));
            t.setDaemon(true);
            t.setUncaughtExceptionHandler(this);
            t.start();
        }
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::close, this.getClass().getSimpleName(), LOG);
    }

    @Override
    public void close() throws Exception {
        if (!this.isShutdown.compareAndSet(false, true)) {
            return;
        }
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, this.getClass().getSimpleName(), LOG);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Shutting down I/O manager.");
        }
        ArrayList<AutoCloseable> closeables = new ArrayList<AutoCloseable>(this.writers.length + this.readers.length + 2);
        for (WriterThread writerThread : this.writers) {
            closeables.add(IOManagerAsync.getWriterThreadCloser(writerThread));
        }
        for (Thread thread : this.readers) {
            closeables.add(IOManagerAsync.getReaderThreadCloser((ReaderThread)thread));
        }
        closeables.add(() -> {
            try {
                for (WriterThread writerThread : this.writers) {
                    writerThread.join();
                }
                for (Thread thread : this.readers) {
                    thread.join();
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        closeables.add(() -> super.close());
        IOUtils.closeAll(closeables);
    }

    private static AutoCloseable getWriterThreadCloser(WriterThread thread) {
        return () -> {
            try {
                thread.shutdown();
            }
            catch (Throwable t) {
                throw new IOException("Error while shutting down IO Manager writer thread.", t);
            }
        };
    }

    private static AutoCloseable getReaderThreadCloser(ReaderThread thread) {
        return () -> {
            try {
                thread.shutdown();
            }
            catch (Throwable t) {
                throw new IOException("Error while shutting down IO Manager reader thread.", t);
            }
        };
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e);
        try {
            this.close();
        }
        catch (Exception ex) {
            LOG.warn("IOManagerAsync did not shut down properly.", (Throwable)ex);
        }
    }

    @Override
    public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
        Preconditions.checkState(!this.isShutdown.get(), "I/O-Manager is shut down.");
        return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
    }

    @Override
    public BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
        Preconditions.checkState(!this.isShutdown.get(), "I/O-Manager is shut down.");
        return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
    }

    @Override
    public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
        Preconditions.checkState(!this.isShutdown.get(), "I/O-Manager is shut down.");
        return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
    }

    @Override
    public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {
        Preconditions.checkState(!this.isShutdown.get(), "I/O-Manager is shut down.");
        return new AsynchronousBufferFileWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue);
    }

    @Override
    public BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException {
        Preconditions.checkState(!this.isShutdown.get(), "I/O-Manager is shut down.");
        return new AsynchronousBufferFileReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, callback);
    }

    @Override
    public BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException {
        Preconditions.checkState(!this.isShutdown.get(), "I/O-Manager is shut down.");
        return new AsynchronousBufferFileSegmentReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, callback);
    }

    @Override
    public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException {
        Preconditions.checkState(!this.isShutdown.get(), "I/O-Manager is shut down.");
        return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
    }

    RequestQueue<ReadRequest> getReadRequestQueue(FileIOChannel.ID channelID) {
        return this.readers[channelID.getThreadNum()].requestQueue;
    }

    RequestQueue<WriteRequest> getWriteRequestQueue(FileIOChannel.ID channelID) {
        return this.writers[channelID.getThreadNum()].requestQueue;
    }

    private static final class WriterThread
    extends Thread {
        protected final RequestQueue<WriteRequest> requestQueue = new RequestQueue();
        private volatile boolean alive = true;

        protected WriterThread() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void shutdown() {
            WriterThread writerThread = this;
            synchronized (writerThread) {
                if (this.alive) {
                    this.alive = false;
                    this.requestQueue.close();
                    this.interrupt();
                }
                try {
                    this.join(1000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                IOException ioex = new IOException("IO-Manager has been closed.");
                while (!this.requestQueue.isEmpty()) {
                    WriteRequest request = (WriteRequest)this.requestQueue.poll();
                    if (request == null) continue;
                    try {
                        request.requestDone(ioex);
                    }
                    catch (Throwable t) {
                        LOG.error("The handler of the request complete callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
                    }
                }
            }
        }

        @Override
        public void run() {
            while (this.alive) {
                WriteRequest request = null;
                while (this.alive && request == null) {
                    try {
                        request = (WriteRequest)this.requestQueue.take();
                    }
                    catch (InterruptedException e) {
                        if (!this.alive) {
                            return;
                        }
                        LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
                    }
                }
                IOException ioex = null;
                try {
                    request.write();
                }
                catch (IOException e) {
                    ioex = e;
                }
                catch (Throwable t) {
                    ioex = new IOException("The buffer could not be written: " + t.getMessage(), t);
                    LOG.error("I/O writing thread encountered an error" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
                }
                try {
                    request.requestDone(ioex);
                }
                catch (Throwable t) {
                    LOG.error("The handler of the request-complete-callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
                }
            }
        }
    }

    private static final class ReaderThread
    extends Thread {
        protected final RequestQueue<ReadRequest> requestQueue = new RequestQueue();
        private volatile boolean alive = true;

        protected ReaderThread() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void shutdown() {
            ReaderThread readerThread = this;
            synchronized (readerThread) {
                if (this.alive) {
                    this.alive = false;
                    this.requestQueue.close();
                    this.interrupt();
                }
                try {
                    this.join(1000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                IOException ioex = new IOException("IO-Manager has been closed.");
                while (!this.requestQueue.isEmpty()) {
                    ReadRequest request = (ReadRequest)this.requestQueue.poll();
                    if (request == null) continue;
                    try {
                        request.requestDone(ioex);
                    }
                    catch (Throwable t) {
                        LOG.error("The handler of the request complete callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
                    }
                }
            }
        }

        @Override
        public void run() {
            while (this.alive) {
                ReadRequest request = null;
                while (this.alive && request == null) {
                    try {
                        request = (ReadRequest)this.requestQueue.take();
                    }
                    catch (InterruptedException e) {
                        if (!this.alive) {
                            return;
                        }
                        LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
                    }
                }
                IOException ioex = null;
                try {
                    request.read();
                }
                catch (IOException e) {
                    ioex = e;
                }
                catch (Throwable t) {
                    ioex = new IOException("The buffer could not be read: " + t.getMessage(), t);
                    LOG.error("I/O reading thread encountered an error" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
                }
                try {
                    request.requestDone(ioex);
                }
                catch (Throwable t) {
                    LOG.error("The handler of the request-complete-callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
                }
            }
        }
    }
}

