001/** 002 * 003 * Copyright 2018-2024 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.nio.channels.CancelledKeyException; 021import java.nio.channels.ClosedChannelException; 022import java.nio.channels.SelectableChannel; 023import java.nio.channels.SelectionKey; 024import java.nio.channels.Selector; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.Date; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Queue; 032import java.util.Set; 033import java.util.concurrent.ConcurrentLinkedQueue; 034import java.util.concurrent.DelayQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicBoolean; 038import java.util.concurrent.locks.Lock; 039import java.util.concurrent.locks.ReentrantLock; 040import java.util.logging.Level; 041import java.util.logging.Logger; 042 043/** 044 * The SmackReactor for non-blocking I/O. 045 * <p> 046 * Highlights include: 047 * <ul> 048 * <li>Multiple reactor threads</li> 049 * <li>Scheduled actions</li> 050 * </ul> 051 * 052 * <pre> 053 * 054 * ) ) ) 055 * ( ( ( 056 * ) ) ) 057 * (~~~~~~~~~) 058 * | Smack | 059 * |Reactor| 060 * I _._ 061 * I /' `\ 062 * I | | 063 * f | |~~~~~~~~~~~~~~| 064 * .' | | # # # # | 065 * '______|___|___________###| 066 * </pre> 067 */ 068public class SmackReactor { 069 070 private static final Logger LOGGER = Logger.getLogger(SmackReactor.class.getName()); 071 072 private static final int DEFAULT_REACTOR_THREAD_COUNT = 2; 073 074 private static final int PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE = 1024; 075 076 private static SmackReactor INSTANCE; 077 078 static synchronized SmackReactor getInstance() { 079 if (INSTANCE == null) { 080 INSTANCE = new SmackReactor("DefaultReactor"); 081 } 082 return INSTANCE; 083 } 084 085 private final Selector selector; 086 private final String reactorName; 087 088 private final List<Reactor> reactorThreads = Collections.synchronizedList(new ArrayList<>()); 089 090 private final DelayQueue<ScheduledAction> scheduledActions = new DelayQueue<>(); 091 092 private final Lock registrationLock = new ReentrantLock(); 093 094 /** 095 * The semaphore protecting the handling of the actions. Note that it is 096 * initialized with -1, which basically means that one thread will always do I/O using 097 * select(). 098 */ 099 private final Semaphore actionsSemaphore = new Semaphore(-1, false); 100 101 private final Queue<SelectionKey> pendingSelectionKeys = new ConcurrentLinkedQueue<>(); 102 103 private final Queue<SetInterestOps> pendingSetInterestOps = new ConcurrentLinkedQueue<>(); 104 105 SmackReactor(String reactorName) { 106 this.reactorName = reactorName; 107 108 try { 109 selector = Selector.open(); 110 } 111 catch (IOException e) { 112 throw new IllegalStateException(e); 113 } 114 115 setReactorThreadCount(DEFAULT_REACTOR_THREAD_COUNT); 116 } 117 118 public SelectionKey registerWithSelector(SelectableChannel channel, int ops, ChannelSelectedCallback callback) 119 throws ClosedChannelException { 120 SelectionKeyAttachment selectionKeyAttachment = new SelectionKeyAttachment(callback); 121 122 registrationLock.lock(); 123 try { 124 selector.wakeup(); 125 return channel.register(selector, ops, selectionKeyAttachment); 126 } finally { 127 registrationLock.unlock(); 128 } 129 } 130 131 public void setInterestOps(SelectionKey selectionKey, int interestOps) { 132 SetInterestOps setInterestOps = new SetInterestOps(selectionKey, interestOps); 133 pendingSetInterestOps.add(setInterestOps); 134 selector.wakeup(); 135 } 136 137 private static final class SetInterestOps { 138 private final SelectionKey selectionKey; 139 private final int interestOps; 140 141 private SetInterestOps(SelectionKey selectionKey, int interestOps) { 142 this.selectionKey = selectionKey; 143 this.interestOps = interestOps; 144 } 145 } 146 147 @SuppressWarnings("JavaUtilDate") 148 ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) { 149 long releaseTimeEpoch = System.currentTimeMillis() + unit.toMillis(delay); 150 Date releaseTimeDate = new Date(releaseTimeEpoch); 151 ScheduledAction scheduledAction = new ScheduledAction(runnable, releaseTimeDate, this, scheduledActionKind); 152 scheduledActions.add(scheduledAction); 153 selector.wakeup(); 154 return scheduledAction; 155 } 156 157 /** 158 * Cancels the scheduled action. 159 * 160 * @param scheduledAction the scheduled action to cancel. 161 * @return <code>true</code> if the scheduled action was still pending and got removed, <code>false</code> otherwise. 162 */ 163 boolean cancel(ScheduledAction scheduledAction) { 164 return scheduledActions.remove(scheduledAction); 165 } 166 167 private final class Reactor extends Thread { 168 169 private volatile long shutdownRequestTimestamp = -1; 170 171 @Override 172 public void run() { 173 try { 174 reactorLoop(); 175 } finally { 176 if (shutdownRequestTimestamp > 0) { 177 long shutDownDelay = System.currentTimeMillis() - shutdownRequestTimestamp; 178 LOGGER.info(this + " shut down after " + shutDownDelay + "ms"); 179 } else { 180 boolean contained = reactorThreads.remove(this); 181 assert contained; 182 } 183 } 184 } 185 186 private void reactorLoop() { 187 // Loop until reactor shutdown was requested. 188 while (shutdownRequestTimestamp < 0) { 189 handleScheduledActionsOrPerformSelect(); 190 191 handlePendingSelectionKeys(); 192 } 193 } 194 195 @SuppressWarnings("LockNotBeforeTry") 196 private void handleScheduledActionsOrPerformSelect() { 197 ScheduledAction dueScheduledAction = null; 198 199 boolean permitToHandleScheduledActions = actionsSemaphore.tryAcquire(); 200 if (permitToHandleScheduledActions) { 201 try { 202 dueScheduledAction = scheduledActions.poll(); 203 } finally { 204 actionsSemaphore.release(); 205 } 206 } 207 208 if (dueScheduledAction != null) { 209 dueScheduledAction.run(); 210 return; 211 } 212 213 int newSelectedKeysCount = 0; 214 List<SelectionKey> selectedKeys; 215 synchronized (selector) { 216 ScheduledAction nextScheduledAction = scheduledActions.peek(); 217 218 long selectWait; 219 if (nextScheduledAction == null) { 220 // There is no next scheduled action, wait indefinitely in select() or until another thread invokes 221 // selector.wakeup(). 222 selectWait = 0; 223 } else { 224 selectWait = nextScheduledAction.getTimeToDueMillis(); 225 if (selectWait <= 0) { 226 // A scheduled action was just released and became ready to execute. 227 return; 228 } 229 } 230 231 // Before we call select, we handle the pending the interest Ops. This will not block since no other 232 // thread is currently in select() at this time. 233 // Note: This was put deliberately before the registration lock. It may cause more synchronization but 234 // allows for more parallelism. 235 // Hopefully that assumption is right. 236 int myHandledPendingSetInterestOps = 0; 237 for (SetInterestOps setInterestOps; (setInterestOps = pendingSetInterestOps.poll()) != null;) { 238 setInterestOpsCancelledKeySafe(setInterestOps.selectionKey, setInterestOps.interestOps); 239 240 if (myHandledPendingSetInterestOps++ >= PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE) { 241 // This thread has handled enough "set pending interest ops" requests. Wakeup another one to 242 // handle the remaining (if any). 243 selector.wakeup(); 244 break; 245 } 246 } 247 248 // Ensure that a wakeup() in registerWithSelector() gives the corresponding 249 // register() in the same method the chance to actually register the channel. In 250 // other words: This construct ensures that there is never another select() 251 // between a corresponding wakeup() and register() calls. 252 // See also https://stackoverflow.com/a/1112809/194894 253 registrationLock.lock(); 254 registrationLock.unlock(); 255 256 try { 257 newSelectedKeysCount = selector.select(selectWait); 258 } catch (IOException e) { 259 LOGGER.log(Level.SEVERE, "IOException while using select()", e); 260 return; 261 } 262 263 if (newSelectedKeysCount == 0) { 264 return; 265 } 266 267 // Copy the selected-key set over to selectedKeys, remove the keys from the 268 // selected key set and loose interest of the key OPs for the time being. 269 // Note that we perform this operation in two steps in order to maximize the 270 // timespan setRacing() is set. 271 Set<SelectionKey> selectedKeySet = selector.selectedKeys(); 272 for (SelectionKey selectionKey : selectedKeySet) { 273 SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment(); 274 selectionKeyAttachment.setRacing(); 275 } 276 for (SelectionKey selectionKey : selectedKeySet) { 277 setInterestOpsCancelledKeySafe(selectionKey, 0); 278 } 279 280 selectedKeys = new ArrayList<>(selectedKeySet); 281 selectedKeySet.clear(); 282 } 283 284 int selectedKeysCount = selectedKeys.size(); 285 int currentReactorThreadCount = reactorThreads.size(); 286 int myKeyCount; 287 if (selectedKeysCount > currentReactorThreadCount) { 288 myKeyCount = selectedKeysCount / currentReactorThreadCount; 289 } else { 290 myKeyCount = selectedKeysCount; 291 } 292 293 final Level reactorSelectStatsLogLevel = Level.FINE; 294 if (LOGGER.isLoggable(reactorSelectStatsLogLevel)) { 295 LOGGER.log(reactorSelectStatsLogLevel, 296 "New selected key count: " + newSelectedKeysCount 297 + ". Total selected key count " + selectedKeysCount 298 + ". My key count: " + myKeyCount 299 + ". Current reactor thread count: " + currentReactorThreadCount); 300 } 301 302 Collection<SelectionKey> mySelectedKeys = new ArrayList<>(myKeyCount); 303 Iterator<SelectionKey> it = selectedKeys.iterator(); 304 for (int i = 0; i < myKeyCount; i++) { 305 SelectionKey selectionKey = it.next(); 306 mySelectedKeys.add(selectionKey); 307 } 308 while (it.hasNext()) { 309 // Drain to pendingSelectionKeys. 310 SelectionKey selectionKey = it.next(); 311 pendingSelectionKeys.add(selectionKey); 312 } 313 314 if (selectedKeysCount - myKeyCount > 0) { 315 // There where pending selection keys: Wakeup another reactor thread to handle them. 316 selector.wakeup(); 317 } 318 319 handleSelectedKeys(mySelectedKeys); 320 } 321 322 private void handlePendingSelectionKeys() { 323 final int pendingSelectionKeysSize = pendingSelectionKeys.size(); 324 if (pendingSelectionKeysSize == 0) { 325 return; 326 } 327 328 int currentReactorThreadCount = reactorThreads.size(); 329 int myKeyCount = pendingSelectionKeysSize / currentReactorThreadCount; 330 // The division could result in myKeyCount being zero, even though there are pending selection keys. 331 // Therefore, ensure that this thread tries to get at least one pending selection key by invoking poll(). 332 // Otherwise, it could happen that we end up in a busy loop, where myKeyCount is zero and this thread invokes 333 // selector.wakeup() below because pendingSelectionsKeys is not empty, but the woken up reactor thread wil 334 // end up with myKeyCount being zero again, restarting the busy-loop cycle. 335 if (myKeyCount == 0) myKeyCount = 1; 336 Collection<SelectionKey> selectedKeys = new ArrayList<>(myKeyCount); 337 for (int i = 0; i < myKeyCount; i++) { 338 SelectionKey selectionKey = pendingSelectionKeys.poll(); 339 if (selectionKey == null) { 340 // We lost a race and can abort here since the pendingSelectionKeys queue is empty. 341 break; 342 } 343 selectedKeys.add(selectionKey); 344 } 345 346 if (!pendingSelectionKeys.isEmpty()) { 347 // There are more pending selection keys, wakeup a thread blocked in select() to handle them. 348 selector.wakeup(); 349 } 350 351 handleSelectedKeys(selectedKeys); 352 } 353 354 private void setInterestOpsCancelledKeySafe(SelectionKey selectionKey, int interestOps) { 355 try { 356 selectionKey.interestOps(interestOps); 357 } 358 catch (CancelledKeyException e) { 359 final Level keyCancelledLogLevel = Level.FINER; 360 if (LOGGER.isLoggable(keyCancelledLogLevel)) { 361 LOGGER.log(keyCancelledLogLevel, "Key '" + selectionKey + "' has been cancelled", e); 362 } 363 } 364 } 365 366 void requestShutdown() { 367 shutdownRequestTimestamp = System.currentTimeMillis(); 368 } 369 } 370 371 private static void handleSelectedKeys(Collection<SelectionKey> selectedKeys) { 372 for (SelectionKey selectionKey : selectedKeys) { 373 SelectableChannel channel = selectionKey.channel(); 374 SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment(); 375 ChannelSelectedCallback channelSelectedCallback = selectionKeyAttachment.channelSelectedCallback; 376 channelSelectedCallback.onChannelSelected(channel, selectionKey); 377 } 378 } 379 380 public interface ChannelSelectedCallback { 381 void onChannelSelected(SelectableChannel channel, SelectionKey selectionKey); 382 } 383 384 public void setReactorThreadCount(int reactorThreadCount) { 385 if (reactorThreadCount < 2) { 386 throw new IllegalArgumentException("Must have at least two reactor threads, but you requested " + reactorThreadCount); 387 } 388 389 synchronized (reactorThreads) { 390 int deltaThreads = reactorThreadCount - reactorThreads.size(); 391 if (deltaThreads > 0) { 392 // Start new reactor thread. Note that we start the threads before we increase the permits of the 393 // actionsSemaphore. 394 for (int i = 0; i < deltaThreads; i++) { 395 Reactor reactor = new Reactor(); 396 reactor.setDaemon(true); 397 reactor.setName("Smack " + reactorName + " Thread #" + i); 398 reactorThreads.add(reactor); 399 reactor.start(); 400 } 401 402 actionsSemaphore.release(deltaThreads); 403 } else { 404 // Stop existing reactor threads. First we change the sign of deltaThreads, then we decrease the permits 405 // of the actionsSemaphore *before* we signal the selected reactor threads that they should shut down. 406 deltaThreads -= deltaThreads; 407 408 for (int i = deltaThreads - 1; i > 0; i--) { 409 // Note that this could potentially block forever, starving on the unfair semaphore. 410 actionsSemaphore.acquireUninterruptibly(); 411 } 412 413 for (int i = deltaThreads - 1; i > 0; i--) { 414 Reactor reactor = reactorThreads.remove(i); 415 reactor.requestShutdown(); 416 } 417 418 selector.wakeup(); 419 } 420 } 421 } 422 423 public static final class SelectionKeyAttachment { 424 private final ChannelSelectedCallback channelSelectedCallback; 425 private final AtomicBoolean reactorThreadRacing = new AtomicBoolean(); 426 427 private SelectionKeyAttachment(ChannelSelectedCallback channelSelectedCallback) { 428 this.channelSelectedCallback = channelSelectedCallback; 429 } 430 431 private void setRacing() { 432 // We use lazySet here since it is sufficient if the value does not become visible immediately. 433 reactorThreadRacing.lazySet(true); 434 } 435 436 public void resetReactorThreadRacing() { 437 reactorThreadRacing.set(false); 438 } 439 440 public boolean isReactorThreadRacing() { 441 return reactorThreadRacing.get(); 442 } 443 444 } 445}