001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.OutputStream;
024import java.io.OutputStreamWriter;
025import java.io.Writer;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.net.Socket;
029import java.security.cert.CertificateException;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Iterator;
033import java.util.LinkedHashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.ArrayBlockingQueue;
039import java.util.concurrent.BlockingQueue;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentLinkedQueue;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047import javax.net.SocketFactory;
048import javax.net.ssl.HostnameVerifier;
049import javax.net.ssl.SSLSession;
050import javax.net.ssl.SSLSocket;
051
052import org.jivesoftware.smack.AbstractXMPPConnection;
053import org.jivesoftware.smack.ConnectionConfiguration;
054import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
055import org.jivesoftware.smack.ConnectionListener;
056import org.jivesoftware.smack.SmackConfiguration;
057import org.jivesoftware.smack.SmackException;
058import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
059import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
060import org.jivesoftware.smack.SmackException.ConnectionException;
061import org.jivesoftware.smack.SmackException.EndpointConnectionException;
062import org.jivesoftware.smack.SmackException.NotConnectedException;
063import org.jivesoftware.smack.SmackException.NotLoggedInException;
064import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
065import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
066import org.jivesoftware.smack.SmackFuture;
067import org.jivesoftware.smack.StanzaListener;
068import org.jivesoftware.smack.XMPPConnection;
069import org.jivesoftware.smack.XMPPException;
070import org.jivesoftware.smack.XMPPException.FailedNonzaException;
071import org.jivesoftware.smack.XMPPException.StreamErrorException;
072import org.jivesoftware.smack.compress.packet.Compress;
073import org.jivesoftware.smack.compress.packet.Compressed;
074import org.jivesoftware.smack.compression.XMPPInputOutputStream;
075import org.jivesoftware.smack.datatypes.UInt16;
076import org.jivesoftware.smack.filter.StanzaFilter;
077import org.jivesoftware.smack.internal.SmackTlsContext;
078import org.jivesoftware.smack.packet.Element;
079import org.jivesoftware.smack.packet.IQ;
080import org.jivesoftware.smack.packet.Message;
081import org.jivesoftware.smack.packet.Nonza;
082import org.jivesoftware.smack.packet.Presence;
083import org.jivesoftware.smack.packet.Stanza;
084import org.jivesoftware.smack.packet.StartTls;
085import org.jivesoftware.smack.packet.StreamError;
086import org.jivesoftware.smack.proxy.ProxyInfo;
087import org.jivesoftware.smack.sasl.packet.SaslNonza;
088import org.jivesoftware.smack.sm.SMUtils;
089import org.jivesoftware.smack.sm.StreamManagementException;
090import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
091import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
092import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
093import org.jivesoftware.smack.sm.packet.StreamManagement;
094import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
095import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
096import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
097import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
098import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
099import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
100import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
101import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
102import org.jivesoftware.smack.sm.predicates.Predicate;
103import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
104import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
105import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
106import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
107import org.jivesoftware.smack.util.Async;
108import org.jivesoftware.smack.util.CloseableUtil;
109import org.jivesoftware.smack.util.PacketParserUtils;
110import org.jivesoftware.smack.util.StringUtils;
111import org.jivesoftware.smack.util.TLSUtils;
112import org.jivesoftware.smack.util.XmlStringBuilder;
113import org.jivesoftware.smack.util.rce.RemoteConnectionException;
114import org.jivesoftware.smack.xml.SmackXmlParser;
115import org.jivesoftware.smack.xml.XmlPullParser;
116import org.jivesoftware.smack.xml.XmlPullParserException;
117
118import org.jxmpp.jid.impl.JidCreate;
119import org.jxmpp.jid.parts.Resourcepart;
120import org.jxmpp.stringprep.XmppStringprepException;
121import org.minidns.dnsname.DnsName;
122
123/**
124 * Creates a socket connection to an XMPP server. This is the default connection
125 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
126 *
127 * @see XMPPConnection
128 * @author Matt Tucker
129 */
130public class XMPPTCPConnection extends AbstractXMPPConnection {
131
    /**
     * Queue capacity constant. NOTE(review): the use sites are outside this section; presumably it bounds the
     * writer queue and/or the unacknowledged stanzas queue - confirm.
     */
    private static final int QUEUE_SIZE = 500;
    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());

    /**
     * The socket which is used for this connection.
     */
    private Socket socket;

    /**
     * Set by {@code shutdown(boolean)} after an instant shutdown if the stream can still be resumed, and reset to
     * {@code false} in {@link #afterSuccessfulLogin(boolean)}. While this flag is set, neither
     * {@link #throwAlreadyConnectedExceptionIfAppropriate()} nor {@link #throwAlreadyLoggedInExceptionIfAppropriate()}
     * throws.
     */
    private boolean disconnectedButResumeable = false;

    /**
     * The TLS socket of this connection, or {@code null} if the connection is not secured. It is reset to
     * {@code null} at the end of a shutdown.
     */
    private SSLSocket secureSocket;

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketWriter packetWriter = new PacketWriter();

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketReader packetReader = new PacketReader();

    /**
     * Reset to {@code false} by the login procedure right before authentication, which afterwards waits until this
     * flag becomes {@code true}. NOTE(review): the flag is set outside this section, presumably once the stream
     * features following the authentication have been received - confirm.
     */
    private boolean streamFeaturesAfterAuthenticationReceived;

    /**
     * NOTE(review): not used within this section; presumably signals that the server confirmed stream compression -
     * confirm against the compression handling code.
     */
    private boolean compressSyncPoint;

    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
    private static BundleAndDeferCallback defaultBundleAndDeferCallback;

    /**
     * The used bundle and defer callback.
     * <p>
     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
     * having a 'volatile' read within the writer threads loop.
     * </p>
     */
    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;

    /**
     * The initial value of {@link #useSm} for new connections.
     */
    private static boolean useSmDefault = true;

    /**
     * The initial value of {@link #useSmResumption} for new connections.
     */
    private static boolean useSmResumptionDefault = true;

    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}.
     */
    private String smSessionId;

    /**
     * Represents the state of stream management resumption.
     * <p>
     * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread.
     * </p>
     */
    private volatile SyncPointState smResumedSyncPoint;

    /**
     * Reset to {@code null} by the login procedure before a stream resumption attempt. A non-null value ends the
     * wait for the outcome of the resumption request.
     */
    private Failed smResumptionFailed;

    /**
     * Represents the state of stream management.
     * <p>
     * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}.
     * </p>
     */
    private volatile boolean smEnabledSyncPoint;

    /**
     * The client's preferred maximum resumption time in seconds.
     */
    private int smClientMaxResumptionTime = -1;

    /**
     * The server's preferred maximum resumption time in seconds.
     */
    private int smServerMaxResumptionTime = -1;

    /**
     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
     */
    private boolean useSm = useSmDefault;

    /**
     * Passed to the stream management 'enable' request sent by the login procedure. NOTE(review): presumably
     * controls whether stream resumption is requested - confirm against {@code Enable}.
     */
    private boolean useSmResumption = useSmResumptionDefault;

    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
    private long serverHandledStanzasCount = 0;

    /**
     * The counter for stanzas handled ("received") by the client.
     * <p>
     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
     * </p>
     */
    private long clientHandledStanzasCount = 0;

    /**
     * The stanzas which were sent but not yet acknowledged by the server, or {@code null}. The login procedure
     * drains this queue if a previous stream could not be resumed.
     */
    private BlockingQueue<Stanza> unacknowledgedStanzas;

    /**
     * Set to true if Stream Management was at least once enabled for this connection.
     */
    private boolean smWasEnabledAtLeastOnce = false;

    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for every stanza that got dropped.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and automatically removed after that.
     */
    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();

    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
     * </p>
     */
    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();

    /**
     * The TCP specific connection configuration, as given to the constructor.
     */
    @SuppressWarnings("HidingField")
    private final XMPPTCPConnectionConfiguration config;
