package reactor.core.publisher;

import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Producer;
import reactor.core.Receiver;
import reactor.core.Trackable;
import reactor.core.publisher.EventLoopProcessor;
import reactor.core.publisher.RingBuffer;
import reactor.util.concurrent.QueueSupplier;
import reactor.util.concurrent.WaitStrategy;

/* loaded from: input_file:reactor/core/publisher/WorkQueueProcessor.class */
public final class WorkQueueProcessor<E> extends EventLoopProcessor<E> {
    final RingBuffer.Sequence workSequence;
    final RingBuffer.Sequence retrySequence;
    volatile RingBuffer<EventLoopProcessor.Slot<E>> retryBuffer;
    final WaitStrategy writeWait;
    volatile int replaying;
    private static final Supplier FACTORY = EventLoopProcessor.Slot::new;
    static final AtomicReferenceFieldUpdater<WorkQueueProcessor, RingBuffer> RETRY_REF = AtomicReferenceFieldUpdater.newUpdater(WorkQueueProcessor.class, RingBuffer.class, "retryBuffer");
    static final AtomicIntegerFieldUpdater<WorkQueueProcessor> REPLAYING = AtomicIntegerFieldUpdater.newUpdater(WorkQueueProcessor.class, "replaying");

    /* loaded from: input_file:reactor/core/publisher/WorkQueueProcessor$QueueSubscriberLoop.class */
    static final class QueueSubscriberLoop<T> implements Runnable, Producer, Trackable, Subscription, Receiver {
        private final RingBuffer.Reader barrier;
        private final WorkQueueProcessor<T> processor;
        private final Subscriber<? super T> subscriber;
        private final AtomicBoolean running = new AtomicBoolean(false);
        private final RingBuffer.Sequence sequence = EventLoopProcessor.wrap(-1, this);
        private final RingBuffer.Sequence pendingRequest = RingBuffer.newSequence(0);
        private final Runnable waiter = new Runnable() { // from class: reactor.core.publisher.WorkQueueProcessor.QueueSubscriberLoop.1
            @Override // java.lang.Runnable
            public void run() {
                if (!QueueSubscriberLoop.this.barrier.isAlerted() && QueueSubscriberLoop.this.isRunning()) {
                    if (!QueueSubscriberLoop.this.replay(QueueSubscriberLoop.this.pendingRequest.getAsLong() == Long.MAX_VALUE)) {
                        return;
                    }
                }
                WaitStrategy.throwAlert();
            }
        };

        public QueueSubscriberLoop(Subscriber<? super T> subscriber, WorkQueueProcessor<T> workQueueProcessor) {
            this.processor = workQueueProcessor;
            this.subscriber = subscriber;
            this.barrier = workQueueProcessor.ringBuffer.newReader();
        }

        public RingBuffer.Sequence getSequence() {
            return this.sequence;
        }

        public void halt() {
            this.running.set(false);
            this.barrier.alert();
        }

