XmppTcpTransportModule.java

  1. /**
  2.  *
  3.  * Copyright 2019-2021 Florian Schmaus
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smack.tcp;

  18. import java.io.IOException;
  19. import java.net.InetSocketAddress;
  20. import java.nio.Buffer;
  21. import java.nio.ByteBuffer;
  22. import java.nio.channels.ClosedChannelException;
  23. import java.nio.channels.SelectableChannel;
  24. import java.nio.channels.SelectionKey;
  25. import java.nio.channels.SocketChannel;
  26. import java.security.cert.CertificateException;
  27. import java.util.ArrayList;
  28. import java.util.Collection;
  29. import java.util.Collections;
  30. import java.util.IdentityHashMap;
  31. import java.util.Iterator;
  32. import java.util.List;
  33. import java.util.ListIterator;
  34. import java.util.Map;
  35. import java.util.concurrent.atomic.AtomicInteger;
  36. import java.util.concurrent.atomic.AtomicLong;
  37. import java.util.concurrent.locks.ReentrantLock;
  38. import java.util.logging.Level;
  39. import java.util.logging.Logger;

  40. import javax.net.ssl.SSLEngine;
  41. import javax.net.ssl.SSLEngineResult;
  42. import javax.net.ssl.SSLException;
  43. import javax.net.ssl.SSLSession;

  44. import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
  45. import org.jivesoftware.smack.SmackException;
  46. import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
  47. import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
  48. import org.jivesoftware.smack.SmackException.SmackCertificateException;
  49. import org.jivesoftware.smack.SmackFuture;
  50. import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
  51. import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment;
  52. import org.jivesoftware.smack.XMPPException;
  53. import org.jivesoftware.smack.XmppInputOutputFilter;
  54. import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor;
  55. import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor;
  56. import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnectionModule;
  57. import org.jivesoftware.smack.c2s.StreamOpenAndCloseFactory;
  58. import org.jivesoftware.smack.c2s.XmppClientToServerTransport;
  59. import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
  60. import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext;
  61. import org.jivesoftware.smack.debugger.SmackDebugger;
  62. import org.jivesoftware.smack.fsm.State;
  63. import org.jivesoftware.smack.fsm.StateDescriptor;
  64. import org.jivesoftware.smack.fsm.StateTransitionResult;
  65. import org.jivesoftware.smack.internal.SmackTlsContext;
  66. import org.jivesoftware.smack.packet.Stanza;
  67. import org.jivesoftware.smack.packet.StartTls;
  68. import org.jivesoftware.smack.packet.StreamClose;
  69. import org.jivesoftware.smack.packet.StreamOpen;
  70. import org.jivesoftware.smack.packet.TlsFailure;
  71. import org.jivesoftware.smack.packet.TlsProceed;
  72. import org.jivesoftware.smack.packet.TopLevelStreamElement;
  73. import org.jivesoftware.smack.packet.XmlEnvironment;
  74. import org.jivesoftware.smack.tcp.XmppTcpTransportModule.XmppTcpNioTransport.DiscoveredTcpEndpoints;
  75. import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
  76. import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints.Result;
  77. import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
  78. import org.jivesoftware.smack.util.CollectionUtil;
  79. import org.jivesoftware.smack.util.StringUtils;
  80. import org.jivesoftware.smack.util.UTF8;
  81. import org.jivesoftware.smack.util.XmlStringBuilder;
  82. import org.jivesoftware.smack.util.rce.RemoteConnectionEndpointLookupFailure;

  83. import org.jxmpp.jid.DomainBareJid;
  84. import org.jxmpp.jid.Jid;
  85. import org.jxmpp.jid.util.JidUtil;
  86. import org.jxmpp.xml.splitter.Utf8ByteXmppXmlSplitter;
  87. import org.jxmpp.xml.splitter.XmlPrettyPrinter;
  88. import org.jxmpp.xml.splitter.XmlPrinter;
  89. import org.jxmpp.xml.splitter.XmppElementCallback;
  90. import org.jxmpp.xml.splitter.XmppXmlSplitter;

  91. public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionModule<XmppTcpTransportModuleDescriptor> {

    private static final Logger LOGGER = Logger.getLogger(XmppTcpTransportModule.class.getName());

    // Once more than this many bytes were read within a single onChannelSelected() invocation, the read loop is
    // left so that one callback does not dominate the reactor thread.
    private static final int CALLBACK_MAX_BYTES_READ = 10 * 1024 * 1024;
    // Same limit for the write loop of onChannelSelected(). (The constant name is misspelled, "WRITEN".)
    private static final int CALLBACK_MAX_BYTES_WRITEN = CALLBACK_MAX_BYTES_READ;

    // Handed to the XmppXmlSplitter used for incoming data.
    // Presumably the maximum size of a single incoming element -- confirm against the jxmpp documentation.
    private static final int MAX_ELEMENT_SIZE = 64 * 1024;

    private final XmppTcpNioTransport tcpNioTransport;

    // Channel and selector state. onChannelSelected() asserts that the key it is invoked with is either this
    // selectionKey or that selectionKey is still null.
    private SelectionKey selectionKey;
    private SelectionKeyAttachment selectionKeyAttachment;
    private SocketChannel socketChannel;
    private InetSocketAddress remoteAddress;

    private TlsState tlsState;

    // The not yet encoded parts of the element that is currently being sent, or null if there is none.
    private Iterator<CharSequence> outgoingCharSequenceIterator;

    // Elements whose last part was handed to the output filters but which are not yet associated with a filtered
    // buffer in bufferToElementMap.
    private final List<TopLevelStreamElement> currentlyOutgoingElements = new ArrayList<>();
    // Maps a filtered buffer (by identity) to the elements that are complete once this buffer is written. The
    // send listeners of those elements are fired after the buffer was pruned from networkOutgoingBuffers.
    private final IdentityHashMap<ByteBuffer, List<TopLevelStreamElement>> bufferToElementMap = new IdentityHashMap<>();

    // The outgoing data travels through these fields in this order:
    // outgoingBuffer (UTF-8 encoded part) -> filteredOutgoingBuffer (result of the output filters)
    // -> networkOutgoingBuffers (the buffers passed to SocketChannel.write()).
    private ByteBuffer outgoingBuffer;
    private ByteBuffer filteredOutgoingBuffer;
    private final List<ByteBuffer> networkOutgoingBuffers = new ArrayList<>();
    // Running total of the bytes queued in networkOutgoingBuffers: incremented when a buffer is added and
    // decremented by the number of bytes written.
    private long networkOutgoingBuffersBytes;

    // The buffer is reused: it is cleared before every read from the socket channel.
    // TODO: Make the size of the incomingBuffer configurable.
    private final ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(2 * 4096);

    // Acquired via tryLock() in onChannelSelected(), a callback that does not obtain the lock returns immediately.
    private final ReentrantLock channelSelectedCallbackLock = new ReentrantLock();

    // Statistics counters. The byte and callback counters are updated in onChannelSelected() while the
    // channelSelectedCallbackLock is held.
    private long totalBytesRead;
    private long totalBytesWritten;
    private long totalBytesReadAfterFilter;
    private long totalBytesWrittenBeforeFilter;
    private long handledChannelSelectedCallbacks;
    private long callbackPreemtBecauseBytesWritten;
    private long callbackPreemtBecauseBytesRead;
    private int sslEngineDelegatedTasks;
    private int maxPendingSslEngineDelegatedTasks;

    // Statistics counters that are modified without holding the channelSelectedCallbackLock (for example
    // rejectedChannelSelectedCallbacks is incremented when tryLock() fails), hence the atomic types.
    // TODO: Use LongAdder once Smack's minimum Android API level is 24 or higher.
    private final AtomicLong setWriteInterestAfterChannelSelectedCallback = new AtomicLong();
    private final AtomicLong reactorThreadAlreadyRacing = new AtomicLong();
    private final AtomicLong afterOutgoingElementsQueueModifiedSetInterestOps = new AtomicLong();
    private final AtomicLong rejectedChannelSelectedCallbacks = new AtomicLong();

    // The 'to' address of the stanza that was polled last from the outgoing elements queue. Used to tell the
    // output filters whether the destination address changed.
    private Jid lastDestinationAddress;

    // If set, the read loop of onChannelSelected() invokes the input filters even if zero bytes were read.
    private boolean pendingInputFilterData;
    // If set, the write loop of onChannelSelected() invokes the output filters even if there is no new outgoing data.
    private boolean pendingOutputFilterData;

    // Set if the output filters reported pending data without producing output. After the next successful read,
    // OP_WRITE is added to the interest ops.
    private boolean pendingWriteInterestAfterRead;

    /**
     * Splits the filtered incoming bytes into XMPP elements. Note that this field is effectively final, but due to
     * https://stackoverflow.com/q/30360824/194894 we have to declare it non-final.
     */
    private Utf8ByteXmppXmlSplitter splitter;

    /**
     * Feeds the outgoing stream to the debugger, only assigned if a debugger is present. Note that this field is
     * effectively final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final.
     */
    private XmppXmlSplitter outputDebugSplitter;

    // The log level used by xmppElementCallback for its "stream opened" and "stream closed" messages.
    private static final Level STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL = Level.FINER;

  139.     XmppTcpTransportModule(XmppTcpTransportModuleDescriptor moduleDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
  140.         super(moduleDescriptor, connectionInternal);

  141.         tcpNioTransport = new XmppTcpNioTransport(connectionInternal);

  142.         XmlPrinter incomingDebugPrettyPrinter = null;
  143.         final SmackDebugger debugger = connectionInternal.smackDebugger;
  144.         if (debugger != null) {
  145.             // Incoming stream debugging.
  146.             incomingDebugPrettyPrinter = XmlPrettyPrinter.builder()
  147.                     .setPrettyWriter(sb -> debugger.incomingStreamSink(sb))
  148.                     .build();

  149.             // Outgoing stream debugging.
  150.             XmlPrinter outgoingDebugPrettyPrinter = XmlPrettyPrinter.builder()
  151.                     .setPrettyWriter(sb -> debugger.outgoingStreamSink(sb))
  152.                     .build();
  153.             outputDebugSplitter = new XmppXmlSplitter(outgoingDebugPrettyPrinter);
  154.         }

  155.         XmppXmlSplitter xmppXmlSplitter = new XmppXmlSplitter(MAX_ELEMENT_SIZE, xmppElementCallback,
  156.                 incomingDebugPrettyPrinter);
  157.         splitter = new Utf8ByteXmppXmlSplitter(xmppXmlSplitter);
  158.     }

  159.     private final XmppElementCallback xmppElementCallback = new XmppElementCallback() {
  160.         private String streamOpen;
  161.         private String streamClose;

  162.         @Override
  163.         public void onCompleteElement(String completeElement) {
  164.             assert streamOpen != null;
  165.             assert streamClose != null;

  166.             connectionInternal.withSmackDebugger(debugger -> debugger.onIncomingElementCompleted());

  167.             String wrappedCompleteElement = streamOpen + completeElement + streamClose;
  168.             connectionInternal.parseAndProcessElement(wrappedCompleteElement);
  169.         }


  170.         @Override
  171.         public void streamOpened(String prefix, Map<String, String> attributes) {
  172.             if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) {
  173.                 LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL,
  174.                                 "Stream of " + this + " opened. prefix=" + prefix + " attributes=" + attributes);
  175.             }

  176.             final String prefixXmlns = "xmlns:" + prefix;
  177.             // TODO: Use the return value of onStreamOpen(), which now returns the
  178.             // corresponding stream close tag, instead of creating it here.
  179.             final StringBuilder streamClose = new StringBuilder(32);
  180.             final StringBuilder streamOpen = new StringBuilder(256);

  181.             streamOpen.append('<');
  182.             streamClose.append("</");
  183.             if (StringUtils.isNotEmpty(prefix)) {
  184.                 streamOpen.append(prefix).append(':');
  185.                 streamClose.append(prefix).append(':');
  186.             }
  187.             streamOpen.append("stream");
  188.             streamClose.append("stream>");
  189.             for (Map.Entry<String, String> entry : attributes.entrySet()) {
  190.                 String attributeName = entry.getKey();
  191.                 String attributeValue = entry.getValue();
  192.                 switch (attributeName) {
  193.                 case "to":
  194.                 case "from":
  195.                 case "id":
  196.                 case "version":
  197.                     break;
  198.                 case "xml:lang":
  199.                     streamOpen.append(" xml:lang='").append(attributeValue).append('\'');
  200.                     break;
  201.                 case "xmlns":
  202.                     streamOpen.append(" xmlns='").append(attributeValue).append('\'');
  203.                     break;
  204.                 default:
  205.                     if (attributeName.equals(prefixXmlns)) {
  206.                         streamOpen.append(' ').append(prefixXmlns).append("='").append(attributeValue).append('\'');
  207.                         break;
  208.                     }
  209.                     LOGGER.info("Unknown <stream/> attribute: " + attributeName);
  210.                     break;
  211.                 }
  212.             }
  213.             streamOpen.append('>');

  214.             this.streamOpen = streamOpen.toString();
  215.             this.streamClose = streamClose.toString();

  216.             connectionInternal.onStreamOpen(this.streamOpen);
  217.         }

  218.         @Override
  219.         public void streamClosed() {
  220.             if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) {
  221.                 LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL, "Stream of " + this + " closed");
  222.             }

  223.            connectionInternal.onStreamClosed();
  224.         }
  225.     };

    /**
     * Handles a selected socket channel: first writes as much pending outgoing data as possible, then reads and
     * processes the available incoming data, and finally sets the new interest ops of the selection key.
     * <p>
     * Only one thread at a time performs this work. If the callback lock can not be acquired right away, then the
     * invocation is counted as rejected and returns without doing anything.
     * </p>
     * <p>
     * The write loop is a small state machine. In every iteration it performs the first applicable step of
     * </p>
     * <ol>
     * <li>write the filtered data to the socket channel,</li>
     * <li>send the encoded outgoing data (or pending filter data) through the output filters,</li>
     * <li>encode the next part of the element that is currently sent,</li>
     * <li>poll the next element from the outgoing elements queue,</li>
     * </ol>
     * <p>
     * and it ends if none of those steps is applicable, if the channel does not accept more data, if an error
     * occurred, or if the per-callback write limit is exceeded.
     * </p>
     * <p>
     * Note that the <code>return</code> statements within the read loop leave this method without resetting the
     * "reactor thread racing" marker and without setting new interest ops. The lock is still released by the
     * finally block.
     * </p>
     *
     * @param selectedChannel the selected channel, must be a {@link SocketChannel}.
     * @param selectedSelectionKey the selection key of the selected channel.
     */
    private void onChannelSelected(SelectableChannel selectedChannel, SelectionKey selectedSelectionKey) {
        assert selectionKey == null || selectionKey == selectedSelectionKey;
        SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel;
        // We are *always* interested in OP_READ.
        int newInterestedOps = SelectionKey.OP_READ;
        boolean newPendingOutputFilterData = false;

        // Do not block the calling thread if another thread is already handling this channel.
        if (!channelSelectedCallbackLock.tryLock()) {
            rejectedChannelSelectedCallbacks.incrementAndGet();
            return;
        }

        handledChannelSelectedCallbacks++;

        long callbackBytesRead = 0;
        long callbackBytesWritten = 0;

        try {
            boolean destinationAddressChanged = false;
            boolean isLastPartOfElement = false;
            TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null;
            StringBuilder outgoingStreamForDebugger = null;

            writeLoop: while (true) {
                // More data will follow if the current element is not yet fully encoded or if further elements are queued.
                final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty();

                if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) {
                    // Step 1: There is filtered data, write it to the socket channel.
                    if (filteredOutgoingBuffer != null) {
                        networkOutgoingBuffers.add(filteredOutgoingBuffer);
                        networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining();

                        filteredOutgoingBuffer = null;
                        // Coalesce small buffers: skip the write as long as more data is about to follow and
                        // only little data was accumulated.
                        // NOTE(review): 8096 looks like it was meant to be 8192, confirm.
                        if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) {
                            continue;
                        }
                    }

                    // Gathering write of all accumulated buffers.
                    ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]);
                    long bytesWritten;
                    try {
                        bytesWritten = selectedSocketChannel.write(output);
                    } catch (IOException e) {
                        // We have seen here so far
                        // - IOException "Broken pipe"
                        handleReadWriteIoException(e);
                        break;
                    }

                    // The channel did not accept any data, ask to be notified once it is writable again.
                    if (bytesWritten == 0) {
                        newInterestedOps |= SelectionKey.OP_WRITE;
                        break;
                    }

                    callbackBytesWritten += bytesWritten;

                    networkOutgoingBuffersBytes -= bytesWritten;

                    // pruneBufferList() is declared outside of this method. Presumably it removes the completely
                    // written buffers from the list and returns them -- confirm.
                    List<? extends Buffer> prunedBuffers = pruneBufferList(networkOutgoingBuffers);

                    // Fire the send listeners of the elements that were associated with the pruned buffers.
                    for (Buffer prunedBuffer : prunedBuffers) {
                        List<TopLevelStreamElement> sendElements = bufferToElementMap.remove(prunedBuffer);
                        if (sendElements == null) {
                            continue;
                        }
                        for (TopLevelStreamElement elementJustSend : sendElements) {
                            connectionInternal.fireFirstLevelElementSendListeners(elementJustSend);
                        }
                    }

                    // Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have
                    // written a certain amount.
                    if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) {
                        newInterestedOps |= SelectionKey.OP_WRITE;
                        callbackPreemtBecauseBytesWritten++;
                        break;
                    }
                } else if (outgoingBuffer != null || pendingOutputFilterData) {
                    // Step 2: Send the encoded data through the output filters. This step is also performed
                    // without new data if a filter announced pending data.
                    pendingOutputFilterData = false;

                    if (outgoingBuffer != null) {
                        totalBytesWrittenBeforeFilter += outgoingBuffer.remaining();
                        if (isLastPartOfElement) {
                            assert currentlyOutgonigTopLevelStreamElement != null;
                            currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement);
                        }
                    }

                    ByteBuffer outputFilterInputData = outgoingBuffer;
                    // We can now null the outgoingBuffer since the filter step will take care of it from now on.
                    outgoingBuffer = null;

                    // The output of a filter is the input of the next filter.
                    for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) {
                        XmppInputOutputFilter inputOutputFilter = it.next();
                        XmppInputOutputFilter.OutputResult outputResult;
                        try {
                            outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement,
                                    destinationAddressChanged, moreDataAvailable);
                        } catch (IOException e) {
                            connectionInternal.notifyConnectionError(e);
                            break writeLoop;
                        }
                        newPendingOutputFilterData |= outputResult.pendingFilterData;
                        outputFilterInputData = outputResult.filteredOutputData;
                        if (outputFilterInputData != null) {
                            outputFilterInputData.flip();
                        }
                    }

                    // It is ok if outputFilterInputData is 'null' here, this is expected behavior.
                    if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) {
                        filteredOutgoingBuffer = outputFilterInputData;
                    } else {
                        filteredOutgoingBuffer = null;
                    }

                    // If the filters did eventually not produce any output data but if there is
                    // pending output data then we have a pending write request after read.
                    if (filteredOutgoingBuffer == null && newPendingOutputFilterData) {
                        pendingWriteInterestAfterRead = true;
                    }

                    // Remember which elements are complete once the filtered buffer is written, in order to
                    // fire their send listeners in step 1.
                    if (filteredOutgoingBuffer != null && isLastPartOfElement) {
                        bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements));
                        currentlyOutgoingElements.clear();
                    }

                    // Reset that the destination address has changed.
                    if (destinationAddressChanged) {
                        destinationAddressChanged = false;
                    }
                } else if (outgoingCharSequenceIterator != null) {
                    // Step 3: Encode the next part of the element that is currently sent.
                    CharSequence nextCharSequence = outgoingCharSequenceIterator.next();
                    outgoingBuffer = UTF8.encode(nextCharSequence);
                    if (!outgoingCharSequenceIterator.hasNext()) {
                        outgoingCharSequenceIterator = null;
                        isLastPartOfElement = true;
                    } else {
                        isLastPartOfElement = false;
                    }

                    // Collect the parts of the element and hand them to the debugger once the element is complete.
                    final SmackDebugger debugger = connectionInternal.smackDebugger;
                    if (debugger != null) {
                        if (outgoingStreamForDebugger == null) {
                            outgoingStreamForDebugger = new StringBuilder();
                        }
                        outgoingStreamForDebugger.append(nextCharSequence);

                        if (isLastPartOfElement) {
                            try {
                                outputDebugSplitter.append(outgoingStreamForDebugger);
                            } catch (IOException e) {
                                throw new AssertionError(e);
                            }
                            debugger.onOutgoingElementCompleted();
                            outgoingStreamForDebugger = null;
                        }
                    }
                } else if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
                    // Step 4: Start with the next queued element.
                    currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll();
                    if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) {
                        Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement;
                        Jid currentDestinationAddress = currentlyOutgoingStanza.getTo();
                        destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress);
                        lastDestinationAddress = currentDestinationAddress;
                    }
                    CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE);
                    if (nextCharSequence instanceof XmlStringBuilder) {
                        XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence;
                        XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment();
                        outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator();
                    } else {
                        outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator();
                    }
                    assert outgoingCharSequenceIterator != null;
                } else {
                    // There is nothing more to write.
                    break;
                }
            }

            // Request OP_WRITE for pending filter data, unless the write interest is deferred until after the
            // next successful read.
            pendingOutputFilterData = newPendingOutputFilterData;
            if (!pendingWriteInterestAfterRead && pendingOutputFilterData) {
                newInterestedOps |= SelectionKey.OP_WRITE;
            }

            readLoop: while (true) {
                // Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have
                // read a certain amount.
                if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) {
                    callbackPreemtBecauseBytesRead++;
                    break;
                }

                int bytesRead;
                incomingBuffer.clear();
                try {
                    bytesRead = selectedSocketChannel.read(incomingBuffer);
                } catch (IOException e) {
                    handleReadWriteIoException(e);
                    return;
                }

                if (bytesRead < 0) {
                    LOGGER.finer("NIO read() returned " + bytesRead
                            + " for " + this + ". This probably means that the TCP connection was terminated.");
                    // According to the socket channel javadoc section about "asynchronous reads" a socket channel's
                    // read() may return -1 if the input side of a socket is shut down.
                    // Note that we do not call notifyConnectionError() here because the connection may be
                    // cleanly shutdown which would also cause read() to return '-1'. I assume that this socket
                    // will be selected again, on which read() would throw an IOException, which will be caught
                    // and invoke notifyConnectionError() (see a few lines above).
                    /*
                    IOException exception = new IOException("NIO read() returned " + bytesRead);
                    notifyConnectionError(exception);
                    */
                    return;
                }

                if (!pendingInputFilterData) {
                    if (bytesRead == 0) {
                        // Nothing more to read.
                        break;
                    }
                } else {
                    // Invoke the input filters below even if nothing was read, but only once.
                    pendingInputFilterData = false;
                }

                if (pendingWriteInterestAfterRead) {
                    // We have successfully read something and someone announced a write interest after a read. It is
                    // now possible that a filter is now also able to write additional data (for example SSLEngine).
                    pendingWriteInterestAfterRead = false;
                    newInterestedOps |= SelectionKey.OP_WRITE;
                }

                callbackBytesRead += bytesRead;

                // Send the read data through the input filters, in the reverse order of the output filters.
                ByteBuffer filteredIncomingBuffer = incomingBuffer;
                for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) {
                    filteredIncomingBuffer.flip();

                    ByteBuffer newFilteredIncomingBuffer;
                    try {
                        newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer);
                    } catch (IOException e) {
                        connectionInternal.notifyConnectionError(e);
                        return;
                    }
                    // The filter did not produce any data, hence there is nothing to process.
                    if (newFilteredIncomingBuffer == null) {
                        break readLoop;
                    }
                    filteredIncomingBuffer = newFilteredIncomingBuffer;
                }

                final int bytesReadAfterFilter = filteredIncomingBuffer.flip().remaining();

                totalBytesReadAfterFilter += bytesReadAfterFilter;

                // Feed the splitter, which invokes xmppElementCallback.
                try {
                    splitter.write(filteredIncomingBuffer);
                } catch (IOException e) {
                    connectionInternal.notifyConnectionError(e);
                    return;
                }
            }
        } finally {
            // Executed on every path out of the try block, including the early returns of the read loop.
            totalBytesWritten += callbackBytesWritten;
            totalBytesRead += callbackBytesRead;

            channelSelectedCallbackLock.unlock();
        }

        // Indicate that there is no reactor thread racing towards handling this selection key.
        final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
        if (selectionKeyAttachment != null) {
            selectionKeyAttachment.resetReactorThreadRacing();
        }

        // Check the queue again to prevent lost wakeups caused by elements inserted before we
        // called resetReactorThreadRacing() a few lines above.
        if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
            setWriteInterestAfterChannelSelectedCallback.incrementAndGet();
            newInterestedOps |= SelectionKey.OP_WRITE;
        }

        connectionInternal.setInterestOps(selectionKey, newInterestedOps);
    }

  473.     private void handleReadWriteIoException(IOException e) {
  474.         if (e instanceof ClosedChannelException && !tcpNioTransport.isConnected()) {
  475.             // The connection is already closed.
  476.             return;
  477.         }

  478.        connectionInternal.notifyConnectionError(e);
  479.     }

    /**
     * This is the interface between the "lookup remote connection endpoints" state and the "establish TCP connection"
     * state. The field is indirectly populated by {@link XmppTcpNioTransport#lookupConnectionEndpoints()} and consumed
     * by {@link ConnectionAttemptState}.
     * <p>
     * The field is assigned in <code>XmppTcpNioTransport.loadConnectionEndpoints()</code> and set to
     * <code>null</code> in {@link XmppTcpNioTransport#resetDiscoveredConnectionEndpoints()}.
     * </p>
     */
    DiscoveredTcpEndpoints discoveredTcpEndpoints;

  486.     final class XmppTcpNioTransport extends XmppClientToServerTransport {

        /**
         * Creates the transport. The connection internal is merely passed on to the superclass.
         *
         * @param connectionInternal the internal API of the connection this transport belongs to.
         */
        XmppTcpNioTransport(ModularXmppClientToServerConnectionInternal connectionInternal) {
            super(connectionInternal);
        }

  490.         @Override
  491.         public StreamOpenAndCloseFactory getStreamOpenAndCloseFactory() {
  492.             return new StreamOpenAndCloseFactory() {
  493.                 @Override
  494.                 public StreamOpen createStreamOpen(DomainBareJid to, CharSequence from, String id, String lang) {
  495.                     String xmlLang = connectionInternal.connection.getConfiguration().getXmlLang();
  496.                     StreamOpen streamOpen = new StreamOpen(to, from, id, xmlLang, StreamOpen.StreamContentNamespace.client);
  497.                     return streamOpen;
  498.                 }
  499.                 @Override
  500.                 public StreamClose createStreamClose() {
  501.                     return StreamClose.INSTANCE;
  502.                 }
  503.             };
  504.         }

  505.         @Override
  506.         protected void resetDiscoveredConnectionEndpoints() {
  507.             discoveredTcpEndpoints = null;
  508.         }

  509.         @Override
  510.         public boolean hasUseableConnectionEndpoints() {
  511.             return discoveredTcpEndpoints != null;
  512.         }

  513.         @Override
  514.         protected List<SmackFuture<LookupConnectionEndpointsResult, Exception>> lookupConnectionEndpoints() {
  515.             // Assert that there are no stale discovered endpoints prior performing the lookup.
  516.             assert discoveredTcpEndpoints == null;

  517.             List<SmackFuture<LookupConnectionEndpointsResult, Exception>> futures = new ArrayList<>(2);

  518.             InternalSmackFuture<LookupConnectionEndpointsResult, Exception> tcpEndpointsLookupFuture = new InternalSmackFuture<>();
  519.             connectionInternal.asyncGo(() -> {
  520.                 Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(
  521.                                 connectionInternal.connection.getConfiguration());

  522.                 LookupConnectionEndpointsResult endpointsResult;
  523.                 if (result.discoveredRemoteConnectionEndpoints.isEmpty()) {
  524.                     endpointsResult = new TcpEndpointDiscoveryFailed(result);
  525.                 } else {
  526.                     endpointsResult = new DiscoveredTcpEndpoints(result);
  527.                 }
  528.                 tcpEndpointsLookupFuture.setResult(endpointsResult);
  529.             });
  530.             futures.add(tcpEndpointsLookupFuture);

  531.             if (moduleDescriptor.isDirectTlsEnabled()) {
  532.                 // TODO: Implement this.
  533.                 throw new IllegalArgumentException("DirectTLS is not implemented yet");
  534.             }

  535.             return futures;
  536.         }

  537.         @Override
  538.         protected void loadConnectionEndpoints(LookupConnectionEndpointsSuccess lookupConnectionEndpointsSuccess) {
  539.             // The API contract stats that we will be given the instance we handed out with lookupConnectionEndpoints,
  540.             // which must be of type DiscoveredTcpEndpoints here. Hence, if we can not cast it, then there is an internal
  541.             // Smack error.
  542.             discoveredTcpEndpoints = (DiscoveredTcpEndpoints) lookupConnectionEndpointsSuccess;
  543.         }

  544.         @Override
  545.         protected void afterFiltersClosed() {
  546.             pendingInputFilterData = pendingOutputFilterData = true;
  547.             afterOutgoingElementsQueueModified();
  548.         }

  549.         @Override
  550.         protected void disconnect() {
  551.             XmppTcpTransportModule.this.closeSocketAndCleanup();
  552.         }

  553.         @Override
  554.         protected void notifyAboutNewOutgoingElements() {
  555.             afterOutgoingElementsQueueModified();
  556.         }

  557.         @Override
  558.         public SSLSession getSslSession() {
  559.             TlsState tlsState = XmppTcpTransportModule.this.tlsState;
  560.             if (tlsState == null) {
  561.                 return null;
  562.             }

  563.             return tlsState.engine.getSession();
  564.         }

  565.         public boolean isConnected() {
  566.             SocketChannel socketChannel = XmppTcpTransportModule.this.socketChannel;
  567.             if (socketChannel == null) {
  568.                 return false;
  569.             }

  570.             return socketChannel.isConnected();
  571.         }

  572.         @Override
  573.         public boolean isTransportSecured() {
  574.             final TlsState tlsState = XmppTcpTransportModule.this.tlsState;
  575.             return tlsState != null && tlsState.handshakeStatus == TlsHandshakeStatus.successful;
  576.         }

  577.         @Override
  578.         public XmppTcpTransportModule.Stats getStats() {
  579.             return XmppTcpTransportModule.this.getStats();
  580.         }

  581.         final class DiscoveredTcpEndpoints implements LookupConnectionEndpointsSuccess {
  582.             final RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result;
  583.             DiscoveredTcpEndpoints(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) {
  584.                 this.result = result;
  585.             }
  586.         }

  587.         final class TcpEndpointDiscoveryFailed implements LookupConnectionEndpointsFailed {
  588.             final List<RemoteConnectionEndpointLookupFailure> lookupFailures;
  589.             TcpEndpointDiscoveryFailed(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) {
  590.                 lookupFailures = result.lookupFailures;
  591.             }
  592.         }
  593.     }

  594.     private void afterOutgoingElementsQueueModified() {
  595.         final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
  596.         if (selectionKeyAttachment != null && selectionKeyAttachment.isReactorThreadRacing()) {
  597.             // A reactor thread is already racing to the channel selected callback and will take care of this.
  598.             reactorThreadAlreadyRacing.incrementAndGet();
  599.             return;
  600.         }

  601.         afterOutgoingElementsQueueModifiedSetInterestOps.incrementAndGet();

  602.         // Add OP_WRITE to the interested Ops, since we have now new things to write. Note that this may cause
  603.         // multiple reactor threads to race to the channel selected callback in case we perform this right after
  604.         // a select() returned with this selection key in the selected-key set. Hence, we use tryLock() in the
  605.         // channel selected callback to keep the invariant that only exactly one thread is performing the
  606.         // callback.
  607.         // Note that we need to perform setInterestedOps() *without* holding the channelSelectedCallbackLock, as
  608.         // otherwise the reactor thread racing to the channel selected callback may found the lock still locked, which
  609.         // would result in the outgoingElementsQueue not being handled.
  610.         connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
  611.     }

  612.     @Override
  613.     protected XmppTcpNioTransport getTransport() {
  614.         return tcpNioTransport;
  615.     }

  616.     static final class EstablishingTcpConnectionStateDescriptor extends StateDescriptor {
  617.         private EstablishingTcpConnectionStateDescriptor() {
  618.             super(XmppTcpTransportModule.EstablishingTcpConnectionState.class);
  619.             addPredeccessor(LookupRemoteConnectionEndpointsStateDescriptor.class);
  620.             addSuccessor(EstablishTlsStateDescriptor.class);
  621.             addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class);
  622.         }

  623.         @Override
  624.         protected XmppTcpTransportModule.EstablishingTcpConnectionState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) {
  625.             XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class);
  626.             return tcpTransportModule.constructEstablishingTcpConnectionState(this, connectionInternal);
  627.         }
  628.     }

  629.     private EstablishingTcpConnectionState constructEstablishingTcpConnectionState(
  630.                     EstablishingTcpConnectionStateDescriptor stateDescriptor,
  631.                     ModularXmppClientToServerConnectionInternal connectionInternal) {
  632.         return new EstablishingTcpConnectionState(stateDescriptor, connectionInternal);
  633.     }

  634.     final class EstablishingTcpConnectionState extends State.AbstractTransport {
  635.         private EstablishingTcpConnectionState(EstablishingTcpConnectionStateDescriptor stateDescriptor,
  636.                         ModularXmppClientToServerConnectionInternal connectionInternal) {
  637.             super(tcpNioTransport, stateDescriptor, connectionInternal);
  638.         }

  639.         @Override
  640.         public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
  641.                         throws InterruptedException, IOException, SmackException, XMPPException {
  642.             // The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to
  643.             // ConnectingToHost.
  644.             ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints,
  645.                     this);
  646.             StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection();
  647.             if (failure != null) {
  648.                 return failure;
  649.             }

  650.             socketChannel = connectionAttemptState.socketChannel;
  651.             remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();

  652.             selectionKey = connectionInternal.registerWithSelector(socketChannel, SelectionKey.OP_READ,
  653.                             XmppTcpTransportModule.this::onChannelSelected);
  654.             selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();

  655.             connectionInternal.setTransport(tcpNioTransport);

  656.             // TODO: It appears this should be done in a generic way. I'd assume we always
  657.             // have to wait for stream features after the connection was established. If this is true then consider
  658.             // moving this into State.AbstractTransport. But I am not yet 100% positive that this is the case for every
  659.             // transport. Hence, keep it here for now.
  660.             connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after initial connection");

  661.             return new TcpSocketConnectedResult(remoteAddress);
  662.         }

  663.         @Override
  664.         public void resetState() {
  665.             closeSocketAndCleanup();
  666.         }
  667.     }

  668.     public static final class TcpSocketConnectedResult extends StateTransitionResult.Success {
  669.         private final InetSocketAddress remoteAddress;

  670.         private TcpSocketConnectedResult(InetSocketAddress remoteAddress) {
  671.             super("TCP connection established to " + remoteAddress);
  672.             this.remoteAddress = remoteAddress;
  673.         }

  674.         public InetSocketAddress getRemoteAddress() {
  675.             return remoteAddress;
  676.         }
  677.     }

  678.     public static final class TlsEstablishedResult extends StateTransitionResult.Success {

  679.         private TlsEstablishedResult(SSLEngine sslEngine) {
  680.             super("TLS established: " + sslEngine.getSession());
  681.         }
  682.     }

  683.     static final class EstablishTlsStateDescriptor extends StateDescriptor {
  684.         private EstablishTlsStateDescriptor() {
  685.             super(XmppTcpTransportModule.EstablishTlsState.class, "RFC 6120 § 5");
  686.             addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class);
  687.             declarePrecedenceOver(ConnectedButUnauthenticatedStateDescriptor.class);
  688.         }

  689.         @Override
  690.         protected EstablishTlsState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) {
  691.             XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class);
  692.             return tcpTransportModule.constructEstablishingTlsState(this, connectionInternal);
  693.         }
  694.     }

  695.     private EstablishTlsState constructEstablishingTlsState(
  696.                     EstablishTlsStateDescriptor stateDescriptor,
  697.                     ModularXmppClientToServerConnectionInternal connectionInternal) {
  698.         return new EstablishTlsState(stateDescriptor, connectionInternal);
  699.     }

  700.     private final class EstablishTlsState extends State {
  701.         private EstablishTlsState(EstablishTlsStateDescriptor stateDescriptor,
  702.                         ModularXmppClientToServerConnectionInternal connectionInternal) {
  703.             super(stateDescriptor, connectionInternal);
  704.         }

  705.         @Override
  706.         public StateTransitionResult.TransitionImpossible isTransitionToPossible(WalkStateGraphContext walkStateGraphContext)
  707.                 throws SecurityRequiredByClientException, SecurityRequiredByServerException {
  708.             StartTls startTlsFeature = connectionInternal.connection.getFeature(StartTls.class);
  709.             SecurityMode securityMode = connectionInternal.connection.getConfiguration().getSecurityMode();

  710.             switch (securityMode) {
  711.             case required:
  712.             case ifpossible:
  713.                 if (startTlsFeature == null) {
  714.                     if (securityMode == SecurityMode.ifpossible) {
  715.                         return new StateTransitionResult.TransitionImpossibleReason("Server does not announce support for TLS and we do not required it");
  716.                     }
  717.                     throw new SecurityRequiredByClientException();
  718.                 }
  719.                 // Allows transition by returning null.
  720.                 return null;
  721.             case disabled:
  722.                 if (startTlsFeature != null && startTlsFeature.required()) {
  723.                     throw new SecurityRequiredByServerException();
  724.                 }
  725.                 return new StateTransitionResult.TransitionImpossibleReason("TLS disabled in client settings and server does not require it");
  726.             default:
  727.                 throw new AssertionError("Unknown security mode: " + securityMode);
  728.             }
  729.         }

  730.         @Override
  731.         public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
  732.                         throws IOException, InterruptedException, SmackException, XMPPException {
  733.             connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class);

  734.             SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext();

  735.             tlsState = new TlsState(smackTlsContext);
  736.             connectionInternal.addXmppInputOutputFilter(tlsState);

  737.             channelSelectedCallbackLock.lock();
  738.             try {
  739.                 pendingOutputFilterData = true;
  740.                 // The beginHandshake() is possibly not really required here, but it does not hurt either.
  741.                 tlsState.engine.beginHandshake();
  742.                 tlsState.handshakeStatus = TlsHandshakeStatus.initiated;
  743.             } finally {
  744.                 channelSelectedCallbackLock.unlock();
  745.             }
  746.             connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ);

  747.             try {
  748.                 tlsState.waitForHandshakeFinished();
  749.             } catch (CertificateException e) {
  750.                 throw new SmackCertificateException(e);
  751.             }

  752.             connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established");

  753.             return new TlsEstablishedResult(tlsState.engine);
  754.         }

  755.         @Override
  756.         public void resetState() {
  757.             tlsState = null;
  758.         }
  759.     }

    /**
     * The progress of the TLS handshake as tracked by {@code TlsState}.
     */
    private enum TlsHandshakeStatus {
        /** The status a newly created {@code TlsState} starts with. */
        initial,
        /** Set by the TLS establishing state right after {@code beginHandshake()} was invoked on the engine. */
        initiated,
        /** Set once the SSL engine reported the handshake as finished. */
        successful,
        /** Set when a wrap or unwrap operation of the SSL engine threw an {@code SSLException}. */
        failed,
    }

  766.     private static final Level SSL_ENGINE_DEBUG_LOG_LEVEL = Level.FINEST;

  767.     private static void debugLogSslEngineResult(String operation, SSLEngineResult result) {
  768.         if (!LOGGER.isLoggable(SSL_ENGINE_DEBUG_LOG_LEVEL)) {
  769.             return;
  770.         }

  771.         LOGGER.log(SSL_ENGINE_DEBUG_LOG_LEVEL, "SSLEngineResult of " + operation + "(): " + result);
  772.     }

  773.     private final class TlsState implements XmppInputOutputFilter {

        // As long as more data is announced and fewer than this many bytes are pending, output() defers the wrap
        // operation. NOTE(review): 8096 is possibly meant to be 8192 - confirm.
        private static final int MAX_PENDING_OUTPUT_BYTES = 8096;

        private final SmackTlsContext smackTlsContext;
        private final SSLEngine engine;

        // Progress of the handshake; the exception is only set together with the 'failed' status.
        private TlsHandshakeStatus handshakeStatus = TlsHandshakeStatus.initial;
        private SSLException handshakeException;

        // Destination buffer of wrap(), i.e. data to be sent over the network.
        private ByteBuffer myNetData;
        // Destination buffer of unwrap(), i.e. application data received from the peer.
        private ByteBuffer peerAppData;

        // Buffers handed to output() which are not yet fully wrapped, together with their accounted byte count.
        private final List<ByteBuffer> pendingOutputData = new ArrayList<>();
        private int pendingOutputBytes;
        // Copy of input data which could not be unwrapped yet, or null if there is none.
        private ByteBuffer pendingInputData;

        // Number of SSL engine delegated tasks which were dispatched but did not finish yet.
        private final AtomicInteger pendingDelegatedTasks = new AtomicInteger();

        // Bytes consumed and produced by successful wrap() operations.
        private long wrapInBytes;
        private long wrapOutBytes;

        // Bytes consumed and produced by successful unwrap() operations.
        private long unwrapInBytes;
        private long unwrapOutBytes;

  789.         private TlsState(SmackTlsContext smackTlsContext) throws IOException {
  790.             this.smackTlsContext = smackTlsContext;

  791.             // Call createSSLEngine()'s variant with two parameters as this allows for TLS session resumption.

  792.             // Note that it is not really clear what the value of peer host should be. It could be A) the XMPP service's
  793.             // domainpart or B) the DNS name of the host we are connecting to (usually the DNS SRV RR target name). While
  794.             // the javadoc of createSSLEngine(String, int) indicates with "Some cipher suites (such as Kerberos) require
  795.             // remote hostname information, in which case peerHost needs to be specified." that A should be used. TLS
  796.             // session resumption may would need or at least benefit from B. Variant A would also be required if the
  797.             // String is used for certificate verification. And it appears at least likely that TLS session resumption
  798.             // would not be hurt by using variant A. Therefore we currently use variant A.
  799.             // TODO: Should we use the ACE representation of the XMPP service domain? Compare with f60e4055ec529f0b8160acedf13275592ab10a4b
  800.             // If yes, then we should probably introduce getXmppServiceDomainAceEncodedIfPossible().
  801.             String peerHost = connectionInternal.connection.getConfiguration().getXMPPServiceDomain().toString();
  802.             engine = smackTlsContext.sslContext.createSSLEngine(peerHost, remoteAddress.getPort());
  803.             engine.setUseClientMode(true);

  804.             SSLSession session = engine.getSession();
  805.             int applicationBufferSize = session.getApplicationBufferSize();
  806.             int packetBufferSize = session.getPacketBufferSize();

  807.             myNetData = ByteBuffer.allocateDirect(packetBufferSize);
  808.             peerAppData = ByteBuffer.allocate(applicationBufferSize);
  809.         }

  810.         @Override
  811.         public OutputResult output(ByteBuffer outputData, boolean isFinalDataOfElement, boolean destinationAddressChanged,
  812.                 boolean moreDataAvailable) throws SSLException {
  813.             if (outputData != null) {
  814.                 pendingOutputData.add(outputData);
  815.                 pendingOutputBytes += outputData.remaining();
  816.                 if (moreDataAvailable && pendingOutputBytes < MAX_PENDING_OUTPUT_BYTES) {
  817.                     return OutputResult.NO_OUTPUT;
  818.                 }
  819.             }

  820.             ByteBuffer[] outputDataArray = pendingOutputData.toArray(new ByteBuffer[pendingOutputData.size()]);

  821.             myNetData.clear();

  822.             while (true) {
  823.                 SSLEngineResult result;
  824.                 try {
  825.                     result = engine.wrap(outputDataArray, myNetData);
  826.                 } catch (SSLException e) {
  827.                     handleSslException(e);
  828.                     throw e;
  829.                 }

  830.                 debugLogSslEngineResult("wrap", result);

  831.                 SSLEngineResult.Status engineResultStatus = result.getStatus();

  832.                 pendingOutputBytes -= result.bytesConsumed();

  833.                 if (engineResultStatus == SSLEngineResult.Status.OK) {
  834.                     wrapInBytes += result.bytesConsumed();
  835.                     wrapOutBytes += result.bytesProduced();

  836.                     SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result);
  837.                     switch (handshakeStatus) {
  838.                         case NEED_UNWRAP:
  839.                             // NEED_UNWRAP means that we need to receive something in order to continue the handshake. The
  840.                             // standard channelSelectedCallback logic will take care of this, as there is eventually always
  841.                             // a interest to read from the socket.
  842.                             break;
  843.                         case NEED_WRAP:
  844.                             // Same as need task: Cycle the reactor.
  845.                         case NEED_TASK:
  846.                             // Note that we also set pendingOutputFilterData in the OutputResult in the NEED_TASK case, as
  847.                             // we also want to retry the wrap() operation above in this case.
  848.                             return new OutputResult(true, myNetData);
  849.                         default:
  850.                             break;
  851.                     }
  852.                 }

  853.                 switch (engineResultStatus) {
  854.                 case OK:
  855.                     // No need to outputData.compact() here, since we do not reuse the buffer.
  856.                     // Clean up the pending output data.
  857.                     pruneBufferList(pendingOutputData);
  858.                     return new OutputResult(!pendingOutputData.isEmpty(), myNetData);
  859.                 case CLOSED:
  860.                     pendingOutputData.clear();
  861.                     return OutputResult.NO_OUTPUT;
  862.                 case BUFFER_OVERFLOW:
  863.                     LOGGER.warning("SSLEngine status BUFFER_OVERFLOW, this is hopefully uncommon");
  864.                     int outputDataRemaining = outputData != null ? outputData.remaining() : 0;
  865.                     int newCapacity = (int) (1.3 * outputDataRemaining);
  866.                     // If newCapacity would not increase myNetData, then double it.
  867.                     if (newCapacity <= myNetData.capacity()) {
  868.                         newCapacity = 2 * myNetData.capacity();
  869.                     }
  870.                     ByteBuffer newMyNetData = ByteBuffer.allocateDirect(newCapacity);
  871.                     myNetData.flip();
  872.                     newMyNetData.put(myNetData);
  873.                     myNetData = newMyNetData;
  874.                     continue;
  875.                 case BUFFER_UNDERFLOW:
  876.                     throw new IllegalStateException(
  877.                             "Buffer underflow as result of SSLEngine.wrap() should never happen");
  878.                 }
  879.             }
  880.         }

  881.         @SuppressWarnings("ReferenceEquality")
  882.         @Override
  883.         public ByteBuffer input(ByteBuffer inputData) throws SSLException {
  884.             ByteBuffer accumulatedData;
  885.             if (pendingInputData == null) {
  886.                 accumulatedData = inputData;
  887.             } else {
  888.                 assert pendingInputData != inputData;

  889.                 int accumulatedDataBytes = pendingInputData.remaining() + inputData.remaining();
  890.                 accumulatedData = ByteBuffer.allocate(accumulatedDataBytes);
  891.                 accumulatedData.put(pendingInputData)
  892.                                .put(inputData)
  893.                                .flip();
  894.                 pendingInputData = null;
  895.             }

  896.             peerAppData.clear();

  897.             while (true) {
  898.                 SSLEngineResult result;
  899.                 try {
  900.                     result = engine.unwrap(accumulatedData, peerAppData);
  901.                 } catch (SSLException e) {
  902.                     handleSslException(e);
  903.                     throw e;
  904.                 }

  905.                 debugLogSslEngineResult("unwrap", result);

  906.                 SSLEngineResult.Status engineResultStatus = result.getStatus();

  907.                 if (engineResultStatus == SSLEngineResult.Status.OK) {
  908.                     unwrapInBytes += result.bytesConsumed();
  909.                     unwrapOutBytes += result.bytesProduced();

  910.                     SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result);
  911.                     switch (handshakeStatus) {
  912.                     case NEED_TASK:
  913.                         // A delegated task is asynchronously running. Take care of the remaining accumulatedData.
  914.                         addAsPendingInputData(accumulatedData);
  915.                         // Return here, as the async task created by handleHandshakeStatus will continue calling the
  916.                         // channelSelectedCallback.
  917.                         return null;
  918.                     case NEED_UNWRAP:
  919.                         continue;
  920.                     case NEED_WRAP:
  921.                         // NEED_WRAP means that the SSLEngine needs to send data, probably without consuming data.
  922.                         // We exploit here the fact that the channelSelectedCallback is single threaded and that the
  923.                         // input processing is after the output processing.
  924.                         addAsPendingInputData(accumulatedData);
  925.                         // Note that it is ok that we the provided argument for pending input filter data to channel
  926.                         // selected callback is false, as setPendingInputFilterData() will have set the internal state
  927.                         // boolean accordingly.
  928.                         connectionInternal.asyncGo(() -> callChannelSelectedCallback(false, true));
  929.                         // Do not break here, but instead return and let the asynchronously invoked
  930.                         // callChannelSelectedCallback() do its work.
  931.                         return null;
  932.                     default:
  933.                         break;
  934.                     }
  935.                 }

  936.                 switch (engineResultStatus) {
  937.                 case OK:
  938.                     // SSLEngine's unwrap() may not consume all bytes from the source buffer. If this is the case, then
  939.                     // simply perform another unwrap until accumulatedData has no remaining bytes.
  940.                     if (accumulatedData.hasRemaining()) {
  941.                         continue;
  942.                     }
  943.                     return peerAppData;
  944.                 case CLOSED:
  945.                     return null;
  946.                 case BUFFER_UNDERFLOW:
  947.                     // There were not enough source bytes available to make a complete packet. Let it in
  948.                     // pendingInputData. Note that we do not resize SSLEngine's source buffer - inputData in our case -
  949.                     // as it is not possible.
  950.                     addAsPendingInputData(accumulatedData);
  951.                     return null;
  952.                 case BUFFER_OVERFLOW:
  953.                     int applicationBufferSize = engine.getSession().getApplicationBufferSize();
  954.                     assert peerAppData.remaining() < applicationBufferSize;
  955.                     peerAppData = ByteBuffer.allocate(applicationBufferSize);
  956.                     continue;
  957.                 }
  958.             }
  959.         }

  960.         private void addAsPendingInputData(ByteBuffer byteBuffer) {
  961.             // Note that we can not simply write
  962.             // pendingInputData = byteBuffer;
  963.             // we have to copy the provided byte buffer, because it is possible that this byteBuffer is re-used by some
  964.             // higher layer. That is, here 'byteBuffer' is typically 'incomingBuffer', which is a direct buffer only
  965.             // allocated once per connection for performance reasons and hence re-used for read() calls.
  966.             pendingInputData = ByteBuffer.allocate(byteBuffer.remaining());
  967.             pendingInputData.put(byteBuffer).flip();

  968.             pendingInputFilterData = pendingInputData.hasRemaining();
  969.         }

  970.         private SSLEngineResult.HandshakeStatus handleHandshakeStatus(SSLEngineResult sslEngineResult) {
  971.             SSLEngineResult.HandshakeStatus handshakeStatus = sslEngineResult.getHandshakeStatus();
  972.             switch (handshakeStatus) {
  973.             case NEED_TASK:
  974.                 while (true) {
  975.                     final Runnable delegatedTask = engine.getDelegatedTask();
  976.                     if (delegatedTask == null) {
  977.                         break;
  978.                     }
  979.                     sslEngineDelegatedTasks++;
  980.                     int currentPendingDelegatedTasks = pendingDelegatedTasks.incrementAndGet();
  981.                     if (currentPendingDelegatedTasks > maxPendingSslEngineDelegatedTasks) {
  982.                         maxPendingSslEngineDelegatedTasks = currentPendingDelegatedTasks;
  983.                     }

  984.                     Runnable wrappedDelegatedTask = () -> {
  985.                         delegatedTask.run();
  986.                         int wrappedCurrentPendingDelegatedTasks = pendingDelegatedTasks.decrementAndGet();
  987.                         if (wrappedCurrentPendingDelegatedTasks == 0) {
  988.                             callChannelSelectedCallback(true, true);
  989.                         }
  990.                     };
  991.                     connectionInternal.asyncGo(wrappedDelegatedTask);
  992.                 }
  993.                 break;
  994.             case FINISHED:
  995.                 onHandshakeFinished();
  996.                 break;
  997.             default:
  998.                 break;
  999.             }

  1000.             SSLEngineResult.HandshakeStatus afterHandshakeStatus = engine.getHandshakeStatus();
  1001.             return afterHandshakeStatus;
  1002.         }

  1003.         private void handleSslException(SSLException e) {
  1004.             handshakeException = e;
  1005.             handshakeStatus = TlsHandshakeStatus.failed;
  1006.             connectionInternal.notifyWaitingThreads();
  1007.         }

  1008.         private void onHandshakeFinished() {
  1009.             handshakeStatus = TlsHandshakeStatus.successful;
  1010.             connectionInternal.notifyWaitingThreads();
  1011.         }

  1012.         private boolean isHandshakeFinished() {
  1013.             return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed;
  1014.         }

  1015.         private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackException, XMPPException {
  1016.             connectionInternal.waitForConditionOrThrowConnectionException(() -> isHandshakeFinished(), "TLS handshake to finish");

  1017.             if (handshakeStatus == TlsHandshakeStatus.failed) {
  1018.                 throw handshakeException;
  1019.             }

  1020.             assert handshakeStatus == TlsHandshakeStatus.successful;

  1021.             if (smackTlsContext.daneVerifier != null) {
  1022.                 smackTlsContext.daneVerifier.finish(engine.getSession());
  1023.             }
  1024.         }

  1025.         @Override
  1026.         public Object getStats() {
  1027.             return new TlsStateStats(this);
  1028.         }

  1029.         @Override
  1030.         public void closeInputOutput() {
  1031.             engine.closeOutbound();
  1032.             try {
  1033.                 engine.closeInbound();
  1034.             } catch (SSLException e) {
  1035.                 LOGGER.log(Level.FINEST,
  1036.                         "SSLException when closing inbound TLS session. This can likely be ignored if a possible truncation attack is suggested."
  1037.                         + " You may want to ask your XMPP server vendor to implement a clean TLS session shutdown sending close_notify after </stream>",
  1038.                         e);
  1039.             }
  1040.         }

  1041.         @Override
  1042.         public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException,
  1043.                 SmackException, XMPPException {
  1044.             waitForHandshakeFinished();
  1045.         }

  1046.         @Override
  1047.         public String getFilterName() {
  1048.             return "TLS (" + engine + ')';
  1049.         }
  1050.     }

  1051.     public static final class TlsStateStats {
  1052.         public final long wrapInBytes;
  1053.         public final long wrapOutBytes;
  1054.         public final double wrapRatio;

  1055.         public final long unwrapInBytes;
  1056.         public final long unwrapOutBytes;
  1057.         public final double unwrapRatio;

  1058.         private TlsStateStats(TlsState tlsState) {
  1059.             wrapOutBytes = tlsState.wrapOutBytes;
  1060.             wrapInBytes = tlsState.wrapInBytes;
  1061.             wrapRatio = (double) wrapOutBytes / wrapInBytes;

  1062.             unwrapOutBytes = tlsState.unwrapOutBytes;
  1063.             unwrapInBytes = tlsState.unwrapInBytes;
  1064.             unwrapRatio = (double) unwrapInBytes / unwrapOutBytes;
  1065.         }

  1066.         private transient String toStringCache;

  1067.         @Override
  1068.         public String toString() {
  1069.             if (toStringCache != null) {
  1070.                 return toStringCache;
  1071.             }

  1072.             toStringCache =
  1073.                       "wrap-in-bytes: " + wrapInBytes + '\n'
  1074.                     + "wrap-out-bytes: " + wrapOutBytes + '\n'
  1075.                     + "wrap-ratio: " + wrapRatio + '\n'
  1076.                     + "unwrap-in-bytes: " + unwrapInBytes + '\n'
  1077.                     + "unwrap-out-bytes: " + unwrapOutBytes + '\n'
  1078.                     + "unwrap-ratio: " + unwrapRatio + '\n'
  1079.                     ;

  1080.             return toStringCache;
  1081.         }
  1082.     }

  1083.     private void callChannelSelectedCallback(boolean setPendingInputFilterData, boolean setPendingOutputFilterData) {
  1084.         final SocketChannel channel = socketChannel;
  1085.         final SelectionKey key = selectionKey;
  1086.         if (channel == null || key == null) {
  1087.             LOGGER.info("Not calling channel selected callback because the connection was eventually disconnected");
  1088.             return;
  1089.         }

  1090.         channelSelectedCallbackLock.lock();
  1091.         try {
  1092.             // Note that it is important that we send the pending(Input|Output)FilterData flags while holding the lock.
  1093.             if (setPendingInputFilterData) {
  1094.                 pendingInputFilterData = true;
  1095.             }
  1096.             if (setPendingOutputFilterData) {
  1097.                 pendingOutputFilterData = true;
  1098.             }

  1099.             onChannelSelected(channel, key);
  1100.         } finally {
  1101.             channelSelectedCallbackLock.unlock();
  1102.         }
  1103.     }

  1104.     private void closeSocketAndCleanup() {
  1105.         final SelectionKey selectionKey = this.selectionKey;
  1106.         if (selectionKey != null) {
  1107.             selectionKey.cancel();
  1108.         }
  1109.         final SocketChannel socketChannel = this.socketChannel;
  1110.         if (socketChannel != null) {
  1111.             try {
  1112.                 socketChannel.close();
  1113.             } catch (IOException e) {
  1114.                 LOGGER.log(Level.FINE, "Closing the socket channel failed", e);
  1115.             }
  1116.         }

  1117.         this.selectionKey = null;
  1118.         this.socketChannel = null;

  1119.         selectionKeyAttachment = null;
  1120.         remoteAddress = null;
  1121.     }

  1122.     private static List<? extends Buffer> pruneBufferList(Collection<? extends Buffer> buffers) {
  1123.         return CollectionUtil.removeUntil(buffers, b -> b.hasRemaining());
  1124.     }

  1125.     public XmppTcpTransportModule.Stats getStats() {
  1126.         return new Stats(this);
  1127.     }

  1128.     public static final class Stats extends XmppClientToServerTransport.Stats {
  1129.         public final long totalBytesWritten;
  1130.         public final long totalBytesWrittenBeforeFilter;
  1131.         public final double writeRatio;

  1132.         public final long totalBytesRead;
  1133.         public final long totalBytesReadAfterFilter;
  1134.         public final double readRatio;

  1135.         public final long handledChannelSelectedCallbacks;
  1136.         public final long setWriteInterestAfterChannelSelectedCallback;
  1137.         public final long reactorThreadAlreadyRacing;
  1138.         public final long afterOutgoingElementsQueueModifiedSetInterestOps;
  1139.         public final long rejectedChannelSelectedCallbacks;
  1140.         public final long totalCallbackRequests;
  1141.         public final long callbackPreemtBecauseBytesWritten;
  1142.         public final long callbackPreemtBecauseBytesRead;
  1143.         public final int sslEngineDelegatedTasks;
  1144.         public final int maxPendingSslEngineDelegatedTasks;

  1145.         private Stats(XmppTcpTransportModule connection) {
  1146.             totalBytesWritten = connection.totalBytesWritten;
  1147.             totalBytesWrittenBeforeFilter = connection.totalBytesWrittenBeforeFilter;
  1148.             writeRatio = (double) totalBytesWritten / totalBytesWrittenBeforeFilter;

  1149.             totalBytesReadAfterFilter = connection.totalBytesReadAfterFilter;
  1150.             totalBytesRead = connection.totalBytesRead;
  1151.             readRatio = (double) totalBytesRead / totalBytesReadAfterFilter;

  1152.             handledChannelSelectedCallbacks = connection.handledChannelSelectedCallbacks;
  1153.             setWriteInterestAfterChannelSelectedCallback = connection.setWriteInterestAfterChannelSelectedCallback.get();
  1154.             reactorThreadAlreadyRacing = connection.reactorThreadAlreadyRacing.get();
  1155.             afterOutgoingElementsQueueModifiedSetInterestOps = connection.afterOutgoingElementsQueueModifiedSetInterestOps
  1156.                     .get();
  1157.             rejectedChannelSelectedCallbacks = connection.rejectedChannelSelectedCallbacks.get();

  1158.             totalCallbackRequests = handledChannelSelectedCallbacks + rejectedChannelSelectedCallbacks;

  1159.             callbackPreemtBecauseBytesRead = connection.callbackPreemtBecauseBytesRead;
  1160.             callbackPreemtBecauseBytesWritten = connection.callbackPreemtBecauseBytesWritten;

  1161.             sslEngineDelegatedTasks = connection.sslEngineDelegatedTasks;
  1162.             maxPendingSslEngineDelegatedTasks = connection.maxPendingSslEngineDelegatedTasks;
  1163.         }

  1164.         private transient String toStringCache;

  1165.         @Override
  1166.         public String toString() {
  1167.             if (toStringCache != null) {
  1168.                 return toStringCache;
  1169.             }

  1170.             toStringCache =
  1171.               "Total bytes\n"
  1172.             + "recv: " + totalBytesRead + '\n'
  1173.             + "send: " + totalBytesWritten + '\n'
  1174.             + "recv-aft-filter: " + totalBytesReadAfterFilter + '\n'
  1175.             + "send-bef-filter: " + totalBytesWrittenBeforeFilter + '\n'
  1176.             + "read-ratio: " + readRatio + '\n'
  1177.             + "write-ratio: " + writeRatio + '\n'
  1178.             + "Events\n"
  1179.             + "total-callback-requests: " + totalCallbackRequests + '\n'
  1180.             + "handled-channel-selected-callbacks: " + handledChannelSelectedCallbacks + '\n'
  1181.             + "rejected-channel-selected-callbacks: " + rejectedChannelSelectedCallbacks + '\n'
  1182.             + "set-write-interest-after-callback: " + setWriteInterestAfterChannelSelectedCallback + '\n'
  1183.             + "reactor-thread-already-racing: " + reactorThreadAlreadyRacing + '\n'
  1184.             + "after-queue-modified-set-interest-ops: " + afterOutgoingElementsQueueModifiedSetInterestOps + '\n'
  1185.             + "callback-preemt-because-bytes-read: " + callbackPreemtBecauseBytesRead + '\n'
  1186.             + "callback-preemt-because-bytes-written: " + callbackPreemtBecauseBytesWritten + '\n'
  1187.             + "ssl-engine-delegated-tasks: " + sslEngineDelegatedTasks + '\n'
  1188.             + "max-pending-ssl-engine-delegated-tasks: " + maxPendingSslEngineDelegatedTasks + '\n'
  1189.             ;

  1190.             return toStringCache;
  1191.         }
  1192.     }
  1193. }