XmppTcpTransportModule.java

/**
 *
 * Copyright 2019-2021 Florian Schmaus
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jivesoftware.smack.tcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;

import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
import org.jivesoftware.smack.SmackException.SmackCertificateException;
import org.jivesoftware.smack.SmackFuture;
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XmppInputOutputFilter;
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor;
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor;
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnectionModule;
import org.jivesoftware.smack.c2s.StreamOpenAndCloseFactory;
import org.jivesoftware.smack.c2s.XmppClientToServerTransport;
import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext;
import org.jivesoftware.smack.debugger.SmackDebugger;
import org.jivesoftware.smack.fsm.State;
import org.jivesoftware.smack.fsm.StateDescriptor;
import org.jivesoftware.smack.fsm.StateTransitionResult;
import org.jivesoftware.smack.internal.SmackTlsContext;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StartTls;
import org.jivesoftware.smack.packet.StreamClose;
import org.jivesoftware.smack.packet.StreamOpen;
import org.jivesoftware.smack.packet.TlsFailure;
import org.jivesoftware.smack.packet.TlsProceed;
import org.jivesoftware.smack.packet.TopLevelStreamElement;
import org.jivesoftware.smack.packet.XmlEnvironment;
import org.jivesoftware.smack.tcp.XmppTcpTransportModule.XmppTcpNioTransport.DiscoveredTcpEndpoints;
import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints.Result;
import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
import org.jivesoftware.smack.util.CollectionUtil;
import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.UTF8;
import org.jivesoftware.smack.util.XmlStringBuilder;
import org.jivesoftware.smack.util.rce.RemoteConnectionEndpointLookupFailure;

import org.jxmpp.jid.DomainBareJid;
import org.jxmpp.jid.Jid;
import org.jxmpp.jid.util.JidUtil;
import org.jxmpp.xml.splitter.Utf8ByteXmppXmlSplitter;
import org.jxmpp.xml.splitter.XmlPrettyPrinter;
import org.jxmpp.xml.splitter.XmlPrinter;
import org.jxmpp.xml.splitter.XmppElementCallback;
import org.jxmpp.xml.splitter.XmppXmlSplitter;

public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionModule<XmppTcpTransportModuleDescriptor> {

    private static final Logger LOGGER = Logger.getLogger(XmppTcpTransportModule.class.getName());

    private static final int CALLBACK_MAX_BYTES_READ = 10 * 1024 * 1024;
    private static final int CALLBACK_MAX_BYTES_WRITEN = CALLBACK_MAX_BYTES_READ;

    private static final int MAX_ELEMENT_SIZE = 64 * 1024;

    private final XmppTcpNioTransport tcpNioTransport;

    private SelectionKey selectionKey;
    private SelectionKeyAttachment selectionKeyAttachment;
    private SocketChannel socketChannel;
    private InetSocketAddress remoteAddress;

    private TlsState tlsState;

    private Iterator<CharSequence> outgoingCharSequenceIterator;

    private final List<TopLevelStreamElement> currentlyOutgoingElements = new ArrayList<>();
    private final IdentityHashMap<ByteBuffer, List<TopLevelStreamElement>> bufferToElementMap = new IdentityHashMap<>();

    private ByteBuffer outgoingBuffer;
    private ByteBuffer filteredOutgoingBuffer;
    private final List<ByteBuffer> networkOutgoingBuffers = new ArrayList<>();
    private long networkOutgoingBuffersBytes;

    // TODO: Make the size of the incomingBuffer configurable.
    private final ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(2 * 4096);

    private final ReentrantLock channelSelectedCallbackLock = new ReentrantLock();

    private long totalBytesRead;
    private long totalBytesWritten;
    private long totalBytesReadAfterFilter;
    private long totalBytesWrittenBeforeFilter;
    private long handledChannelSelectedCallbacks;
    private long callbackPreemtBecauseBytesWritten;
    private long callbackPreemtBecauseBytesRead;
    private int sslEngineDelegatedTasks;
    private int maxPendingSslEngineDelegatedTasks;

    // TODO: Use LongAdder once Smack's minimum Android API level is 24 or higher.
    private final AtomicLong setWriteInterestAfterChannelSelectedCallback = new AtomicLong();
    private final AtomicLong reactorThreadAlreadyRacing = new AtomicLong();
    private final AtomicLong afterOutgoingElementsQueueModifiedSetInterestOps = new AtomicLong();
    private final AtomicLong rejectedChannelSelectedCallbacks = new AtomicLong();

    private Jid lastDestinationAddress;

    private boolean pendingInputFilterData;
    private boolean pendingOutputFilterData;

    private boolean pendingWriteInterestAfterRead;

    /**
     * Note that this field is effective final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final.
     */
    private Utf8ByteXmppXmlSplitter splitter;

    /**
     * Note that this field is effective final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final.
     */
    private XmppXmlSplitter outputDebugSplitter;

    private static final Level STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL = Level.FINER;

    XmppTcpTransportModule(XmppTcpTransportModuleDescriptor moduleDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
        super(moduleDescriptor, connectionInternal);

        tcpNioTransport = new XmppTcpNioTransport(connectionInternal);

        XmlPrinter incomingDebugPrettyPrinter = null;
        final SmackDebugger debugger = connectionInternal.smackDebugger;
        if (debugger != null) {
            // Incoming stream debugging.
            incomingDebugPrettyPrinter = XmlPrettyPrinter.builder()
                    .setPrettyWriter(sb -> debugger.incomingStreamSink(sb))
                    .build();

            // Outgoing stream debugging.
            XmlPrinter outgoingDebugPrettyPrinter = XmlPrettyPrinter.builder()
                    .setPrettyWriter(sb -> debugger.outgoingStreamSink(sb))
                    .build();
            outputDebugSplitter = new XmppXmlSplitter(outgoingDebugPrettyPrinter);
        }

        XmppXmlSplitter xmppXmlSplitter = new XmppXmlSplitter(MAX_ELEMENT_SIZE, xmppElementCallback,
                incomingDebugPrettyPrinter);
        splitter = new Utf8ByteXmppXmlSplitter(xmppXmlSplitter);
    }

    private final XmppElementCallback xmppElementCallback = new XmppElementCallback() {
        private String streamOpen;
        private String streamClose;

        @Override
        public void onCompleteElement(String completeElement) {
            assert streamOpen != null;
            assert streamClose != null;

            connectionInternal.withSmackDebugger(debugger -> debugger.onIncomingElementCompleted());

            String wrappedCompleteElement = streamOpen + completeElement + streamClose;
            connectionInternal.parseAndProcessElement(wrappedCompleteElement);
        }


        @Override
        public void streamOpened(String prefix, Map<String, String> attributes) {
            if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) {
                LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL,
                                "Stream of " + this + " opened. prefix=" + prefix + " attributes=" + attributes);
            }

            final String prefixXmlns = "xmlns:" + prefix;
            // TODO: Use the return value of onStreamOpen(), which now returns the
            // corresponding stream close tag, instead of creating it here.
            final StringBuilder streamClose = new StringBuilder(32);
            final StringBuilder streamOpen = new StringBuilder(256);

            streamOpen.append('<');
            streamClose.append("</");
            if (StringUtils.isNotEmpty(prefix)) {
                streamOpen.append(prefix).append(':');
                streamClose.append(prefix).append(':');
            }
            streamOpen.append("stream");
            streamClose.append("stream>");
            for (Map.Entry<String, String> entry : attributes.entrySet()) {
                String attributeName = entry.getKey();
                String attributeValue = entry.getValue();
                switch (attributeName) {
                case "to":
                case "from":
                case "id":
                case "version":
                    break;
                case "xml:lang":
                    streamOpen.append(" xml:lang='").append(attributeValue).append('\'');
                    break;
                case "xmlns":
                    streamOpen.append(" xmlns='").append(attributeValue).append('\'');
                    break;
                default:
                    if (attributeName.equals(prefixXmlns)) {
                        streamOpen.append(' ').append(prefixXmlns).append("='").append(attributeValue).append('\'');
                        break;
                    }
                    LOGGER.info("Unknown <stream/> attribute: " + attributeName);
                    break;
                }
            }
            streamOpen.append('>');

            this.streamOpen = streamOpen.toString();
            this.streamClose = streamClose.toString();

            connectionInternal.onStreamOpen(this.streamOpen);
        }

        @Override
        public void streamClosed() {
            if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) {
                LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL, "Stream of " + this + " closed");
            }

           connectionInternal.onStreamClosed();
        }
    };

    private void onChannelSelected(SelectableChannel selectedChannel, SelectionKey selectedSelectionKey) {
        assert selectionKey == null || selectionKey == selectedSelectionKey;
        SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel;
        // We are *always* interested in OP_READ.
        int newInterestedOps = SelectionKey.OP_READ;
        boolean newPendingOutputFilterData = false;

        if (!channelSelectedCallbackLock.tryLock()) {
            rejectedChannelSelectedCallbacks.incrementAndGet();
            return;
        }

        handledChannelSelectedCallbacks++;

        long callbackBytesRead = 0;
        long callbackBytesWritten = 0;

        try {
            boolean destinationAddressChanged = false;
            boolean isLastPartOfElement = false;
            TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null;
            StringBuilder outgoingStreamForDebugger = null;

            writeLoop: while (true) {
                final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty();

                if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) {
                    if (filteredOutgoingBuffer != null) {
                        networkOutgoingBuffers.add(filteredOutgoingBuffer);
                        networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining();

                        filteredOutgoingBuffer = null;
                        if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) {
                            continue;
                        }
                    }

                    ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]);
                    long bytesWritten;
                    try {
                        bytesWritten = selectedSocketChannel.write(output);
                    } catch (IOException e) {
                        // We have seen here so far
                        // - IOException "Broken pipe"
                        handleReadWriteIoException(e);
                        break;
                    }

                    if (bytesWritten == 0) {
                        newInterestedOps |= SelectionKey.OP_WRITE;
                        break;
                    }

                    callbackBytesWritten += bytesWritten;

                    networkOutgoingBuffersBytes -= bytesWritten;

                    List<? extends Buffer> prunedBuffers = pruneBufferList(networkOutgoingBuffers);

                    for (Buffer prunedBuffer : prunedBuffers) {
                        List<TopLevelStreamElement> sendElements = bufferToElementMap.remove(prunedBuffer);
                        if (sendElements == null) {
                            continue;
                        }
                        for (TopLevelStreamElement elementJustSend : sendElements) {
                            connectionInternal.fireFirstLevelElementSendListeners(elementJustSend);
                        }
                    }

                    // Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have
                    // written a certain amount.
                    if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) {
                        newInterestedOps |= SelectionKey.OP_WRITE;
                        callbackPreemtBecauseBytesWritten++;
                        break;
                    }
                } else if (outgoingBuffer != null || pendingOutputFilterData) {
                    pendingOutputFilterData = false;

                    if (outgoingBuffer != null) {
                        totalBytesWrittenBeforeFilter += outgoingBuffer.remaining();
                        if (isLastPartOfElement) {
                            assert currentlyOutgonigTopLevelStreamElement != null;
                            currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement);
                        }
                    }

                    ByteBuffer outputFilterInputData = outgoingBuffer;
                    // We can now null the outgoingBuffer since the filter step will take care of it from now on.
                    outgoingBuffer = null;

                    for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) {
                        XmppInputOutputFilter inputOutputFilter = it.next();
                        XmppInputOutputFilter.OutputResult outputResult;
                        try {
                            outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement,
                                    destinationAddressChanged, moreDataAvailable);
                        } catch (IOException e) {
                            connectionInternal.notifyConnectionError(e);
                            break writeLoop;
                        }
                        newPendingOutputFilterData |= outputResult.pendingFilterData;
                        outputFilterInputData = outputResult.filteredOutputData;
                        if (outputFilterInputData != null) {
                            outputFilterInputData.flip();
                        }
                    }

                    // It is ok if outpuFilterInputData is 'null' here, this is expected behavior.
                    if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) {
                        filteredOutgoingBuffer = outputFilterInputData;
                    } else {
                        filteredOutgoingBuffer = null;
                    }

                    // If the filters did eventually not produce any output data but if there is
                    // pending output data then we have a pending write request after read.
                    if (filteredOutgoingBuffer == null && newPendingOutputFilterData) {
                        pendingWriteInterestAfterRead = true;
                    }

                    if (filteredOutgoingBuffer != null && isLastPartOfElement) {
                        bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements));
                        currentlyOutgoingElements.clear();
                    }

                    // Reset that the destination address has changed.
                    if (destinationAddressChanged) {
                        destinationAddressChanged = false;
                    }
                } else if (outgoingCharSequenceIterator != null) {
                    CharSequence nextCharSequence = outgoingCharSequenceIterator.next();
                    outgoingBuffer = UTF8.encode(nextCharSequence);
                    if (!outgoingCharSequenceIterator.hasNext()) {
                        outgoingCharSequenceIterator = null;
                        isLastPartOfElement = true;
                    } else {
                        isLastPartOfElement = false;
                    }

                    final SmackDebugger debugger = connectionInternal.smackDebugger;
                    if (debugger != null) {
                        if (outgoingStreamForDebugger == null) {
                            outgoingStreamForDebugger = new StringBuilder();
                        }
                        outgoingStreamForDebugger.append(nextCharSequence);

                        if (isLastPartOfElement) {
                            try {
                                outputDebugSplitter.append(outgoingStreamForDebugger);
                            } catch (IOException e) {
                                throw new AssertionError(e);
                            }
                            debugger.onOutgoingElementCompleted();
                            outgoingStreamForDebugger = null;
                        }
                    }
                } else if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
                    currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll();
                    if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) {
                        Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement;
                        Jid currentDestinationAddress = currentlyOutgoingStanza.getTo();
                        destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress);
                        lastDestinationAddress = currentDestinationAddress;
                    }
                    CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE);
                    if (nextCharSequence instanceof XmlStringBuilder) {
                        XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence;
                        XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment();
                        outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator();
                    } else {
                        outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator();
                    }
                    assert outgoingCharSequenceIterator != null;
                } else {
                    // There is nothing more to write.
                    break;
                }
            }

            pendingOutputFilterData = newPendingOutputFilterData;
            if (!pendingWriteInterestAfterRead && pendingOutputFilterData) {
                newInterestedOps |= SelectionKey.OP_WRITE;
            }

            readLoop: while (true) {
                // Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have
                // read a certain amount.
                if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) {
                    callbackPreemtBecauseBytesRead++;
                    break;
                }

                int bytesRead;
                incomingBuffer.clear();
                try {
                    bytesRead = selectedSocketChannel.read(incomingBuffer);
                } catch (IOException e) {
                    handleReadWriteIoException(e);
                    return;
                }

                if (bytesRead < 0) {
                    LOGGER.finer("NIO read() returned " + bytesRead
                            + " for " + this + ". This probably means that the TCP connection was terminated.");
                    // According to the socket channel javadoc section about "asynchronous reads" a socket channel's
                    // read() may return -1 if the input side of a socket is shut down.
                     // Note that we do not call notifyConnectionError() here because the connection may be
                    // cleanly shutdown which would also cause read() to return '-1. I assume that this socket
                    // will be selected again, on which read() would throw an IOException, which will be catched
                    // and invoke notifyConnectionError() (see a few lines above).
                    /*
                    IOException exception = new IOException("NIO read() returned " + bytesRead);
                    notifyConnectionError(exception);
                    */
                    return;
                }

                if (!pendingInputFilterData) {
                    if (bytesRead == 0) {
                        // Nothing more to read.
                        break;
                    }
                } else {
                    pendingInputFilterData = false;
                }

                if (pendingWriteInterestAfterRead) {
                    // We have successfully read something and someone announced a write interest after a read. It is
                    // now possible that a filter is now also able to write additional data (for example SSLEngine).
                    pendingWriteInterestAfterRead = false;
                    newInterestedOps |= SelectionKey.OP_WRITE;
                }

                callbackBytesRead += bytesRead;

                ByteBuffer filteredIncomingBuffer = incomingBuffer;
                for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) {
                    filteredIncomingBuffer.flip();

                    ByteBuffer newFilteredIncomingBuffer;
                    try {
                        newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer);
                    } catch (IOException e) {
                        connectionInternal.notifyConnectionError(e);
                        return;
                    }
                    if (newFilteredIncomingBuffer == null) {
                        break readLoop;
                    }
                    filteredIncomingBuffer = newFilteredIncomingBuffer;
                }

                final int bytesReadAfterFilter = filteredIncomingBuffer.flip().remaining();

                totalBytesReadAfterFilter += bytesReadAfterFilter;

                try {
                    splitter.write(filteredIncomingBuffer);
                } catch (IOException e) {
                    connectionInternal.notifyConnectionError(e);
                    return;
                }
            }
        } finally {
            totalBytesWritten += callbackBytesWritten;
            totalBytesRead += callbackBytesRead;

            channelSelectedCallbackLock.unlock();
        }

        // Indicate that there is no reactor thread racing towards handling this selection key.
        final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
        if (selectionKeyAttachment != null) {
            selectionKeyAttachment.resetReactorThreadRacing();
        }

        // Check the queue again to prevent lost wakeups caused by elements inserted before we
        // called resetReactorThreadRacing() a few lines above.
        if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
            setWriteInterestAfterChannelSelectedCallback.incrementAndGet();
            newInterestedOps |= SelectionKey.OP_WRITE;
        }

        connectionInternal.setInterestOps(selectionKey, newInterestedOps);
    }

    private void handleReadWriteIoException(IOException e) {
        if (e instanceof ClosedChannelException && !tcpNioTransport.isConnected()) {
            // The connection is already closed.
            return;
        }

       connectionInternal.notifyConnectionError(e);
    }

    /**
     * This is the interface between the "lookup remote connection endpoints" state and the "establish TCP connection"
     * state. The field is indirectly populated by {@link XmppTcpNioTransport#lookupConnectionEndpoints()} and consumed
     * by {@link ConnectionAttemptState}.
     */
    DiscoveredTcpEndpoints discoveredTcpEndpoints;

    final class XmppTcpNioTransport extends XmppClientToServerTransport {

        XmppTcpNioTransport(ModularXmppClientToServerConnectionInternal connectionInternal) {
            super(connectionInternal);
        }

        @Override
        public StreamOpenAndCloseFactory getStreamOpenAndCloseFactory() {
            return new StreamOpenAndCloseFactory() {
                @Override
                public StreamOpen createStreamOpen(DomainBareJid to, CharSequence from, String id, String lang) {
                    String xmlLang = connectionInternal.connection.getConfiguration().getXmlLang();
                    StreamOpen streamOpen = new StreamOpen(to, from, id, xmlLang, StreamOpen.StreamContentNamespace.client);
                    return streamOpen;
                }
                @Override
                public StreamClose createStreamClose() {
                    return StreamClose.INSTANCE;
                }
            };
        }

        @Override
        protected void resetDiscoveredConnectionEndpoints() {
            discoveredTcpEndpoints = null;
        }

        @Override
        public boolean hasUseableConnectionEndpoints() {
            return discoveredTcpEndpoints != null;
        }

        @Override
        protected List<SmackFuture<LookupConnectionEndpointsResult, Exception>> lookupConnectionEndpoints() {
            // Assert that there are no stale discovered endpoints prior performing the lookup.
            assert discoveredTcpEndpoints == null;

            List<SmackFuture<LookupConnectionEndpointsResult, Exception>> futures = new ArrayList<>(2);

            InternalSmackFuture<LookupConnectionEndpointsResult, Exception> tcpEndpointsLookupFuture = new InternalSmackFuture<>();
            connectionInternal.asyncGo(() -> {
                Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(
                                connectionInternal.connection.getConfiguration());

                LookupConnectionEndpointsResult endpointsResult;
                if (result.discoveredRemoteConnectionEndpoints.isEmpty()) {
                    endpointsResult = new TcpEndpointDiscoveryFailed(result);
                } else {
                    endpointsResult = new DiscoveredTcpEndpoints(result);
                }
                tcpEndpointsLookupFuture.setResult(endpointsResult);
            });
            futures.add(tcpEndpointsLookupFuture);

            if (moduleDescriptor.isDirectTlsEnabled()) {
                // TODO: Implement this.
                throw new IllegalArgumentException("DirectTLS is not implemented yet");
            }

            return futures;
        }

        @Override
        protected void loadConnectionEndpoints(LookupConnectionEndpointsSuccess lookupConnectionEndpointsSuccess) {
            // The API contract stats that we will be given the instance we handed out with lookupConnectionEndpoints,
            // which must be of type DiscoveredTcpEndpoints here. Hence if we can not cast it, then there is an internal
            // Smack error.
            discoveredTcpEndpoints = (DiscoveredTcpEndpoints) lookupConnectionEndpointsSuccess;
        }

        @Override
        protected void afterFiltersClosed() {
            pendingInputFilterData = pendingOutputFilterData = true;
            afterOutgoingElementsQueueModified();
        }

        @Override
        protected void disconnect() {
            XmppTcpTransportModule.this.closeSocketAndCleanup();
        }

        @Override
        protected void notifyAboutNewOutgoingElements() {
            afterOutgoingElementsQueueModified();
        }

        @Override
        public SSLSession getSslSession() {
            TlsState tlsState = XmppTcpTransportModule.this.tlsState;
            if (tlsState == null) {
                return null;
            }

            return tlsState.engine.getSession();
        }

        public boolean isConnected() {
            SocketChannel socketChannel = XmppTcpTransportModule.this.socketChannel;
            if (socketChannel == null) {
                return false;
            }

            return socketChannel.isConnected();
        }

        @Override
        public boolean isTransportSecured() {
            final TlsState tlsState = XmppTcpTransportModule.this.tlsState;
            return tlsState != null && tlsState.handshakeStatus == TlsHandshakeStatus.successful;
        }

        @Override
        public XmppTcpTransportModule.Stats getStats() {
            return XmppTcpTransportModule.this.getStats();
        }

        final class DiscoveredTcpEndpoints implements LookupConnectionEndpointsSuccess {
            final RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result;
            DiscoveredTcpEndpoints(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) {
                this.result = result;
            }
        }

        final class TcpEndpointDiscoveryFailed implements LookupConnectionEndpointsFailed {
            final List<RemoteConnectionEndpointLookupFailure> lookupFailures;
            TcpEndpointDiscoveryFailed(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) {
                lookupFailures = result.lookupFailures;
            }
        }
    }

    private void afterOutgoingElementsQueueModified() {
        final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
        if (selectionKeyAttachment != null && selectionKeyAttachment.isReactorThreadRacing()) {
            // A reactor thread is already racing to the channel selected callback and will take care of this.
            reactorThreadAlreadyRacing.incrementAndGet();
            return;
        }

        afterOutgoingElementsQueueModifiedSetInterestOps.incrementAndGet();

        // Add OP_WRITE to the interested Ops, since we have now new things to write. Note that this may cause
        // multiple reactor threads to race to the channel selected callback in case we perform this right after
        // a select() returned with this selection key in the selected-key set. Hence we use tryLock() in the
        // channel selected callback to keep the invariant that only exactly one thread is performing the
        // callback.
        // Note that we need to perform setInterestedOps() *without* holding the channelSelectedCallbackLock, as
        // otherwise the reactor thread racing to the channel selected callback may found the lock still locked, which
        // would result in the outgoingElementsQueue not being handled.
        connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
    }

    @Override
    protected XmppTcpNioTransport getTransport() {
        return tcpNioTransport;
    }

    static final class EstablishingTcpConnectionStateDescriptor extends StateDescriptor {
        private EstablishingTcpConnectionStateDescriptor() {
            super(XmppTcpTransportModule.EstablishingTcpConnectionState.class);
            addPredeccessor(LookupRemoteConnectionEndpointsStateDescriptor.class);
            addSuccessor(EstablishTlsStateDescriptor.class);
            addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class);
        }

        @Override
        protected XmppTcpTransportModule.EstablishingTcpConnectionState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) {
            XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class);
            return tcpTransportModule.constructEstablishingTcpConnectionState(this, connectionInternal);
        }
    }

    private EstablishingTcpConnectionState constructEstablishingTcpConnectionState(
                    EstablishingTcpConnectionStateDescriptor stateDescriptor,
                    ModularXmppClientToServerConnectionInternal connectionInternal) {
        return new EstablishingTcpConnectionState(stateDescriptor, connectionInternal);
    }

    final class EstablishingTcpConnectionState extends State.AbstractTransport {
        private EstablishingTcpConnectionState(EstablishingTcpConnectionStateDescriptor stateDescriptor,
                        ModularXmppClientToServerConnectionInternal connectionInternal) {
            super(tcpNioTransport, stateDescriptor, connectionInternal);
        }

        @Override
        public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
                        throws InterruptedException, IOException, SmackException, XMPPException {
            // The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to
            // ConnectingToHost.
            ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints,
                    this);
            StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection();
            if (failure != null) {
                return failure;
            }

            socketChannel = connectionAttemptState.socketChannel;
            remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();

            selectionKey = connectionInternal.registerWithSelector(socketChannel, SelectionKey.OP_READ,
                            XmppTcpTransportModule.this::onChannelSelected);
            selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();

            connectionInternal.setTransport(tcpNioTransport);

            // TODO: It appears this should be done in a generic way. I'd assume we always
            // have to wait for stream features after the connection was established. If this is true then consider
            // moving this into State.AbstractTransport. But I am not yet 100% positive that this is the case for every
            // transport. Hence keep it here for now.
            connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after initial connection");

            return new TcpSocketConnectedResult(remoteAddress);
        }

        @Override
        public void resetState() {
            closeSocketAndCleanup();
        }
    }

    public static final class TcpSocketConnectedResult extends StateTransitionResult.Success {
        private final InetSocketAddress remoteAddress;

        private TcpSocketConnectedResult(InetSocketAddress remoteAddress) {
            super("TCP connection established to " + remoteAddress);
            this.remoteAddress = remoteAddress;
        }

        public InetSocketAddress getRemoteAddress() {
            return remoteAddress;
        }
    }

    public static final class TlsEstablishedResult extends StateTransitionResult.Success {

        private TlsEstablishedResult(SSLEngine sslEngine) {
            super("TLS established: " + sslEngine.getSession());
        }
    }

    static final class EstablishTlsStateDescriptor extends StateDescriptor {
        private EstablishTlsStateDescriptor() {
            super(XmppTcpTransportModule.EstablishTlsState.class, "RFC 6120 ยง 5");
            addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class);
            declarePrecedenceOver(ConnectedButUnauthenticatedStateDescriptor.class);
        }

        @Override
        protected EstablishTlsState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) {
            XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class);
            return tcpTransportModule.constructEstablishingTlsState(this, connectionInternal);
        }
    }

    private EstablishTlsState constructEstablishingTlsState(
                    EstablishTlsStateDescriptor stateDescriptor,
                    ModularXmppClientToServerConnectionInternal connectionInternal) {
        return new EstablishTlsState(stateDescriptor, connectionInternal);
    }

    private final class EstablishTlsState extends State {
        private EstablishTlsState(EstablishTlsStateDescriptor stateDescriptor,
                        ModularXmppClientToServerConnectionInternal connectionInternal) {
            super(stateDescriptor, connectionInternal);
        }

        @Override
        public StateTransitionResult.TransitionImpossible isTransitionToPossible(WalkStateGraphContext walkStateGraphContext)
                throws SecurityRequiredByClientException, SecurityRequiredByServerException {
            StartTls startTlsFeature = connectionInternal.connection.getFeature(StartTls.class);
            SecurityMode securityMode = connectionInternal.connection.getConfiguration().getSecurityMode();

            switch (securityMode) {
            case required:
            case ifpossible:
                if (startTlsFeature == null) {
                    if (securityMode == SecurityMode.ifpossible) {
                        return new StateTransitionResult.TransitionImpossibleReason("Server does not announce support for TLS and we do not required it");
                    }
                    throw new SecurityRequiredByClientException();
                }
                // Allows transition by returning null.
                return null;
            case disabled:
                if (startTlsFeature != null && startTlsFeature.required()) {
                    throw new SecurityRequiredByServerException();
                }
                return new StateTransitionResult.TransitionImpossibleReason("TLS disabled in client settings and server does not require it");
            default:
                throw new AssertionError("Unknown security mode: " + securityMode);
            }
        }

        @Override
        public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
                        throws IOException, InterruptedException, SmackException, XMPPException {
            connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class);

            SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext();

            tlsState = new TlsState(smackTlsContext);
            connectionInternal.addXmppInputOutputFilter(tlsState);

            channelSelectedCallbackLock.lock();
            try {
                pendingOutputFilterData = true;
                // The beginHandshake() is possibly not really required here, but it does not hurt either.
                tlsState.engine.beginHandshake();
                tlsState.handshakeStatus = TlsHandshakeStatus.initiated;
            } finally {
                channelSelectedCallbackLock.unlock();
            }
            connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ);

            try {
                tlsState.waitForHandshakeFinished();
            } catch (CertificateException e) {
                throw new SmackCertificateException(e);
            }

            connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established");

            return new TlsEstablishedResult(tlsState.engine);
        }

        @Override
        public void resetState() {
            tlsState = null;
        }
    }

    private enum TlsHandshakeStatus {
        initial,
        initiated,
        successful,
        failed,
    }

    private static final Level SSL_ENGINE_DEBUG_LOG_LEVEL = Level.FINEST;

    private static void debugLogSslEngineResult(String operation, SSLEngineResult result) {
        if (!LOGGER.isLoggable(SSL_ENGINE_DEBUG_LOG_LEVEL)) {
            return;
        }

        LOGGER.log(SSL_ENGINE_DEBUG_LOG_LEVEL, "SSLEngineResult of " + operation + "(): " + result);
    }

    private final class TlsState implements XmppInputOutputFilter {

        private static final int MAX_PENDING_OUTPUT_BYTES = 8096;

        private final SmackTlsContext smackTlsContext;
        private final SSLEngine engine;

        private TlsHandshakeStatus handshakeStatus = TlsHandshakeStatus.initial;
        private SSLException handshakeException;

        private ByteBuffer myNetData;
        private ByteBuffer peerAppData;

        private final List<ByteBuffer> pendingOutputData = new ArrayList<>();
        private int pendingOutputBytes;
        private ByteBuffer pendingInputData;

        private final AtomicInteger pendingDelegatedTasks = new AtomicInteger();

        private long wrapInBytes;
        private long wrapOutBytes;

        private long unwrapInBytes;
        private long unwrapOutBytes;

        private TlsState(SmackTlsContext smackTlsContext) throws IOException {
            this.smackTlsContext = smackTlsContext;

            // Call createSSLEngine()'s variant with two parameters as this allows for TLS session resumption.

            // Note that it is not really clear what the value of peer host should be. It could be A) the XMPP service's
            // domainpart or B) the DNS name of the host we are connecting to (usually the DNS SRV RR target name). While
            // the javadoc of createSSLEngine(String, int) indicates with "Some cipher suites (such as Kerberos) require
            // remote hostname information, in which case peerHost needs to be specified." that A should be used. TLS
            // session resumption may would need or at least benefit from B. Variant A would also be required if the
            // String is used for certificate verification. And it appears at least likely that TLS session resumption
            // would not be hurt by using variant A. Therefore we currently use variant A.
            // TODO: Should we use the ACE representation of the XMPP service domain? Compare with f60e4055ec529f0b8160acedf13275592ab10a4b
            // If yes, then we should probably introduce getXmppServiceDomainAceEncodedIfPossible().
            String peerHost = connectionInternal.connection.getConfiguration().getXMPPServiceDomain().toString();
            engine = smackTlsContext.sslContext.createSSLEngine(peerHost, remoteAddress.getPort());
            engine.setUseClientMode(true);

            SSLSession session = engine.getSession();
            int applicationBufferSize = session.getApplicationBufferSize();
            int packetBufferSize = session.getPacketBufferSize();

            myNetData = ByteBuffer.allocateDirect(packetBufferSize);
            peerAppData = ByteBuffer.allocate(applicationBufferSize);
        }

        @Override
        public OutputResult output(ByteBuffer outputData, boolean isFinalDataOfElement, boolean destinationAddressChanged,
                boolean moreDataAvailable) throws SSLException {
            if (outputData != null) {
                pendingOutputData.add(outputData);
                pendingOutputBytes += outputData.remaining();
                if (moreDataAvailable && pendingOutputBytes < MAX_PENDING_OUTPUT_BYTES) {
                    return OutputResult.NO_OUTPUT;
                }
            }

            ByteBuffer[] outputDataArray = pendingOutputData.toArray(new ByteBuffer[pendingOutputData.size()]);

            myNetData.clear();

            while (true) {
                SSLEngineResult result;
                try {
                    result = engine.wrap(outputDataArray, myNetData);
                } catch (SSLException e) {
                    handleSslException(e);
                    throw e;
                }

                debugLogSslEngineResult("wrap", result);

                SSLEngineResult.Status engineResultStatus = result.getStatus();

                pendingOutputBytes -= result.bytesConsumed();

                if (engineResultStatus == SSLEngineResult.Status.OK) {
                    wrapInBytes += result.bytesConsumed();
                    wrapOutBytes += result.bytesProduced();

                    SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result);
                    switch (handshakeStatus) {
                        case NEED_UNWRAP:
                            // NEED_UNWRAP means that we need to receive something in order to continue the handshake. The
                            // standard channelSelectedCallback logic will take care of this, as there is eventually always
                            // a interest to read from the socket.
                            break;
                        case NEED_WRAP:
                            // Same as need task: Cycle the reactor.
                        case NEED_TASK:
                            // Note that we also set pendingOutputFilterData in the OutputResult in the NEED_TASK case, as
                            // we also want to retry the wrap() operation above in this case.
                            return new OutputResult(true, myNetData);
                        default:
                            break;
                    }
                }

                switch (engineResultStatus) {
                case OK:
                    // No need to outputData.compact() here, since we do not reuse the buffer.
                    // Clean up the pending output data.
                    pruneBufferList(pendingOutputData);
                    return new OutputResult(!pendingOutputData.isEmpty(), myNetData);
                case CLOSED:
                    pendingOutputData.clear();
                    return OutputResult.NO_OUTPUT;
                case BUFFER_OVERFLOW:
                    LOGGER.warning("SSLEngine status BUFFER_OVERFLOW, this is hopefully uncommon");
                    int outputDataRemaining = outputData != null ? outputData.remaining() : 0;
                    int newCapacity = (int) (1.3 * outputDataRemaining);
                    // If newCapacity would not increase myNetData, then double it.
                    if (newCapacity <= myNetData.capacity()) {
                        newCapacity = 2 * myNetData.capacity();
                    }
                    ByteBuffer newMyNetData = ByteBuffer.allocateDirect(newCapacity);
                    myNetData.flip();
                    newMyNetData.put(myNetData);
                    myNetData = newMyNetData;
                    continue;
                case BUFFER_UNDERFLOW:
                    throw new IllegalStateException(
                            "Buffer underflow as result of SSLEngine.wrap() should never happen");
                }
            }
        }

        @SuppressWarnings("ReferenceEquality")
        @Override
        public ByteBuffer input(ByteBuffer inputData) throws SSLException {
            ByteBuffer accumulatedData;
            if (pendingInputData == null) {
                accumulatedData = inputData;
            } else {
                assert pendingInputData != inputData;

                int accumulatedDataBytes = pendingInputData.remaining() + inputData.remaining();
                accumulatedData = ByteBuffer.allocate(accumulatedDataBytes);
                accumulatedData.put(pendingInputData)
                               .put(inputData)
                               .flip();
                pendingInputData = null;
            }

            peerAppData.clear();

            while (true) {
                SSLEngineResult result;
                try {
                    result = engine.unwrap(accumulatedData, peerAppData);
                } catch (SSLException e) {
                    handleSslException(e);
                    throw e;
                }

                debugLogSslEngineResult("unwrap", result);

                SSLEngineResult.Status engineResultStatus = result.getStatus();

                if (engineResultStatus == SSLEngineResult.Status.OK) {
                    unwrapInBytes += result.bytesConsumed();
                    unwrapOutBytes += result.bytesProduced();

                    SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result);
                    switch (handshakeStatus) {
                    case NEED_TASK:
                        // A delegated task is asynchronously running. Take care of the remaining accumulatedData.
                        addAsPendingInputData(accumulatedData);
                        // Return here, as the async task created by handleHandshakeStatus will continue calling the
                        // cannelSelectedCallback.
                        return null;
                    case NEED_UNWRAP:
                        continue;
                    case NEED_WRAP:
                        // NEED_WRAP means that the SSLEngine needs to send data, probably without consuming data.
                        // We exploit here the fact that the channelSelectedCallback is single threaded and that the
                        // input processing is after the output processing.
                        addAsPendingInputData(accumulatedData);
                        // Note that it is ok that we the provided argument for pending input filter data to channel
                        // selected callback is false, as setPendingInputFilterData() will have set the internal state
                        // boolean accordingly.
                        connectionInternal.asyncGo(() -> callChannelSelectedCallback(false, true));
                        // Do not break here, but instead return and let the asynchronously invoked
                        // callChannelSelectedCallback() do its work.
                        return null;
                    default:
                        break;
                    }
                }

                switch (engineResultStatus) {
                case OK:
                    // SSLEngine's unwrap() may not consume all bytes from the source buffer. If this is the case, then
                    // simply perform another unwrap until accumlatedData has no remaining bytes.
                    if (accumulatedData.hasRemaining()) {
                        continue;
                    }
                    return peerAppData;
                case CLOSED:
                    return null;
                case BUFFER_UNDERFLOW:
                    // There were not enough source bytes available to make a complete packet. Let it in
                    // pendingInputData. Note that we do not resize SSLEngine's source buffer - inputData in our case -
                    // as it is not possible.
                    addAsPendingInputData(accumulatedData);
                    return null;
                case BUFFER_OVERFLOW:
                    int applicationBufferSize = engine.getSession().getApplicationBufferSize();
                    assert peerAppData.remaining() < applicationBufferSize;
                    peerAppData = ByteBuffer.allocate(applicationBufferSize);
                    continue;
                }
            }
        }

        private void addAsPendingInputData(ByteBuffer byteBuffer) {
            // Note that we can not simply write
            // pendingInputData = byteBuffer;
            // we have to copy the provided byte buffer, because it is possible that this byteBuffer is re-used by some
            // higher layer. That is, here 'byteBuffer' is typically 'incomingBuffer', which is a direct buffer only
            // allocated once per connection for performance reasons and hence re-used for read() calls.
            pendingInputData = ByteBuffer.allocate(byteBuffer.remaining());
            pendingInputData.put(byteBuffer).flip();

            pendingInputFilterData = pendingInputData.hasRemaining();
        }

        private SSLEngineResult.HandshakeStatus handleHandshakeStatus(SSLEngineResult sslEngineResult) {
            SSLEngineResult.HandshakeStatus handshakeStatus = sslEngineResult.getHandshakeStatus();
            switch (handshakeStatus) {
            case NEED_TASK:
                while (true) {
                    final Runnable delegatedTask = engine.getDelegatedTask();
                    if (delegatedTask == null) {
                        break;
                    }
                    sslEngineDelegatedTasks++;
                    int currentPendingDelegatedTasks = pendingDelegatedTasks.incrementAndGet();
                    if (currentPendingDelegatedTasks > maxPendingSslEngineDelegatedTasks) {
                        maxPendingSslEngineDelegatedTasks = currentPendingDelegatedTasks;
                    }

                    Runnable wrappedDelegatedTask = () -> {
                        delegatedTask.run();
                        int wrappedCurrentPendingDelegatedTasks = pendingDelegatedTasks.decrementAndGet();
                        if (wrappedCurrentPendingDelegatedTasks == 0) {
                            callChannelSelectedCallback(true, true);
                        }
                    };
                    connectionInternal.asyncGo(wrappedDelegatedTask);
                }
                break;
            case FINISHED:
                onHandshakeFinished();
                break;
            default:
                break;
            }

            SSLEngineResult.HandshakeStatus afterHandshakeStatus = engine.getHandshakeStatus();
            return afterHandshakeStatus;
        }

        private void handleSslException(SSLException e) {
            handshakeException = e;
            handshakeStatus = TlsHandshakeStatus.failed;
            connectionInternal.notifyWaitingThreads();
        }

        private void onHandshakeFinished() {
            handshakeStatus = TlsHandshakeStatus.successful;
            connectionInternal.notifyWaitingThreads();
        }

        private boolean isHandshakeFinished() {
            return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed;
        }

        private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackException, XMPPException {
            connectionInternal.waitForConditionOrThrowConnectionException(() -> isHandshakeFinished(), "TLS handshake to finish");

            if (handshakeStatus == TlsHandshakeStatus.failed) {
                throw handshakeException;
            }

            assert handshakeStatus == TlsHandshakeStatus.successful;

            if (smackTlsContext.daneVerifier != null) {
                smackTlsContext.daneVerifier.finish(engine.getSession());
            }
        }

        @Override
        public Object getStats() {
            return new TlsStateStats(this);
        }

        @Override
        public void closeInputOutput() {
            engine.closeOutbound();
            try {
                engine.closeInbound();
            } catch (SSLException e) {
                LOGGER.log(Level.FINEST,
                        "SSLException when closing inbound TLS session. This can likely be ignored if a possible truncation attack is suggested."
                        + " You may want to ask your XMPP server vendor to implement a clean TLS session shutdown sending close_notify after </stream>",
                        e);
            }
        }

        @Override
        public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException,
                SmackException, XMPPException {
            waitForHandshakeFinished();
        }

        @Override
        public String getFilterName() {
            return "TLS (" + engine + ')';
        }
    }

    public static final class TlsStateStats {
        public final long wrapInBytes;
        public final long wrapOutBytes;
        public final double wrapRatio;

        public final long unwrapInBytes;
        public final long unwrapOutBytes;
        public final double unwrapRatio;

        private TlsStateStats(TlsState tlsState) {
            wrapOutBytes = tlsState.wrapOutBytes;
            wrapInBytes = tlsState.wrapInBytes;
            wrapRatio = (double) wrapOutBytes / wrapInBytes;

            unwrapOutBytes = tlsState.unwrapOutBytes;
            unwrapInBytes = tlsState.unwrapInBytes;
            unwrapRatio = (double) unwrapInBytes / unwrapOutBytes;
        }

        private transient String toStringCache;

        @Override
        public String toString() {
            if (toStringCache != null) {
                return toStringCache;
            }

            toStringCache =
                      "wrap-in-bytes: " + wrapInBytes + '\n'
                    + "wrap-out-bytes: " + wrapOutBytes + '\n'
                    + "wrap-ratio: " + wrapRatio + '\n'
                    + "unwrap-in-bytes: " + unwrapInBytes + '\n'
                    + "unwrap-out-bytes: " + unwrapOutBytes + '\n'
                    + "unwrap-ratio: " + unwrapRatio + '\n'
                    ;

            return toStringCache;
        }
    }

    private void callChannelSelectedCallback(boolean setPendingInputFilterData, boolean setPendingOutputFilterData) {
        final SocketChannel channel = socketChannel;
        final SelectionKey key = selectionKey;
        if (channel == null || key == null) {
            LOGGER.info("Not calling channel selected callback because the connection was eventually disconnected");
            return;
        }

        channelSelectedCallbackLock.lock();
        try {
            // Note that it is important that we send the pending(Input|Output)FilterData flags while holding the lock.
            if (setPendingInputFilterData) {
                pendingInputFilterData = true;
            }
            if (setPendingOutputFilterData) {
                pendingOutputFilterData = true;
            }

            onChannelSelected(channel, key);
        } finally {
            channelSelectedCallbackLock.unlock();
        }
    }

    private void closeSocketAndCleanup() {
        final SelectionKey selectionKey = this.selectionKey;
        if (selectionKey != null) {
            selectionKey.cancel();
        }
        final SocketChannel socketChannel = this.socketChannel;
        if (socketChannel != null) {
            try {
                socketChannel.close();
            } catch (IOException e) {
                LOGGER.log(Level.FINE, "Closing the socket channel failed", e);
            }
        }

        this.selectionKey = null;
        this.socketChannel = null;

        selectionKeyAttachment = null;
        remoteAddress = null;
    }

    private static List<? extends Buffer> pruneBufferList(Collection<? extends Buffer> buffers) {
        return CollectionUtil.removeUntil(buffers, b -> b.hasRemaining());
    }

    public XmppTcpTransportModule.Stats getStats() {
        return new Stats(this);
    }

    public static final class Stats extends XmppClientToServerTransport.Stats {
        public final long totalBytesWritten;
        public final long totalBytesWrittenBeforeFilter;
        public final double writeRatio;

        public final long totalBytesRead;
        public final long totalBytesReadAfterFilter;
        public final double readRatio;

        public final long handledChannelSelectedCallbacks;
        public final long setWriteInterestAfterChannelSelectedCallback;
        public final long reactorThreadAlreadyRacing;
        public final long afterOutgoingElementsQueueModifiedSetInterestOps;
        public final long rejectedChannelSelectedCallbacks;
        public final long totalCallbackRequests;
        public final long callbackPreemtBecauseBytesWritten;
        public final long callbackPreemtBecauseBytesRead;
        public final int sslEngineDelegatedTasks;
        public final int maxPendingSslEngineDelegatedTasks;

        private Stats(XmppTcpTransportModule connection) {
            totalBytesWritten = connection.totalBytesWritten;
            totalBytesWrittenBeforeFilter = connection.totalBytesWrittenBeforeFilter;
            writeRatio = (double) totalBytesWritten / totalBytesWrittenBeforeFilter;

            totalBytesReadAfterFilter = connection.totalBytesReadAfterFilter;
            totalBytesRead = connection.totalBytesRead;
            readRatio = (double) totalBytesRead / totalBytesReadAfterFilter;

            handledChannelSelectedCallbacks = connection.handledChannelSelectedCallbacks;
            setWriteInterestAfterChannelSelectedCallback = connection.setWriteInterestAfterChannelSelectedCallback.get();
            reactorThreadAlreadyRacing = connection.reactorThreadAlreadyRacing.get();
            afterOutgoingElementsQueueModifiedSetInterestOps = connection.afterOutgoingElementsQueueModifiedSetInterestOps
                    .get();
            rejectedChannelSelectedCallbacks = connection.rejectedChannelSelectedCallbacks.get();

            totalCallbackRequests = handledChannelSelectedCallbacks + rejectedChannelSelectedCallbacks;

            callbackPreemtBecauseBytesRead = connection.callbackPreemtBecauseBytesRead;
            callbackPreemtBecauseBytesWritten = connection.callbackPreemtBecauseBytesWritten;

            sslEngineDelegatedTasks = connection.sslEngineDelegatedTasks;
            maxPendingSslEngineDelegatedTasks = connection.maxPendingSslEngineDelegatedTasks;
        }

        private transient String toStringCache;

        @Override
        public String toString() {
            if (toStringCache != null) {
                return toStringCache;
            }

            toStringCache =
              "Total bytes\n"
            + "recv: " + totalBytesRead + '\n'
            + "send: " + totalBytesWritten + '\n'
            + "recv-aft-filter: " + totalBytesReadAfterFilter + '\n'
            + "send-bef-filter: " + totalBytesWrittenBeforeFilter + '\n'
            + "read-ratio: " + readRatio + '\n'
            + "write-ratio: " + writeRatio + '\n'
            + "Events\n"
            + "total-callback-requests: " + totalCallbackRequests + '\n'
            + "handled-channel-selected-callbacks: " + handledChannelSelectedCallbacks + '\n'
            + "rejected-channel-selected-callbacks: " + rejectedChannelSelectedCallbacks + '\n'
            + "set-write-interest-after-callback: " + setWriteInterestAfterChannelSelectedCallback + '\n'
            + "reactor-thread-already-racing: " + reactorThreadAlreadyRacing + '\n'
            + "after-queue-modified-set-interest-ops: " + afterOutgoingElementsQueueModifiedSetInterestOps + '\n'
            + "callback-preemt-because-bytes-read: " + callbackPreemtBecauseBytesRead + '\n'
            + "callback-preemt-because-bytes-written: " + callbackPreemtBecauseBytesWritten + '\n'
            + "ssl-engine-delegated-tasks: " + sslEngineDelegatedTasks + '\n'
            + "max-pending-ssl-engine-delegated-tasks: " + maxPendingSslEngineDelegatedTasks + '\n'
            ;

            return toStringCache;
        }
    }
}