001/**
002 *
003 * Copyright 2019-2024 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.nio.Buffer;
022import java.nio.ByteBuffer;
023import java.nio.channels.ClosedChannelException;
024import java.nio.channels.SelectableChannel;
025import java.nio.channels.SelectionKey;
026import java.nio.channels.SocketChannel;
027import java.security.cert.CertificateException;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.IdentityHashMap;
032import java.util.Iterator;
033import java.util.List;
034import java.util.ListIterator;
035import java.util.Map;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.LongAdder;
038import java.util.concurrent.locks.ReentrantLock;
039import java.util.logging.Level;
040import java.util.logging.Logger;
041
042import javax.net.ssl.SSLEngine;
043import javax.net.ssl.SSLEngineResult;
044import javax.net.ssl.SSLException;
045import javax.net.ssl.SSLSession;
046
047import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
048import org.jivesoftware.smack.SmackException;
049import org.jivesoftware.smack.SmackException.NoResponseException;
050import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
051import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
052import org.jivesoftware.smack.SmackException.SmackCertificateException;
053import org.jivesoftware.smack.SmackException.SmackWrappedException;
054import org.jivesoftware.smack.SmackFuture;
055import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
056import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment;
057import org.jivesoftware.smack.XMPPException;
058import org.jivesoftware.smack.XmppInputOutputFilter;
059import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor;
060import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor;
061import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnectionModule;
062import org.jivesoftware.smack.c2s.StreamOpenAndCloseFactory;
063import org.jivesoftware.smack.c2s.XmppClientToServerTransport;
064import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
065import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext;
066import org.jivesoftware.smack.debugger.SmackDebugger;
067import org.jivesoftware.smack.fsm.State;
068import org.jivesoftware.smack.fsm.StateDescriptor;
069import org.jivesoftware.smack.fsm.StateTransitionResult;
070import org.jivesoftware.smack.internal.SmackTlsContext;
071import org.jivesoftware.smack.packet.Stanza;
072import org.jivesoftware.smack.packet.StartTls;
073import org.jivesoftware.smack.packet.StreamClose;
074import org.jivesoftware.smack.packet.StreamOpen;
075import org.jivesoftware.smack.packet.TlsFailure;
076import org.jivesoftware.smack.packet.TlsProceed;
077import org.jivesoftware.smack.packet.TopLevelStreamElement;
078import org.jivesoftware.smack.packet.XmlEnvironment;
079import org.jivesoftware.smack.tcp.XmppTcpTransportModule.XmppTcpNioTransport.DiscoveredTcpEndpoints;
080import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
081import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints.Result;
082import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
083import org.jivesoftware.smack.util.CollectionUtil;
084import org.jivesoftware.smack.util.StringUtils;
085import org.jivesoftware.smack.util.UTF8;
086import org.jivesoftware.smack.util.XmlStringBuilder;
087import org.jivesoftware.smack.util.rce.RemoteConnectionEndpointLookupFailure;
088
089import org.jxmpp.jid.DomainBareJid;
090import org.jxmpp.jid.Jid;
091import org.jxmpp.jid.util.JidUtil;
092import org.jxmpp.xml.splitter.Utf8ByteXmppXmlSplitter;
093import org.jxmpp.xml.splitter.XmlPrettyPrinter;
094import org.jxmpp.xml.splitter.XmlPrinter;
095import org.jxmpp.xml.splitter.XmppElementCallback;
096import org.jxmpp.xml.splitter.XmppXmlSplitter;
097
098public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionModule<XmppTcpTransportModuleDescriptor> {
099
100    private static final Logger LOGGER = Logger.getLogger(XmppTcpTransportModule.class.getName());
101
102    private static final int CALLBACK_MAX_BYTES_READ = 10 * 1024 * 1024;
103    private static final int CALLBACK_MAX_BYTES_WRITEN = CALLBACK_MAX_BYTES_READ;
104
105    private static final int MAX_ELEMENT_SIZE = 64 * 1024;
106
107    private final XmppTcpNioTransport tcpNioTransport;
108
109    private SelectionKey selectionKey;
110    private SelectionKeyAttachment selectionKeyAttachment;
111    private SocketChannel socketChannel;
112    private InetSocketAddress remoteAddress;
113
114    private TlsState tlsState;
115
116    private Iterator<CharSequence> outgoingCharSequenceIterator;
117
118    private final List<TopLevelStreamElement> currentlyOutgoingElements = new ArrayList<>();
119    private final IdentityHashMap<ByteBuffer, List<TopLevelStreamElement>> bufferToElementMap = new IdentityHashMap<>();
120
121    private ByteBuffer outgoingBuffer;
122    private ByteBuffer filteredOutgoingBuffer;
123    private final List<ByteBuffer> networkOutgoingBuffers = new ArrayList<>();
124    private long networkOutgoingBuffersBytes;
125
126    // TODO: Make the size of the incomingBuffer configurable.
127    private final ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(2 * 4096);
128
129    private final ReentrantLock channelSelectedCallbackLock = new ReentrantLock();
130
131    private long totalBytesRead;
132    private long totalBytesWritten;
133    private long totalBytesReadAfterFilter;
134    private long totalBytesWrittenBeforeFilter;
135    private long handledChannelSelectedCallbacks;
136    private long callbackPreemtBecauseBytesWritten;
137    private long callbackPreemtBecauseBytesRead;
138    private int sslEngineDelegatedTasks;
139    private int maxPendingSslEngineDelegatedTasks;
140
141    private final LongAdder setWriteInterestAfterChannelSelectedCallback = new LongAdder();
142    private final LongAdder reactorThreadAlreadyRacing = new LongAdder();
143    private final LongAdder afterOutgoingElementsQueueModifiedSetInterestOps = new LongAdder();
144    private final LongAdder rejectedChannelSelectedCallbacks = new LongAdder();
145
146    private Jid lastDestinationAddress;
147
148    private boolean pendingInputFilterData;
149    private boolean pendingOutputFilterData;
150
151    private boolean pendingWriteInterestAfterRead;
152
153    /**
154     * Note that this field is effectively final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final.
155     */
156    private Utf8ByteXmppXmlSplitter splitter;
157
158    /**
159     * Note that this field is effectively final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final.
160     */
161    private XmppXmlSplitter outputDebugSplitter;
162
163    private static final Level STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL = Level.FINER;
164
165    XmppTcpTransportModule(XmppTcpTransportModuleDescriptor moduleDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
166        super(moduleDescriptor, connectionInternal);
167
168        tcpNioTransport = new XmppTcpNioTransport(connectionInternal);
169
170        XmlPrinter incomingDebugPrettyPrinter = null;
171        final SmackDebugger debugger = connectionInternal.smackDebugger;
172        if (debugger != null) {
173            // Incoming stream debugging.
174            incomingDebugPrettyPrinter = XmlPrettyPrinter.builder()
175                    .setPrettyWriter(sb -> debugger.incomingStreamSink(sb))
176                    .build();
177
178            // Outgoing stream debugging.
179            XmlPrinter outgoingDebugPrettyPrinter = XmlPrettyPrinter.builder()
180                    .setPrettyWriter(sb -> debugger.outgoingStreamSink(sb))
181                    .build();
182            outputDebugSplitter = new XmppXmlSplitter(outgoingDebugPrettyPrinter);
183        }
184
185        XmppXmlSplitter xmppXmlSplitter = new XmppXmlSplitter(MAX_ELEMENT_SIZE, xmppElementCallback,
186                incomingDebugPrettyPrinter);
187        splitter = new Utf8ByteXmppXmlSplitter(xmppXmlSplitter);
188    }
189
190    private final XmppElementCallback xmppElementCallback = new XmppElementCallback() {
191        private String streamOpen;
192        private String streamClose;
193
194        @Override
195        public void onCompleteElement(String completeElement) {
196            assert streamOpen != null;
197            assert streamClose != null;
198
199            connectionInternal.withSmackDebugger(debugger -> debugger.onIncomingElementCompleted());
200
201            String wrappedCompleteElement = streamOpen + completeElement + streamClose;
202            connectionInternal.parseAndProcessElement(wrappedCompleteElement);
203        }
204
205
206        @Override
207        public void streamOpened(String prefix, Map<String, String> attributes) {
208            if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) {
209                LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL,
210                                "Stream of " + this + " opened. prefix=" + prefix + " attributes=" + attributes);
211            }
212
213            final String prefixXmlns = "xmlns:" + prefix;
214            // TODO: Use the return value of onStreamOpen(), which now returns the
215            // corresponding stream close tag, instead of creating it here.
216            final StringBuilder streamClose = new StringBuilder(32);
217            final StringBuilder streamOpen = new StringBuilder(256);
218
219            streamOpen.append('<');
220            streamClose.append("</");
221            if (StringUtils.isNotEmpty(prefix)) {
222                streamOpen.append(prefix).append(':');
223                streamClose.append(prefix).append(':');
224            }
225            streamOpen.append("stream");
226            streamClose.append("stream>");
227            for (Map.Entry<String, String> entry : attributes.entrySet()) {
228                String attributeName = entry.getKey();
229                String attributeValue = entry.getValue();
230                switch (attributeName) {
231                case "to":
232                case "from":
233                case "id":
234                case "version":
235                    break;
236                case "xml:lang":
237                    streamOpen.append(" xml:lang='").append(attributeValue).append('\'');
238                    break;
239                case "xmlns":
240                    streamOpen.append(" xmlns='").append(attributeValue).append('\'');
241                    break;
242                default:
243                    if (attributeName.equals(prefixXmlns)) {
244                        streamOpen.append(' ').append(prefixXmlns).append("='").append(attributeValue).append('\'');
245                        break;
246                    }
247                    LOGGER.info("Unknown <stream/> attribute: " + attributeName);
248                    break;
249                }
250            }
251            streamOpen.append('>');
252
253            this.streamOpen = streamOpen.toString();
254            this.streamClose = streamClose.toString();
255
256            connectionInternal.onStreamOpen(this.streamOpen);
257        }
258
259        @Override
260        public void streamClosed() {
261            if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) {
262                LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL, "Stream of " + this + " closed");
263            }
264
265           connectionInternal.onStreamClosed();
266        }
267    };
268
       /**
        * Handles the selection of this transport's socket channel: first writes pending outgoing data, then reads
        * and processes incoming data, and finally updates the interest ops of the selection key.
        * <p>
        * Only one invocation runs at a time. If {@code channelSelectedCallbackLock} is already held, the invocation is
        * counted in {@code rejectedChannelSelectedCallbacks} and returns immediately.
        * </p>
        * <p>
        * Each iteration of the write loop performs the first applicable of these steps: hand filtered data to the
        * socket, run the output filters on the encoded buffer, encode the next part of the current element, or poll the
        * next element from the outgoing queue. The read loop reads into {@code incomingBuffer}, passes the data through
        * the input filters and feeds the result to the XML splitter. Both loops stop once more than
        * {@code CALLBACK_MAX_BYTES_WRITEN} respectively {@code CALLBACK_MAX_BYTES_READ} bytes were transferred.
        * </p>
        * <p>
        * Note that the early returns within the read loop (read error, end of stream, filter or splitter error) skip
        * the reset of the reactor racing flag and the update of the interest ops at the end of this method.
        * </p>
        *
        * @param selectedChannel the selected channel; it is cast to {@link SocketChannel}.
        * @param selectedSelectionKey the selection key belonging to the selected channel.
        */
