SmackReactor.java

  1. /**
  2.  *
  3.  * Copyright 2018-2023 Florian Schmaus
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smack;

  18. import java.io.IOException;
  19. import java.nio.channels.CancelledKeyException;
  20. import java.nio.channels.ClosedChannelException;
  21. import java.nio.channels.SelectableChannel;
  22. import java.nio.channels.SelectionKey;
  23. import java.nio.channels.Selector;
  24. import java.util.ArrayList;
  25. import java.util.Collection;
  26. import java.util.Collections;
  27. import java.util.Date;
  28. import java.util.Iterator;
  29. import java.util.List;
  30. import java.util.Queue;
  31. import java.util.Set;
  32. import java.util.concurrent.ConcurrentLinkedQueue;
  33. import java.util.concurrent.DelayQueue;
  34. import java.util.concurrent.Semaphore;
  35. import java.util.concurrent.TimeUnit;
  36. import java.util.concurrent.atomic.AtomicBoolean;
  37. import java.util.concurrent.locks.Lock;
  38. import java.util.concurrent.locks.ReentrantLock;
  39. import java.util.logging.Level;
  40. import java.util.logging.Logger;

  41. /**
  42.  * The SmackReactor for non-blocking I/O.
  43.  * <p>
  44.  * Highlights include:
  45.  * <ul>
  46.  * <li>Multiple reactor threads</li>
  47.  * <li>Scheduled actions</li>
  48.  * </ul>
  49.  *
  50.  * <pre>
  51.  *
  52.  *           ) ) )
  53.  *        ( ( (
  54.  *      ) ) )
  55.  *   (~~~~~~~~~)
  56.  *    | Smack |
  57.  *    |Reactor|
  58.  *    I      _._
  59.  *    I    /'   `\
  60.  *    I   |       |
  61.  *    f   |   |~~~~~~~~~~~~~~|
  62.  *  .'    |   | #   #   #  # |
  63.  * '______|___|___________###|
  64.  * </pre>
  65.  */
  66. public class SmackReactor {

  67.     private static final Logger LOGGER = Logger.getLogger(SmackReactor.class.getName());

  68.     private static final int DEFAULT_REACTOR_THREAD_COUNT = 2;

  69.     private static final int PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE = 1024;

  70.     private static SmackReactor INSTANCE;

  71.     static synchronized SmackReactor getInstance() {
  72.         if (INSTANCE == null) {
  73.             INSTANCE = new SmackReactor("DefaultReactor");
  74.         }
  75.         return INSTANCE;
  76.     }

  77.     private final Selector selector;
  78.     private final String reactorName;

  79.     private final List<Reactor> reactorThreads = Collections.synchronizedList(new ArrayList<>());

  80.     private final DelayQueue<ScheduledAction> scheduledActions = new DelayQueue<>();

  81.     private final Lock registrationLock = new ReentrantLock();

  82.     /**
  83.      * The semaphore protecting the handling of the actions. Note that it is
  84.      * initialized with -1, which basically means that one thread will always do I/O using
  85.      * select().
  86.      */
  87.     private final Semaphore actionsSemaphore = new Semaphore(-1, false);

  88.     private final Queue<SelectionKey> pendingSelectionKeys = new ConcurrentLinkedQueue<>();

  89.     private final Queue<SetInterestOps> pendingSetInterestOps = new ConcurrentLinkedQueue<>();

  90.     SmackReactor(String reactorName) {
  91.         this.reactorName = reactorName;

  92.         try {
  93.             selector = Selector.open();
  94.         }
  95.         catch (IOException e) {
  96.             throw new IllegalStateException(e);
  97.         }

  98.         setReactorThreadCount(DEFAULT_REACTOR_THREAD_COUNT);
  99.     }

  100.     public SelectionKey registerWithSelector(SelectableChannel channel, int ops, ChannelSelectedCallback callback)
  101.             throws ClosedChannelException {
  102.         SelectionKeyAttachment selectionKeyAttachment = new SelectionKeyAttachment(callback);

  103.         registrationLock.lock();
  104.         try {
  105.             selector.wakeup();
  106.             return channel.register(selector, ops, selectionKeyAttachment);
  107.         } finally {
  108.             registrationLock.unlock();
  109.         }
  110.     }

  111.     public void setInterestOps(SelectionKey selectionKey, int interestOps) {
  112.         SetInterestOps setInterestOps = new SetInterestOps(selectionKey, interestOps);
  113.         pendingSetInterestOps.add(setInterestOps);
  114.         selector.wakeup();
  115.     }

  116.     private static final class SetInterestOps {
  117.         private final SelectionKey selectionKey;
  118.         private final int interestOps;

  119.         private SetInterestOps(SelectionKey selectionKey, int interestOps) {
  120.             this.selectionKey = selectionKey;
  121.             this.interestOps = interestOps;
  122.         }
  123.     }

  124.     ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) {
  125.         long releaseTimeEpoch = System.currentTimeMillis() + unit.toMillis(delay);
  126.         Date releaseTimeDate = new Date(releaseTimeEpoch);
  127.         ScheduledAction scheduledAction = new ScheduledAction(runnable, releaseTimeDate, this, scheduledActionKind);
  128.         scheduledActions.add(scheduledAction);
  129.         selector.wakeup();
  130.         return scheduledAction;
  131.     }

  132.     /**
  133.      * Cancels the scheduled action.
  134.      *
  135.      * @param scheduledAction the scheduled action to cancel.
  136.      * @return <code>true</code> if the scheduled action was still pending and got removed, <code>false</code> otherwise.
  137.      */
  138.     boolean cancel(ScheduledAction scheduledAction) {
  139.         return scheduledActions.remove(scheduledAction);
  140.     }

  141.     private class Reactor extends Thread {

  142.         private volatile long shutdownRequestTimestamp = -1;

  143.         @Override
  144.         public void run() {
  145.             try {
  146.                 reactorLoop();
  147.             } finally {
  148.                 if (shutdownRequestTimestamp > 0) {
  149.                     long shutDownDelay = System.currentTimeMillis() - shutdownRequestTimestamp;
  150.                     LOGGER.info(this + " shut down after " + shutDownDelay + "ms");
  151.                 } else {
  152.                     boolean contained = reactorThreads.remove(this);
  153.                     assert contained;
  154.                 }
  155.             }
  156.         }

  157.         private void reactorLoop() {
  158.             // Loop until reactor shutdown was requested.
  159.             while (shutdownRequestTimestamp < 0) {
  160.                 handleScheduledActionsOrPerformSelect();

  161.                 handlePendingSelectionKeys();
  162.             }
  163.         }

  164.         @SuppressWarnings("LockNotBeforeTry")
  165.         private void handleScheduledActionsOrPerformSelect() {
  166.             ScheduledAction dueScheduledAction = null;

  167.             boolean permitToHandleScheduledActions = actionsSemaphore.tryAcquire();
  168.             if (permitToHandleScheduledActions) {
  169.                 try {
  170.                     dueScheduledAction = scheduledActions.poll();
  171.                 } finally {
  172.                     actionsSemaphore.release();
  173.                 }
  174.             }

  175.             if (dueScheduledAction != null) {
  176.                 dueScheduledAction.run();
  177.                 return;
  178.             }

  179.             int newSelectedKeysCount = 0;
  180.             List<SelectionKey> selectedKeys;
  181.             synchronized (selector) {
  182.                 ScheduledAction nextScheduledAction = scheduledActions.peek();

  183.                 long selectWait;
  184.                 if (nextScheduledAction == null) {
  185.                     // There is no next scheduled action, wait indefinitely in select() or until another thread invokes
  186.                     // selector.wakeup().
  187.                     selectWait = 0;
  188.                 } else {
  189.                     selectWait = nextScheduledAction.getTimeToDueMillis();
  190.                     if (selectWait <= 0) {
  191.                         // A scheduled action was just released and became ready to execute.
  192.                         return;
  193.                     }
  194.                 }

  195.                 // Before we call select, we handle the pending the interest Ops. This will not block since no other
  196.                 // thread is currently in select() at this time.
  197.                 // Note: This was put deliberately before the registration lock. It may cause more synchronization but
  198.                 // allows for more parallelism.
  199.                 // Hopefully that assumption is right.
  200.                 int myHandledPendingSetInterestOps = 0;
  201.                 for (SetInterestOps setInterestOps; (setInterestOps = pendingSetInterestOps.poll()) != null;) {
  202.                     setInterestOpsCancelledKeySafe(setInterestOps.selectionKey, setInterestOps.interestOps);

  203.                     if (myHandledPendingSetInterestOps++ >= PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE) {
  204.                         // This thread has handled enough "set pending interest ops" requests. Wakeup another one to
  205.                         // handle the remaining (if any).
  206.                         selector.wakeup();
  207.                         break;
  208.                     }
  209.                 }

  210.                 // Ensure that a wakeup() in registerWithSelector() gives the corresponding
  211.                 // register() in the same method the chance to actually register the channel. In
  212.                 // other words: This construct ensures that there is never another select()
  213.                 // between a corresponding wakeup() and register() calls.
  214.                 // See also https://stackoverflow.com/a/1112809/194894
  215.                 registrationLock.lock();
  216.                 registrationLock.unlock();

  217.                 try {
  218.                     newSelectedKeysCount = selector.select(selectWait);
  219.                 } catch (IOException e) {
  220.                     LOGGER.log(Level.SEVERE, "IOException while using select()", e);
  221.                     return;
  222.                 }

  223.                 if (newSelectedKeysCount == 0) {
  224.                     return;
  225.                 }

  226.                 // Copy the selected-key set over to selectedKeys, remove the keys from the
  227.                 // selected key set and loose interest of the key OPs for the time being.
  228.                 // Note that we perform this operation in two steps in order to maximize the
  229.                 // timespan setRacing() is set.
  230.                 Set<SelectionKey> selectedKeySet = selector.selectedKeys();
  231.                 for (SelectionKey selectionKey : selectedKeySet) {
  232.                     SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
  233.                     selectionKeyAttachment.setRacing();
  234.                 }
  235.                 for (SelectionKey selectionKey : selectedKeySet) {
  236.                     setInterestOpsCancelledKeySafe(selectionKey, 0);
  237.                 }

  238.                 selectedKeys = new ArrayList<>(selectedKeySet);
  239.                 selectedKeySet.clear();
  240.             }

  241.             int selectedKeysCount = selectedKeys.size();
  242.             int currentReactorThreadCount = reactorThreads.size();
  243.             int myKeyCount;
  244.             if (selectedKeysCount > currentReactorThreadCount) {
  245.                 myKeyCount = selectedKeysCount / currentReactorThreadCount;
  246.             } else {
  247.                 myKeyCount = selectedKeysCount;
  248.             }

  249.             final Level reactorSelectStatsLogLevel = Level.FINE;
  250.             if (LOGGER.isLoggable(reactorSelectStatsLogLevel)) {
  251.                 LOGGER.log(reactorSelectStatsLogLevel,
  252.                                 "New selected key count: " + newSelectedKeysCount
  253.                                 + ". Total selected key count " + selectedKeysCount
  254.                                 + ". My key count: " + myKeyCount
  255.                                 + ". Current reactor thread count: " + currentReactorThreadCount);
  256.             }

  257.             Collection<SelectionKey> mySelectedKeys = new ArrayList<>(myKeyCount);
  258.             Iterator<SelectionKey> it = selectedKeys.iterator();
  259.             for (int i = 0; i < myKeyCount; i++) {
  260.                 SelectionKey selectionKey = it.next();
  261.                 mySelectedKeys.add(selectionKey);
  262.             }
  263.             while (it.hasNext()) {
  264.                 // Drain to pendingSelectionKeys.
  265.                 SelectionKey selectionKey = it.next();
  266.                 pendingSelectionKeys.add(selectionKey);
  267.             }

  268.             if (selectedKeysCount - myKeyCount > 0) {
  269.                 // There where pending selection keys: Wakeup another reactor thread to handle them.
  270.                 selector.wakeup();
  271.             }

  272.             handleSelectedKeys(mySelectedKeys);
  273.         }

  274.         private void handlePendingSelectionKeys() {
  275.             final int pendingSelectionKeysSize = pendingSelectionKeys.size();
  276.             if (pendingSelectionKeysSize == 0) {
  277.                 return;
  278.             }

  279.             int currentReactorThreadCount = reactorThreads.size();
  280.             int myKeyCount = pendingSelectionKeysSize / currentReactorThreadCount;
  281.             // The division could result in myKeyCount being zero, even though there are pending selection keys.
  282.             // Therefore, ensure that this thread tries to get at least one pending selection key by invoking poll().
  283.             // Otherwise, it could happen that we end up in a busy loop, where myKeyCount is zero and this thread invokes
  284.             // selector.wakeup() below because pendingSelectionsKeys is not empty, but the woken up reactor thread wil
  285.             // end up with myKeyCount being zero again, restarting the busy-loop cycle.
  286.             if (myKeyCount == 0) myKeyCount = 1;
  287.             Collection<SelectionKey> selectedKeys = new ArrayList<>(myKeyCount);
  288.             for (int i = 0; i < myKeyCount; i++) {
  289.                 SelectionKey selectionKey = pendingSelectionKeys.poll();
  290.                 if (selectionKey == null) {
  291.                     // We lost a race and can abort here since the pendingSelectionKeys queue is empty.
  292.                     break;
  293.                 }
  294.                 selectedKeys.add(selectionKey);
  295.             }

  296.             if (!pendingSelectionKeys.isEmpty()) {
  297.                 // There are more pending selection keys, wakeup a thread blocked in select() to handle them.
  298.                 selector.wakeup();
  299.             }

  300.             handleSelectedKeys(selectedKeys);
  301.         }

  302.         private void setInterestOpsCancelledKeySafe(SelectionKey selectionKey, int interestOps) {
  303.             try {
  304.                 selectionKey.interestOps(interestOps);
  305.             }
  306.             catch (CancelledKeyException e) {
  307.                 final Level keyCancelledLogLevel = Level.FINER;
  308.                 if (LOGGER.isLoggable(keyCancelledLogLevel)) {
  309.                     LOGGER.log(keyCancelledLogLevel, "Key '" + selectionKey + "' has been cancelled", e);
  310.                 }
  311.             }
  312.         }

  313.         void requestShutdown() {
  314.             shutdownRequestTimestamp = System.currentTimeMillis();
  315.         }
  316.     }

  317.     private static void handleSelectedKeys(Collection<SelectionKey> selectedKeys) {
  318.         for (SelectionKey selectionKey : selectedKeys) {
  319.             SelectableChannel channel = selectionKey.channel();
  320.             SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
  321.             ChannelSelectedCallback channelSelectedCallback = selectionKeyAttachment.channelSelectedCallback;
  322.             channelSelectedCallback.onChannelSelected(channel, selectionKey);
  323.         }
  324.     }

  325.     public interface ChannelSelectedCallback {
  326.         void onChannelSelected(SelectableChannel channel, SelectionKey selectionKey);
  327.     }

  328.     public void setReactorThreadCount(int reactorThreadCount) {
  329.         if (reactorThreadCount < 2) {
  330.             throw new IllegalArgumentException("Must have at least two reactor threads, but you requested " + reactorThreadCount);
  331.         }

  332.         synchronized (reactorThreads) {
  333.             int deltaThreads = reactorThreadCount - reactorThreads.size();
  334.             if (deltaThreads > 0) {
  335.                 // Start new reactor thread. Note that we start the threads before we increase the permits of the
  336.                 // actionsSemaphore.
  337.                 for (int i = 0; i < deltaThreads; i++) {
  338.                     Reactor reactor = new Reactor();
  339.                     reactor.setDaemon(true);
  340.                     reactor.setName("Smack " + reactorName + " Thread #" + i);
  341.                     reactorThreads.add(reactor);
  342.                     reactor.start();
  343.                 }

  344.                 actionsSemaphore.release(deltaThreads);
  345.             } else {
  346.                 // Stop existing reactor threads. First we change the sign of deltaThreads, then we decrease the permits
  347.                 // of the actionsSemaphore *before* we signal the selected reactor threads that they should shut down.
  348.                 deltaThreads -= deltaThreads;

  349.                 for (int i = deltaThreads - 1; i > 0; i--) {
  350.                     // Note that this could potentially block forever, starving on the unfair semaphore.
  351.                     actionsSemaphore.acquireUninterruptibly();
  352.                 }

  353.                 for (int i = deltaThreads - 1; i > 0; i--) {
  354.                     Reactor reactor = reactorThreads.remove(i);
  355.                     reactor.requestShutdown();
  356.                 }

  357.                 selector.wakeup();
  358.             }
  359.         }
  360.     }

  361.     public static final class SelectionKeyAttachment {
  362.         private final ChannelSelectedCallback channelSelectedCallback;
  363.         private final AtomicBoolean reactorThreadRacing = new AtomicBoolean();

  364.         private SelectionKeyAttachment(ChannelSelectedCallback channelSelectedCallback) {
  365.             this.channelSelectedCallback = channelSelectedCallback;
  366.         }

  367.         private void setRacing() {
  368.             // We use lazySet here since it is sufficient if the value does not become visible immediately.
  369.             reactorThreadRacing.lazySet(true);
  370.         }

  371.         public void resetReactorThreadRacing() {
  372.             reactorThreadRacing.set(false);
  373.         }

  374.         public boolean isReactorThreadRacing() {
  375.             return reactorThreadRacing.get();
  376.         }

  377.     }
  378. }