001/** 002 * 003 * Copyright 2019-2024 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.IOException; 020import java.net.InetSocketAddress; 021import java.nio.Buffer; 022import java.nio.ByteBuffer; 023import java.nio.channels.ClosedChannelException; 024import java.nio.channels.SelectableChannel; 025import java.nio.channels.SelectionKey; 026import java.nio.channels.SocketChannel; 027import java.security.cert.CertificateException; 028import java.util.ArrayList; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.IdentityHashMap; 032import java.util.Iterator; 033import java.util.List; 034import java.util.ListIterator; 035import java.util.Map; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.LongAdder; 038import java.util.concurrent.locks.ReentrantLock; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042import javax.net.ssl.SSLEngine; 043import javax.net.ssl.SSLEngineResult; 044import javax.net.ssl.SSLException; 045import javax.net.ssl.SSLSession; 046 047import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 048import org.jivesoftware.smack.SmackException; 049import org.jivesoftware.smack.SmackException.NoResponseException; 050import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; 051import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 052import org.jivesoftware.smack.SmackException.SmackCertificateException; 053import org.jivesoftware.smack.SmackException.SmackWrappedException; 054import org.jivesoftware.smack.SmackFuture; 055import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; 056import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment; 057import org.jivesoftware.smack.XMPPException; 058import org.jivesoftware.smack.XmppInputOutputFilter; 059import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor; 060import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor; 061import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnectionModule; 062import org.jivesoftware.smack.c2s.StreamOpenAndCloseFactory; 063import org.jivesoftware.smack.c2s.XmppClientToServerTransport; 064import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal; 065import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext; 066import org.jivesoftware.smack.debugger.SmackDebugger; 067import org.jivesoftware.smack.fsm.State; 068import org.jivesoftware.smack.fsm.StateDescriptor; 069import org.jivesoftware.smack.fsm.StateTransitionResult; 070import org.jivesoftware.smack.internal.SmackTlsContext; 071import org.jivesoftware.smack.packet.Stanza; 072import org.jivesoftware.smack.packet.StartTls; 073import org.jivesoftware.smack.packet.StreamClose; 074import org.jivesoftware.smack.packet.StreamOpen; 075import org.jivesoftware.smack.packet.TlsFailure; 076import org.jivesoftware.smack.packet.TlsProceed; 077import org.jivesoftware.smack.packet.TopLevelStreamElement; 078import org.jivesoftware.smack.packet.XmlEnvironment; 079import org.jivesoftware.smack.tcp.XmppTcpTransportModule.XmppTcpNioTransport.DiscoveredTcpEndpoints; 080import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints; 081import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints.Result; 082import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint; 083import org.jivesoftware.smack.util.CollectionUtil; 084import org.jivesoftware.smack.util.StringUtils; 085import org.jivesoftware.smack.util.UTF8; 086import org.jivesoftware.smack.util.XmlStringBuilder; 087import org.jivesoftware.smack.util.rce.RemoteConnectionEndpointLookupFailure; 088 089import org.jxmpp.jid.DomainBareJid; 090import org.jxmpp.jid.Jid; 091import org.jxmpp.jid.util.JidUtil; 092import org.jxmpp.xml.splitter.Utf8ByteXmppXmlSplitter; 093import org.jxmpp.xml.splitter.XmlPrettyPrinter; 094import org.jxmpp.xml.splitter.XmlPrinter; 095import org.jxmpp.xml.splitter.XmppElementCallback; 096import org.jxmpp.xml.splitter.XmppXmlSplitter; 097 098public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionModule<XmppTcpTransportModuleDescriptor> { 099 100 private static final Logger LOGGER = Logger.getLogger(XmppTcpTransportModule.class.getName()); 101 102 private static final int CALLBACK_MAX_BYTES_READ = 10 * 1024 * 1024; 103 private static final int CALLBACK_MAX_BYTES_WRITEN = CALLBACK_MAX_BYTES_READ; 104 105 private static final int MAX_ELEMENT_SIZE = 64 * 1024; 106 107 private final XmppTcpNioTransport tcpNioTransport; 108 109 private SelectionKey selectionKey; 110 private SelectionKeyAttachment selectionKeyAttachment; 111 private SocketChannel socketChannel; 112 private InetSocketAddress remoteAddress; 113 114 private TlsState tlsState; 115 116 private Iterator<CharSequence> outgoingCharSequenceIterator; 117 118 private final List<TopLevelStreamElement> currentlyOutgoingElements = new ArrayList<>(); 119 private final IdentityHashMap<ByteBuffer, List<TopLevelStreamElement>> bufferToElementMap = new IdentityHashMap<>(); 120 121 private ByteBuffer outgoingBuffer; 122 private ByteBuffer filteredOutgoingBuffer; 123 private final List<ByteBuffer> networkOutgoingBuffers = new ArrayList<>(); 124 private long networkOutgoingBuffersBytes; 125 126 // TODO: Make the size of the incomingBuffer configurable. 127 private final ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(2 * 4096); 128 129 private final ReentrantLock channelSelectedCallbackLock = new ReentrantLock(); 130 131 private long totalBytesRead; 132 private long totalBytesWritten; 133 private long totalBytesReadAfterFilter; 134 private long totalBytesWrittenBeforeFilter; 135 private long handledChannelSelectedCallbacks; 136 private long callbackPreemtBecauseBytesWritten; 137 private long callbackPreemtBecauseBytesRead; 138 private int sslEngineDelegatedTasks; 139 private int maxPendingSslEngineDelegatedTasks; 140 141 private final LongAdder setWriteInterestAfterChannelSelectedCallback = new LongAdder(); 142 private final LongAdder reactorThreadAlreadyRacing = new LongAdder(); 143 private final LongAdder afterOutgoingElementsQueueModifiedSetInterestOps = new LongAdder(); 144 private final LongAdder rejectedChannelSelectedCallbacks = new LongAdder(); 145 146 private Jid lastDestinationAddress; 147 148 private boolean pendingInputFilterData; 149 private boolean pendingOutputFilterData; 150 151 private boolean pendingWriteInterestAfterRead; 152 153 /** 154 * Note that this field is effective final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final. 155 */ 156 private Utf8ByteXmppXmlSplitter splitter; 157 158 /** 159 * Note that this field is effective final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final. 160 */ 161 private XmppXmlSplitter outputDebugSplitter; 162 163 private static final Level STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL = Level.FINER; 164 165 XmppTcpTransportModule(XmppTcpTransportModuleDescriptor moduleDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) { 166 super(moduleDescriptor, connectionInternal); 167 168 tcpNioTransport = new XmppTcpNioTransport(connectionInternal); 169 170 XmlPrinter incomingDebugPrettyPrinter = null; 171 final SmackDebugger debugger = connectionInternal.smackDebugger; 172 if (debugger != null) { 173 // Incoming stream debugging. 174 incomingDebugPrettyPrinter = XmlPrettyPrinter.builder() 175 .setPrettyWriter(sb -> debugger.incomingStreamSink(sb)) 176 .build(); 177 178 // Outgoing stream debugging. 179 XmlPrinter outgoingDebugPrettyPrinter = XmlPrettyPrinter.builder() 180 .setPrettyWriter(sb -> debugger.outgoingStreamSink(sb)) 181 .build(); 182 outputDebugSplitter = new XmppXmlSplitter(outgoingDebugPrettyPrinter); 183 } 184 185 XmppXmlSplitter xmppXmlSplitter = new XmppXmlSplitter(MAX_ELEMENT_SIZE, xmppElementCallback, 186 incomingDebugPrettyPrinter); 187 splitter = new Utf8ByteXmppXmlSplitter(xmppXmlSplitter); 188 } 189 190 private final XmppElementCallback xmppElementCallback = new XmppElementCallback() { 191 private String streamOpen; 192 private String streamClose; 193 194 @Override 195 public void onCompleteElement(String completeElement) { 196 assert streamOpen != null; 197 assert streamClose != null; 198 199 connectionInternal.withSmackDebugger(debugger -> debugger.onIncomingElementCompleted()); 200 201 String wrappedCompleteElement = streamOpen + completeElement + streamClose; 202 connectionInternal.parseAndProcessElement(wrappedCompleteElement); 203 } 204 205 206 @Override 207 public void streamOpened(String prefix, Map<String, String> attributes) { 208 if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) { 209 LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL, 210 "Stream of " + this + " opened. prefix=" + prefix + " attributes=" + attributes); 211 } 212 213 final String prefixXmlns = "xmlns:" + prefix; 214 // TODO: Use the return value of onStreamOpen(), which now returns the 215 // corresponding stream close tag, instead of creating it here. 216 final StringBuilder streamClose = new StringBuilder(32); 217 final StringBuilder streamOpen = new StringBuilder(256); 218 219 streamOpen.append('<'); 220 streamClose.append("</"); 221 if (StringUtils.isNotEmpty(prefix)) { 222 streamOpen.append(prefix).append(':'); 223 streamClose.append(prefix).append(':'); 224 } 225 streamOpen.append("stream"); 226 streamClose.append("stream>"); 227 for (Map.Entry<String, String> entry : attributes.entrySet()) { 228 String attributeName = entry.getKey(); 229 String attributeValue = entry.getValue(); 230 switch (attributeName) { 231 case "to": 232 case "from": 233 case "id": 234 case "version": 235 break; 236 case "xml:lang": 237 streamOpen.append(" xml:lang='").append(attributeValue).append('\''); 238 break; 239 case "xmlns": 240 streamOpen.append(" xmlns='").append(attributeValue).append('\''); 241 break; 242 default: 243 if (attributeName.equals(prefixXmlns)) { 244 streamOpen.append(' ').append(prefixXmlns).append("='").append(attributeValue).append('\''); 245 break; 246 } 247 LOGGER.info("Unknown <stream/> attribute: " + attributeName); 248 break; 249 } 250 } 251 streamOpen.append('>'); 252 253 this.streamOpen = streamOpen.toString(); 254 this.streamClose = streamClose.toString(); 255 256 connectionInternal.onStreamOpen(this.streamOpen); 257 } 258 259 @Override 260 public void streamClosed() { 261 if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) { 262 LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL, "Stream of " + this + " closed"); 263 } 264 265 connectionInternal.onStreamClosed(); 266 } 267 }; 268 269 private void onChannelSelected(SelectableChannel selectedChannel, SelectionKey selectedSelectionKey) { 270 assert selectionKey == null || selectionKey == selectedSelectionKey; 271 SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel; 272 // We are *always* interested in OP_READ. 273 int newInterestedOps = SelectionKey.OP_READ; 274 boolean newPendingOutputFilterData = false; 275 276 if (!channelSelectedCallbackLock.tryLock()) { 277 rejectedChannelSelectedCallbacks.increment(); 278 return; 279 } 280 281 handledChannelSelectedCallbacks++; 282 283 long callbackBytesRead = 0; 284 long callbackBytesWritten = 0; 285 286 try { 287 boolean destinationAddressChanged = false; 288 boolean isLastPartOfElement = false; 289 TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null; 290 StringBuilder outgoingStreamForDebugger = null; 291 292 writeLoop: while (true) { 293 final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty(); 294 295 if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) { 296 if (filteredOutgoingBuffer != null) { 297 networkOutgoingBuffers.add(filteredOutgoingBuffer); 298 networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining(); 299 300 filteredOutgoingBuffer = null; 301 if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) { 302 continue; 303 } 304 } 305 306 ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]); 307 long bytesWritten; 308 try { 309 bytesWritten = selectedSocketChannel.write(output); 310 } catch (IOException e) { 311 // We have seen here so far 312 // - IOException "Broken pipe" 313 handleReadWriteIoException(e); 314 break; 315 } 316 317 if (bytesWritten == 0) { 318 newInterestedOps |= SelectionKey.OP_WRITE; 319 break; 320 } 321 322 callbackBytesWritten += bytesWritten; 323 324 networkOutgoingBuffersBytes -= bytesWritten; 325 326 List<? extends Buffer> prunedBuffers = pruneBufferList(networkOutgoingBuffers); 327 328 for (Buffer prunedBuffer : prunedBuffers) { 329 List<TopLevelStreamElement> sendElements = bufferToElementMap.remove(prunedBuffer); 330 if (sendElements == null) { 331 continue; 332 } 333 for (TopLevelStreamElement elementJustSend : sendElements) { 334 connectionInternal.fireFirstLevelElementSendListeners(elementJustSend); 335 } 336 } 337 338 // Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have 339 // written a certain amount. 340 if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) { 341 newInterestedOps |= SelectionKey.OP_WRITE; 342 callbackPreemtBecauseBytesWritten++; 343 break; 344 } 345 } else if (outgoingBuffer != null || pendingOutputFilterData) { 346 pendingOutputFilterData = false; 347 348 if (outgoingBuffer != null) { 349 totalBytesWrittenBeforeFilter += outgoingBuffer.remaining(); 350 if (isLastPartOfElement) { 351 assert currentlyOutgonigTopLevelStreamElement != null; 352 currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement); 353 } 354 } 355 356 ByteBuffer outputFilterInputData = outgoingBuffer; 357 // We can now null the outgoingBuffer since the filter step will take care of it from now on. 358 outgoingBuffer = null; 359 360 for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) { 361 XmppInputOutputFilter inputOutputFilter = it.next(); 362 XmppInputOutputFilter.OutputResult outputResult; 363 try { 364 outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement, 365 destinationAddressChanged, moreDataAvailable); 366 } catch (IOException e) { 367 connectionInternal.notifyConnectionError(e); 368 break writeLoop; 369 } 370 newPendingOutputFilterData |= outputResult.pendingFilterData; 371 outputFilterInputData = outputResult.filteredOutputData; 372 if (outputFilterInputData != null) { 373 ((java.nio.Buffer) outputFilterInputData).flip(); 374 } 375 } 376 377 // It is ok if outputFilterInputData is 'null' here, this is expected behavior. 378 if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) { 379 filteredOutgoingBuffer = outputFilterInputData; 380 } else { 381 filteredOutgoingBuffer = null; 382 } 383 384 // If the filters did eventually not produce any output data but if there is 385 // pending output data then we have a pending write request after read. 386 if (filteredOutgoingBuffer == null && newPendingOutputFilterData) { 387 pendingWriteInterestAfterRead = true; 388 } 389 390 if (filteredOutgoingBuffer != null && isLastPartOfElement) { 391 bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements)); 392 currentlyOutgoingElements.clear(); 393 } 394 395 // Reset that the destination address has changed. 396 if (destinationAddressChanged) { 397 destinationAddressChanged = false; 398 } 399 } else if (outgoingCharSequenceIterator != null) { 400 CharSequence nextCharSequence = outgoingCharSequenceIterator.next(); 401 outgoingBuffer = UTF8.encode(nextCharSequence); 402 if (!outgoingCharSequenceIterator.hasNext()) { 403 outgoingCharSequenceIterator = null; 404 isLastPartOfElement = true; 405 } else { 406 isLastPartOfElement = false; 407 } 408 409 final SmackDebugger debugger = connectionInternal.smackDebugger; 410 if (debugger != null) { 411 if (outgoingStreamForDebugger == null) { 412 outgoingStreamForDebugger = new StringBuilder(); 413 } 414 outgoingStreamForDebugger.append(nextCharSequence); 415 416 if (isLastPartOfElement) { 417 try { 418 outputDebugSplitter.append(outgoingStreamForDebugger); 419 } catch (IOException e) { 420 throw new AssertionError(e); 421 } 422 debugger.onOutgoingElementCompleted(); 423 outgoingStreamForDebugger = null; 424 } 425 } 426 } else if (!connectionInternal.outgoingElementsQueue.isEmpty()) { 427 currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll(); 428 if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) { 429 Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement; 430 Jid currentDestinationAddress = currentlyOutgoingStanza.getTo(); 431 destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress); 432 lastDestinationAddress = currentDestinationAddress; 433 } 434 CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE); 435 if (nextCharSequence instanceof XmlStringBuilder) { 436 XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence; 437 XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment(); 438 outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator(); 439 } else { 440 outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator(); 441 } 442 assert outgoingCharSequenceIterator != null; 443 } else { 444 // There is nothing more to write. 445 break; 446 } 447 } 448 449 pendingOutputFilterData = newPendingOutputFilterData; 450 if (!pendingWriteInterestAfterRead && pendingOutputFilterData) { 451 newInterestedOps |= SelectionKey.OP_WRITE; 452 } 453 454 readLoop: while (true) { 455 // Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have 456 // read a certain amount. 457 if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) { 458 callbackPreemtBecauseBytesRead++; 459 break; 460 } 461 462 int bytesRead; 463 ((java.nio.Buffer) incomingBuffer).clear(); 464 try { 465 bytesRead = selectedSocketChannel.read(incomingBuffer); 466 } catch (IOException e) { 467 handleReadWriteIoException(e); 468 return; 469 } 470 471 if (bytesRead < 0) { 472 LOGGER.finer("NIO read() returned " + bytesRead 473 + " for " + this + ". This probably means that the TCP connection was terminated."); 474 // According to the socket channel javadoc section about "asynchronous reads" a socket channel's 475 // read() may return -1 if the input side of a socket is shut down. 476 // Note that we do not call notifyConnectionError() here because the connection may be 477 // cleanly shutdown which would also cause read() to return '-1. I assume that this socket 478 // will be selected again, on which read() would throw an IOException, which will be caught 479 // and invoke notifyConnectionError() (see a few lines above). 480 /* 481 IOException exception = new IOException("NIO read() returned " + bytesRead); 482 notifyConnectionError(exception); 483 */ 484 return; 485 } 486 487 if (!pendingInputFilterData) { 488 if (bytesRead == 0) { 489 // Nothing more to read. 490 break; 491 } 492 } else { 493 pendingInputFilterData = false; 494 } 495 496 if (pendingWriteInterestAfterRead) { 497 // We have successfully read something and someone announced a write interest after a read. It is 498 // now possible that a filter is now also able to write additional data (for example SSLEngine). 499 pendingWriteInterestAfterRead = false; 500 newInterestedOps |= SelectionKey.OP_WRITE; 501 } 502 503 callbackBytesRead += bytesRead; 504 505 ByteBuffer filteredIncomingBuffer = incomingBuffer; 506 for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) { 507 ((java.nio.Buffer) filteredIncomingBuffer).flip(); 508 509 ByteBuffer newFilteredIncomingBuffer; 510 try { 511 newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer); 512 } catch (IOException e) { 513 connectionInternal.notifyConnectionError(e); 514 return; 515 } 516 if (newFilteredIncomingBuffer == null) { 517 break readLoop; 518 } 519 filteredIncomingBuffer = newFilteredIncomingBuffer; 520 } 521 522 ((java.nio.Buffer) filteredIncomingBuffer).flip(); 523 final int bytesReadAfterFilter = filteredIncomingBuffer.remaining(); 524 525 totalBytesReadAfterFilter += bytesReadAfterFilter; 526 527 try { 528 splitter.write(filteredIncomingBuffer); 529 } catch (IOException e) { 530 connectionInternal.notifyConnectionError(e); 531 return; 532 } 533 } 534 } finally { 535 totalBytesWritten += callbackBytesWritten; 536 totalBytesRead += callbackBytesRead; 537 538 channelSelectedCallbackLock.unlock(); 539 } 540 541 // Indicate that there is no reactor thread racing towards handling this selection key. 542 final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment; 543 if (selectionKeyAttachment != null) { 544 selectionKeyAttachment.resetReactorThreadRacing(); 545 } 546 547 // Check the queue again to prevent lost wakeups caused by elements inserted before we 548 // called resetReactorThreadRacing() a few lines above. 549 if (!connectionInternal.outgoingElementsQueue.isEmpty()) { 550 setWriteInterestAfterChannelSelectedCallback.increment(); 551 newInterestedOps |= SelectionKey.OP_WRITE; 552 } 553 554 connectionInternal.setInterestOps(selectionKey, newInterestedOps); 555 } 556 557 private void handleReadWriteIoException(IOException e) { 558 if (e instanceof ClosedChannelException && !tcpNioTransport.isConnected()) { 559 // The connection is already closed. 560 return; 561 } 562 563 connectionInternal.notifyConnectionError(e); 564 } 565 566 /** 567 * This is the interface between the "lookup remote connection endpoints" state and the "establish TCP connection" 568 * state. The field is indirectly populated by {@link XmppTcpNioTransport#lookupConnectionEndpoints()} and consumed 569 * by {@link ConnectionAttemptState}. 570 */ 571 DiscoveredTcpEndpoints discoveredTcpEndpoints; 572 573 final class XmppTcpNioTransport extends XmppClientToServerTransport { 574 575 XmppTcpNioTransport(ModularXmppClientToServerConnectionInternal connectionInternal) { 576 super(connectionInternal); 577 } 578 579 @Override 580 public StreamOpenAndCloseFactory getStreamOpenAndCloseFactory() { 581 return new StreamOpenAndCloseFactory() { 582 @Override 583 public StreamOpen createStreamOpen(DomainBareJid to, CharSequence from, String id, String lang) { 584 String xmlLang = connectionInternal.connection.getConfiguration().getXmlLang(); 585 StreamOpen streamOpen = new StreamOpen(to, from, id, xmlLang, StreamOpen.StreamContentNamespace.client); 586 return streamOpen; 587 } 588 @Override 589 public StreamClose createStreamClose() { 590 return StreamClose.INSTANCE; 591 } 592 }; 593 } 594 595 @Override 596 protected void resetDiscoveredConnectionEndpoints() { 597 discoveredTcpEndpoints = null; 598 } 599 600 @Override 601 public boolean hasUseableConnectionEndpoints() { 602 return discoveredTcpEndpoints != null; 603 } 604 605 @Override 606 protected List<SmackFuture<LookupConnectionEndpointsResult, Exception>> lookupConnectionEndpoints() { 607 // Assert that there are no stale discovered endpoints prior performing the lookup. 608 assert discoveredTcpEndpoints == null; 609 610 List<SmackFuture<LookupConnectionEndpointsResult, Exception>> futures = new ArrayList<>(2); 611 612 InternalSmackFuture<LookupConnectionEndpointsResult, Exception> tcpEndpointsLookupFuture = new InternalSmackFuture<>(); 613 connectionInternal.asyncGo(() -> { 614 Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup( 615 connectionInternal.connection.getConfiguration()); 616 617 LookupConnectionEndpointsResult endpointsResult; 618 if (result.discoveredRemoteConnectionEndpoints.isEmpty()) { 619 endpointsResult = new TcpEndpointDiscoveryFailed(result); 620 } else { 621 endpointsResult = new DiscoveredTcpEndpoints(result); 622 } 623 tcpEndpointsLookupFuture.setResult(endpointsResult); 624 }); 625 futures.add(tcpEndpointsLookupFuture); 626 627 if (moduleDescriptor.isDirectTlsEnabled()) { 628 // TODO: Implement this. 629 throw new IllegalArgumentException("DirectTLS is not implemented yet"); 630 } 631 632 return futures; 633 } 634 635 @Override 636 protected void loadConnectionEndpoints(LookupConnectionEndpointsSuccess lookupConnectionEndpointsSuccess) { 637 // The API contract stats that we will be given the instance we handed out with lookupConnectionEndpoints, 638 // which must be of type DiscoveredTcpEndpoints here. Hence, if we can not cast it, then there is an internal 639 // Smack error. 640 discoveredTcpEndpoints = (DiscoveredTcpEndpoints) lookupConnectionEndpointsSuccess; 641 } 642 643 @Override 644 protected void afterFiltersClosed() { 645 pendingInputFilterData = pendingOutputFilterData = true; 646 afterOutgoingElementsQueueModified(); 647 } 648 649 @Override 650 protected void disconnect() { 651 XmppTcpTransportModule.this.closeSocketAndCleanup(); 652 } 653 654 @Override 655 protected void notifyAboutNewOutgoingElements() { 656 afterOutgoingElementsQueueModified(); 657 } 658 659 @Override 660 public SSLSession getSslSession() { 661 TlsState tlsState = XmppTcpTransportModule.this.tlsState; 662 if (tlsState == null) { 663 return null; 664 } 665 666 return tlsState.engine.getSession(); 667 } 668 669 public boolean isConnected() { 670 SocketChannel socketChannel = XmppTcpTransportModule.this.socketChannel; 671 if (socketChannel == null) { 672 return false; 673 } 674 675 return socketChannel.isConnected(); 676 } 677 678 @Override 679 public boolean isTransportSecured() { 680 final TlsState tlsState = XmppTcpTransportModule.this.tlsState; 681 return tlsState != null && tlsState.handshakeStatus == TlsHandshakeStatus.successful; 682 } 683 684 @Override 685 public XmppTcpTransportModule.Stats getStats() { 686 return XmppTcpTransportModule.this.getStats(); 687 } 688 689 final class DiscoveredTcpEndpoints implements LookupConnectionEndpointsSuccess { 690 final RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result; 691 DiscoveredTcpEndpoints(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) { 692 this.result = result; 693 } 694 } 695 696 final class TcpEndpointDiscoveryFailed implements LookupConnectionEndpointsFailed { 697 final List<RemoteConnectionEndpointLookupFailure> lookupFailures; 698 TcpEndpointDiscoveryFailed(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) { 699 lookupFailures = result.lookupFailures; 700 } 701 } 702 } 703 704 private void afterOutgoingElementsQueueModified() { 705 final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment; 706 if (selectionKeyAttachment != null && selectionKeyAttachment.isReactorThreadRacing()) { 707 // A reactor thread is already racing to the channel selected callback and will take care of this. 708 reactorThreadAlreadyRacing.increment(); 709 return; 710 } 711 712 afterOutgoingElementsQueueModifiedSetInterestOps.increment(); 713 714 // Add OP_WRITE to the interested Ops, since we have now new things to write. Note that this may cause 715 // multiple reactor threads to race to the channel selected callback in case we perform this right after 716 // a select() returned with this selection key in the selected-key set. Hence, we use tryLock() in the 717 // channel selected callback to keep the invariant that only exactly one thread is performing the 718 // callback. 719 // Note that we need to perform setInterestedOps() *without* holding the channelSelectedCallbackLock, as 720 // otherwise the reactor thread racing to the channel selected callback may found the lock still locked, which 721 // would result in the outgoingElementsQueue not being handled. 722 connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ); 723 } 724 725 @Override 726 protected XmppTcpNioTransport getTransport() { 727 return tcpNioTransport; 728 } 729 730 static final class EstablishingTcpConnectionStateDescriptor extends StateDescriptor { 731 private EstablishingTcpConnectionStateDescriptor() { 732 super(XmppTcpTransportModule.EstablishingTcpConnectionState.class); 733 addPredeccessor(LookupRemoteConnectionEndpointsStateDescriptor.class); 734 addSuccessor(EstablishTlsStateDescriptor.class); 735 addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class); 736 } 737 738 @Override 739 protected XmppTcpTransportModule.EstablishingTcpConnectionState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) { 740 XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class); 741 return tcpTransportModule.constructEstablishingTcpConnectionState(this, connectionInternal); 742 } 743 } 744 745 private EstablishingTcpConnectionState constructEstablishingTcpConnectionState( 746 EstablishingTcpConnectionStateDescriptor stateDescriptor, 747 ModularXmppClientToServerConnectionInternal connectionInternal) { 748 return new EstablishingTcpConnectionState(stateDescriptor, connectionInternal); 749 } 750 751 final class EstablishingTcpConnectionState extends State.AbstractTransport { 752 private EstablishingTcpConnectionState(EstablishingTcpConnectionStateDescriptor stateDescriptor, 753 ModularXmppClientToServerConnectionInternal connectionInternal) { 754 super(tcpNioTransport, stateDescriptor, connectionInternal); 755 } 756 757 @Override 758 public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) 759 throws InterruptedException, IOException, SmackException, XMPPException { 760 // The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to 761 // ConnectingToHost. 762 ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints, 763 this); 764 StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection(); 765 if (failure != null) { 766 return failure; 767 } 768 769 socketChannel = connectionAttemptState.socketChannel; 770 remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); 771 772 selectionKey = connectionInternal.registerWithSelector(socketChannel, SelectionKey.OP_READ, 773 XmppTcpTransportModule.this::onChannelSelected); 774 selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment(); 775 776 connectionInternal.setTransport(tcpNioTransport); 777 778 // TODO: It appears this should be done in a generic way. I'd assume we always 779 // have to wait for stream features after the connection was established. If this is true then consider 780 // moving this into State.AbstractTransport. But I am not yet 100% positive that this is the case for every 781 // transport. Hence, keep it here for now. 782 connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after initial connection"); 783 784 return new TcpSocketConnectedResult(remoteAddress); 785 } 786 787 @Override 788 public void resetState() { 789 closeSocketAndCleanup(); 790 } 791 } 792 793 public static final class TcpSocketConnectedResult extends StateTransitionResult.Success { 794 private final InetSocketAddress remoteAddress; 795 796 private TcpSocketConnectedResult(InetSocketAddress remoteAddress) { 797 super("TCP connection established to " + remoteAddress); 798 this.remoteAddress = remoteAddress; 799 } 800 801 public InetSocketAddress getRemoteAddress() { 802 return remoteAddress; 803 } 804 } 805 806 public static final class TlsEstablishedResult extends StateTransitionResult.Success { 807 808 private TlsEstablishedResult(SSLEngine sslEngine) { 809 super("TLS established: " + sslEngine.getSession()); 810 } 811 } 812 813 static final class EstablishTlsStateDescriptor extends StateDescriptor { 814 private EstablishTlsStateDescriptor() { 815 super(XmppTcpTransportModule.EstablishTlsState.class, "RFC 6120 § 5"); 816 addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class); 817 declarePrecedenceOver(ConnectedButUnauthenticatedStateDescriptor.class); 818 } 819 820 @Override 821 protected EstablishTlsState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) { 822 XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class); 823 return tcpTransportModule.constructEstablishingTlsState(this, connectionInternal); 824 } 825 } 826 827 private EstablishTlsState constructEstablishingTlsState( 828 EstablishTlsStateDescriptor stateDescriptor, 829 ModularXmppClientToServerConnectionInternal connectionInternal) { 830 return new EstablishTlsState(stateDescriptor, connectionInternal); 831 } 832 833 private final class EstablishTlsState extends State { 834 private EstablishTlsState(EstablishTlsStateDescriptor stateDescriptor, 835 ModularXmppClientToServerConnectionInternal connectionInternal) { 836 super(stateDescriptor, connectionInternal); 837 } 838 839 @Override 840 public StateTransitionResult.TransitionImpossible isTransitionToPossible(WalkStateGraphContext walkStateGraphContext) 841 throws SecurityRequiredByClientException, SecurityRequiredByServerException { 842 StartTls startTlsFeature = connectionInternal.connection.getFeature(StartTls.class); 843 SecurityMode securityMode = connectionInternal.connection.getConfiguration().getSecurityMode(); 844 845 switch (securityMode) { 846 case required: 847 case ifpossible: 848 if (startTlsFeature == null) { 849 if (securityMode == SecurityMode.ifpossible) { 850 return new StateTransitionResult.TransitionImpossibleReason("Server does not announce support for TLS and we do not required it"); 851 } 852 throw new SecurityRequiredByClientException(); 853 } 854 // Allows transition by returning null. 855 return null; 856 case disabled: 857 if (startTlsFeature != null && startTlsFeature.required()) { 858 throw new SecurityRequiredByServerException(); 859 } 860 return new StateTransitionResult.TransitionImpossibleReason("TLS disabled in client settings and server does not require it"); 861 default: 862 throw new AssertionError("Unknown security mode: " + securityMode); 863 } 864 } 865 866 @Override 867 public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) 868 throws IOException, InterruptedException, SmackException, XMPPException { 869 connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class); 870 871 SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext(); 872 873 tlsState = new TlsState(smackTlsContext); 874 connectionInternal.addXmppInputOutputFilter(tlsState); 875 876 channelSelectedCallbackLock.lock(); 877 try { 878 pendingOutputFilterData = true; 879 // The beginHandshake() is possibly not really required here, but it does not hurt either. 880 tlsState.engine.beginHandshake(); 881 tlsState.handshakeStatus = TlsHandshakeStatus.initiated; 882 } finally { 883 channelSelectedCallbackLock.unlock(); 884 } 885 connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ); 886 887 try { 888 tlsState.waitForHandshakeFinished(); 889 } catch (CertificateException e) { 890 throw new SmackCertificateException(e); 891 } 892 893 connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established"); 894 895 return new TlsEstablishedResult(tlsState.engine); 896 } 897 898 @Override 899 public void resetState() { 900 tlsState = null; 901 } 902 } 903 904 private enum TlsHandshakeStatus { 905 initial, 906 initiated, 907 successful, 908 failed, 909 } 910 911 private static final Level SSL_ENGINE_DEBUG_LOG_LEVEL = Level.FINEST; 912 913 private static void debugLogSslEngineResult(String operation, SSLEngineResult result) { 914 if (!LOGGER.isLoggable(SSL_ENGINE_DEBUG_LOG_LEVEL)) { 915 return; 916 } 917 918 LOGGER.log(SSL_ENGINE_DEBUG_LOG_LEVEL, "SSLEngineResult of " + operation + "(): " + result); 919 } 920 921 private final class TlsState implements XmppInputOutputFilter { 922 923 private static final int MAX_PENDING_OUTPUT_BYTES = 8096; 924 925 private final SmackTlsContext smackTlsContext; 926 private final SSLEngine engine; 927 928 private TlsHandshakeStatus handshakeStatus = TlsHandshakeStatus.initial; 929 private SSLException handshakeException; 930 931 private ByteBuffer myNetData; 932 private ByteBuffer peerAppData; 933 934 private final List<ByteBuffer> pendingOutputData = new ArrayList<>(); 935 private int pendingOutputBytes; 936 private ByteBuffer pendingInputData; 937 938 private final AtomicInteger pendingDelegatedTasks = new AtomicInteger(); 939 940 private long wrapInBytes; 941 private long wrapOutBytes; 942 943 private long unwrapInBytes; 944 private long unwrapOutBytes; 945 946 private TlsState(SmackTlsContext smackTlsContext) throws IOException { 947 this.smackTlsContext = smackTlsContext; 948 949 // Call createSSLEngine()'s variant with two parameters as this allows for TLS session resumption. 950 951 // Note that it is not really clear what the value of peer host should be. It could be A) the XMPP service's 952 // domainpart or B) the DNS name of the host we are connecting to (usually the DNS SRV RR target name). While 953 // the javadoc of createSSLEngine(String, int) indicates with "Some cipher suites (such as Kerberos) require 954 // remote hostname information, in which case peerHost needs to be specified." that A should be used. TLS 955 // session resumption may would need or at least benefit from B. Variant A would also be required if the 956 // String is used for certificate verification. And it appears at least likely that TLS session resumption 957 // would not be hurt by using variant A. Therefore we currently use variant A. 958 // TODO: Should we use the ACE representation of the XMPP service domain? Compare with f60e4055ec529f0b8160acedf13275592ab10a4b 959 // If yes, then we should probably introduce getXmppServiceDomainAceEncodedIfPossible(). 960 String peerHost = connectionInternal.connection.getConfiguration().getXMPPServiceDomain().toString(); 961 engine = smackTlsContext.sslContext.createSSLEngine(peerHost, remoteAddress.getPort()); 962 engine.setUseClientMode(true); 963 964 SSLSession session = engine.getSession(); 965 int applicationBufferSize = session.getApplicationBufferSize(); 966 int packetBufferSize = session.getPacketBufferSize(); 967 968 myNetData = ByteBuffer.allocateDirect(packetBufferSize); 969 peerAppData = ByteBuffer.allocate(applicationBufferSize); 970 } 971 972 @Override 973 public OutputResult output(ByteBuffer outputData, boolean isFinalDataOfElement, boolean destinationAddressChanged, 974 boolean moreDataAvailable) throws SSLException { 975 if (outputData != null) { 976 pendingOutputData.add(outputData); 977 pendingOutputBytes += outputData.remaining(); 978 if (moreDataAvailable && pendingOutputBytes < MAX_PENDING_OUTPUT_BYTES) { 979 return OutputResult.NO_OUTPUT; 980 } 981 } 982 983 ByteBuffer[] outputDataArray = pendingOutputData.toArray(new ByteBuffer[pendingOutputData.size()]); 984 985 ((java.nio.Buffer) myNetData).clear(); 986 987 while (true) { 988 SSLEngineResult result; 989 try { 990 result = engine.wrap(outputDataArray, myNetData); 991 } catch (SSLException e) { 992 handleSslException(e); 993 throw e; 994 } 995 996 debugLogSslEngineResult("wrap", result); 997 998 SSLEngineResult.Status engineResultStatus = result.getStatus(); 999 1000 pendingOutputBytes -= result.bytesConsumed(); 1001 1002 if (engineResultStatus == SSLEngineResult.Status.OK) { 1003 wrapInBytes += result.bytesConsumed(); 1004 wrapOutBytes += result.bytesProduced(); 1005 1006 SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result); 1007 switch (handshakeStatus) { 1008 case NEED_UNWRAP: 1009 // NEED_UNWRAP means that we need to receive something in order to continue the handshake. The 1010 // standard channelSelectedCallback logic will take care of this, as there is eventually always 1011 // a interest to read from the socket. 1012 break; 1013 case NEED_WRAP: 1014 // Same as need task: Cycle the reactor. 1015 case NEED_TASK: 1016 // Note that we also set pendingOutputFilterData in the OutputResult in the NEED_TASK case, as 1017 // we also want to retry the wrap() operation above in this case. 1018 return new OutputResult(true, myNetData); 1019 default: 1020 break; 1021 } 1022 } 1023 1024 switch (engineResultStatus) { 1025 case OK: 1026 // No need to outputData.compact() here, since we do not reuse the buffer. 1027 // Clean up the pending output data. 1028 pruneBufferList(pendingOutputData); 1029 return new OutputResult(!pendingOutputData.isEmpty(), myNetData); 1030 case CLOSED: 1031 pendingOutputData.clear(); 1032 return OutputResult.NO_OUTPUT; 1033 case BUFFER_OVERFLOW: 1034 LOGGER.warning("SSLEngine status BUFFER_OVERFLOW, this is hopefully uncommon"); 1035 int outputDataRemaining = outputData != null ? outputData.remaining() : 0; 1036 int newCapacity = (int) (1.3 * outputDataRemaining); 1037 // If newCapacity would not increase myNetData, then double it. 1038 if (newCapacity <= myNetData.capacity()) { 1039 newCapacity = 2 * myNetData.capacity(); 1040 } 1041 ByteBuffer newMyNetData = ByteBuffer.allocateDirect(newCapacity); 1042 ((java.nio.Buffer) myNetData).flip(); 1043 newMyNetData.put(myNetData); 1044 myNetData = newMyNetData; 1045 continue; 1046 case BUFFER_UNDERFLOW: 1047 throw new IllegalStateException( 1048 "Buffer underflow as result of SSLEngine.wrap() should never happen"); 1049 } 1050 } 1051 } 1052 1053 @SuppressWarnings("ReferenceEquality") 1054 @Override 1055 public ByteBuffer input(ByteBuffer inputData) throws SSLException { 1056 ByteBuffer accumulatedData; 1057 if (pendingInputData == null) { 1058 accumulatedData = inputData; 1059 } else { 1060 assert pendingInputData != inputData; 1061 1062 int accumulatedDataBytes = pendingInputData.remaining() + inputData.remaining(); 1063 accumulatedData = ByteBuffer.allocate(accumulatedDataBytes); 1064 accumulatedData.put(pendingInputData) 1065 .put(inputData); 1066 ((java.nio.Buffer) accumulatedData).flip(); 1067 pendingInputData = null; 1068 } 1069 1070 ((java.nio.Buffer) peerAppData).clear(); 1071 1072 while (true) { 1073 SSLEngineResult result; 1074 try { 1075 result = engine.unwrap(accumulatedData, peerAppData); 1076 } catch (SSLException e) { 1077 handleSslException(e); 1078 throw e; 1079 } 1080 1081 debugLogSslEngineResult("unwrap", result); 1082 1083 SSLEngineResult.Status engineResultStatus = result.getStatus(); 1084 1085 if (engineResultStatus == SSLEngineResult.Status.OK) { 1086 unwrapInBytes += result.bytesConsumed(); 1087 unwrapOutBytes += result.bytesProduced(); 1088 1089 SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result); 1090 switch (handshakeStatus) { 1091 case NEED_TASK: 1092 // A delegated task is asynchronously running. Take care of the remaining accumulatedData. 1093 addAsPendingInputData(accumulatedData); 1094 // Return here, as the async task created by handleHandshakeStatus will continue calling the 1095 // channelSelectedCallback. 1096 return null; 1097 case NEED_UNWRAP: 1098 continue; 1099 case NEED_WRAP: 1100 // NEED_WRAP means that the SSLEngine needs to send data, probably without consuming data. 1101 // We exploit here the fact that the channelSelectedCallback is single threaded and that the 1102 // input processing is after the output processing. 1103 addAsPendingInputData(accumulatedData); 1104 // Note that it is ok that we the provided argument for pending input filter data to channel 1105 // selected callback is false, as setPendingInputFilterData() will have set the internal state 1106 // boolean accordingly. 1107 connectionInternal.asyncGo(() -> callChannelSelectedCallback(false, true)); 1108 // Do not break here, but instead return and let the asynchronously invoked 1109 // callChannelSelectedCallback() do its work. 1110 return null; 1111 default: 1112 break; 1113 } 1114 } 1115 1116 switch (engineResultStatus) { 1117 case OK: 1118 // SSLEngine's unwrap() may not consume all bytes from the source buffer. If this is the case, then 1119 // simply perform another unwrap until accumulatedData has no remaining bytes. 1120 if (accumulatedData.hasRemaining()) { 1121 continue; 1122 } 1123 return peerAppData; 1124 case CLOSED: 1125 return null; 1126 case BUFFER_UNDERFLOW: 1127 // There were not enough source bytes available to make a complete packet. Let it in 1128 // pendingInputData. Note that we do not resize SSLEngine's source buffer - inputData in our case - 1129 // as it is not possible. 1130 addAsPendingInputData(accumulatedData); 1131 return null; 1132 case BUFFER_OVERFLOW: 1133 int applicationBufferSize = engine.getSession().getApplicationBufferSize(); 1134 assert peerAppData.remaining() < applicationBufferSize; 1135 peerAppData = ByteBuffer.allocate(applicationBufferSize); 1136 continue; 1137 } 1138 } 1139 } 1140 1141 private void addAsPendingInputData(ByteBuffer byteBuffer) { 1142 // Note that we can not simply write 1143 // pendingInputData = byteBuffer; 1144 // we have to copy the provided byte buffer, because it is possible that this byteBuffer is re-used by some 1145 // higher layer. That is, here 'byteBuffer' is typically 'incomingBuffer', which is a direct buffer only 1146 // allocated once per connection for performance reasons and hence re-used for read() calls. 1147 pendingInputData = ByteBuffer.allocate(byteBuffer.remaining()); 1148 pendingInputData.put(byteBuffer); 1149 ((java.nio.Buffer) pendingInputData).flip(); 1150 1151 pendingInputFilterData = pendingInputData.hasRemaining(); 1152 } 1153 1154 private SSLEngineResult.HandshakeStatus handleHandshakeStatus(SSLEngineResult sslEngineResult) { 1155 SSLEngineResult.HandshakeStatus handshakeStatus = sslEngineResult.getHandshakeStatus(); 1156 switch (handshakeStatus) { 1157 case NEED_TASK: 1158 while (true) { 1159 final Runnable delegatedTask = engine.getDelegatedTask(); 1160 if (delegatedTask == null) { 1161 break; 1162 } 1163 sslEngineDelegatedTasks++; 1164 int currentPendingDelegatedTasks = pendingDelegatedTasks.incrementAndGet(); 1165 if (currentPendingDelegatedTasks > maxPendingSslEngineDelegatedTasks) { 1166 maxPendingSslEngineDelegatedTasks = currentPendingDelegatedTasks; 1167 } 1168 1169 Runnable wrappedDelegatedTask = () -> { 1170 delegatedTask.run(); 1171 int wrappedCurrentPendingDelegatedTasks = pendingDelegatedTasks.decrementAndGet(); 1172 if (wrappedCurrentPendingDelegatedTasks == 0) { 1173 callChannelSelectedCallback(true, true); 1174 } 1175 }; 1176 connectionInternal.asyncGo(wrappedDelegatedTask); 1177 } 1178 break; 1179 case FINISHED: 1180 onHandshakeFinished(); 1181 break; 1182 default: 1183 break; 1184 } 1185 1186 SSLEngineResult.HandshakeStatus afterHandshakeStatus = engine.getHandshakeStatus(); 1187 return afterHandshakeStatus; 1188 } 1189 1190 private void handleSslException(SSLException e) { 1191 handshakeException = e; 1192 handshakeStatus = TlsHandshakeStatus.failed; 1193 connectionInternal.notifyWaitingThreads(); 1194 } 1195 1196 private void onHandshakeFinished() { 1197 handshakeStatus = TlsHandshakeStatus.successful; 1198 connectionInternal.notifyWaitingThreads(); 1199 } 1200 1201 private boolean isHandshakeFinished() { 1202 return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed; 1203 } 1204 1205 private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackWrappedException, NoResponseException { 1206 connectionInternal.waitForConditionOrThrowConnectionException(() -> isHandshakeFinished(), "TLS handshake to finish"); 1207 1208 if (handshakeStatus == TlsHandshakeStatus.failed) { 1209 throw handshakeException; 1210 } 1211 1212 assert handshakeStatus == TlsHandshakeStatus.successful; 1213 1214 if (smackTlsContext.daneVerifier != null) { 1215 smackTlsContext.daneVerifier.finish(engine.getSession()); 1216 } 1217 } 1218 1219 @Override 1220 public Object getStats() { 1221 return new TlsStateStats(this); 1222 } 1223 1224 @Override 1225 public void closeInputOutput() { 1226 engine.closeOutbound(); 1227 try { 1228 engine.closeInbound(); 1229 } catch (SSLException e) { 1230 LOGGER.log(Level.FINEST, 1231 "SSLException when closing inbound TLS session. This can likely be ignored if a possible truncation attack is suggested." 1232 + " You may want to ask your XMPP server vendor to implement a clean TLS session shutdown sending close_notify after </stream>", 1233 e); 1234 } 1235 } 1236 1237 @Override 1238 public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException, SmackWrappedException, NoResponseException { 1239 waitForHandshakeFinished(); 1240 } 1241 1242 @Override 1243 public String getFilterName() { 1244 return "TLS (" + engine + ')'; 1245 } 1246 } 1247 1248 public static final class TlsStateStats { 1249 public final long wrapInBytes; 1250 public final long wrapOutBytes; 1251 public final double wrapRatio; 1252 1253 public final long unwrapInBytes; 1254 public final long unwrapOutBytes; 1255 public final double unwrapRatio; 1256 1257 private TlsStateStats(TlsState tlsState) { 1258 wrapOutBytes = tlsState.wrapOutBytes; 1259 wrapInBytes = tlsState.wrapInBytes; 1260 wrapRatio = (double) wrapOutBytes / wrapInBytes; 1261 1262 unwrapOutBytes = tlsState.unwrapOutBytes; 1263 unwrapInBytes = tlsState.unwrapInBytes; 1264 unwrapRatio = (double) unwrapInBytes / unwrapOutBytes; 1265 } 1266 1267 private transient String toStringCache; 1268 1269 @Override 1270 public String toString() { 1271 if (toStringCache != null) { 1272 return toStringCache; 1273 } 1274 1275 toStringCache = 1276 "wrap-in-bytes: " + wrapInBytes + '\n' 1277 + "wrap-out-bytes: " + wrapOutBytes + '\n' 1278 + "wrap-ratio: " + wrapRatio + '\n' 1279 + "unwrap-in-bytes: " + unwrapInBytes + '\n' 1280 + "unwrap-out-bytes: " + unwrapOutBytes + '\n' 1281 + "unwrap-ratio: " + unwrapRatio + '\n' 1282 ; 1283 1284 return toStringCache; 1285 } 1286 } 1287 1288 private void callChannelSelectedCallback(boolean setPendingInputFilterData, boolean setPendingOutputFilterData) { 1289 final SocketChannel channel = socketChannel; 1290 final SelectionKey key = selectionKey; 1291 if (channel == null || key == null) { 1292 LOGGER.info("Not calling channel selected callback because the connection was eventually disconnected"); 1293 return; 1294 } 1295 1296 channelSelectedCallbackLock.lock(); 1297 try { 1298 // Note that it is important that we send the pending(Input|Output)FilterData flags while holding the lock. 1299 if (setPendingInputFilterData) { 1300 pendingInputFilterData = true; 1301 } 1302 if (setPendingOutputFilterData) { 1303 pendingOutputFilterData = true; 1304 } 1305 1306 onChannelSelected(channel, key); 1307 } finally { 1308 channelSelectedCallbackLock.unlock(); 1309 } 1310 } 1311 1312 private void closeSocketAndCleanup() { 1313 final SelectionKey selectionKey = this.selectionKey; 1314 if (selectionKey != null) { 1315 selectionKey.cancel(); 1316 } 1317 final SocketChannel socketChannel = this.socketChannel; 1318 if (socketChannel != null) { 1319 try { 1320 socketChannel.close(); 1321 } catch (IOException e) { 1322 LOGGER.log(Level.FINE, "Closing the socket channel failed", e); 1323 } 1324 } 1325 1326 this.selectionKey = null; 1327 this.socketChannel = null; 1328 1329 selectionKeyAttachment = null; 1330 remoteAddress = null; 1331 } 1332 1333 private static List<? extends Buffer> pruneBufferList(Collection<? extends Buffer> buffers) { 1334 return CollectionUtil.removeUntil(buffers, b -> b.hasRemaining()); 1335 } 1336 1337 public XmppTcpTransportModule.Stats getStats() { 1338 return new Stats(this); 1339 } 1340 1341 public static final class Stats extends XmppClientToServerTransport.Stats { 1342 public final long totalBytesWritten; 1343 public final long totalBytesWrittenBeforeFilter; 1344 public final double writeRatio; 1345 1346 public final long totalBytesRead; 1347 public final long totalBytesReadAfterFilter; 1348 public final double readRatio; 1349 1350 public final long handledChannelSelectedCallbacks; 1351 public final long setWriteInterestAfterChannelSelectedCallback; 1352 public final long reactorThreadAlreadyRacing; 1353 public final long afterOutgoingElementsQueueModifiedSetInterestOps; 1354 public final long rejectedChannelSelectedCallbacks; 1355 public final long totalCallbackRequests; 1356 public final long callbackPreemtBecauseBytesWritten; 1357 public final long callbackPreemtBecauseBytesRead; 1358 public final int sslEngineDelegatedTasks; 1359 public final int maxPendingSslEngineDelegatedTasks; 1360 1361 private Stats(XmppTcpTransportModule connection) { 1362 totalBytesWritten = connection.totalBytesWritten; 1363 totalBytesWrittenBeforeFilter = connection.totalBytesWrittenBeforeFilter; 1364 writeRatio = (double) totalBytesWritten / totalBytesWrittenBeforeFilter; 1365 1366 totalBytesReadAfterFilter = connection.totalBytesReadAfterFilter; 1367 totalBytesRead = connection.totalBytesRead; 1368 readRatio = (double) totalBytesRead / totalBytesReadAfterFilter; 1369 1370 handledChannelSelectedCallbacks = connection.handledChannelSelectedCallbacks; 1371 setWriteInterestAfterChannelSelectedCallback = connection.setWriteInterestAfterChannelSelectedCallback.sum(); 1372 reactorThreadAlreadyRacing = connection.reactorThreadAlreadyRacing.sum(); 1373 afterOutgoingElementsQueueModifiedSetInterestOps = connection.afterOutgoingElementsQueueModifiedSetInterestOps 1374 .sum(); 1375 rejectedChannelSelectedCallbacks = connection.rejectedChannelSelectedCallbacks.sum(); 1376 1377 totalCallbackRequests = handledChannelSelectedCallbacks + rejectedChannelSelectedCallbacks; 1378 1379 callbackPreemtBecauseBytesRead = connection.callbackPreemtBecauseBytesRead; 1380 callbackPreemtBecauseBytesWritten = connection.callbackPreemtBecauseBytesWritten; 1381 1382 sslEngineDelegatedTasks = connection.sslEngineDelegatedTasks; 1383 maxPendingSslEngineDelegatedTasks = connection.maxPendingSslEngineDelegatedTasks; 1384 } 1385 1386 private transient String toStringCache; 1387 1388 @Override 1389 public String toString() { 1390 if (toStringCache != null) { 1391 return toStringCache; 1392 } 1393 1394 toStringCache = 1395 "Total bytes\n" 1396 + "recv: " + totalBytesRead + '\n' 1397 + "send: " + totalBytesWritten + '\n' 1398 + "recv-aft-filter: " + totalBytesReadAfterFilter + '\n' 1399 + "send-bef-filter: " + totalBytesWrittenBeforeFilter + '\n' 1400 + "read-ratio: " + readRatio + '\n' 1401 + "write-ratio: " + writeRatio + '\n' 1402 + "Events\n" 1403 + "total-callback-requests: " + totalCallbackRequests + '\n' 1404 + "handled-channel-selected-callbacks: " + handledChannelSelectedCallbacks + '\n' 1405 + "rejected-channel-selected-callbacks: " + rejectedChannelSelectedCallbacks + '\n' 1406 + "set-write-interest-after-callback: " + setWriteInterestAfterChannelSelectedCallback + '\n' 1407 + "reactor-thread-already-racing: " + reactorThreadAlreadyRacing + '\n' 1408 + "after-queue-modified-set-interest-ops: " + afterOutgoingElementsQueueModifiedSetInterestOps + '\n' 1409 + "callback-preemt-because-bytes-read: " + callbackPreemtBecauseBytesRead + '\n' 1410 + "callback-preemt-because-bytes-written: " + callbackPreemtBecauseBytesWritten + '\n' 1411 + "ssl-engine-delegated-tasks: " + sslEngineDelegatedTasks + '\n' 1412 + "max-pending-ssl-engine-delegated-tasks: " + maxPendingSslEngineDelegatedTasks + '\n' 1413 ; 1414 1415 return toStringCache; 1416 } 1417 } 1418}