package com.caucho.amp.queue;

import com.caucho.amp.queue.DisruptorBuilder;
import com.caucho.amp.queue.MessageDeliver;
import com.caucho.amp.thread.ThreadPool;
import com.caucho.util.L10N;
import java.util.Objects;
import java.util.concurrent.Executor;

/* loaded from: input_file:com/caucho/amp/queue/QueueServiceBuilderImpl.class */
public class QueueServiceBuilderImpl<M extends MessageDeliver> extends QueueServiceBuilderBase<M> {
    private static final L10N L = new L10N(QueueServiceBuilderImpl.class);
    private Executor _executor;
    private long _workerIdleTimeout;
    private int _threadMax = 65536;
    private OutboxFactory<M> _outboxFactory = new OutboxFactoryImpl();

    public OutboxFactory<M> getOutboxFactory() {
        return this._outboxFactory;
    }

    public void setOutboxFactory(OutboxFactory<M> outboxFactory) {
        Objects.requireNonNull(outboxFactory);
        this._outboxFactory = outboxFactory;
    }

    public Executor getExecutor() {
        return this._executor;
    }

    public void setExecutor(Executor executor) {
        Objects.requireNonNull(executor);
        this._executor = executor;
    }

    public void setWorkerIdleTimeout(long j) {
        this._workerIdleTimeout = j;
    }

    public void setThreadMax(int i) {
        this._threadMax = i;
    }

    public int getThreadMax() {
        return this._threadMax;
    }

    @Override // com.caucho.amp.queue.QueueDeliverBuilder
    public OutboxDeliverImpl<M> createOutbox(DeliverAmp<M> deliverAmp) {
        return this._outboxFactory.createOutbox(deliverAmp);
    }

    @Override // com.caucho.amp.queue.QueueServiceBuilderBase, com.caucho.amp.queue.QueueServiceBuilder
    public QueueService<M> build(DeliverAmp<M> deliverAmp) {
        validateBuilder();
        if (deliverAmp == null) {
            throw new IllegalArgumentException(L.l("'processors' is required"));
        }
        QueueDeliver<M> buildQueue = buildQueue();
        return new QueueServiceImpl(buildQueue, new WorkerDeliverSingleThread(this._outboxFactory.createOutbox(deliverAmp), buildQueue, deliverAmp, createExecutor()));
    }

    protected QueueDeliver<M> buildQueue() {
        int initial = getInitial();
        int capacity = getCapacity();
        return (initial >= capacity || initial <= 0) ? new QueueRing(capacity) : new QueueRingResizing(initial, capacity);
    }

    @Override // com.caucho.amp.queue.QueueDeliverBuilder
    public QueueDeliver<M> buildQueue(CounterBuilder counterBuilder) {
        int initial = getInitial();
        int capacity = getCapacity();
        return (initial >= capacity || initial <= 0) ? new QueueRing(capacity, counterBuilder) : new QueueRingResizing(initial, capacity, counterBuilder);
    }

    @Override // com.caucho.amp.queue.QueueServiceBuilderBase, com.caucho.amp.queue.QueueServiceBuilder
    public QueueService<M> build(DeliverAmp<M>... deliverAmpArr) {
        return buildMultiworker(deliverAmpArr);
    }

    public QueueService<M> buildMultiworker(DeliverAmp<M>... deliverAmpArr) {
        validateBuilder();
        if (deliverAmpArr.length == 1) {
            return build(deliverAmpArr[0]);
        }
        QueueDeliver<M> buildQueue = buildQueue();
        Objects.requireNonNull(deliverAmpArr);
        WorkerDeliverLifecycle[] workerDeliverLifecycleArr = new WorkerDeliverLifecycle[deliverAmpArr.length];
        Executor createExecutor = createExecutor();
        for (int i = 0; i < workerDeliverLifecycleArr.length; i++) {
            DeliverAmp<M> deliverAmp = deliverAmpArr[i];
            workerDeliverLifecycleArr[i] = new WorkerDeliverMultiThread(this._outboxFactory.createOutbox(deliverAmp), buildQueue, deliverAmp, createExecutor);
        }
        return new QueueServiceImpl(buildQueue, new WorkerDeliverMultiCoordinator(buildQueue, workerDeliverLifecycleArr, getMultiworkerOffset()));
    }

    public QueueService<M> buildSpawn(DeliverAmp<M> deliverAmp) {
        validateBuilder();
        return build(new DeliverAmpSpawn(deliverAmp, createBlockingExecutor()));
    }

    @Override // com.caucho.amp.queue.QueueServiceBuilderBase, com.caucho.amp.queue.QueueServiceBuilder
    public DisruptorBuilder<M> disruptorBuilder(DisruptorBuilder.DeliverFactory<M> deliverFactory) {
        return new DisruptorBuilderTop(this, deliverFactory);
    }

    @Override // com.caucho.amp.queue.QueueServiceBuilder
    public DisruptorBuilder<M> disruptorBuilder(final DeliverAmp<M> deliverAmp) {
        return new DisruptorBuilderTop(this, new DisruptorBuilder.DeliverFactory<M>() { // from class: com.caucho.amp.queue.QueueServiceBuilderImpl.1
            @Override // com.caucho.jdkadapt.Supplier
            public DeliverAmp<M> get() {
                return deliverAmp;
            }

            @Override // com.caucho.amp.queue.DisruptorBuilder.DeliverFactory
            public int getMaxWorkers() {
                return 1;
            }
        });
    }

    public <X extends Runnable> QueueService<X> buildSpawnTask(SpawnThreadManager spawnThreadManager) {
        validateBuilder();
        return new QueueServiceSpawn(buildQueue(), createBlockingExecutor(), spawnThreadManager);
    }

    @Override // com.caucho.amp.queue.QueueDeliverBuilder
    public Executor createExecutor() {
        Executor executor = this._executor;
        if (executor == null) {
            executor = ThreadPool.getCurrent().getThrottleExecutor();
        }
        return executor;
    }

    private Executor createBlockingExecutor() {
        Executor executor = this._executor;
        if (executor == null) {
            executor = ThreadPool.getCurrent().getThrottleExecutor();
        }
        return executor;
    }
}
