/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
import org.apache.flink.runtime.io.network.partition.DataBuffer;
import org.apache.flink.util.Preconditions;

/**
 * A {@link DataBuffer} that stores appended records in {@link MemorySegment}s requested from a
 * {@link BufferPool} and hands them back grouped by subpartition (channel).
 *
 * <p>Every record is stored as a 16-byte index entry immediately followed by the record bytes.
 * The index entry consists of two longs:
 *
 * <ul>
 *   <li>bytes 0-7: the record length in the high 32 bits and the ordinal of the record's
 *       {@link Buffer.DataType} in the low 32 bits;
 *   <li>bytes 8-15: the address of the next index entry of the same channel. It is only written
 *       once a later record is appended to that channel.
 * </ul>
 *
 * <p>An index entry address encodes the segment index in the high 32 bits and the offset within
 * that segment in the low 32 bits. The index entries of one channel thus form a linked list,
 * which is what allows reading the data channel by channel in {@link #getNextBuffer}.
 */
@NotThreadSafe
public class SortBasedDataBuffer
implements DataBuffer {
    /** Size in bytes of one index entry (length/data-type long plus next-entry pointer long). */
    private static final int INDEX_ENTRY_SIZE = 16;

    /** Cached enum constants; {@code values()} would otherwise clone the array for every record read. */
    private static final Buffer.DataType[] DATA_TYPES = Buffer.DataType.values();

    /** Pool the memory segments are requested from and recycled to. */
    private final BufferPool bufferPool;

    /** Segments currently held by this buffer, in allocation order. */
    private final ArrayList<MemorySegment> segments = new ArrayList<>();

    /** Address of the first index entry per channel; negative means the channel has no data. */
    private final long[] firstIndexEntryAddresses;

    /** Address of the most recently written index entry per channel; negative means no data. */
    private final long[] lastIndexEntryAddresses;

    /** Expected size in bytes of every segment obtained from the pool. */
    private final int bufferSize;

    /** While fewer segments than this are held, new segments are requested with a blocking call. */
    private final int numGuaranteedBuffers;

    /** Total number of record bytes appended so far (not cleared by {@link #reset()}). */
    private long numTotalBytes;

    /** Total number of records appended so far (not cleared by {@link #reset()}). */
    private long numTotalRecords;

    /** Total number of record bytes handed out by {@link #getNextBuffer} (not cleared by {@link #reset()}). */
    private long numTotalBytesRead;

    private boolean isFull;
    private boolean isFinished;
    private boolean isReleased;

    /** Index of the segment the next write goes to. */
    private int writeSegmentIndex;

    /** Offset within the current write segment the next write goes to. */
    private int writeSegmentOffset;

    /** Order in which the channels are read. */
    private final int[] subpartitionReadOrder;

    /** Address of the index entry currently being read. */
    private long readIndexEntryAddress;

    /** Bytes of the record currently being read that have not been copied out yet. */
    private int recordRemainingBytes;

    /** Position in {@link #subpartitionReadOrder} of the channel being read; -1 before reading starts. */
    private int readOrderIndex = -1;

    /**
     * Creates an empty sort buffer.
     *
     * @param bufferPool pool to request memory segments from, must not be null
     * @param numSubpartitions number of channels records can be appended to
     * @param bufferSize size of each memory segment, must exceed the index entry size
     * @param numGuaranteedBuffers number of segments that are requested with a blocking call,
     *     must be positive
     * @param customReadOrder optional channel read order with exactly {@code numSubpartitions}
     *     elements; when null the channels are read in ascending index order
     * @throws IllegalArgumentException if one of the argument checks fails
     */
    public SortBasedDataBuffer(BufferPool bufferPool, int numSubpartitions, int bufferSize, int numGuaranteedBuffers, @Nullable int[] customReadOrder) {
        Preconditions.checkArgument(bufferSize > INDEX_ENTRY_SIZE, "Buffer size is too small.");
        Preconditions.checkArgument(numGuaranteedBuffers > 0, "No guaranteed buffers for sort.");
        this.bufferPool = Preconditions.checkNotNull(bufferPool);
        this.bufferSize = bufferSize;
        this.numGuaranteedBuffers = numGuaranteedBuffers;
        this.firstIndexEntryAddresses = new long[numSubpartitions];
        this.lastIndexEntryAddresses = new long[numSubpartitions];
        // A negative address marks a channel without any data.
        Arrays.fill(this.firstIndexEntryAddresses, -1L);
        Arrays.fill(this.lastIndexEntryAddresses, -1L);
        this.subpartitionReadOrder = new int[numSubpartitions];
        if (customReadOrder != null) {
            Preconditions.checkArgument(customReadOrder.length == numSubpartitions, "Illegal data read order.");
            // Copy defensively so later changes to the caller's array do not affect the read order.
            System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 0, numSubpartitions);
        } else {
            for (int channel = 0; channel < numSubpartitions; ++channel) {
                this.subpartitionReadOrder[channel] = channel;
            }
        }
    }

    /**
     * Appends all remaining bytes of {@code source} as one record of the given channel.
     *
     * @param source record data, must have remaining bytes
     * @param targetChannel index of the channel the record belongs to
     * @param dataType type of the record
     * @return {@code true} if not enough memory could be allocated; in that case nothing is
     *     written and the buffer is marked as full (readable). {@code false} if the record was
     *     appended.
     * @throws IOException if the thread is interrupted while waiting for a memory segment
     * @throws IllegalStateException if the buffer is already full, finished or released
     */
    @Override
    public boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException {
        Preconditions.checkArgument(source.hasRemaining(), "Cannot append empty data.");
        Preconditions.checkState(!this.isFull, "Sort buffer is already full.");
        Preconditions.checkState(!this.isFinished, "Sort buffer is already finished.");
        Preconditions.checkState(!this.isReleased, "Sort buffer is already released.");

        int totalBytes = source.remaining();
        if (!this.allocateBuffersForRecord(totalBytes)) {
            this.isFull = true;
            // Position the reader on the first channel that has data, if anything was written.
            if (this.hasRemaining()) {
                this.updateReadChannelAndIndexEntryAddress();
            }
            return true;
        }

        this.writeIndex(targetChannel, totalBytes, dataType);
        this.writeRecord(source);
        ++this.numTotalRecords;
        this.numTotalBytes += (long) totalBytes;
        return false;
    }

    /**
     * Writes the index entry of a new record at the current write position and links it to the
     * previous index entry of the same channel.
     */
    private void writeIndex(int channelIndex, int numRecordBytes, Buffer.DataType dataType) {
        MemorySegment segment = this.segments.get(this.writeSegmentIndex);
        // First long: record length (high 32 bits) and data type ordinal (low 32 bits).
        segment.putLong(this.writeSegmentOffset, (long) numRecordBytes << 32 | (long) dataType.ordinal());

        // Address of the entry just written: segment index (high 32 bits), offset (low 32 bits).
        long indexEntryAddress = (long) this.writeSegmentIndex << 32 | (long) this.writeSegmentOffset;
        long lastIndexEntryAddress = this.lastIndexEntryAddresses[channelIndex];
        this.lastIndexEntryAddresses[channelIndex] = indexEntryAddress;

        if (lastIndexEntryAddress >= 0L) {
            // Store the new address in the second long of the channel's previous index entry.
            segment = this.segments.get(this.getSegmentIndexFromPointer(lastIndexEntryAddress));
            segment.putLong(this.getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8, indexEntryAddress);
        } else {
            // First record of this channel.
            this.firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
        }

        this.updateWriteSegmentIndexAndOffset(INDEX_ENTRY_SIZE);
    }

    /** Copies the remaining bytes of {@code source} to the write position, spanning segments as needed. */
    private void writeRecord(ByteBuffer source) {
        while (source.hasRemaining()) {
            MemorySegment segment = this.segments.get(this.writeSegmentIndex);
            int toCopy = Math.min(this.bufferSize - this.writeSegmentOffset, source.remaining());
            segment.put(this.writeSegmentOffset, source, toCopy);
            this.updateWriteSegmentIndexAndOffset(toCopy);
        }
    }

    /**
     * Makes sure enough memory is available for an index entry plus {@code numRecordBytes} bytes.
     *
     * @return {@code false} if the pool could not provide enough segments, {@code true} otherwise
     */
    private boolean allocateBuffersForRecord(int numRecordBytes) throws IOException {
        int numBytesRequired = INDEX_ENTRY_SIZE + numRecordBytes;
        // When the write index points past the last segment, no free space is left.
        int availableBytes = this.writeSegmentIndex == this.segments.size() ? 0 : this.bufferSize - this.writeSegmentOffset;

        if (availableBytes >= numBytesRequired) {
            return true;
        }

        // An index entry is never split across segments: skip a tail that is too small for one.
        if (availableBytes < INDEX_ENTRY_SIZE) {
            this.updateWriteSegmentIndexAndOffset(availableBytes);
            availableBytes = 0;
        }

        // Request segments until the record fits.
        do {
            MemorySegment segment = this.requestBufferFromPool();
            if (segment == null) {
                return false;
            }
            this.addBuffer(segment);
            availableBytes += this.bufferSize;
        } while (availableBytes < numBytesRequired);

        return true;
    }

    /**
     * Adds a freshly requested segment to this buffer. The segment is recycled and an
     * {@link IllegalStateException} is thrown if it has an unexpected size or if this buffer has
     * already been released.
     */
    private void addBuffer(MemorySegment segment) {
        if (segment.size() != this.bufferSize) {
            this.bufferPool.recycle(segment);
            throw new IllegalStateException("Illegal memory segment size.");
        }
        if (this.isReleased) {
            this.bufferPool.recycle(segment);
            throw new IllegalStateException("Sort buffer is already released.");
        }
        this.segments.add(segment);
    }

    /**
     * Requests one segment from the pool, using the blocking request while fewer than
     * {@code numGuaranteedBuffers} segments are held and the non-blocking request afterwards.
     *
     * @return the requested segment; callers treat {@code null} as "no segment available"
     * @throws IOException if the thread is interrupted during the blocking request
     */
    private MemorySegment requestBufferFromPool() throws IOException {
        try {
            if (this.segments.size() < this.numGuaranteedBuffers) {
                return this.bufferPool.requestMemorySegmentBlocking();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag and keep the cause so the interruption is not lost.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while requesting buffer.", e);
        }
        return this.bufferPool.requestMemorySegment();
    }

    /** Advances the write position by {@code numBytes}, moving to the next segment when the current one is exactly full. */
    private void updateWriteSegmentIndexAndOffset(int numBytes) {
        this.writeSegmentOffset += numBytes;
        if (this.writeSegmentOffset == this.bufferSize) {
            ++this.writeSegmentIndex;
            this.writeSegmentOffset = 0;
        }
    }

    /**
     * Copies data of the channel currently being read into {@code transitBuffer} and returns it
     * wrapped in a {@link BufferWithChannel}.
     *
     * <p>Data records of one channel are packed together until the target is full or the channel
     * has no more records. An event is always returned alone; if it does not fit into
     * {@code transitBuffer}, a new unpooled segment of the event's size is allocated and returned
     * instead. The returned buffer is created with a no-op recycler.
     *
     * @param transitBuffer segment to copy the data into
     * @return the next buffer and its channel, or {@code null} if no data remains
     * @throws IllegalStateException if the buffer is not full/finished yet or already released
     */
    @Override
    public BufferWithChannel getNextBuffer(MemorySegment transitBuffer) {
        Preconditions.checkState(this.isFull, "Sort buffer is not ready to be read.");
        Preconditions.checkState(!this.isReleased, "Sort buffer is already released.");
        if (!this.hasRemaining()) {
            return null;
        }

        int numBytesCopied = 0;
        Buffer.DataType bufferDataType = Buffer.DataType.DATA_BUFFER;
        int channelIndex = this.subpartitionReadOrder[this.readOrderIndex];

        do {
            int sourceSegmentIndex = this.getSegmentIndexFromPointer(this.readIndexEntryAddress);
            int sourceSegmentOffset = this.getSegmentOffsetFromPointer(this.readIndexEntryAddress);
            MemorySegment sourceSegment = this.segments.get(sourceSegmentIndex);

            // Decode the first long of the index entry: length (high 32 bits), type ordinal (low 32 bits).
            long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
            int length = (int) (lengthAndDataType >>> 32);
            Buffer.DataType dataType = DATA_TYPES[(int) lengthAndDataType];

            // Never mix an event with previously copied data; return what has been copied so far.
            if (dataType.isEvent() && numBytesCopied > 0) {
                break;
            }
            bufferDataType = dataType;

            // Read the next-entry pointer, then move past the index entry to the record bytes.
            long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
            sourceSegmentOffset += INDEX_ENTRY_SIZE;

            // Events are not split: use a dedicated segment if the target is too small.
            if (bufferDataType.isEvent() && transitBuffer.size() < length) {
                transitBuffer = MemorySegmentFactory.allocateUnpooledSegment(length);
            }

            numBytesCopied += this.copyRecordOrEvent(transitBuffer, numBytesCopied, sourceSegmentIndex, sourceSegmentOffset, length);

            // Only advance to the next index entry once the current record is fully copied.
            if (this.recordRemainingBytes == 0) {
                if (this.readIndexEntryAddress == this.lastIndexEntryAddresses[channelIndex]) {
                    // The channel is exhausted; continue with the next channel on the next call.
                    this.updateReadChannelAndIndexEntryAddress();
                    break;
                }
                this.readIndexEntryAddress = nextReadIndexEntryAddress;
            }
        } while (numBytesCopied < transitBuffer.size() && bufferDataType.isBuffer());

        this.numTotalBytesRead += (long) numBytesCopied;
        NetworkBuffer buffer = new NetworkBuffer(transitBuffer, buf -> {}, bufferDataType, numBytesCopied);
        return new BufferWithChannel(buffer, channelIndex);
    }

    /**
     * Copies as much of the current record as fits into {@code targetSegment}, starting at
     * {@code targetSegmentOffset}. If a previous call copied only part of the record
     * ({@code recordRemainingBytes > 0}), copying resumes where it stopped.
     *
     * @param sourceSegmentIndex segment holding the first byte of the record
     * @param sourceSegmentOffset offset of the first byte of the record in that segment
     * @param recordLength total length of the record
     * @return number of bytes copied by this call
     */
    private int copyRecordOrEvent(MemorySegment targetSegment, int targetSegmentOffset, int sourceSegmentIndex, int sourceSegmentOffset, int recordLength) {
        if (this.recordRemainingBytes > 0) {
            // Resume a partially copied record: skip the bytes that were already copied.
            long position = (long) sourceSegmentOffset + (long) (recordLength - this.recordRemainingBytes);
            sourceSegmentIndex = (int) ((long) sourceSegmentIndex + position / (long) this.bufferSize);
            sourceSegmentOffset = (int) (position % (long) this.bufferSize);
        } else {
            this.recordRemainingBytes = recordLength;
        }

        int targetSegmentSize = targetSegment.size();
        int numBytesToCopy = Math.min(targetSegmentSize - targetSegmentOffset, this.recordRemainingBytes);

        do {
            // Move to the next source segment once the current one has been read completely.
            if (sourceSegmentOffset == this.bufferSize) {
                ++sourceSegmentIndex;
                sourceSegmentOffset = 0;
            }

            int sourceRemainingBytes = Math.min(this.bufferSize - sourceSegmentOffset, this.recordRemainingBytes);
            int numBytes = Math.min(targetSegmentSize - targetSegmentOffset, sourceRemainingBytes);
            MemorySegment sourceSegment = this.segments.get(sourceSegmentIndex);
            sourceSegment.copyTo(sourceSegmentOffset, targetSegment, targetSegmentOffset, numBytes);

            this.recordRemainingBytes -= numBytes;
            targetSegmentOffset += numBytes;
            sourceSegmentOffset += numBytes;
        } while (this.recordRemainingBytes > 0 && targetSegmentOffset < targetSegmentSize);

        return numBytesToCopy;
    }

    /**
     * Moves the read position to the first index entry of the next channel (in read order) that
     * has data. If no such channel exists, {@code readOrderIndex} ends up equal to the number of
     * channels.
     */
    private void updateReadChannelAndIndexEntryAddress() {
        while (++this.readOrderIndex < this.firstIndexEntryAddresses.length) {
            this.readIndexEntryAddress = this.firstIndexEntryAddresses[this.subpartitionReadOrder[this.readOrderIndex]];
            // A negative address means the channel is empty; keep looking.
            if (this.readIndexEntryAddress >= 0L) {
                break;
            }
        }
    }

    /** Returns the high 32 bits of an index entry address, i.e. the segment index. */
    private int getSegmentIndexFromPointer(long value) {
        return (int) (value >>> 32);
    }

    /** Returns the low 32 bits of an index entry address, i.e. the offset within the segment. */
    private int getSegmentOffsetFromPointer(long value) {
        return (int) value;
    }

    /** Returns the total number of records appended to this buffer. */
    @Override
    public long numTotalRecords() {
        return this.numTotalRecords;
    }

    /** Returns the total number of record bytes appended to this buffer. */
    @Override
    public long numTotalBytes() {
        return this.numTotalBytes;
    }

    /** Returns whether fewer bytes have been read than have been appended. */
    @Override
    public boolean hasRemaining() {
        return this.numTotalBytesRead < this.numTotalBytes;
    }

    /**
     * Recycles all segments and clears the write and read positions so the buffer can be filled
     * again. The total record and byte counters are left untouched.
     *
     * @throws IllegalStateException if the buffer is finished, released or still has unread data
     */
    @Override
    public void reset() {
        Preconditions.checkState(!this.isFinished, "Sort buffer has been finished.");
        Preconditions.checkState(!this.isReleased, "Sort buffer has been released.");
        Preconditions.checkState(!this.hasRemaining(), "Still has remaining data.");

        for (MemorySegment segment : this.segments) {
            this.bufferPool.recycle(segment);
        }
        this.segments.clear();

        // A negative address marks a channel without any data.
        Arrays.fill(this.firstIndexEntryAddresses, -1L);
        Arrays.fill(this.lastIndexEntryAddresses, -1L);

        this.isFull = false;
        this.writeSegmentIndex = 0;
        this.writeSegmentOffset = 0;
        this.readIndexEntryAddress = 0L;
        this.recordRemainingBytes = 0;
        this.readOrderIndex = -1;
    }

    /**
     * Marks the buffer as finished and readable, and positions the reader on the first channel
     * that has data.
     *
     * @throws IllegalStateException if the buffer is already full or finished
     */
    @Override
    public void finish() {
        Preconditions.checkState(!this.isFull, "DataBuffer must not be full.");
        Preconditions.checkState(!this.isFinished, "DataBuffer is already finished.");
        this.isFinished = true;
        this.isFull = true;
        this.updateReadChannelAndIndexEntryAddress();
    }

    /** Returns whether {@link #finish()} has been called. */
    @Override
    public boolean isFinished() {
        return this.isFinished;
    }

    /** Recycles all segments to the pool and marks the buffer as released; repeated calls are no-ops. */
    @Override
    public void release() {
        if (this.isReleased) {
            return;
        }
        this.isReleased = true;
        for (MemorySegment segment : this.segments) {
            this.bufferPool.recycle(segment);
        }
        this.segments.clear();
    }

    /** Returns whether {@link #release()} has been called. */
    @Override
    public boolean isReleased() {
        return this.isReleased;
    }
}