285
286    /**
287     * Creates a new XMPP connection over TCP (optionally using proxies).
288     * <p>
289     * Note that XMPPTCPConnection constructors do not establish a connection to the server
290     * and you must call {@link #connect()}.
291     * </p>
292     *
293     * @param config the connection configuration.
294     */
295    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
296        super(config);
297        this.config = config;
298        addConnectionListener(new ConnectionListener() {
299            @Override
300            public void connectionClosedOnError(Exception e) {
301                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
302                    dropSmState();
303                }
304            }
305        });
306
307        // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream
308        // is initiated.
309        buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install();
310    }
311
312    /**
313     * Creates a new XMPP connection over TCP.
314     * <p>
315     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
316     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
317     * constructor.
318     * </p>
319     *
320     * @param jid the bare JID used by the client.
321     * @param password the password or authentication token.
322     * @throws XmppStringprepException if the provided string is invalid.
323     */
324    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
325        this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build());
326    }
327
328    /**
329     * Creates a new XMPP connection over TCP.
330     * <p>
331     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
332     * you can get fine-grained control over connection settings using the
333     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
334     * </p>
335     * @param username TODO javadoc me please
336     * @param password TODO javadoc me please
337     * @param serviceName TODO javadoc me please
338     * @throws XmppStringprepException if the provided string is invalid.
339     */
340    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
341        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
342                                        JidCreate.domainBareFrom(serviceName)).build());
343    }
344
345    @Override
346    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
347        if (packetWriter == null) {
348            throw new NotConnectedException();
349        }
350        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
351    }
352
353    @Override
354    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
355        if (isConnected() && !disconnectedButResumeable) {
356            throw new AlreadyConnectedException();
357        }
358    }
359
360    @Override
361    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
362        if (isAuthenticated() && !disconnectedButResumeable) {
363            throw new AlreadyLoggedInException();
364        }
365    }
366
    /**
     * Clears the disconnected-but-resumable flag and then delegates to the superclass implementation.
     *
     * @param resumed handed through to the superclass; the login procedure passes {@code true} after a successful
     *        stream resumption and {@code false} otherwise.
     * @throws NotConnectedException if thrown by the superclass implementation.
     * @throws InterruptedException if thrown by the superclass implementation.
     */
    @Override
    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
        // Reset the flag in case it was set
        disconnectedButResumeable = false;
        super.afterSuccessfulLogin(resumed);
    }
373
374    @Override
375    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
376                    SmackException, IOException, InterruptedException {
377        // Authenticate using SASL
378        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
379
380        streamFeaturesAfterAuthenticationReceived = false;
381        authenticate(username, password, config.getAuthzid(), sslSession);
382
383        // Wait for stream features after the authentication.
384        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
385        // renamed to "streamFeaturesAfterAuthenticationReceived".
386        waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
387
388        // If compression is enabled then request the server to use stream compression. XEP-170
389        // recommends to perform stream compression before resource binding.
390        maybeEnableCompression();
391
392        smResumedSyncPoint = SyncPointState.initial;
393        smResumptionFailed = null;
394        if (isSmResumptionPossible()) {
395            smResumedSyncPoint = SyncPointState.request_sent;
396            sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
397            waitForConditionOrConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
398            if (smResumedSyncPoint == SyncPointState.successful) {
399                // We successfully resumed the stream, be done here
400                afterSuccessfulLogin(true);
401                return;
402            }
403            // SM resumption failed, what Smack does here is to report success of
404            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
405            // normal resource binding can be tried.
406            assert smResumptionFailed != null;
407            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
408        }
409
410        // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
411        // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the
412        // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
413        smEnabledSyncPoint = false;
414
415        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
416        if (unacknowledgedStanzas != null) {
417            // There was a previous connection with SM enabled but that was either not resumable or
418            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
419            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
420            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
421            // XMPP session (There maybe was an enabled in a previous XMPP session of this
422            // connection instance though). This is used in writePackets to decide if stanzas should
423            // be added to the unacknowledged stanzas queue, because they have to be added right
424            // after the 'enable' stream element has been sent.
425            dropSmState();
426        }
427
428        // Now bind the resource. It is important to do this *after* we dropped an eventually
429        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
430        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
431        bindResourceAndEstablishSession(resource);
432
433        if (isSmAvailable() && useSm) {
434            // Remove what is maybe left from previously stream managed sessions
435            serverHandledStanzasCount = 0;
436            sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
437            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
438            // then this is a non recoverable error and we therefore throw an exception.
439            waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
440            synchronized (requestAckPredicates) {
441                if (requestAckPredicates.isEmpty()) {
442                    // Assure that we have at lest one predicate set up that so that we request acks
443                    // for the server and eventually flush some stanzas from the unacknowledged
444                    // stanza queue
445                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
446                }
447            }
448        }
449        // Inform client about failed resumption if possible, resend stanzas otherwise
450        // Process the stanzas synchronously so a client can re-queue them for transmission
451        // before it is informed about connection success
452        if (!stanzaDroppedListeners.isEmpty()) {
453            for (Stanza stanza : previouslyUnackedStanzas) {
454                for (StanzaListener listener : stanzaDroppedListeners) {
455                    try {
456                        listener.processStanza(stanza);
457                    }
458                    catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
459                        LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
460                    }
461                }
462            }
463        } else {
464            for (Stanza stanza : previouslyUnackedStanzas) {
465                sendStanzaInternal(stanza);
466            }
467        }
468
469        afterSuccessfulLogin(false);
470    }
471
472    @Override
473    public boolean isSecureConnection() {
474        return secureSocket != null;
475    }
476
477    /**
478     * Shuts the current connection down. After this method returns, the connection must be ready
479     * for re-use by connect.
480     */
481    @Override
482    protected void shutdown() {
483        if (isSmEnabled()) {
484            try {
485                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
486                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
487                sendSmAcknowledgementInternal();
488            } catch (InterruptedException | NotConnectedException e) {
489                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
490            }
491        }
492        shutdown(false);
493    }
494
    /**
     * Shuts the connection down instantly by delegating to {@code shutdown(true)}. In contrast to the regular
     * shutdown, no final stream management acknowledgement is sent first.
     */
    @Override
    public synchronized void instantShutdown() {
        shutdown(true);
    }