        public boolean isRunning() {
            return this.running.get() && (this.processor.terminated == 0 || (this.processor.error == null && this.processor.ringBuffer.getAsLong() > this.sequence.getAsLong()));
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!this.running.compareAndSet(false, true)) {
                    Operators.error(this.subscriber, new IllegalStateException("Thread is already running"));
                    this.processor.decrementSubscribers();
                    this.processor.ringBuffer.removeGatingSequence(this.sequence);
                    this.running.set(false);
                    this.processor.writeWait.signalAllWhenBlocking();
                    return;
                }
                if (!this.processor.startSubscriber(this.subscriber, this)) {
                    this.processor.decrementSubscribers();
                    this.processor.ringBuffer.removeGatingSequence(this.sequence);
                    this.running.set(false);
                    this.processor.writeWait.signalAllWhenBlocking();
                    return;
                }
                boolean z = true;
                long j = Long.MIN_VALUE;
                long asLong = this.sequence.getAsLong();
                EventLoopProcessor.Slot<T> slot = null;
                if (!EventLoopProcessor.waitRequestOrTerminalEvent(this.pendingRequest, this.barrier, this.running, this.sequence, this.waiter)) {
                    if (!this.running.get()) {
                        this.processor.decrementSubscribers();
                        this.processor.ringBuffer.removeGatingSequence(this.sequence);
                        this.running.set(false);
                        this.processor.writeWait.signalAllWhenBlocking();
                        return;
                    }
                    if (this.processor.terminated == 1 && this.processor.ringBuffer.getAsLong() == -1) {
                        if (this.processor.error != null) {
                            this.subscriber.onError(this.processor.error);
                            this.processor.decrementSubscribers();
                            this.processor.ringBuffer.removeGatingSequence(this.sequence);
                            this.running.set(false);
                            this.processor.writeWait.signalAllWhenBlocking();
                            return;
                        }
                        this.subscriber.onComplete();
                        this.processor.decrementSubscribers();
                        this.processor.ringBuffer.removeGatingSequence(this.sequence);
                        this.running.set(false);
                        this.processor.writeWait.signalAllWhenBlocking();
                        return;
                    }
                }
                boolean z2 = this.pendingRequest.getAsLong() == Long.MAX_VALUE;
                if (replay(z2)) {
                    this.running.set(false);
                    this.processor.decrementSubscribers();
                    this.processor.ringBuffer.removeGatingSequence(this.sequence);
                    this.running.set(false);
                    this.processor.writeWait.signalAllWhenBlocking();
                    return;
                }
                while (true) {
                    if (z) {
                        z = false;
                        do {
                            try {
                                asLong = this.processor.workSequence.getAsLong() + 1;
                                while (!z2 && this.pendingRequest.getAsLong() == 0) {
                                    if (!isRunning()) {
                                        WaitStrategy.throwAlert();
                                    }
                                    LockSupport.parkNanos(1L);
                                }
                                this.sequence.set(asLong - 1);
                            } catch (RuntimeException e) {
                                if (!Exceptions.isCancel(e)) {
                                    if (!WaitStrategy.isAlert(e)) {
                                        throw e;
                                    }
                                    this.barrier.clearAlert();
                                    if (!this.running.get()) {
                                        break;
                                    }
                                    if (this.processor.terminated == 1) {
                                        if (this.processor.error == null) {
                                            if (this.processor.ringBuffer.getPending() == 0) {
                                                this.subscriber.onComplete();
                                                break;
                                            }
                                        } else {
                                            this.subscriber.onError(this.processor.error);
                                            break;
                                        }
                                    }
                                } else {
                                    reschedule(slot);
                                    break;
                                }
                                this.processor.decrementSubscribers();
                                this.processor.ringBuffer.removeGatingSequence(this.sequence);
                                this.running.set(false);
                                this.processor.writeWait.signalAllWhenBlocking();
                                return;
                            } catch (Throwable th) {
                                reschedule(slot);
                                this.subscriber.onError(th);
                                this.sequence.set(asLong);
                                z = true;
                            }
                        } while (!this.processor.workSequence.compareAndSet(asLong - 1, asLong));
                    }
                    if (j >= asLong) {
                        slot = (EventLoopProcessor.Slot) this.processor.ringBuffer.get(asLong);
                        try {
                            readNextEvent(z2);
                            this.subscriber.onNext(slot.value);
                            z = true;
                        } catch (Exception e2) {
                            if (!WaitStrategy.isAlert(e2)) {
                                throw e2;
                            }
                            this.barrier.clearAlert();
                            throw Exceptions.failWithCancel();
                        }
                    } else {
                        this.processor.readWait.signalAllWhenBlocking();
                        try {
                            j = this.barrier.waitFor(asLong, this.waiter);
                        } catch (Exception e3) {
                            if (!WaitStrategy.isAlert(e3)) {
                                throw e3;
                            }
                            this.barrier.clearAlert();
                            if (this.running.get()) {
                                throw e3;
                            }
                            this.processor.decrementSubscribers();
                            while (true) {
                                try {
                                    try {
                                        this.barrier.waitFor(asLong);
                                        reschedule((EventLoopProcessor.Slot) this.processor.ringBuffer.get(asLong));
                                        break;
                                    } catch (Exception e4) {
                                    }
                                } catch (Exception e5) {
                                    if (!WaitStrategy.isAlert(e5)) {
                                        throw e3;
                                    }
                                    this.barrier.clearAlert();
                                }
                            }
                        }
                    }
                }
                this.processor.decrementSubscribers();
                this.processor.ringBuffer.removeGatingSequence(this.sequence);
                this.running.set(false);
                this.processor.writeWait.signalAllWhenBlocking();
                return;
                this.processor.incrementSubscribers();
                throw e3;
            } catch (Throwable th2) {
                this.processor.decrementSubscribers();
                this.processor.ringBuffer.removeGatingSequence(this.sequence);
                this.running.set(false);
                this.processor.writeWait.signalAllWhenBlocking();
                throw th2;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean replay(boolean z) {
            try {
                if (!WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 0, 1)) {
                    return !this.processor.alive();
                }
                try {
                    RingBuffer<EventLoopProcessor.Slot<T>> ringBuffer = this.processor.retryBuffer;
                    if (ringBuffer == null) {
                        WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                        return false;
                    }
                    while (this.running.get()) {
                        long asLong = this.processor.retrySequence.getAsLong() + 1;
                        if (ringBuffer.getCursor() < asLong) {
                            this.processor.readWait.signalAllWhenBlocking();
                            boolean z2 = !this.processor.alive();
                            WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                            return z2;
                        }
                        EventLoopProcessor.Slot<T> slot = ringBuffer.get(asLong);
                        if (slot.value != null) {
                            readNextEvent(z);
                            this.subscriber.onNext(slot.value);
                            this.processor.retrySequence.set(asLong);
                        }
                    }
                    WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                    return true;
                } catch (RuntimeException e) {
                    if (!Exceptions.isCancel(e)) {
                        throw e;
                    }
                    this.running.set(false);
                    WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                    return true;
                }
            } catch (Throwable th) {
                WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                throw th;
            }
        }

