001/** 002 * 003 * Copyright 2019-2021 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.IOException; 020import java.net.InetSocketAddress; 021import java.nio.Buffer; 022import java.nio.ByteBuffer; 023import java.nio.channels.ClosedChannelException; 024import java.nio.channels.SelectableChannel; 025import java.nio.channels.SelectionKey; 026import java.nio.channels.SocketChannel; 027import java.security.cert.CertificateException; 028import java.util.ArrayList; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.IdentityHashMap; 032import java.util.Iterator; 033import java.util.List; 034import java.util.ListIterator; 035import java.util.Map; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicLong; 038import java.util.concurrent.locks.ReentrantLock; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042import javax.net.ssl.SSLEngine; 043import javax.net.ssl.SSLEngineResult; 044import javax.net.ssl.SSLException; 045import javax.net.ssl.SSLSession; 046 047import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 048import org.jivesoftware.smack.SmackException; 049import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; 050import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 051import org.jivesoftware.smack.SmackException.SmackCertificateException; 052import org.jivesoftware.smack.SmackFuture; 053import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; 054import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment; 055import org.jivesoftware.smack.XMPPException; 056import org.jivesoftware.smack.XmppInputOutputFilter; 057import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor; 058import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor; 059import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnectionModule; 060import org.jivesoftware.smack.c2s.StreamOpenAndCloseFactory; 061import org.jivesoftware.smack.c2s.XmppClientToServerTransport; 062import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal; 063import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext; 064import org.jivesoftware.smack.debugger.SmackDebugger; 065import org.jivesoftware.smack.fsm.State; 066import org.jivesoftware.smack.fsm.StateDescriptor; 067import org.jivesoftware.smack.fsm.StateTransitionResult; 068import org.jivesoftware.smack.internal.SmackTlsContext; 069import org.jivesoftware.smack.packet.Stanza; 070import org.jivesoftware.smack.packet.StartTls; 071import org.jivesoftware.smack.packet.StreamClose; 072import org.jivesoftware.smack.packet.StreamOpen; 073import org.jivesoftware.smack.packet.TlsFailure; 074import org.jivesoftware.smack.packet.TlsProceed; 075import org.jivesoftware.smack.packet.TopLevelStreamElement; 076import org.jivesoftware.smack.packet.XmlEnvironment; 077import org.jivesoftware.smack.tcp.XmppTcpTransportModule.XmppTcpNioTransport.DiscoveredTcpEndpoints; 078import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints; 079import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints.Result; 080import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint; 081import org.jivesoftware.smack.util.CollectionUtil; 082import org.jivesoftware.smack.util.StringUtils; 083import org.jivesoftware.smack.util.UTF8; 084import org.jivesoftware.smack.util.XmlStringBuilder; 085import org.jivesoftware.smack.util.rce.RemoteConnectionEndpointLookupFailure; 086 087import org.jxmpp.jid.DomainBareJid; 088import org.jxmpp.jid.Jid; 089import org.jxmpp.jid.util.JidUtil; 090import org.jxmpp.xml.splitter.Utf8ByteXmppXmlSplitter; 091import org.jxmpp.xml.splitter.XmlPrettyPrinter; 092import org.jxmpp.xml.splitter.XmlPrinter; 093import org.jxmpp.xml.splitter.XmppElementCallback; 094import org.jxmpp.xml.splitter.XmppXmlSplitter; 095 096public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionModule<XmppTcpTransportModuleDescriptor> { 097 098 private static final Logger LOGGER = Logger.getLogger(XmppTcpTransportModule.class.getName()); 099 100 private static final int CALLBACK_MAX_BYTES_READ = 10 * 1024 * 1024; 101 private static final int CALLBACK_MAX_BYTES_WRITEN = CALLBACK_MAX_BYTES_READ; 102 103 private static final int MAX_ELEMENT_SIZE = 64 * 1024; 104 105 private final XmppTcpNioTransport tcpNioTransport; 106 107 private SelectionKey selectionKey; 108 private SelectionKeyAttachment selectionKeyAttachment; 109 private SocketChannel socketChannel; 110 private InetSocketAddress remoteAddress; 111 112 private TlsState tlsState; 113 114 private Iterator<CharSequence> outgoingCharSequenceIterator; 115 116 private final List<TopLevelStreamElement> currentlyOutgoingElements = new ArrayList<>(); 117 private final IdentityHashMap<ByteBuffer, List<TopLevelStreamElement>> bufferToElementMap = new IdentityHashMap<>(); 118 119 private ByteBuffer outgoingBuffer; 120 private ByteBuffer filteredOutgoingBuffer; 121 private final List<ByteBuffer> networkOutgoingBuffers = new ArrayList<>(); 122 private long networkOutgoingBuffersBytes; 123 124 // TODO: Make the size of the incomingBuffer configurable. 125 private final ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(2 * 4096); 126 127 private final ReentrantLock channelSelectedCallbackLock = new ReentrantLock(); 128 129 private long totalBytesRead; 130 private long totalBytesWritten; 131 private long totalBytesReadAfterFilter; 132 private long totalBytesWrittenBeforeFilter; 133 private long handledChannelSelectedCallbacks; 134 private long callbackPreemtBecauseBytesWritten; 135 private long callbackPreemtBecauseBytesRead; 136 private int sslEngineDelegatedTasks; 137 private int maxPendingSslEngineDelegatedTasks; 138 139 // TODO: Use LongAdder once Smack's minimum Android API level is 24 or higher. 140 private final AtomicLong setWriteInterestAfterChannelSelectedCallback = new AtomicLong(); 141 private final AtomicLong reactorThreadAlreadyRacing = new AtomicLong(); 142 private final AtomicLong afterOutgoingElementsQueueModifiedSetInterestOps = new AtomicLong(); 143 private final AtomicLong rejectedChannelSelectedCallbacks = new AtomicLong(); 144 145 private Jid lastDestinationAddress; 146 147 private boolean pendingInputFilterData; 148 private boolean pendingOutputFilterData; 149 150 private boolean pendingWriteInterestAfterRead; 151 152 /** 153 * Note that this field is effective final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final. 154 */ 155 private Utf8ByteXmppXmlSplitter splitter; 156 157 /** 158 * Note that this field is effective final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final. 159 */ 160 private XmppXmlSplitter outputDebugSplitter; 161 162 private static final Level STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL = Level.FINER; 163 164 XmppTcpTransportModule(XmppTcpTransportModuleDescriptor moduleDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) { 165 super(moduleDescriptor, connectionInternal); 166 167 tcpNioTransport = new XmppTcpNioTransport(connectionInternal); 168 169 XmlPrinter incomingDebugPrettyPrinter = null; 170 final SmackDebugger debugger = connectionInternal.smackDebugger; 171 if (debugger != null) { 172 // Incoming stream debugging. 173 incomingDebugPrettyPrinter = XmlPrettyPrinter.builder() 174 .setPrettyWriter(sb -> debugger.incomingStreamSink(sb)) 175 .build(); 176 177 // Outgoing stream debugging. 178 XmlPrinter outgoingDebugPrettyPrinter = XmlPrettyPrinter.builder() 179 .setPrettyWriter(sb -> debugger.outgoingStreamSink(sb)) 180 .build(); 181 outputDebugSplitter = new XmppXmlSplitter(outgoingDebugPrettyPrinter); 182 } 183 184 XmppXmlSplitter xmppXmlSplitter = new XmppXmlSplitter(MAX_ELEMENT_SIZE, xmppElementCallback, 185 incomingDebugPrettyPrinter); 186 splitter = new Utf8ByteXmppXmlSplitter(xmppXmlSplitter); 187 } 188 189 private final XmppElementCallback xmppElementCallback = new XmppElementCallback() { 190 private String streamOpen; 191 private String streamClose; 192 193 @Override 194 public void onCompleteElement(String completeElement) { 195 assert streamOpen != null; 196 assert streamClose != null; 197 198 connectionInternal.withSmackDebugger(debugger -> debugger.onIncomingElementCompleted()); 199 200 String wrappedCompleteElement = streamOpen + completeElement + streamClose; 201 connectionInternal.parseAndProcessElement(wrappedCompleteElement); 202 } 203 204 205 @Override 206 public void streamOpened(String prefix, Map<String, String> attributes) { 207 if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) { 208 LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL, 209 "Stream of " + this + " opened. prefix=" + prefix + " attributes=" + attributes); 210 } 211 212 final String prefixXmlns = "xmlns:" + prefix; 213 // TODO: Use the return value of onStreamOpen(), which now returns the 214 // corresponding stream close tag, instead of creating it here. 215 final StringBuilder streamClose = new StringBuilder(32); 216 final StringBuilder streamOpen = new StringBuilder(256); 217 218 streamOpen.append('<'); 219 streamClose.append("</"); 220 if (StringUtils.isNotEmpty(prefix)) { 221 streamOpen.append(prefix).append(':'); 222 streamClose.append(prefix).append(':'); 223 } 224 streamOpen.append("stream"); 225 streamClose.append("stream>"); 226 for (Map.Entry<String, String> entry : attributes.entrySet()) { 227 String attributeName = entry.getKey(); 228 String attributeValue = entry.getValue(); 229 switch (attributeName) { 230 case "to": 231 case "from": 232 case "id": 233 case "version": 234 break; 235 case "xml:lang": 236 streamOpen.append(" xml:lang='").append(attributeValue).append('\''); 237 break; 238 case "xmlns": 239 streamOpen.append(" xmlns='").append(attributeValue).append('\''); 240 break; 241 default: 242 if (attributeName.equals(prefixXmlns)) { 243 streamOpen.append(' ').append(prefixXmlns).append("='").append(attributeValue).append('\''); 244 break; 245 } 246 LOGGER.info("Unknown <stream/> attribute: " + attributeName); 247 break; 248 } 249 } 250 streamOpen.append('>'); 251 252 this.streamOpen = streamOpen.toString(); 253 this.streamClose = streamClose.toString(); 254 255 connectionInternal.onStreamOpen(this.streamOpen); 256 } 257 258 @Override 259 public void streamClosed() { 260 if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) { 261 LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL, "Stream of " + this + " closed"); 262 } 263 264 connectionInternal.onStreamClosed(); 265 } 266 }; 267 268 private void onChannelSelected(SelectableChannel selectedChannel, SelectionKey selectedSelectionKey) { 269 assert selectionKey == null || selectionKey == selectedSelectionKey; 270 SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel; 271 // We are *always* interested in OP_READ. 272 int newInterestedOps = SelectionKey.OP_READ; 273 boolean newPendingOutputFilterData = false; 274 275 if (!channelSelectedCallbackLock.tryLock()) { 276 rejectedChannelSelectedCallbacks.incrementAndGet(); 277 return; 278 } 279 280 handledChannelSelectedCallbacks++; 281 282 long callbackBytesRead = 0; 283 long callbackBytesWritten = 0; 284 285 try { 286 boolean destinationAddressChanged = false; 287 boolean isLastPartOfElement = false; 288 TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null; 289 StringBuilder outgoingStreamForDebugger = null; 290 291 writeLoop: while (true) { 292 final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty(); 293 294 if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) { 295 if (filteredOutgoingBuffer != null) { 296 networkOutgoingBuffers.add(filteredOutgoingBuffer); 297 networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining(); 298 299 filteredOutgoingBuffer = null; 300 if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) { 301 continue; 302 } 303 } 304 305 ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]); 306 long bytesWritten; 307 try { 308 bytesWritten = selectedSocketChannel.write(output); 309 } catch (IOException e) { 310 // We have seen here so far 311 // - IOException "Broken pipe" 312 handleReadWriteIoException(e); 313 break; 314 } 315 316 if (bytesWritten == 0) { 317 newInterestedOps |= SelectionKey.OP_WRITE; 318 break; 319 } 320 321 callbackBytesWritten += bytesWritten; 322 323 networkOutgoingBuffersBytes -= bytesWritten; 324 325 List<? extends Buffer> prunedBuffers = pruneBufferList(networkOutgoingBuffers); 326 327 for (Buffer prunedBuffer : prunedBuffers) { 328 List<TopLevelStreamElement> sendElements = bufferToElementMap.remove(prunedBuffer); 329 if (sendElements == null) { 330 continue; 331 } 332 for (TopLevelStreamElement elementJustSend : sendElements) { 333 connectionInternal.fireFirstLevelElementSendListeners(elementJustSend); 334 } 335 } 336 337 // Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have 338 // written a certain amount. 339 if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) { 340 newInterestedOps |= SelectionKey.OP_WRITE; 341 callbackPreemtBecauseBytesWritten++; 342 break; 343 } 344 } else if (outgoingBuffer != null || pendingOutputFilterData) { 345 pendingOutputFilterData = false; 346 347 if (outgoingBuffer != null) { 348 totalBytesWrittenBeforeFilter += outgoingBuffer.remaining(); 349 if (isLastPartOfElement) { 350 assert currentlyOutgonigTopLevelStreamElement != null; 351 currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement); 352 } 353 } 354 355 ByteBuffer outputFilterInputData = outgoingBuffer; 356 // We can now null the outgoingBuffer since the filter step will take care of it from now on. 357 outgoingBuffer = null; 358 359 for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) { 360 XmppInputOutputFilter inputOutputFilter = it.next(); 361 XmppInputOutputFilter.OutputResult outputResult; 362 try { 363 outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement, 364 destinationAddressChanged, moreDataAvailable); 365 } catch (IOException e) { 366 connectionInternal.notifyConnectionError(e); 367 break writeLoop; 368 } 369 newPendingOutputFilterData |= outputResult.pendingFilterData; 370 outputFilterInputData = outputResult.filteredOutputData; 371 if (outputFilterInputData != null) { 372 outputFilterInputData.flip(); 373 } 374 } 375 376 // It is ok if outpuFilterInputData is 'null' here, this is expected behavior. 377 if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) { 378 filteredOutgoingBuffer = outputFilterInputData; 379 } else { 380 filteredOutgoingBuffer = null; 381 } 382 383 // If the filters did eventually not produce any output data but if there is 384 // pending output data then we have a pending write request after read. 385 if (filteredOutgoingBuffer == null && newPendingOutputFilterData) { 386 pendingWriteInterestAfterRead = true; 387 } 388 389 if (filteredOutgoingBuffer != null && isLastPartOfElement) { 390 bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements)); 391 currentlyOutgoingElements.clear(); 392 } 393 394 // Reset that the destination address has changed. 395 if (destinationAddressChanged) { 396 destinationAddressChanged = false; 397 } 398 } else if (outgoingCharSequenceIterator != null) { 399 CharSequence nextCharSequence = outgoingCharSequenceIterator.next(); 400 outgoingBuffer = UTF8.encode(nextCharSequence); 401 if (!outgoingCharSequenceIterator.hasNext()) { 402 outgoingCharSequenceIterator = null; 403 isLastPartOfElement = true; 404 } else { 405 isLastPartOfElement = false; 406 } 407 408 final SmackDebugger debugger = connectionInternal.smackDebugger; 409 if (debugger != null) { 410 if (outgoingStreamForDebugger == null) { 411 outgoingStreamForDebugger = new StringBuilder(); 412 } 413 outgoingStreamForDebugger.append(nextCharSequence); 414 415 if (isLastPartOfElement) { 416 try { 417 outputDebugSplitter.append(outgoingStreamForDebugger); 418 } catch (IOException e) { 419 throw new AssertionError(e); 420 } 421 debugger.onOutgoingElementCompleted(); 422 outgoingStreamForDebugger = null; 423 } 424 } 425 } else if (!connectionInternal.outgoingElementsQueue.isEmpty()) { 426 currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll(); 427 if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) { 428 Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement; 429 Jid currentDestinationAddress = currentlyOutgoingStanza.getTo(); 430 destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress); 431 lastDestinationAddress = currentDestinationAddress; 432 } 433 CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE); 434 if (nextCharSequence instanceof XmlStringBuilder) { 435 XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence; 436 XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment(); 437 outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator(); 438 } else { 439 outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator(); 440 } 441 assert outgoingCharSequenceIterator != null; 442 } else { 443 // There is nothing more to write. 444 break; 445 } 446 } 447 448 pendingOutputFilterData = newPendingOutputFilterData; 449 if (!pendingWriteInterestAfterRead && pendingOutputFilterData) { 450 newInterestedOps |= SelectionKey.OP_WRITE; 451 } 452 453 readLoop: while (true) { 454 // Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have 455 // read a certain amount. 456 if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) { 457 callbackPreemtBecauseBytesRead++; 458 break; 459 } 460 461 int bytesRead; 462 incomingBuffer.clear(); 463 try { 464 bytesRead = selectedSocketChannel.read(incomingBuffer); 465 } catch (IOException e) { 466 handleReadWriteIoException(e); 467 return; 468 } 469 470 if (bytesRead < 0) { 471 LOGGER.finer("NIO read() returned " + bytesRead 472 + " for " + this + ". This probably means that the TCP connection was terminated."); 473 // According to the socket channel javadoc section about "asynchronous reads" a socket channel's 474 // read() may return -1 if the input side of a socket is shut down. 475 // Note that we do not call notifyConnectionError() here because the connection may be 476 // cleanly shutdown which would also cause read() to return '-1. I assume that this socket 477 // will be selected again, on which read() would throw an IOException, which will be catched 478 // and invoke notifyConnectionError() (see a few lines above). 479 /* 480 IOException exception = new IOException("NIO read() returned " + bytesRead); 481 notifyConnectionError(exception); 482 */ 483 return; 484 } 485 486 if (!pendingInputFilterData) { 487 if (bytesRead == 0) { 488 // Nothing more to read. 489 break; 490 } 491 } else { 492 pendingInputFilterData = false; 493 } 494 495 if (pendingWriteInterestAfterRead) { 496 // We have successfully read something and someone announced a write interest after a read. It is 497 // now possible that a filter is now also able to write additional data (for example SSLEngine). 498 pendingWriteInterestAfterRead = false; 499 newInterestedOps |= SelectionKey.OP_WRITE; 500 } 501 502 callbackBytesRead += bytesRead; 503 504 ByteBuffer filteredIncomingBuffer = incomingBuffer; 505 for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) { 506 filteredIncomingBuffer.flip(); 507 508 ByteBuffer newFilteredIncomingBuffer; 509 try { 510 newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer); 511 } catch (IOException e) { 512 connectionInternal.notifyConnectionError(e); 513 return; 514 } 515 if (newFilteredIncomingBuffer == null) { 516 break readLoop; 517 } 518 filteredIncomingBuffer = newFilteredIncomingBuffer; 519 } 520 521 final int bytesReadAfterFilter = filteredIncomingBuffer.flip().remaining(); 522 523 totalBytesReadAfterFilter += bytesReadAfterFilter; 524 525 try { 526 splitter.write(filteredIncomingBuffer); 527 } catch (IOException e) { 528 connectionInternal.notifyConnectionError(e); 529 return; 530 } 531 } 532 } finally { 533 totalBytesWritten += callbackBytesWritten; 534 totalBytesRead += callbackBytesRead; 535 536 channelSelectedCallbackLock.unlock(); 537 } 538 539 // Indicate that there is no reactor thread racing towards handling this selection key. 540 final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment; 541 if (selectionKeyAttachment != null) { 542 selectionKeyAttachment.resetReactorThreadRacing(); 543 } 544 545 // Check the queue again to prevent lost wakeups caused by elements inserted before we 546 // called resetReactorThreadRacing() a few lines above. 547 if (!connectionInternal.outgoingElementsQueue.isEmpty()) { 548 setWriteInterestAfterChannelSelectedCallback.incrementAndGet(); 549 newInterestedOps |= SelectionKey.OP_WRITE; 550 } 551 552 connectionInternal.setInterestOps(selectionKey, newInterestedOps); 553 } 554 555 private void handleReadWriteIoException(IOException e) { 556 if (e instanceof ClosedChannelException && !tcpNioTransport.isConnected()) { 557 // The connection is already closed. 558 return; 559 } 560 561 connectionInternal.notifyConnectionError(e); 562 } 563 564 /** 565 * This is the interface between the "lookup remote connection endpoints" state and the "establish TCP connection" 566 * state. The field is indirectly populated by {@link XmppTcpNioTransport#lookupConnectionEndpoints()} and consumed 567 * by {@link ConnectionAttemptState}. 568 */ 569 DiscoveredTcpEndpoints discoveredTcpEndpoints; 570 571 final class XmppTcpNioTransport extends XmppClientToServerTransport { 572 573 XmppTcpNioTransport(ModularXmppClientToServerConnectionInternal connectionInternal) { 574 super(connectionInternal); 575 } 576 577 @Override 578 public StreamOpenAndCloseFactory getStreamOpenAndCloseFactory() { 579 return new StreamOpenAndCloseFactory() { 580 @Override 581 public StreamOpen createStreamOpen(DomainBareJid to, CharSequence from, String id, String lang) { 582 String xmlLang = connectionInternal.connection.getConfiguration().getXmlLang(); 583 StreamOpen streamOpen = new StreamOpen(to, from, id, xmlLang, StreamOpen.StreamContentNamespace.client); 584 return streamOpen; 585 } 586 @Override 587 public StreamClose createStreamClose() { 588 return StreamClose.INSTANCE; 589 } 590 }; 591 } 592 593 @Override 594 protected void resetDiscoveredConnectionEndpoints() { 595 discoveredTcpEndpoints = null; 596 } 597 598 @Override 599 public boolean hasUseableConnectionEndpoints() { 600 return discoveredTcpEndpoints != null; 601 } 602 603 @Override 604 protected List<SmackFuture<LookupConnectionEndpointsResult, Exception>> lookupConnectionEndpoints() { 605 // Assert that there are no stale discovered endpoints prior performing the lookup. 606 assert discoveredTcpEndpoints == null; 607 608 List<SmackFuture<LookupConnectionEndpointsResult, Exception>> futures = new ArrayList<>(2); 609 610 InternalSmackFuture<LookupConnectionEndpointsResult, Exception> tcpEndpointsLookupFuture = new InternalSmackFuture<>(); 611 connectionInternal.asyncGo(() -> { 612 Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup( 613 connectionInternal.connection.getConfiguration()); 614 615 LookupConnectionEndpointsResult endpointsResult; 616 if (result.discoveredRemoteConnectionEndpoints.isEmpty()) { 617 endpointsResult = new TcpEndpointDiscoveryFailed(result); 618 } else { 619 endpointsResult = new DiscoveredTcpEndpoints(result); 620 } 621 tcpEndpointsLookupFuture.setResult(endpointsResult); 622 }); 623 futures.add(tcpEndpointsLookupFuture); 624 625 if (moduleDescriptor.isDirectTlsEnabled()) { 626 // TODO: Implement this. 627 throw new IllegalArgumentException("DirectTLS is not implemented yet"); 628 } 629 630 return futures; 631 } 632 633 @Override 634 protected void loadConnectionEndpoints(LookupConnectionEndpointsSuccess lookupConnectionEndpointsSuccess) { 635 // The API contract stats that we will be given the instance we handed out with lookupConnectionEndpoints, 636 // which must be of type DiscoveredTcpEndpoints here. Hence if we can not cast it, then there is an internal 637 // Smack error. 638 discoveredTcpEndpoints = (DiscoveredTcpEndpoints) lookupConnectionEndpointsSuccess; 639 } 640 641 @Override 642 protected void afterFiltersClosed() { 643 pendingInputFilterData = pendingOutputFilterData = true; 644 afterOutgoingElementsQueueModified(); 645 } 646 647 @Override 648 protected void disconnect() { 649 XmppTcpTransportModule.this.closeSocketAndCleanup(); 650 } 651 652 @Override 653 protected void notifyAboutNewOutgoingElements() { 654 afterOutgoingElementsQueueModified(); 655 } 656 657 @Override 658 public SSLSession getSslSession() { 659 TlsState tlsState = XmppTcpTransportModule.this.tlsState; 660 if (tlsState == null) { 661 return null; 662 } 663 664 return tlsState.engine.getSession(); 665 } 666 667 public boolean isConnected() { 668 SocketChannel socketChannel = XmppTcpTransportModule.this.socketChannel; 669 if (socketChannel == null) { 670 return false; 671 } 672 673 return socketChannel.isConnected(); 674 } 675 676 @Override 677 public boolean isTransportSecured() { 678 final TlsState tlsState = XmppTcpTransportModule.this.tlsState; 679 return tlsState != null && tlsState.handshakeStatus == TlsHandshakeStatus.successful; 680 } 681 682 @Override 683 public XmppTcpTransportModule.Stats getStats() { 684 return XmppTcpTransportModule.this.getStats(); 685 } 686 687 final class DiscoveredTcpEndpoints implements LookupConnectionEndpointsSuccess { 688 final RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result; 689 DiscoveredTcpEndpoints(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) { 690 this.result = result; 691 } 692 } 693 694 final class TcpEndpointDiscoveryFailed implements LookupConnectionEndpointsFailed { 695 final List<RemoteConnectionEndpointLookupFailure> lookupFailures; 696 TcpEndpointDiscoveryFailed(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) { 697 lookupFailures = result.lookupFailures; 698 } 699 } 700 } 701 702 private void afterOutgoingElementsQueueModified() { 703 final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment; 704 if (selectionKeyAttachment != null && selectionKeyAttachment.isReactorThreadRacing()) { 705 // A reactor thread is already racing to the channel selected callback and will take care of this. 706 reactorThreadAlreadyRacing.incrementAndGet(); 707 return; 708 } 709 710 afterOutgoingElementsQueueModifiedSetInterestOps.incrementAndGet(); 711 712 // Add OP_WRITE to the interested Ops, since we have now new things to write. Note that this may cause 713 // multiple reactor threads to race to the channel selected callback in case we perform this right after 714 // a select() returned with this selection key in the selected-key set. Hence we use tryLock() in the 715 // channel selected callback to keep the invariant that only exactly one thread is performing the 716 // callback. 717 // Note that we need to perform setInterestedOps() *without* holding the channelSelectedCallbackLock, as 718 // otherwise the reactor thread racing to the channel selected callback may found the lock still locked, which 719 // would result in the outgoingElementsQueue not being handled. 720 connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ); 721 } 722 723 @Override 724 protected XmppTcpNioTransport getTransport() { 725 return tcpNioTransport; 726 } 727 728 static final class EstablishingTcpConnectionStateDescriptor extends StateDescriptor { 729 private EstablishingTcpConnectionStateDescriptor() { 730 super(XmppTcpTransportModule.EstablishingTcpConnectionState.class); 731 addPredeccessor(LookupRemoteConnectionEndpointsStateDescriptor.class); 732 addSuccessor(EstablishTlsStateDescriptor.class); 733 addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class); 734 } 735 736 @Override 737 protected XmppTcpTransportModule.EstablishingTcpConnectionState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) { 738 XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class); 739 return tcpTransportModule.constructEstablishingTcpConnectionState(this, connectionInternal); 740 } 741 } 742 743 private EstablishingTcpConnectionState constructEstablishingTcpConnectionState( 744 EstablishingTcpConnectionStateDescriptor stateDescriptor, 745 ModularXmppClientToServerConnectionInternal connectionInternal) { 746 return new EstablishingTcpConnectionState(stateDescriptor, connectionInternal); 747 } 748 749 final class EstablishingTcpConnectionState extends State.AbstractTransport { 750 private EstablishingTcpConnectionState(EstablishingTcpConnectionStateDescriptor stateDescriptor, 751 ModularXmppClientToServerConnectionInternal connectionInternal) { 752 super(tcpNioTransport, stateDescriptor, connectionInternal); 753 } 754 755 @Override 756 public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) 757 throws InterruptedException, IOException, SmackException, XMPPException { 758 // The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to 759 // ConnectingToHost. 760 ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints, 761 this); 762 StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection(); 763 if (failure != null) { 764 return failure; 765 } 766 767 socketChannel = connectionAttemptState.socketChannel; 768 remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); 769 770 selectionKey = connectionInternal.registerWithSelector(socketChannel, SelectionKey.OP_READ, 771 XmppTcpTransportModule.this::onChannelSelected); 772 selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment(); 773 774 connectionInternal.setTransport(tcpNioTransport); 775 776 // TODO: It appears this should be done in a generic way. I'd assume we always 777 // have to wait for stream features after the connection was established. If this is true then consider 778 // moving this into State.AbstractTransport. But I am not yet 100% positive that this is the case for every 779 // transport. Hence keep it here for now. 780 connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after initial connection"); 781 782 return new TcpSocketConnectedResult(remoteAddress); 783 } 784 785 @Override 786 public void resetState() { 787 closeSocketAndCleanup(); 788 } 789 } 790 791 public static final class TcpSocketConnectedResult extends StateTransitionResult.Success { 792 private final InetSocketAddress remoteAddress; 793 794 private TcpSocketConnectedResult(InetSocketAddress remoteAddress) { 795 super("TCP connection established to " + remoteAddress); 796 this.remoteAddress = remoteAddress; 797 } 798 799 public InetSocketAddress getRemoteAddress() { 800 return remoteAddress; 801 } 802 } 803 804 public static final class TlsEstablishedResult extends StateTransitionResult.Success { 805 806 private TlsEstablishedResult(SSLEngine sslEngine) { 807 super("TLS established: " + sslEngine.getSession()); 808 } 809 } 810 811 static final class EstablishTlsStateDescriptor extends StateDescriptor { 812 private EstablishTlsStateDescriptor() { 813 super(XmppTcpTransportModule.EstablishTlsState.class, "RFC 6120 ยง 5"); 814 addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class); 815 declarePrecedenceOver(ConnectedButUnauthenticatedStateDescriptor.class); 816 } 817 818 @Override 819 protected EstablishTlsState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) { 820 XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class); 821 return tcpTransportModule.constructEstablishingTlsState(this, connectionInternal); 822 } 823 } 824 825 private EstablishTlsState constructEstablishingTlsState( 826 EstablishTlsStateDescriptor stateDescriptor, 827 ModularXmppClientToServerConnectionInternal connectionInternal) { 828 return new EstablishTlsState(stateDescriptor, connectionInternal); 829 } 830 831 private final class EstablishTlsState extends State { 832 private EstablishTlsState(EstablishTlsStateDescriptor stateDescriptor, 833 ModularXmppClientToServerConnectionInternal connectionInternal) { 834 super(stateDescriptor, connectionInternal); 835 } 836 837 @Override 838 public StateTransitionResult.TransitionImpossible isTransitionToPossible(WalkStateGraphContext walkStateGraphContext) 839 throws SecurityRequiredByClientException, SecurityRequiredByServerException { 840 StartTls startTlsFeature = connectionInternal.connection.getFeature(StartTls.class); 841 SecurityMode securityMode = connectionInternal.connection.getConfiguration().getSecurityMode(); 842 843 switch (securityMode) { 844 case required: 845 case ifpossible: 846 if (startTlsFeature == null) { 847 if (securityMode == SecurityMode.ifpossible) { 848 return new StateTransitionResult.TransitionImpossibleReason("Server does not announce support for TLS and we do not required it"); 849 } 850 throw new SecurityRequiredByClientException(); 851 } 852 // Allows transition by returning null. 853 return null; 854 case disabled: 855 if (startTlsFeature != null && startTlsFeature.required()) { 856 throw new SecurityRequiredByServerException(); 857 } 858 return new StateTransitionResult.TransitionImpossibleReason("TLS disabled in client settings and server does not require it"); 859 default: 860 throw new AssertionError("Unknown security mode: " + securityMode); 861 } 862 } 863 864 @Override 865 public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) 866 throws IOException, InterruptedException, SmackException, XMPPException { 867 connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class); 868 869 SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext(); 870 871 tlsState = new TlsState(smackTlsContext); 872 connectionInternal.addXmppInputOutputFilter(tlsState); 873 874 channelSelectedCallbackLock.lock(); 875 try { 876 pendingOutputFilterData = true; 877 // The beginHandshake() is possibly not really required here, but it does not hurt either. 878 tlsState.engine.beginHandshake(); 879 tlsState.handshakeStatus = TlsHandshakeStatus.initiated; 880 } finally { 881 channelSelectedCallbackLock.unlock(); 882 } 883 connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ); 884 885 try { 886 tlsState.waitForHandshakeFinished(); 887 } catch (CertificateException e) { 888 throw new SmackCertificateException(e); 889 } 890 891 connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established"); 892 893 return new TlsEstablishedResult(tlsState.engine); 894 } 895 896 @Override 897 public void resetState() { 898 tlsState = null; 899 } 900 } 901 902 private enum TlsHandshakeStatus { 903 initial, 904 initiated, 905 successful, 906 failed, 907 } 908 909 private static final Level SSL_ENGINE_DEBUG_LOG_LEVEL = Level.FINEST; 910 911 private static void debugLogSslEngineResult(String operation, SSLEngineResult result) { 912 if (!LOGGER.isLoggable(SSL_ENGINE_DEBUG_LOG_LEVEL)) { 913 return; 914 } 915 916 LOGGER.log(SSL_ENGINE_DEBUG_LOG_LEVEL, "SSLEngineResult of " + operation + "(): " + result); 917 } 918 919 private final class TlsState implements XmppInputOutputFilter { 920 921 private static final int MAX_PENDING_OUTPUT_BYTES = 8096; 922 923 private final SmackTlsContext smackTlsContext; 924 private final SSLEngine engine; 925 926 private TlsHandshakeStatus handshakeStatus = TlsHandshakeStatus.initial; 927 private SSLException handshakeException; 928 929 private ByteBuffer myNetData; 930 private ByteBuffer peerAppData; 931 932 private final List<ByteBuffer> pendingOutputData = new ArrayList<>(); 933 private int pendingOutputBytes; 934 private ByteBuffer pendingInputData; 935 936 private final AtomicInteger pendingDelegatedTasks = new AtomicInteger(); 937 938 private long wrapInBytes; 939 private long wrapOutBytes; 940 941 private long unwrapInBytes; 942 private long unwrapOutBytes; 943 944 private TlsState(SmackTlsContext smackTlsContext) throws IOException { 945 this.smackTlsContext = smackTlsContext; 946 947 // Call createSSLEngine()'s variant with two parameters as this allows for TLS session resumption. 948 949 // Note that it is not really clear what the value of peer host should be. It could be A) the XMPP service's 950 // domainpart or B) the DNS name of the host we are connecting to (usually the DNS SRV RR target name). While 951 // the javadoc of createSSLEngine(String, int) indicates with "Some cipher suites (such as Kerberos) require 952 // remote hostname information, in which case peerHost needs to be specified." that A should be used. TLS 953 // session resumption may would need or at least benefit from B. Variant A would also be required if the 954 // String is used for certificate verification. And it appears at least likely that TLS session resumption 955 // would not be hurt by using variant A. Therefore we currently use variant A. 956 // TODO: Should we use the ACE representation of the XMPP service domain? Compare with f60e4055ec529f0b8160acedf13275592ab10a4b 957 // If yes, then we should probably introduce getXmppServiceDomainAceEncodedIfPossible(). 958 String peerHost = connectionInternal.connection.getConfiguration().getXMPPServiceDomain().toString(); 959 engine = smackTlsContext.sslContext.createSSLEngine(peerHost, remoteAddress.getPort()); 960 engine.setUseClientMode(true); 961 962 SSLSession session = engine.getSession(); 963 int applicationBufferSize = session.getApplicationBufferSize(); 964 int packetBufferSize = session.getPacketBufferSize(); 965 966 myNetData = ByteBuffer.allocateDirect(packetBufferSize); 967 peerAppData = ByteBuffer.allocate(applicationBufferSize); 968 } 969 970 @Override 971 public OutputResult output(ByteBuffer outputData, boolean isFinalDataOfElement, boolean destinationAddressChanged, 972 boolean moreDataAvailable) throws SSLException { 973 if (outputData != null) { 974 pendingOutputData.add(outputData); 975 pendingOutputBytes += outputData.remaining(); 976 if (moreDataAvailable && pendingOutputBytes < MAX_PENDING_OUTPUT_BYTES) { 977 return OutputResult.NO_OUTPUT; 978 } 979 } 980 981 ByteBuffer[] outputDataArray = pendingOutputData.toArray(new ByteBuffer[pendingOutputData.size()]); 982 983 myNetData.clear(); 984 985 while (true) { 986 SSLEngineResult result; 987 try { 988 result = engine.wrap(outputDataArray, myNetData); 989 } catch (SSLException e) { 990 handleSslException(e); 991 throw e; 992 } 993 994 debugLogSslEngineResult("wrap", result); 995 996 SSLEngineResult.Status engineResultStatus = result.getStatus(); 997 998 pendingOutputBytes -= result.bytesConsumed(); 999 1000 if (engineResultStatus == SSLEngineResult.Status.OK) { 1001 wrapInBytes += result.bytesConsumed(); 1002 wrapOutBytes += result.bytesProduced(); 1003 1004 SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result); 1005 switch (handshakeStatus) { 1006 case NEED_UNWRAP: 1007 // NEED_UNWRAP means that we need to receive something in order to continue the handshake. The 1008 // standard channelSelectedCallback logic will take care of this, as there is eventually always 1009 // a interest to read from the socket. 1010 break; 1011 case NEED_WRAP: 1012 // Same as need task: Cycle the reactor. 1013 case NEED_TASK: 1014 // Note that we also set pendingOutputFilterData in the OutputResult in the NEED_TASK case, as 1015 // we also want to retry the wrap() operation above in this case. 1016 return new OutputResult(true, myNetData); 1017 default: 1018 break; 1019 } 1020 } 1021 1022 switch (engineResultStatus) { 1023 case OK: 1024 // No need to outputData.compact() here, since we do not reuse the buffer. 1025 // Clean up the pending output data. 1026 pruneBufferList(pendingOutputData); 1027 return new OutputResult(!pendingOutputData.isEmpty(), myNetData); 1028 case CLOSED: 1029 pendingOutputData.clear(); 1030 return OutputResult.NO_OUTPUT; 1031 case BUFFER_OVERFLOW: 1032 LOGGER.warning("SSLEngine status BUFFER_OVERFLOW, this is hopefully uncommon"); 1033 int outputDataRemaining = outputData != null ? outputData.remaining() : 0; 1034 int newCapacity = (int) (1.3 * outputDataRemaining); 1035 // If newCapacity would not increase myNetData, then double it. 1036 if (newCapacity <= myNetData.capacity()) { 1037 newCapacity = 2 * myNetData.capacity(); 1038 } 1039 ByteBuffer newMyNetData = ByteBuffer.allocateDirect(newCapacity); 1040 myNetData.flip(); 1041 newMyNetData.put(myNetData); 1042 myNetData = newMyNetData; 1043 continue; 1044 case BUFFER_UNDERFLOW: 1045 throw new IllegalStateException( 1046 "Buffer underflow as result of SSLEngine.wrap() should never happen"); 1047 } 1048 } 1049 } 1050 1051 @SuppressWarnings("ReferenceEquality") 1052 @Override 1053 public ByteBuffer input(ByteBuffer inputData) throws SSLException { 1054 ByteBuffer accumulatedData; 1055 if (pendingInputData == null) { 1056 accumulatedData = inputData; 1057 } else { 1058 assert pendingInputData != inputData; 1059 1060 int accumulatedDataBytes = pendingInputData.remaining() + inputData.remaining(); 1061 accumulatedData = ByteBuffer.allocate(accumulatedDataBytes); 1062 accumulatedData.put(pendingInputData) 1063 .put(inputData) 1064 .flip(); 1065 pendingInputData = null; 1066 } 1067 1068 peerAppData.clear(); 1069 1070 while (true) { 1071 SSLEngineResult result; 1072 try { 1073 result = engine.unwrap(accumulatedData, peerAppData); 1074 } catch (SSLException e) { 1075 handleSslException(e); 1076 throw e; 1077 } 1078 1079 debugLogSslEngineResult("unwrap", result); 1080 1081 SSLEngineResult.Status engineResultStatus = result.getStatus(); 1082 1083 if (engineResultStatus == SSLEngineResult.Status.OK) { 1084 unwrapInBytes += result.bytesConsumed(); 1085 unwrapOutBytes += result.bytesProduced(); 1086 1087 SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result); 1088 switch (handshakeStatus) { 1089 case NEED_TASK: 1090 // A delegated task is asynchronously running. Take care of the remaining accumulatedData. 1091 addAsPendingInputData(accumulatedData); 1092 // Return here, as the async task created by handleHandshakeStatus will continue calling the 1093 // cannelSelectedCallback. 1094 return null; 1095 case NEED_UNWRAP: 1096 continue; 1097 case NEED_WRAP: 1098 // NEED_WRAP means that the SSLEngine needs to send data, probably without consuming data. 1099 // We exploit here the fact that the channelSelectedCallback is single threaded and that the 1100 // input processing is after the output processing. 1101 addAsPendingInputData(accumulatedData); 1102 // Note that it is ok that we the provided argument for pending input filter data to channel 1103 // selected callback is false, as setPendingInputFilterData() will have set the internal state 1104 // boolean accordingly. 1105 connectionInternal.asyncGo(() -> callChannelSelectedCallback(false, true)); 1106 // Do not break here, but instead return and let the asynchronously invoked 1107 // callChannelSelectedCallback() do its work. 1108 return null; 1109 default: 1110 break; 1111 } 1112 } 1113 1114 switch (engineResultStatus) { 1115 case OK: 1116 // SSLEngine's unwrap() may not consume all bytes from the source buffer. If this is the case, then 1117 // simply perform another unwrap until accumlatedData has no remaining bytes. 1118 if (accumulatedData.hasRemaining()) { 1119 continue; 1120 } 1121 return peerAppData; 1122 case CLOSED: 1123 return null; 1124 case BUFFER_UNDERFLOW: 1125 // There were not enough source bytes available to make a complete packet. Let it in 1126 // pendingInputData. Note that we do not resize SSLEngine's source buffer - inputData in our case - 1127 // as it is not possible. 1128 addAsPendingInputData(accumulatedData); 1129 return null; 1130 case BUFFER_OVERFLOW: 1131 int applicationBufferSize = engine.getSession().getApplicationBufferSize(); 1132 assert peerAppData.remaining() < applicationBufferSize; 1133 peerAppData = ByteBuffer.allocate(applicationBufferSize); 1134 continue; 1135 } 1136 } 1137 } 1138 1139 private void addAsPendingInputData(ByteBuffer byteBuffer) { 1140 // Note that we can not simply write 1141 // pendingInputData = byteBuffer; 1142 // we have to copy the provided byte buffer, because it is possible that this byteBuffer is re-used by some 1143 // higher layer. That is, here 'byteBuffer' is typically 'incomingBuffer', which is a direct buffer only 1144 // allocated once per connection for performance reasons and hence re-used for read() calls. 1145 pendingInputData = ByteBuffer.allocate(byteBuffer.remaining()); 1146 pendingInputData.put(byteBuffer).flip(); 1147 1148 pendingInputFilterData = pendingInputData.hasRemaining(); 1149 } 1150 1151 private SSLEngineResult.HandshakeStatus handleHandshakeStatus(SSLEngineResult sslEngineResult) { 1152 SSLEngineResult.HandshakeStatus handshakeStatus = sslEngineResult.getHandshakeStatus(); 1153 switch (handshakeStatus) { 1154 case NEED_TASK: 1155 while (true) { 1156 final Runnable delegatedTask = engine.getDelegatedTask(); 1157 if (delegatedTask == null) { 1158 break; 1159 } 1160 sslEngineDelegatedTasks++; 1161 int currentPendingDelegatedTasks = pendingDelegatedTasks.incrementAndGet(); 1162 if (currentPendingDelegatedTasks > maxPendingSslEngineDelegatedTasks) { 1163 maxPendingSslEngineDelegatedTasks = currentPendingDelegatedTasks; 1164 } 1165 1166 Runnable wrappedDelegatedTask = () -> { 1167 delegatedTask.run(); 1168 int wrappedCurrentPendingDelegatedTasks = pendingDelegatedTasks.decrementAndGet(); 1169 if (wrappedCurrentPendingDelegatedTasks == 0) { 1170 callChannelSelectedCallback(true, true); 1171 } 1172 }; 1173 connectionInternal.asyncGo(wrappedDelegatedTask); 1174 } 1175 break; 1176 case FINISHED: 1177 onHandshakeFinished(); 1178 break; 1179 default: 1180 break; 1181 } 1182 1183 SSLEngineResult.HandshakeStatus afterHandshakeStatus = engine.getHandshakeStatus(); 1184 return afterHandshakeStatus; 1185 } 1186 1187 private void handleSslException(SSLException e) { 1188 handshakeException = e; 1189 handshakeStatus = TlsHandshakeStatus.failed; 1190 connectionInternal.notifyWaitingThreads(); 1191 } 1192 1193 private void onHandshakeFinished() { 1194 handshakeStatus = TlsHandshakeStatus.successful; 1195 connectionInternal.notifyWaitingThreads(); 1196 } 1197 1198 private boolean isHandshakeFinished() { 1199 return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed; 1200 } 1201 1202 private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackException, XMPPException { 1203 connectionInternal.waitForConditionOrThrowConnectionException(() -> isHandshakeFinished(), "TLS handshake to finish"); 1204 1205 if (handshakeStatus == TlsHandshakeStatus.failed) { 1206 throw handshakeException; 1207 } 1208 1209 assert handshakeStatus == TlsHandshakeStatus.successful; 1210 1211 if (smackTlsContext.daneVerifier != null) { 1212 smackTlsContext.daneVerifier.finish(engine.getSession()); 1213 } 1214 } 1215 1216 @Override 1217 public Object getStats() { 1218 return new TlsStateStats(this); 1219 } 1220 1221 @Override 1222 public void closeInputOutput() { 1223 engine.closeOutbound(); 1224 try { 1225 engine.closeInbound(); 1226 } catch (SSLException e) { 1227 LOGGER.log(Level.FINEST, 1228 "SSLException when closing inbound TLS session. This can likely be ignored if a possible truncation attack is suggested." 1229 + " You may want to ask your XMPP server vendor to implement a clean TLS session shutdown sending close_notify after </stream>", 1230 e); 1231 } 1232 } 1233 1234 @Override 1235 public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException, 1236 SmackException, XMPPException { 1237 waitForHandshakeFinished(); 1238 } 1239 1240 @Override 1241 public String getFilterName() { 1242 return "TLS (" + engine + ')'; 1243 } 1244 } 1245 1246 public static final class TlsStateStats { 1247 public final long wrapInBytes; 1248 public final long wrapOutBytes; 1249 public final double wrapRatio; 1250 1251 public final long unwrapInBytes; 1252 public final long unwrapOutBytes; 1253 public final double unwrapRatio; 1254 1255 private TlsStateStats(TlsState tlsState) { 1256 wrapOutBytes = tlsState.wrapOutBytes; 1257 wrapInBytes = tlsState.wrapInBytes; 1258 wrapRatio = (double) wrapOutBytes / wrapInBytes; 1259 1260 unwrapOutBytes = tlsState.unwrapOutBytes; 1261 unwrapInBytes = tlsState.unwrapInBytes; 1262 unwrapRatio = (double) unwrapInBytes / unwrapOutBytes; 1263 } 1264 1265 private transient String toStringCache; 1266 1267 @Override 1268 public String toString() { 1269 if (toStringCache != null) { 1270 return toStringCache; 1271 } 1272 1273 toStringCache = 1274 "wrap-in-bytes: " + wrapInBytes + '\n' 1275 + "wrap-out-bytes: " + wrapOutBytes + '\n' 1276 + "wrap-ratio: " + wrapRatio + '\n' 1277 + "unwrap-in-bytes: " + unwrapInBytes + '\n' 1278 + "unwrap-out-bytes: " + unwrapOutBytes + '\n' 1279 + "unwrap-ratio: " + unwrapRatio + '\n' 1280 ; 1281 1282 return toStringCache; 1283 } 1284 } 1285 1286 private void callChannelSelectedCallback(boolean setPendingInputFilterData, boolean setPendingOutputFilterData) { 1287 final SocketChannel channel = socketChannel; 1288 final SelectionKey key = selectionKey; 1289 if (channel == null || key == null) { 1290 LOGGER.info("Not calling channel selected callback because the connection was eventually disconnected"); 1291 return; 1292 } 1293 1294 channelSelectedCallbackLock.lock(); 1295 try { 1296 // Note that it is important that we send the pending(Input|Output)FilterData flags while holding the lock. 1297 if (setPendingInputFilterData) { 1298 pendingInputFilterData = true; 1299 } 1300 if (setPendingOutputFilterData) { 1301 pendingOutputFilterData = true; 1302 } 1303 1304 onChannelSelected(channel, key); 1305 } finally { 1306 channelSelectedCallbackLock.unlock(); 1307 } 1308 } 1309 1310 private void closeSocketAndCleanup() { 1311 final SelectionKey selectionKey = this.selectionKey; 1312 if (selectionKey != null) { 1313 selectionKey.cancel(); 1314 } 1315 final SocketChannel socketChannel = this.socketChannel; 1316 if (socketChannel != null) { 1317 try { 1318 socketChannel.close(); 1319 } catch (IOException e) { 1320 LOGGER.log(Level.FINE, "Closing the socket channel failed", e); 1321 } 1322 } 1323 1324 this.selectionKey = null; 1325 this.socketChannel = null; 1326 1327 selectionKeyAttachment = null; 1328 remoteAddress = null; 1329 } 1330 1331 private static List<? extends Buffer> pruneBufferList(Collection<? extends Buffer> buffers) { 1332 return CollectionUtil.removeUntil(buffers, b -> b.hasRemaining()); 1333 } 1334 1335 public XmppTcpTransportModule.Stats getStats() { 1336 return new Stats(this); 1337 } 1338 1339 public static final class Stats extends XmppClientToServerTransport.Stats { 1340 public final long totalBytesWritten; 1341 public final long totalBytesWrittenBeforeFilter; 1342 public final double writeRatio; 1343 1344 public final long totalBytesRead; 1345 public final long totalBytesReadAfterFilter; 1346 public final double readRatio; 1347 1348 public final long handledChannelSelectedCallbacks; 1349 public final long setWriteInterestAfterChannelSelectedCallback; 1350 public final long reactorThreadAlreadyRacing; 1351 public final long afterOutgoingElementsQueueModifiedSetInterestOps; 1352 public final long rejectedChannelSelectedCallbacks; 1353 public final long totalCallbackRequests; 1354 public final long callbackPreemtBecauseBytesWritten; 1355 public final long callbackPreemtBecauseBytesRead; 1356 public final int sslEngineDelegatedTasks; 1357 public final int maxPendingSslEngineDelegatedTasks; 1358 1359 private Stats(XmppTcpTransportModule connection) { 1360 totalBytesWritten = connection.totalBytesWritten; 1361 totalBytesWrittenBeforeFilter = connection.totalBytesWrittenBeforeFilter; 1362 writeRatio = (double) totalBytesWritten / totalBytesWrittenBeforeFilter; 1363 1364 totalBytesReadAfterFilter = connection.totalBytesReadAfterFilter; 1365 totalBytesRead = connection.totalBytesRead; 1366 readRatio = (double) totalBytesRead / totalBytesReadAfterFilter; 1367 1368 handledChannelSelectedCallbacks = connection.handledChannelSelectedCallbacks; 1369 setWriteInterestAfterChannelSelectedCallback = connection.setWriteInterestAfterChannelSelectedCallback.get(); 1370 reactorThreadAlreadyRacing = connection.reactorThreadAlreadyRacing.get(); 1371 afterOutgoingElementsQueueModifiedSetInterestOps = connection.afterOutgoingElementsQueueModifiedSetInterestOps 1372 .get(); 1373 rejectedChannelSelectedCallbacks = connection.rejectedChannelSelectedCallbacks.get(); 1374 1375 totalCallbackRequests = handledChannelSelectedCallbacks + rejectedChannelSelectedCallbacks; 1376 1377 callbackPreemtBecauseBytesRead = connection.callbackPreemtBecauseBytesRead; 1378 callbackPreemtBecauseBytesWritten = connection.callbackPreemtBecauseBytesWritten; 1379 1380 sslEngineDelegatedTasks = connection.sslEngineDelegatedTasks; 1381 maxPendingSslEngineDelegatedTasks = connection.maxPendingSslEngineDelegatedTasks; 1382 } 1383 1384 private transient String toStringCache; 1385 1386 @Override 1387 public String toString() { 1388 if (toStringCache != null) { 1389 return toStringCache; 1390 } 1391 1392 toStringCache = 1393 "Total bytes\n" 1394 + "recv: " + totalBytesRead + '\n' 1395 + "send: " + totalBytesWritten + '\n' 1396 + "recv-aft-filter: " + totalBytesReadAfterFilter + '\n' 1397 + "send-bef-filter: " + totalBytesWrittenBeforeFilter + '\n' 1398 + "read-ratio: " + readRatio + '\n' 1399 + "write-ratio: " + writeRatio + '\n' 1400 + "Events\n" 1401 + "total-callback-requests: " + totalCallbackRequests + '\n' 1402 + "handled-channel-selected-callbacks: " + handledChannelSelectedCallbacks + '\n' 1403 + "rejected-channel-selected-callbacks: " + rejectedChannelSelectedCallbacks + '\n' 1404 + "set-write-interest-after-callback: " + setWriteInterestAfterChannelSelectedCallback + '\n' 1405 + "reactor-thread-already-racing: " + reactorThreadAlreadyRacing + '\n' 1406 + "after-queue-modified-set-interest-ops: " + afterOutgoingElementsQueueModifiedSetInterestOps + '\n' 1407 + "callback-preemt-because-bytes-read: " + callbackPreemtBecauseBytesRead + '\n' 1408 + "callback-preemt-because-bytes-written: " + callbackPreemtBecauseBytesWritten + '\n' 1409 + "ssl-engine-delegated-tasks: " + sslEngineDelegatedTasks + '\n' 1410 + "max-pending-ssl-engine-delegated-tasks: " + maxPendingSslEngineDelegatedTasks + '\n' 1411 ; 1412 1413 return toStringCache; 1414 } 1415 } 1416}