499
    /**
     * Tears the connection down: shuts down the packet writer (unless it is already done) and the packet reader,
     * closes the socket, waits for the reader and writer threads to stop and finally resets the connection state.
     * <p>
     * If the connection is already in the disconnected-but-resumable state, then the state reset is skipped. After an
     * instant shutdown, the stream management state is kept if resumption is possible, whereas a non-instant shutdown
     * always drops it.
     * </p>
     *
     * @param instant if {@code true}, the writer is shut down instantly and the closing stream tag of the server is
     *        not awaited.
     */
    private void shutdown(boolean instant) {
        // The writer thread may already been finished at this point, for example when the connection is in the
        // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
        // case.
        if (!packetWriter.done()) {
            // First shutdown the writer, this will result in a closing stream element getting send to
            // the server
            LOGGER.finer(packetWriter.threadName + " shutdown()");
            packetWriter.shutdown(instant);
            LOGGER.finer(packetWriter.threadName + " shutdown() returned");

            if (!instant) {
                waitForClosingStreamTagFromServer();
            }
        }

        LOGGER.finer(packetReader.threadName + " shutdown()");
        packetReader.shutdown();
        LOGGER.finer(packetReader.threadName + " shutdown() returned");

        CloseableUtil.maybeClose(socket, LOGGER);

        // NOTE(review): invoked before 'authenticated' is reset below; presumably it records whether the
        // connection was authenticated prior to the shutdown - confirm in the superclass.
        setWasAuthenticated();

        try {
            // Give the reader and writer threads the chance to terminate before the state is reset.
            boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running);
            if (!readerAndWriterThreadsTermianted) {
                LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: "
                                + packetWriter.running + ", Reader running: " + packetReader.running);
            } else {
                LOGGER.fine("Reader and writer threads terminated");
            }
        } catch (InterruptedException e) {
            // The interruption is only logged, the shutdown continues with the state reset below.
            LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
        }

        // Already in the disconnected-but-resumable state from a previous shutdown: keep the state as it is.
        if (disconnectedButResumeable) {
            return;
        }

        // If we are able to resume the stream, then don't set
        // connected/authenticated/usingTLS to false since we like behave like we are still
        // connected (e.g. sendStanza should not throw a NotConnectedException).
        // NOTE(review): the flags are nevertheless reset unconditionally below - confirm that this comment still
        // describes the intended behavior.
        if (instant) {
            disconnectedButResumeable = isSmResumptionPossible();
            if (!disconnectedButResumeable) {
                // Reset the stream management session id to null, since the stream is no longer resumable. Note that we
                // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected.
                smSessionId = null;
            }
        } else {
            disconnectedButResumeable = false;

            // Drop the stream management state if this is not an instant shutdown. We send
            // a </stream> close tag and now the stream management state is no longer valid.
            // This also prevents that we will potentially (re-)send any unavailable presence we
            // may have send, because it got put into the unacknowledged queue and was not acknowledged before the
            // connection terminated.
            dropSmState();
            // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
            // information is available in the connectionClosedOnError() listeners.
        }
        authenticated = false;
        connected = false;
        secureSocket = null;
        reader = null;
        writer = null;

        initState();
    }
570
    /**
     * Hands the given nonza to the packet writer for sending.
     *
     * @param element the nonza to send.
     * @throws NotConnectedException if thrown by the packet writer.
     * @throws InterruptedException if thrown by the packet writer.
     */
    @Override
    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(element);
    }
