/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol.internal.cache;

import com.github.mizosoft.methanol.internal.Validate;
import com.github.mizosoft.methanol.internal.cache.Store;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import com.github.mizosoft.methanol.internal.flow.Upstream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class CacheWritingPublisher
implements Flow.Publisher<List<ByteBuffer>> {
    /** Logger used by this class and by its nested types. */
    private static final System.Logger logger = System.getLogger(CacheWritingPublisher.class.getName());
    /** Cancellation policy for the constructors that don't take one; read once from a boolean system property (false when the property is unset). */
    private static final boolean DEFAULT_PROPAGATE_CANCELLATION = Boolean.getBoolean("com.github.mizosoft.methanol.internal.cache.CacheWritingPublisher.propagateCancellation");
    /** Publisher that gets subscribed to by the first accepted call to {@code subscribe}. */
    private final Flow.Publisher<List<ByteBuffer>> upstream;
    /** Editor passed on to the subscriber created in {@code subscribe}. */
    private final Store.Editor editor;
    /** Listener passed on to the subscriber created in {@code subscribe}. */
    private final Listener listener;
    /** Set to true by the first {@code subscribe} call; any later subscriber is refused. */
    private final AtomicBoolean subscribed = new AtomicBoolean();
    /** When true, cancelling the downstream subscription always cancels the upstream one. */
    private final boolean propagateCancellation;

    /**
     * Creates a publisher that reports to the no-op listener and uses the default cancellation policy.
     *
     * @param upstream the publisher to subscribe to
     * @param editor the editor that receives the written bytes
     */
    public CacheWritingPublisher(
            Flow.Publisher<List<ByteBuffer>> upstream,
            Store.Editor editor) {
        this(
                upstream,
                editor,
                DisabledListener.INSTANCE,
                DEFAULT_PROPAGATE_CANCELLATION);
    }

    /**
     * Creates a publisher that reports to the given listener and uses the default cancellation policy.
     *
     * @param upstream the publisher to subscribe to
     * @param editor the editor that receives the written bytes
     * @param listener the listener notified about the outcome of the write
     */
    public CacheWritingPublisher(
            Flow.Publisher<List<ByteBuffer>> upstream,
            Store.Editor editor,
            Listener listener) {
        this(
                upstream,
                editor,
                listener,
                DEFAULT_PROPAGATE_CANCELLATION);
    }

    /**
     * Creates a publisher with an explicit listener and cancellation policy.
     *
     * @param upstream the publisher to subscribe to
     * @param editor the editor that receives the written bytes
     * @param listener the listener notified about the outcome of the write
     * @param propagateCancellation whether downstream cancellation always cancels upstream
     * @throws NullPointerException if {@code upstream}, {@code editor} or {@code listener} is null
     */
    public CacheWritingPublisher(
            Flow.Publisher<List<ByteBuffer>> upstream,
            Store.Editor editor,
            Listener listener,
            boolean propagateCancellation) {
        // Validate everything up front, in parameter order, before touching any field.
        Objects.requireNonNull(upstream);
        Objects.requireNonNull(editor);
        Objects.requireNonNull(listener);

        this.upstream = upstream;
        this.editor = editor;
        this.listener = listener;
        this.propagateCancellation = propagateCancellation;
    }

    /**
     * Subscribes the given subscriber. Only the first call is accepted; every later subscriber is
     * passed to {@code FlowSupport.refuse} with the error from
     * {@code FlowSupport.multipleSubscribersToUnicast()}.
     *
     * @param subscriber the downstream subscriber
     * @throws NullPointerException if {@code subscriber} is null
     */
    @Override
    public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
        Objects.requireNonNull(subscriber);

        boolean isFirstSubscriber = subscribed.compareAndSet(false, true);
        if (!isFirstSubscriber) {
            FlowSupport.refuse(subscriber, FlowSupport.multipleSubscribersToUnicast());
            return;
        }

        CacheWritingSubscriber relay =
                new CacheWritingSubscriber(subscriber, editor, listener, propagateCancellation);
        upstream.subscribe(relay);
    }

    /** Singleton {@link Listener} whose callbacks do nothing. */
    private static enum DisabledListener implements Listener {
        INSTANCE;

        /** Does nothing. */
        @Override
        public void onWriteSuccess() {
            // Intentionally empty.
        }

        /** Does nothing; the failure is ignored. */
        @Override
        public void onWriteFailure(Throwable unused) {
            // Intentionally empty.
        }
    }

    /** Receives the outcome of a cache write. */
    public static interface Listener {
        /** Called after the edit has been committed and closed without an {@code IOException}. */
        void onWriteSuccess();

        /**
         * Called when writing or committing the edit fails.
         *
         * @param error the failure that caused the write to be abandoned
         */
        void onWriteFailure(Throwable error);

        /**
         * Returns a listener that forwards both callbacks to this listener and logs, rather than
         * propagates, anything they throw.
         *
         * @return a wrapper around this listener that never throws from its callbacks
         */
        default Listener guarded() {
            // Capture the enclosing listener explicitly: inside the anonymous class below, a bare
            // `this` denotes the wrapper itself, so `this.onWriteSuccess()` would recurse forever
            // instead of reaching the wrapped listener.
            final Listener delegate = this;
            return new Listener() {

                @Override
                public void onWriteSuccess() {
                    try {
                        delegate.onWriteSuccess();
                    } catch (Throwable e) {
                        logger.log(System.Logger.Level.WARNING, "exception thrown by Listener::onWriteSuccess", e);
                    }
                }

                @Override
                public void onWriteFailure(Throwable error) {
                    try {
                        delegate.onWriteFailure(error);
                    } catch (Throwable e) {
                        logger.log(System.Logger.Level.WARNING, "exception thrown by Listener::onWriteFailure", e);
                    }
                }
            };
        }

        /**
         * Returns the listener whose callbacks do nothing.
         *
         * @return the shared no-op listener
         */
        static Listener disabled() {
            return DisabledListener.INSTANCE;
        }
    }

    /**
     * Subscriber attached to the upstream publisher. It null-checks each signal's argument and hands
     * the signal to a {@link CacheWritingSubscription}, which owns all of the actual logic.
     */
    private static final class CacheWritingSubscriber
    implements Flow.Subscriber<List<ByteBuffer>> {
        /** Receiver of every upstream signal. */
        private final CacheWritingSubscription relay;

        CacheWritingSubscriber(Flow.Subscriber<? super List<ByteBuffer>> downstream, Store.Editor editor, Listener listener, boolean propagateCancellation) {
            relay = new CacheWritingSubscription(downstream, editor, listener, propagateCancellation);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            relay.onSubscribe(Objects.requireNonNull(subscription));
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            relay.onNext(Objects.requireNonNull(item));
        }

        @Override
        public void onError(Throwable throwable) {
            relay.onError(Objects.requireNonNull(throwable));
        }

        @Override
        public void onComplete() {
            relay.onComplete();
        }
    }

    static final class CacheWritingSubscription
    implements Flow.Subscription {
        // VarHandles for the downstream, state and position fields; assigned in the static initializer.
        private static final VarHandle DOWNSTREAM;
        private static final VarHandle STATE;
        private static final VarHandle POSITION;
        /**
         * Subscriber receiving the relayed signals, or null once it has been cleared. The type-use
         * annotation sits on {@code Subscriber} because {@code Flow} is only a scoping type here.
         */
        private volatile Flow.@Nullable Subscriber<? super List<ByteBuffer>> downstream;
        /** Editor the buffers are written to; closed when the edit is committed or discarded. */
        private final Store.Editor editor;
        /** Listener notified about the outcome of the write. */
        private final Listener listener;
        /** When true, cancel() always cancels the upstream subscription. */
        private final boolean propagateCancellation;
        /** Holder of the upstream subscription. */
        private final Upstream upstream = new Upstream();
        /** Buffers received from upstream that haven't been handed to the editor yet. */
        private final ConcurrentLinkedQueue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<>();
        /** Current phase of the write; starts as IDLE and ends as DISPOSED. */
        private volatile WritingState state = WritingState.IDLE;
        /** Offset passed to the editor for the next write; advanced by each scheduled buffer's remaining byte count. */
        private volatile long position;
        /** Set once upstream has signalled completion. */
        private volatile boolean receivedBodyCompletion;

        /**
         * Creates the subscription handed to {@code downstream}.
         *
         * @param downstream the subscriber that receives the relayed signals
         * @param editor the editor the received buffers are written to
         * @param listener the listener to notify; it is wrapped via {@code guarded()} before being stored
         * @param propagateCancellation whether cancel() always cancels the upstream subscription
         */
        // The annotation is placed on Subscriber itself: a type-use annotation in front of
        // `Flow.Subscriber` would apply to the scoping type Flow, which the compiler rejects.
        CacheWritingSubscription(Flow.@NonNull Subscriber<? super List<ByteBuffer>> downstream, Store.Editor editor, Listener listener, boolean propagateCancellation) {
            this.downstream = downstream;
            this.editor = editor;
            this.listener = listener.guarded();
            this.propagateCancellation = propagateCancellation;
        }

        /**
         * Forwards the demand to the upstream subscription, unless the downstream subscriber has
         * already been cleared, in which case the call is ignored.
         *
         * @param n the number of items requested
         */
        @Override
        public void request(long n) {
            if (downstream == null) {
                return;
            }
            assert upstream.isSet();
            upstream.request(n);
        }

        /**
         * Detaches the downstream subscriber. Upstream is cancelled when the write is already disposed
         * or cancellation propagation is enabled; otherwise an unbounded amount is requested from
         * upstream instead.
         */
        @Override
        public void cancel() {
            getAndClearDownstream();

            boolean cancelUpstream = state == WritingState.DISPOSED || propagateCancellation;
            if (cancelUpstream) {
                upstream.cancel();
                return;
            }
            // Nobody downstream is generating demand anymore, so ask for everything.
            upstream.request(Long.MAX_VALUE);
        }

        /**
         * Records the upstream subscription and, if {@code setOrCancel} accepts it, passes this object
         * to the downstream subscriber as its subscription.
         *
         * @param subscription the subscription received from upstream
         */
        void onSubscribe(Flow.Subscription subscription) {
            boolean accepted = upstream.setOrCancel(subscription);
            if (!accepted) {
                return;
            }
            Validate.castNonNull(downstream).onSubscribe(this);
        }

        /**
         * Queues duplicates of the received buffers for writing (unless the write is already disposed)
         * and then relays the original list to the downstream subscriber, if one is still attached.
         *
         * @param buffers the buffers received from upstream
         */
        void onNext(List<ByteBuffer> buffers) {
            if (this.state != WritingState.DISPOSED) {
                // Duplicates have their own position and limit, independent of the buffers passed downstream.
                List<ByteBuffer> duplicateBuffers = buffers.stream().map(ByteBuffer::duplicate).collect(Collectors.toUnmodifiableList());
                this.writeQueue.addAll(duplicateBuffers);
                this.tryScheduleWrite(false);
            }
            Flow.Subscriber<? super List<ByteBuffer>> subscriber = this.downstream;
            if (subscriber != null) {
                subscriber.onNext(buffers);
            }
        }

        /**
         * Handles an upstream error: drops the upstream reference and any queued buffers, discards the
         * edit without reporting a write failure, then relays the error downstream. If no downstream
         * subscriber is attached anymore, the error is logged instead.
         *
         * @param error the error received from upstream
         */
        void onError(Throwable error) {
            upstream.clear();
            writeQueue.clear();
            try {
                discardEdit(null);
            } finally {
                Flow.Subscriber<? super List<ByteBuffer>> target = getAndClearDownstream();
                if (target == null) {
                    logger.log(System.Logger.Level.WARNING, "upstream error during background cache write", error);
                } else {
                    target.onError(error);
                }
            }
        }

        /**
         * Handles upstream completion: drops the upstream reference, records that the body is complete,
         * gives the writer a chance to run (or commit), and finally relays completion to the downstream
         * subscriber if one is still attached.
         */
        void onComplete() {
            upstream.clear();
            receivedBodyCompletion = true;
            try {
                tryScheduleWrite(false);
            } finally {
                // Completion is relayed even if the attempt above throws.
                Flow.Subscriber<? super List<ByteBuffer>> target = getAndClearDownstream();
                if (target != null) {
                    target.onComplete();
                }
            }
        }

        /**
         * Atomically sets the {@code downstream} field to null and returns the subscriber it held.
         *
         * @return the previously attached subscriber, or null if it had already been cleared
         */
        @SuppressWarnings("unchecked") // DOWNSTREAM targets the downstream field, which only ever holds this type
        private Flow.Subscriber<? super List<ByteBuffer>> getAndClearDownstream() {
            // VarHandle.getAndSet is signature-polymorphic and yields Object unless it is the
            // operand of a cast, so the cast is required for this return statement to compile.
            return (Flow.Subscriber<? super List<ByteBuffer>>) DOWNSTREAM.getAndSet(this, null);
        }

        /**
         * Attempts to make progress on the cache write: either hands the head of the write queue to
         * {@code scheduleWrite}, or calls {@code commitEdit} when the queue is empty and upstream has
         * completed.
         *
         * @param maintainWritingState when true, a current state of WRITING is accepted as-is instead of
         *     requiring an IDLE to WRITING transition
         * @return true if a write was scheduled or {@code commitEdit} was called, false otherwise
         */
        private boolean tryScheduleWrite(boolean maintainWritingState) {
            ByteBuffer buffer = this.writeQueue.peek();
            // A write is started only by a caller that already holds the WRITING state (and asked to keep it)
            // or that wins the IDLE -> WRITING transition.
            if (buffer != null && (maintainWritingState && this.state == WritingState.WRITING || STATE.compareAndSet(this, WritingState.IDLE, WritingState.WRITING))) {
                // NOTE(review): the buffer written is the one seen by peek() above; poll()'s result is discarded.
                // If another thread can consume the head between the peek and the state check, the two may differ — TODO confirm.
                this.writeQueue.poll();
                this.scheduleWrite(buffer);
                return true;
            }
            // Nothing left to write: commit once upstream has completed, provided no write is in flight
            // (or the caller is the in-flight write's own completion).
            if (buffer == null && (maintainWritingState || this.state == WritingState.IDLE) && this.receivedBodyCompletion) {
                this.commitEdit();
                return true;
            }
            return false;
        }

        /**
         * Starts an asynchronous write of {@code buffer} at the current position and advances the
         * position by the buffer's remaining byte count. The write's outcome is delivered to
         * {@code onWriteCompletion}; a {@code RuntimeException} thrown while starting the write discards
         * the edit.
         *
         * @param buffer the buffer to write
         */
        private void scheduleWrite(ByteBuffer buffer) {
            try {
                // VarHandle.getAndAdd is signature-polymorphic and yields Object unless it is the operand
                // of a cast; without the cast the result cannot be passed as the write position.
                long writePosition = (long) POSITION.getAndAdd(this, buffer.remaining());
                this.editor.writeAsync(writePosition, buffer).whenComplete((ignored, error) -> this.onWriteCompletion(error));
            } catch (RuntimeException e) {
                this.discardEdit(e);
            }
        }

        /**
         * Moves the state to DISPOSED and, if it wasn't DISPOSED already, commits the edit by calling
         * {@code commitOnClose} and closing the editor. The listener is told about success, or about
         * the {@code IOException} if one was thrown.
         */
        private void commitEdit() {
            boolean alreadyDisposed = STATE.getAndSet(this, WritingState.DISPOSED) == WritingState.DISPOSED;
            if (alreadyDisposed) {
                return;
            }

            IOException failure = null;
            try (Store.Editor closing = editor) {
                closing.commitOnClose();
            } catch (IOException e) {
                failure = e;
                logger.log(System.Logger.Level.WARNING, "Editor::close failure while committing edit", e);
            }

            if (failure == null) {
                listener.onWriteSuccess();
            } else {
                listener.onWriteFailure(failure);
            }
        }

        /**
         * Moves the state to DISPOSED and, if it wasn't DISPOSED already, abandons the edit: the
         * failure (when given) is logged and reported to the listener, queued buffers are dropped and
         * the editor is closed.
         *
         * @param writeFailure the failure that caused the edit to be abandoned, or null if there is
         *     none to report
         */
        private void discardEdit(@Nullable Throwable writeFailure) {
            boolean alreadyDisposed = STATE.getAndSet(this, WritingState.DISPOSED) == WritingState.DISPOSED;
            if (alreadyDisposed) {
                return;
            }

            if (writeFailure != null) {
                logger.log(System.Logger.Level.WARNING, "aborting cache edit as a problem occurred while writing", writeFailure);
                listener.onWriteFailure(writeFailure);
            }

            writeQueue.clear();
            try {
                editor.close();
            } catch (IOException e) {
                // A failed close is logged but deliberately not propagated.
                logger.log(System.Logger.Level.WARNING, "Editor::close failure while aborting edit", e);
            }
        }

        /**
         * Callback for the end of an asynchronous write. On success it tries to start the next write
         * (or commit) while keeping the WRITING state; if there was nothing to do, it returns the state
         * to IDLE and tries once more. On failure it discards the edit and cancels upstream if no
         * downstream subscriber is attached.
         *
         * @param error the write failure, or null if the write succeeded
         */
        private void onWriteCompletion(@Nullable Throwable error) {
            if (error == null) {
                if (tryScheduleWrite(true)) {
                    return;
                }
                // Nothing was scheduled: release the WRITING state, then re-check for work that
                // may have arrived in the meantime.
                if (STATE.compareAndSet(this, WritingState.WRITING, WritingState.IDLE)) {
                    tryScheduleWrite(false);
                }
                return;
            }

            try {
                discardEdit(error);
            } finally {
                if (downstream == null) {
                    upstream.cancel();
                }
            }
        }

        // Resolves the VarHandles for the downstream, state and position fields; any lookup failure
        // aborts class initialization.
        static {
            MethodHandles.Lookup self = MethodHandles.lookup();
            try {
                DOWNSTREAM = self.findVarHandle(CacheWritingSubscription.class, "downstream", Flow.Subscriber.class);
                STATE = self.findVarHandle(CacheWritingSubscription.class, "state", WritingState.class);
                POSITION = self.findVarHandle(CacheWritingSubscription.class, "position", long.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        /** Phases of the cache write. */
        static enum WritingState {
            /** No write is in flight; the initial state. */
            IDLE,
            /** A write has been claimed and is being scheduled or is in flight. */
            WRITING,
            /** The edit has been committed or discarded; no further writes are scheduled. */
            DISPOSED
        }
    }
}