269    private void onChannelSelected(SelectableChannel selectedChannel, SelectionKey selectedSelectionKey) {
270        assert selectionKey == null || selectionKey == selectedSelectionKey;
271        SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel;
272        // We are *always* interested in OP_READ.
273        int newInterestedOps = SelectionKey.OP_READ;
274        boolean newPendingOutputFilterData = false;
275
           // Only one thread may run this callback at a time. If the lock is held, count the rejection and bail out.
276        if (!channelSelectedCallbackLock.tryLock()) {
277            rejectedChannelSelectedCallbacks.increment();
278            return;
279        }
280
281        handledChannelSelectedCallbacks++;
282
283        long callbackBytesRead = 0;
284        long callbackBytesWritten = 0;
285
286        try {
287            boolean destinationAddressChanged = false;
288            boolean isLastPartOfElement = false;
289            TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null;
290            StringBuilder outgoingStreamForDebugger = null;
291
               // Write phase. Each iteration performs the first applicable step, checked in this order:
               // (1) hand filtered data to the socket, (2) run the output filters, (3) encode the next part of the
               // current element, (4) poll the next element from the outgoing queue.
292            writeLoop: while (true) {
                   // True unless the last part of the current element was encoded and no further elements are queued.
293                final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty();
294
295                if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) {
296                    if (filteredOutgoingBuffer != null) {
297                        networkOutgoingBuffers.add(filteredOutgoingBuffer);
298                        networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining();
299
300                        filteredOutgoingBuffer = null;
                           // Defer the socket write while more data is coming and fewer than 8096 bytes are buffered,
                           // so that small chunks are written together.
301                        if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) {
302                            continue;
303                        }
304                    }
305
306                    ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]);
307                    long bytesWritten;
308                    try {
309                        bytesWritten = selectedSocketChannel.write(output);
310                    } catch (IOException e) {
311                        // We have seen here so far
312                        // - IOException "Broken pipe"
313                        handleReadWriteIoException(e);
314                        break;
315                    }
316
                       // Nothing could be written: request a write-readiness notification and stop writing.
317                    if (bytesWritten == 0) {
318                        newInterestedOps |= SelectionKey.OP_WRITE;
319                        break;
320                    }
321
322                    callbackBytesWritten += bytesWritten;
323
324                    networkOutgoingBuffersBytes -= bytesWritten;
325
                       // NOTE(review): pruneBufferList() is defined elsewhere; presumably it removes the fully written
                       // buffers from the list and returns them - confirm.
326                    List<? extends Buffer> prunedBuffers = pruneBufferList(networkOutgoingBuffers);
327
                       // Notify the send listeners about the elements that were registered for a pruned buffer.
328                    for (Buffer prunedBuffer : prunedBuffers) {
329                        List<TopLevelStreamElement> sendElements = bufferToElementMap.remove(prunedBuffer);
330                        if (sendElements == null) {
331                            continue;
332                        }
333                        for (TopLevelStreamElement elementJustSend : sendElements) {
334                            connectionInternal.fireFirstLevelElementSendListeners(elementJustSend);
335                        }
336                    }
337
338                    // Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have
339                    // written a certain amount.
340                    if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) {
341                        newInterestedOps |= SelectionKey.OP_WRITE;
342                        callbackPreemtBecauseBytesWritten++;
343                        break;
344                    }
345                } else if (outgoingBuffer != null || pendingOutputFilterData) {
346                    pendingOutputFilterData = false;
347
348                    if (outgoingBuffer != null) {
349                        totalBytesWrittenBeforeFilter += outgoingBuffer.remaining();
350                        if (isLastPartOfElement) {
351                            assert currentlyOutgonigTopLevelStreamElement != null;
352                            currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement);
353                        }
354                    }
355
356                    ByteBuffer outputFilterInputData = outgoingBuffer;
357                    // We can now null the outgoingBuffer since the filter step will take care of it from now on.
358                    outgoingBuffer = null;
359
                       // Pass the data through all output filters, starting at the beginning of the filter list. The
                       // (flipped) output of a filter is the input of the next one.
