001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.OutputStream;
024import java.io.OutputStreamWriter;
025import java.io.Writer;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.net.Socket;
029import java.security.cert.CertificateException;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Iterator;
033import java.util.LinkedHashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.ArrayBlockingQueue;
039import java.util.concurrent.BlockingQueue;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentLinkedQueue;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047import javax.net.SocketFactory;
048import javax.net.ssl.HostnameVerifier;
049import javax.net.ssl.SSLSession;
050import javax.net.ssl.SSLSocket;
051import javax.net.ssl.SSLSocketFactory;
052
053import org.jivesoftware.smack.AbstractXMPPConnection;
054import org.jivesoftware.smack.ConnectionConfiguration;
055import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
056import org.jivesoftware.smack.ConnectionListener;
057import org.jivesoftware.smack.SmackConfiguration;
058import org.jivesoftware.smack.SmackException;
059import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
060import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
061import org.jivesoftware.smack.SmackException.ConnectionException;
062import org.jivesoftware.smack.SmackException.EndpointConnectionException;
063import org.jivesoftware.smack.SmackException.NotConnectedException;
064import org.jivesoftware.smack.SmackException.NotLoggedInException;
065import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
066import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
067import org.jivesoftware.smack.SmackFuture;
068import org.jivesoftware.smack.StanzaListener;
069import org.jivesoftware.smack.XMPPConnection;
070import org.jivesoftware.smack.XMPPException;
071import org.jivesoftware.smack.XMPPException.FailedNonzaException;
072import org.jivesoftware.smack.XMPPException.StreamErrorException;
073import org.jivesoftware.smack.compress.packet.Compress;
074import org.jivesoftware.smack.compress.packet.Compressed;
075import org.jivesoftware.smack.compression.XMPPInputOutputStream;
076import org.jivesoftware.smack.datatypes.UInt16;
077import org.jivesoftware.smack.filter.StanzaFilter;
078import org.jivesoftware.smack.internal.SmackTlsContext;
079import org.jivesoftware.smack.packet.Element;
080import org.jivesoftware.smack.packet.IQ;
081import org.jivesoftware.smack.packet.Message;
082import org.jivesoftware.smack.packet.Nonza;
083import org.jivesoftware.smack.packet.Presence;
084import org.jivesoftware.smack.packet.Stanza;
085import org.jivesoftware.smack.packet.StartTls;
086import org.jivesoftware.smack.packet.StreamError;
087import org.jivesoftware.smack.proxy.ProxyInfo;
088import org.jivesoftware.smack.sasl.packet.SaslNonza;
089import org.jivesoftware.smack.sm.SMUtils;
090import org.jivesoftware.smack.sm.StreamManagementException;
091import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
092import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
093import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
094import org.jivesoftware.smack.sm.packet.StreamManagement;
095import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
096import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
097import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
098import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
099import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
100import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
101import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
102import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
103import org.jivesoftware.smack.sm.predicates.Predicate;
104import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
105import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
106import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
107import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
108import org.jivesoftware.smack.util.Async;
109import org.jivesoftware.smack.util.CloseableUtil;
110import org.jivesoftware.smack.util.PacketParserUtils;
111import org.jivesoftware.smack.util.StringUtils;
112import org.jivesoftware.smack.util.TLSUtils;
113import org.jivesoftware.smack.util.XmlStringBuilder;
114import org.jivesoftware.smack.util.rce.RemoteConnectionException;
115import org.jivesoftware.smack.xml.SmackXmlParser;
116import org.jivesoftware.smack.xml.XmlPullParser;
117import org.jivesoftware.smack.xml.XmlPullParserException;
118
119import org.jxmpp.jid.impl.JidCreate;
120import org.jxmpp.jid.parts.Resourcepart;
121import org.jxmpp.stringprep.XmppStringprepException;
122import org.minidns.dnsname.DnsName;
123
124/**
125 * Creates a socket connection to an XMPP server. This is the default connection
126 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
127 *
128 * @see XMPPConnection
129 * @author Matt Tucker
130 */
131public class XMPPTCPConnection extends AbstractXMPPConnection {
132
133    private static final int QUEUE_SIZE = 500;
134    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
135
136    /**
137     * The socket which is used for this connection.
138     */
139    private Socket socket;
140
141    /**
142     *
143     */
144    private boolean disconnectedButResumeable = false;
145
146    private SSLSocket secureSocket;
147
148    /**
149     * Protected access level because of unit test purposes
150     */
151    protected final PacketWriter packetWriter = new PacketWriter();
152
153    /**
154     * Protected access level because of unit test purposes
155     */
156    protected final PacketReader packetReader = new PacketReader();
157
158    /**
159     *
160     */
161    private boolean streamFeaturesAfterAuthenticationReceived;
162
163    /**
164     *
165     */
166    private boolean compressSyncPoint;
167
168    /**
169     * The default bundle and defer callback, used for new connections.
170     * @see bundleAndDeferCallback
171     */
172    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
173
174    /**
175     * The used bundle and defer callback.
176     * <p>
177     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
178     * having a 'volatile' read within the writer threads loop.
179     * </p>
180     */
181    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
182
183    private static boolean useSmDefault = true;
184
185    private static boolean useSmResumptionDefault = true;
186
187    /**
188     * The stream ID of the stream that is currently resumable, ie. the stream we hold the state
189     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
190     * {@link #unFailedNonzaExceptionacknowledgedStanzas}.
191     */
192    private String smSessionId;
193
194    /**
195     * Represents the state of stream management resumption.
196     * <p>
197     * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread.
198     * </p>
199     */
200    private volatile SyncPointState smResumedSyncPoint;
201    private Failed smResumptionFailed;
202
203    /**
204     * Represents the state of stream magement.
205     * <p>
206     * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}.
207     * </p>
208     */
209    private volatile boolean smEnabledSyncPoint;
210
211    /**
212     * The client's preferred maximum resumption time in seconds.
213     */
214    private int smClientMaxResumptionTime = -1;
215
216    /**
217     * The server's preferred maximum resumption time in seconds.
218     */
219    private int smServerMaxResumptionTime = -1;
220
221    /**
222     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
223     */
224    private boolean useSm = useSmDefault;
225    private boolean useSmResumption = useSmResumptionDefault;
226
227    /**
228     * The counter that the server sends the client about it's current height. For example, if the server sends
229     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
230     */
231    private long serverHandledStanzasCount = 0;
232
233    /**
234     * The counter for stanzas handled ("received") by the client.
235     * <p>
236     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
237     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
238     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
239     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
240     * </p>
241     */
242    private long clientHandledStanzasCount = 0;
243
244    private BlockingQueue<Stanza> unacknowledgedStanzas;
245
246    /**
247     * Set to true if Stream Management was at least once enabled for this connection.
248     */
249    private boolean smWasEnabledAtLeastOnce = false;
250
251    /**
252     * This listeners are invoked for every stanza that got acknowledged.
253     * <p>
254     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
255     * themselves after they have been invoked.
256     * </p>
257     */
258    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();
259
260    /**
261     * These listeners are invoked for every stanza that got dropped.
262     * <p>
263     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
264     * themselves after they have been invoked.
265     * </p>
266     */
267    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();
268
269    /**
270     * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will
271     * only be invoked once and automatically removed after that.
272     */
273    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();
274
275    /**
276     * Predicates that determine if an stream management ack should be requested from the server.
277     * <p>
278     * We use a linked hash set here, so that the order how the predicates are added matches the
279     * order in which they are invoked in order to determine if an ack request should be send or not.
280     * </p>
281     */
282    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();
283
284    @SuppressWarnings("HidingField")
285    private final XMPPTCPConnectionConfiguration config;
286
287    /**
288     * Creates a new XMPP connection over TCP (optionally using proxies).
289     * <p>
290     * Note that XMPPTCPConnection constructors do not establish a connection to the server
291     * and you must call {@link #connect()}.
292     * </p>
293     *
294     * @param config the connection configuration.
295     */
296    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
297        super(config);
298        this.config = config;
299        addConnectionListener(new ConnectionListener() {
300            @Override
301            public void connectionClosedOnError(Exception e) {
302                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
303                    dropSmState();
304                }
305            }
306        });
307
308        // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream
309        // is initiated.
310        buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install();
311    }
312
313    /**
314     * Creates a new XMPP connection over TCP.
315     * <p>
316     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
317     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
318     * constructor.
319     * </p>
320     *
321     * @param jid the bare JID used by the client.
322     * @param password the password or authentication token.
323     * @throws XmppStringprepException if the provided string is invalid.
324     */
325    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
326        this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build());
327    }
328
329    /**
330     * Creates a new XMPP connection over TCP.
331     * <p>
332     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
333     * you can get fine-grained control over connection settings using the
334     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
335     * </p>
336     * @param username TODO javadoc me please
337     * @param password TODO javadoc me please
338     * @param serviceName TODO javadoc me please
339     * @throws XmppStringprepException if the provided string is invalid.
340     */
341    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
342        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
343                                        JidCreate.domainBareFrom(serviceName)).build());
344    }
345
346    @Override
347    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
348        if (packetWriter == null) {
349            throw new NotConnectedException();
350        }
351        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
352    }
353
354    @Override
355    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
356        if (isConnected() && !disconnectedButResumeable) {
357            throw new AlreadyConnectedException();
358        }
359    }
360
361    @Override
362    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
363        if (isAuthenticated() && !disconnectedButResumeable) {
364            throw new AlreadyLoggedInException();
365        }
366    }
367
368    @Override
369    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
370        // Reset the flag in case it was set
371        disconnectedButResumeable = false;
372        super.afterSuccessfulLogin(resumed);
373    }
374
375    @Override
376    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
377                    SmackException, IOException, InterruptedException {
378        // Authenticate using SASL
379        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
380
381        streamFeaturesAfterAuthenticationReceived = false;
382        authenticate(username, password, config.getAuthzid(), sslSession);
383
384        // Wait for stream features after the authentication.
385        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
386        // renamed to "streamFeaturesAfterAuthenticationReceived".
387        waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
388
389        // If compression is enabled then request the server to use stream compression. XEP-170
390        // recommends to perform stream compression before resource binding.
391        maybeEnableCompression();
392
393        smResumedSyncPoint = SyncPointState.initial;
394        smResumptionFailed = null;
395        if (isSmResumptionPossible()) {
396            smResumedSyncPoint = SyncPointState.request_sent;
397            sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
398            waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
399            if (smResumedSyncPoint == SyncPointState.successful) {
400                // We successfully resumed the stream, be done here
401                afterSuccessfulLogin(true);
402                return;
403            }
404            // SM resumption failed, what Smack does here is to report success of
405            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
406            // normal resource binding can be tried.
407            assert smResumptionFailed != null;
408            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
409        }
410
411        // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
412        // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the
413        // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
414        smEnabledSyncPoint = false;
415
416        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
417        if (unacknowledgedStanzas != null) {
418            // There was a previous connection with SM enabled but that was either not resumable or
419            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
420            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
421            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
422            // XMPP session (There maybe was an enabled in a previous XMPP session of this
423            // connection instance though). This is used in writePackets to decide if stanzas should
424            // be added to the unacknowledged stanzas queue, because they have to be added right
425            // after the 'enable' stream element has been sent.
426            dropSmState();
427        }
428
429        // Now bind the resource. It is important to do this *after* we dropped an eventually
430        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
431        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
432        bindResourceAndEstablishSession(resource);
433
434        if (isSmAvailable() && useSm) {
435            // Remove what is maybe left from previously stream managed sessions
436            serverHandledStanzasCount = 0;
437            sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
438            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
439            // then this is a non recoverable error and we therefore throw an exception.
440            waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
441            synchronized (requestAckPredicates) {
442                if (requestAckPredicates.isEmpty()) {
443                    // Assure that we have at lest one predicate set up that so that we request acks
444                    // for the server and eventually flush some stanzas from the unacknowledged
445                    // stanza queue
446                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
447                }
448            }
449        }
450        // Inform client about failed resumption if possible, resend stanzas otherwise
451        // Process the stanzas synchronously so a client can re-queue them for transmission
452        // before it is informed about connection success
453        if (!stanzaDroppedListeners.isEmpty()) {
454            for (Stanza stanza : previouslyUnackedStanzas) {
455                for (StanzaListener listener : stanzaDroppedListeners) {
456                    try {
457                        listener.processStanza(stanza);
458                    }
459                    catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
460                        LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
461                    }
462                }
463            }
464        } else {
465            for (Stanza stanza : previouslyUnackedStanzas) {
466                sendStanzaInternal(stanza);
467            }
468        }
469
470        afterSuccessfulLogin(false);
471    }
472
473    @Override
474    public boolean isSecureConnection() {
475        return secureSocket != null;
476    }
477
478    /**
479     * Shuts the current connection down. After this method returns, the connection must be ready
480     * for re-use by connect.
481     */
482    @Override
483    protected void shutdown() {
484        if (isSmEnabled()) {
485            try {
486                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
487                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
488                sendSmAcknowledgementInternal();
489            } catch (InterruptedException | NotConnectedException e) {
490                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
491            }
492        }
493        shutdown(false);
494    }
495
496    @Override
497    public synchronized void instantShutdown() {
498        shutdown(true);
499    }
500
501    private void shutdown(boolean instant) {
502        // The writer thread may already been finished at this point, for example when the connection is in the
503        // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
504        // case.
505        if (!packetWriter.done()) {
506            // First shutdown the writer, this will result in a closing stream element getting send to
507            // the server
508            LOGGER.finer(packetWriter.threadName + " shutdown()");
509            packetWriter.shutdown(instant);
510            LOGGER.finer(packetWriter.threadName + " shutdown() returned");
511
512            if (!instant) {
513                waitForClosingStreamTagFromServer();
514            }
515        }
516
517        LOGGER.finer(packetReader.threadName + " shutdown()");
518        packetReader.shutdown();
519        LOGGER.finer(packetReader.threadName + " shutdown() returned");
520
521        CloseableUtil.maybeClose(socket, LOGGER);
522
523        setWasAuthenticated();
524
525        try {
526            boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running);
527            if (!readerAndWriterThreadsTermianted) {
528                LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: "
529                                + packetWriter.running + ", Reader running: " + packetReader.running);
530            } else {
531                LOGGER.fine("Reader and writer threads terminated");
532            }
533        } catch (InterruptedException e) {
534            LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
535        }
536
537        if (disconnectedButResumeable) {
538            return;
539        }
540
541        // If we are able to resume the stream, then don't set
542        // connected/authenticated/usingTLS to false since we like behave like we are still
543        // connected (e.g. sendStanza should not throw a NotConnectedException).
544        if (instant) {
545            disconnectedButResumeable = isSmResumptionPossible();
546            if (!disconnectedButResumeable) {
547                // Reset the stream management session id to null, since the stream is no longer resumable. Note that we
548                // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected.
549                smSessionId = null;
550            }
551        } else {
552            disconnectedButResumeable = false;
553
554            // Drop the stream management state if this is not an instant shutdown. We send
555            // a </stream> close tag and now the stream management state is no longer valid.
556            // This also prevents that we will potentially (re-)send any unavailable presence we
557            // may have send, because it got put into the unacknowledged queue and was not acknowledged before the
558            // connection terminated.
559            dropSmState();
560            // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
561            // information is available in the connectionClosedOnError() listeners.
562        }
563        authenticated = false;
564        connected = false;
565        secureSocket = null;
566        reader = null;
567        writer = null;
568
569        initState();
570    }
571
572    @Override
573    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
574        packetWriter.sendStreamElement(element);
575    }
576
577    @Override
578    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
579        packetWriter.sendStreamElement(packet);
580        if (isSmEnabled()) {
581            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
582                if (requestAckPredicate.accept(packet)) {
583                    requestSmAcknowledgementInternal();
584                    break;
585                }
586            }
587        }
588    }
589
590    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
591        RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config);
592
593        List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>();
594
595        SocketFactory socketFactory = config.getSocketFactory();
596        ProxyInfo proxyInfo = config.getProxyInfo();
597        int timeout = config.getConnectTimeout();
598        if (socketFactory == null) {
599            socketFactory = SocketFactory.getDefault();
600        }
601        for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
602            Iterator<? extends InetAddress> inetAddresses;
603            String host = endpoint.getHost().toString();
604            UInt16 portUint16 = endpoint.getPort();
605            int port = portUint16.intValue();
606            if (proxyInfo == null) {
607                inetAddresses = endpoint.getInetAddresses().iterator();
608                assert inetAddresses.hasNext();
609
610                innerloop: while (inetAddresses.hasNext()) {
611                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
612                    // re-usable after a failed connection attempt. See also SMACK-724.
613                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
614
615                    final InetAddress inetAddress = inetAddresses.next();
616                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
617                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
618                    socketFuture.connectAsync(inetSocketAddress, timeout);
619
620                    try {
621                        socket = socketFuture.getOrThrow();
622                    } catch (IOException e) {
623                        RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
624                                        endpoint, inetAddress, e);
625                        connectionExceptions.add(rce);
626                        if (inetAddresses.hasNext()) {
627                            continue innerloop;
628                        } else {
629                            break innerloop;
630                        }
631                    }
632                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
633                    // We found a host to connect to, return here
634                    this.host = host;
635                    this.port = portUint16;
636                    return;
637                }
638            } else {
639                // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy
640                // connection to every inet address of each connection endpoint.
641                socket = socketFactory.createSocket();
642                StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy");
643                final String hostAndPort = host + " at port " + port;
644                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
645                try {
646                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
647                } catch (IOException e) {
648                    CloseableUtil.maybeClose(socket, LOGGER);
649                    RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e);
650                    connectionExceptions.add(rce);
651                    continue;
652                }
653                LOGGER.finer("Established TCP connection to " + hostAndPort);
654                // We found a host to connect to, return here
655                this.host = host;
656                this.port = portUint16;
657                return;
658            }
659        }
660
661        // There are no more host addresses to try
662        // throw an exception and report all tried
663        // HostAddresses in the exception
664        throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
665    }
666
667    /**
668     * Initializes the connection by creating a stanza reader and writer and opening a
669     * XMPP stream to the server.
670     *
671     * @throws XMPPException if establishing a connection to the server fails.
672     * @throws SmackException if the server fails to respond back or if there is anther error.
673     * @throws IOException if an I/O error occurred.
674     * @throws InterruptedException if the calling thread was interrupted.
675     */
676    private void initConnection() throws IOException, InterruptedException {
677        compressionHandler = null;
678
679        // Set the reader and writer instance variables
680        initReaderAndWriter();
681
682        // Start the writer thread. This will open an XMPP stream to the server
683        packetWriter.init();
684        // Start the reader thread. The startup() method will block until we
685        // get an opening stream packet back from server
686        packetReader.init();
687    }
688
689    private void initReaderAndWriter() throws IOException {
690        InputStream is = socket.getInputStream();
691        OutputStream os = socket.getOutputStream();
692        if (compressionHandler != null) {
693            is = compressionHandler.getInputStream(is);
694            os = compressionHandler.getOutputStream(os);
695        }
696        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
697        writer = new OutputStreamWriter(os, "UTF-8");
698        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
699
700        // If debugging is enabled, we open a window and write out all network traffic.
701        initDebugger();
702    }
703
704    /**
705     * The server has indicated that TLS negotiation can start. We now need to secure the
706     * existing plain connection and perform a handshake. This method won't return until the
707     * connection has finished the handshake or an error occurred while securing the connection.
708     * @throws IOException if an I/O error occurred.
709     * @throws SecurityNotPossibleException if TLS is not possible.
710     * @throws CertificateException if there is an issue with the certificate.
711     */
712    @SuppressWarnings("LiteralClassName")
713    private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
714        SmackTlsContext smackTlsContext = getSmackTlsContext();
715
716        Socket plain = socket;
717        int port = plain.getPort();
718        String xmppServiceDomainString = config.getXMPPServiceDomain().toString();
719        SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory();
720        // Secure the plain connection
721        socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true);
722
723        final SSLSocket sslSocket = (SSLSocket) socket;
724        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
725        // important (at least on certain platforms) and it seems to be a good idea anyways to
726        // prevent an accidental implicit handshake.
727        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
728
729        // Initialize the reader and writer with the new secured version
730        initReaderAndWriter();
731
732        // Proceed to do the handshake
733        sslSocket.startHandshake();
734
735        if (smackTlsContext.daneVerifier != null) {
736            smackTlsContext.daneVerifier.finish(sslSocket.getSession());
737        }
738
739        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
740        if (verifier == null) {
741                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
742        }
743
744        final String verifierHostname;
745        {
746            DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
747            // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
748            // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
749            // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
750            if (xmppServiceDomainDnsName != null) {
751                verifierHostname = xmppServiceDomainDnsName.ace;
752            }
753            else {
754                LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
755                                + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
756                verifierHostname = getXMPPServiceDomain().toString();
757            }
758        }
759
760        final boolean verificationSuccessful;
761        // Verify the TLS session.
762        verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
763        if (!verificationSuccessful) {
764            throw new CertificateException(
765                            "Hostname verification of certificate failed. Certificate does not authenticate "
766                                            + getXMPPServiceDomain());
767        }
768
769        // Set that TLS was successful
770        secureSocket = sslSocket;
771    }
772
773    /**
774     * Returns the compression handler that can be used for one compression methods offered by the server.
775     *
776     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
777     *
778     */
779    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
780        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
781                String method = handler.getCompressionMethod();
782                if (compression.getMethods().contains(method))
783                    return handler;
784        }
785        return null;
786    }
787
788    @Override
789    public boolean isUsingCompression() {
790        return compressionHandler != null && compressSyncPoint;
791    }
792
793    /**
794     * <p>
795     * Starts using stream compression that will compress network traffic. Traffic can be
796     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
797     * connection. However, the server and the client will need to use more CPU time in order to
798     * un/compress network data so under high load the server performance might be affected.
799     * </p>
800     * <p>
801     * Stream compression has to have been previously offered by the server. Currently only the
802     * zlib method is supported by the client. Stream compression negotiation has to be done
803     * before authentication took place.
804     * </p>
805     *
806     * @throws NotConnectedException if the XMPP connection is not connected.
807     * @throws SmackException if Smack detected an exceptional situation.
808     * @throws InterruptedException if the calling thread was interrupted.
809     * @throws XMPPException if an XMPP protocol error was received.
810     */
811    private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
812        if (!config.isCompressionEnabled()) {
813            return;
814        }
815
816        Compress.Feature compression = getFeature(Compress.Feature.class);
817        if (compression == null) {
818            // Server does not support compression
819            return;
820        }
821        // If stream compression was offered by the server and we want to use
822        // compression then send compression request to the server
823        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
824            compressSyncPoint = false;
825            sendNonza(new Compress(compressionHandler.getCompressionMethod()));
826            waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
827        } else {
828            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
829        }
830    }
831
832    /**
833     * Establishes a connection to the XMPP server. It basically
834     * creates and maintains a socket connection to the server.
835     * <p>
836     * Listeners will be preserved from a previous connection if the reconnection
837     * occurs after an abrupt termination.
838     * </p>
839     *
840     * @throws XMPPException if an error occurs while trying to establish the connection.
841     * @throws SmackException if Smack detected an exceptional situation.
842     * @throws IOException if an I/O error occurred.
843     * @throws InterruptedException if the calling thread was interrupted.
844     */
845    @Override
846    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
847        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
848        // there is an error establishing the connection
849        connectUsingConfiguration();
850
851        connected = true;
852
853        // We connected successfully to the servers TCP port
854        initConnection();
855
856        // TLS handled will be true either if TLS was established, or if it was not mandatory.
857        waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
858
859        // Wait with SASL auth until the SASL mechanisms have been received
860        waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
861    }
862
863    /**
864     * For unit testing purposes
865     *
866     * @param writer TODO javadoc me please
867     */
868    protected void setWriter(Writer writer) {
869        this.writer = writer;
870    }
871
872    @Override
873    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
874        StartTls startTlsFeature = getFeature(StartTls.class);
875        if (startTlsFeature != null) {
876            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
877                SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
878                currentSmackException = smackException;
879                notifyWaitingThreads();
880                throw smackException;
881            }
882
883            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
884                sendNonza(new StartTls());
885            } else {
886                tlsHandled = true;
887                notifyWaitingThreads();
888            }
889        } else {
890            tlsHandled = true;
891            notifyWaitingThreads();
892        }
893
894        if (isSaslAuthenticated()) {
895            // If we have received features after the SASL has been successfully completed, then we
896            // have also *maybe* received, as it is an optional feature, the compression feature
897            // from the server.
898            streamFeaturesAfterAuthenticationReceived = true;
899            notifyWaitingThreads();
900        }
901    }
902
903    private void resetParser() throws IOException {
904        try {
905            packetReader.parser = SmackXmlParser.newXmlParser(reader);
906        } catch (XmlPullParserException e) {
907            throw new IOException(e);
908        }
909   }
910
911    private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
912        sendStreamOpen();
913        resetParser();
914    }
915
916    protected class PacketReader {
917
918        private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';
919
920        XmlPullParser parser;
921
922        private volatile boolean done;
923
924        private boolean running;
925
926        /**
927         * Initializes the reader in order to be used. The reader is initialized during the
928         * first connection and when reconnecting due to an abruptly disconnection.
929         */
930        void init() {
931            done = false;
932
933            running = true;
934            Async.go(new Runnable() {
935                @Override
936                public void run() {
937                    LOGGER.finer(threadName + " start");
938                    try {
939                        parsePackets();
940                    } finally {
941                        LOGGER.finer(threadName + " exit");
942                        running = false;
943                        notifyWaitingThreads();
944                    }
945                }
946            }, threadName);
947         }
948
949        /**
950         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
951         */
952        void shutdown() {
953            done = true;
954        }
955
956        /**
957         * Parse top-level packets in order to process them further.
958         */
959        private void parsePackets() {
960            try {
961                openStreamAndResetParser();
962                XmlPullParser.Event eventType = parser.getEventType();
963                while (!done) {
964                    switch (eventType) {
965                    case START_ELEMENT:
966                        final String name = parser.getName();
967                        switch (name) {
968                        case Message.ELEMENT:
969                        case IQ.IQ_ELEMENT:
970                        case Presence.ELEMENT:
971                            try {
972                                parseAndProcessStanza(parser);
973                            } finally {
974                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
975                            }
976                            break;
977                        case "stream":
978                            onStreamOpen(parser);
979                            break;
980                        case "error":
981                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
982                            // Stream errors are non recoverable, throw this exceptions. Also note that this will set
983                            // this exception as current connection exceptions and notify any waiting threads.
984                            throw new StreamErrorException(streamError);
985                        case "features":
986                            parseFeaturesAndNotify(parser);
987                            break;
988                        case "proceed":
989                            // Secure the connection by negotiating TLS
990                            proceedTLSReceived();
991                            // Send a new opening stream to the server
992                            openStreamAndResetParser();
993                            break;
994                        case "failure":
995                            String namespace = parser.getNamespace(null);
996                            switch (namespace) {
997                            case "urn:ietf:params:xml:ns:xmpp-tls":
998                                // TLS negotiation has failed. The server will close the connection
999                                // TODO Parse failure stanza
1000                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
1001                            case "http://jabber.org/protocol/compress":
1002                                // Stream compression has been denied. This is a recoverable
1003                                // situation. It is still possible to authenticate and
1004                                // use the connection but using an uncompressed connection
1005                                // TODO Parse failure stanza
1006                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
1007                                notifyWaitingThreads();
1008                                break;
1009                            default:
1010                                parseAndProcessNonza(parser);
1011                            }
1012                            break;
1013                        case Compressed.ELEMENT:
1014                            // Server confirmed that it's possible to use stream compression. Start
1015                            // stream compression
1016                            // Initialize the reader and writer with the new compressed version
1017                            initReaderAndWriter();
1018                            // Send a new opening stream to the server
1019                            openStreamAndResetParser();
1020                            // Notify that compression is being used
1021                            compressSyncPoint = true;
1022                            notifyWaitingThreads();
1023                            break;
1024                        case Enabled.ELEMENT:
1025                            Enabled enabled = ParseStreamManagement.enabled(parser);
1026                            if (enabled.isResumeSet()) {
1027                                smSessionId = enabled.getId();
1028                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1029                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
1030                                    setCurrentConnectionExceptionAndNotify(xmppException);
1031                                    throw xmppException;
1032                                }
1033                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
1034                            } else {
1035                                // Mark this a non-resumable stream by setting smSessionId to null
1036                                smSessionId = null;
1037                            }
1038                            clientHandledStanzasCount = 0;
1039                            smWasEnabledAtLeastOnce = true;
1040                            smEnabledSyncPoint = true;
1041                            notifyWaitingThreads();
1042                            break;
1043                        case Failed.ELEMENT:
1044                            Failed failed = ParseStreamManagement.failed(parser);
1045                            if (smResumedSyncPoint == SyncPointState.request_sent) {
1046                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
1047                                // so is non-fatal as we can simply continue with resource binding in this case.
1048                                smResumptionFailed = failed;
1049                                notifyWaitingThreads();
1050                            } else {
1051                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
1052                                setCurrentConnectionExceptionAndNotify(xmppException);
1053                            }
1054                            break;
1055                        case Resumed.ELEMENT:
1056                            Resumed resumed = ParseStreamManagement.resumed(parser);
1057                            if (!smSessionId.equals(resumed.getPrevId())) {
1058                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1059                            }
1060                            // Mark SM as enabled
1061                            smEnabledSyncPoint = true;
1062                            // First, drop the stanzas already handled by the server
1063                            processHandledCount(resumed.getHandledCount());
1064                            // Then re-send what is left in the unacknowledged queue
1065                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
1066                            unacknowledgedStanzas.drainTo(stanzasToResend);
1067                            for (Stanza stanza : stanzasToResend) {
1068                                sendStanzaInternal(stanza);
1069                            }
1070                            // If there where stanzas resent, then request a SM ack for them.
1071                            // Writer's sendStreamElement() won't do it automatically based on
1072                            // predicates.
1073                            if (!stanzasToResend.isEmpty()) {
1074                                requestSmAcknowledgementInternal();
1075                            }
1076                            // Mark SM resumption as successful
1077                            smResumedSyncPoint = SyncPointState.successful;
1078                            notifyWaitingThreads();
1079                            break;
1080                        case AckAnswer.ELEMENT:
1081                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1082                            processHandledCount(ackAnswer.getHandledCount());
1083                            break;
1084                        case AckRequest.ELEMENT:
1085                            ParseStreamManagement.ackRequest(parser);
1086                            if (smEnabledSyncPoint) {
1087                                sendSmAcknowledgementInternal();
1088                            } else {
1089                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1090                            }
1091                            break;
1092                         default:
1093                             parseAndProcessNonza(parser);
1094                             break;
1095                        }
1096                        break;
1097                    case END_ELEMENT:
1098                        final String endTagName = parser.getName();
1099                        if ("stream".equals(endTagName)) {
1100                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
1101                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
1102                                break;
1103                            }
1104
1105                            // Check if the queue was already shut down before reporting success on closing stream tag
1106                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
1107                            // did re-start the queue again, causing this writer to assume that the queue is not
1108                            // shutdown, which results in a call to disconnect().
1109                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
1110                            closingStreamReceived = true;
1111                            notifyWaitingThreads();
1112
1113                            if (queueWasShutdown) {
1114                                // We received a closing stream element *after* we initiated the
1115                                // termination of the session by sending a closing stream element to
1116                                // the server first
1117                                return;
1118                            } else {
1119                                // We received a closing stream element from the server without us
1120                                // sending a closing stream element first. This means that the
1121                                // server wants to terminate the session, therefore disconnect
1122                                // the connection
1123                                LOGGER.info(XMPPTCPConnection.this
1124                                                + " received closing </stream> element."
1125                                                + " Server wants to terminate the connection, calling disconnect()");
1126                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
1127                                    @Override
1128                                    public void run() {
1129                                        disconnect();
1130                                    }});
1131                            }
1132                        }
1133                        break;
1134                    case END_DOCUMENT:
1135                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1136                        // closing stream element before.
1137                        throw new SmackException.SmackMessageException(
1138                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1139                    default:
1140                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
1141                        break;
1142                    }
1143                    eventType = parser.next();
1144                }
1145            }
1146            catch (Exception e) {
1147                // Set running to false since this thread will exit here and notifyConnectionError() will wait until
1148                // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
1149                // notifyConnetctionError() below, even though run() also sets it to false. Therefore, do not remove this.
1150                running = false;
1151
1152                String ignoreReasonThread = null;
1153
1154                boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
1155                if (writerThreadWasShutDown) {
1156                    ignoreReasonThread = "writer";
1157                } else if (done) {
1158                    ignoreReasonThread = "reader";
1159                }
1160
1161                if (ignoreReasonThread != null) {
1162                    LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
1163                    return;
1164                }
1165
1166                // Close the connection and notify connection listeners of the error.
1167                notifyConnectionError(e);
1168            }
1169        }
1170    }
1171
1172    protected class PacketWriter {
1173        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1174        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
1175        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
1176
1177        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';
1178
1179        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1180                        QUEUE_SIZE, true);
1181
1182        /**
1183         * If set, the stanza writer is shut down
1184         */
1185        protected volatile Long shutdownTimestamp = null;
1186
1187        private volatile boolean instantShutdown;
1188
1189        /**
1190         * True if some preconditions are given to start the bundle and defer mechanism.
1191         * <p>
1192         * This will likely get set to true right after the start of the writer thread, because
1193         * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
1194         * this field to true.
1195         * </p>
1196         */
1197        private boolean shouldBundleAndDefer;
1198
1199        private boolean running;
1200
1201        /**
1202        * Initializes the writer in order to be used. It is called at the first connection and also
1203        * is invoked if the connection is disconnected by an error.
1204        */
1205        void init() {
1206            shutdownTimestamp = null;
1207
1208            if (unacknowledgedStanzas != null) {
1209                // It's possible that there are new stanzas in the writer queue that
1210                // came in while we were disconnected but resumable, drain those into
1211                // the unacknowledged queue so that they get resent now
1212                drainWriterQueueToUnacknowledgedStanzas();
1213            }
1214
1215            queue.start();
1216            running = true;
1217            Async.go(new Runnable() {
1218                @Override
1219                public void run() {
1220                    LOGGER.finer(threadName + " start");
1221                    try {
1222                        writePackets();
1223                    } finally {
1224                        LOGGER.finer(threadName + " exit");
1225                        running = false;
1226                        notifyWaitingThreads();
1227                    }
1228                }
1229            }, threadName);
1230        }
1231
1232        private boolean done() {
1233            return shutdownTimestamp != null;
1234        }
1235
1236        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1237            final boolean done = done();
1238            if (done) {
1239                final boolean smResumptionPossible = isSmResumptionPossible();
1240                // Don't throw a NotConnectedException is there is an resumable stream available
1241                if (!smResumptionPossible) {
1242                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1243                                    + " smResumptionPossible=" + smResumptionPossible);
1244                }
1245            }
1246        }
1247
1248        /**
1249         * Sends the specified element to the server.
1250         *
1251         * @param element the element to send.
1252         * @throws NotConnectedException if the XMPP connection is not connected.
1253         * @throws InterruptedException if the calling thread was interrupted.
1254         */
1255        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1256            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1257            try {
1258                queue.put(element);
1259            }
1260            catch (InterruptedException e) {
1261                // put() may throw an InterruptedException for two reasons:
1262                // 1. If the queue was shut down
1263                // 2. If the thread was interrupted
1264                // so we have to check which is the case
1265                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1266                // If the method above did not throw, then the sending thread was interrupted
1267                throw e;
1268            }
1269        }
1270
1271        /**
1272         * Shuts down the stanza writer. Once this method has been called, no further
1273         * packets will be written to the server.
1274         */
1275        void shutdown(boolean instant) {
1276            instantShutdown = instant;
1277            queue.shutdown();
1278            shutdownTimestamp = System.currentTimeMillis();
1279        }
1280
1281        /**
1282         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1283         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1284         * that case.
1285         *
1286         * @return the next element for writing or null.
1287         */
1288        private Element nextStreamElement() {
1289            // It is important the we check if the queue is empty before removing an element from it
1290            if (queue.isEmpty()) {
1291                shouldBundleAndDefer = true;
1292            }
1293            Element packet = null;
1294            try {
1295                packet = queue.take();
1296            }
1297            catch (InterruptedException e) {
1298                if (!queue.isShutdown()) {
1299                    // Users shouldn't try to interrupt the packet writer thread
1300                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1301                }
1302            }
1303            return packet;
1304        }
1305
1306        private void writePackets() {
1307            try {
1308                // Write out packets from the queue.
1309                while (!done()) {
1310                    Element element = nextStreamElement();
1311                    if (element == null) {
1312                        continue;
1313                    }
1314
1315                    // Get a local version of the bundle and defer callback, in case it's unset
1316                    // between the null check and the method invocation
1317                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
1318                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
1319                    // empty), then we could wait a bit for further stanzas attempting to decrease
1320                    // our energy consumption
1321                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
1322                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
1323                        // queue is empty again.
1324                        shouldBundleAndDefer = false;
1325                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
1326                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
1327                                        bundlingAndDeferringStopped));
1328                        if (bundleAndDeferMillis > 0) {
1329                            long remainingWait = bundleAndDeferMillis;
1330                            final long waitStart = System.currentTimeMillis();
1331                            synchronized (bundlingAndDeferringStopped) {
1332                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
1333                                    bundlingAndDeferringStopped.wait(remainingWait);
1334                                    remainingWait = bundleAndDeferMillis
1335                                                    - (System.currentTimeMillis() - waitStart);
1336                                }
1337                            }
1338                        }
1339                    }
1340
1341                    Stanza packet = null;
1342                    if (element instanceof Stanza) {
1343                        packet = (Stanza) element;
1344                    }
1345                    else if (element instanceof Enable) {
1346                        // The client needs to add messages to the unacknowledged stanzas queue
1347                        // right after it sent 'enabled'. Stanza will be added once
1348                        // unacknowledgedStanzas is not null.
1349                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
1350                    }
1351                    maybeAddToUnacknowledgedStanzas(packet);
1352
1353                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
1354                    if (elementXml instanceof XmlStringBuilder) {
1355                        try {
1356                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
1357                        } catch (NullPointerException npe) {
1358                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
1359                            throw npe;
1360                        }
1361                    }
1362                    else {
1363                        writer.write(elementXml.toString());
1364                    }
1365
1366                    if (queue.isEmpty()) {
1367                        writer.flush();
1368                    }
1369                    if (packet != null) {
1370                        firePacketSendingListeners(packet);
1371                    }
1372                }
1373                if (!instantShutdown) {
1374                    // Flush out the rest of the queue.
1375                    try {
1376                        while (!queue.isEmpty()) {
1377                            Element packet = queue.remove();
1378                            if (packet instanceof Stanza) {
1379                                Stanza stanza = (Stanza) packet;
1380                                maybeAddToUnacknowledgedStanzas(stanza);
1381                            }
1382                            writer.write(packet.toXML().toString());
1383                        }
1384                    }
1385                    catch (Exception e) {
1386                        LOGGER.log(Level.WARNING,
1387                                        "Exception flushing queue during shutdown, ignore and continue",
1388                                        e);
1389                    }
1390
1391                    // Close the stream.
1392                    try {
1393                        writer.write("</stream:stream>");
1394                        writer.flush();
1395                    }
1396                    catch (Exception e) {
1397                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
1398                    }
1399
1400                    // Delete the queue contents (hopefully nothing is left).
1401                    queue.clear();
1402                } else if (instantShutdown && isSmEnabled()) {
1403                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
1404                    // into the unacknowledgedStanzas queue
1405                    drainWriterQueueToUnacknowledgedStanzas();
1406                }
1407                // Do *not* close the writer here, as it will cause the socket
1408                // to get closed. But we may want to receive further stanzas
1409                // until the closing stream tag is received. The socket will be
1410                // closed in shutdown().
1411            }
1412            catch (Exception e) {
1413                // The exception can be ignored if the the connection is 'done'
1414                // or if the it was caused because the socket got closed
1415                if (!(done() || queue.isShutdown())) {
1416                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
1417                    // the reader and writer thread's 'running' value is false.
1418                    running = false;
1419                    notifyConnectionError(e);
1420                } else {
1421                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
1422                }
1423            }
1424        }
1425
1426        private void drainWriterQueueToUnacknowledgedStanzas() {
1427            List<Element> elements = new ArrayList<>(queue.size());
1428            queue.drainTo(elements);
1429            for (int i = 0; i < elements.size(); i++) {
1430                Element element = elements.get(i);
1431                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1432                if (unacknowledgedStanzas.remainingCapacity() == 0) {
1433                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
1434                            .newWith(i, elements, unacknowledgedStanzas);
1435                    LOGGER.log(Level.WARNING,
1436                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1437                    return;
1438                }
1439                if (element instanceof Stanza) {
1440                    unacknowledgedStanzas.add((Stanza) element);
1441                }
1442            }
1443        }
1444
1445        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1446            // Check if the stream element should be put to the unacknowledgedStanza
1447            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1448            // packet order is not stable at this point (sendStanzaInternal() can be
1449            // called concurrently).
1450            if (unacknowledgedStanzas != null && stanza != null) {
1451                // If the unacknowledgedStanza queue reaching its high water mark, request an new ack
1452                // from the server in order to drain it
1453                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
1454                    writer.write(AckRequest.INSTANCE.toXML().toString());
1455                }
1456
1457                try {
1458                    // It is important the we put the stanza in the unacknowledged stanza
1459                    // queue before we put it on the wire
1460                    unacknowledgedStanzas.put(stanza);
1461                }
1462                catch (InterruptedException e) {
1463                    throw new IllegalStateException(e);
1464                }
1465            }
1466        }
1467    }
1468
1469    /**
1470     * Set if Stream Management should be used by default for new connections.
1471     *
1472     * @param useSmDefault true to use Stream Management for new connections.
1473     */
1474    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1475        XMPPTCPConnection.useSmDefault = useSmDefault;
1476    }
1477
1478    /**
1479     * Set if Stream Management resumption should be used by default for new connections.
1480     *
1481     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1482     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1483     */
1484    @Deprecated
1485    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1486        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1487    }
1488
1489    /**
1490     * Set if Stream Management resumption should be used by default for new connections.
1491     *
1492     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1493     */
1494    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1495        if (useSmResumptionDefault) {
1496            // Also enable SM is resumption is enabled
1497            setUseStreamManagementDefault(useSmResumptionDefault);
1498        }
1499        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1500    }
1501
1502    /**
1503     * Set if Stream Management should be used if supported by the server.
1504     *
1505     * @param useSm true to use Stream Management.
1506     */
1507    public void setUseStreamManagement(boolean useSm) {
1508        this.useSm = useSm;
1509    }
1510
1511    /**
1512     * Set if Stream Management resumption should be used if supported by the server.
1513     *
1514     * @param useSmResumption true to use Stream Management resumption.
1515     */
1516    public void setUseStreamManagementResumption(boolean useSmResumption) {
1517        if (useSmResumption) {
1518            // Also enable SM is resumption is enabled
1519            setUseStreamManagement(useSmResumption);
1520        }
1521        this.useSmResumption = useSmResumption;
1522    }
1523
1524    /**
1525     * Set the preferred resumption time in seconds.
1526     * @param resumptionTime the preferred resumption time in seconds
1527     */
1528    public void setPreferredResumptionTime(int resumptionTime) {
1529        smClientMaxResumptionTime = resumptionTime;
1530    }
1531
1532    /**
1533     * Add a predicate for Stream Management acknowledgment requests.
1534     * <p>
1535     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1536     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1537     * </p>
1538     * <p>
1539     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1540     * </p>
1541     *
1542     * @param predicate the predicate to add.
1543     * @return if the predicate was not already active.
1544     */
1545    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1546        synchronized (requestAckPredicates) {
1547            return requestAckPredicates.add(predicate);
1548        }
1549    }
1550
1551    /**
1552     * Remove the given predicate for Stream Management acknowledgment request.
1553     * @param predicate the predicate to remove.
1554     * @return true if the predicate was removed.
1555     */
1556    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1557        synchronized (requestAckPredicates) {
1558            return requestAckPredicates.remove(predicate);
1559        }
1560    }
1561
1562    /**
1563     * Remove all predicates for Stream Management acknowledgment requests.
1564     */
1565    public void removeAllRequestAckPredicates() {
1566        synchronized (requestAckPredicates) {
1567            requestAckPredicates.clear();
1568        }
1569    }
1570
1571    /**
1572     * Send an unconditional Stream Management acknowledgement request to the server.
1573     *
1574     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1575     * @throws NotConnectedException if the connection is not connected.
1576     * @throws InterruptedException if the calling thread was interrupted.
1577     */
1578    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1579        if (!isSmEnabled()) {
1580            throw new StreamManagementException.StreamManagementNotEnabledException();
1581        }
1582        requestSmAcknowledgementInternal();
1583    }
1584
1585    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1586        packetWriter.sendStreamElement(AckRequest.INSTANCE);
1587    }
1588
1589    /**
1590     * Send a unconditional Stream Management acknowledgment to the server.
1591     * <p>
1592     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1593     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1594     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1595     * </p>
1596     *
1597     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1598     * @throws NotConnectedException if the connection is not connected.
1599     * @throws InterruptedException if the calling thread was interrupted.
1600     */
1601    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1602        if (!isSmEnabled()) {
1603            throw new StreamManagementException.StreamManagementNotEnabledException();
1604        }
1605        sendSmAcknowledgementInternal();
1606    }
1607
1608    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1609        AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount);
1610        // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request
1611        // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here,
1612        // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the
1613        // stream open.
1614        packetWriter.queue.putIfNotShutdown(ackAnswer);
1615    }
1616
1617    /**
1618     * Add a Stanza acknowledged listener.
1619     * <p>
1620     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1621     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1622     * possible.
1623     * </p>
1624     *
1625     * @param listener the listener to add.
1626     */
1627    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1628        stanzaAcknowledgedListeners.add(listener);
1629    }
1630
1631    /**
1632     * Remove the given Stanza acknowledged listener.
1633     *
1634     * @param listener the listener.
1635     * @return true if the listener was removed.
1636     */
1637    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1638        return stanzaAcknowledgedListeners.remove(listener);
1639    }
1640
1641    /**
1642     * Remove all stanza acknowledged listeners.
1643     */
1644    public void removeAllStanzaAcknowledgedListeners() {
1645        stanzaAcknowledgedListeners.clear();
1646    }
1647
1648    /**
1649     * Add a Stanza dropped listener.
1650     * <p>
1651     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get
1652     * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit
1653     * the Stanzas.
1654     * </p>
1655     *
1656     * @param listener the listener to add.
1657     * @since 4.3.3
1658     */
1659    public void addStanzaDroppedListener(StanzaListener listener) {
1660        stanzaDroppedListeners.add(listener);
1661    }
1662
1663    /**
1664     * Remove the given Stanza dropped listener.
1665     *
1666     * @param listener the listener.
1667     * @return true if the listener was removed.
1668     * @since 4.3.3
1669     */
1670    public boolean removeStanzaDroppedListener(StanzaListener listener) {
1671        return stanzaDroppedListeners.remove(listener);
1672    }
1673
1674    /**
1675     * Add a new Stanza ID acknowledged listener for the given ID.
1676     * <p>
1677     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1678     * automatically be removed after the listener was run.
1679     * </p>
1680     *
1681     * @param id the stanza ID.
1682     * @param listener the listener to invoke.
1683     * @return the previous listener for this stanza ID or null.
1684     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1685     */
1686    @SuppressWarnings("FutureReturnValueIgnored")
1687    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1688        // Prevent users from adding callbacks that will never get removed
1689        if (!smWasEnabledAtLeastOnce) {
1690            throw new StreamManagementException.StreamManagementNotEnabledException();
1691        }
1692        // Remove the listener after max. 3 hours
1693        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1694        schedule(new Runnable() {
1695            @Override
1696            public void run() {
1697                stanzaIdAcknowledgedListeners.remove(id);
1698            }
1699        }, removeAfterSeconds, TimeUnit.SECONDS);
1700        return stanzaIdAcknowledgedListeners.put(id, listener);
1701    }
1702
1703    /**
1704     * Remove the Stanza ID acknowledged listener for the given ID.
1705     *
1706     * @param id the stanza ID.
1707     * @return true if the listener was found and removed, false otherwise.
1708     */
1709    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1710        return stanzaIdAcknowledgedListeners.remove(id);
1711    }
1712
1713    /**
1714     * Removes all Stanza ID acknowledged listeners.
1715     */
1716    public void removeAllStanzaIdAcknowledgedListeners() {
1717        stanzaIdAcknowledgedListeners.clear();
1718    }
1719
1720    /**
1721     * Returns true if Stream Management is supported by the server.
1722     *
1723     * @return true if Stream Management is supported by the server.
1724     */
1725    public boolean isSmAvailable() {
1726        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1727    }
1728
1729    /**
1730     * Returns true if Stream Management was successfully negotiated with the server.
1731     *
1732     * @return true if Stream Management was negotiated.
1733     */
1734    public boolean isSmEnabled() {
1735        return smEnabledSyncPoint;
1736    }
1737
1738    /**
1739     * Returns true if the stream was successfully resumed with help of Stream Management.
1740     *
1741     * @return true if the stream was resumed.
1742     */
1743    public boolean streamWasResumed() {
1744        return smResumedSyncPoint == SyncPointState.successful;
1745    }
1746
1747    /**
1748     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1749     *
1750     * @return true if disconnected but resumption possible.
1751     */
1752    public boolean isDisconnectedButSmResumptionPossible() {
1753        return disconnectedButResumeable && isSmResumptionPossible();
1754    }
1755
1756    /**
1757     * Returns true if the stream is resumable.
1758     *
1759     * @return true if the stream is resumable.
1760     */
1761    public boolean isSmResumptionPossible() {
1762        // There is no resumable stream available
1763        if (smSessionId == null)
1764            return false;
1765
1766        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1767        // Seems like we are already reconnected, report true
1768        if (shutdownTimestamp == null) {
1769            return true;
1770        }
1771
1772        // See if resumption time is over
1773        long current = System.currentTimeMillis();
1774        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1775        if (current > shutdownTimestamp + maxResumptionMillies) {
1776            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1777            // resumption is possible
1778            return false;
1779        } else {
1780            return true;
1781        }
1782    }
1783
1784    /**
1785     * Drop the stream management state. Sets {@link #smSessionId} and
1786     * {@link #unacknowledgedStanzas} to <code>null</code>.
1787     */
1788    private void dropSmState() {
1789        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1790        // respective. No need to reset them here.
1791        smSessionId = null;
1792        unacknowledgedStanzas = null;
1793    }
1794
1795    /**
1796     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1797     * <p>
1798     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1799     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1800     * without checking for overflows before.
1801     * </p>
1802     *
1803     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1804     */
1805    public int getMaxSmResumptionTime() {
1806        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1807        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
1808        return Math.min(clientResumptionTime, serverResumptionTime);
1809    }
1810
1811    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1812        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1813        final List<Stanza> ackedStanzas = new ArrayList<>(
1814                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1815                                        : Integer.MAX_VALUE);
1816        for (long i = 0; i < ackedStanzasCount; i++) {
1817            Stanza ackedStanza = unacknowledgedStanzas.poll();
1818            // If the server ack'ed a stanza, then it must be in the
1819            // unacknowledged stanza queue. There can be no exception.
1820            if (ackedStanza == null) {
1821                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1822                                ackedStanzasCount, ackedStanzas);
1823            }
1824            ackedStanzas.add(ackedStanza);
1825        }
1826
1827        boolean atLeastOneStanzaAcknowledgedListener = false;
1828        if (!stanzaAcknowledgedListeners.isEmpty()) {
1829            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1830            atLeastOneStanzaAcknowledgedListener = true;
1831        }
1832        else {
1833            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1834            for (Stanza ackedStanza : ackedStanzas) {
1835                String id = ackedStanza.getStanzaId();
1836                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1837                    atLeastOneStanzaAcknowledgedListener = true;
1838                    break;
1839                }
1840            }
1841        }
1842
1843        // Only spawn a new thread if there is a chance that some listener is invoked
1844        if (atLeastOneStanzaAcknowledgedListener) {
1845            asyncGo(new Runnable() {
1846                @Override
1847                public void run() {
1848                    for (Stanza ackedStanza : ackedStanzas) {
1849                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1850                            try {
1851                                listener.processStanza(ackedStanza);
1852                            }
1853                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1854                                LOGGER.log(Level.FINER, "Received exception", e);
1855                            }
1856                        }
1857                        String id = ackedStanza.getStanzaId();
1858                        if (StringUtils.isNullOrEmpty(id)) {
1859                            continue;
1860                        }
1861                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1862                        if (listener != null) {
1863                            try {
1864                                listener.processStanza(ackedStanza);
1865                            }
1866                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1867                                LOGGER.log(Level.FINER, "Received exception", e);
1868                            }
1869                        }
1870                    }
1871                }
1872            });
1873        }
1874
1875        serverHandledStanzasCount = handledCount;
1876    }
1877
1878    /**
1879     * Set the default bundle and defer callback used for new connections.
1880     *
1881     * @param defaultBundleAndDeferCallback TODO javadoc me please
1882     * @see BundleAndDeferCallback
1883     * @since 4.1
1884     */
1885    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1886        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
1887    }
1888
1889    /**
1890     * Set the bundle and defer callback used for this connection.
1891     * <p>
1892     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1893     * no longer get deferred.
1894     * </p>
1895     *
1896     * @param bundleAndDeferCallback the callback or <code>null</code>.
1897     * @see BundleAndDeferCallback
1898     * @since 4.1
1899     */
1900    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1901        this.bundleAndDeferCallback = bundleAndDeferCallback;
1902    }
1903
1904}