/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol.internal.flow;

import com.github.mizosoft.methanol.internal.concurrent.Delayer;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import com.github.mizosoft.methanol.internal.flow.SerializedSubscriber;
import com.github.mizosoft.methanol.internal.flow.Upstream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A subscriber that fails the stream with {@link #timeoutError(long, Duration)} when a requested
 * item does not arrive within a fixed {@code timeout}.
 *
 * <p>A timer is armed when downstream demand rises from zero (see
 * {@code TimeoutSubscription.request}) and re-armed after each received item for as long as
 * demand is still outstanding. Each timer is bound to the {@code index} of the item it is waiting
 * for; it only fires if that index is still current when the delay elapses.
 *
 * <p>All state transitions go through {@link VarHandle} compare-and-set / get-and-set operations on
 * the volatile {@code index}, {@code demand} and {@code timeoutTask} fields.
 *
 * @param <T> the type of items received
 */
public abstract class TimeoutSubscriber<T>
extends SerializedSubscriber<T> {
    /** Sentinel stored in {@code timeoutTask} once no further timeouts may be scheduled. */
    private static final Future<Void> DISABLED_TIMEOUT = CompletableFuture.completedFuture(null);

    /**
     * Value of {@code index} marking the stream as terminated (completed, failed, timed out or
     * cancelled).
     */
    private static final long TOMBSTONE = -1L;

    private static final VarHandle INDEX;
    private static final VarHandle DEMAND;
    private static final VarHandle TIMEOUT_TASK;

    private final Duration timeout;
    private final Delayer delayer;

    /** Holds the subscription handed to {@link #onSubscribe(Flow.Subscription)}. */
    private final Upstream unwrappedUpstream = new Upstream();

    /** Outstanding downstream demand; accessed through {@code DEMAND}. */
    private volatile long demand;

    /** Index of the next expected item, or {@code TOMBSTONE}; accessed through {@code INDEX}. */
    private volatile long index;

    /**
     * The currently armed timer, {@code null} if none is armed, or {@code DISABLED_TIMEOUT};
     * accessed through {@code TIMEOUT_TASK}.
     */
    private volatile @Nullable Future<Void> timeoutTask;

    /**
     * Creates a subscriber that times out items after the given duration.
     *
     * @param timeout how long to wait for each requested item
     * @param delayer used to schedule the timeout tasks
     */
    public TimeoutSubscriber(Duration timeout, Delayer delayer) {
        this.timeout = timeout;
        this.delayer = delayer;
    }

    /**
     * Records the given subscription and, if {@code setOrCancel} accepts it, passes a wrapping
     * {@code TimeoutSubscription} to the superclass.
     *
     * @throws NullPointerException if {@code subscription} is null
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription);
        if (this.unwrappedUpstream.setOrCancel(subscription)) {
            super.onSubscribe(new TimeoutSubscription());
        }
    }

    /**
     * Accepts an item: advances {@code index}, cancels the timer that was waiting for this item,
     * re-arms a timer if more demand is outstanding, and forwards the item to the superclass.
     * The item is dropped if the stream is already terminated or if another signal wins one of
     * the races on {@code index} or {@code timeoutTask}.
     *
     * @throws NullPointerException if {@code item} is null
     */
    @Override
    public void onNext(T item) {
        Objects.requireNonNull(item);
        long currentIndex = this.index;
        if (currentIndex == TOMBSTONE) {
            return;
        }

        // Claim this slot; losing the CAS means a timeout or terminal signal got there first.
        long nextIndex = currentIndex + 1;
        if (!INDEX.compareAndSet(this, currentIndex, nextIndex)) {
            return;
        }

        // Detach the timer that was waiting for this item. Bail out if timeouts were disabled
        // in the meantime or if the task field changed under us.
        Future<Void> currentTimeoutTask = this.timeoutTask;
        if (currentTimeoutTask == DISABLED_TIMEOUT
                || !TIMEOUT_TASK.compareAndSet(this, currentTimeoutTask, null)) {
            return;
        }
        if (currentTimeoutTask != null) {
            currentTimeoutTask.cancel(true);
        }

        long currentDemand = FlowSupport.subtractAndGetDemand(this, DEMAND, 1L);
        if (currentDemand > 0L) {
            // More items are still expected: arm the timer for the next one.
            try {
                this.scheduleTimeout(nextIndex);
            }
            catch (Error | RuntimeException e) {
                // NOTE(review): `upstream` is inherited; presumably it is the TimeoutSubscription
                // passed to super.onSubscribe - confirm against SerializedSubscriber.
                this.upstream.cancel();
                super.onError(e);
                return;
            }
        } else if (currentDemand < 0L) {
            this.upstream.cancel();
            super.onError(new IllegalStateException("missing backpressure: receiving more items than requested"));
            return;
        }
        super.onNext(item);
    }

    /**
     * Terminates the stream with the given error unless it has already been terminated.
     *
     * @throws NullPointerException if {@code throwable} is null
     */
    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable);
        // VarHandle.getAndSet is signature-polymorphic and returns Object unless cast.
        if ((long) INDEX.getAndSet(this, TOMBSTONE) != TOMBSTONE) {
            this.disableTimeouts();
            super.onError(throwable);
        }
    }

    /** Completes the stream unless it has already been terminated. */
    @Override
    public void onComplete() {
        // VarHandle.getAndSet is signature-polymorphic and returns Object unless cast.
        if ((long) INDEX.getAndSet(this, TOMBSTONE) != TOMBSTONE) {
            this.disableTimeouts();
            super.onComplete();
        }
    }

    /**
     * Arms a timer that calls {@link #onTimeout(long)} with {@code index} after {@code timeout},
     * unless timeouts have been disabled. If the {@code timeoutTask} field changed while the
     * timer was being created, the new timer is cancelled immediately.
     *
     * @param index the index of the item the timer is waiting for
     */
    private void scheduleTimeout(long index) {
        Future<Void> currentTimeoutTask = this.timeoutTask;
        if (currentTimeoutTask == DISABLED_TIMEOUT) {
            return;
        }
        Future<Void> newTimeoutTask =
                this.delayer.delay(FlowSupport.SYNC_EXECUTOR, () -> this.onTimeout(index), this.timeout);
        if (!TIMEOUT_TASK.compareAndSet(this, currentTimeoutTask, newTimeoutTask)) {
            newTimeoutTask.cancel(true);
        }
    }

    /**
     * Creates the error the stream is failed with when a timeout fires.
     *
     * @param index the index of the item that was being awaited
     * @param timeout the timeout that elapsed
     * @return the error passed to the superclass's {@code onError}
     */
    protected abstract Throwable timeoutError(long index, Duration timeout);

    /**
     * Fails the stream with {@link #timeoutError(long, Duration)} if the item at {@code index}
     * is still the one being awaited.
     */
    private void onTimeout(long index) {
        if (INDEX.compareAndSet(this, index, TOMBSTONE)) {
            this.upstream.cancel();
            super.onError(this.timeoutError(index, this.timeout));
        }
    }

    /** Installs {@code DISABLED_TIMEOUT} and cancels whichever timer was previously stored. */
    private void disableTimeouts() {
        // The cast is required: VarHandle.getAndSet returns Object unless it is a cast operand.
        Future<?> previousTask = (Future<?>) TIMEOUT_TASK.getAndSet(this, DISABLED_TIMEOUT);
        if (previousTask != null) {
            previousTask.cancel(true);
        }
    }

    static {
        MethodHandles.Lookup lookup = MethodHandles.lookup();
        try {
            INDEX = lookup.findVarHandle(TimeoutSubscriber.class, "index", Long.TYPE);
            DEMAND = lookup.findVarHandle(TimeoutSubscriber.class, "demand", Long.TYPE);
            TIMEOUT_TASK = lookup.findVarHandle(TimeoutSubscriber.class, "timeoutTask", Future.class);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * The subscription passed to the superclass. It tracks demand, arms the first timer, and
     * delegates to the recorded upstream subscription.
     */
    private final class TimeoutSubscription
    implements Flow.Subscription {
        TimeoutSubscription() {
        }

        /**
         * Adds {@code n} to the demand and forwards the request to the recorded upstream. When
         * {@code n} is positive and the previous demand was zero, a timer is armed for the item
         * at the current index. Does nothing once the stream is terminated.
         */
        @Override
        public void request(long n) {
            long currentIndex = TimeoutSubscriber.this.index;
            if (currentIndex == TOMBSTONE) {
                return;
            }
            if (n > 0L && FlowSupport.getAndAddDemand(TimeoutSubscriber.this, DEMAND, n) == 0L) {
                try {
                    TimeoutSubscriber.this.scheduleTimeout(currentIndex);
                }
                catch (Error | RuntimeException e) {
                    this.cancel();
                    throw e;
                }
            }
            // Non-positive n is forwarded as well, so its handling is left to the upstream.
            TimeoutSubscriber.this.unwrappedUpstream.request(n);
        }

        /** Marks the stream as terminated, disables timeouts and cancels the recorded upstream. */
        @Override
        public void cancel() {
            TimeoutSubscriber.this.index = TOMBSTONE;
            TimeoutSubscriber.this.disableTimeouts();
            TimeoutSubscriber.this.unwrappedUpstream.cancel();
        }
    }
}