360                    for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) {
361                        XmppInputOutputFilter inputOutputFilter = it.next();
362                        XmppInputOutputFilter.OutputResult outputResult;
363                        try {
364                            outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement,
365                                    destinationAddressChanged, moreDataAvailable);
366                        } catch (IOException e) {
367                            connectionInternal.notifyConnectionError(e);
368                            break writeLoop;
369                        }
370                        newPendingOutputFilterData |= outputResult.pendingFilterData;
371                        outputFilterInputData = outputResult.filteredOutputData;
372                        if (outputFilterInputData != null) {
373                            ((java.nio.Buffer) outputFilterInputData).flip();
374                        }
375                    }
376
377                    // It is ok if outputFilterInputData is 'null' here, this is expected behavior.
378                    if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) {
379                        filteredOutgoingBuffer = outputFilterInputData;
380                    } else {
381                        filteredOutgoingBuffer = null;
382                    }
383
384                    // If the filters did eventually not produce any output data but if there is
385                    // pending output data then we have a pending write request after read.
386                    if (filteredOutgoingBuffer == null && newPendingOutputFilterData) {
387                        pendingWriteInterestAfterRead = true;
388                    }
389
                       // Remember which elements end in this buffer, so that their send listeners can be fired once
                       // the buffer has been pruned from the network buffers.
390                    if (filteredOutgoingBuffer != null && isLastPartOfElement) {
391                        bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements));
392                        currentlyOutgoingElements.clear();
393                    }
394
395                    // Reset that the destination address has changed.
396                    if (destinationAddressChanged) {
397                        destinationAddressChanged = false;
398                    }
399                } else if (outgoingCharSequenceIterator != null) {
                       // Encode the next part of the element that is currently being sent.
400                    CharSequence nextCharSequence = outgoingCharSequenceIterator.next();
401                    outgoingBuffer = UTF8.encode(nextCharSequence);
402                    if (!outgoingCharSequenceIterator.hasNext()) {
403                        outgoingCharSequenceIterator = null;
404                        isLastPartOfElement = true;
405                    } else {
406                        isLastPartOfElement = false;
407                    }
408
                       // With a debugger attached, collect the parts and hand the complete element to the debug splitter.
409                    final SmackDebugger debugger = connectionInternal.smackDebugger;
410                    if (debugger != null) {
411                        if (outgoingStreamForDebugger == null) {
412                            outgoingStreamForDebugger = new StringBuilder();
413                        }
414                        outgoingStreamForDebugger.append(nextCharSequence);
415
416                        if (isLastPartOfElement) {
417                            try {
418                                outputDebugSplitter.append(outgoingStreamForDebugger);
419                            } catch (IOException e) {
420                                throw new AssertionError(e);
421                            }
422                            debugger.onOutgoingElementCompleted();
423                            outgoingStreamForDebugger = null;
424                        }
425                    }
426                } else if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
                       // Continue with the next queued element. For stanzas, track whether the 'to' address differs
                       // from the one of the previously sent stanza; this flag is passed to the output filters.
427                    currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll();
428                    if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) {
429                        Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement;
430                        Jid currentDestinationAddress = currentlyOutgoingStanza.getTo();
431                        destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress);
432                        lastDestinationAddress = currentDestinationAddress;
433                    }
434                    CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE);
435                    if (nextCharSequence instanceof XmlStringBuilder) {
436                        XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence;
437                        XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment();
438                        outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator();
439                    } else {
440                        outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator();
441                    }
442                    assert outgoingCharSequenceIterator != null;
443                } else {
444                    // There is nothing more to write.
445                    break;
446                }
447            }
448
               // Remember whether any output filter reported pending data during this callback.
449            pendingOutputFilterData = newPendingOutputFilterData;
450            if (!pendingWriteInterestAfterRead && pendingOutputFilterData) {
451                newInterestedOps |= SelectionKey.OP_WRITE;
452            }
453
               // Read phase.
454            readLoop: while (true) {
455                // Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have
456                // read a certain amount.
457                if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) {
458                    callbackPreemtBecauseBytesRead++;
459                    break;
460                }
461
462                int bytesRead;
463                ((java.nio.Buffer) incomingBuffer).clear();
464                try {
465                    bytesRead = selectedSocketChannel.read(incomingBuffer);
466                } catch (IOException e) {
467                    handleReadWriteIoException(e);
468                    return;
469                }
470
471                if (bytesRead < 0) {
472                    LOGGER.finer("NIO read() returned " + bytesRead
473                            + " for " + this + ". This probably means that the TCP connection was terminated.");
474                    // According to the socket channel javadoc section about "asynchronous reads" a socket channel's
475                    // read() may return -1 if the input side of a socket is shut down.
476                    // Note that we do not call notifyConnectionError() here because the connection may be
477                    // cleanly shutdown which would also cause read() to return '-1'. I assume that this socket
478                    // will be selected again, on which read() would throw an IOException, which will be caught
479                    // and invoke notifyConnectionError() (see a few lines above).
480                    /*
481                    IOException exception = new IOException("NIO read() returned " + bytesRead);
482                    notifyConnectionError(exception);
483                    */
484                    return;
485                }
486
                   // A read of zero bytes ends the read loop, unless input filter data was flagged as pending. In that
                   // case the flag is cleared and the input filters are run once more.
487                if (!pendingInputFilterData) {
488                    if (bytesRead == 0) {
489                        // Nothing more to read.
490                        break;
491                    }
492                } else {
493                    pendingInputFilterData = false;
494                }
495
496                if (pendingWriteInterestAfterRead) {
497                    // We have successfully read something and someone announced a write interest after a read. It is
498                    // now possible that a filter is now also able to write additional data (for example SSLEngine).
499                    pendingWriteInterestAfterRead = false;
500                    newInterestedOps |= SelectionKey.OP_WRITE;
501                }
502
503                callbackBytesRead += bytesRead;
504
                   // Pass the data through the input filters in reverse order, that is, starting with the last filter.
505                ByteBuffer filteredIncomingBuffer = incomingBuffer;
506                for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) {
507                    ((java.nio.Buffer) filteredIncomingBuffer).flip();
508
509                    ByteBuffer newFilteredIncomingBuffer;
510                    try {
511                        newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer);
512                    } catch (IOException e) {
513                        connectionInternal.notifyConnectionError(e);
514                        return;
515                    }
                       // The filter returned no output buffer: stop reading within this callback.
516                    if (newFilteredIncomingBuffer == null) {
517                        break readLoop;
518                    }
519                    filteredIncomingBuffer = newFilteredIncomingBuffer;
520                }
521
522                ((java.nio.Buffer) filteredIncomingBuffer).flip();
523                final int bytesReadAfterFilter = filteredIncomingBuffer.remaining();
524
525                totalBytesReadAfterFilter += bytesReadAfterFilter;
526
                   // Feed the filtered bytes to the XML splitter.
