001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.OutputStream;
024import java.io.OutputStreamWriter;
025import java.io.Writer;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.net.Socket;
029import java.security.cert.CertificateException;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Iterator;
033import java.util.LinkedHashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.ArrayBlockingQueue;
039import java.util.concurrent.BlockingQueue;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentLinkedQueue;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047import javax.net.SocketFactory;
048import javax.net.ssl.HostnameVerifier;
049import javax.net.ssl.SSLSession;
050import javax.net.ssl.SSLSocket;
051import javax.net.ssl.SSLSocketFactory;
052
053import org.jivesoftware.smack.AbstractXMPPConnection;
054import org.jivesoftware.smack.ConnectionConfiguration;
055import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
056import org.jivesoftware.smack.ConnectionListener;
057import org.jivesoftware.smack.SmackConfiguration;
058import org.jivesoftware.smack.SmackException;
059import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
060import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
061import org.jivesoftware.smack.SmackException.ConnectionException;
062import org.jivesoftware.smack.SmackException.EndpointConnectionException;
063import org.jivesoftware.smack.SmackException.NotConnectedException;
064import org.jivesoftware.smack.SmackException.NotLoggedInException;
065import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
066import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
067import org.jivesoftware.smack.SmackFuture;
068import org.jivesoftware.smack.StanzaListener;
069import org.jivesoftware.smack.XMPPConnection;
070import org.jivesoftware.smack.XMPPException;
071import org.jivesoftware.smack.XMPPException.FailedNonzaException;
072import org.jivesoftware.smack.XMPPException.StreamErrorException;
073import org.jivesoftware.smack.compress.packet.Compress;
074import org.jivesoftware.smack.compress.packet.Compressed;
075import org.jivesoftware.smack.compression.XMPPInputOutputStream;
076import org.jivesoftware.smack.datatypes.UInt16;
077import org.jivesoftware.smack.filter.StanzaFilter;
078import org.jivesoftware.smack.internal.SmackTlsContext;
079import org.jivesoftware.smack.packet.Element;
080import org.jivesoftware.smack.packet.IQ;
081import org.jivesoftware.smack.packet.Message;
082import org.jivesoftware.smack.packet.Nonza;
083import org.jivesoftware.smack.packet.Presence;
084import org.jivesoftware.smack.packet.Stanza;
085import org.jivesoftware.smack.packet.StartTls;
086import org.jivesoftware.smack.packet.StreamError;
087import org.jivesoftware.smack.packet.StreamOpen;
088import org.jivesoftware.smack.proxy.ProxyInfo;
089import org.jivesoftware.smack.sasl.packet.SaslNonza;
090import org.jivesoftware.smack.sm.SMUtils;
091import org.jivesoftware.smack.sm.StreamManagementException;
092import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
093import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
094import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
095import org.jivesoftware.smack.sm.packet.StreamManagement;
096import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
097import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
098import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
099import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
100import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
101import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
102import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
103import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
104import org.jivesoftware.smack.sm.predicates.Predicate;
105import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
106import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
107import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
108import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
109import org.jivesoftware.smack.util.Async;
110import org.jivesoftware.smack.util.CloseableUtil;
111import org.jivesoftware.smack.util.PacketParserUtils;
112import org.jivesoftware.smack.util.StringUtils;
113import org.jivesoftware.smack.util.TLSUtils;
114import org.jivesoftware.smack.util.XmlStringBuilder;
115import org.jivesoftware.smack.util.rce.RemoteConnectionException;
116import org.jivesoftware.smack.xml.SmackXmlParser;
117import org.jivesoftware.smack.xml.XmlPullParser;
118import org.jivesoftware.smack.xml.XmlPullParserException;
119
120import org.jxmpp.jid.impl.JidCreate;
121import org.jxmpp.jid.parts.Resourcepart;
122import org.jxmpp.stringprep.XmppStringprepException;
123import org.minidns.dnsname.DnsName;
124
125/**
126 * Creates a socket connection to an XMPP server. This is the default connection
127 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
128 *
129 * @see XMPPConnection
130 * @author Matt Tucker
131 */
132public class XMPPTCPConnection extends AbstractXMPPConnection {
133
    /**
     * NOTE(review): not referenced in the visible part of this file; presumably the capacity used for the
     * writer's element queue and/or the unacknowledged stanzas queue - confirm against its usages.
     */
    private static final int QUEUE_SIZE = 500;
    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());

    /**
     * The socket which is used for this connection.
     */
    private Socket socket;

    /**
     * Whether the connection is currently shut down but its stream could still be resumed.
     * <p>
     * This flag is set by an instant shutdown if stream management resumption is possible, and it is cleared
     * again after a successful login. While it is set, the "already connected" and "already logged in" checks
     * of this class do not throw.
     * </p>
     */
    private boolean disconnectedButResumeable = false;

    /**
     * The TLS socket of this connection, or <code>null</code> if the connection is not secured. Its session is
     * handed to the authentication step during login, and it is reset to <code>null</code> on shutdown.
     */
    private SSLSocket secureSocket;

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketWriter packetWriter = new PacketWriter();

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketReader packetReader = new PacketReader();

    /**
     * Sync point flag the login procedure waits on after authentication. It is reset to <code>false</code>
     * right before authenticating. NOTE(review): presumably set to <code>true</code> by the reader once the
     * post-authentication stream features were parsed - the setter is not visible here, confirm.
     */
    private boolean streamFeaturesAfterAuthenticationReceived;

    /**
     * NOTE(review): not used in the visible part of this file; presumably the sync point signalling that the
     * server confirmed stream compression - confirm against its usages.
     */
    private boolean compressSyncPoint;
168
    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
    private static BundleAndDeferCallback defaultBundleAndDeferCallback;

    /**
     * The used bundle and defer callback. Initialized from {@link #defaultBundleAndDeferCallback} when the
     * connection instance is created.
     * <p>
     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
     * having a 'volatile' read within the writer threads loop.
     * </p>
     */
    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;

    /**
     * The initial value of {@link #useSm} for newly created connections.
     */
    private static boolean useSmDefault = true;

    /**
     * The initial value of {@link #useSmResumption} for newly created connections.
     */
    private static boolean useSmResumptionDefault = true;