575
576    @Override
577    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
578        packetWriter.sendStreamElement(packet);
579        if (isSmEnabled()) {
580            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
581                if (requestAckPredicate.accept(packet)) {
582                    requestSmAcknowledgementInternal();
583                    break;
584                }
585            }
586        }
587    }
588
589    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
590        RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config);
591
592        List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>();
593
594        SocketFactory socketFactory = config.getSocketFactory();
595        ProxyInfo proxyInfo = config.getProxyInfo();
596        int timeout = config.getConnectTimeout();
597        if (socketFactory == null) {
598            socketFactory = SocketFactory.getDefault();
599        }
600        for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
601            Iterator<? extends InetAddress> inetAddresses;
602            String host = endpoint.getHost().toString();
603            UInt16 portUint16 = endpoint.getPort();
604            int port = portUint16.intValue();
605            if (proxyInfo == null) {
606                inetAddresses = endpoint.getInetAddresses().iterator();
607                assert inetAddresses.hasNext();
608
609                innerloop: while (inetAddresses.hasNext()) {
610                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
611                    // re-usable after a failed connection attempt. See also SMACK-724.
612                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
613
614                    final InetAddress inetAddress = inetAddresses.next();
615                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
616                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
617                    socketFuture.connectAsync(inetSocketAddress, timeout);
618
619                    try {
620                        socket = socketFuture.getOrThrow();
621                    } catch (IOException e) {
622                        RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
623                                        endpoint, inetAddress, e);
624                        connectionExceptions.add(rce);
625                        if (inetAddresses.hasNext()) {
626                            continue innerloop;
627                        } else {
628                            break innerloop;
629                        }
630                    }
631                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
632                    // We found a host to connect to, return here
633                    this.host = host;
634                    this.port = portUint16;
635                    return;
636                }
637            } else {
638                // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy
639                // connection to every inet address of each connection endpoint.
640                socket = socketFactory.createSocket();
641                StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy");
642                final String hostAndPort = host + " at port " + port;
643                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
644                try {
645                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
646                } catch (IOException e) {
647                    CloseableUtil.maybeClose(socket, LOGGER);
648                    RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e);
649                    connectionExceptions.add(rce);
650                    continue;
651                }
652                LOGGER.finer("Established TCP connection to " + hostAndPort);
653                // We found a host to connect to, return here
654                this.host = host;
655                this.port = portUint16;
656                return;
657            }
658        }
659
660        // There are no more host addresses to try
661        // throw an exception and report all tried
662        // HostAddresses in the exception
663        throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
664    }
665
    /**
     * Initializes the connection: clears the compression handler, (re-)creates the reader and writer on top of the
     * socket streams and then initializes the packet writer followed by the packet reader.
     *
     * @throws IOException if an I/O error occurred.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    private void initConnection() throws IOException, InterruptedException {
        // A new connection starts without stream compression.
        compressionHandler = null;

        // Set the reader and writer instance variables
        initReaderAndWriter();

        // Start the writer thread. This will open an XMPP stream to the server
        packetWriter.init();
        // Start the reader thread. The startup() method will block until we
        // get an opening stream packet back from server
        packetReader.init();
    }
687
688    private void initReaderAndWriter() throws IOException {
689        InputStream is = socket.getInputStream();
690        OutputStream os = socket.getOutputStream();
691        if (compressionHandler != null) {
692            is = compressionHandler.getInputStream(is);
693            os = compressionHandler.getOutputStream(os);
694        }
695        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
696        writer = new OutputStreamWriter(os, "UTF-8");
697        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
698
699        // If debugging is enabled, we open a window and write out all network traffic.
700        initDebugger();
701    }
702
703    /**
704     * The server has indicated that TLS negotiation can start. We now need to secure the
705     * existing plain connection and perform a handshake. This method won't return until the
706     * connection has finished the handshake or an error occurred while securing the connection.
707     * @throws IOException if an I/O error occurred.
708     * @throws SecurityNotPossibleException if TLS is not possible.
709     * @throws CertificateException if there is an issue with the certificate.
710     */
711    @SuppressWarnings("LiteralClassName")
712    private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
713        SmackTlsContext smackTlsContext = getSmackTlsContext();
714
715        Socket plain = socket;
716        // Secure the plain connection
717        socket = smackTlsContext.sslContext.getSocketFactory().createSocket(plain,
718                config.getXMPPServiceDomain().toString(), plain.getPort(), true);
719
720        final SSLSocket sslSocket = (SSLSocket) socket;
721        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
722        // important (at least on certain platforms) and it seems to be a good idea anyways to
723        // prevent an accidental implicit handshake.
724        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
725
726        // Initialize the reader and writer with the new secured version
727        initReaderAndWriter();
728
729        // Proceed to do the handshake
730        sslSocket.startHandshake();
731
732        if (smackTlsContext.daneVerifier != null) {
733            smackTlsContext.daneVerifier.finish(sslSocket.getSession());
734        }
735
736        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
737        if (verifier == null) {
738                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
739        }
740
741        final String verifierHostname;
742        {
743            DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
744            // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
745            // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
746            // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
747            if (xmppServiceDomainDnsName != null) {
748                verifierHostname = xmppServiceDomainDnsName.ace;
749            }
750            else {
751                LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
752                                + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
753                verifierHostname = getXMPPServiceDomain().toString();
754            }
755        }
756
757        final boolean verificationSuccessful;
758        // Verify the TLS session.
759        verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
760        if (!verificationSuccessful) {
761            throw new CertificateException(
762                            "Hostname verification of certificate failed. Certificate does not authenticate "
763                                            + getXMPPServiceDomain());
764        }
765
766        // Set that TLS was successful
767        secureSocket = sslSocket;
768    }
769
770    /**
771     * Returns the compression handler that can be used for one compression methods offered by the server.
772     *
773     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
774     *
775     */
776    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
777        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
778                String method = handler.getCompressionMethod();
779                if (compression.getMethods().contains(method))
780                    return handler;
781        }
782        return null;
783    }
784
785    @Override
786    public boolean isUsingCompression() {
787        return compressionHandler != null && compressSyncPoint;
788    }
789
790    /**
791     * <p>
792     * Starts using stream compression that will compress network traffic. Traffic can be
793     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
794     * connection. However, the server and the client will need to use more CPU time in order to
795     * un/compress network data so under high load the server performance might be affected.
796     * </p>
797     * <p>
798     * Stream compression has to have been previously offered by the server. Currently only the
799     * zlib method is supported by the client. Stream compression negotiation has to be done
800     * before authentication took place.
801     * </p>
802     *
803     * @throws NotConnectedException if the XMPP connection is not connected.
804     * @throws SmackException if Smack detected an exceptional situation.
805     * @throws InterruptedException if the calling thread was interrupted.
806     * @throws XMPPException if an XMPP protocol error was received.
807     */
808    private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
809        if (!config.isCompressionEnabled()) {
810            return;
811        }
812
813        Compress.Feature compression = getFeature(Compress.Feature.class);
814        if (compression == null) {
815            // Server does not support compression
816            return;
817        }
818        // If stream compression was offered by the server and we want to use
819        // compression then send compression request to the server
820        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
821            compressSyncPoint = false;
822            sendNonza(new Compress(compressionHandler.getCompressionMethod()));
823            waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
824        } else {
825            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
826        }
827    }
828
829    /**
830     * Establishes a connection to the XMPP server. It basically
831     * creates and maintains a socket connection to the server.
832     * <p>
833     * Listeners will be preserved from a previous connection if the reconnection
834     * occurs after an abrupt termination.
835     * </p>
836     *
837     * @throws XMPPException if an error occurs while trying to establish the connection.
838     * @throws SmackException if Smack detected an exceptional situation.
839     * @throws IOException if an I/O error occurred.
840     * @throws InterruptedException if the calling thread was interrupted.
841     */
842    @Override
843    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
844        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
845        // there is an error establishing the connection
846        connectUsingConfiguration();
847
848        connected = true;
849
850        // We connected successfully to the servers TCP port
851        initConnection();
852
853        // TLS handled will be true either if TLS was established, or if it was not mandatory.
854        waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
855
856        // Wait with SASL auth until the SASL mechanisms have been received
857        waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
858    }
859
860    /**
861     * For unit testing purposes
862     *
863     * @param writer TODO javadoc me please
864     */
865    protected void setWriter(Writer writer) {
866        this.writer = writer;
867    }
868
869    @Override
870    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
871        StartTls startTlsFeature = getFeature(StartTls.class);
872        if (startTlsFeature != null) {
873            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
874                SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
875                currentSmackException = smackException;
876                notifyWaitingThreads();
877                throw smackException;
878            }
879
880            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
881                sendNonza(new StartTls());
882            } else {
883                tlsHandled = true;
884                notifyWaitingThreads();
885            }
886        } else {
887            tlsHandled = true;
888            notifyWaitingThreads();
889        }
890
891        if (isSaslAuthenticated()) {
892            // If we have received features after the SASL has been successfully completed, then we
893            // have also *maybe* received, as it is an optional feature, the compression feature
894            // from the server.
895            streamFeaturesAfterAuthenticationReceived = true;
896            notifyWaitingThreads();
897        }
898    }
899
900    private void resetParser() throws IOException {
901        try {
902            packetReader.parser = SmackXmlParser.newXmlParser(reader);
903        } catch (XmlPullParserException e) {
904            throw new IOException(e);
905        }
906   }
907
    /**
     * Sends a stream open element and afterwards replaces the parser of the packet reader with a new
     * one reading from the current {@code reader}.
     *
     * @throws IOException if the new parser could not be created.
     * @throws NotConnectedException if the XMPP connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
        sendStreamOpen();
        resetParser();
    }
912
    /**
     * Reads the XML stream coming from the server. {@link #init()} starts an asynchronous task that
     * runs the parse loop, which dispatches every top-level element of the stream: stanzas, stream
     * features, TLS and compression negotiation elements and Stream Management elements.
     */
    protected class PacketReader {

        private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';

        /**
         * The parser the parse loop pulls events from. It is replaced by resetParser() of the
         * enclosing connection.
         */
        XmlPullParser parser;

        /**
         * Set by {@link #shutdown()}. Ends the parse loop and suppresses the error notification for
         * exceptions thrown by the parse loop.
         */
        private volatile boolean done;

        /**
         * True from {@link #init()} until the reader task finished or the parse loop failed with an
         * exception.
         */
        private boolean running;

        /**
         * Initializes the reader in order to be used. The reader is initialized during the
         * first connection and when reconnecting due to an abrupt disconnection.
         */
        void init() {
            done = false;

            running = true;
            Async.go(new Runnable() {
                @Override
                public void run() {
                    LOGGER.finer(threadName + " start");
                    try {
                        parsePackets();
                    } finally {
                        // Always mark the reader as no longer running and wake up threads that may wait for that
                        LOGGER.finer(threadName + " exit");
                        running = false;
                        notifyWaitingThreads();
                    }
                }
            }, threadName);
         }

        /**
         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
         */
        void shutdown() {
            done = true;
        }

        /**
         * Parse top-level packets in order to process them further. Opens the stream first and then
         * loops over the parser events until the reader is done, the closing stream element was
         * received in response to our own stream close, or an exception occurs. Exceptions are
         * reported via notifyConnectionError() unless the reader is done.
         */
        private void parsePackets() {
            try {
                openStreamAndResetParser();
                XmlPullParser.Event eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case START_ELEMENT:
                        final String name = parser.getName();
                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                // The stanza counts as handled even if parsing or processing it threw
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            onStreamOpen(parser);
                            break;
                        case "error":
                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
                            // Stream errors are non recoverable, throw this exception. It is caught by the catch
                            // block at the end of this method, which reports it via notifyConnectionError() unless
                            // the reader is done.
                            throw new StreamErrorException(streamError);
                        case "features":
                            parseFeaturesAndNotify(parser);
                            break;
                        case "proceed":
                            // Secure the connection by negotiating TLS
                            proceedTLSReceived();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            break;
                        case "failure":
                            // NOTE(review): switching on a null namespace would throw a NullPointerException,
                            // confirm that getNamespace(null) can not return null here.
                            String namespace = parser.getNamespace(null);
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
                                notifyWaitingThreads();
                                break;
                            default:
                                // A failure element of another namespace, handle it like any other nonza
                                parseAndProcessNonza(parser);
                            }
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            // Notify that compression is being used
                            compressSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                smSessionId = enabled.getId();
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
                                    setCurrentConnectionExceptionAndNotify(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            // Stream Management got (re-)enabled, start counting the handled stanzas from zero
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            if (smResumedSyncPoint == SyncPointState.request_sent) {
                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
                                // so is non-fatal as we can simply continue with resource binding in this case.
                                smResumptionFailed = failed;
                                notifyWaitingThreads();
                            } else {
                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
                                setCurrentConnectionExceptionAndNotify(xmppException);
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            // NOTE(review): smSessionId is set to null for non-resumable streams (see the 'enabled'
                            // case above), in which case this would throw a NullPointerException. Confirm that
                            // <resumed/> can not be received in that state.
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // Mark SM as enabled
                            smEnabledSyncPoint = true;
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                            unacknowledgedStanzas.drainTo(stanzasToResend);
                            for (Stanza stanza : stanzasToResend) {
                                sendStanzaInternal(stanza);
                            }
                            // If there were stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            // Mark SM resumption as successful
                            smResumedSyncPoint = SyncPointState.successful;
                            notifyWaitingThreads();
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            ParseStreamManagement.ackRequest(parser);
                            if (smEnabledSyncPoint) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                         default:
                             // Every other top-level element is handled as nonza
                             parseAndProcessNonza(parser);
                             break;
                        }
                        break;
                    case END_ELEMENT:
                        final String endTagName = parser.getName();
                        if ("stream".equals(endTagName)) {
                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
                                break;
                            }

                            // Check if the queue was already shut down before reporting success on closing stream tag
                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
                            // did re-start the queue again, causing this reader to assume that the queue is not
                            // shutdown, which results in a call to disconnect().
                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
                            closingStreamReceived = true;
                            notifyWaitingThreads();

                            if (queueWasShutdown) {
                                // We received a closing stream element *after* we initiated the
                                // termination of the session by sending a closing stream element to
                                // the server first
                                return;
                            } else {
                                // We received a closing stream element from the server without us
                                // sending a closing stream element first. This means that the
                                // server wants to terminate the session, therefore disconnect
                                // the connection
                                LOGGER.info(XMPPTCPConnection.this
                                                + " received closing </stream> element."
                                                + " Server wants to terminate the connection, calling disconnect()");
                                // The disconnect is handed off to ASYNC_BUT_ORDERED instead of being performed
                                // within the parse loop
                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
                                    @Override
                                    public void run() {
                                        disconnect();
                                    }});
                            }
                        }
                        break;
                    case END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException.SmackMessageException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    default:
                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
                        break;
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed.
                if (!done) {
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                    // the reader and writer thread's 'running' value is false.
                    running = false;
                    // Close the connection and notify connection listeners of the
                    // error.
                    notifyConnectionError(e);
                }
            }
        }
    }