527                try {
528                    splitter.write(filteredIncomingBuffer);
529                } catch (IOException e) {
530                    connectionInternal.notifyConnectionError(e);
531                    return;
532                }
533            }
534        } finally {
               // Always account the transferred bytes and release the lock, also on the early returns above.
535            totalBytesWritten += callbackBytesWritten;
536            totalBytesRead += callbackBytesRead;
537
538            channelSelectedCallbackLock.unlock();
539        }
540
541        // Indicate that there is no reactor thread racing towards handling this selection key.
542        final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
543        if (selectionKeyAttachment != null) {
544            selectionKeyAttachment.resetReactorThreadRacing();
545        }
546
547        // Check the queue again to prevent lost wakeups caused by elements inserted before we
548        // called resetReactorThreadRacing() a few lines above.
549        if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
550            setWriteInterestAfterChannelSelectedCallback.increment();
551            newInterestedOps |= SelectionKey.OP_WRITE;
552        }
553
554        connectionInternal.setInterestOps(selectionKey, newInterestedOps);
555    }
556
557    private void handleReadWriteIoException(IOException e) {
558        if (e instanceof ClosedChannelException && !tcpNioTransport.isConnected()) {
559            // The connection is already closed.
560            return;
561        }
562
563       connectionInternal.notifyConnectionError(e);
564    }
565
566    /**
567     * This is the interface between the "lookup remote connection endpoints" state and the "establish TCP connection"
568     * state. The field is indirectly populated by {@link XmppTcpNioTransport#lookupConnectionEndpoints()} and consumed
569     * by {@link ConnectionAttemptState}.
570     */
571    DiscoveredTcpEndpoints discoveredTcpEndpoints;
572
573    final class XmppTcpNioTransport extends XmppClientToServerTransport {
574
           /**
            * Creates the transport by passing the connection's internal handle to the superclass.
            *
            * @param connectionInternal the internal handle of the connection this transport belongs to.
            */
575        XmppTcpNioTransport(ModularXmppClientToServerConnectionInternal connectionInternal) {
576            super(connectionInternal);
577        }
578
579        @Override
580        public StreamOpenAndCloseFactory getStreamOpenAndCloseFactory() {
581            return new StreamOpenAndCloseFactory() {
582                @Override
583                public StreamOpen createStreamOpen(DomainBareJid to, CharSequence from, String id, String lang) {
584                    String xmlLang = connectionInternal.connection.getConfiguration().getXmlLang();
585                    StreamOpen streamOpen = new StreamOpen(to, from, id, xmlLang, StreamOpen.StreamContentNamespace.client);
586                    return streamOpen;
587                }
588                @Override
589                public StreamClose createStreamClose() {
590                    return StreamClose.INSTANCE;
591                }
592            };
593        }
594
           /**
            * Discards the previously discovered TCP endpoints by setting {@code discoveredTcpEndpoints} to
            * {@code null}.
            */
595        @Override
596        protected void resetDiscoveredConnectionEndpoints() {
597            discoveredTcpEndpoints = null;
598        }
599
           /**
            * Reports whether discovered TCP endpoints are currently loaded.
            *
            * @return {@code true} if {@code discoveredTcpEndpoints} is set, {@code false} otherwise.
            */
600        @Override
601        public boolean hasUseableConnectionEndpoints() {
602            return discoveredTcpEndpoints != null;
603        }
604
605        @Override
606        protected List<SmackFuture<LookupConnectionEndpointsResult, Exception>> lookupConnectionEndpoints() {
607            // Assert that there are no stale discovered endpoints prior performing the lookup.
608            assert discoveredTcpEndpoints == null;
609
610            List<SmackFuture<LookupConnectionEndpointsResult, Exception>> futures = new ArrayList<>(2);
611
612            InternalSmackFuture<LookupConnectionEndpointsResult, Exception> tcpEndpointsLookupFuture = new InternalSmackFuture<>();
613            connectionInternal.asyncGo(() -> {
614                Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(
615                                connectionInternal.connection.getConfiguration());
616
617                LookupConnectionEndpointsResult endpointsResult;
618                if (result.discoveredRemoteConnectionEndpoints.isEmpty()) {
619                    endpointsResult = new TcpEndpointDiscoveryFailed(result);
620                } else {
621                    endpointsResult = new DiscoveredTcpEndpoints(result);
622                }
623                tcpEndpointsLookupFuture.setResult(endpointsResult);
624            });
625            futures.add(tcpEndpointsLookupFuture);
626
627            if (moduleDescriptor.isDirectTlsEnabled()) {
628                // TODO: Implement this.
629                throw new IllegalArgumentException("DirectTLS is not implemented yet");
630            }
631
632            return futures;
633        }
634
           /**
            * Stores the given lookup result as the discovered TCP endpoints of the enclosing module.
            *
            * @param lookupConnectionEndpointsSuccess the successful lookup result; it is cast to
            *        {@link DiscoveredTcpEndpoints}, so any other type causes a {@link ClassCastException}.
            */
635        @Override
636        protected void loadConnectionEndpoints(LookupConnectionEndpointsSuccess lookupConnectionEndpointsSuccess) {
637            // The API contract states that we will be given the instance we handed out with lookupConnectionEndpoints,
638            // which must be of type DiscoveredTcpEndpoints here. Hence, if we can not cast it, then there is an internal
639            // Smack error.
640            discoveredTcpEndpoints = (DiscoveredTcpEndpoints) lookupConnectionEndpointsSuccess;
641        }
642
643        @Override
644        protected void afterFiltersClosed() {
645            pendingInputFilterData = pendingOutputFilterData = true;
646            afterOutgoingElementsQueueModified();
647        }
648
        /**
         * Disconnect this transport by delegating to the enclosing module's
         * {@code closeSocketAndCleanup()}, which cancels the selection key, closes the socket channel and clears the
         * related fields.
         */
        @Override
        protected void disconnect() {
            XmppTcpTransportModule.this.closeSocketAndCleanup();
        }
653
        /**
         * Signal that new outgoing elements are available by delegating to the enclosing module's
         * {@code afterOutgoingElementsQueueModified()}.
         */
        @Override
        protected void notifyAboutNewOutgoingElements() {
            afterOutgoingElementsQueueModified();
        }
658
659        @Override
660        public SSLSession getSslSession() {
661            TlsState tlsState = XmppTcpTransportModule.this.tlsState;
662            if (tlsState == null) {
663                return null;
664            }
665
666            return tlsState.engine.getSession();
667        }
668
669        public boolean isConnected() {
670            SocketChannel socketChannel = XmppTcpTransportModule.this.socketChannel;
671            if (socketChannel == null) {
672                return false;
673            }
674
675            return socketChannel.isConnected();
676        }
677
678        @Override
679        public boolean isTransportSecured() {
680            final TlsState tlsState = XmppTcpTransportModule.this.tlsState;
681            return tlsState != null && tlsState.handshakeStatus == TlsHandshakeStatus.successful;
682        }
683
        /**
         * Get the statistics of the enclosing transport module.
         *
         * @return the result of the enclosing module's {@code getStats()}.
         */
        @Override
        public XmppTcpTransportModule.Stats getStats() {
            // The qualifier is required: an unqualified getStats() would call this very method again.
            return XmppTcpTransportModule.this.getStats();
        }
688
689        final class DiscoveredTcpEndpoints implements LookupConnectionEndpointsSuccess {
690            final RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result;
691            DiscoveredTcpEndpoints(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) {
692                this.result = result;
693            }
694        }
695
696        final class TcpEndpointDiscoveryFailed implements LookupConnectionEndpointsFailed {
697            final List<RemoteConnectionEndpointLookupFailure> lookupFailures;
698            TcpEndpointDiscoveryFailed(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) {
699                lookupFailures = result.lookupFailures;
700            }
701        }
702    }
703
704    private void afterOutgoingElementsQueueModified() {
705        final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
706        if (selectionKeyAttachment != null && selectionKeyAttachment.isReactorThreadRacing()) {
707            // A reactor thread is already racing to the channel selected callback and will take care of this.
708            reactorThreadAlreadyRacing.increment();
709            return;
710        }
711
712        afterOutgoingElementsQueueModifiedSetInterestOps.increment();
713
714        // Add OP_WRITE to the interested Ops, since we have now new things to write. Note that this may cause
715        // multiple reactor threads to race to the channel selected callback in case we perform this right after
716        // a select() returned with this selection key in the selected-key set. Hence, we use tryLock() in the
717        // channel selected callback to keep the invariant that only exactly one thread is performing the
718        // callback.
719        // Note that we need to perform setInterestedOps() *without* holding the channelSelectedCallbackLock, as
720        // otherwise the reactor thread racing to the channel selected callback may found the lock still locked, which
721        // would result in the outgoingElementsQueue not being handled.
722        connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
723    }
724
    /**
     * Get the transport of this module.
     *
     * @return the module's {@code tcpNioTransport} instance.
     */
    @Override
    protected XmppTcpNioTransport getTransport() {
        return tcpNioTransport;
    }
