package com.caucho.amp.inbox;

import com.caucho.amp.ServiceManagerAmp;
import com.caucho.amp.actor.ServiceRefCore;
import com.caucho.amp.journal.JournalAmp;
import com.caucho.amp.message.BuildMessageAmp;
import com.caucho.amp.message.ContextMessageAmp;
import com.caucho.amp.message.InboxMessage;
import com.caucho.amp.message.MessageActivate;
import com.caucho.amp.message.ReplayMessage;
import com.caucho.amp.message.RestoreMessage;
import com.caucho.amp.message.StartMessage;
import com.caucho.amp.queue.ContextOutbox;
import com.caucho.amp.queue.DisruptorBuilder;
import com.caucho.amp.queue.OutboxDeliver;
import com.caucho.amp.queue.QueueService;
import com.caucho.amp.queue.QueueServiceBuilder;
import com.caucho.amp.queue.WorkerDeliver;
import com.caucho.amp.queue.WorkerDeliverLifecycle;
import com.caucho.amp.spi.ActorAmp;
import com.caucho.amp.spi.HeadersAmp;
import com.caucho.amp.spi.InboxAmp;
import com.caucho.amp.spi.MessageAmp;
import com.caucho.amp.spi.MethodRefAmp;
import com.caucho.amp.spi.ServiceRefAmp;
import com.caucho.amp.spi.ShutdownModeAmp;
import com.caucho.amp.thread.ThreadPool;
import com.caucho.jdkadapt.Supplier;
import com.caucho.lifecycle.Lifecycle;
import com.caucho.util.L10N;
import io.baratine.core.QueueFullHandler;
import io.baratine.core.ResultWithFailure;
import io.baratine.core.ServiceConfig;
import io.baratine.core.ServiceExceptionClosed;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/caucho/amp/inbox/InboxQueue.class */
public class InboxQueue extends InboxBase {
    private static final L10N L = new L10N(InboxQueue.class);
    private static final Logger log = Logger.getLogger(InboxQueue.class.getName());
    private final ServiceRefAmp _serviceRef;
    private final String _anonAddress;
    private String _bindAddress;
    private final QueueService<MessageAmp> _queue;
    private final ActorAmp _actor;
    private final WorkerDeliverLifecycle _worker;
    private final boolean _isLifecycleAware;
    private final Lifecycle _lifecycle;
    private final long _sendTimeout;
    private final QueueFullHandler _fullHandler;
    private boolean _isNonBlocking;
    private boolean _isFiner;
    private InboxQueueReplyOverflow _replyOverflowQueue;
    private final InboxMessage _inboxMessage;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/caucho/amp/inbox/InboxQueue$ActiveResult.class */
    public class ActiveResult implements ResultWithFailure<Boolean> {
        private MessageActivate _onActiveMsg;

        ActiveResult(MessageActivate messageActivate) {
            this._onActiveMsg = messageActivate;
        }

        @Override // io.baratine.core.Result
        public void completed(Boolean bool) {
            InboxQueue.this._queue.offer(this._onActiveMsg);
            InboxQueue.this._lifecycle.toActive();
            InboxQueue.this._queue.wake();
        }

        @Override // io.baratine.core.ResultFailure
        public void failed(Throwable th) {
            InboxQueue.this._lifecycle.toActive();
            this._onActiveMsg.failed(th);
        }
    }

    public InboxQueue(ServiceManagerAmp serviceManagerAmp, QueueServiceBuilder<MessageAmp> queueServiceBuilder, QueueServiceFactoryInbox queueServiceFactoryInbox, ServiceConfig serviceConfig) {
        super(serviceManagerAmp);
        this._lifecycle = new Lifecycle();
        this._isFiner = log.isLoggable(Level.FINER);
        this._inboxMessage = new InboxMessage(this);
        ActorAmp mainActor = queueServiceFactoryInbox.getMainActor();
        String name = mainActor.getName();
        this._anonAddress = name == null ? "anon:" + mainActor.getApiClass().getName() : name;
        this._serviceRef = new ServiceRefCore(mainActor, this);
        QueueService<MessageAmp> build = queueServiceFactoryInbox.build(queueServiceBuilder, this);
        this._queue = build;
        this._worker = this._queue.getWorker();
        this._actor = mainActor;
        this._isLifecycleAware = mainActor.isLifecycleAware() || !build.isSingleWorker();
        long offerTimeout = serviceConfig.getOfferTimeout();
        this._sendTimeout = offerTimeout < 0 ? 60000L : offerTimeout;
        QueueFullHandler queueFullHandler = serviceConfig.getQueueFullHandler();
        this._fullHandler = queueFullHandler == null ? serviceManagerAmp.getQueueFullHandler() : queueFullHandler;
        this._isNonBlocking = mainActor.isNonblocking() && build != null && build.isSingleWorker();
    }