1157
1158    protected class PacketWriter {
1159        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1160        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
1161        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
1162
1163        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';
1164
1165        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1166                        QUEUE_SIZE, true);
1167
1168        /**
1169         * If set, the stanza writer is shut down
1170         */
1171        protected volatile Long shutdownTimestamp = null;
1172
1173        private volatile boolean instantShutdown;
1174
1175        /**
1176         * True if some preconditions are given to start the bundle and defer mechanism.
1177         * <p>
1178         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
1180         * this field to true.
1181         * </p>
1182         */
1183        private boolean shouldBundleAndDefer;
1184
1185        private boolean running;
1186
1187        /**
1188        * Initializes the writer in order to be used. It is called at the first connection and also
1189        * is invoked if the connection is disconnected by an error.
1190        */
1191        void init() {
1192            shutdownTimestamp = null;
1193
1194            if (unacknowledgedStanzas != null) {
1195                // It's possible that there are new stanzas in the writer queue that
1196                // came in while we were disconnected but resumable, drain those into
1197                // the unacknowledged queue so that they get resent now
1198                drainWriterQueueToUnacknowledgedStanzas();
1199            }
1200
1201            queue.start();
1202            running = true;
1203            Async.go(new Runnable() {
1204                @Override
1205                public void run() {
1206                    LOGGER.finer(threadName + " start");
1207                    try {
1208                        writePackets();
1209                    } finally {
1210                        LOGGER.finer(threadName + " exit");
1211                        running = false;
1212                        notifyWaitingThreads();
1213                    }
1214                }
1215            }, threadName);
1216        }
1217
        /**
         * Checks whether this writer was shut down, which is the case if a shutdown timestamp is set.
         *
         * @return true if {@link #shutdownTimestamp} is not null.
         */
        private boolean done() {
            return shutdownTimestamp != null;
        }