729
730    static final class EstablishingTcpConnectionStateDescriptor extends StateDescriptor {
731        private EstablishingTcpConnectionStateDescriptor() {
732            super(XmppTcpTransportModule.EstablishingTcpConnectionState.class);
733            addPredeccessor(LookupRemoteConnectionEndpointsStateDescriptor.class);
734            addSuccessor(EstablishTlsStateDescriptor.class);
735            addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class);
736        }
737
738        @Override
739        protected XmppTcpTransportModule.EstablishingTcpConnectionState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) {
740            XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class);
741            return tcpTransportModule.constructEstablishingTcpConnectionState(this, connectionInternal);
742        }
743    }
744
745    private EstablishingTcpConnectionState constructEstablishingTcpConnectionState(
746                    EstablishingTcpConnectionStateDescriptor stateDescriptor,
747                    ModularXmppClientToServerConnectionInternal connectionInternal) {
748        return new EstablishingTcpConnectionState(stateDescriptor, connectionInternal);
749    }
750
751    final class EstablishingTcpConnectionState extends State.AbstractTransport {
752        private EstablishingTcpConnectionState(EstablishingTcpConnectionStateDescriptor stateDescriptor,
753                        ModularXmppClientToServerConnectionInternal connectionInternal) {
754            super(tcpNioTransport, stateDescriptor, connectionInternal);
755        }
756
757        @Override
758        public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
759                        throws InterruptedException, IOException, SmackException, XMPPException {
760            // The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to
761            // ConnectingToHost.
762            ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints,
763                    this);
764            StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection();
765            if (failure != null) {
766                return failure;
767            }
768
769            socketChannel = connectionAttemptState.socketChannel;
770            remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
771
772            selectionKey = connectionInternal.registerWithSelector(socketChannel, SelectionKey.OP_READ,
773                            XmppTcpTransportModule.this::onChannelSelected);
774            selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
775
776            connectionInternal.setTransport(tcpNioTransport);
777
778            // TODO: It appears this should be done in a generic way. I'd assume we always
779            // have to wait for stream features after the connection was established. If this is true then consider
780            // moving this into State.AbstractTransport. But I am not yet 100% positive that this is the case for every
781            // transport. Hence, keep it here for now.
782            connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after initial connection");
783
784            return new TcpSocketConnectedResult(remoteAddress);
785        }
786
787        @Override
788        public void resetState() {
789            closeSocketAndCleanup();
790        }
791    }
792
793    public static final class TcpSocketConnectedResult extends StateTransitionResult.Success {
794        private final InetSocketAddress remoteAddress;
795
796        private TcpSocketConnectedResult(InetSocketAddress remoteAddress) {
797            super("TCP connection established to " + remoteAddress);
798            this.remoteAddress = remoteAddress;
799        }
800
801        public InetSocketAddress getRemoteAddress() {
802            return remoteAddress;
803        }
804    }
805
806    public static final class TlsEstablishedResult extends StateTransitionResult.Success {
807
808        private TlsEstablishedResult(SSLEngine sslEngine) {
809            super("TLS established: " + sslEngine.getSession());
810        }
811    }
812
813    static final class EstablishTlsStateDescriptor extends StateDescriptor {
814        private EstablishTlsStateDescriptor() {
815            super(XmppTcpTransportModule.EstablishTlsState.class, "RFC 6120 § 5");
816            addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class);
817            declarePrecedenceOver(ConnectedButUnauthenticatedStateDescriptor.class);
818        }
819
820        @Override
821        protected EstablishTlsState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) {
822            XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class);
823            return tcpTransportModule.constructEstablishingTlsState(this, connectionInternal);
824        }
825    }
826
827    private EstablishTlsState constructEstablishingTlsState(
828                    EstablishTlsStateDescriptor stateDescriptor,
829                    ModularXmppClientToServerConnectionInternal connectionInternal) {
830        return new EstablishTlsState(stateDescriptor, connectionInternal);
831    }
832
833    private final class EstablishTlsState extends State {
834        private EstablishTlsState(EstablishTlsStateDescriptor stateDescriptor,
835                        ModularXmppClientToServerConnectionInternal connectionInternal) {
836            super(stateDescriptor, connectionInternal);
837        }
838
839        @Override
840        public StateTransitionResult.TransitionImpossible isTransitionToPossible(WalkStateGraphContext walkStateGraphContext)
841                throws SecurityRequiredByClientException, SecurityRequiredByServerException {
842            StartTls startTlsFeature = connectionInternal.connection.getFeature(StartTls.class);
843            SecurityMode securityMode = connectionInternal.connection.getConfiguration().getSecurityMode();
844
845            switch (securityMode) {
846            case required:
847            case ifpossible:
848                if (startTlsFeature == null) {
849                    if (securityMode == SecurityMode.ifpossible) {
850                        return new StateTransitionResult.TransitionImpossibleReason("Server does not announce support for TLS and we do not required it");
851                    }
852                    throw new SecurityRequiredByClientException();
853                }
854                // Allows transition by returning null.
855                return null;
856            case disabled:
857                if (startTlsFeature != null && startTlsFeature.required()) {
858                    throw new SecurityRequiredByServerException();
859                }
860                return new StateTransitionResult.TransitionImpossibleReason("TLS disabled in client settings and server does not require it");
861            default:
862                throw new AssertionError("Unknown security mode: " + securityMode);
863            }
864        }
865
866        @Override
867        public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
868                        throws IOException, InterruptedException, SmackException, XMPPException {
869            connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class);
870
871            SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext();
872
873            tlsState = new TlsState(smackTlsContext);
874            connectionInternal.addXmppInputOutputFilter(tlsState);
875
876            channelSelectedCallbackLock.lock();
877            try {
878                pendingOutputFilterData = true;
879                // The beginHandshake() is possibly not really required here, but it does not hurt either.
880                tlsState.engine.beginHandshake();
881                tlsState.handshakeStatus = TlsHandshakeStatus.initiated;
882            } finally {
883                channelSelectedCallbackLock.unlock();
884            }
885            connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
886
887            try {
888                tlsState.waitForHandshakeFinished();
889            } catch (CertificateException e) {
890                throw new SmackCertificateException(e);
891            }
892
893            connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established");
894
895            return new TlsEstablishedResult(tlsState.engine);
896        }
897
898        @Override
899        public void resetState() {
900            tlsState = null;
901        }
902    }
903
904    private enum TlsHandshakeStatus {
905        initial,
906        initiated,
907        successful,
908        failed,
909    }
910
911    private static final Level SSL_ENGINE_DEBUG_LOG_LEVEL = Level.FINEST;
912
913    private static void debugLogSslEngineResult(String operation, SSLEngineResult result) {
914        if (!LOGGER.isLoggable(SSL_ENGINE_DEBUG_LOG_LEVEL)) {
915            return;
916        }
917
918        LOGGER.log(SSL_ENGINE_DEBUG_LOG_LEVEL, "SSLEngineResult of " + operation + "(): " + result);
919    }
920
921    private final class TlsState implements XmppInputOutputFilter {
922
923        private static final int MAX_PENDING_OUTPUT_BYTES = 8096;
924
925        private final SmackTlsContext smackTlsContext;
926        private final SSLEngine engine;
927
928        private TlsHandshakeStatus handshakeStatus = TlsHandshakeStatus.initial;
929        private SSLException handshakeException;
930
931        private ByteBuffer myNetData;
932        private ByteBuffer peerAppData;
933
934        private final List<ByteBuffer> pendingOutputData = new ArrayList<>();
935        private int pendingOutputBytes;
936        private ByteBuffer pendingInputData;
937
938        private final AtomicInteger pendingDelegatedTasks = new AtomicInteger();
939
940        private long wrapInBytes;
941        private long wrapOutBytes;
942
943        private long unwrapInBytes;
944        private long unwrapOutBytes;
945
946        private TlsState(SmackTlsContext smackTlsContext) throws IOException {
947            this.smackTlsContext = smackTlsContext;
948
949            // Call createSSLEngine()'s variant with two parameters as this allows for TLS session resumption.
950
951            // Note that it is not really clear what the value of peer host should be. It could be A) the XMPP service's
952            // domainpart or B) the DNS name of the host we are connecting to (usually the DNS SRV RR target name). While
953            // the javadoc of createSSLEngine(String, int) indicates with "Some cipher suites (such as Kerberos) require
954            // remote hostname information, in which case peerHost needs to be specified." that A should be used. TLS
955            // session resumption may would need or at least benefit from B. Variant A would also be required if the
956            // String is used for certificate verification. And it appears at least likely that TLS session resumption
957            // would not be hurt by using variant A. Therefore we currently use variant A.
958            // TODO: Should we use the ACE representation of the XMPP service domain? Compare with f60e4055ec529f0b8160acedf13275592ab10a4b
959            // If yes, then we should probably introduce getXmppServiceDomainAceEncodedIfPossible().
960            String peerHost = connectionInternal.connection.getConfiguration().getXMPPServiceDomain().toString();
961            engine = smackTlsContext.sslContext.createSSLEngine(peerHost, remoteAddress.getPort());
962            engine.setUseClientMode(true);
963
964            SSLSession session = engine.getSession();
965            int applicationBufferSize = session.getApplicationBufferSize();
966            int packetBufferSize = session.getPacketBufferSize();
967
968            myNetData = ByteBuffer.allocateDirect(packetBufferSize);
969            peerAppData = ByteBuffer.allocate(applicationBufferSize);
970        }
971
972        @Override
973        public OutputResult output(ByteBuffer outputData, boolean isFinalDataOfElement, boolean destinationAddressChanged,
974                boolean moreDataAvailable) throws SSLException {
975            if (outputData != null) {
976                pendingOutputData.add(outputData);
977                pendingOutputBytes += outputData.remaining();
978                if (moreDataAvailable && pendingOutputBytes < MAX_PENDING_OUTPUT_BYTES) {
979                    return OutputResult.NO_OUTPUT;
980                }
981            }
982
983            ByteBuffer[] outputDataArray = pendingOutputData.toArray(new ByteBuffer[pendingOutputData.size()]);
984
985            ((java.nio.Buffer) myNetData).clear();
986
987            while (true) {
988                SSLEngineResult result;
989                try {
990                    result = engine.wrap(outputDataArray, myNetData);
991                } catch (SSLException e) {
992                    handleSslException(e);
993                    throw e;
994                }
995
996                debugLogSslEngineResult("wrap", result);
997
998                SSLEngineResult.Status engineResultStatus = result.getStatus();
999
1000                pendingOutputBytes -= result.bytesConsumed();
1001
1002                if (engineResultStatus == SSLEngineResult.Status.OK) {
1003                    wrapInBytes += result.bytesConsumed();
1004                    wrapOutBytes += result.bytesProduced();
1005
1006                    SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result);
1007                    switch (handshakeStatus) {
1008                        case NEED_UNWRAP:
1009                            // NEED_UNWRAP means that we need to receive something in order to continue the handshake. The
1010                            // standard channelSelectedCallback logic will take care of this, as there is eventually always
1011                            // a interest to read from the socket.
1012                            break;
1013                        case NEED_WRAP:
1014                            // Same as need task: Cycle the reactor.
1015                        case NEED_TASK:
1016                            // Note that we also set pendingOutputFilterData in the OutputResult in the NEED_TASK case, as
1017                            // we also want to retry the wrap() operation above in this case.
1018                            return new OutputResult(true, myNetData);
1019                        default:
1020                            break;
1021                    }
1022                }
1023
1024                switch (engineResultStatus) {
1025                case OK:
1026                    // No need to outputData.compact() here, since we do not reuse the buffer.
1027                    // Clean up the pending output data.
1028                    pruneBufferList(pendingOutputData);
1029                    return new OutputResult(!pendingOutputData.isEmpty(), myNetData);
1030                case CLOSED:
1031                    pendingOutputData.clear();
1032                    return OutputResult.NO_OUTPUT;
1033                case BUFFER_OVERFLOW:
1034                    LOGGER.warning("SSLEngine status BUFFER_OVERFLOW, this is hopefully uncommon");
1035                    int outputDataRemaining = outputData != null ? outputData.remaining() : 0;
1036                    int newCapacity = (int) (1.3 * outputDataRemaining);
1037                    // If newCapacity would not increase myNetData, then double it.
1038                    if (newCapacity <= myNetData.capacity()) {
1039                        newCapacity = 2 * myNetData.capacity();
1040                    }
1041                    ByteBuffer newMyNetData = ByteBuffer.allocateDirect(newCapacity);
1042                    ((java.nio.Buffer) myNetData).flip();
1043                    newMyNetData.put(myNetData);
1044                    myNetData = newMyNetData;
1045                    continue;
1046                case BUFFER_UNDERFLOW:
1047                    throw new IllegalStateException(
1048                            "Buffer underflow as result of SSLEngine.wrap() should never happen");
1049                }
1050            }
1051        }
1052
1053        @SuppressWarnings("ReferenceEquality")
1054        @Override
1055        public ByteBuffer input(ByteBuffer inputData) throws SSLException {
1056            ByteBuffer accumulatedData;
1057            if (pendingInputData == null) {
1058                accumulatedData = inputData;
1059            } else {
1060                assert pendingInputData != inputData;
1061
1062                int accumulatedDataBytes = pendingInputData.remaining() + inputData.remaining();
1063                accumulatedData = ByteBuffer.allocate(accumulatedDataBytes);
1064                accumulatedData.put(pendingInputData)
1065                               .put(inputData);
1066                ((java.nio.Buffer) accumulatedData).flip();
1067                pendingInputData = null;
1068            }
1069
1070            ((java.nio.Buffer) peerAppData).clear();
1071
1072            while (true) {
1073                SSLEngineResult result;
1074                try {
1075                    result = engine.unwrap(accumulatedData, peerAppData);
1076                } catch (SSLException e) {
1077                    handleSslException(e);
1078                    throw e;
1079                }
1080
1081                debugLogSslEngineResult("unwrap", result);
1082
1083                SSLEngineResult.Status engineResultStatus = result.getStatus();
1084
1085                if (engineResultStatus == SSLEngineResult.Status.OK) {
1086                    unwrapInBytes += result.bytesConsumed();
1087                    unwrapOutBytes += result.bytesProduced();
1088
1089                    SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result);
1090                    switch (handshakeStatus) {
1091                    case NEED_TASK:
1092                        // A delegated task is asynchronously running. Take care of the remaining accumulatedData.
1093                        addAsPendingInputData(accumulatedData);
1094                        // Return here, as the async task created by handleHandshakeStatus will continue calling the
1095                        // channelSelectedCallback.
1096                        return null;
1097                    case NEED_UNWRAP:
1098                        continue;
1099                    case NEED_WRAP:
1100                        // NEED_WRAP means that the SSLEngine needs to send data, probably without consuming data.
1101                        // We exploit here the fact that the channelSelectedCallback is single threaded and that the
1102                        // input processing is after the output processing.
1103                        addAsPendingInputData(accumulatedData);
1104                        // Note that it is ok that we the provided argument for pending input filter data to channel
1105                        // selected callback is false, as setPendingInputFilterData() will have set the internal state
1106                        // boolean accordingly.
1107                        connectionInternal.asyncGo(() -> callChannelSelectedCallback(false, true));
1108                        // Do not break here, but instead return and let the asynchronously invoked
1109                        // callChannelSelectedCallback() do its work.
1110                        return null;
1111                    default:
1112                        break;
1113                    }
1114                }
1115
1116                switch (engineResultStatus) {
1117                case OK:
1118                    // SSLEngine's unwrap() may not consume all bytes from the source buffer. If this is the case, then
1119                    // simply perform another unwrap until accumulatedData has no remaining bytes.
1120                    if (accumulatedData.hasRemaining()) {
1121                        continue;
1122                    }
1123                    return peerAppData;
1124                case CLOSED:
1125                    return null;
1126                case BUFFER_UNDERFLOW:
1127                    // There were not enough source bytes available to make a complete packet. Let it in
1128                    // pendingInputData. Note that we do not resize SSLEngine's source buffer - inputData in our case -
1129                    // as it is not possible.
1130                    addAsPendingInputData(accumulatedData);
1131                    return null;
1132                case BUFFER_OVERFLOW:
1133                    int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1134                    assert peerAppData.remaining() < applicationBufferSize;
1135                    peerAppData = ByteBuffer.allocate(applicationBufferSize);
1136                    continue;
1137                }
1138            }
1139        }
1140
1141        private void addAsPendingInputData(ByteBuffer byteBuffer) {
1142            // Note that we can not simply write
1143            // pendingInputData = byteBuffer;
1144            // we have to copy the provided byte buffer, because it is possible that this byteBuffer is re-used by some
1145            // higher layer. That is, here 'byteBuffer' is typically 'incomingBuffer', which is a direct buffer only
1146            // allocated once per connection for performance reasons and hence re-used for read() calls.
1147            pendingInputData = ByteBuffer.allocate(byteBuffer.remaining());
1148            pendingInputData.put(byteBuffer);
1149            ((java.nio.Buffer) pendingInputData).flip();
1150
1151            pendingInputFilterData = pendingInputData.hasRemaining();
1152        }
1153
1154        private SSLEngineResult.HandshakeStatus handleHandshakeStatus(SSLEngineResult sslEngineResult) {
1155            SSLEngineResult.HandshakeStatus handshakeStatus = sslEngineResult.getHandshakeStatus();
1156            switch (handshakeStatus) {
1157            case NEED_TASK:
1158                while (true) {
1159                    final Runnable delegatedTask = engine.getDelegatedTask();
1160                    if (delegatedTask == null) {
1161                        break;
1162                    }
1163                    sslEngineDelegatedTasks++;
1164                    int currentPendingDelegatedTasks = pendingDelegatedTasks.incrementAndGet();
1165                    if (currentPendingDelegatedTasks > maxPendingSslEngineDelegatedTasks) {
1166                        maxPendingSslEngineDelegatedTasks = currentPendingDelegatedTasks;
1167                    }
1168
1169                    Runnable wrappedDelegatedTask = () -> {
1170                        delegatedTask.run();
1171                        int wrappedCurrentPendingDelegatedTasks = pendingDelegatedTasks.decrementAndGet();
1172                        if (wrappedCurrentPendingDelegatedTasks == 0) {
1173                            callChannelSelectedCallback(true, true);
1174                        }
1175                    };
1176                    connectionInternal.asyncGo(wrappedDelegatedTask);
1177                }
1178                break;
1179            case FINISHED:
1180                onHandshakeFinished();
1181                break;
1182            default:
1183                break;
1184            }
1185
1186            SSLEngineResult.HandshakeStatus afterHandshakeStatus = engine.getHandshakeStatus();
1187            return afterHandshakeStatus;
1188        }
1189
1190        private void handleSslException(SSLException e) {
1191            handshakeException = e;
1192            handshakeStatus = TlsHandshakeStatus.failed;
1193            connectionInternal.notifyWaitingThreads();
1194        }
1195
1196        private void onHandshakeFinished() {
1197            handshakeStatus = TlsHandshakeStatus.successful;
1198            connectionInternal.notifyWaitingThreads();
1199        }
1200
1201        private boolean isHandshakeFinished() {
1202            return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed;
1203        }
1204
1205        private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackWrappedException, NoResponseException {
1206            connectionInternal.waitForConditionOrThrowConnectionException(() -> isHandshakeFinished(), "TLS handshake to finish");
1207
1208            if (handshakeStatus == TlsHandshakeStatus.failed) {
1209                throw handshakeException;
1210            }
1211
1212            assert handshakeStatus == TlsHandshakeStatus.successful;
1213
1214            if (smackTlsContext.daneVerifier != null) {
1215                smackTlsContext.daneVerifier.finish(engine.getSession());
1216            }
1217        }
1218
1219        @Override
1220        public Object getStats() {
1221            return new TlsStateStats(this);
1222        }
1223
1224        @Override
1225        public void closeInputOutput() {
1226            engine.closeOutbound();
1227            try {
1228                engine.closeInbound();
1229            } catch (SSLException e) {
1230                LOGGER.log(Level.FINEST,
1231                        "SSLException when closing inbound TLS session. This can likely be ignored if a possible truncation attack is suggested."
1232                        + " You may want to ask your XMPP server vendor to implement a clean TLS session shutdown sending close_notify after </stream>",
1233                        e);
1234            }
1235        }
1236
1237        @Override
1238        public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException, SmackWrappedException, NoResponseException {
1239            waitForHandshakeFinished();
1240        }
1241
1242        @Override
1243        public String getFilterName() {
1244            return "TLS (" + engine + ')';
1245        }
1246    }
1247
1248    public static final class TlsStateStats {
1249        public final long wrapInBytes;
1250        public final long wrapOutBytes;
1251        public final double wrapRatio;
1252
1253        public final long unwrapInBytes;
1254        public final long unwrapOutBytes;
1255        public final double unwrapRatio;
1256
1257        private TlsStateStats(TlsState tlsState) {
1258            wrapOutBytes = tlsState.wrapOutBytes;
1259            wrapInBytes = tlsState.wrapInBytes;
1260            wrapRatio = (double) wrapOutBytes / wrapInBytes;
1261
1262            unwrapOutBytes = tlsState.unwrapOutBytes;
1263            unwrapInBytes = tlsState.unwrapInBytes;
1264            unwrapRatio = (double) unwrapInBytes / unwrapOutBytes;
1265        }
1266
1267        private transient String toStringCache;
1268
1269        @Override
1270        public String toString() {
1271            if (toStringCache != null) {
1272                return toStringCache;
1273            }
1274
1275            toStringCache =
1276                      "wrap-in-bytes: " + wrapInBytes + '\n'
1277                    + "wrap-out-bytes: " + wrapOutBytes + '\n'
1278                    + "wrap-ratio: " + wrapRatio + '\n'
1279                    + "unwrap-in-bytes: " + unwrapInBytes + '\n'
1280                    + "unwrap-out-bytes: " + unwrapOutBytes + '\n'
1281                    + "unwrap-ratio: " + unwrapRatio + '\n'
1282                    ;
1283
1284            return toStringCache;
1285        }
1286    }
1287
1288    private void callChannelSelectedCallback(boolean setPendingInputFilterData, boolean setPendingOutputFilterData) {
1289        final SocketChannel channel = socketChannel;
1290        final SelectionKey key = selectionKey;
1291        if (channel == null || key == null) {
1292            LOGGER.info("Not calling channel selected callback because the connection was eventually disconnected");
1293            return;
1294        }
1295
1296        channelSelectedCallbackLock.lock();
1297        try {
1298            // Note that it is important that we send the pending(Input|Output)FilterData flags while holding the lock.
1299            if (setPendingInputFilterData) {
1300                pendingInputFilterData = true;
1301            }
1302            if (setPendingOutputFilterData) {
1303                pendingOutputFilterData = true;
1304            }
1305
1306            onChannelSelected(channel, key);
1307        } finally {
1308            channelSelectedCallbackLock.unlock();
1309        }
1310    }
1311
1312    private void closeSocketAndCleanup() {
1313        final SelectionKey selectionKey = this.selectionKey;
1314        if (selectionKey != null) {
1315            selectionKey.cancel();
1316        }
1317        final SocketChannel socketChannel = this.socketChannel;
1318        if (socketChannel != null) {
1319            try {
1320                socketChannel.close();
1321            } catch (IOException e) {
1322                LOGGER.log(Level.FINE, "Closing the socket channel failed", e);
1323            }
1324        }
1325
1326        this.selectionKey = null;
1327        this.socketChannel = null;
1328
1329        selectionKeyAttachment = null;
1330        remoteAddress = null;
1331    }
1332
1333    private static List<? extends Buffer> pruneBufferList(Collection<? extends Buffer> buffers) {
1334        return CollectionUtil.removeUntil(buffers, b -> b.hasRemaining());
1335    }
1336
1337    public XmppTcpTransportModule.Stats getStats() {
1338        return new Stats(this);
1339    }
1340
1341    public static final class Stats extends XmppClientToServerTransport.Stats {
1342        public final long totalBytesWritten;
1343        public final long totalBytesWrittenBeforeFilter;
1344        public final double writeRatio;
1345
1346        public final long totalBytesRead;
1347        public final long totalBytesReadAfterFilter;
1348        public final double readRatio;
1349
1350        public final long handledChannelSelectedCallbacks;
1351        public final long setWriteInterestAfterChannelSelectedCallback;
1352        public final long reactorThreadAlreadyRacing;
1353        public final long afterOutgoingElementsQueueModifiedSetInterestOps;
1354        public final long rejectedChannelSelectedCallbacks;
1355        public final long totalCallbackRequests;
1356        public final long callbackPreemtBecauseBytesWritten;
1357        public final long callbackPreemtBecauseBytesRead;
1358        public final int sslEngineDelegatedTasks;
1359        public final int maxPendingSslEngineDelegatedTasks;
1360
1361        private Stats(XmppTcpTransportModule connection) {
1362            totalBytesWritten = connection.totalBytesWritten;
1363            totalBytesWrittenBeforeFilter = connection.totalBytesWrittenBeforeFilter;
1364            writeRatio = (double) totalBytesWritten / totalBytesWrittenBeforeFilter;
1365
1366            totalBytesReadAfterFilter = connection.totalBytesReadAfterFilter;
1367            totalBytesRead = connection.totalBytesRead;
1368            readRatio = (double) totalBytesRead / totalBytesReadAfterFilter;
1369
1370            handledChannelSelectedCallbacks = connection.handledChannelSelectedCallbacks;
1371            setWriteInterestAfterChannelSelectedCallback = connection.setWriteInterestAfterChannelSelectedCallback.sum();
1372            reactorThreadAlreadyRacing = connection.reactorThreadAlreadyRacing.sum();
1373            afterOutgoingElementsQueueModifiedSetInterestOps = connection.afterOutgoingElementsQueueModifiedSetInterestOps
1374                    .sum();
1375            rejectedChannelSelectedCallbacks = connection.rejectedChannelSelectedCallbacks.sum();
1376
1377            totalCallbackRequests = handledChannelSelectedCallbacks + rejectedChannelSelectedCallbacks;
1378
1379            callbackPreemtBecauseBytesRead = connection.callbackPreemtBecauseBytesRead;
1380            callbackPreemtBecauseBytesWritten = connection.callbackPreemtBecauseBytesWritten;
1381
1382            sslEngineDelegatedTasks = connection.sslEngineDelegatedTasks;
1383            maxPendingSslEngineDelegatedTasks = connection.maxPendingSslEngineDelegatedTasks;
1384        }
1385
1386        private transient String toStringCache;
1387
1388        @Override
1389        public String toString() {
1390            if (toStringCache != null) {
1391                return toStringCache;
1392            }
1393
1394            toStringCache =
1395              "Total bytes\n"
1396            + "recv: " + totalBytesRead + '\n'
1397            + "send: " + totalBytesWritten + '\n'
1398            + "recv-aft-filter: " + totalBytesReadAfterFilter + '\n'
1399            + "send-bef-filter: " + totalBytesWrittenBeforeFilter + '\n'
1400            + "read-ratio: " + readRatio + '\n'
1401            + "write-ratio: " + writeRatio + '\n'
1402            + "Events\n"
1403            + "total-callback-requests: " + totalCallbackRequests + '\n'
1404            + "handled-channel-selected-callbacks: " + handledChannelSelectedCallbacks + '\n'
1405            + "rejected-channel-selected-callbacks: " + rejectedChannelSelectedCallbacks + '\n'
1406            + "set-write-interest-after-callback: " + setWriteInterestAfterChannelSelectedCallback + '\n'
1407            + "reactor-thread-already-racing: " + reactorThreadAlreadyRacing + '\n'
1408            + "after-queue-modified-set-interest-ops: " + afterOutgoingElementsQueueModifiedSetInterestOps + '\n'
1409            + "callback-preemt-because-bytes-read: " + callbackPreemtBecauseBytesRead + '\n'
1410            + "callback-preemt-because-bytes-written: " + callbackPreemtBecauseBytesWritten + '\n'
1411            + "ssl-engine-delegated-tasks: " + sslEngineDelegatedTasks + '\n'
1412            + "max-pending-ssl-engine-delegated-tasks: " + maxPendingSslEngineDelegatedTasks + '\n'
1413            ;
1414
1415            return toStringCache;
1416        }
1417    }
1418}