SmackReactor.java
- /**
- *
- * Copyright 2018-2023 Florian Schmaus
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.jivesoftware.smack;
- import java.io.IOException;
- import java.nio.channels.CancelledKeyException;
- import java.nio.channels.ClosedChannelException;
- import java.nio.channels.SelectableChannel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.Date;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Queue;
- import java.util.Set;
- import java.util.concurrent.ConcurrentLinkedQueue;
- import java.util.concurrent.DelayQueue;
- import java.util.concurrent.Semaphore;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- /**
- * The SmackReactor for non-blocking I/O.
- * <p>
- * Highlights include:
- * <ul>
- * <li>Multiple reactor threads</li>
- * <li>Scheduled actions</li>
- * </ul>
- *
- * <pre>
- *
- * ) ) )
- * ( ( (
- * ) ) )
- * (~~~~~~~~~)
- * | Smack |
- * |Reactor|
- * I _._
- * I /' `\
- * I | |
- * f | |~~~~~~~~~~~~~~|
- * .' | | # # # # |
- * '______|___|___________###|
- * </pre>
- */
- public class SmackReactor {
- private static final Logger LOGGER = Logger.getLogger(SmackReactor.class.getName());
- private static final int DEFAULT_REACTOR_THREAD_COUNT = 2;
- private static final int PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE = 1024;
- private static SmackReactor INSTANCE;
- static synchronized SmackReactor getInstance() {
- if (INSTANCE == null) {
- INSTANCE = new SmackReactor("DefaultReactor");
- }
- return INSTANCE;
- }
- private final Selector selector;
- private final String reactorName;
- private final List<Reactor> reactorThreads = Collections.synchronizedList(new ArrayList<>());
- private final DelayQueue<ScheduledAction> scheduledActions = new DelayQueue<>();
- private final Lock registrationLock = new ReentrantLock();
- /**
- * The semaphore protecting the handling of the actions. Note that it is
- * initialized with -1, which basically means that one thread will always do I/O using
- * select().
- */
- private final Semaphore actionsSemaphore = new Semaphore(-1, false);
- private final Queue<SelectionKey> pendingSelectionKeys = new ConcurrentLinkedQueue<>();
- private final Queue<SetInterestOps> pendingSetInterestOps = new ConcurrentLinkedQueue<>();
- SmackReactor(String reactorName) {
- this.reactorName = reactorName;
- try {
- selector = Selector.open();
- }
- catch (IOException e) {
- throw new IllegalStateException(e);
- }
- setReactorThreadCount(DEFAULT_REACTOR_THREAD_COUNT);
- }
- public SelectionKey registerWithSelector(SelectableChannel channel, int ops, ChannelSelectedCallback callback)
- throws ClosedChannelException {
- SelectionKeyAttachment selectionKeyAttachment = new SelectionKeyAttachment(callback);
- registrationLock.lock();
- try {
- selector.wakeup();
- return channel.register(selector, ops, selectionKeyAttachment);
- } finally {
- registrationLock.unlock();
- }
- }
- public void setInterestOps(SelectionKey selectionKey, int interestOps) {
- SetInterestOps setInterestOps = new SetInterestOps(selectionKey, interestOps);
- pendingSetInterestOps.add(setInterestOps);
- selector.wakeup();
- }
- private static final class SetInterestOps {
- private final SelectionKey selectionKey;
- private final int interestOps;
- private SetInterestOps(SelectionKey selectionKey, int interestOps) {
- this.selectionKey = selectionKey;
- this.interestOps = interestOps;
- }
- }
- ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) {
- long releaseTimeEpoch = System.currentTimeMillis() + unit.toMillis(delay);
- Date releaseTimeDate = new Date(releaseTimeEpoch);
- ScheduledAction scheduledAction = new ScheduledAction(runnable, releaseTimeDate, this, scheduledActionKind);
- scheduledActions.add(scheduledAction);
- selector.wakeup();
- return scheduledAction;
- }
- /**
- * Cancels the scheduled action.
- *
- * @param scheduledAction the scheduled action to cancel.
- * @return <code>true</code> if the scheduled action was still pending and got removed, <code>false</code> otherwise.
- */
- boolean cancel(ScheduledAction scheduledAction) {
- return scheduledActions.remove(scheduledAction);
- }
- private class Reactor extends Thread {
- private volatile long shutdownRequestTimestamp = -1;
- @Override
- public void run() {
- try {
- reactorLoop();
- } finally {
- if (shutdownRequestTimestamp > 0) {
- long shutDownDelay = System.currentTimeMillis() - shutdownRequestTimestamp;
- LOGGER.info(this + " shut down after " + shutDownDelay + "ms");
- } else {
- boolean contained = reactorThreads.remove(this);
- assert contained;
- }
- }
- }
- private void reactorLoop() {
- // Loop until reactor shutdown was requested.
- while (shutdownRequestTimestamp < 0) {
- handleScheduledActionsOrPerformSelect();
- handlePendingSelectionKeys();
- }
- }
- @SuppressWarnings("LockNotBeforeTry")
- private void handleScheduledActionsOrPerformSelect() {
- ScheduledAction dueScheduledAction = null;
- boolean permitToHandleScheduledActions = actionsSemaphore.tryAcquire();
- if (permitToHandleScheduledActions) {
- try {
- dueScheduledAction = scheduledActions.poll();
- } finally {
- actionsSemaphore.release();
- }
- }
- if (dueScheduledAction != null) {
- dueScheduledAction.run();
- return;
- }
- int newSelectedKeysCount = 0;
- List<SelectionKey> selectedKeys;
- synchronized (selector) {
- ScheduledAction nextScheduledAction = scheduledActions.peek();
- long selectWait;
- if (nextScheduledAction == null) {
- // There is no next scheduled action, wait indefinitely in select() or until another thread invokes
- // selector.wakeup().
- selectWait = 0;
- } else {
- selectWait = nextScheduledAction.getTimeToDueMillis();
- if (selectWait <= 0) {
- // A scheduled action was just released and became ready to execute.
- return;
- }
- }
- // Before we call select, we handle the pending the interest Ops. This will not block since no other
- // thread is currently in select() at this time.
- // Note: This was put deliberately before the registration lock. It may cause more synchronization but
- // allows for more parallelism.
- // Hopefully that assumption is right.
- int myHandledPendingSetInterestOps = 0;
- for (SetInterestOps setInterestOps; (setInterestOps = pendingSetInterestOps.poll()) != null;) {
- setInterestOpsCancelledKeySafe(setInterestOps.selectionKey, setInterestOps.interestOps);
- if (myHandledPendingSetInterestOps++ >= PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE) {
- // This thread has handled enough "set pending interest ops" requests. Wakeup another one to
- // handle the remaining (if any).
- selector.wakeup();
- break;
- }
- }
- // Ensure that a wakeup() in registerWithSelector() gives the corresponding
- // register() in the same method the chance to actually register the channel. In
- // other words: This construct ensures that there is never another select()
- // between a corresponding wakeup() and register() calls.
- // See also https://stackoverflow.com/a/1112809/194894
- registrationLock.lock();
- registrationLock.unlock();
- try {
- newSelectedKeysCount = selector.select(selectWait);
- } catch (IOException e) {
- LOGGER.log(Level.SEVERE, "IOException while using select()", e);
- return;
- }
- if (newSelectedKeysCount == 0) {
- return;
- }
- // Copy the selected-key set over to selectedKeys, remove the keys from the
- // selected key set and loose interest of the key OPs for the time being.
- // Note that we perform this operation in two steps in order to maximize the
- // timespan setRacing() is set.
- Set<SelectionKey> selectedKeySet = selector.selectedKeys();
- for (SelectionKey selectionKey : selectedKeySet) {
- SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
- selectionKeyAttachment.setRacing();
- }
- for (SelectionKey selectionKey : selectedKeySet) {
- setInterestOpsCancelledKeySafe(selectionKey, 0);
- }
- selectedKeys = new ArrayList<>(selectedKeySet);
- selectedKeySet.clear();
- }
- int selectedKeysCount = selectedKeys.size();
- int currentReactorThreadCount = reactorThreads.size();
- int myKeyCount;
- if (selectedKeysCount > currentReactorThreadCount) {
- myKeyCount = selectedKeysCount / currentReactorThreadCount;
- } else {
- myKeyCount = selectedKeysCount;
- }
- final Level reactorSelectStatsLogLevel = Level.FINE;
- if (LOGGER.isLoggable(reactorSelectStatsLogLevel)) {
- LOGGER.log(reactorSelectStatsLogLevel,
- "New selected key count: " + newSelectedKeysCount
- + ". Total selected key count " + selectedKeysCount
- + ". My key count: " + myKeyCount
- + ". Current reactor thread count: " + currentReactorThreadCount);
- }
- Collection<SelectionKey> mySelectedKeys = new ArrayList<>(myKeyCount);
- Iterator<SelectionKey> it = selectedKeys.iterator();
- for (int i = 0; i < myKeyCount; i++) {
- SelectionKey selectionKey = it.next();
- mySelectedKeys.add(selectionKey);
- }
- while (it.hasNext()) {
- // Drain to pendingSelectionKeys.
- SelectionKey selectionKey = it.next();
- pendingSelectionKeys.add(selectionKey);
- }
- if (selectedKeysCount - myKeyCount > 0) {
- // There where pending selection keys: Wakeup another reactor thread to handle them.
- selector.wakeup();
- }
- handleSelectedKeys(mySelectedKeys);
- }
- private void handlePendingSelectionKeys() {
- final int pendingSelectionKeysSize = pendingSelectionKeys.size();
- if (pendingSelectionKeysSize == 0) {
- return;
- }
- int currentReactorThreadCount = reactorThreads.size();
- int myKeyCount = pendingSelectionKeysSize / currentReactorThreadCount;
- // The division could result in myKeyCount being zero, even though there are pending selection keys.
- // Therefore, ensure that this thread tries to get at least one pending selection key by invoking poll().
- // Otherwise, it could happen that we end up in a busy loop, where myKeyCount is zero and this thread invokes
- // selector.wakeup() below because pendingSelectionsKeys is not empty, but the woken up reactor thread wil
- // end up with myKeyCount being zero again, restarting the busy-loop cycle.
- if (myKeyCount == 0) myKeyCount = 1;
- Collection<SelectionKey> selectedKeys = new ArrayList<>(myKeyCount);
- for (int i = 0; i < myKeyCount; i++) {
- SelectionKey selectionKey = pendingSelectionKeys.poll();
- if (selectionKey == null) {
- // We lost a race and can abort here since the pendingSelectionKeys queue is empty.
- break;
- }
- selectedKeys.add(selectionKey);
- }
- if (!pendingSelectionKeys.isEmpty()) {
- // There are more pending selection keys, wakeup a thread blocked in select() to handle them.
- selector.wakeup();
- }
- handleSelectedKeys(selectedKeys);
- }
- private void setInterestOpsCancelledKeySafe(SelectionKey selectionKey, int interestOps) {
- try {
- selectionKey.interestOps(interestOps);
- }
- catch (CancelledKeyException e) {
- final Level keyCancelledLogLevel = Level.FINER;
- if (LOGGER.isLoggable(keyCancelledLogLevel)) {
- LOGGER.log(keyCancelledLogLevel, "Key '" + selectionKey + "' has been cancelled", e);
- }
- }
- }
- void requestShutdown() {
- shutdownRequestTimestamp = System.currentTimeMillis();
- }
- }
- private static void handleSelectedKeys(Collection<SelectionKey> selectedKeys) {
- for (SelectionKey selectionKey : selectedKeys) {
- SelectableChannel channel = selectionKey.channel();
- SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
- ChannelSelectedCallback channelSelectedCallback = selectionKeyAttachment.channelSelectedCallback;
- channelSelectedCallback.onChannelSelected(channel, selectionKey);
- }
- }
- public interface ChannelSelectedCallback {
- void onChannelSelected(SelectableChannel channel, SelectionKey selectionKey);
- }
- public void setReactorThreadCount(int reactorThreadCount) {
- if (reactorThreadCount < 2) {
- throw new IllegalArgumentException("Must have at least two reactor threads, but you requested " + reactorThreadCount);
- }
- synchronized (reactorThreads) {
- int deltaThreads = reactorThreadCount - reactorThreads.size();
- if (deltaThreads > 0) {
- // Start new reactor thread. Note that we start the threads before we increase the permits of the
- // actionsSemaphore.
- for (int i = 0; i < deltaThreads; i++) {
- Reactor reactor = new Reactor();
- reactor.setDaemon(true);
- reactor.setName("Smack " + reactorName + " Thread #" + i);
- reactorThreads.add(reactor);
- reactor.start();
- }
- actionsSemaphore.release(deltaThreads);
- } else {
- // Stop existing reactor threads. First we change the sign of deltaThreads, then we decrease the permits
- // of the actionsSemaphore *before* we signal the selected reactor threads that they should shut down.
- deltaThreads -= deltaThreads;
- for (int i = deltaThreads - 1; i > 0; i--) {
- // Note that this could potentially block forever, starving on the unfair semaphore.
- actionsSemaphore.acquireUninterruptibly();
- }
- for (int i = deltaThreads - 1; i > 0; i--) {
- Reactor reactor = reactorThreads.remove(i);
- reactor.requestShutdown();
- }
- selector.wakeup();
- }
- }
- }
- public static final class SelectionKeyAttachment {
- private final ChannelSelectedCallback channelSelectedCallback;
- private final AtomicBoolean reactorThreadRacing = new AtomicBoolean();
- private SelectionKeyAttachment(ChannelSelectedCallback channelSelectedCallback) {
- this.channelSelectedCallback = channelSelectedCallback;
- }
- private void setRacing() {
- // We use lazySet here since it is sufficient if the value does not become visible immediately.
- reactorThreadRacing.lazySet(true);
- }
- public void resetReactorThreadRacing() {
- reactorThreadRacing.set(false);
- }
- public boolean isReactorThreadRacing() {
- return reactorThreadRacing.get();
- }
- }
- }