187
    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}. Reset to <code>null</code> on an instant shutdown if the stream is no
     * longer resumable.
     */
    private String smSessionId;

    /**
     * Represents the state of stream management resumption.
     * <p>
     * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread.
     * </p>
     */
    private volatile SyncPointState smResumedSyncPoint;

    /**
     * The 'failed' answer to a stream resumption request, or <code>null</code>. It is cleared before every
     * resumption attempt during login; a non-null value ends the wait for the resumption result.
     */
    private Failed smResumptionFailed;

    /**
     * Represents the state of stream management.
     * <p>
     * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}.
     * </p>
     */
    private volatile boolean smEnabledSyncPoint;

    /**
     * The client's preferred maximum resumption time in seconds.
     */
    private int smClientMaxResumptionTime = -1;

    /**
     * The server's preferred maximum resumption time in seconds.
     */
    private int smServerMaxResumptionTime = -1;

    /**
     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
     */
    private boolean useSm = useSmDefault;

    /**
     * Passed to the 'enable' element sent during login. NOTE(review): presumably requests that the stream
     * management session is resumable - confirm against the Enable element.
     */
    private boolean useSmResumption = useSmResumptionDefault;

    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
    private long serverHandledStanzasCount = 0;

    /**
     * The counter for stanzas handled ("received") by the client.
     * <p>
     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
     * </p>
     */
    private long clientHandledStanzasCount = 0;

    /**
     * The stanzas sent to the server which were not yet acknowledged. A non-null queue during login means that a
     * previous session had stream management enabled; its content is drained and either reported to the stanza
     * dropped listeners or re-sent.
     */
    private BlockingQueue<Stanza> unacknowledgedStanzas;

    /**
     * Set to true if Stream Management was at least once enabled for this connection.
     */
    private boolean smWasEnabledAtLeastOnce = false;
251
    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for every stanza that got dropped.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and are automatically removed after that.
     */
    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();

    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
     * </p>
     * <p>
     * The set itself is not thread-safe; the login procedure modifies it while holding the set's own monitor.
     * </p>
     */
    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();

    /**
     * The configuration this connection was created with, kept with its TCP specific type.
     * NOTE(review): the suppressed "HidingField" warning suggests that the superclass declares a field of the
     * same name - confirm.
     */
    @SuppressWarnings("HidingField")
    private final XMPPTCPConnectionConfiguration config;
287
288    /**
289     * Creates a new XMPP connection over TCP (optionally using proxies).
290     * <p>
291     * Note that XMPPTCPConnection constructors do not establish a connection to the server
292     * and you must call {@link #connect()}.
293     * </p>
294     *
295     * @param config the connection configuration.
296     */
297    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
298        super(config);
299        this.config = config;
300        addConnectionListener(new ConnectionListener() {
301            @Override
302            public void connectionClosedOnError(Exception e) {
303                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
304                    dropSmState();
305                }
306            }
307        });
308
309        // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream
310        // is initiated.
311        buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install();
312    }
313
314    /**
315     * Creates a new XMPP connection over TCP.
316     * <p>
317     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
318     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
319     * constructor.
320     * </p>
321     *
322     * @param jid the bare JID used by the client.
323     * @param password the password or authentication token.
324     * @throws XmppStringprepException if the provided string is invalid.
325     */
326    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
327        this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build());
328    }
329
330    /**
331     * Creates a new XMPP connection over TCP.
332     * <p>
333     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
334     * you can get fine-grained control over connection settings using the
335     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
336     * </p>
337     * @param username TODO javadoc me please
338     * @param password TODO javadoc me please
339     * @param serviceName TODO javadoc me please
340     * @throws XmppStringprepException if the provided string is invalid.
341     */
342    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
343        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
344                                        JidCreate.domainBareFrom(serviceName)).build());
345    }
346
347    @Override
348    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
349        if (packetWriter == null) {
350            throw new NotConnectedException();
351        }
352        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
353    }
354
355    @Override
356    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
357        if (isConnected() && !disconnectedButResumeable) {
358            throw new AlreadyConnectedException();
359        }
360    }
361
362    @Override
363    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
364        if (isAuthenticated() && !disconnectedButResumeable) {
365            throw new AlreadyLoggedInException();
366        }
367    }
368
    /**
     * Clears the disconnected-but-resumable state and then delegates to the superclass.
     *
     * @param resumed passed through to the superclass; the login procedure of this class passes
     *        <code>true</code> after a successful stream resumption and <code>false</code> otherwise.
     * @throws NotConnectedException if thrown by the superclass implementation.
     * @throws InterruptedException if thrown by the superclass implementation.
     */
    @Override
    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
        // Reset the flag in case it was set. This happens before delegating, so the superclass logic already
        // sees a regular (non-resumable-disconnected) connection.
        disconnectedButResumeable = false;
        super.afterSuccessfulLogin(resumed);
    }