1221
1222        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1223            final boolean done = done();
1224            if (done) {
1225                final boolean smResumptionPossible = isSmResumptionPossible();
1226                // Don't throw a NotConnectedException is there is an resumable stream available
1227                if (!smResumptionPossible) {
1228                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1229                                    + " smResumptionPossible=" + smResumptionPossible);
1230                }
1231            }
1232        }
1233
1234        /**
1235         * Sends the specified element to the server.
1236         *
1237         * @param element the element to send.
1238         * @throws NotConnectedException if the XMPP connection is not connected.
1239         * @throws InterruptedException if the calling thread was interrupted.
1240         */
1241        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1242            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1243            try {
1244                queue.put(element);
1245            }
1246            catch (InterruptedException e) {
1247                // put() may throw an InterruptedException for two reasons:
1248                // 1. If the queue was shut down
1249                // 2. If the thread was interrupted
1250                // so we have to check which is the case
1251                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1252                // If the method above did not throw, then the sending thread was interrupted
1253                throw e;
1254            }
1255        }
1256
1257        /**
1258         * Shuts down the stanza writer. Once this method has been called, no further
1259         * packets will be written to the server.
1260         */
1261        void shutdown(boolean instant) {
1262            instantShutdown = instant;
1263            queue.shutdown();
1264            shutdownTimestamp = System.currentTimeMillis();
1265        }
1266
1267        /**
1268         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1269         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1270         * that case.
1271         *
1272         * @return the next element for writing or null.
1273         */
1274        private Element nextStreamElement() {
1275            // It is important the we check if the queue is empty before removing an element from it
1276            if (queue.isEmpty()) {
1277                shouldBundleAndDefer = true;
1278            }
1279            Element packet = null;
1280            try {
1281                packet = queue.take();
1282            }
1283            catch (InterruptedException e) {
1284                if (!queue.isShutdown()) {
1285                    // Users shouldn't try to interrupt the packet writer thread
1286                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1287                }
1288            }
1289            return packet;
1290        }
1291
        /**
         * The write loop of the stanza writer.
         * <p>
         * Elements are taken from the queue and written to <code>writer</code> until <code>done()</code>
         * returns true. Afterwards, unless an instant shutdown was requested, the elements still in the queue
         * are written, followed by the closing stream tag. On an instant shutdown with Stream Management
         * enabled, the remaining queue content is drained into the unacknowledged stanzas queue instead.
         * </p>
         * <p>
         * Any exception thrown while writing is reported via <code>notifyConnectionError()</code>, unless the
         * writer is done or the queue was shut down, in which case it is only logged.
         * </p>
         */
        private void writePackets() {
            try {
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // No element was obtained (queue shut down or interrupt), re-check the done condition.
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            // Wait until the requested time has elapsed or until the flag handed to the
                            // BundleAndDefer instance is set, whichever comes first.
                            synchronized (bundlingAndDeferringStopped) {
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    // Re-compute the remaining time, as wait() may return early.
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    // 'packet' stays null for stream elements which are not stanzas.
                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enable'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
                    }
                    // Does nothing if 'packet' is null or if there is no unacknowledged stanzas queue.
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
                    if (elementXml instanceof XmlStringBuilder) {
                        try {
                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
                        } catch (NullPointerException npe) {
                            // Log which element caused the NPE before passing the exception on.
                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
                            throw npe;
                        }
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush once there is nothing more queued for writing.
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            writer.write(packet.toXML().toString());
                        }
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                    // the reader and writer thread's 'running' value is false.
                    running = false;
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            }
        }
