package com.caucho.hemp.broker;

import com.caucho.bam.ActorError;
import com.caucho.bam.ActorStream;
import com.caucho.bam.Broker;
import com.caucho.hemp.packet.Message;
import com.caucho.hemp.packet.MessageError;
import com.caucho.hemp.packet.Packet;
import com.caucho.hemp.packet.PacketQueue;
import com.caucho.hemp.packet.Presence;
import com.caucho.hemp.packet.PresenceError;
import com.caucho.hemp.packet.PresenceProbe;
import com.caucho.hemp.packet.PresenceSubscribe;
import com.caucho.hemp.packet.PresenceSubscribed;
import com.caucho.hemp.packet.PresenceUnavailable;
import com.caucho.hemp.packet.PresenceUnsubscribe;
import com.caucho.hemp.packet.PresenceUnsubscribed;
import com.caucho.hemp.packet.QueryError;
import com.caucho.hemp.packet.QueryGet;
import com.caucho.hemp.packet.QueryResult;
import com.caucho.hemp.packet.QuerySet;
import com.caucho.server.util.ScheduledThreadPool;
import com.caucho.util.Alarm;
import com.caucho.util.L10N;
import com.caucho.util.WaitQueue;
import java.io.Serializable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/caucho/hemp/broker/HempMemoryQueue.class */
public class HempMemoryQueue implements ActorStream, Runnable {
    private static final Logger log = Logger.getLogger(HempMemoryQueue.class.getName());
    private static final L10N L = new L10N(HempMemoryQueue.class);
    private static long _gid;
    private long _queueIdleTimeout;
    private final Executor _executor;
    private final ClassLoader _loader;
    private final Broker _broker;
    private final ActorStream _brokerStream;
    private final ActorStream _actorStream;
    private int _threadMax;
    private AtomicInteger _threadCount;
    private AtomicInteger _dequeueCount;
    private Object _idleLock;
    private final WaitQueue _wait;
    private long _lastExitTime;
    private PacketQueue _queue;
    private String _name;
    private volatile boolean _isClosed;

    public HempMemoryQueue(Broker broker, ActorStream actorStream) {
        this(null, broker, actorStream);
    }

    public HempMemoryQueue(String str, Broker broker, ActorStream actorStream) {
        this._queueIdleTimeout = 250L;
        this._executor = ScheduledThreadPool.getLocal();
        this._loader = Thread.currentThread().getContextClassLoader();
        this._threadCount = new AtomicInteger();
        this._dequeueCount = new AtomicInteger();
        this._idleLock = new Object();
        this._wait = new WaitQueue();
        if (broker == null) {
            throw new NullPointerException();
        }
        if (actorStream == null) {
            throw new NullPointerException();
        }
        this._broker = broker;
        this._brokerStream = broker.getBrokerStream();
        this._actorStream = actorStream;
        this._threadMax = 5;
        str = str == null ? this._actorStream.getJid() : str;
        this._name = str;
        this._queue = new PacketQueue(str, -1, 1024, -1L);
    }

    @Override // com.caucho.bam.ActorStream
    public String getJid() {
        return this._actorStream.getJid();
    }

    public boolean isPacketAvailable() {
        return !this._queue.isEmpty();
    }