375
    /**
     * Performs the login: authenticates, optionally enables compression, tries to resume a previous stream
     * management (SM) session and, if there is nothing to resume or the resumption failed, binds the resource,
     * optionally enables SM and takes care of the stanzas left unacknowledged by a previous session.
     *
     * @param username the username passed to the authentication step.
     * @param password the password passed to the authentication step.
     * @param resource the resource to bind if the stream was not resumed.
     * @throws XMPPException declared by the invoked login steps.
     * @throws SmackException declared by the invoked login steps, e.g. when waiting for a sync point fails.
     * @throws IOException declared by the invoked login steps.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    @Override
    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                    SmackException, IOException, InterruptedException {
        // Authenticate using SASL. The TLS session (if any) is handed to the authentication step.
        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;

        // Reset the sync point before authenticating, so that the wait below does not see a stale value.
        streamFeaturesAfterAuthenticationReceived = false;
        authenticate(username, password, config.getAuthzid(), sslSession);

        // Wait for stream features after the authentication.
        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
        // renamed to "streamFeaturesAfterAuthenticationReceived".
        waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");

        // If compression is enabled then request the server to use stream compression. XEP-170
        // recommends to perform stream compression before resource binding.
        maybeEnableCompression();

        // Start every login with a clean resumption state.
        smResumedSyncPoint = SyncPointState.initial;
        smResumptionFailed = null;
        if (isSmResumptionPossible()) {
            smResumedSyncPoint = SyncPointState.request_sent;
            sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
            // Wait until the resumption either succeeded or was answered with 'failed'.
            waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
            if (smResumedSyncPoint == SyncPointState.successful) {
                // We successfully resumed the stream, be done here
                afterSuccessfulLogin(true);
                return;
            }
            // SM resumption failed, what Smack does here is to report success of
            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
            // normal resource binding can be tried.
            assert smResumptionFailed != null;
            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
        }

        // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
        // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the
        // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
        smEnabledSyncPoint = false;

        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
        if (unacknowledgedStanzas != null) {
            // There was a previous connection with SM enabled but that was either not resumable or
            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
            // XMPP session (There maybe was an enabled in a previous XMPP session of this
            // connection instance though). This is used in writePackets to decide if stanzas should
            // be added to the unacknowledged stanzas queue, because they have to be added right
            // after the 'enable' stream element has been sent.
            dropSmState();
        }

        // Now bind the resource. It is important to do this *after* we dropped an eventually
        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
        bindResourceAndEstablishSession(resource);

        if (isSmAvailable() && useSm) {
            // Remove what is maybe left from previously stream managed sessions
            serverHandledStanzasCount = 0;
            sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
            // then this is a non recoverable error and we therefore throw an exception.
            waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
            synchronized (requestAckPredicates) {
                if (requestAckPredicates.isEmpty()) {
                    // Assure that we have at least one predicate set up so that we request acks
                    // from the server and eventually flush some stanzas from the unacknowledged
                    // stanza queue
                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
                }
            }
        }
        // Inform client about failed resumption if possible, resend stanzas otherwise
        // Process the stanzas synchronously so a client can re-queue them for transmission
        // before it is informed about connection success
        if (!stanzaDroppedListeners.isEmpty()) {
            for (Stanza stanza : previouslyUnackedStanzas) {
                for (StanzaListener listener : stanzaDroppedListeners) {
                    try {
                        listener.processStanza(stanza);
                    }
                    catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
                        // A failing listener must not prevent the remaining listeners from being notified.
                        LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
                    }
                }
            }
        } else {
            // Nobody wants to be informed about dropped stanzas, hence re-send them on the new stream.
            for (Stanza stanza : previouslyUnackedStanzas) {
                sendStanzaInternal(stanza);
            }
        }

        afterSuccessfulLogin(false);
    }
473
474    @Override
475    public boolean isSecureConnection() {
476        return secureSocket != null;
477    }
478
479    /**
480     * Shuts the current connection down. After this method returns, the connection must be ready
481     * for re-use by connect.
482     */
483    @Override
484    protected void shutdown() {
485        if (isSmEnabled()) {
486            try {
487                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
488                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
489                sendSmAcknowledgementInternal();
490            } catch (InterruptedException | NotConnectedException e) {
491                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
492            }
493        }
494        shutdown(false);
495    }
496
497    @Override
498    public synchronized void instantShutdown() {
499        shutdown(true);
500    }
501
    /**
     * Shuts the connection down: stops the writer (if it is still active) and the reader, closes the socket,
     * waits for both threads to terminate and finally resets the connection state, unless the connection is
     * (or becomes) disconnected-but-resumable.
     *
     * @param instant if <code>true</code>, then the writer is shut down in instant mode and we do not wait for
     *        the closing stream tag of the server; the stream management state is kept for a later resumption
     *        if possible. If <code>false</code>, the stream management state is dropped.
     */
    private void shutdown(boolean instant) {
        // The writer thread may already been finished at this point, for example when the connection is in the
        // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
        // case.
        if (!packetWriter.done()) {
            // First shutdown the writer, this will result in a closing stream element getting send to
            // the server
            LOGGER.finer(packetWriter.threadName + " shutdown()");
            packetWriter.shutdown(instant);
            LOGGER.finer(packetWriter.threadName + " shutdown() returned");

            if (!instant) {
                waitForClosingStreamTagFromServer();
            }
        }

        LOGGER.finer(packetReader.threadName + " shutdown()");
        packetReader.shutdown();
        LOGGER.finer(packetReader.threadName + " shutdown() returned");

        // Closing the socket happens after both the writer and the reader were asked to shut down.
        CloseableUtil.maybeClose(socket, LOGGER);

        setWasAuthenticated();

        try {
            // Wait until neither the writer nor the reader thread is running anymore.
            boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running);
            if (!readerAndWriterThreadsTermianted) {
                LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: "
                                + packetWriter.running + ", Reader running: " + packetReader.running);
            } else {
                LOGGER.fine("Reader and writer threads terminated");
            }
        } catch (InterruptedException e) {
            // The interruption is only logged here and the remaining shutdown steps are still performed.
            LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
        }

        // The connection already was in the disconnected-but-resumable state before this shutdown: keep the
        // connection state untouched.
        if (disconnectedButResumeable) {
            return;
        }

        // If we are able to resume the stream, then don't set
        // connected/authenticated/usingTLS to false since we like to behave like we are still
        // connected (e.g. sendStanza should not throw a NotConnectedException).
        // NOTE(review): as written, the fields below are reset even if the stream just became resumable in
        // the 'instant' branch; only a shutdown that starts in the resumable state returns early above.
        if (instant) {
            disconnectedButResumeable = isSmResumptionPossible();
            if (!disconnectedButResumeable) {
                // Reset the stream management session id to null, since the stream is no longer resumable. Note that we
                // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected.
                smSessionId = null;
            }
        } else {
            disconnectedButResumeable = false;

            // Drop the stream management state if this is not an instant shutdown. We send
            // a </stream> close tag and now the stream management state is no longer valid.
            // This also prevents that we will potentially (re-)send any unavailable presence we
            // may have send, because it got put into the unacknowledged queue and was not acknowledged before the
            // connection terminated.
            dropSmState();
            // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
            // information is available in the connectionClosedOnError() listeners.
        }
        authenticated = false;
        connected = false;
        secureSocket = null;
        reader = null;
        writer = null;

        initState();
    }
