001/**
002 *
003 * Copyright 2019-2021 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.nio.Buffer;
022import java.nio.ByteBuffer;
023import java.nio.channels.ClosedChannelException;
024import java.nio.channels.SelectableChannel;
025import java.nio.channels.SelectionKey;
026import java.nio.channels.SocketChannel;
027import java.security.cert.CertificateException;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.IdentityHashMap;
032import java.util.Iterator;
033import java.util.List;
034import java.util.ListIterator;
035import java.util.Map;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicLong;
038import java.util.concurrent.locks.ReentrantLock;
039import java.util.logging.Level;
040import java.util.logging.Logger;
041
042import javax.net.ssl.SSLEngine;
043import javax.net.ssl.SSLEngineResult;
044import javax.net.ssl.SSLException;
045import javax.net.ssl.SSLSession;
046
047import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
048import org.jivesoftware.smack.SmackException;
049import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
050import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
051import org.jivesoftware.smack.SmackException.SmackCertificateException;
052import org.jivesoftware.smack.SmackFuture;
053import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
054import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment;
055import org.jivesoftware.smack.XMPPException;
056import org.jivesoftware.smack.XmppInputOutputFilter;
057import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor;
058import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor;
059import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnectionModule;
060import org.jivesoftware.smack.c2s.StreamOpenAndCloseFactory;
061import org.jivesoftware.smack.c2s.XmppClientToServerTransport;
062import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
063import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext;
064import org.jivesoftware.smack.debugger.SmackDebugger;
065import org.jivesoftware.smack.fsm.State;
066import org.jivesoftware.smack.fsm.StateDescriptor;
067import org.jivesoftware.smack.fsm.StateTransitionResult;
068import org.jivesoftware.smack.internal.SmackTlsContext;
069import org.jivesoftware.smack.packet.Stanza;
070import org.jivesoftware.smack.packet.StartTls;
071import org.jivesoftware.smack.packet.StreamClose;
072import org.jivesoftware.smack.packet.StreamOpen;
073import org.jivesoftware.smack.packet.TlsFailure;
074import org.jivesoftware.smack.packet.TlsProceed;
075import org.jivesoftware.smack.packet.TopLevelStreamElement;
076import org.jivesoftware.smack.packet.XmlEnvironment;
077import org.jivesoftware.smack.tcp.XmppTcpTransportModule.XmppTcpNioTransport.DiscoveredTcpEndpoints;
078import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
079import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints.Result;
080import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
081import org.jivesoftware.smack.util.CollectionUtil;
082import org.jivesoftware.smack.util.StringUtils;
083import org.jivesoftware.smack.util.UTF8;
084import org.jivesoftware.smack.util.XmlStringBuilder;
085import org.jivesoftware.smack.util.rce.RemoteConnectionEndpointLookupFailure;
086
087import org.jxmpp.jid.DomainBareJid;
088import org.jxmpp.jid.Jid;
089import org.jxmpp.jid.util.JidUtil;
090import org.jxmpp.xml.splitter.Utf8ByteXmppXmlSplitter;
091import org.jxmpp.xml.splitter.XmlPrettyPrinter;
092import org.jxmpp.xml.splitter.XmlPrinter;
093import org.jxmpp.xml.splitter.XmppElementCallback;
094import org.jxmpp.xml.splitter.XmppXmlSplitter;
095
096public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionModule<XmppTcpTransportModuleDescriptor> {
097
098    private static final Logger LOGGER = Logger.getLogger(XmppTcpTransportModule.class.getName());
099
100    private static final int CALLBACK_MAX_BYTES_READ = 10 * 1024 * 1024;
101    private static final int CALLBACK_MAX_BYTES_WRITEN = CALLBACK_MAX_BYTES_READ;
102
103    private static final int MAX_ELEMENT_SIZE = 64 * 1024;
104
105    private final XmppTcpNioTransport tcpNioTransport;
106
107    private SelectionKey selectionKey;
108    private SelectionKeyAttachment selectionKeyAttachment;
109    private SocketChannel socketChannel;
110    private InetSocketAddress remoteAddress;
111
112    private TlsState tlsState;
113
114    private Iterator<CharSequence> outgoingCharSequenceIterator;
115
116    private final List<TopLevelStreamElement> currentlyOutgoingElements = new ArrayList<>();
117    private final IdentityHashMap<ByteBuffer, List<TopLevelStreamElement>> bufferToElementMap = new IdentityHashMap<>();
118
119    private ByteBuffer outgoingBuffer;
120    private ByteBuffer filteredOutgoingBuffer;
121    private final List<ByteBuffer> networkOutgoingBuffers = new ArrayList<>();
122    private long networkOutgoingBuffersBytes;
123
124    // TODO: Make the size of the incomingBuffer configurable.
125    private final ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(2 * 4096);
126
127    private final ReentrantLock channelSelectedCallbackLock = new ReentrantLock();
128
129    private long totalBytesRead;
130    private long totalBytesWritten;
131    private long totalBytesReadAfterFilter;
132    private long totalBytesWrittenBeforeFilter;
133    private long handledChannelSelectedCallbacks;
134    private long callbackPreemtBecauseBytesWritten;
135    private long callbackPreemtBecauseBytesRead;
136    private int sslEngineDelegatedTasks;
137    private int maxPendingSslEngineDelegatedTasks;
138
139    // TODO: Use LongAdder once Smack's minimum Android API level is 24 or higher.
140    private final AtomicLong setWriteInterestAfterChannelSelectedCallback = new AtomicLong();
141    private final AtomicLong reactorThreadAlreadyRacing = new AtomicLong();
142    private final AtomicLong afterOutgoingElementsQueueModifiedSetInterestOps = new AtomicLong();
143    private final AtomicLong rejectedChannelSelectedCallbacks = new AtomicLong();
144
145    private Jid lastDestinationAddress;
146
147    private boolean pendingInputFilterData;
148    private boolean pendingOutputFilterData;
149
150    private boolean pendingWriteInterestAfterRead;
151
152    /**
153     * Note that this field is effective final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final.
154     */
155    private Utf8ByteXmppXmlSplitter splitter;
156
157    /**
158     * Note that this field is effective final, but due to https://stackoverflow.com/q/30360824/194894 we have to declare it non-final.
159     */
160    private XmppXmlSplitter outputDebugSplitter;
161
162    private static final Level STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL = Level.FINER;
163
164    XmppTcpTransportModule(XmppTcpTransportModuleDescriptor moduleDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
165        super(moduleDescriptor, connectionInternal);
166
167        tcpNioTransport = new XmppTcpNioTransport(connectionInternal);
168
169        XmlPrinter incomingDebugPrettyPrinter = null;
170        final SmackDebugger debugger = connectionInternal.smackDebugger;
171        if (debugger != null) {
172            // Incoming stream debugging.
173            incomingDebugPrettyPrinter = XmlPrettyPrinter.builder()
174                    .setPrettyWriter(sb -> debugger.incomingStreamSink(sb))
175                    .build();
176
177            // Outgoing stream debugging.
178            XmlPrinter outgoingDebugPrettyPrinter = XmlPrettyPrinter.builder()
179                    .setPrettyWriter(sb -> debugger.outgoingStreamSink(sb))
180                    .build();
181            outputDebugSplitter = new XmppXmlSplitter(outgoingDebugPrettyPrinter);
182        }
183
184        XmppXmlSplitter xmppXmlSplitter = new XmppXmlSplitter(MAX_ELEMENT_SIZE, xmppElementCallback,
185                incomingDebugPrettyPrinter);
186        splitter = new Utf8ByteXmppXmlSplitter(xmppXmlSplitter);
187    }
188
189    private final XmppElementCallback xmppElementCallback = new XmppElementCallback() {
190        private String streamOpen;
191        private String streamClose;
192
193        @Override
194        public void onCompleteElement(String completeElement) {
195            assert streamOpen != null;
196            assert streamClose != null;
197
198            connectionInternal.withSmackDebugger(debugger -> debugger.onIncomingElementCompleted());
199
200            String wrappedCompleteElement = streamOpen + completeElement + streamClose;
201            connectionInternal.parseAndProcessElement(wrappedCompleteElement);
202        }
203
204
205        @Override
206        public void streamOpened(String prefix, Map<String, String> attributes) {
207            if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) {
208                LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL,
209                                "Stream of " + this + " opened. prefix=" + prefix + " attributes=" + attributes);
210            }
211
212            final String prefixXmlns = "xmlns:" + prefix;
213            // TODO: Use the return value of onStreamOpen(), which now returns the
214            // corresponding stream close tag, instead of creating it here.
215            final StringBuilder streamClose = new StringBuilder(32);
216            final StringBuilder streamOpen = new StringBuilder(256);
217
218            streamOpen.append('<');
219            streamClose.append("</");
220            if (StringUtils.isNotEmpty(prefix)) {
221                streamOpen.append(prefix).append(':');
222                streamClose.append(prefix).append(':');
223            }
224            streamOpen.append("stream");
225            streamClose.append("stream>");
226            for (Map.Entry<String, String> entry : attributes.entrySet()) {
227                String attributeName = entry.getKey();
228                String attributeValue = entry.getValue();
229                switch (attributeName) {
230                case "to":
231                case "from":
232                case "id":
233                case "version":
234                    break;
235                case "xml:lang":
236                    streamOpen.append(" xml:lang='").append(attributeValue).append('\'');
237                    break;
238                case "xmlns":
239                    streamOpen.append(" xmlns='").append(attributeValue).append('\'');
240                    break;
241                default:
242                    if (attributeName.equals(prefixXmlns)) {
243                        streamOpen.append(' ').append(prefixXmlns).append("='").append(attributeValue).append('\'');
244                        break;
245                    }
246                    LOGGER.info("Unknown <stream/> attribute: " + attributeName);
247                    break;
248                }
249            }
250            streamOpen.append('>');
251
252            this.streamOpen = streamOpen.toString();
253            this.streamClose = streamClose.toString();
254
255            connectionInternal.onStreamOpen(this.streamOpen);
256        }
257
258        @Override
259        public void streamClosed() {
260            if (LOGGER.isLoggable(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL)) {
261                LOGGER.log(STREAM_OPEN_CLOSE_DEBUG_LOG_LEVEL, "Stream of " + this + " closed");
262            }
263
264           connectionInternal.onStreamClosed();
265        }
266    };
267
268    private void onChannelSelected(SelectableChannel selectedChannel, SelectionKey selectedSelectionKey) {
269        assert selectionKey == null || selectionKey == selectedSelectionKey;
270        SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel;
271        // We are *always* interested in OP_READ.
272        int newInterestedOps = SelectionKey.OP_READ;
273        boolean newPendingOutputFilterData = false;
274
275        if (!channelSelectedCallbackLock.tryLock()) {
276            rejectedChannelSelectedCallbacks.incrementAndGet();
277            return;
278        }
279
280        handledChannelSelectedCallbacks++;
281
282        long callbackBytesRead = 0;
283        long callbackBytesWritten = 0;
284
285        try {
286            boolean destinationAddressChanged = false;
287            boolean isLastPartOfElement = false;
288            TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null;
289            StringBuilder outgoingStreamForDebugger = null;
290
291            writeLoop: while (true) {
292                final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty();
293
294                if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) {
295                    if (filteredOutgoingBuffer != null) {
296                        networkOutgoingBuffers.add(filteredOutgoingBuffer);
297                        networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining();
298
299                        filteredOutgoingBuffer = null;
300                        if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) {
301                            continue;
302                        }
303                    }
304
305                    ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]);
306                    long bytesWritten;
307                    try {
308                        bytesWritten = selectedSocketChannel.write(output);
309                    } catch (IOException e) {
310                        // We have seen here so far
311                        // - IOException "Broken pipe"
312                        handleReadWriteIoException(e);
313                        break;
314                    }
315
316                    if (bytesWritten == 0) {
317                        newInterestedOps |= SelectionKey.OP_WRITE;
318                        break;
319                    }
320
321                    callbackBytesWritten += bytesWritten;
322
323                    networkOutgoingBuffersBytes -= bytesWritten;
324
325                    List<? extends Buffer> prunedBuffers = pruneBufferList(networkOutgoingBuffers);
326
327                    for (Buffer prunedBuffer : prunedBuffers) {
328                        List<TopLevelStreamElement> sendElements = bufferToElementMap.remove(prunedBuffer);
329                        if (sendElements == null) {
330                            continue;
331                        }
332                        for (TopLevelStreamElement elementJustSend : sendElements) {
333                            connectionInternal.fireFirstLevelElementSendListeners(elementJustSend);
334                        }
335                    }
336
337                    // Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have
338                    // written a certain amount.
339                    if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) {
340                        newInterestedOps |= SelectionKey.OP_WRITE;
341                        callbackPreemtBecauseBytesWritten++;
342                        break;
343                    }
344                } else if (outgoingBuffer != null || pendingOutputFilterData) {
345                    pendingOutputFilterData = false;
346
347                    if (outgoingBuffer != null) {
348                        totalBytesWrittenBeforeFilter += outgoingBuffer.remaining();
349                        if (isLastPartOfElement) {
350                            assert currentlyOutgonigTopLevelStreamElement != null;
351                            currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement);
352                        }
353                    }
354
355                    ByteBuffer outputFilterInputData = outgoingBuffer;
356                    // We can now null the outgoingBuffer since the filter step will take care of it from now on.
357                    outgoingBuffer = null;
358
359                    for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) {
360                        XmppInputOutputFilter inputOutputFilter = it.next();
361                        XmppInputOutputFilter.OutputResult outputResult;
362                        try {
363                            outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement,
364                                    destinationAddressChanged, moreDataAvailable);
365                        } catch (IOException e) {
366                            connectionInternal.notifyConnectionError(e);
367                            break writeLoop;
368                        }
369                        newPendingOutputFilterData |= outputResult.pendingFilterData;
370                        outputFilterInputData = outputResult.filteredOutputData;
371                        if (outputFilterInputData != null) {
372                            outputFilterInputData.flip();
373                        }
374                    }
375
376                    // It is ok if outpuFilterInputData is 'null' here, this is expected behavior.
377                    if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) {
378                        filteredOutgoingBuffer = outputFilterInputData;
379                    } else {
380                        filteredOutgoingBuffer = null;
381                    }
382
383                    // If the filters did eventually not produce any output data but if there is
384                    // pending output data then we have a pending write request after read.
385                    if (filteredOutgoingBuffer == null && newPendingOutputFilterData) {
386                        pendingWriteInterestAfterRead = true;
387                    }
388
389                    if (filteredOutgoingBuffer != null && isLastPartOfElement) {
390                        bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements));
391                        currentlyOutgoingElements.clear();
392                    }
393
394                    // Reset that the destination address has changed.
395                    if (destinationAddressChanged) {
396                        destinationAddressChanged = false;
397                    }
398                } else if (outgoingCharSequenceIterator != null) {
399                    CharSequence nextCharSequence = outgoingCharSequenceIterator.next();
400                    outgoingBuffer = UTF8.encode(nextCharSequence);
401                    if (!outgoingCharSequenceIterator.hasNext()) {
402                        outgoingCharSequenceIterator = null;
403                        isLastPartOfElement = true;
404                    } else {
405                        isLastPartOfElement = false;
406                    }
407
408                    final SmackDebugger debugger = connectionInternal.smackDebugger;
409                    if (debugger != null) {
410                        if (outgoingStreamForDebugger == null) {
411                            outgoingStreamForDebugger = new StringBuilder();
412                        }
413                        outgoingStreamForDebugger.append(nextCharSequence);
414
415                        if (isLastPartOfElement) {
416                            try {
417                                outputDebugSplitter.append(outgoingStreamForDebugger);
418                            } catch (IOException e) {
419                                throw new AssertionError(e);
420                            }
421                            debugger.onOutgoingElementCompleted();
422                            outgoingStreamForDebugger = null;
423                        }
424                    }
425                } else if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
426                    currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll();
427                    if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) {
428                        Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement;
429                        Jid currentDestinationAddress = currentlyOutgoingStanza.getTo();
430                        destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress);
431                        lastDestinationAddress = currentDestinationAddress;
432                    }
433                    CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE);
434                    if (nextCharSequence instanceof XmlStringBuilder) {
435                        XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence;
436                        XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment();
437                        outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator();
438                    } else {
439                        outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator();
440                    }
441                    assert outgoingCharSequenceIterator != null;
442                } else {
443                    // There is nothing more to write.
444                    break;
445                }
446            }
447
448            pendingOutputFilterData = newPendingOutputFilterData;
449            if (!pendingWriteInterestAfterRead && pendingOutputFilterData) {
450                newInterestedOps |= SelectionKey.OP_WRITE;
451            }
452
453            readLoop: while (true) {
454                // Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have
455                // read a certain amount.
456                if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) {
457                    callbackPreemtBecauseBytesRead++;
458                    break;
459                }
460
461                int bytesRead;
462                incomingBuffer.clear();
463                try {
464                    bytesRead = selectedSocketChannel.read(incomingBuffer);
465                } catch (IOException e) {
466                    handleReadWriteIoException(e);
467                    return;
468                }
469
470                if (bytesRead < 0) {
471                    LOGGER.finer("NIO read() returned " + bytesRead
472                            + " for " + this + ". This probably means that the TCP connection was terminated.");
473                    // According to the socket channel javadoc section about "asynchronous reads" a socket channel's
474                    // read() may return -1 if the input side of a socket is shut down.
475                     // Note that we do not call notifyConnectionError() here because the connection may be
476                    // cleanly shutdown which would also cause read() to return '-1. I assume that this socket
477                    // will be selected again, on which read() would throw an IOException, which will be catched
478                    // and invoke notifyConnectionError() (see a few lines above).
479                    /*
480                    IOException exception = new IOException("NIO read() returned " + bytesRead);
481                    notifyConnectionError(exception);
482                    */
483                    return;
484                }
485
486                if (!pendingInputFilterData) {
487                    if (bytesRead == 0) {
488                        // Nothing more to read.
489                        break;
490                    }
491                } else {
492                    pendingInputFilterData = false;
493                }
494
495                if (pendingWriteInterestAfterRead) {
496                    // We have successfully read something and someone announced a write interest after a read. It is
497                    // now possible that a filter is now also able to write additional data (for example SSLEngine).
498                    pendingWriteInterestAfterRead = false;
499                    newInterestedOps |= SelectionKey.OP_WRITE;
500                }
501
502                callbackBytesRead += bytesRead;
503
504                ByteBuffer filteredIncomingBuffer = incomingBuffer;
505                for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) {
506                    filteredIncomingBuffer.flip();
507
508                    ByteBuffer newFilteredIncomingBuffer;
509                    try {
510                        newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer);
511                    } catch (IOException e) {
512                        connectionInternal.notifyConnectionError(e);
513                        return;
514                    }
515                    if (newFilteredIncomingBuffer == null) {
516                        break readLoop;
517                    }
518                    filteredIncomingBuffer = newFilteredIncomingBuffer;
519                }
520
521                final int bytesReadAfterFilter = filteredIncomingBuffer.flip().remaining();
522
523                totalBytesReadAfterFilter += bytesReadAfterFilter;
524
525                try {
526                    splitter.write(filteredIncomingBuffer);
527                } catch (IOException e) {
528                    connectionInternal.notifyConnectionError(e);
529                    return;
530                }
531            }
532        } finally {
533            totalBytesWritten += callbackBytesWritten;
534            totalBytesRead += callbackBytesRead;
535
536            channelSelectedCallbackLock.unlock();
537        }
538
539        // Indicate that there is no reactor thread racing towards handling this selection key.
540        final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
541        if (selectionKeyAttachment != null) {
542            selectionKeyAttachment.resetReactorThreadRacing();
543        }
544
545        // Check the queue again to prevent lost wakeups caused by elements inserted before we
546        // called resetReactorThreadRacing() a few lines above.
547        if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
548            setWriteInterestAfterChannelSelectedCallback.incrementAndGet();
549            newInterestedOps |= SelectionKey.OP_WRITE;
550        }
551
552        connectionInternal.setInterestOps(selectionKey, newInterestedOps);
553    }
554
555    private void handleReadWriteIoException(IOException e) {
556        if (e instanceof ClosedChannelException && !tcpNioTransport.isConnected()) {
557            // The connection is already closed.
558            return;
559        }
560
561       connectionInternal.notifyConnectionError(e);
562    }
563
564    /**
565     * This is the interface between the "lookup remote connection endpoints" state and the "establish TCP connection"
566     * state. The field is indirectly populated by {@link XmppTcpNioTransport#lookupConnectionEndpoints()} and consumed
567     * by {@link ConnectionAttemptState}.
568     */
569    DiscoveredTcpEndpoints discoveredTcpEndpoints;
570
571    final class XmppTcpNioTransport extends XmppClientToServerTransport {
572
573        XmppTcpNioTransport(ModularXmppClientToServerConnectionInternal connectionInternal) {
574            super(connectionInternal);
575        }
576
577        @Override
578        public StreamOpenAndCloseFactory getStreamOpenAndCloseFactory() {
579            return new StreamOpenAndCloseFactory() {
580                @Override
581                public StreamOpen createStreamOpen(DomainBareJid to, CharSequence from, String id, String lang) {
582                    String xmlLang = connectionInternal.connection.getConfiguration().getXmlLang();
583                    StreamOpen streamOpen = new StreamOpen(to, from, id, xmlLang, StreamOpen.StreamContentNamespace.client);
584                    return streamOpen;
585                }
586                @Override
587                public StreamClose createStreamClose() {
588                    return StreamClose.INSTANCE;
589                }
590            };
591        }
592
593        @Override
594        protected void resetDiscoveredConnectionEndpoints() {
595            discoveredTcpEndpoints = null;
596        }
597
598        @Override
599        public boolean hasUseableConnectionEndpoints() {
600            return discoveredTcpEndpoints != null;
601        }
602
603        @Override
604        protected List<SmackFuture<LookupConnectionEndpointsResult, Exception>> lookupConnectionEndpoints() {
605            // Assert that there are no stale discovered endpoints prior performing the lookup.
606            assert discoveredTcpEndpoints == null;
607
608            List<SmackFuture<LookupConnectionEndpointsResult, Exception>> futures = new ArrayList<>(2);
609
610            InternalSmackFuture<LookupConnectionEndpointsResult, Exception> tcpEndpointsLookupFuture = new InternalSmackFuture<>();
611            connectionInternal.asyncGo(() -> {
612                Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(
613                                connectionInternal.connection.getConfiguration());
614
615                LookupConnectionEndpointsResult endpointsResult;
616                if (result.discoveredRemoteConnectionEndpoints.isEmpty()) {
617                    endpointsResult = new TcpEndpointDiscoveryFailed(result);
618                } else {
619                    endpointsResult = new DiscoveredTcpEndpoints(result);
620                }
621                tcpEndpointsLookupFuture.setResult(endpointsResult);
622            });
623            futures.add(tcpEndpointsLookupFuture);
624
625            if (moduleDescriptor.isDirectTlsEnabled()) {
626                // TODO: Implement this.
627                throw new IllegalArgumentException("DirectTLS is not implemented yet");
628            }
629
630            return futures;
631        }
632
633        @Override
634        protected void loadConnectionEndpoints(LookupConnectionEndpointsSuccess lookupConnectionEndpointsSuccess) {
635            // The API contract stats that we will be given the instance we handed out with lookupConnectionEndpoints,
636            // which must be of type DiscoveredTcpEndpoints here. Hence if we can not cast it, then there is an internal
637            // Smack error.
638            discoveredTcpEndpoints = (DiscoveredTcpEndpoints) lookupConnectionEndpointsSuccess;
639        }
640
641        @Override
642        protected void afterFiltersClosed() {
643            pendingInputFilterData = pendingOutputFilterData = true;
644            afterOutgoingElementsQueueModified();
645        }
646
647        @Override
648        protected void disconnect() {
649            XmppTcpTransportModule.this.closeSocketAndCleanup();
650        }
651
652        @Override
653        protected void notifyAboutNewOutgoingElements() {
654            afterOutgoingElementsQueueModified();
655        }
656
657        @Override
658        public SSLSession getSslSession() {
659            TlsState tlsState = XmppTcpTransportModule.this.tlsState;
660            if (tlsState == null) {
661                return null;
662            }
663
664            return tlsState.engine.getSession();
665        }
666
667        public boolean isConnected() {
668            SocketChannel socketChannel = XmppTcpTransportModule.this.socketChannel;
669            if (socketChannel == null) {
670                return false;
671            }
672
673            return socketChannel.isConnected();
674        }
675
676        @Override
677        public boolean isTransportSecured() {
678            final TlsState tlsState = XmppTcpTransportModule.this.tlsState;
679            return tlsState != null && tlsState.handshakeStatus == TlsHandshakeStatus.successful;
680        }
681
682        @Override
683        public XmppTcpTransportModule.Stats getStats() {
684            return XmppTcpTransportModule.this.getStats();
685        }
686
687        final class DiscoveredTcpEndpoints implements LookupConnectionEndpointsSuccess {
688            final RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result;
689            DiscoveredTcpEndpoints(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) {
690                this.result = result;
691            }
692        }
693
694        final class TcpEndpointDiscoveryFailed implements LookupConnectionEndpointsFailed {
695            final List<RemoteConnectionEndpointLookupFailure> lookupFailures;
696            TcpEndpointDiscoveryFailed(RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result) {
697                lookupFailures = result.lookupFailures;
698            }
699        }
700    }
701
702    private void afterOutgoingElementsQueueModified() {
703        final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
704        if (selectionKeyAttachment != null && selectionKeyAttachment.isReactorThreadRacing()) {
705            // A reactor thread is already racing to the channel selected callback and will take care of this.
706            reactorThreadAlreadyRacing.incrementAndGet();
707            return;
708        }
709
710        afterOutgoingElementsQueueModifiedSetInterestOps.incrementAndGet();
711
712        // Add OP_WRITE to the interested Ops, since we have now new things to write. Note that this may cause
713        // multiple reactor threads to race to the channel selected callback in case we perform this right after
714        // a select() returned with this selection key in the selected-key set. Hence we use tryLock() in the
715        // channel selected callback to keep the invariant that only exactly one thread is performing the
716        // callback.
717        // Note that we need to perform setInterestedOps() *without* holding the channelSelectedCallbackLock, as
718        // otherwise the reactor thread racing to the channel selected callback may found the lock still locked, which
719        // would result in the outgoingElementsQueue not being handled.
720        connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
721    }
722
723    @Override
724    protected XmppTcpNioTransport getTransport() {
725        return tcpNioTransport;
726    }
727
728    static final class EstablishingTcpConnectionStateDescriptor extends StateDescriptor {
729        private EstablishingTcpConnectionStateDescriptor() {
730            super(XmppTcpTransportModule.EstablishingTcpConnectionState.class);
731            addPredeccessor(LookupRemoteConnectionEndpointsStateDescriptor.class);
732            addSuccessor(EstablishTlsStateDescriptor.class);
733            addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class);
734        }
735
736        @Override
737        protected XmppTcpTransportModule.EstablishingTcpConnectionState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) {
738            XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class);
739            return tcpTransportModule.constructEstablishingTcpConnectionState(this, connectionInternal);
740        }
741    }
742
743    private EstablishingTcpConnectionState constructEstablishingTcpConnectionState(
744                    EstablishingTcpConnectionStateDescriptor stateDescriptor,
745                    ModularXmppClientToServerConnectionInternal connectionInternal) {
746        return new EstablishingTcpConnectionState(stateDescriptor, connectionInternal);
747    }
748
749    final class EstablishingTcpConnectionState extends State.AbstractTransport {
750        private EstablishingTcpConnectionState(EstablishingTcpConnectionStateDescriptor stateDescriptor,
751                        ModularXmppClientToServerConnectionInternal connectionInternal) {
752            super(tcpNioTransport, stateDescriptor, connectionInternal);
753        }
754
755        @Override
756        public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
757                        throws InterruptedException, IOException, SmackException, XMPPException {
758            // The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to
759            // ConnectingToHost.
760            ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints,
761                    this);
762            StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection();
763            if (failure != null) {
764                return failure;
765            }
766
767            socketChannel = connectionAttemptState.socketChannel;
768            remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
769
770            selectionKey = connectionInternal.registerWithSelector(socketChannel, SelectionKey.OP_READ,
771                            XmppTcpTransportModule.this::onChannelSelected);
772            selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
773
774            connectionInternal.setTransport(tcpNioTransport);
775
776            // TODO: It appears this should be done in a generic way. I'd assume we always
777            // have to wait for stream features after the connection was established. If this is true then consider
778            // moving this into State.AbstractTransport. But I am not yet 100% positive that this is the case for every
779            // transport. Hence keep it here for now.
780            connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after initial connection");
781
782            return new TcpSocketConnectedResult(remoteAddress);
783        }
784
785        @Override
786        public void resetState() {
787            closeSocketAndCleanup();
788        }
789    }
790
791    public static final class TcpSocketConnectedResult extends StateTransitionResult.Success {
792        private final InetSocketAddress remoteAddress;
793
794        private TcpSocketConnectedResult(InetSocketAddress remoteAddress) {
795            super("TCP connection established to " + remoteAddress);
796            this.remoteAddress = remoteAddress;
797        }
798
799        public InetSocketAddress getRemoteAddress() {
800            return remoteAddress;
801        }
802    }
803
804    public static final class TlsEstablishedResult extends StateTransitionResult.Success {
805
806        private TlsEstablishedResult(SSLEngine sslEngine) {
807            super("TLS established: " + sslEngine.getSession());
808        }
809    }
810
811    static final class EstablishTlsStateDescriptor extends StateDescriptor {
812        private EstablishTlsStateDescriptor() {
813            super(XmppTcpTransportModule.EstablishTlsState.class, "RFC 6120 ยง 5");
814            addSuccessor(ConnectedButUnauthenticatedStateDescriptor.class);
815            declarePrecedenceOver(ConnectedButUnauthenticatedStateDescriptor.class);
816        }
817
818        @Override
819        protected EstablishTlsState constructState(ModularXmppClientToServerConnectionInternal connectionInternal) {
820            XmppTcpTransportModule tcpTransportModule = connectionInternal.connection.getConnectionModuleFor(XmppTcpTransportModuleDescriptor.class);
821            return tcpTransportModule.constructEstablishingTlsState(this, connectionInternal);
822        }
823    }
824
825    private EstablishTlsState constructEstablishingTlsState(
826                    EstablishTlsStateDescriptor stateDescriptor,
827                    ModularXmppClientToServerConnectionInternal connectionInternal) {
828        return new EstablishTlsState(stateDescriptor, connectionInternal);
829    }
830
831    private final class EstablishTlsState extends State {
832        private EstablishTlsState(EstablishTlsStateDescriptor stateDescriptor,
833                        ModularXmppClientToServerConnectionInternal connectionInternal) {
834            super(stateDescriptor, connectionInternal);
835        }
836
837        @Override
838        public StateTransitionResult.TransitionImpossible isTransitionToPossible(WalkStateGraphContext walkStateGraphContext)
839                throws SecurityRequiredByClientException, SecurityRequiredByServerException {
840            StartTls startTlsFeature = connectionInternal.connection.getFeature(StartTls.class);
841            SecurityMode securityMode = connectionInternal.connection.getConfiguration().getSecurityMode();
842
843            switch (securityMode) {
844            case required:
845            case ifpossible:
846                if (startTlsFeature == null) {
847                    if (securityMode == SecurityMode.ifpossible) {
848                        return new StateTransitionResult.TransitionImpossibleReason("Server does not announce support for TLS and we do not required it");
849                    }
850                    throw new SecurityRequiredByClientException();
851                }
852                // Allows transition by returning null.
853                return null;
854            case disabled:
855                if (startTlsFeature != null && startTlsFeature.required()) {
856                    throw new SecurityRequiredByServerException();
857                }
858                return new StateTransitionResult.TransitionImpossibleReason("TLS disabled in client settings and server does not require it");
859            default:
860                throw new AssertionError("Unknown security mode: " + securityMode);
861            }
862        }
863
864        @Override
865        public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
866                        throws IOException, InterruptedException, SmackException, XMPPException {
867            connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class);
868
869            SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext();
870
871            tlsState = new TlsState(smackTlsContext);
872            connectionInternal.addXmppInputOutputFilter(tlsState);
873
874            channelSelectedCallbackLock.lock();
875            try {
876                pendingOutputFilterData = true;
877                // The beginHandshake() is possibly not really required here, but it does not hurt either.
878                tlsState.engine.beginHandshake();
879                tlsState.handshakeStatus = TlsHandshakeStatus.initiated;
880            } finally {
881                channelSelectedCallbackLock.unlock();
882            }
883            connectionInternal.setInterestOps(selectionKey, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
884
885            try {
886                tlsState.waitForHandshakeFinished();
887            } catch (CertificateException e) {
888                throw new SmackCertificateException(e);
889            }
890
891            connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established");
892
893            return new TlsEstablishedResult(tlsState.engine);
894        }
895
896        @Override
897        public void resetState() {
898            tlsState = null;
899        }
900    }
901
902    private enum TlsHandshakeStatus {
903        initial,
904        initiated,
905        successful,
906        failed,
907    }
908
909    private static final Level SSL_ENGINE_DEBUG_LOG_LEVEL = Level.FINEST;
910
911    private static void debugLogSslEngineResult(String operation, SSLEngineResult result) {
912        if (!LOGGER.isLoggable(SSL_ENGINE_DEBUG_LOG_LEVEL)) {
913            return;
914        }
915
916        LOGGER.log(SSL_ENGINE_DEBUG_LOG_LEVEL, "SSLEngineResult of " + operation + "(): " + result);
917    }
918
919    private final class TlsState implements XmppInputOutputFilter {
920
921        private static final int MAX_PENDING_OUTPUT_BYTES = 8096;
922
923        private final SmackTlsContext smackTlsContext;
924        private final SSLEngine engine;
925
926        private TlsHandshakeStatus handshakeStatus = TlsHandshakeStatus.initial;
927        private SSLException handshakeException;
928
929        private ByteBuffer myNetData;
930        private ByteBuffer peerAppData;
931
932        private final List<ByteBuffer> pendingOutputData = new ArrayList<>();
933        private int pendingOutputBytes;
934        private ByteBuffer pendingInputData;
935
936        private final AtomicInteger pendingDelegatedTasks = new AtomicInteger();
937
938        private long wrapInBytes;
939        private long wrapOutBytes;
940
941        private long unwrapInBytes;
942        private long unwrapOutBytes;
943
944        private TlsState(SmackTlsContext smackTlsContext) throws IOException {
945            this.smackTlsContext = smackTlsContext;
946
947            // Call createSSLEngine()'s variant with two parameters as this allows for TLS session resumption.
948
949            // Note that it is not really clear what the value of peer host should be. It could be A) the XMPP service's
950            // domainpart or B) the DNS name of the host we are connecting to (usually the DNS SRV RR target name). While
951            // the javadoc of createSSLEngine(String, int) indicates with "Some cipher suites (such as Kerberos) require
952            // remote hostname information, in which case peerHost needs to be specified." that A should be used. TLS
953            // session resumption may would need or at least benefit from B. Variant A would also be required if the
954            // String is used for certificate verification. And it appears at least likely that TLS session resumption
955            // would not be hurt by using variant A. Therefore we currently use variant A.
956            // TODO: Should we use the ACE representation of the XMPP service domain? Compare with f60e4055ec529f0b8160acedf13275592ab10a4b
957            // If yes, then we should probably introduce getXmppServiceDomainAceEncodedIfPossible().
958            String peerHost = connectionInternal.connection.getConfiguration().getXMPPServiceDomain().toString();
959            engine = smackTlsContext.sslContext.createSSLEngine(peerHost, remoteAddress.getPort());
960            engine.setUseClientMode(true);
961
962            SSLSession session = engine.getSession();
963            int applicationBufferSize = session.getApplicationBufferSize();
964            int packetBufferSize = session.getPacketBufferSize();
965
966            myNetData = ByteBuffer.allocateDirect(packetBufferSize);
967            peerAppData = ByteBuffer.allocate(applicationBufferSize);
968        }
969
970        @Override
971        public OutputResult output(ByteBuffer outputData, boolean isFinalDataOfElement, boolean destinationAddressChanged,
972                boolean moreDataAvailable) throws SSLException {
973            if (outputData != null) {
974                pendingOutputData.add(outputData);
975                pendingOutputBytes += outputData.remaining();
976                if (moreDataAvailable && pendingOutputBytes < MAX_PENDING_OUTPUT_BYTES) {
977                    return OutputResult.NO_OUTPUT;
978                }
979            }
980
981            ByteBuffer[] outputDataArray = pendingOutputData.toArray(new ByteBuffer[pendingOutputData.size()]);
982
983            myNetData.clear();
984
985            while (true) {
986                SSLEngineResult result;
987                try {
988                    result = engine.wrap(outputDataArray, myNetData);
989                } catch (SSLException e) {
990                    handleSslException(e);
991                    throw e;
992                }
993
994                debugLogSslEngineResult("wrap", result);
995
996                SSLEngineResult.Status engineResultStatus = result.getStatus();
997
998                pendingOutputBytes -= result.bytesConsumed();
999
1000                if (engineResultStatus == SSLEngineResult.Status.OK) {
1001                    wrapInBytes += result.bytesConsumed();
1002                    wrapOutBytes += result.bytesProduced();
1003
1004                    SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result);
1005                    switch (handshakeStatus) {
1006                        case NEED_UNWRAP:
1007                            // NEED_UNWRAP means that we need to receive something in order to continue the handshake. The
1008                            // standard channelSelectedCallback logic will take care of this, as there is eventually always
1009                            // a interest to read from the socket.
1010                            break;
1011                        case NEED_WRAP:
1012                            // Same as need task: Cycle the reactor.
1013                        case NEED_TASK:
1014                            // Note that we also set pendingOutputFilterData in the OutputResult in the NEED_TASK case, as
1015                            // we also want to retry the wrap() operation above in this case.
1016                            return new OutputResult(true, myNetData);
1017                        default:
1018                            break;
1019                    }
1020                }
1021
1022                switch (engineResultStatus) {
1023                case OK:
1024                    // No need to outputData.compact() here, since we do not reuse the buffer.
1025                    // Clean up the pending output data.
1026                    pruneBufferList(pendingOutputData);
1027                    return new OutputResult(!pendingOutputData.isEmpty(), myNetData);
1028                case CLOSED:
1029                    pendingOutputData.clear();
1030                    return OutputResult.NO_OUTPUT;
1031                case BUFFER_OVERFLOW:
1032                    LOGGER.warning("SSLEngine status BUFFER_OVERFLOW, this is hopefully uncommon");
1033                    int outputDataRemaining = outputData != null ? outputData.remaining() : 0;
1034                    int newCapacity = (int) (1.3 * outputDataRemaining);
1035                    // If newCapacity would not increase myNetData, then double it.
1036                    if (newCapacity <= myNetData.capacity()) {
1037                        newCapacity = 2 * myNetData.capacity();
1038                    }
1039                    ByteBuffer newMyNetData = ByteBuffer.allocateDirect(newCapacity);
1040                    myNetData.flip();
1041                    newMyNetData.put(myNetData);
1042                    myNetData = newMyNetData;
1043                    continue;
1044                case BUFFER_UNDERFLOW:
1045                    throw new IllegalStateException(
1046                            "Buffer underflow as result of SSLEngine.wrap() should never happen");
1047                }
1048            }
1049        }
1050
1051        @SuppressWarnings("ReferenceEquality")
1052        @Override
1053        public ByteBuffer input(ByteBuffer inputData) throws SSLException {
1054            ByteBuffer accumulatedData;
1055            if (pendingInputData == null) {
1056                accumulatedData = inputData;
1057            } else {
1058                assert pendingInputData != inputData;
1059
1060                int accumulatedDataBytes = pendingInputData.remaining() + inputData.remaining();
1061                accumulatedData = ByteBuffer.allocate(accumulatedDataBytes);
1062                accumulatedData.put(pendingInputData)
1063                               .put(inputData)
1064                               .flip();
1065                pendingInputData = null;
1066            }
1067
1068            peerAppData.clear();
1069
1070            while (true) {
1071                SSLEngineResult result;
1072                try {
1073                    result = engine.unwrap(accumulatedData, peerAppData);
1074                } catch (SSLException e) {
1075                    handleSslException(e);
1076                    throw e;
1077                }
1078
1079                debugLogSslEngineResult("unwrap", result);
1080
1081                SSLEngineResult.Status engineResultStatus = result.getStatus();
1082
1083                if (engineResultStatus == SSLEngineResult.Status.OK) {
1084                    unwrapInBytes += result.bytesConsumed();
1085                    unwrapOutBytes += result.bytesProduced();
1086
1087                    SSLEngineResult.HandshakeStatus handshakeStatus = handleHandshakeStatus(result);
1088                    switch (handshakeStatus) {
1089                    case NEED_TASK:
1090                        // A delegated task is asynchronously running. Take care of the remaining accumulatedData.
1091                        addAsPendingInputData(accumulatedData);
1092                        // Return here, as the async task created by handleHandshakeStatus will continue calling the
1093                        // cannelSelectedCallback.
1094                        return null;
1095                    case NEED_UNWRAP:
1096                        continue;
1097                    case NEED_WRAP:
1098                        // NEED_WRAP means that the SSLEngine needs to send data, probably without consuming data.
1099                        // We exploit here the fact that the channelSelectedCallback is single threaded and that the
1100                        // input processing is after the output processing.
1101                        addAsPendingInputData(accumulatedData);
1102                        // Note that it is ok that we the provided argument for pending input filter data to channel
1103                        // selected callback is false, as setPendingInputFilterData() will have set the internal state
1104                        // boolean accordingly.
1105                        connectionInternal.asyncGo(() -> callChannelSelectedCallback(false, true));
1106                        // Do not break here, but instead return and let the asynchronously invoked
1107                        // callChannelSelectedCallback() do its work.
1108                        return null;
1109                    default:
1110                        break;
1111                    }
1112                }
1113
1114                switch (engineResultStatus) {
1115                case OK:
1116                    // SSLEngine's unwrap() may not consume all bytes from the source buffer. If this is the case, then
1117                    // simply perform another unwrap until accumlatedData has no remaining bytes.
1118                    if (accumulatedData.hasRemaining()) {
1119                        continue;
1120                    }
1121                    return peerAppData;
1122                case CLOSED:
1123                    return null;
1124                case BUFFER_UNDERFLOW:
1125                    // There were not enough source bytes available to make a complete packet. Let it in
1126                    // pendingInputData. Note that we do not resize SSLEngine's source buffer - inputData in our case -
1127                    // as it is not possible.
1128                    addAsPendingInputData(accumulatedData);
1129                    return null;
1130                case BUFFER_OVERFLOW:
1131                    int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1132                    assert peerAppData.remaining() < applicationBufferSize;
1133                    peerAppData = ByteBuffer.allocate(applicationBufferSize);
1134                    continue;
1135                }
1136            }
1137        }
1138
1139        private void addAsPendingInputData(ByteBuffer byteBuffer) {
1140            // Note that we can not simply write
1141            // pendingInputData = byteBuffer;
1142            // we have to copy the provided byte buffer, because it is possible that this byteBuffer is re-used by some
1143            // higher layer. That is, here 'byteBuffer' is typically 'incomingBuffer', which is a direct buffer only
1144            // allocated once per connection for performance reasons and hence re-used for read() calls.
1145            pendingInputData = ByteBuffer.allocate(byteBuffer.remaining());
1146            pendingInputData.put(byteBuffer).flip();
1147
1148            pendingInputFilterData = pendingInputData.hasRemaining();
1149        }
1150
1151        private SSLEngineResult.HandshakeStatus handleHandshakeStatus(SSLEngineResult sslEngineResult) {
1152            SSLEngineResult.HandshakeStatus handshakeStatus = sslEngineResult.getHandshakeStatus();
1153            switch (handshakeStatus) {
1154            case NEED_TASK:
1155                while (true) {
1156                    final Runnable delegatedTask = engine.getDelegatedTask();
1157                    if (delegatedTask == null) {
1158                        break;
1159                    }
1160                    sslEngineDelegatedTasks++;
1161                    int currentPendingDelegatedTasks = pendingDelegatedTasks.incrementAndGet();
1162                    if (currentPendingDelegatedTasks > maxPendingSslEngineDelegatedTasks) {
1163                        maxPendingSslEngineDelegatedTasks = currentPendingDelegatedTasks;
1164                    }
1165
1166                    Runnable wrappedDelegatedTask = () -> {
1167                        delegatedTask.run();
1168                        int wrappedCurrentPendingDelegatedTasks = pendingDelegatedTasks.decrementAndGet();
1169                        if (wrappedCurrentPendingDelegatedTasks == 0) {
1170                            callChannelSelectedCallback(true, true);
1171                        }
1172                    };
1173                    connectionInternal.asyncGo(wrappedDelegatedTask);
1174                }
1175                break;
1176            case FINISHED:
1177                onHandshakeFinished();
1178                break;
1179            default:
1180                break;
1181            }
1182
1183            SSLEngineResult.HandshakeStatus afterHandshakeStatus = engine.getHandshakeStatus();
1184            return afterHandshakeStatus;
1185        }
1186
1187        private void handleSslException(SSLException e) {
1188            handshakeException = e;
1189            handshakeStatus = TlsHandshakeStatus.failed;
1190            connectionInternal.notifyWaitingThreads();
1191        }
1192
1193        private void onHandshakeFinished() {
1194            handshakeStatus = TlsHandshakeStatus.successful;
1195            connectionInternal.notifyWaitingThreads();
1196        }
1197
1198        private boolean isHandshakeFinished() {
1199            return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed;
1200        }
1201
1202        private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackException, XMPPException {
1203            connectionInternal.waitForConditionOrThrowConnectionException(() -> isHandshakeFinished(), "TLS handshake to finish");
1204
1205            if (handshakeStatus == TlsHandshakeStatus.failed) {
1206                throw handshakeException;
1207            }
1208
1209            assert handshakeStatus == TlsHandshakeStatus.successful;
1210
1211            if (smackTlsContext.daneVerifier != null) {
1212                smackTlsContext.daneVerifier.finish(engine.getSession());
1213            }
1214        }
1215
1216        @Override
1217        public Object getStats() {
1218            return new TlsStateStats(this);
1219        }
1220
1221        @Override
1222        public void closeInputOutput() {
1223            engine.closeOutbound();
1224            try {
1225                engine.closeInbound();
1226            } catch (SSLException e) {
1227                LOGGER.log(Level.FINEST,
1228                        "SSLException when closing inbound TLS session. This can likely be ignored if a possible truncation attack is suggested."
1229                        + " You may want to ask your XMPP server vendor to implement a clean TLS session shutdown sending close_notify after </stream>",
1230                        e);
1231            }
1232        }
1233
1234        @Override
1235        public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException,
1236                SmackException, XMPPException {
1237            waitForHandshakeFinished();
1238        }
1239
1240        @Override
1241        public String getFilterName() {
1242            return "TLS (" + engine + ')';
1243        }
1244    }
1245
1246    public static final class TlsStateStats {
1247        public final long wrapInBytes;
1248        public final long wrapOutBytes;
1249        public final double wrapRatio;
1250
1251        public final long unwrapInBytes;
1252        public final long unwrapOutBytes;
1253        public final double unwrapRatio;
1254
1255        private TlsStateStats(TlsState tlsState) {
1256            wrapOutBytes = tlsState.wrapOutBytes;
1257            wrapInBytes = tlsState.wrapInBytes;
1258            wrapRatio = (double) wrapOutBytes / wrapInBytes;
1259
1260            unwrapOutBytes = tlsState.unwrapOutBytes;
1261            unwrapInBytes = tlsState.unwrapInBytes;
1262            unwrapRatio = (double) unwrapInBytes / unwrapOutBytes;
1263        }
1264
1265        private transient String toStringCache;
1266
1267        @Override
1268        public String toString() {
1269            if (toStringCache != null) {
1270                return toStringCache;
1271            }
1272
1273            toStringCache =
1274                      "wrap-in-bytes: " + wrapInBytes + '\n'
1275                    + "wrap-out-bytes: " + wrapOutBytes + '\n'
1276                    + "wrap-ratio: " + wrapRatio + '\n'
1277                    + "unwrap-in-bytes: " + unwrapInBytes + '\n'
1278                    + "unwrap-out-bytes: " + unwrapOutBytes + '\n'
1279                    + "unwrap-ratio: " + unwrapRatio + '\n'
1280                    ;
1281
1282            return toStringCache;
1283        }
1284    }
1285
1286    private void callChannelSelectedCallback(boolean setPendingInputFilterData, boolean setPendingOutputFilterData) {
1287        final SocketChannel channel = socketChannel;
1288        final SelectionKey key = selectionKey;
1289        if (channel == null || key == null) {
1290            LOGGER.info("Not calling channel selected callback because the connection was eventually disconnected");
1291            return;
1292        }
1293
1294        channelSelectedCallbackLock.lock();
1295        try {
1296            // Note that it is important that we send the pending(Input|Output)FilterData flags while holding the lock.
1297            if (setPendingInputFilterData) {
1298                pendingInputFilterData = true;
1299            }
1300            if (setPendingOutputFilterData) {
1301                pendingOutputFilterData = true;
1302            }
1303
1304            onChannelSelected(channel, key);
1305        } finally {
1306            channelSelectedCallbackLock.unlock();
1307        }
1308    }
1309
1310    private void closeSocketAndCleanup() {
1311        final SelectionKey selectionKey = this.selectionKey;
1312        if (selectionKey != null) {
1313            selectionKey.cancel();
1314        }
1315        final SocketChannel socketChannel = this.socketChannel;
1316        if (socketChannel != null) {
1317            try {
1318                socketChannel.close();
1319            } catch (IOException e) {
1320                LOGGER.log(Level.FINE, "Closing the socket channel failed", e);
1321            }
1322        }
1323
1324        this.selectionKey = null;
1325        this.socketChannel = null;
1326
1327        selectionKeyAttachment = null;
1328        remoteAddress = null;
1329    }
1330
1331    private static List<? extends Buffer> pruneBufferList(Collection<? extends Buffer> buffers) {
1332        return CollectionUtil.removeUntil(buffers, b -> b.hasRemaining());
1333    }
1334
1335    public XmppTcpTransportModule.Stats getStats() {
1336        return new Stats(this);
1337    }
1338
1339    public static final class Stats extends XmppClientToServerTransport.Stats {
1340        public final long totalBytesWritten;
1341        public final long totalBytesWrittenBeforeFilter;
1342        public final double writeRatio;
1343
1344        public final long totalBytesRead;
1345        public final long totalBytesReadAfterFilter;
1346        public final double readRatio;
1347
1348        public final long handledChannelSelectedCallbacks;
1349        public final long setWriteInterestAfterChannelSelectedCallback;
1350        public final long reactorThreadAlreadyRacing;
1351        public final long afterOutgoingElementsQueueModifiedSetInterestOps;
1352        public final long rejectedChannelSelectedCallbacks;
1353        public final long totalCallbackRequests;
1354        public final long callbackPreemtBecauseBytesWritten;
1355        public final long callbackPreemtBecauseBytesRead;
1356        public final int sslEngineDelegatedTasks;
1357        public final int maxPendingSslEngineDelegatedTasks;
1358
1359        private Stats(XmppTcpTransportModule connection) {
1360            totalBytesWritten = connection.totalBytesWritten;
1361            totalBytesWrittenBeforeFilter = connection.totalBytesWrittenBeforeFilter;
1362            writeRatio = (double) totalBytesWritten / totalBytesWrittenBeforeFilter;
1363
1364            totalBytesReadAfterFilter = connection.totalBytesReadAfterFilter;
1365            totalBytesRead = connection.totalBytesRead;
1366            readRatio = (double) totalBytesRead / totalBytesReadAfterFilter;
1367
1368            handledChannelSelectedCallbacks = connection.handledChannelSelectedCallbacks;
1369            setWriteInterestAfterChannelSelectedCallback = connection.setWriteInterestAfterChannelSelectedCallback.get();
1370            reactorThreadAlreadyRacing = connection.reactorThreadAlreadyRacing.get();
1371            afterOutgoingElementsQueueModifiedSetInterestOps = connection.afterOutgoingElementsQueueModifiedSetInterestOps
1372                    .get();
1373            rejectedChannelSelectedCallbacks = connection.rejectedChannelSelectedCallbacks.get();
1374
1375            totalCallbackRequests = handledChannelSelectedCallbacks + rejectedChannelSelectedCallbacks;
1376
1377            callbackPreemtBecauseBytesRead = connection.callbackPreemtBecauseBytesRead;
1378            callbackPreemtBecauseBytesWritten = connection.callbackPreemtBecauseBytesWritten;
1379
1380            sslEngineDelegatedTasks = connection.sslEngineDelegatedTasks;
1381            maxPendingSslEngineDelegatedTasks = connection.maxPendingSslEngineDelegatedTasks;
1382        }
1383
1384        private transient String toStringCache;
1385
1386        @Override
1387        public String toString() {
1388            if (toStringCache != null) {
1389                return toStringCache;
1390            }
1391
1392            toStringCache =
1393              "Total bytes\n"
1394            + "recv: " + totalBytesRead + '\n'
1395            + "send: " + totalBytesWritten + '\n'
1396            + "recv-aft-filter: " + totalBytesReadAfterFilter + '\n'
1397            + "send-bef-filter: " + totalBytesWrittenBeforeFilter + '\n'
1398            + "read-ratio: " + readRatio + '\n'
1399            + "write-ratio: " + writeRatio + '\n'
1400            + "Events\n"
1401            + "total-callback-requests: " + totalCallbackRequests + '\n'
1402            + "handled-channel-selected-callbacks: " + handledChannelSelectedCallbacks + '\n'
1403            + "rejected-channel-selected-callbacks: " + rejectedChannelSelectedCallbacks + '\n'
1404            + "set-write-interest-after-callback: " + setWriteInterestAfterChannelSelectedCallback + '\n'
1405            + "reactor-thread-already-racing: " + reactorThreadAlreadyRacing + '\n'
1406            + "after-queue-modified-set-interest-ops: " + afterOutgoingElementsQueueModifiedSetInterestOps + '\n'
1407            + "callback-preemt-because-bytes-read: " + callbackPreemtBecauseBytesRead + '\n'
1408            + "callback-preemt-because-bytes-written: " + callbackPreemtBecauseBytesWritten + '\n'
1409            + "ssl-engine-delegated-tasks: " + sslEngineDelegatedTasks + '\n'
1410            + "max-pending-ssl-engine-delegated-tasks: " + maxPendingSslEngineDelegatedTasks + '\n'
1411            ;
1412
1413            return toStringCache;
1414        }
1415    }
1416}