        private void reschedule(EventLoopProcessor.Slot<T> slot) {
            if (slot == null || slot.value == null) {
                return;
            }
            RingBuffer<EventLoopProcessor.Slot<T>> retryBuffer = this.processor.retryBuffer();
            long next = retryBuffer.next();
            retryBuffer.get(next).value = slot.value;
            retryBuffer.publish(next);
            this.barrier.alert();
            this.processor.readWait.signalAllWhenBlocking();
        }

        private void readNextEvent(boolean z) {
            while (!z && EventLoopProcessor.getAndSub(this.pendingRequest, 1L) == 0) {
                if (!isRunning()) {
                    WaitStrategy.throwAlert();
                }
                LockSupport.parkNanos(1L);
            }
        }

        @Override // reactor.core.Trackable
        public long requestedFromDownstream() {
            return this.pendingRequest.getAsLong();
        }

        @Override // reactor.core.Trackable
        public boolean isCancelled() {
            return !this.running.get();
        }

        @Override // reactor.core.Trackable
        public boolean isStarted() {
            return this.sequence.getAsLong() != -1;
        }

        @Override // reactor.core.Trackable
        public boolean isTerminated() {
            return !this.running.get();
        }

        @Override // reactor.core.Trackable
        public long getPending() {
            return this.processor.ringBuffer.getPending();
        }

        @Override // reactor.core.Trackable
        public long getCapacity() {
            return this.processor.getCapacity();
        }

        @Override // reactor.core.Producer
        public Object downstream() {
            return this.subscriber;
        }

        @Override // reactor.core.Receiver
        public Object upstream() {
            return this.processor;
        }

        public void request(long j) {
            if (Operators.checkRequest(j, this.subscriber) && this.running.get()) {
                EventLoopProcessor.getAndAddCap(this.pendingRequest, j);
            }
        }