572
573    @Override
574    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
575        packetWriter.sendStreamElement(element);
576    }
577
578    @Override
579    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
580        packetWriter.sendStreamElement(packet);
581        if (isSmEnabled()) {
582            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
583                if (requestAckPredicate.accept(packet)) {
584                    requestSmAcknowledgementInternal();
585                    break;
586                }
587            }
588        }
589    }
590
591    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
592        RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config);
593
594        List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>();
595
596        SocketFactory socketFactory = config.getSocketFactory();
597        ProxyInfo proxyInfo = config.getProxyInfo();
598        int timeout = config.getConnectTimeout();
599        if (socketFactory == null) {
600            socketFactory = SocketFactory.getDefault();
601        }
602        for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
603            Iterator<? extends InetAddress> inetAddresses;
604            String host = endpoint.getHost().toString();
605            UInt16 portUint16 = endpoint.getPort();
606            int port = portUint16.intValue();
607            if (proxyInfo == null) {
608                inetAddresses = endpoint.getInetAddresses().iterator();
609                assert inetAddresses.hasNext();
610
611                innerloop: while (inetAddresses.hasNext()) {
612                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
613                    // re-usable after a failed connection attempt. See also SMACK-724.
614                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
615
616                    final InetAddress inetAddress = inetAddresses.next();
617                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
618                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
619                    socketFuture.connectAsync(inetSocketAddress, timeout);
620
621                    try {
622                        socket = socketFuture.getOrThrow();
623                    } catch (IOException e) {
624                        RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
625                                        endpoint, inetAddress, e);
626                        connectionExceptions.add(rce);
627                        if (inetAddresses.hasNext()) {
628                            continue innerloop;
629                        } else {
630                            break innerloop;
631                        }
632                    }
633                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
634                    // We found a host to connect to, return here
635                    this.host = host;
636                    this.port = portUint16;
637                    return;
638                }
639            } else {
640                // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy
641                // connection to every inet address of each connection endpoint.
642                socket = socketFactory.createSocket();
643                StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy");
644                final String hostAndPort = host + " at port " + port;
645                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
646                try {
647                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
648                } catch (IOException e) {
649                    CloseableUtil.maybeClose(socket, LOGGER);
650                    RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e);
651                    connectionExceptions.add(rce);
652                    continue;
653                }
654                LOGGER.finer("Established TCP connection to " + hostAndPort);
655                // We found a host to connect to, return here
656                this.host = host;
657                this.port = portUint16;
658                return;
659            }
660        }
661
662        // There are no more host addresses to try
663        // throw an exception and report all tried
664        // HostAddresses in the exception
665        throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
666    }
667
    /**
     * Initializes the connection: resets the compression handler, creates the reader and writer on top of the
     * socket's streams, and then initializes the packet writer followed by the packet reader.
     *
     * @throws IOException if an I/O error occurred.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    private void initConnection() throws IOException, InterruptedException {
        // A fresh connection starts without compression, so the reader and writer created below operate on
        // the plain socket streams.
        compressionHandler = null;

        // Set the reader and writer instance variables
        initReaderAndWriter();

        // Start the writer thread. This will open an XMPP stream to the server
        packetWriter.init();
        // Start the reader thread.
        // NOTE(review): the previous comment claimed that a "startup()" method blocks until the opening stream
        // element of the server was received. No such method is invoked here - confirm whether init() blocks.
        packetReader.init();
    }
687
688    private void initReaderAndWriter() throws IOException {
689        InputStream is = socket.getInputStream();
690        OutputStream os = socket.getOutputStream();
691        if (compressionHandler != null) {
692            is = compressionHandler.getInputStream(is);
693            os = compressionHandler.getOutputStream(os);
694        }
695        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
696        writer = new OutputStreamWriter(os, "UTF-8");
697        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
698
699        // If debugging is enabled, we open a window and write out all network traffic.
700        initDebugger();
701    }
702
703    /**
704     * The server has indicated that TLS negotiation can start. We now need to secure the
705     * existing plain connection and perform a handshake. This method won't return until the
706     * connection has finished the handshake or an error occurred while securing the connection.
707     * @throws IOException if an I/O error occurred.
708     * @throws SecurityNotPossibleException if TLS is not possible.
709     * @throws CertificateException if there is an issue with the certificate.
710     */
711    @SuppressWarnings("LiteralClassName")
712    private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
713        SmackTlsContext smackTlsContext = getSmackTlsContext();
714
715        Socket plain = socket;
716        int port = plain.getPort();
717        String xmppServiceDomainString = config.getXMPPServiceDomain().toString();
718        SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory();
719        // Secure the plain connection
720        socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true);
721
722        final SSLSocket sslSocket = (SSLSocket) socket;
723        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
724        // important (at least on certain platforms) and it seems to be a good idea anyways to
725        // prevent an accidental implicit handshake.
726        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
727
728        // Initialize the reader and writer with the new secured version
729        initReaderAndWriter();
730
731        // Proceed to do the handshake
732        sslSocket.startHandshake();
733
734        if (smackTlsContext.daneVerifier != null) {
735            smackTlsContext.daneVerifier.finish(sslSocket.getSession());
736        }
737
738        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
739        if (verifier == null) {
740                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
741        }
742
743        final String verifierHostname;
744        {
745            DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
746            // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
747            // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
748            // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
749            if (xmppServiceDomainDnsName != null) {
750                verifierHostname = xmppServiceDomainDnsName.ace;
751            }
752            else {
753                LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
754                                + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
755                verifierHostname = getXMPPServiceDomain().toString();
756            }
757        }
758
759        final boolean verificationSuccessful;
760        // Verify the TLS session.
761        verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
762        if (!verificationSuccessful) {
763            throw new CertificateException(
764                            "Hostname verification of certificate failed. Certificate does not authenticate "
765                                            + getXMPPServiceDomain());
766        }
767
768        // Set that TLS was successful
769        secureSocket = sslSocket;
770    }
771
772    /**
773     * Returns the compression handler that can be used for one compression methods offered by the server.
774     *
775     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
776     *
777     */
778    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
779        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
780                String method = handler.getCompressionMethod();
781                if (compression.getMethods().contains(method))
782                    return handler;
783        }
784        return null;
785    }
786
787    @Override
788    public boolean isUsingCompression() {
789        return compressionHandler != null && compressSyncPoint;
790    }
791
792    /**
793     * <p>
794     * Starts using stream compression that will compress network traffic. Traffic can be
795     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
796     * connection. However, the server and the client will need to use more CPU time in order to
797     * un/compress network data so under high load the server performance might be affected.
798     * </p>
799     * <p>
800     * Stream compression has to have been previously offered by the server. Currently only the
801     * zlib method is supported by the client. Stream compression negotiation has to be done
802     * before authentication took place.
803     * </p>
804     *
805     * @throws NotConnectedException if the XMPP connection is not connected.
806     * @throws SmackException if Smack detected an exceptional situation.
807     * @throws InterruptedException if the calling thread was interrupted.
808     * @throws XMPPException if an XMPP protocol error was received.
809     */
810    private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
811        if (!config.isCompressionEnabled()) {
812            return;
813        }
814
815        Compress.Feature compression = getFeature(Compress.Feature.class);
816        if (compression == null) {
817            // Server does not support compression
818            return;
819        }
820        // If stream compression was offered by the server and we want to use
821        // compression then send compression request to the server
822        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
823            compressSyncPoint = false;
824            sendNonza(new Compress(compressionHandler.getCompressionMethod()));
825            waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
826        } else {
827            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
828        }
829    }
830
831    /**
832     * Establishes a connection to the XMPP server. It basically
833     * creates and maintains a socket connection to the server.
834     * <p>
835     * Listeners will be preserved from a previous connection if the reconnection
836     * occurs after an abrupt termination.
837     * </p>
838     *
839     * @throws XMPPException if an error occurs while trying to establish the connection.
840     * @throws SmackException if Smack detected an exceptional situation.
841     * @throws IOException if an I/O error occurred.
842     * @throws InterruptedException if the calling thread was interrupted.
843     */
844    @Override
845    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
846        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
847        // there is an error establishing the connection
848        connectUsingConfiguration();
849
850        connected = true;
851
852        // We connected successfully to the servers TCP port
853        initConnection();
854
855        // TLS handled will be true either if TLS was established, or if it was not mandatory.
856        waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
857
858        // Wait with SASL auth until the SASL mechanisms have been received
859        waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
860    }
861
862    /**
863     * For unit testing purposes
864     *
865     * @param writer TODO javadoc me please
866     */
867    protected void setWriter(Writer writer) {
868        this.writer = writer;
869    }
870
871    @Override
872    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
873        StartTls startTlsFeature = getFeature(StartTls.class);
874        if (startTlsFeature != null) {
875            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
876                SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
877                currentSmackException = smackException;
878                notifyWaitingThreads();
879                throw smackException;
880            }
881
882            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
883                sendNonza(new StartTls());
884            } else {
885                tlsHandled = true;
886                notifyWaitingThreads();
887            }
888        } else {
889            tlsHandled = true;
890            notifyWaitingThreads();
891        }
892
893        if (isSaslAuthenticated()) {
894            // If we have received features after the SASL has been successfully completed, then we
895            // have also *maybe* received, as it is an optional feature, the compression feature
896            // from the server.
897            streamFeaturesAfterAuthenticationReceived = true;
898            notifyWaitingThreads();
899        }
900    }
901
902    private void resetParser() throws IOException {
903        try {
904            packetReader.parser = SmackXmlParser.newXmlParser(reader);
905        } catch (XmlPullParserException e) {
906            throw new IOException(e);
907        }
908   }
909
910    private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
911        sendStreamOpen();
912        resetParser();
913    }
914
915    protected class PacketReader {
916
917        private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';
918
919        XmlPullParser parser;
920
921        private volatile boolean done;
922
923        private boolean running;
924
925        /**
926         * Initializes the reader in order to be used. The reader is initialized during the
927         * first connection and when reconnecting due to an abruptly disconnection.
928         */
929        void init() {
930            done = false;
931
932            running = true;
933            Async.go(new Runnable() {
934                @Override
935                public void run() {
936                    LOGGER.finer(threadName + " start");
937                    try {
938                        parsePackets();
939                    } finally {
940                        LOGGER.finer(threadName + " exit");
941                        running = false;
942                        notifyWaitingThreads();
943                    }
944                }
945            }, threadName);
946         }
947
948        /**
949         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
950         */
951        void shutdown() {
952            done = true;
953        }
954
955        /**
956         * Parse top-level packets in order to process them further.
957         */
958        private void parsePackets() {
959            try {
960                openStreamAndResetParser();
961                XmlPullParser.Event eventType = parser.getEventType();
962                while (!done) {
963                    switch (eventType) {
964                    case START_ELEMENT:
965                        final String name = parser.getName();
966                        final String namespace = parser.getNamespace();
967
968                        switch (name) {
969                        case Message.ELEMENT:
970                        case IQ.IQ_ELEMENT:
971                        case Presence.ELEMENT:
972                            try {
973                                parseAndProcessStanza(parser);
974                            } finally {
975                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
976                            }
977                            break;
978                        case "stream":
979                            if (StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(namespace)) {
980                                onStreamOpen(parser);
981                            }
982                            break;
983                        case "error":
984                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
985                            // Stream errors are non recoverable, throw this exceptions. Also note that this will set
986                            // this exception as current connection exceptions and notify any waiting threads.
987                            throw new StreamErrorException(streamError);
988                        case "features":
989                            parseFeaturesAndNotify(parser);
990                            break;
991                        case "proceed":
992                            // Secure the connection by negotiating TLS
993                            proceedTLSReceived();
994                            // Send a new opening stream to the server
995                            openStreamAndResetParser();
996                            break;
997                        case "failure":
998                            switch (namespace) {
999                            case "urn:ietf:params:xml:ns:xmpp-tls":
1000                                // TLS negotiation has failed. The server will close the connection
1001                                // TODO Parse failure stanza
1002                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
1003                            case "http://jabber.org/protocol/compress":
1004                                // Stream compression has been denied. This is a recoverable
1005                                // situation. It is still possible to authenticate and
1006                                // use the connection but using an uncompressed connection
1007                                // TODO Parse failure stanza
1008                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
1009                                notifyWaitingThreads();
1010                                break;
1011                            default:
1012                                parseAndProcessNonza(parser);
1013                            }
1014                            break;
1015                        case Compressed.ELEMENT:
1016                            // Server confirmed that it's possible to use stream compression. Start
1017                            // stream compression
1018                            // Initialize the reader and writer with the new compressed version
1019                            initReaderAndWriter();
1020                            // Send a new opening stream to the server
1021                            openStreamAndResetParser();
1022                            // Notify that compression is being used
1023                            compressSyncPoint = true;
1024                            notifyWaitingThreads();
1025                            break;
1026                        case Enabled.ELEMENT:
1027                            Enabled enabled = ParseStreamManagement.enabled(parser);
1028                            if (enabled.isResumeSet()) {
1029                                smSessionId = enabled.getId();
1030                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1031                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
1032                                    setCurrentConnectionExceptionAndNotify(xmppException);
1033                                    throw xmppException;
1034                                }
1035                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
1036                            } else {
1037                                // Mark this a non-resumable stream by setting smSessionId to null
1038                                smSessionId = null;
1039                            }
1040                            clientHandledStanzasCount = 0;
1041                            smWasEnabledAtLeastOnce = true;
1042                            smEnabledSyncPoint = true;
1043                            notifyWaitingThreads();
1044                            break;
1045                        case Failed.ELEMENT:
1046                            Failed failed = ParseStreamManagement.failed(parser);
1047                            if (smResumedSyncPoint == SyncPointState.request_sent) {
1048                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
1049                                // so is non-fatal as we can simply continue with resource binding in this case.
1050                                smResumptionFailed = failed;
1051                                notifyWaitingThreads();
1052                            } else {
1053                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
1054                                setCurrentConnectionExceptionAndNotify(xmppException);
1055                            }
1056                            break;
1057                        case Resumed.ELEMENT:
1058                            Resumed resumed = ParseStreamManagement.resumed(parser);
1059                            if (!smSessionId.equals(resumed.getPrevId())) {
1060                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1061                            }
1062                            // Mark SM as enabled
1063                            smEnabledSyncPoint = true;
1064                            // First, drop the stanzas already handled by the server
1065                            processHandledCount(resumed.getHandledCount());
1066                            // Then re-send what is left in the unacknowledged queue
1067                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
1068                            unacknowledgedStanzas.drainTo(stanzasToResend);
1069                            for (Stanza stanza : stanzasToResend) {
1070                                sendStanzaInternal(stanza);
1071                            }
1072                            // If there where stanzas resent, then request a SM ack for them.
1073                            // Writer's sendStreamElement() won't do it automatically based on
1074                            // predicates.
1075                            if (!stanzasToResend.isEmpty()) {
1076                                requestSmAcknowledgementInternal();
1077                            }
1078                            // Mark SM resumption as successful
1079                            smResumedSyncPoint = SyncPointState.successful;
1080                            notifyWaitingThreads();
1081                            break;
1082                        case AckAnswer.ELEMENT:
1083                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1084                            processHandledCount(ackAnswer.getHandledCount());
1085                            break;
1086                        case AckRequest.ELEMENT:
1087                            ParseStreamManagement.ackRequest(parser);
1088                            if (smEnabledSyncPoint) {
1089                                sendSmAcknowledgementInternal();
1090                            } else {
1091                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1092                            }
1093                            break;
1094                         default:
1095                             parseAndProcessNonza(parser);
1096                             break;
1097                        }
1098                        break;
1099                    case END_ELEMENT:
1100                        final String endTagName = parser.getName();
1101                        if ("stream".equals(endTagName)) {
1102                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
1103                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
1104                                break;
1105                            }
1106
1107                            // Check if the queue was already shut down before reporting success on closing stream tag
1108                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
1109                            // did re-start the queue again, causing this writer to assume that the queue is not
1110                            // shutdown, which results in a call to disconnect().
1111                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
1112                            closingStreamReceived = true;
1113                            notifyWaitingThreads();
1114
1115                            if (queueWasShutdown) {
1116                                // We received a closing stream element *after* we initiated the
1117                                // termination of the session by sending a closing stream element to
1118                                // the server first
1119                                return;
1120                            } else {
1121                                // We received a closing stream element from the server without us
1122                                // sending a closing stream element first. This means that the
1123                                // server wants to terminate the session, therefore disconnect
1124                                // the connection
1125                                LOGGER.info(XMPPTCPConnection.this
1126                                                + " received closing </stream> element."
1127                                                + " Server wants to terminate the connection, calling disconnect()");
1128                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
1129                                    @Override
1130                                    public void run() {
1131                                        disconnect();
1132                                    }});
1133                            }
1134                        }
1135                        break;
1136                    case END_DOCUMENT:
1137                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1138                        // closing stream element before.
1139                        throw new SmackException.SmackMessageException(
1140                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1141                    default:
1142                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
1143                        break;
1144                    }
1145                    eventType = parser.next();
1146                }
1147            }
1148            catch (Exception e) {
1149                // Set running to false since this thread will exit here and notifyConnectionError() will wait until
1150                // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
1151                // notifyConnetctionError() below, even though run() also sets it to false. Therefore, do not remove this.
1152                running = false;
1153
1154                String ignoreReasonThread = null;
1155
1156                boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
1157                if (writerThreadWasShutDown) {
1158                    ignoreReasonThread = "writer";
1159                } else if (done) {
1160                    ignoreReasonThread = "reader";
1161                }
1162
1163                if (ignoreReasonThread != null) {
1164                    LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
1165                    return;
1166                }
1167
1168                // Close the connection and notify connection listeners of the error.
1169                notifyConnectionError(e);
1170            }
1171        }
1172    }
1173
1174    protected class PacketWriter {
1175        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1176        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
1177        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
1178
1179        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';
1180
1181        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1182                        QUEUE_SIZE, true);
1183
1184        /**
1185         * If set, the stanza writer is shut down
1186         */
1187        protected volatile Long shutdownTimestamp = null;
1188
1189        private volatile boolean instantShutdown;
1190
        /**
         * True if some preconditions are given to start the bundle and defer mechanism.
         * <p>
         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and
         * then set this field to true.
         * </p>
         */