    @Override // com.caucho.bam.ActorStream
    public void message(String str, String str2, Serializable serializable) {
        enqueue(new Message(str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void messageError(String str, String str2, Serializable serializable, ActorError actorError) {
        enqueue(new MessageError(str, str2, serializable, actorError));
    }

    @Override // com.caucho.bam.ActorStream
    public void queryGet(long j, String str, String str2, Serializable serializable) {
        if (str2 == null) {
            throw new NullPointerException();
        }
        enqueue(new QueryGet(j, str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void querySet(long j, String str, String str2, Serializable serializable) {
        enqueue(new QuerySet(j, str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void queryResult(long j, String str, String str2, Serializable serializable) {
        enqueue(new QueryResult(j, str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void queryError(long j, String str, String str2, Serializable serializable, ActorError actorError) {
        enqueue(new QueryError(j, str, str2, serializable, actorError));
    }

    @Override // com.caucho.bam.ActorStream
    public void presence(String str, String str2, Serializable serializable) {
        enqueue(new Presence(str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void presenceUnavailable(String str, String str2, Serializable serializable) {
        enqueue(new PresenceUnavailable(str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void presenceProbe(String str, String str2, Serializable serializable) {
        enqueue(new PresenceProbe(str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void presenceSubscribe(String str, String str2, Serializable serializable) {
        enqueue(new PresenceSubscribe(str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void presenceSubscribed(String str, String str2, Serializable serializable) {
        enqueue(new PresenceSubscribed(str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void presenceUnsubscribe(String str, String str2, Serializable serializable) {
        enqueue(new PresenceUnsubscribe(str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void presenceUnsubscribed(String str, String str2, Serializable serializable) {
        enqueue(new PresenceUnsubscribed(str, str2, serializable));
    }

    @Override // com.caucho.bam.ActorStream
    public void presenceError(String str, String str2, Serializable serializable, ActorError actorError) {
        enqueue(new PresenceError(str, str2, serializable, actorError));
    }

    protected ActorStream getActorStream() {
        return this._actorStream;
    }

    protected final void enqueue(Packet packet) {
        if (log.isLoggable(Level.FINEST)) {
            log.finest(this + " enqueue(" + this._queue.getSize() + ") " + packet);
        }
        this._queue.enqueue(packet);
        long j = this._lastExitTime;
        this._lastExitTime = Alarm.getCurrentTime();
        if (this._wait.wake()) {
            return;
        }
        do {
            int size = this._queue.getSize();
            int i = this._threadCount.get();
            long currentTime = Alarm.getCurrentTime();
            if (size == 0 || i == this._threadMax) {
                return;
            }
            if (i >= 2 && size / 3 < i) {
                return;
            }
            if ((i > 0 && currentTime <= j + 1) || isClosed()) {
                return;
            }
            if (this._threadCount.compareAndSet(i, i + 1)) {
                this._executor.execute(this);
                return;
            }
        } while (this._threadCount.get() <= 0);
    }

    protected void dispatch(Packet packet) {
        packet.dispatch(getActorStream(), this._brokerStream);
    }

    protected Packet dequeue() {
        return this._queue.dequeue();
    }

    private void consumeQueue(WaitQueue.Item item) {
        while (!isClosed()) {
            try {
                Packet dequeue = this._queue.dequeue();
                if (dequeue != null) {
                    this._lastExitTime = Alarm.getCurrentTime();
                    if (log.isLoggable(Level.FINEST)) {
                        log.finest(this + " dequeue " + dequeue);
                    }
                    dispatch(dequeue);
                } else if (!waitForQueue(item)) {
                    return;
                }
            } catch (Exception e) {
                log.log(Level.WARNING, e.toString(), (Throwable) e);
            }
        }
    }

    private boolean waitForQueue(WaitQueue.Item item) {
        try {
            if (!isClosed() && this._queue.getSize() == 0) {
                item.park(this._queueIdleTimeout);
            }
        } catch (Exception e) {
        }
        long currentTime = Alarm.getCurrentTime();
        if (this._queue.getSize() > 0) {
            return true;
        }
        if (this._lastExitTime + this._queueIdleTimeout >= currentTime && !Alarm.isTest()) {
            return true;
        }
        this._lastExitTime = currentTime;
        int decrementAndGet = this._threadCount.decrementAndGet();
        return this._queue.getSize() > 0 && decrementAndGet < this._threadMax && this._threadCount.compareAndSet(decrementAndGet, decrementAndGet + 1);
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread currentThread = Thread.currentThread();
        StringBuilder append = new StringBuilder().append(this._name).append("-");
        long j = _gid;
        _gid = j + 1;
        currentThread.setName(append.append(j).toString());
        currentThread.setContextClassLoader(this._loader);
        boolean z = false;
        WaitQueue.Item create = this._wait.create();
        try {
            if (log.isLoggable(Level.FINEST)) {
                log.finest(this + " spawn {threadCount:" + this._threadCount.get() + ", queueSize:" + this._queue.getSize() + "}");
            }
            consumeQueue(create);
            z = true;
            if (1 == 0) {
                this._threadCount.decrementAndGet();
            }
            create.remove();
        } catch (Throwable th) {
            if (!z) {
                this._threadCount.decrementAndGet();
            }
            create.remove();
            throw th;
        }
    }

    @Override // com.caucho.bam.ActorStream
    public void close() {
        this._isClosed = true;
        this._wait.wakeAll();
    }

    public boolean isClosed() {
        return this._isClosed || this._broker.isClosed();
    }

    public String toString() {
        return getClass().getSimpleName() + "[" + this._name + "]";
    }
}