        public void cancel() {
            halt();
        }
    }

    public static <E> WorkQueueProcessor<E> create() {
        return create(WorkQueueProcessor.class.getSimpleName(), QueueSupplier.SMALL_BUFFER_SIZE, (WaitStrategy) null, true);
    }

    public static <E> WorkQueueProcessor<E> create(boolean z) {
        return create(WorkQueueProcessor.class.getSimpleName(), QueueSupplier.SMALL_BUFFER_SIZE, (WaitStrategy) null, z);
    }

    public static <E> WorkQueueProcessor<E> create(ExecutorService executorService) {
        return create(executorService, QueueSupplier.SMALL_BUFFER_SIZE, (WaitStrategy) null, true);
    }

    public static <E> WorkQueueProcessor<E> create(ExecutorService executorService, boolean z) {
        return create(executorService, QueueSupplier.SMALL_BUFFER_SIZE, (WaitStrategy) null, z);
    }

    public static <E> WorkQueueProcessor<E> create(String str) {
        return create(str, QueueSupplier.SMALL_BUFFER_SIZE);
    }

    public static <E> WorkQueueProcessor<E> create(String str, int i) {
        return create(str, i, (WaitStrategy) null, true);
    }

    public static <E> WorkQueueProcessor<E> create(String str, int i, boolean z) {
        return create(str, i, (WaitStrategy) null, z);
    }

    public static <E> WorkQueueProcessor<E> create(ExecutorService executorService, int i) {
        return create(executorService, i, (WaitStrategy) null, true);
    }

    public static <E> WorkQueueProcessor<E> create(ExecutorService executorService, int i, boolean z) {
        return create(executorService, i, (WaitStrategy) null, z);
    }

    public static <E> WorkQueueProcessor<E> create(String str, int i, WaitStrategy waitStrategy) {
        return create(str, i, waitStrategy, true);
    }

    public static <E> WorkQueueProcessor<E> create(String str, int i, WaitStrategy waitStrategy, boolean z) {
        return new WorkQueueProcessor<>(str, i, waitStrategy == null ? WaitStrategy.liteBlocking() : waitStrategy, false, z);
    }

    public static <E> WorkQueueProcessor<E> create(ExecutorService executorService, int i, WaitStrategy waitStrategy) {
        return create(executorService, i, waitStrategy, true);
    }

    public static <E> WorkQueueProcessor<E> create(ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z) {
        return new WorkQueueProcessor<>(null, executorService, i, waitStrategy == null ? WaitStrategy.liteBlocking() : waitStrategy, false, z);
    }

    public static <E> WorkQueueProcessor<E> share() {
        return share(WorkQueueProcessor.class.getSimpleName(), QueueSupplier.SMALL_BUFFER_SIZE, (WaitStrategy) null, true);
    }

    public static <E> WorkQueueProcessor<E> share(boolean z) {
        return share(WorkQueueProcessor.class.getSimpleName(), QueueSupplier.SMALL_BUFFER_SIZE, (WaitStrategy) null, z);
    }

    public static <E> WorkQueueProcessor<E> share(ExecutorService executorService) {
        return share(executorService, QueueSupplier.SMALL_BUFFER_SIZE, (WaitStrategy) null, true);
    }

    public static <E> WorkQueueProcessor<E> share(ExecutorService executorService, boolean z) {
        return share(executorService, QueueSupplier.SMALL_BUFFER_SIZE, (WaitStrategy) null, z);
    }

    public static <E> WorkQueueProcessor<E> share(String str, int i) {
        return share(str, i, (WaitStrategy) null, true);
    }

    public static <E> WorkQueueProcessor<E> share(String str, int i, boolean z) {
        return share(str, i, (WaitStrategy) null, z);
    }

    public static <E> WorkQueueProcessor<E> share(ExecutorService executorService, int i) {
        return share(executorService, i, (WaitStrategy) null, true);
    }

    public static <E> WorkQueueProcessor<E> share(ExecutorService executorService, int i, boolean z) {
        return share(executorService, i, (WaitStrategy) null, z);
    }

    public static <E> WorkQueueProcessor<E> share(String str, int i, WaitStrategy waitStrategy) {
        return share(str, i, waitStrategy, true);
    }

    public static <E> WorkQueueProcessor<E> share(String str, int i, WaitStrategy waitStrategy, boolean z) {
        return new WorkQueueProcessor<>(str, i, waitStrategy == null ? WaitStrategy.liteBlocking() : waitStrategy, true, z);
    }

    public static <E> WorkQueueProcessor<E> share(ExecutorService executorService, int i, WaitStrategy waitStrategy) {
        return share(executorService, i, waitStrategy, true);
    }

    public static <E> WorkQueueProcessor<E> share(ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z) {
        return new WorkQueueProcessor<>(null, executorService, i, waitStrategy == null ? WaitStrategy.liteBlocking() : waitStrategy, true, z);
    }

    WorkQueueProcessor(String str, int i, WaitStrategy waitStrategy, boolean z, boolean z2) {
        this(new EventLoopProcessor.EventLoopFactory(str, z2), null, i, waitStrategy, z, z2);
    }

    WorkQueueProcessor(ThreadFactory threadFactory, ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z, boolean z2) {
        super(i, threadFactory, executorService, z2, z, FACTORY, waitStrategy);
        this.workSequence = RingBuffer.newSequence(-1L);
        this.retrySequence = RingBuffer.newSequence(-1L);
        this.writeWait = waitStrategy;
        this.ringBuffer.addGatingSequence(this.workSequence);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // reactor.core.publisher.FluxProcessor
    public void subscribe(Subscriber<? super E> subscriber) {
        super.subscribe(subscriber);
        if (!alive()) {
            TopicProcessor.coldSource(this.ringBuffer, null, this.error, this.workSequence).subscribe(subscriber);
            return;
        }
        QueueSubscriberLoop queueSubscriberLoop = new QueueSubscriberLoop(subscriber, this);
        try {
            incrementSubscribers();
            queueSubscriberLoop.sequence.set(this.workSequence.getAsLong());
            this.ringBuffer.addGatingSequence(queueSubscriberLoop.sequence);
            this.executor.execute(queueSubscriberLoop);
        } catch (Throwable th) {
            decrementSubscribers();
            this.ringBuffer.removeGatingSequence(queueSubscriberLoop.sequence);
            if (RejectedExecutionException.class.isAssignableFrom(th.getClass())) {
                TopicProcessor.coldSource(this.ringBuffer, th, this.error, this.workSequence).subscribe(subscriber);
            } else {
                Operators.error(subscriber, th);
            }
        }
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    public Flux<E> drain() {
        return TopicProcessor.coldSource(this.ringBuffer, null, this.error, this.workSequence);
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    protected void doError(Throwable th) {
        this.writeWait.signalAllWhenBlocking();
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    protected void doComplete() {
        this.writeWait.signalAllWhenBlocking();
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    protected void requestTask(Subscription subscription) {
        Runnable runnable = () -> {
            if (alive()) {
                return;
            }
            if (this.cancelled) {
                throw Exceptions.failWithCancel();
            }
            WaitStrategy.throwAlert();
        };
        RingBuffer<EventLoopProcessor.Slot<IN>> ringBuffer = this.ringBuffer;
        ringBuffer.getClass();
        new Thread(EventLoopProcessor.createRequestTask(subscription, runnable, null, ringBuffer::getMinimumGatingSequence, this.readWait, this, this.ringBuffer.bufferSize()), this.name + "[request-task]").start();
    }

    @Override // reactor.core.Trackable
    public long getPending() {
        return this.ringBuffer.remainingCapacity() + (this.retryBuffer != null ? this.retryBuffer.remainingCapacity() : 0L);
    }

    RingBuffer<EventLoopProcessor.Slot<E>> retryBuffer() {
        RingBuffer<EventLoopProcessor.Slot<E>> ringBuffer = this.retryBuffer;
        if (ringBuffer == null) {
            ringBuffer = RingBuffer.createMultiProducer(FACTORY, 32, WaitStrategy.busySpin());
            ringBuffer.addGatingSequence(this.retrySequence);
            if (!RETRY_REF.compareAndSet(this, null, ringBuffer)) {
                ringBuffer = this.retryBuffer;
            }
        }
        return ringBuffer;
    }

    @Override // reactor.core.MultiProducer
    public long downstreamCount() {
        return this.ringBuffer.getSequenceReceivers().length - 1;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (alive()) {
            return;
        }
        WaitStrategy.throwAlert();
    }

    @Override // reactor.core.publisher.EventLoopProcessor, reactor.core.MultiProducer
    public /* bridge */ /* synthetic */ Iterator downstreams() {
        return super.downstreams();
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    public /* bridge */ /* synthetic */ long getAvailableCapacity() {
        return super.getAvailableCapacity();
    }
}