1199        private boolean shouldBundleAndDefer;
1200
1201        private boolean running;
1202
1203        /**
1204        * Initializes the writer in order to be used. It is called at the first connection and also
1205        * is invoked if the connection is disconnected by an error.
1206        */
1207        void init() {
1208            shutdownTimestamp = null;
1209
1210            if (unacknowledgedStanzas != null) {
1211                // It's possible that there are new stanzas in the writer queue that
1212                // came in while we were disconnected but resumable, drain those into
1213                // the unacknowledged queue so that they get resent now
1214                drainWriterQueueToUnacknowledgedStanzas();
1215            }
1216
1217            queue.start();
1218            running = true;
1219            Async.go(new Runnable() {
1220                @Override
1221                public void run() {
1222                    LOGGER.finer(threadName + " start");
1223                    try {
1224                        writePackets();
1225                    } finally {
1226                        LOGGER.finer(threadName + " exit");
1227                        running = false;
1228                        notifyWaitingThreads();
1229                    }
1230                }
1231            }, threadName);
1232        }
1233
1234        private boolean done() {
1235            return shutdownTimestamp != null;
1236        }
1237
1238        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1239            final boolean done = done();
1240            if (done) {
1241                final boolean smResumptionPossible = isSmResumptionPossible();
1242                // Don't throw a NotConnectedException is there is an resumable stream available
1243                if (!smResumptionPossible) {
1244                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1245                                    + " smResumptionPossible=" + smResumptionPossible);
1246                }
1247            }
1248        }
1249
1250        /**
1251         * Sends the specified element to the server.
1252         *
1253         * @param element the element to send.
1254         * @throws NotConnectedException if the XMPP connection is not connected.
1255         * @throws InterruptedException if the calling thread was interrupted.
1256         */
1257        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1258            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1259            try {
1260                queue.put(element);
1261            }
1262            catch (InterruptedException e) {
1263                // put() may throw an InterruptedException for two reasons:
1264                // 1. If the queue was shut down
1265                // 2. If the thread was interrupted
1266                // so we have to check which is the case
1267                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1268                // If the method above did not throw, then the sending thread was interrupted
1269                throw e;
1270            }
1271        }
1272
1273        /**
1274         * Shuts down the stanza writer. Once this method has been called, no further
1275         * packets will be written to the server.
1276         */
1277        void shutdown(boolean instant) {
1278            instantShutdown = instant;
1279            queue.shutdown();
1280            shutdownTimestamp = System.currentTimeMillis();
1281        }
1282
1283        /**
1284         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1285         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1286         * that case.
1287         *
1288         * @return the next element for writing or null.
1289         */
1290        private Element nextStreamElement() {
1291            // It is important the we check if the queue is empty before removing an element from it
1292            if (queue.isEmpty()) {
1293                shouldBundleAndDefer = true;
1294            }
1295            Element packet = null;
1296            try {
1297                packet = queue.take();
1298            }
1299            catch (InterruptedException e) {
1300                if (!queue.isShutdown()) {
1301                    // Users shouldn't try to interrupt the packet writer thread
1302                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1303                }
1304            }
1305            return packet;
1306        }
1307
        /**
         * Main loop of the writer: takes elements from the queue and writes them to the stream until
         * <code>done()</code> returns true, followed by the shutdown handling.
         * <p>
         * For every element the loop optionally waits for the time requested by the bundle and defer callback,
         * registers stanzas in the unacknowledged stanzas queue (if that queue exists), writes the element's
         * XML and notifies the stanza sending listeners. After the loop, a non-instant shutdown flushes the
         * remaining queue content and writes the closing stream element, while an instant shutdown with
         * Stream Management enabled drains the remaining stanzas into the unacknowledged stanzas queue.
         * </p>
         * <p>
         * Any exception is either reported via <code>notifyConnectionError()</code> or, if the connection is
         * already done or the queue was shut down, only logged.
         * </p>
         */
        private void writePackets() {
            try {
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // No element could be taken (queue shut down or thread interrupted), so re-check
                        // the loop condition.
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Loop to cope with wake-ups that happen before the wait time elapsed and
                                // without the stop flag being set.
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    // 'packet' stays null for every element which is not a stanza.
                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enabled'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
                    }
                    // Does nothing if packet is null or if there is no unacknowledged stanzas queue.
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
                    if (elementXml instanceof XmlStringBuilder) {
                        try {
                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
                        } catch (NullPointerException npe) {
                            // Log which element caused the NPE before propagating it.
                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
                            throw npe;
                        }
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush if there is currently nothing else queued for writing.
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            writer.write(packet.toXML().toString());
                        }
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                    // the reader and writer thread's 'running' value is false.
                    running = false;
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            }
        }
