001/**
002 *
003 * Copyright 2018-2024 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.io.IOException;
020import java.nio.channels.CancelledKeyException;
021import java.nio.channels.ClosedChannelException;
022import java.nio.channels.SelectableChannel;
023import java.nio.channels.SelectionKey;
024import java.nio.channels.Selector;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.Date;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Queue;
032import java.util.Set;
033import java.util.concurrent.ConcurrentLinkedQueue;
034import java.util.concurrent.DelayQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.locks.Lock;
039import java.util.concurrent.locks.ReentrantLock;
040import java.util.logging.Level;
041import java.util.logging.Logger;
042
043/**
044 * The SmackReactor for non-blocking I/O.
045 * <p>
046 * Highlights include:
047 * <ul>
048 * <li>Multiple reactor threads</li>
049 * <li>Scheduled actions</li>
050 * </ul>
051 *
052 * <pre>
053 *
054 *           ) ) )
055 *        ( ( (
056 *      ) ) )
057 *   (~~~~~~~~~)
058 *    | Smack |
059 *    |Reactor|
060 *    I      _._
061 *    I    /'   `\
062 *    I   |       |
063 *    f   |   |~~~~~~~~~~~~~~|
064 *  .'    |   | #   #   #  # |
065 * '______|___|___________###|
066 * </pre>
067 */
068public class SmackReactor {
069
070    private static final Logger LOGGER = Logger.getLogger(SmackReactor.class.getName());
071
072    private static final int DEFAULT_REACTOR_THREAD_COUNT = 2;
073
074    private static final int PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE = 1024;
075
076    private static SmackReactor INSTANCE;
077
078    static synchronized SmackReactor getInstance() {
079        if (INSTANCE == null) {
080            INSTANCE = new SmackReactor("DefaultReactor");
081        }
082        return INSTANCE;
083    }
084
085    private final Selector selector;
086    private final String reactorName;
087
088    private final List<Reactor> reactorThreads = Collections.synchronizedList(new ArrayList<>());
089
090    private final DelayQueue<ScheduledAction> scheduledActions = new DelayQueue<>();
091
092    private final Lock registrationLock = new ReentrantLock();
093
094    /**
095     * The semaphore protecting the handling of the actions. Note that it is
096     * initialized with -1, which basically means that one thread will always do I/O using
097     * select().
098     */
099    private final Semaphore actionsSemaphore = new Semaphore(-1, false);
100
101    private final Queue<SelectionKey> pendingSelectionKeys = new ConcurrentLinkedQueue<>();
102
103    private final Queue<SetInterestOps> pendingSetInterestOps = new ConcurrentLinkedQueue<>();
104
105    SmackReactor(String reactorName) {
106        this.reactorName = reactorName;
107
108        try {
109            selector = Selector.open();
110        }
111        catch (IOException e) {
112            throw new IllegalStateException(e);
113        }
114
115        setReactorThreadCount(DEFAULT_REACTOR_THREAD_COUNT);
116    }
117
118    public SelectionKey registerWithSelector(SelectableChannel channel, int ops, ChannelSelectedCallback callback)
119            throws ClosedChannelException {
120        SelectionKeyAttachment selectionKeyAttachment = new SelectionKeyAttachment(callback);
121
122        registrationLock.lock();
123        try {
124            selector.wakeup();
125            return channel.register(selector, ops, selectionKeyAttachment);
126        } finally {
127            registrationLock.unlock();
128        }
129    }
130
131    public void setInterestOps(SelectionKey selectionKey, int interestOps) {
132        SetInterestOps setInterestOps = new SetInterestOps(selectionKey, interestOps);
133        pendingSetInterestOps.add(setInterestOps);
134        selector.wakeup();
135    }
136
137    private static final class SetInterestOps {
138        private final SelectionKey selectionKey;
139        private final int interestOps;
140
141        private SetInterestOps(SelectionKey selectionKey, int interestOps) {
142            this.selectionKey = selectionKey;
143            this.interestOps = interestOps;
144        }
145    }
146
147    @SuppressWarnings("JavaUtilDate")
148    ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) {
149        long releaseTimeEpoch = System.currentTimeMillis() + unit.toMillis(delay);
150        Date releaseTimeDate = new Date(releaseTimeEpoch);
151        ScheduledAction scheduledAction = new ScheduledAction(runnable, releaseTimeDate, this, scheduledActionKind);
152        scheduledActions.add(scheduledAction);
153        selector.wakeup();
154        return scheduledAction;
155    }
156
157    /**
158     * Cancels the scheduled action.
159     *
160     * @param scheduledAction the scheduled action to cancel.
161     * @return <code>true</code> if the scheduled action was still pending and got removed, <code>false</code> otherwise.
162     */
163    boolean cancel(ScheduledAction scheduledAction) {
164        return scheduledActions.remove(scheduledAction);
165    }
166
167    private final class Reactor extends Thread {
168
169        private volatile long shutdownRequestTimestamp = -1;
170
171        @Override
172        public void run() {
173            try {
174                reactorLoop();
175            } finally {
176                if (shutdownRequestTimestamp > 0) {
177                    long shutDownDelay = System.currentTimeMillis() - shutdownRequestTimestamp;
178                    LOGGER.info(this + " shut down after " + shutDownDelay + "ms");
179                } else {
180                    boolean contained = reactorThreads.remove(this);
181                    assert contained;
182                }
183            }
184        }
185
186        private void reactorLoop() {
187            // Loop until reactor shutdown was requested.
188            while (shutdownRequestTimestamp < 0) {
189                handleScheduledActionsOrPerformSelect();
190
191                handlePendingSelectionKeys();
192            }
193        }
194
195        @SuppressWarnings("LockNotBeforeTry")
196        private void handleScheduledActionsOrPerformSelect() {
197            ScheduledAction dueScheduledAction = null;
198
199            boolean permitToHandleScheduledActions = actionsSemaphore.tryAcquire();
200            if (permitToHandleScheduledActions) {
201                try {
202                    dueScheduledAction = scheduledActions.poll();
203                } finally {
204                    actionsSemaphore.release();
205                }
206            }
207
208            if (dueScheduledAction != null) {
209                dueScheduledAction.run();
210                return;
211            }
212
213            int newSelectedKeysCount = 0;
214            List<SelectionKey> selectedKeys;
215            synchronized (selector) {
216                ScheduledAction nextScheduledAction = scheduledActions.peek();
217
218                long selectWait;
219                if (nextScheduledAction == null) {
220                    // There is no next scheduled action, wait indefinitely in select() or until another thread invokes
221                    // selector.wakeup().
222                    selectWait = 0;
223                } else {
224                    selectWait = nextScheduledAction.getTimeToDueMillis();
225                    if (selectWait <= 0) {
226                        // A scheduled action was just released and became ready to execute.
227                        return;
228                    }
229                }
230
231                // Before we call select, we handle the pending the interest Ops. This will not block since no other
232                // thread is currently in select() at this time.
233                // Note: This was put deliberately before the registration lock. It may cause more synchronization but
234                // allows for more parallelism.
235                // Hopefully that assumption is right.
236                int myHandledPendingSetInterestOps = 0;
237                for (SetInterestOps setInterestOps; (setInterestOps = pendingSetInterestOps.poll()) != null;) {
238                    setInterestOpsCancelledKeySafe(setInterestOps.selectionKey, setInterestOps.interestOps);
239
240                    if (myHandledPendingSetInterestOps++ >= PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE) {
241                        // This thread has handled enough "set pending interest ops" requests. Wakeup another one to
242                        // handle the remaining (if any).
243                        selector.wakeup();
244                        break;
245                    }
246                }
247
248                // Ensure that a wakeup() in registerWithSelector() gives the corresponding
249                // register() in the same method the chance to actually register the channel. In
250                // other words: This construct ensures that there is never another select()
251                // between a corresponding wakeup() and register() calls.
252                // See also https://stackoverflow.com/a/1112809/194894
253                registrationLock.lock();
254                registrationLock.unlock();
255
256                try {
257                    newSelectedKeysCount = selector.select(selectWait);
258                } catch (IOException e) {
259                    LOGGER.log(Level.SEVERE, "IOException while using select()", e);
260                    return;
261                }
262
263                if (newSelectedKeysCount == 0) {
264                    return;
265                }
266
267                // Copy the selected-key set over to selectedKeys, remove the keys from the
268                // selected key set and loose interest of the key OPs for the time being.
269                // Note that we perform this operation in two steps in order to maximize the
270                // timespan setRacing() is set.
271                Set<SelectionKey> selectedKeySet = selector.selectedKeys();
272                for (SelectionKey selectionKey : selectedKeySet) {
273                    SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
274                    selectionKeyAttachment.setRacing();
275                }
276                for (SelectionKey selectionKey : selectedKeySet) {
277                    setInterestOpsCancelledKeySafe(selectionKey, 0);
278                }
279
280                selectedKeys = new ArrayList<>(selectedKeySet);
281                selectedKeySet.clear();
282            }
283
284            int selectedKeysCount = selectedKeys.size();
285            int currentReactorThreadCount = reactorThreads.size();
286            int myKeyCount;
287            if (selectedKeysCount > currentReactorThreadCount) {
288                myKeyCount = selectedKeysCount / currentReactorThreadCount;
289            } else {
290                myKeyCount = selectedKeysCount;
291            }
292
293            final Level reactorSelectStatsLogLevel = Level.FINE;
294            if (LOGGER.isLoggable(reactorSelectStatsLogLevel)) {
295                LOGGER.log(reactorSelectStatsLogLevel,
296                                "New selected key count: " + newSelectedKeysCount
297                                + ". Total selected key count " + selectedKeysCount
298                                + ". My key count: " + myKeyCount
299                                + ". Current reactor thread count: " + currentReactorThreadCount);
300            }
301
302            Collection<SelectionKey> mySelectedKeys = new ArrayList<>(myKeyCount);
303            Iterator<SelectionKey> it = selectedKeys.iterator();
304            for (int i = 0; i < myKeyCount; i++) {
305                SelectionKey selectionKey = it.next();
306                mySelectedKeys.add(selectionKey);
307            }
308            while (it.hasNext()) {
309                // Drain to pendingSelectionKeys.
310                SelectionKey selectionKey = it.next();
311                pendingSelectionKeys.add(selectionKey);
312            }
313
314            if (selectedKeysCount - myKeyCount > 0) {
315                // There where pending selection keys: Wakeup another reactor thread to handle them.
316                selector.wakeup();
317            }
318
319            handleSelectedKeys(mySelectedKeys);
320        }
321
322        private void handlePendingSelectionKeys() {
323            final int pendingSelectionKeysSize = pendingSelectionKeys.size();
324            if (pendingSelectionKeysSize == 0) {
325                return;
326            }
327
328            int currentReactorThreadCount = reactorThreads.size();
329            int myKeyCount = pendingSelectionKeysSize / currentReactorThreadCount;
330            // The division could result in myKeyCount being zero, even though there are pending selection keys.
331            // Therefore, ensure that this thread tries to get at least one pending selection key by invoking poll().
332            // Otherwise, it could happen that we end up in a busy loop, where myKeyCount is zero and this thread invokes
333            // selector.wakeup() below because pendingSelectionsKeys is not empty, but the woken up reactor thread wil
334            // end up with myKeyCount being zero again, restarting the busy-loop cycle.
335            if (myKeyCount == 0) myKeyCount = 1;
336            Collection<SelectionKey> selectedKeys = new ArrayList<>(myKeyCount);
337            for (int i = 0; i < myKeyCount; i++) {
338                SelectionKey selectionKey = pendingSelectionKeys.poll();
339                if (selectionKey == null) {
340                    // We lost a race and can abort here since the pendingSelectionKeys queue is empty.
341                    break;
342                }
343                selectedKeys.add(selectionKey);
344            }
345
346            if (!pendingSelectionKeys.isEmpty()) {
347                // There are more pending selection keys, wakeup a thread blocked in select() to handle them.
348                selector.wakeup();
349            }
350
351            handleSelectedKeys(selectedKeys);
352        }
353
354        private void setInterestOpsCancelledKeySafe(SelectionKey selectionKey, int interestOps) {
355            try {
356                selectionKey.interestOps(interestOps);
357            }
358            catch (CancelledKeyException e) {
359                final Level keyCancelledLogLevel = Level.FINER;
360                if (LOGGER.isLoggable(keyCancelledLogLevel)) {
361                    LOGGER.log(keyCancelledLogLevel, "Key '" + selectionKey + "' has been cancelled", e);
362                }
363            }
364        }
365
366        void requestShutdown() {
367            shutdownRequestTimestamp = System.currentTimeMillis();
368        }
369    }
370
371    private static void handleSelectedKeys(Collection<SelectionKey> selectedKeys) {
372        for (SelectionKey selectionKey : selectedKeys) {
373            SelectableChannel channel = selectionKey.channel();
374            SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
375            ChannelSelectedCallback channelSelectedCallback = selectionKeyAttachment.channelSelectedCallback;
376            channelSelectedCallback.onChannelSelected(channel, selectionKey);
377        }
378    }
379
380    public interface ChannelSelectedCallback {
381        void onChannelSelected(SelectableChannel channel, SelectionKey selectionKey);
382    }
383
384    public void setReactorThreadCount(int reactorThreadCount) {
385        if (reactorThreadCount < 2) {
386            throw new IllegalArgumentException("Must have at least two reactor threads, but you requested " + reactorThreadCount);
387        }
388
389        synchronized (reactorThreads) {
390            int deltaThreads = reactorThreadCount - reactorThreads.size();
391            if (deltaThreads > 0) {
392                // Start new reactor thread. Note that we start the threads before we increase the permits of the
393                // actionsSemaphore.
394                for (int i = 0; i < deltaThreads; i++) {
395                    Reactor reactor = new Reactor();
396                    reactor.setDaemon(true);
397                    reactor.setName("Smack " + reactorName + " Thread #" + i);
398                    reactorThreads.add(reactor);
399                    reactor.start();
400                }
401
402                actionsSemaphore.release(deltaThreads);
403            } else {
404                // Stop existing reactor threads. First we change the sign of deltaThreads, then we decrease the permits
405                // of the actionsSemaphore *before* we signal the selected reactor threads that they should shut down.
406                deltaThreads -= deltaThreads;
407
408                for (int i = deltaThreads - 1; i > 0; i--) {
409                    // Note that this could potentially block forever, starving on the unfair semaphore.
410                    actionsSemaphore.acquireUninterruptibly();
411                }
412
413                for (int i = deltaThreads - 1; i > 0; i--) {
414                    Reactor reactor = reactorThreads.remove(i);
415                    reactor.requestShutdown();
416                }
417
418                selector.wakeup();
419            }
420        }
421    }
422
423    public static final class SelectionKeyAttachment {
424        private final ChannelSelectedCallback channelSelectedCallback;
425        private final AtomicBoolean reactorThreadRacing = new AtomicBoolean();
426
427        private SelectionKeyAttachment(ChannelSelectedCallback channelSelectedCallback) {
428            this.channelSelectedCallback = channelSelectedCallback;
429        }
430
431        private void setRacing() {
432            // We use lazySet here since it is sufficient if the value does not become visible immediately.
433            reactorThreadRacing.lazySet(true);
434        }
435
436        public void resetReactorThreadRacing() {
437            reactorThreadRacing.set(false);
438        }
439
440        public boolean isReactorThreadRacing() {
441            return reactorThreadRacing.get();
442        }
443
444    }
445}