1411
1412        private void drainWriterQueueToUnacknowledgedStanzas() {
1413            List<Element> elements = new ArrayList<>(queue.size());
1414            queue.drainTo(elements);
1415            for (int i = 0; i < elements.size(); i++) {
1416                Element element = elements.get(i);
1417                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1418                if (unacknowledgedStanzas.remainingCapacity() == 0) {
1419                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
1420                            .newWith(i, elements, unacknowledgedStanzas);
1421                    LOGGER.log(Level.WARNING,
1422                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1423                    return;
1424                }
1425                if (element instanceof Stanza) {
1426                    unacknowledgedStanzas.add((Stanza) element);
1427                }
1428            }
1429        }
1430
1431        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1432            // Check if the stream element should be put to the unacknowledgedStanza
1433            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1434            // packet order is not stable at this point (sendStanzaInternal() can be
1435            // called concurrently).
1436            if (unacknowledgedStanzas != null && stanza != null) {
1437                // If the unacknowledgedStanza queue reaching its high water mark, request an new ack
1438                // from the server in order to drain it
1439                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
1440                    writer.write(AckRequest.INSTANCE.toXML().toString());
1441                }
1442
1443                try {
1444                    // It is important the we put the stanza in the unacknowledged stanza
1445                    // queue before we put it on the wire
1446                    unacknowledgedStanzas.put(stanza);
1447                }
1448                catch (InterruptedException e) {
1449                    throw new IllegalStateException(e);
1450                }
1451            }
1452        }
1453    }
1454
1455    /**
1456     * Set if Stream Management should be used by default for new connections.
1457     *
1458     * @param useSmDefault true to use Stream Management for new connections.
1459     */
1460    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1461        XMPPTCPConnection.useSmDefault = useSmDefault;
1462    }
1463
1464    /**
1465     * Set if Stream Management resumption should be used by default for new connections.
1466     *
1467     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1468     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1469     */
1470    @Deprecated
1471    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1472        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1473    }
1474
1475    /**
1476     * Set if Stream Management resumption should be used by default for new connections.
1477     *
1478     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1479     */
1480    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1481        if (useSmResumptionDefault) {
1482            // Also enable SM is resumption is enabled
1483            setUseStreamManagementDefault(useSmResumptionDefault);
1484        }
1485        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1486    }
1487
1488    /**
1489     * Set if Stream Management should be used if supported by the server.
1490     *
1491     * @param useSm true to use Stream Management.
1492     */
1493    public void setUseStreamManagement(boolean useSm) {
1494        this.useSm = useSm;
1495    }
1496
1497    /**
1498     * Set if Stream Management resumption should be used if supported by the server.
1499     *
1500     * @param useSmResumption true to use Stream Management resumption.
1501     */
1502    public void setUseStreamManagementResumption(boolean useSmResumption) {
1503        if (useSmResumption) {
1504            // Also enable SM is resumption is enabled
1505            setUseStreamManagement(useSmResumption);
1506        }
1507        this.useSmResumption = useSmResumption;
1508    }
1509
1510    /**
1511     * Set the preferred resumption time in seconds.
1512     * @param resumptionTime the preferred resumption time in seconds
1513     */
1514    public void setPreferredResumptionTime(int resumptionTime) {
1515        smClientMaxResumptionTime = resumptionTime;
1516    }
1517
1518    /**
1519     * Add a predicate for Stream Management acknowledgment requests.
1520     * <p>
1521     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1522     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1523     * </p>
1524     * <p>
1525     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1526     * </p>
1527     *
1528     * @param predicate the predicate to add.
1529     * @return if the predicate was not already active.
1530     */
1531    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1532        synchronized (requestAckPredicates) {
1533            return requestAckPredicates.add(predicate);
1534        }
1535    }
1536
1537    /**
1538     * Remove the given predicate for Stream Management acknowledgment request.
1539     * @param predicate the predicate to remove.
1540     * @return true if the predicate was removed.
1541     */
1542    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1543        synchronized (requestAckPredicates) {
1544            return requestAckPredicates.remove(predicate);
1545        }
1546    }
1547
1548    /**
1549     * Remove all predicates for Stream Management acknowledgment requests.
1550     */
1551    public void removeAllRequestAckPredicates() {
1552        synchronized (requestAckPredicates) {
1553            requestAckPredicates.clear();
1554        }
1555    }
1556
1557    /**
1558     * Send an unconditional Stream Management acknowledgement request to the server.
1559     *
1560     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1561     * @throws NotConnectedException if the connection is not connected.
1562     * @throws InterruptedException if the calling thread was interrupted.
1563     */
1564    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1565        if (!isSmEnabled()) {
1566            throw new StreamManagementException.StreamManagementNotEnabledException();
1567        }
1568        requestSmAcknowledgementInternal();
1569    }
1570
1571    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1572        packetWriter.sendStreamElement(AckRequest.INSTANCE);
1573    }
1574
1575    /**
1576     * Send a unconditional Stream Management acknowledgment to the server.
1577     * <p>
1578     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1579     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1580     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1581     * </p>
1582     *
1583     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1584     * @throws NotConnectedException if the connection is not connected.
1585     * @throws InterruptedException if the calling thread was interrupted.
1586     */
1587    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1588        if (!isSmEnabled()) {
1589            throw new StreamManagementException.StreamManagementNotEnabledException();
1590        }
1591        sendSmAcknowledgementInternal();
1592    }
1593
1594    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1595        AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount);
1596        // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request
1597        // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here,
1598        // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the
1599        // stream open.
1600        packetWriter.queue.putIfNotShutdown(ackAnswer);
1601    }
1602
1603    /**
1604     * Add a Stanza acknowledged listener.
1605     * <p>
1606     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1607     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1608     * possible.
1609     * </p>
1610     *
1611     * @param listener the listener to add.
1612     */
1613    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1614        stanzaAcknowledgedListeners.add(listener);
1615    }
1616
1617    /**
1618     * Remove the given Stanza acknowledged listener.
1619     *
1620     * @param listener the listener.
1621     * @return true if the listener was removed.
1622     */
1623    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1624        return stanzaAcknowledgedListeners.remove(listener);
1625    }
1626
1627    /**
1628     * Remove all stanza acknowledged listeners.
1629     */
1630    public void removeAllStanzaAcknowledgedListeners() {
1631        stanzaAcknowledgedListeners.clear();
1632    }
1633
1634    /**
1635     * Add a Stanza dropped listener.
1636     * <p>
1637     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get
1638     * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit
1639     * the Stanzas.
1640     * </p>
1641     *
1642     * @param listener the listener to add.
1643     * @since 4.3.3
1644     */
1645    public void addStanzaDroppedListener(StanzaListener listener) {
1646        stanzaDroppedListeners.add(listener);
1647    }
1648
1649    /**
1650     * Remove the given Stanza dropped listener.
1651     *
1652     * @param listener the listener.
1653     * @return true if the listener was removed.
1654     * @since 4.3.3
1655     */
1656    public boolean removeStanzaDroppedListener(StanzaListener listener) {
1657        return stanzaDroppedListeners.remove(listener);
1658    }
1659
1660    /**
1661     * Add a new Stanza ID acknowledged listener for the given ID.
1662     * <p>
1663     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1664     * automatically be removed after the listener was run.
1665     * </p>
1666     *
1667     * @param id the stanza ID.
1668     * @param listener the listener to invoke.
1669     * @return the previous listener for this stanza ID or null.
1670     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1671     */
1672    @SuppressWarnings("FutureReturnValueIgnored")
1673    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1674        // Prevent users from adding callbacks that will never get removed
1675        if (!smWasEnabledAtLeastOnce) {
1676            throw new StreamManagementException.StreamManagementNotEnabledException();
1677        }
1678        // Remove the listener after max. 3 hours
1679        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1680        schedule(new Runnable() {
1681            @Override
1682            public void run() {
1683                stanzaIdAcknowledgedListeners.remove(id);
1684            }
1685        }, removeAfterSeconds, TimeUnit.SECONDS);
1686        return stanzaIdAcknowledgedListeners.put(id, listener);
1687    }
1688
1689    /**
1690     * Remove the Stanza ID acknowledged listener for the given ID.
1691     *
1692     * @param id the stanza ID.
1693     * @return true if the listener was found and removed, false otherwise.
1694     */
1695    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1696        return stanzaIdAcknowledgedListeners.remove(id);
1697    }
1698
1699    /**
1700     * Removes all Stanza ID acknowledged listeners.
1701     */
1702    public void removeAllStanzaIdAcknowledgedListeners() {
1703        stanzaIdAcknowledgedListeners.clear();
1704    }
1705
1706    /**
1707     * Returns true if Stream Management is supported by the server.
1708     *
1709     * @return true if Stream Management is supported by the server.
1710     */
1711    public boolean isSmAvailable() {
1712        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1713    }
1714
1715    /**
1716     * Returns true if Stream Management was successfully negotiated with the server.
1717     *
1718     * @return true if Stream Management was negotiated.
1719     */
1720    public boolean isSmEnabled() {
1721        return smEnabledSyncPoint;
1722    }
1723
1724    /**
1725     * Returns true if the stream was successfully resumed with help of Stream Management.
1726     *
1727     * @return true if the stream was resumed.
1728     */
1729    public boolean streamWasResumed() {
1730        return smResumedSyncPoint == SyncPointState.successful;
1731    }
1732
1733    /**
1734     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1735     *
1736     * @return true if disconnected but resumption possible.
1737     */
1738    public boolean isDisconnectedButSmResumptionPossible() {
1739        return disconnectedButResumeable && isSmResumptionPossible();
1740    }
1741
1742    /**
1743     * Returns true if the stream is resumable.
1744     *
1745     * @return true if the stream is resumable.
1746     */
1747    public boolean isSmResumptionPossible() {
1748        // There is no resumable stream available
1749        if (smSessionId == null)
1750            return false;
1751
1752        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1753        // Seems like we are already reconnected, report true
1754        if (shutdownTimestamp == null) {
1755            return true;
1756        }
1757
1758        // See if resumption time is over
1759        long current = System.currentTimeMillis();
1760        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1761        if (current > shutdownTimestamp + maxResumptionMillies) {
1762            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1763            // resumption is possible
1764            return false;
1765        } else {
1766            return true;
1767        }
1768    }
1769
1770    /**
1771     * Drop the stream management state. Sets {@link #smSessionId} and
1772     * {@link #unacknowledgedStanzas} to <code>null</code>.
1773     */
1774    private void dropSmState() {
1775        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1776        // respective. No need to reset them here.
1777        smSessionId = null;
1778        unacknowledgedStanzas = null;
1779    }
1780
1781    /**
1782     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1783     * <p>
1784     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1785     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1786     * without checking for overflows before.
1787     * </p>
1788     *
1789     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1790     */
1791    public int getMaxSmResumptionTime() {
1792        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1793        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
1794        return Math.min(clientResumptionTime, serverResumptionTime);
1795    }
1796
1797    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1798        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1799        final List<Stanza> ackedStanzas = new ArrayList<>(
1800                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1801                                        : Integer.MAX_VALUE);
1802        for (long i = 0; i < ackedStanzasCount; i++) {
1803            Stanza ackedStanza = unacknowledgedStanzas.poll();
1804            // If the server ack'ed a stanza, then it must be in the
1805            // unacknowledged stanza queue. There can be no exception.
1806            if (ackedStanza == null) {
1807                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1808                                ackedStanzasCount, ackedStanzas);
1809            }
1810            ackedStanzas.add(ackedStanza);
1811        }
1812
1813        boolean atLeastOneStanzaAcknowledgedListener = false;
1814        if (!stanzaAcknowledgedListeners.isEmpty()) {
1815            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1816            atLeastOneStanzaAcknowledgedListener = true;
1817        }
1818        else {
1819            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1820            for (Stanza ackedStanza : ackedStanzas) {
1821                String id = ackedStanza.getStanzaId();
1822                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1823                    atLeastOneStanzaAcknowledgedListener = true;
1824                    break;
1825                }
1826            }
1827        }
1828
1829        // Only spawn a new thread if there is a chance that some listener is invoked
1830        if (atLeastOneStanzaAcknowledgedListener) {
1831            asyncGo(new Runnable() {
1832                @Override
1833                public void run() {
1834                    for (Stanza ackedStanza : ackedStanzas) {
1835                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1836                            try {
1837                                listener.processStanza(ackedStanza);
1838                            }
1839                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1840                                LOGGER.log(Level.FINER, "Received exception", e);
1841                            }
1842                        }
1843                        String id = ackedStanza.getStanzaId();
1844                        if (StringUtils.isNullOrEmpty(id)) {
1845                            continue;
1846                        }
1847                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1848                        if (listener != null) {
1849                            try {
1850                                listener.processStanza(ackedStanza);
1851                            }
1852                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1853                                LOGGER.log(Level.FINER, "Received exception", e);
1854                            }
1855                        }
1856                    }
1857                }
1858            });
1859        }
1860
1861        serverHandledStanzasCount = handledCount;
1862    }
1863
1864    /**
1865     * Set the default bundle and defer callback used for new connections.
1866     *
1867     * @param defaultBundleAndDeferCallback TODO javadoc me please
1868     * @see BundleAndDeferCallback
1869     * @since 4.1
1870     */
1871    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1872        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
1873    }
1874
1875    /**
1876     * Set the bundle and defer callback used for this connection.
1877     * <p>
1878     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1879     * no longer get deferred.
1880     * </p>
1881     *
1882     * @param bundleAndDeferCallback the callback or <code>null</code>.
1883     * @see BundleAndDeferCallback
1884     * @since 4.1
1885     */
1886    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1887        this.bundleAndDeferCallback = bundleAndDeferCallback;
1888    }
1889
1890}