1427
1428        private void drainWriterQueueToUnacknowledgedStanzas() {
1429            List<Element> elements = new ArrayList<>(queue.size());
1430            queue.drainTo(elements);
1431            for (int i = 0; i < elements.size(); i++) {
1432                Element element = elements.get(i);
1433                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1434                if (unacknowledgedStanzas.remainingCapacity() == 0) {
1435                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
1436                            .newWith(i, elements, unacknowledgedStanzas);
1437                    LOGGER.log(Level.WARNING,
1438                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1439                    return;
1440                }
1441                if (element instanceof Stanza) {
1442                    unacknowledgedStanzas.add((Stanza) element);
1443                }
1444            }
1445        }
1446
1447        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1448            // Check if the stream element should be put to the unacknowledgedStanza
1449            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1450            // packet order is not stable at this point (sendStanzaInternal() can be
1451            // called concurrently).
1452            if (unacknowledgedStanzas != null && stanza != null) {
1453                // If the unacknowledgedStanza queue reaching its high water mark, request an new ack
1454                // from the server in order to drain it
1455                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
1456                    writer.write(AckRequest.INSTANCE.toXML().toString());
1457                }
1458
1459                try {
1460                    // It is important the we put the stanza in the unacknowledged stanza
1461                    // queue before we put it on the wire
1462                    unacknowledgedStanzas.put(stanza);
1463                }
1464                catch (InterruptedException e) {
1465                    throw new IllegalStateException(e);
1466                }
1467            }
1468        }
1469    }
1470
1471    /**
1472     * Set if Stream Management should be used by default for new connections.
1473     *
1474     * @param useSmDefault true to use Stream Management for new connections.
1475     */
1476    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1477        XMPPTCPConnection.useSmDefault = useSmDefault;
1478    }
1479
1480    /**
1481     * Set if Stream Management resumption should be used by default for new connections.
1482     *
1483     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1484     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1485     */
1486    @Deprecated
1487    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1488        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1489    }
1490
1491    /**
1492     * Set if Stream Management resumption should be used by default for new connections.
1493     *
1494     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1495     */
1496    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1497        if (useSmResumptionDefault) {
1498            // Also enable SM is resumption is enabled
1499            setUseStreamManagementDefault(useSmResumptionDefault);
1500        }
1501        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1502    }
1503
1504    /**
1505     * Set if Stream Management should be used if supported by the server.
1506     *
1507     * @param useSm true to use Stream Management.
1508     */
1509    public void setUseStreamManagement(boolean useSm) {
1510        this.useSm = useSm;
1511    }
1512
1513    /**
1514     * Set if Stream Management resumption should be used if supported by the server.
1515     *
1516     * @param useSmResumption true to use Stream Management resumption.
1517     */
1518    public void setUseStreamManagementResumption(boolean useSmResumption) {
1519        if (useSmResumption) {
1520            // Also enable SM is resumption is enabled
1521            setUseStreamManagement(useSmResumption);
1522        }
1523        this.useSmResumption = useSmResumption;
1524    }
1525
1526    /**
1527     * Set the preferred resumption time in seconds.
1528     * @param resumptionTime the preferred resumption time in seconds
1529     */
1530    public void setPreferredResumptionTime(int resumptionTime) {
1531        smClientMaxResumptionTime = resumptionTime;
1532    }
1533
1534    /**
1535     * Add a predicate for Stream Management acknowledgment requests.
1536     * <p>
1537     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1538     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1539     * </p>
1540     * <p>
1541     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1542     * </p>
1543     *
1544     * @param predicate the predicate to add.
1545     * @return if the predicate was not already active.
1546     */
1547    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1548        synchronized (requestAckPredicates) {
1549            return requestAckPredicates.add(predicate);
1550        }
1551    }
1552
1553    /**
1554     * Remove the given predicate for Stream Management acknowledgment request.
1555     * @param predicate the predicate to remove.
1556     * @return true if the predicate was removed.
1557     */
1558    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1559        synchronized (requestAckPredicates) {
1560            return requestAckPredicates.remove(predicate);
1561        }
1562    }
1563
1564    /**
1565     * Remove all predicates for Stream Management acknowledgment requests.
1566     */
1567    public void removeAllRequestAckPredicates() {
1568        synchronized (requestAckPredicates) {
1569            requestAckPredicates.clear();
1570        }
1571    }
1572
1573    /**
1574     * Send an unconditional Stream Management acknowledgement request to the server.
1575     *
1576     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1577     * @throws NotConnectedException if the connection is not connected.
1578     * @throws InterruptedException if the calling thread was interrupted.
1579     */
1580    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1581        if (!isSmEnabled()) {
1582            throw new StreamManagementException.StreamManagementNotEnabledException();
1583        }
1584        requestSmAcknowledgementInternal();
1585    }
1586
1587    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1588        packetWriter.sendStreamElement(AckRequest.INSTANCE);
1589    }
1590
1591    /**
1592     * Send a unconditional Stream Management acknowledgment to the server.
1593     * <p>
1594     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1595     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1596     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1597     * </p>
1598     *
1599     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1600     * @throws NotConnectedException if the connection is not connected.
1601     * @throws InterruptedException if the calling thread was interrupted.
1602     */
1603    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1604        if (!isSmEnabled()) {
1605            throw new StreamManagementException.StreamManagementNotEnabledException();
1606        }
1607        sendSmAcknowledgementInternal();
1608    }
1609
1610    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1611        AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount);
1612        // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request
1613        // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here,
1614        // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the
1615        // stream open.
1616        packetWriter.queue.putIfNotShutdown(ackAnswer);
1617    }
1618
1619    /**
1620     * Add a Stanza acknowledged listener.
1621     * <p>
1622     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1623     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1624     * possible.
1625     * </p>
1626     *
1627     * @param listener the listener to add.
1628     */
1629    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1630        stanzaAcknowledgedListeners.add(listener);
1631    }
1632
1633    /**
1634     * Remove the given Stanza acknowledged listener.
1635     *
1636     * @param listener the listener.
1637     * @return true if the listener was removed.
1638     */
1639    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1640        return stanzaAcknowledgedListeners.remove(listener);
1641    }
1642
1643    /**
1644     * Remove all stanza acknowledged listeners.
1645     */
1646    public void removeAllStanzaAcknowledgedListeners() {
1647        stanzaAcknowledgedListeners.clear();
1648    }
1649
1650    /**
1651     * Add a Stanza dropped listener.
1652     * <p>
1653     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get
1654     * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit
1655     * the Stanzas.
1656     * </p>
1657     *
1658     * @param listener the listener to add.
1659     * @since 4.3.3
1660     */
1661    public void addStanzaDroppedListener(StanzaListener listener) {
1662        stanzaDroppedListeners.add(listener);
1663    }
1664
1665    /**
1666     * Remove the given Stanza dropped listener.
1667     *
1668     * @param listener the listener.
1669     * @return true if the listener was removed.
1670     * @since 4.3.3
1671     */
1672    public boolean removeStanzaDroppedListener(StanzaListener listener) {
1673        return stanzaDroppedListeners.remove(listener);
1674    }
1675
1676    /**
1677     * Add a new Stanza ID acknowledged listener for the given ID.
1678     * <p>
1679     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1680     * automatically be removed after the listener was run.
1681     * </p>
1682     *
1683     * @param id the stanza ID.
1684     * @param listener the listener to invoke.
1685     * @return the previous listener for this stanza ID or null.
1686     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1687     */
1688    @SuppressWarnings("FutureReturnValueIgnored")
1689    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1690        // Prevent users from adding callbacks that will never get removed
1691        if (!smWasEnabledAtLeastOnce) {
1692            throw new StreamManagementException.StreamManagementNotEnabledException();
1693        }
1694        // Remove the listener after max. 3 hours
1695        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1696        schedule(new Runnable() {
1697            @Override
1698            public void run() {
1699                stanzaIdAcknowledgedListeners.remove(id);
1700            }
1701        }, removeAfterSeconds, TimeUnit.SECONDS);
1702        return stanzaIdAcknowledgedListeners.put(id, listener);
1703    }
1704
1705    /**
1706     * Remove the Stanza ID acknowledged listener for the given ID.
1707     *
1708     * @param id the stanza ID.
1709     * @return true if the listener was found and removed, false otherwise.
1710     */
1711    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1712        return stanzaIdAcknowledgedListeners.remove(id);
1713    }
1714
1715    /**
1716     * Removes all Stanza ID acknowledged listeners.
1717     */
1718    public void removeAllStanzaIdAcknowledgedListeners() {
1719        stanzaIdAcknowledgedListeners.clear();
1720    }
1721
1722    /**
1723     * Returns true if Stream Management is supported by the server.
1724     *
1725     * @return true if Stream Management is supported by the server.
1726     */
1727    public boolean isSmAvailable() {
1728        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1729    }
1730
1731    /**
1732     * Returns true if Stream Management was successfully negotiated with the server.
1733     *
1734     * @return true if Stream Management was negotiated.
1735     */
1736    public boolean isSmEnabled() {
1737        return smEnabledSyncPoint;
1738    }
1739
1740    /**
1741     * Returns true if the stream was successfully resumed with help of Stream Management.
1742     *
1743     * @return true if the stream was resumed.
1744     */
1745    public boolean streamWasResumed() {
1746        return smResumedSyncPoint == SyncPointState.successful;
1747    }
1748
1749    /**
1750     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1751     *
1752     * @return true if disconnected but resumption possible.
1753     */
1754    public boolean isDisconnectedButSmResumptionPossible() {
1755        return disconnectedButResumeable && isSmResumptionPossible();
1756    }
1757
1758    /**
1759     * Returns true if the stream is resumable.
1760     *
1761     * @return true if the stream is resumable.
1762     */
1763    public boolean isSmResumptionPossible() {
1764        // There is no resumable stream available
1765        if (smSessionId == null)
1766            return false;
1767
1768        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1769        // Seems like we are already reconnected, report true
1770        if (shutdownTimestamp == null) {
1771            return true;
1772        }
1773
1774        // See if resumption time is over
1775        long current = System.currentTimeMillis();
1776        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1777        if (current > shutdownTimestamp + maxResumptionMillies) {
1778            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1779            // resumption is possible
1780            return false;
1781        } else {
1782            return true;
1783        }
1784    }
1785
1786    /**
1787     * Drop the stream management state. Sets {@link #smSessionId} and
1788     * {@link #unacknowledgedStanzas} to <code>null</code>.
1789     */
1790    private void dropSmState() {
1791        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1792        // respective. No need to reset them here.
1793        smSessionId = null;
1794        unacknowledgedStanzas = null;
1795    }
1796
1797    /**
1798     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1799     * <p>
1800     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1801     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1802     * without checking for overflows before.
1803     * </p>
1804     *
1805     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1806     */
1807    public int getMaxSmResumptionTime() {
1808        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1809        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
1810        return Math.min(clientResumptionTime, serverResumptionTime);
1811    }
1812
1813    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1814        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1815        final List<Stanza> ackedStanzas = new ArrayList<>(
1816                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1817                                        : Integer.MAX_VALUE);
1818        for (long i = 0; i < ackedStanzasCount; i++) {
1819            Stanza ackedStanza = unacknowledgedStanzas.poll();
1820            // If the server ack'ed a stanza, then it must be in the
1821            // unacknowledged stanza queue. There can be no exception.
1822            if (ackedStanza == null) {
1823                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1824                                ackedStanzasCount, ackedStanzas);
1825            }
1826            ackedStanzas.add(ackedStanza);
1827        }
1828
1829        boolean atLeastOneStanzaAcknowledgedListener = false;
1830        if (!stanzaAcknowledgedListeners.isEmpty()) {
1831            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1832            atLeastOneStanzaAcknowledgedListener = true;
1833        }
1834        else {
1835            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1836            for (Stanza ackedStanza : ackedStanzas) {
1837                String id = ackedStanza.getStanzaId();
1838                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1839                    atLeastOneStanzaAcknowledgedListener = true;
1840                    break;
1841                }
1842            }
1843        }
1844
1845        // Only spawn a new thread if there is a chance that some listener is invoked
1846        if (atLeastOneStanzaAcknowledgedListener) {
1847            asyncGo(new Runnable() {
1848                @Override
1849                public void run() {
1850                    for (Stanza ackedStanza : ackedStanzas) {
1851                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1852                            try {
1853                                listener.processStanza(ackedStanza);
1854                            }
1855                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1856                                LOGGER.log(Level.FINER, "Received exception", e);
1857                            }
1858                        }
1859                        String id = ackedStanza.getStanzaId();
1860                        if (StringUtils.isNullOrEmpty(id)) {
1861                            continue;
1862                        }
1863                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1864                        if (listener != null) {
1865                            try {
1866                                listener.processStanza(ackedStanza);
1867                            }
1868                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1869                                LOGGER.log(Level.FINER, "Received exception", e);
1870                            }
1871                        }
1872                    }
1873                }
1874            });
1875        }
1876
1877        serverHandledStanzasCount = handledCount;
1878    }
1879
1880    /**
1881     * Set the default bundle and defer callback used for new connections.
1882     *
1883     * @param defaultBundleAndDeferCallback TODO javadoc me please
1884     * @see BundleAndDeferCallback
1885     * @since 4.1
1886     */
1887    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1888        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
1889    }
1890
1891    /**
1892     * Set the bundle and defer callback used for this connection.
1893     * <p>
1894     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1895     * no longer get deferred.
1896     * </p>
1897     *
1898     * @param bundleAndDeferCallback the callback or <code>null</code>.
1899     * @see BundleAndDeferCallback
1900     * @since 4.1
1901     */
1902    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1903        this.bundleAndDeferCallback = bundleAndDeferCallback;
1904    }
1905
1906}