    MessageAmp getMessageInbox() {
        return this._inboxMessage;
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public boolean isLifecycleAware() {
        return this._isLifecycleAware;
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public void start() {
        if (this._lifecycle.toStarting()) {
            try {
                start(this._actor);
                this._lifecycle.toActive();
            } catch (Throwable th) {
                this._lifecycle.toActive();
                throw th;
            }
        }
    }

    private void start(ActorAmp actorAmp) {
        if (!this._isLifecycleAware) {
            this._lifecycle.toActive();
            return;
        }
        OutboxAmpBase outboxAmpBase = new OutboxAmpBase();
        MessageAmp andSet = ContextMessageAmp.getAndSet(new BuildMessageAmp(this));
        OutboxDeliver current = ContextOutbox.getCurrent();
        try {
            ContextOutbox.setCurrent(outboxAmpBase);
            RestoreMessage restoreMessage = new RestoreMessage(this, this._queue);
            outboxAmpBase.setContextMessage(restoreMessage);
            this._queue.offer(restoreMessage);
            this._queue.wake();
            if (!restoreMessage.waitFor(60L, TimeUnit.SECONDS)) {
                System.err.println("Restore wait timeout: " + this);
                System.out.println("Restore timeout queue: " + this._queue.getWorker() + " " + this._queue.size());
            }
            MessageActivate messageActivate = new MessageActivate(this);
            outboxAmpBase.setContextMessage(messageActivate);
            JournalAmp journal = actorAmp.getJournal();
            if (journal != null) {
                if (log.isLoggable(Level.FINER)) {
                    log.finer(L.l("journal replay {0} ({1})", getServiceRef().getAddress(), getServiceRef().getApiClass().getName()));
                }
                ReplayMessage replayMessage = new ReplayMessage(journal, this, this._queue, new ActiveResult(messageActivate));
                outboxAmpBase.setContextMessage(replayMessage);
                this._queue.offer(replayMessage);
                this._queue.wake();
            } else {
                this._queue.offer(messageActivate);
                this._lifecycle.toActive();
                this._queue.wake();
            }
            if (!messageActivate.waitFor(10L, TimeUnit.SECONDS)) {
                System.out.println("QUEUE0: " + this._queue.getWorker() + " " + this._queue.size() + " " + ThreadPool.getCurrent().getThrottleExecutor() + " " + ThreadPool.getCurrent().getStatus());
            }
            if (!messageActivate.waitFor(60L, TimeUnit.SECONDS)) {
                System.err.println("Start wait timeout: " + this);
                System.out.println("QUEUE: " + this._queue.getWorker() + " " + this._queue.size());
            }
            this._queue.offer(new StartMessage(this));
            this._queue.wake();
            ContextOutbox.setCurrent(current);
            ContextMessageAmp.set(andSet);
        } catch (Throwable th) {
            ContextOutbox.setCurrent(current);
            ContextMessageAmp.set(andSet);
            throw th;
        }
    }

    public DisruptorBuilder.DeliverFactory<MessageAmp> createDeliverFactory(Supplier<ActorAmp> supplier, ServiceConfig serviceConfig) {
        return new DeliverFactoryInboxQueue(this, supplier, serviceConfig);
    }

    @Override // com.caucho.amp.spi.InboxAmp
    public final ServiceRefAmp getServiceRef() {
        return this._serviceRef;
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public String getAddress() {
        return this._bindAddress != null ? this._bindAddress : this._anonAddress;
    }

    public String getDebugName() {
        return this._actor.getApiClass().getSimpleName() + "-" + getManager().getDebugId();
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public boolean bind(String str) {
        if (this._bindAddress != null) {
            return false;
        }
        this._bindAddress = str;
        return true;
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public final long getSize() {
        return this._queue.size();
    }

    protected final QueueService<MessageAmp> getQueue() {
        return this._queue;
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public boolean isNonBlocking() {
        return this._isNonBlocking;
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public final HeadersAmp createHeaders(HeadersAmp headersAmp, MethodRefAmp methodRefAmp) {
        if (this._isFiner || headersAmp.getSize() > 0) {
            int size = headersAmp.getSize();
            if (size > 100 && size < 120) {
                log.warning("Possible cycle: " + methodRefAmp + " " + this + " " + headersAmp);
            }
            int i = (size / 2) + 1;
            headersAmp = headersAmp.add("service." + i, methodRefAmp.getService().getAddress()).add("method." + i, methodRefAmp.getName());
        }
        return headersAmp;
    }

    @Override // com.caucho.amp.spi.InboxAmp
    public ActorAmp getDirectActor() {
        return this._actor;
    }

    @Override // com.caucho.amp.spi.InboxAmp
    public boolean offer(MessageAmp messageAmp, long j) {
        if (this._lifecycle.isAfterStopping()) {
            messageAmp.failed(new ServiceExceptionClosed(L.l("{0} for message {1}", getServiceRef(), messageAmp)));
            return true;
        }
        QueueService<MessageAmp> queueService = this._queue;
        long min = Math.min(this._sendTimeout, j);
        boolean offer = queueService.offer(messageAmp, min, TimeUnit.MILLISECONDS);
        if (offer) {
            return offer;
        }
        this._fullHandler.onQueueFull(getServiceRef(), queueService.size(), min, TimeUnit.MILLISECONDS, messageAmp);
        return false;
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public boolean offerResult(MessageAmp messageAmp) {
        InboxQueueReplyOverflow inboxQueueReplyOverflow;
        if (this._lifecycle.isAfterStopping()) {
            messageAmp.failed(new ServiceExceptionClosed(L.l("{0} for message {1}", getServiceRef(), messageAmp)));
            return true;
        }
        QueueService<MessageAmp> queueService = this._queue;
        boolean offer = queueService.offer(messageAmp, 0L, TimeUnit.MILLISECONDS);
        if (offer) {
            return offer;
        }
        synchronized (this) {
            inboxQueueReplyOverflow = this._replyOverflowQueue;
            if (inboxQueueReplyOverflow == null) {
                InboxQueueReplyOverflow inboxQueueReplyOverflow2 = new InboxQueueReplyOverflow(queueService);
                inboxQueueReplyOverflow = inboxQueueReplyOverflow2;
                this._replyOverflowQueue = inboxQueueReplyOverflow2;
            }
        }
        return inboxQueueReplyOverflow.offer(messageAmp);
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public final void offerAndWake(MessageAmp messageAmp, long j) {
        if (isClosed()) {
            messageAmp.failed(new ServiceExceptionClosed(L.l("Service {0}, actor {1} for message {2}", getServiceRef(), this._actor, messageAmp)));
            return;
        }
        QueueService<MessageAmp> queueService = this._queue;
        long min = Math.min(j, this._sendTimeout);
        if (!queueService.offer(messageAmp, min, TimeUnit.MILLISECONDS)) {
            this._fullHandler.onQueueFull(getServiceRef(), queueService.size(), min, TimeUnit.MILLISECONDS, messageAmp);
        }
        queueService.wake();
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public final WorkerDeliver getWorker() {
        return this._worker;
    }

    @Override // com.caucho.amp.spi.InboxAmp
    public boolean isClosed() {
        return this._lifecycle.isDestroyed();
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public void shutdown(ShutdownModeAmp shutdownModeAmp) {
        if (this._lifecycle.toStopping()) {
            super.shutdown(shutdownModeAmp);
            this._lifecycle.toDestroy();
        }
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public void activateActors() {
        this._worker.activate();
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public void startActors() {
        this._worker.start();
    }

    @Override // com.caucho.amp.inbox.InboxBase, com.caucho.amp.spi.InboxAmp
    public void shutdownActors(ShutdownModeAmp shutdownModeAmp) {
        this._worker.shutdown(shutdownModeAmp);
    }

    public String toString() {
        return getClass().getSimpleName() + "[" + getServiceRef().getAddress() + "]";
    }

    @Override // com.caucho.amp.spi.OutboxAmp
    public InboxAmp getInbox() {
        return this;
    }

    @Override // com.caucho.amp.spi.OutboxAmp
    public MessageAmp getMessage() {
        return this._inboxMessage;
    }

    @Override // com.caucho.amp.spi.OutboxAmp
    public void setInbox(InboxAmp inboxAmp) {
    }

    @Override // com.caucho.amp.spi.OutboxAmp
    public void setMessage(MessageAmp messageAmp) {
    }
}
