/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol;

import com.github.mizosoft.methanol.internal.Utils;
import com.github.mizosoft.methanol.internal.Validate;
import com.github.mizosoft.methanol.internal.flow.AbstractSubscription;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class WritableBodyPublisher
implements HttpRequest.BodyPublisher,
Flushable,
AutoCloseable {
    private static final int DEFAULT_SINK_BUFFER_SIZE = 8192;
    private static final String SINK_BUFFER_SIZE_PROP = "com.github.mizosoft.methanol.WritableBodyPublisher.sinkBufferSize";
    private static final int SINK_BUFFER_SIZE = WritableBodyPublisher.getSinkBufferSize();
    private static final ByteBuffer CLOSED = ByteBuffer.allocate(0);
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private final ConcurrentLinkedQueue<ByteBuffer> pipe = new ConcurrentLinkedQueue();
    private volatile @Nullable SubscriptionImpl downstreamSubscription;
    private volatile @MonotonicNonNull Throwable closeError;
    private volatile boolean closed;
    private final Object writeLock = new Object();
    private @MonotonicNonNull WritableByteChannel sinkChannel;
    private @MonotonicNonNull OutputStream sinkOutputStream;
    private @Nullable ByteBuffer sinkBuffer;

    private WritableBodyPublisher() {
    }

    public WritableByteChannel byteChannel() {
        WritableByteChannel channel = this.sinkChannel;
        if (channel == null) {
            this.sinkChannel = channel = new SinkChannel();
        }
        return channel;
    }

    public OutputStream outputStream() {
        OutputStream out = this.sinkOutputStream;
        if (out == null) {
            this.sinkOutputStream = out = new SinkChannelAdapter(this.byteChannel());
        }
        return out;
    }

    public void closeExceptionally(Throwable error) {
        Objects.requireNonNull(error);
        if (!this.closed) {
            this.closed = true;
            this.closeError = error;
            SubscriptionImpl subscription = this.downstreamSubscription;
            if (subscription != null) {
                subscription.signalError(error);
            }
        }
    }

    @Override
    public void close() {
        if (!this.closed) {
            this.closed = true;
            this.flushInternal();
            this.pipe.offer(CLOSED);
            this.signalDownstream(true);
        }
    }

    @Override
    public void flush() {
        Validate.requireState(!this.closed, "closed");
        if (this.flushInternal()) {
            this.signalDownstream(false);
        }
    }

    @Override
    public long contentLength() {
        return -1L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        Objects.requireNonNull(subscriber);
        SubscriptionImpl subscription = new SubscriptionImpl(subscriber);
        if (this.subscribed.compareAndSet(false, true)) {
            this.downstreamSubscription = subscription;
            Throwable e = this.closeError;
            if (e != null) {
                subscription.signalError(e);
            } else {
                subscription.signal(true);
            }
        } else {
            IllegalStateException error = new IllegalStateException("already subscribed, multiple subscribers not supported");
            try {
                subscriber.onSubscribe(FlowSupport.NOOP_SUBSCRIPTION);
            }
            catch (Throwable t2) {
                error.addSuppressed(t2);
            }
            finally {
                subscriber.onError(error);
            }
        }
    }

    private void signalDownstream(boolean force) {
        SubscriptionImpl subscription = this.downstreamSubscription;
        if (subscription != null) {
            subscription.signal(force);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean flushInternal() {
        boolean signalsAvailable = false;
        Object object = this.writeLock;
        synchronized (object) {
            ByteBuffer sink2 = this.sinkBuffer;
            if (sink2 != null && sink2.position() > 0) {
                this.sinkBuffer = sink2.hasRemaining() ? sink2.slice() : null;
                this.pipe.offer(sink2.flip().asReadOnlyBuffer());
                signalsAvailable = true;
            }
        }
        return signalsAvailable;
    }

    private static int getSinkBufferSize() {
        int size = Integer.getInteger(SINK_BUFFER_SIZE_PROP, 8192);
        if (size <= 0) {
            return 8192;
        }
        return size;
    }

    public static WritableBodyPublisher create() {
        return new WritableBodyPublisher();
    }

    private final class SinkChannel
    implements WritableByteChannel {
        SinkChannel() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public int write(ByteBuffer src) throws ClosedChannelException {
            Objects.requireNonNull(src);
            if (WritableBodyPublisher.this.closed) {
                throw new ClosedChannelException();
            }
            if (!src.hasRemaining()) {
                return 0;
            }
            int written = 0;
            boolean signalsAvailable = false;
            Object object = WritableBodyPublisher.this.writeLock;
            synchronized (object) {
                ByteBuffer sink2 = WritableBodyPublisher.this.sinkBuffer;
                do {
                    if (sink2 == null) {
                        sink2 = ByteBuffer.allocate(SINK_BUFFER_SIZE);
                    }
                    written += Utils.copyRemaining(src, sink2);
                    if (sink2.hasRemaining()) continue;
                    WritableBodyPublisher.this.pipe.offer(sink2.flip().asReadOnlyBuffer());
                    signalsAvailable = true;
                    sink2 = null;
                } while (src.hasRemaining() && this.isOpen());
                if (WritableBodyPublisher.this.closed) {
                    WritableBodyPublisher.this.sinkBuffer = null;
                    if (written <= 0) {
                        throw new AsynchronousCloseException();
                    }
                } else {
                    WritableBodyPublisher.this.sinkBuffer = sink2;
                }
            }
            if (signalsAvailable) {
                WritableBodyPublisher.this.signalDownstream(false);
            }
            return written;
        }

        @Override
        public boolean isOpen() {
            return !WritableBodyPublisher.this.closed;
        }

        @Override
        public void close() {
            WritableBodyPublisher.this.close();
        }
    }

    private final class SinkChannelAdapter
    extends OutputStream {
        private final OutputStream out;

        SinkChannelAdapter(WritableByteChannel channel) {
            this.out = Channels.newOutputStream(channel);
        }

        @Override
        public void write(int b) throws IOException {
            this.out.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.out.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            try {
                WritableBodyPublisher.this.flush();
            }
            catch (IllegalStateException e) {
                throw new IOException("closed");
            }
        }

        @Override
        public void close() {
            WritableBodyPublisher.this.close();
        }
    }

    private final class SubscriptionImpl
    extends AbstractSubscription<ByteBuffer> {
        private @Nullable ByteBuffer currentBatch;

        SubscriptionImpl(Flow.Subscriber<? super ByteBuffer> downstream) {
            super(downstream, FlowSupport.SYNC_EXECUTOR);
        }

        @Override
        protected long emit(Flow.Subscriber<? super ByteBuffer> downstream, long emit) {
            ByteBuffer batch = this.currentBatch;
            this.currentBatch = null;
            if (batch == null) {
                batch = WritableBodyPublisher.this.pipe.poll();
            }
            long submitted = 0L;
            while (true) {
                if (batch == CLOSED) {
                    this.cancelOnComplete(downstream);
                    return 0L;
                }
                if (submitted >= emit || batch == null) {
                    this.currentBatch = batch;
                    return submitted;
                }
                if (!this.submitOnNext(downstream, batch)) break;
                ++submitted;
                batch = WritableBodyPublisher.this.pipe.poll();
            }
            return 0L;
        }

        @Override
        protected void abort(boolean flowInterrupted) {
            WritableBodyPublisher.this.downstreamSubscription = null;
            WritableBodyPublisher.this.pipe.clear();
        }
    }
}

