001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.ByteArrayInputStream;
021import java.io.FileInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.OutputStream;
026import java.io.OutputStreamWriter;
027import java.io.Writer;
028import java.lang.reflect.Constructor;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.net.Socket;
032import java.security.KeyManagementException;
033import java.security.KeyStore;
034import java.security.KeyStoreException;
035import java.security.NoSuchAlgorithmException;
036import java.security.NoSuchProviderException;
037import java.security.Provider;
038import java.security.SecureRandom;
039import java.security.Security;
040import java.security.UnrecoverableKeyException;
041import java.security.cert.CertificateException;
042import java.util.ArrayList;
043import java.util.Collection;
044import java.util.Iterator;
045import java.util.LinkedHashSet;
046import java.util.LinkedList;
047import java.util.List;
048import java.util.Map;
049import java.util.Set;
050import java.util.concurrent.ArrayBlockingQueue;
051import java.util.concurrent.BlockingQueue;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentLinkedQueue;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicBoolean;
056import java.util.logging.Level;
057import java.util.logging.Logger;
058
059import javax.net.SocketFactory;
060import javax.net.ssl.HostnameVerifier;
061import javax.net.ssl.KeyManager;
062import javax.net.ssl.KeyManagerFactory;
063import javax.net.ssl.SSLContext;
064import javax.net.ssl.SSLSession;
065import javax.net.ssl.SSLSocket;
066import javax.net.ssl.TrustManager;
067import javax.net.ssl.X509TrustManager;
068import javax.security.auth.callback.Callback;
069import javax.security.auth.callback.CallbackHandler;
070import javax.security.auth.callback.PasswordCallback;
071
072import org.jivesoftware.smack.AbstractConnectionListener;
073import org.jivesoftware.smack.AbstractXMPPConnection;
074import org.jivesoftware.smack.ConnectionConfiguration;
075import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode;
076import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
077import org.jivesoftware.smack.SmackConfiguration;
078import org.jivesoftware.smack.SmackException;
079import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
080import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
081import org.jivesoftware.smack.SmackException.ConnectionException;
082import org.jivesoftware.smack.SmackException.NoResponseException;
083import org.jivesoftware.smack.SmackException.NotConnectedException;
084import org.jivesoftware.smack.SmackException.NotLoggedInException;
085import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
086import org.jivesoftware.smack.StanzaListener;
087import org.jivesoftware.smack.SynchronizationPoint;
088import org.jivesoftware.smack.XMPPConnection;
089import org.jivesoftware.smack.XMPPException;
090import org.jivesoftware.smack.XMPPException.FailedNonzaException;
091import org.jivesoftware.smack.XMPPException.StreamErrorException;
092import org.jivesoftware.smack.compress.packet.Compress;
093import org.jivesoftware.smack.compress.packet.Compressed;
094import org.jivesoftware.smack.compression.XMPPInputOutputStream;
095import org.jivesoftware.smack.filter.StanzaFilter;
096import org.jivesoftware.smack.packet.Element;
097import org.jivesoftware.smack.packet.IQ;
098import org.jivesoftware.smack.packet.Message;
099import org.jivesoftware.smack.packet.Nonza;
100import org.jivesoftware.smack.packet.Presence;
101import org.jivesoftware.smack.packet.Stanza;
102import org.jivesoftware.smack.packet.StartTls;
103import org.jivesoftware.smack.packet.StreamError;
104import org.jivesoftware.smack.packet.StreamOpen;
105import org.jivesoftware.smack.proxy.ProxyInfo;
106import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
107import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
108import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
109import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
110import org.jivesoftware.smack.sm.SMUtils;
111import org.jivesoftware.smack.sm.StreamManagementException;
112import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
113import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
114import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
115import org.jivesoftware.smack.sm.packet.StreamManagement;
116import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
117import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
118import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
119import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
120import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
121import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
122import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
123import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
124import org.jivesoftware.smack.sm.predicates.Predicate;
125import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
126import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
127import org.jivesoftware.smack.util.Async;
128import org.jivesoftware.smack.util.DNSUtil;
129import org.jivesoftware.smack.util.PacketParserUtils;
130import org.jivesoftware.smack.util.StringUtils;
131import org.jivesoftware.smack.util.TLSUtils;
132import org.jivesoftware.smack.util.XmlStringBuilder;
133import org.jivesoftware.smack.util.dns.HostAddress;
134import org.jivesoftware.smack.util.dns.SmackDaneProvider;
135import org.jivesoftware.smack.util.dns.SmackDaneVerifier;
136
137import org.jxmpp.jid.impl.JidCreate;
138import org.jxmpp.jid.parts.Resourcepart;
139import org.jxmpp.stringprep.XmppStringprepException;
140import org.jxmpp.util.XmppStringUtils;
141import org.xmlpull.v1.XmlPullParser;
142import org.xmlpull.v1.XmlPullParserException;
143
144/**
145 * Creates a socket connection to an XMPP server. This is the default connection
146 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
147 *
148 * @see XMPPConnection
149 * @author Matt Tucker
150 */
151public class XMPPTCPConnection extends AbstractXMPPConnection {
152
153    private static final int QUEUE_SIZE = 500;
154    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
155
156    /**
157     * The socket which is used for this connection.
158     */
159    private Socket socket;
160
161    /**
162     *
163     */
164    private boolean disconnectedButResumeable = false;
165
166    private SSLSocket secureSocket;
167
168    /**
169     * Protected access level because of unit test purposes
170     */
171    protected PacketWriter packetWriter;
172
173    /**
174     * Protected access level because of unit test purposes
175     */
176    protected PacketReader packetReader;
177
178    private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>(
179                    this, "initial open stream element send to server");
180
181    /**
182     *
183     */
184    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
185                    this, "stream compression feature");
186
187    /**
188     *
189     */
190    private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>(
191                    this, "stream compression");
192
193    /**
194     * A synchronization point which is successful if this connection has received the closing
195     * stream element from the remote end-point, i.e. the server.
196     */
197    private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>(
198                    this, "stream closing element received");
199
200    /**
201     * The default bundle and defer callback, used for new connections.
202     * @see bundleAndDeferCallback
203     */
204    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
205
206    /**
207     * The used bundle and defer callback.
208     * <p>
209     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
210     * having a 'volatile' read within the writer threads loop.
211     * </p>
212     */
213    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
214
215    private static boolean useSmDefault = true;
216
217    private static boolean useSmResumptionDefault = true;
218
219    /**
220     * The stream ID of the stream that is currently resumable, ie. the stream we hold the state
221     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
222     * {@link #unacknowledgedStanzas}.
223     */
224    private String smSessionId;
225
226    private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>(
227                    this, "stream resumed element");
228
229    private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>(
230                    this, "stream enabled element");
231
232    /**
233     * The client's preferred maximum resumption time in seconds.
234     */
235    private int smClientMaxResumptionTime = -1;
236
237    /**
238     * The server's preferred maximum resumption time in seconds.
239     */
240    private int smServerMaxResumptionTime = -1;
241
242    /**
243     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
244     */
245    private boolean useSm = useSmDefault;
246    private boolean useSmResumption = useSmResumptionDefault;
247
248    /**
249     * The counter that the server sends the client about it's current height. For example, if the server sends
250     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
251     */
252    private long serverHandledStanzasCount = 0;
253
254    /**
255     * The counter for stanzas handled ("received") by the client.
256     * <p>
257     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
258     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
259     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
260     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
261     * </p>
262     */
263    private long clientHandledStanzasCount = 0;
264
265    private BlockingQueue<Stanza> unacknowledgedStanzas;
266
267    /**
268     * Set to true if Stream Management was at least once enabled for this connection.
269     */
270    private boolean smWasEnabledAtLeastOnce = false;
271
272    /**
273     * This listeners are invoked for every stanza that got acknowledged.
274     * <p>
275     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
276     * themselves after they have been invoked.
277     * </p>
278     */
279    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();
280
281    /**
282     * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will
283     * only be invoked once and automatically removed after that.
284     */
285    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();
286
287    /**
288     * Predicates that determine if an stream management ack should be requested from the server.
289     * <p>
290     * We use a linked hash set here, so that the order how the predicates are added matches the
291     * order in which they are invoked in order to determine if an ack request should be send or not.
292     * </p>
293     */
294    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();
295
296    @SuppressWarnings("HidingField")
297    private final XMPPTCPConnectionConfiguration config;
298
299    /**
300     * Creates a new XMPP connection over TCP (optionally using proxies).
301     * <p>
302     * Note that XMPPTCPConnection constructors do not establish a connection to the server
303     * and you must call {@link #connect()}.
304     * </p>
305     *
306     * @param config the connection configuration.
307     */
308    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
309        super(config);
310        this.config = config;
311        addConnectionListener(new AbstractConnectionListener() {
312            @Override
313            public void connectionClosedOnError(Exception e) {
314                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
315                    dropSmState();
316                }
317            }
318        });
319    }
320
321    /**
322     * Creates a new XMPP connection over TCP.
323     * <p>
324     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
325     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
326     * constructor.
327     * </p>
328     *
329     * @param jid the bare JID used by the client.
330     * @param password the password or authentication token.
331     * @throws XmppStringprepException
332     */
333    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
334        this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
335    }
336
337    /**
338     * Creates a new XMPP connection over TCP.
339     * <p>
340     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
341     * you can get fine-grained control over connection settings using the
342     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
343     * </p>
344     * @param username
345     * @param password
346     * @param serviceName
347     * @throws XmppStringprepException
348     */
349    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
350        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
351                                        JidCreate.domainBareFrom(serviceName)).build());
352    }
353
354    @Override
355    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
356        if (packetWriter == null) {
357            throw new NotConnectedException();
358        }
359        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
360    }
361
362    @Override
363    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
364        if (isConnected() && !disconnectedButResumeable) {
365            throw new AlreadyConnectedException();
366        }
367    }
368
369    @Override
370    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
371        if (isAuthenticated() && !disconnectedButResumeable) {
372            throw new AlreadyLoggedInException();
373        }
374    }
375
376    @Override
377    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
378        // Reset the flag in case it was set
379        disconnectedButResumeable = false;
380        super.afterSuccessfulLogin(resumed);
381    }
382
383    @Override
384    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
385                    SmackException, IOException, InterruptedException {
386        // Authenticate using SASL
387        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
388        saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession);
389
390        // If compression is enabled then request the server to use stream compression. XEP-170
391        // recommends to perform stream compression before resource binding.
392        maybeEnableCompression();
393
394        if (isSmResumptionPossible()) {
395            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
396            if (smResumedSyncPoint.wasSuccessful()) {
397                // We successfully resumed the stream, be done here
398                afterSuccessfulLogin(true);
399                return;
400            }
401            // SM resumption failed, what Smack does here is to report success of
402            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
403            // normal resource binding can be tried.
404            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
405        }
406
407        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
408        if (unacknowledgedStanzas != null) {
409            // There was a previous connection with SM enabled but that was either not resumable or
410            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
411            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
412            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
413            // XMPP session (There maybe was an enabled in a previous XMPP session of this
414            // connection instance though). This is used in writePackets to decide if stanzas should
415            // be added to the unacknowledged stanzas queue, because they have to be added right
416            // after the 'enable' stream element has been sent.
417            dropSmState();
418        }
419
420        // Now bind the resource. It is important to do this *after* we dropped an eventually
421        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
422        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
423        bindResourceAndEstablishSession(resource);
424
425        if (isSmAvailable() && useSm) {
426            // Remove what is maybe left from previously stream managed sessions
427            serverHandledStanzasCount = 0;
428            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
429            // then this is a non recoverable error and we therefore throw an exception.
430            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
431            synchronized (requestAckPredicates) {
432                if (requestAckPredicates.isEmpty()) {
433                    // Assure that we have at lest one predicate set up that so that we request acks
434                    // for the server and eventually flush some stanzas from the unacknowledged
435                    // stanza queue
436                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
437                }
438            }
439        }
440        // (Re-)send the stanzas *after* we tried to enable SM
441        for (Stanza stanza : previouslyUnackedStanzas) {
442            sendStanzaInternal(stanza);
443        }
444
445        afterSuccessfulLogin(false);
446    }
447
448    @Override
449    public boolean isSecureConnection() {
450        return secureSocket != null;
451    }
452
453    /**
454     * Shuts the current connection down. After this method returns, the connection must be ready
455     * for re-use by connect.
456     */
457    @Override
458    protected void shutdown() {
459        if (isSmEnabled()) {
460            try {
461                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
462                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
463                sendSmAcknowledgementInternal();
464            } catch (InterruptedException | NotConnectedException e) {
465                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
466            }
467        }
468        shutdown(false);
469    }
470
471    /**
472     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
473     */
474    public synchronized void instantShutdown() {
475        shutdown(true);
476    }
477
478    private void shutdown(boolean instant) {
479        if (disconnectedButResumeable) {
480            return;
481        }
482
483        // First shutdown the writer, this will result in a closing stream element getting send to
484        // the server
485        if (packetWriter != null) {
486            LOGGER.finer("PacketWriter shutdown()");
487            packetWriter.shutdown(instant);
488        }
489        LOGGER.finer("PacketWriter has been shut down");
490
491        if (!instant) {
492            try {
493                // After we send the closing stream element, check if there was already a
494                // closing stream element sent by the server or wait with a timeout for a
495                // closing stream element to be received from the server.
496                @SuppressWarnings("unused")
497                Exception res = closingStreamReceived.checkIfSuccessOrWait();
498            } catch (InterruptedException | NoResponseException e) {
499                LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
500            }
501        }
502
503        if (packetReader != null) {
504            LOGGER.finer("PacketReader shutdown()");
505                packetReader.shutdown();
506        }
507        LOGGER.finer("PacketReader has been shut down");
508
509        try {
510                socket.close();
511        } catch (Exception e) {
512                LOGGER.log(Level.WARNING, "shutdown", e);
513        }
514
515        setWasAuthenticated();
516        // If we are able to resume the stream, then don't set
517        // connected/authenticated/usingTLS to false since we like behave like we are still
518        // connected (e.g. sendStanza should not throw a NotConnectedException).
519        if (isSmResumptionPossible() && instant) {
520            disconnectedButResumeable = true;
521        } else {
522            disconnectedButResumeable = false;
523            // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
524            // stream tag, there is no longer a stream to resume.
525            smSessionId = null;
526        }
527        authenticated = false;
528        connected = false;
529        secureSocket = null;
530        reader = null;
531        writer = null;
532
533        maybeCompressFeaturesReceived.init();
534        compressSyncPoint.init();
535        smResumedSyncPoint.init();
536        smEnabledSyncPoint.init();
537        initialOpenStreamSend.init();
538    }
539
540    @Override
541    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
542        packetWriter.sendStreamElement(element);
543    }
544
545    @Override
546    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
547        packetWriter.sendStreamElement(packet);
548        if (isSmEnabled()) {
549            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
550                if (requestAckPredicate.accept(packet)) {
551                    requestSmAcknowledgementInternal();
552                    break;
553                }
554            }
555        }
556    }
557
558    private void connectUsingConfiguration() throws ConnectionException, IOException {
559        List<HostAddress> failedAddresses = populateHostAddresses();
560        SocketFactory socketFactory = config.getSocketFactory();
561        ProxyInfo proxyInfo = config.getProxyInfo();
562        int timeout = config.getConnectTimeout();
563        if (socketFactory == null) {
564            socketFactory = SocketFactory.getDefault();
565        }
566        for (HostAddress hostAddress : hostAddresses) {
567            Iterator<InetAddress> inetAddresses;
568            String host = hostAddress.getHost();
569            int port = hostAddress.getPort();
570            if (proxyInfo == null) {
571                inetAddresses = hostAddress.getInetAddresses().iterator();
572                assert (inetAddresses.hasNext());
573
574                innerloop: while (inetAddresses.hasNext()) {
575                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
576                    // re-usable after a failed connection attempt. See also SMACK-724.
577                    socket = socketFactory.createSocket();
578
579                    final InetAddress inetAddress = inetAddresses.next();
580                    final String inetAddressAndPort = inetAddress + " at port " + port;
581                    LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort);
582                    try {
583                        socket.connect(new InetSocketAddress(inetAddress, port), timeout);
584                    } catch (Exception e) {
585                        hostAddress.setException(inetAddress, e);
586                        if (inetAddresses.hasNext()) {
587                            continue innerloop;
588                        } else {
589                            break innerloop;
590                        }
591                    }
592                    LOGGER.finer("Established TCP connection to " + inetAddressAndPort);
593                    // We found a host to connect to, return here
594                    this.host = host;
595                    this.port = port;
596                    return;
597                }
598                failedAddresses.add(hostAddress);
599            } else {
600                socket = socketFactory.createSocket();
601                StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy");
602                final String hostAndPort = host + " at port " + port;
603                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
604                try {
605                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
606                } catch (IOException e) {
607                    hostAddress.setException(e);
608                    continue;
609                }
610                LOGGER.finer("Established TCP connection to " + hostAndPort);
611                // We found a host to connect to, return here
612                this.host = host;
613                this.port = port;
614                return;
615            }
616        }
617        // There are no more host addresses to try
618        // throw an exception and report all tried
619        // HostAddresses in the exception
620        throw ConnectionException.from(failedAddresses);
621    }
622
623    /**
624     * Initializes the connection by creating a stanza reader and writer and opening a
625     * XMPP stream to the server.
626     *
627     * @throws XMPPException if establishing a connection to the server fails.
628     * @throws SmackException if the server fails to respond back or if there is anther error.
629     * @throws IOException
630     */
631    private void initConnection() throws IOException {
632        boolean isFirstInitialization = packetReader == null || packetWriter == null;
633        compressionHandler = null;
634
635        // Set the reader and writer instance variables
636        initReaderAndWriter();
637
638        if (isFirstInitialization) {
639            packetWriter = new PacketWriter();
640            packetReader = new PacketReader();
641        }
642        // Start the writer thread. This will open an XMPP stream to the server
643        packetWriter.init();
644        // Start the reader thread. The startup() method will block until we
645        // get an opening stream packet back from server
646        packetReader.init();
647    }
648
649    private void initReaderAndWriter() throws IOException {
650        InputStream is = socket.getInputStream();
651        OutputStream os = socket.getOutputStream();
652        if (compressionHandler != null) {
653            is = compressionHandler.getInputStream(is);
654            os = compressionHandler.getOutputStream(os);
655        }
656        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
657        writer = new OutputStreamWriter(os, "UTF-8");
658        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
659
660        // If debugging is enabled, we open a window and write out all network traffic.
661        initDebugger();
662    }
663
664    /**
665     * The server has indicated that TLS negotiation can start. We now need to secure the
666     * existing plain connection and perform a handshake. This method won't return until the
667     * connection has finished the handshake or an error occurred while securing the connection.
668     * @throws IOException
669     * @throws CertificateException
670     * @throws NoSuchAlgorithmException
671     * @throws NoSuchProviderException
672     * @throws KeyStoreException
673     * @throws UnrecoverableKeyException
674     * @throws KeyManagementException
675     * @throws SmackException
676     * @throws Exception if an exception occurs.
677     */
678    @SuppressWarnings("LiteralClassName")
679    private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
680        SmackDaneVerifier daneVerifier = null;
681
682        if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) {
683            SmackDaneProvider daneProvider = DNSUtil.getDaneProvider();
684            if (daneProvider == null) {
685                throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured");
686            }
687            daneVerifier = daneProvider.newInstance();
688            if (daneVerifier == null) {
689                throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier");
690            }
691        }
692
693        SSLContext context = this.config.getCustomSSLContext();
694        KeyStore ks = null;
695        PasswordCallback pcb = null;
696
697        if (context == null) {
698            final String keyStoreType = config.getKeystoreType();
699            final CallbackHandler callbackHandler = config.getCallbackHandler();
700            final String keystorePath = config.getKeystorePath();
701            if ("PKCS11".equals(keyStoreType)) {
702                try {
703                    Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
704                    String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library();
705                    ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8));
706                    Provider p = (Provider) c.newInstance(config);
707                    Security.addProvider(p);
708                    ks = KeyStore.getInstance("PKCS11",p);
709                    pcb = new PasswordCallback("PKCS11 Password: ",false);
710                    callbackHandler.handle(new Callback[] {pcb});
711                    ks.load(null,pcb.getPassword());
712                }
713                catch (Exception e) {
714                    LOGGER.log(Level.WARNING, "Exception", e);
715                    ks = null;
716                }
717            }
718            else if ("Apple".equals(keyStoreType)) {
719                ks = KeyStore.getInstance("KeychainStore","Apple");
720                ks.load(null,null);
721                // pcb = new PasswordCallback("Apple Keychain",false);
722                // pcb.setPassword(null);
723            }
724            else if (keyStoreType != null) {
725                ks = KeyStore.getInstance(keyStoreType);
726                if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) {
727                    try {
728                        pcb = new PasswordCallback("Keystore Password: ", false);
729                        callbackHandler.handle(new Callback[] { pcb });
730                        ks.load(new FileInputStream(keystorePath), pcb.getPassword());
731                    }
732                    catch (Exception e) {
733                        LOGGER.log(Level.WARNING, "Exception", e);
734                        ks = null;
735                    }
736                } else {
737                    ks.load(null, null);
738                }
739            }
740
741            KeyManager[] kms = null;
742
743            if (ks != null) {
744                String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
745                KeyManagerFactory kmf = null;
746                try {
747                    kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm);
748                }
749                catch (NoSuchAlgorithmException e) {
750                    LOGGER.log(Level.FINE, "Could get the default KeyManagerFactory for the '"
751                                    + keyManagerFactoryAlgorithm + "' algorithm", e);
752                }
753                if (kmf != null) {
754                    try {
755                        if (pcb == null) {
756                            kmf.init(ks, null);
757                        }
758                        else {
759                            kmf.init(ks, pcb.getPassword());
760                            pcb.clearPassword();
761                        }
762                        kms = kmf.getKeyManagers();
763                    }
764                    catch (NullPointerException npe) {
765                        LOGGER.log(Level.WARNING, "NullPointerException", npe);
766                    }
767                }
768            }
769
770            // If the user didn't specify a SSLContext, use the default one
771            context = SSLContext.getInstance("TLS");
772
773            final SecureRandom secureRandom = new java.security.SecureRandom();
774            X509TrustManager customTrustManager = config.getCustomX509TrustManager();
775
776            if (daneVerifier != null) {
777                // User requested DANE verification.
778                daneVerifier.init(context, kms, customTrustManager, secureRandom);
779            } else {
780                TrustManager[] customTrustManagers = null;
781                if (customTrustManager != null) {
782                    customTrustManagers = new TrustManager[] { customTrustManager };
783                }
784                context.init(kms, customTrustManagers, secureRandom);
785            }
786        }
787
788        Socket plain = socket;
789        // Secure the plain connection
790        socket = context.getSocketFactory().createSocket(plain,
791                config.getXMPPServiceDomain().toString(), plain.getPort(), true);
792
793        final SSLSocket sslSocket = (SSLSocket) socket;
794        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
795        // important (at least on certain platforms) and it seems to be a good idea anyways to
796        // prevent an accidental implicit handshake.
797        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
798
799        // Initialize the reader and writer with the new secured version
800        initReaderAndWriter();
801
802        // Proceed to do the handshake
803        sslSocket.startHandshake();
804
805        if (daneVerifier != null) {
806            daneVerifier.finish(sslSocket);
807        }
808
809        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
810        if (verifier == null) {
811                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
812        } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) {
813            throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain());
814        }
815
816        // Set that TLS was successful
817        secureSocket = sslSocket;
818    }
819
820    /**
821     * Returns the compression handler that can be used for one compression methods offered by the server.
822     *
823     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
824     *
825     */
826    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
827        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
828                String method = handler.getCompressionMethod();
829                if (compression.getMethods().contains(method))
830                    return handler;
831        }
832        return null;
833    }
834
835    @Override
836    public boolean isUsingCompression() {
837        return compressionHandler != null && compressSyncPoint.wasSuccessful();
838    }
839
840    /**
841     * <p>
842     * Starts using stream compression that will compress network traffic. Traffic can be
843     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
844     * connection. However, the server and the client will need to use more CPU time in order to
845     * un/compress network data so under high load the server performance might be affected.
846     * </p>
847     * <p>
848     * Stream compression has to have been previously offered by the server. Currently only the
849     * zlib method is supported by the client. Stream compression negotiation has to be done
850     * before authentication took place.
851     * </p>
852     *
853     * @throws NotConnectedException
854     * @throws SmackException
855     * @throws NoResponseException
856     * @throws InterruptedException
857     */
858    private void maybeEnableCompression() throws SmackException, InterruptedException {
859        if (!config.isCompressionEnabled()) {
860            return;
861        }
862        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
863        Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
864        if (compression == null) {
865            // Server does not support compression
866            return;
867        }
868        // If stream compression was offered by the server and we want to use
869        // compression then send compression request to the server
870        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
871            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
872        } else {
873            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
874        }
875    }
876
877    /**
878     * Establishes a connection to the XMPP server. It basically
879     * creates and maintains a socket connection to the server.
880     * <p>
881     * Listeners will be preserved from a previous connection if the reconnection
882     * occurs after an abrupt termination.
883     * </p>
884     *
885     * @throws XMPPException if an error occurs while trying to establish the connection.
886     * @throws SmackException
887     * @throws IOException
888     * @throws InterruptedException
889     */
890    @Override
891    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
892        closingStreamReceived.init();
893        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
894        // there is an error establishing the connection
895        connectUsingConfiguration();
896
897        // We connected successfully to the servers TCP port
898        initConnection();
899
900        // TLS handled will be successful either if TLS was established, or if it was not mandatory.
901        tlsHandled.checkIfSuccessOrWaitOrThrow();
902
903        // Wait with SASL auth until the SASL mechanisms have been received
904        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
905    }
906
907    /**
908     * Sends out a notification that there was an error with the connection
909     * and closes the connection. Also prints the stack trace of the given exception
910     *
911     * @param e the exception that causes the connection close event.
912     */
913    private synchronized void notifyConnectionError(Exception e) {
914        // Listeners were already notified of the exception, return right here.
915        if ((packetReader == null || packetReader.done) &&
916                (packetWriter == null || packetWriter.done())) return;
917
918        // Closes the connection temporary. A reconnection is possible
919        // Note that a connection listener of XMPPTCPConnection will drop the SM state in
920        // case the Exception is a StreamErrorException.
921        instantShutdown();
922
923        // Notify connection listeners of the error.
924        callConnectionClosedOnErrorListener(e);
925    }
926
927    /**
928     * For unit testing purposes
929     *
930     * @param writer
931     */
932    protected void setWriter(Writer writer) {
933        this.writer = writer;
934    }
935
936    @Override
937    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException {
938        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
939        if (startTlsFeature != null) {
940            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
941                SmackException smackException = new SecurityRequiredByServerException();
942                tlsHandled.reportFailure(smackException);
943                notifyConnectionError(smackException);
944                return;
945            }
946
947            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
948                sendNonza(new StartTls());
949            } else {
950                tlsHandled.reportSuccess();
951            }
952        } else {
953            tlsHandled.reportSuccess();
954        }
955
956        if (getSASLAuthentication().authenticationSuccessful()) {
957            // If we have received features after the SASL has been successfully completed, then we
958            // have also *maybe* received, as it is an optional feature, the compression feature
959            // from the server.
960            maybeCompressFeaturesReceived.reportSuccess();
961        }
962    }
963
964    /**
965     * Resets the parser using the latest connection's reader. Resetting the parser is necessary
966     * when the plain connection has been secured or when a new opening stream element is going
967     * to be sent by the server.
968     *
969     * @throws SmackException if the parser could not be reset.
970     * @throws InterruptedException
971     */
972    void openStream() throws SmackException, InterruptedException {
973        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
974        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
975        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
976        // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.)
977        CharSequence to = getXMPPServiceDomain();
978        CharSequence from = null;
979        CharSequence localpart = config.getUsername();
980        if (localpart != null) {
981            from = XmppStringUtils.completeJidFrom(localpart, to);
982        }
983        String id = getStreamId();
984        sendNonza(new StreamOpen(to, from, id));
985        try {
986            packetReader.parser = PacketParserUtils.newXmppParser(reader);
987        }
988        catch (XmlPullParserException e) {
989            throw new SmackException(e);
990        }
991    }
992
993    protected class PacketReader {
994
995        XmlPullParser parser;
996
997        private volatile boolean done;
998
999        /**
1000         * Initializes the reader in order to be used. The reader is initialized during the
1001         * first connection and when reconnecting due to an abruptly disconnection.
1002         */
1003        void init() {
1004            done = false;
1005
1006            Async.go(new Runnable() {
1007                @Override
1008                public void run() {
1009                    parsePackets();
1010                }
1011            }, "Smack Reader (" + getConnectionCounter() + ")");
1012         }
1013
1014        /**
1015         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
1016         */
1017        void shutdown() {
1018            done = true;
1019        }
1020
1021        /**
1022         * Parse top-level packets in order to process them further.
1023         */
1024        private void parsePackets() {
1025            try {
1026                initialOpenStreamSend.checkIfSuccessOrWait();
1027                int eventType = parser.getEventType();
1028                while (!done) {
1029                    switch (eventType) {
1030                    case XmlPullParser.START_TAG:
1031                        final String name = parser.getName();
1032                        switch (name) {
1033                        case Message.ELEMENT:
1034                        case IQ.IQ_ELEMENT:
1035                        case Presence.ELEMENT:
1036                            try {
1037                                parseAndProcessStanza(parser);
1038                            } finally {
1039                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
1040                            }
1041                            break;
1042                        case "stream":
1043                            // We found an opening stream.
1044                            if ("jabber:client".equals(parser.getNamespace(null))) {
1045                                streamId = parser.getAttributeValue("", "id");
1046                                String reportedServerDomain = parser.getAttributeValue("", "from");
1047                                assert (config.getXMPPServiceDomain().equals(reportedServerDomain));
1048                            }
1049                            break;
1050                        case "error":
1051                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
1052                            saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
1053                            // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
1054                            // point to report the error, which is checked immediately after tlsHandled in
1055                            // connectInternal().
1056                            tlsHandled.reportSuccess();
1057                            throw new StreamErrorException(streamError);
1058                        case "features":
1059                            parseFeatures(parser);
1060                            break;
1061                        case "proceed":
1062                            try {
1063                                // Secure the connection by negotiating TLS
1064                                proceedTLSReceived();
1065                                // Send a new opening stream to the server
1066                                openStream();
1067                            }
1068                            catch (Exception e) {
1069                                SmackException smackException = new SmackException(e);
1070                                tlsHandled.reportFailure(smackException);
1071                                throw e;
1072                            }
1073                            break;
1074                        case "failure":
1075                            String namespace = parser.getNamespace(null);
1076                            switch (namespace) {
1077                            case "urn:ietf:params:xml:ns:xmpp-tls":
1078                                // TLS negotiation has failed. The server will close the connection
1079                                // TODO Parse failure stanza
1080                                throw new SmackException("TLS negotiation has failed");
1081                            case "http://jabber.org/protocol/compress":
1082                                // Stream compression has been denied. This is a recoverable
1083                                // situation. It is still possible to authenticate and
1084                                // use the connection but using an uncompressed connection
1085                                // TODO Parse failure stanza
1086                                compressSyncPoint.reportFailure(new SmackException(
1087                                                "Could not establish compression"));
1088                                break;
1089                            case SaslStreamElements.NAMESPACE:
1090                                // SASL authentication has failed. The server may close the connection
1091                                // depending on the number of retries
1092                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
1093                                getSASLAuthentication().authenticationFailed(failure);
1094                                break;
1095                            }
1096                            break;
1097                        case Challenge.ELEMENT:
1098                            // The server is challenging the SASL authentication made by the client
1099                            String challengeData = parser.nextText();
1100                            getSASLAuthentication().challengeReceived(challengeData);
1101                            break;
1102                        case Success.ELEMENT:
1103                            Success success = new Success(parser.nextText());
1104                            // We now need to bind a resource for the connection
1105                            // Open a new stream and wait for the response
1106                            openStream();
1107                            // The SASL authentication with the server was successful. The next step
1108                            // will be to bind the resource
1109                            getSASLAuthentication().authenticated(success);
1110                            break;
1111                        case Compressed.ELEMENT:
1112                            // Server confirmed that it's possible to use stream compression. Start
1113                            // stream compression
1114                            // Initialize the reader and writer with the new compressed version
1115                            initReaderAndWriter();
1116                            // Send a new opening stream to the server
1117                            openStream();
1118                            // Notify that compression is being used
1119                            compressSyncPoint.reportSuccess();
1120                            break;
1121                        case Enabled.ELEMENT:
1122                            Enabled enabled = ParseStreamManagement.enabled(parser);
1123                            if (enabled.isResumeSet()) {
1124                                smSessionId = enabled.getId();
1125                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1126                                    SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received");
1127                                    smEnabledSyncPoint.reportFailure(xmppException);
1128                                    throw xmppException;
1129                                }
1130                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
1131                            } else {
1132                                // Mark this a non-resumable stream by setting smSessionId to null
1133                                smSessionId = null;
1134                            }
1135                            clientHandledStanzasCount = 0;
1136                            smWasEnabledAtLeastOnce = true;
1137                            smEnabledSyncPoint.reportSuccess();
1138                            LOGGER.fine("Stream Management (XEP-198): successfully enabled");
1139                            break;
1140                        case Failed.ELEMENT:
1141                            Failed failed = ParseStreamManagement.failed(parser);
1142                            FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
1143                            // If only XEP-198 would specify different failure elements for the SM
1144                            // enable and SM resume failure case. But this is not the case, so we
1145                            // need to determine if this is a 'Failed' response for either 'Enable'
1146                            // or 'Resume'.
1147                            if (smResumedSyncPoint.requestSent()) {
1148                                smResumedSyncPoint.reportFailure(xmppException);
1149                            }
1150                            else {
1151                                if (!smEnabledSyncPoint.requestSent()) {
1152                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
1153                                }
1154                                smEnabledSyncPoint.reportFailure(new SmackException(xmppException));
1155                                // Report success for last lastFeaturesReceived so that in case a
1156                                // failed resumption, we can continue with normal resource binding.
1157                                // See text of XEP-198 5. below Example 11.
1158                                lastFeaturesReceived.reportSuccess();
1159                            }
1160                            break;
1161                        case Resumed.ELEMENT:
1162                            Resumed resumed = ParseStreamManagement.resumed(parser);
1163                            if (!smSessionId.equals(resumed.getPrevId())) {
1164                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1165                            }
1166                            // Mark SM as enabled
1167                            smEnabledSyncPoint.reportSuccess();
1168                            // First, drop the stanzas already handled by the server
1169                            processHandledCount(resumed.getHandledCount());
1170                            // Then re-send what is left in the unacknowledged queue
1171                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
1172                            unacknowledgedStanzas.drainTo(stanzasToResend);
1173                            for (Stanza stanza : stanzasToResend) {
1174                                sendStanzaInternal(stanza);
1175                            }
1176                            // If there where stanzas resent, then request a SM ack for them.
1177                            // Writer's sendStreamElement() won't do it automatically based on
1178                            // predicates.
1179                            if (!stanzasToResend.isEmpty()) {
1180                                requestSmAcknowledgementInternal();
1181                            }
1182                            // Mark SM resumption as successful
1183                            smResumedSyncPoint.reportSuccess();
1184                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
1185                            break;
1186                        case AckAnswer.ELEMENT:
1187                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1188                            processHandledCount(ackAnswer.getHandledCount());
1189                            break;
1190                        case AckRequest.ELEMENT:
1191                            ParseStreamManagement.ackRequest(parser);
1192                            if (smEnabledSyncPoint.wasSuccessful()) {
1193                                sendSmAcknowledgementInternal();
1194                            } else {
1195                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1196                            }
1197                            break;
1198                         default:
1199                             LOGGER.warning("Unknown top level stream element: " + name);
1200                             break;
1201                        }
1202                        break;
1203                    case XmlPullParser.END_TAG:
1204                        final String endTagName = parser.getName();
1205                        if ("stream".equals(endTagName)) {
1206                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
1207                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
1208                                break;
1209                            }
1210
1211                            // Check if the queue was already shut down before reporting success on closing stream tag
1212                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
1213                            // did re-start the queue again, causing this writer to assume that the queue is not
1214                            // shutdown, which results in a call to disconnect().
1215                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
1216                            closingStreamReceived.reportSuccess();
1217
1218                            if (queueWasShutdown) {
1219                                // We received a closing stream element *after* we initiated the
1220                                // termination of the session by sending a closing stream element to
1221                                // the server first
1222                                return;
1223                            } else {
1224                                // We received a closing stream element from the server without us
1225                                // sending a closing stream element first. This means that the
1226                                // server wants to terminate the session, therefore disconnect
1227                                // the connection
1228                                LOGGER.info(XMPPTCPConnection.this
1229                                                + " received closing </stream> element."
1230                                                + " Server wants to terminate the connection, calling disconnect()");
1231                                disconnect();
1232                            }
1233                        }
1234                        break;
1235                    case XmlPullParser.END_DOCUMENT:
1236                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1237                        // closing stream element before.
1238                        throw new SmackException(
1239                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1240                    }
1241                    eventType = parser.next();
1242                }
1243            }
1244            catch (Exception e) {
1245                closingStreamReceived.reportFailure(e);
1246                // The exception can be ignored if the the connection is 'done'
1247                // or if the it was caused because the socket got closed
1248                if (!(done || packetWriter.queue.isShutdown())) {
1249                    // Close the connection and notify connection listeners of the
1250                    // error.
1251                    notifyConnectionError(e);
1252                }
1253            }
1254        }
1255    }
1256
1257    protected class PacketWriter {
1258        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1259
1260        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1261                        QUEUE_SIZE, true);
1262
1263        /**
1264         * Needs to be protected for unit testing purposes.
1265         */
1266        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>(
1267                        XMPPTCPConnection.this, "shutdown completed");
1268
1269        /**
1270         * If set, the stanza writer is shut down
1271         */
1272        protected volatile Long shutdownTimestamp = null;
1273
1274        private volatile boolean instantShutdown;
1275
1276        /**
1277         * True if some preconditions are given to start the bundle and defer mechanism.
1278         * <p>
1279         * This will likely get set to true right after the start of the writer thread, because
1280         * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
1281         * this field to true.
1282         * </p>
1283         */
1284        private boolean shouldBundleAndDefer;
1285
1286        /**
1287        * Initializes the writer in order to be used. It is called at the first connection and also
1288        * is invoked if the connection is disconnected by an error.
1289        */
1290        void init() {
1291            shutdownDone.init();
1292            shutdownTimestamp = null;
1293
1294            if (unacknowledgedStanzas != null) {
1295                // It's possible that there are new stanzas in the writer queue that
1296                // came in while we were disconnected but resumable, drain those into
1297                // the unacknowledged queue so that they get resent now
1298                drainWriterQueueToUnacknowledgedStanzas();
1299            }
1300
1301            queue.start();
1302            Async.go(new Runnable() {
1303                @Override
1304                public void run() {
1305                    writePackets();
1306                }
1307            }, "Smack Writer (" + getConnectionCounter() + ")");
1308        }
1309
        /**
         * Check whether this writer has been shut down, i.e. whether {@link #shutdownTimestamp} is set.
         *
         * @return true if a shutdown timestamp is set.
         */
        private boolean done() {
            return shutdownTimestamp != null;
        }
1313
1314        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1315            final boolean done = done();
1316            if (done) {
1317                final boolean smResumptionPossible = isSmResumptionPossible();
1318                // Don't throw a NotConnectedException is there is an resumable stream available
1319                if (!smResumptionPossible) {
1320                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1321                                    + " smResumptionPossible=" + smResumptionPossible);
1322                }
1323            }
1324        }
1325
1326        /**
1327         * Sends the specified element to the server.
1328         *
1329         * @param element the element to send.
1330         * @throws NotConnectedException
1331         * @throws InterruptedException
1332         */
1333        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1334            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1335            try {
1336                queue.put(element);
1337            }
1338            catch (InterruptedException e) {
1339                // put() may throw an InterruptedException for two reasons:
1340                // 1. If the queue was shut down
1341                // 2. If the thread was interrupted
1342                // so we have to check which is the case
1343                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1344                // If the method above did not throw, then the sending thread was interrupted
1345                throw e;
1346            }
1347        }
1348
1349        /**
1350         * Shuts down the stanza writer. Once this method has been called, no further
1351         * packets will be written to the server.
1352         * @throws InterruptedException
1353         */
1354        void shutdown(boolean instant) {
1355            instantShutdown = instant;
1356            queue.shutdown();
1357            shutdownTimestamp = System.currentTimeMillis();
1358            try {
1359                shutdownDone.checkIfSuccessOrWait();
1360            }
1361            catch (NoResponseException | InterruptedException e) {
1362                LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
1363            }
1364        }
1365
1366        /**
1367         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1368         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1369         * that case.
1370         *
1371         * @return the next element for writing or null.
1372         */
1373        private Element nextStreamElement() {
1374            // It is important the we check if the queue is empty before removing an element from it
1375            if (queue.isEmpty()) {
1376                shouldBundleAndDefer = true;
1377            }
1378            Element packet = null;
1379            try {
1380                packet = queue.take();
1381            }
1382            catch (InterruptedException e) {
1383                if (!queue.isShutdown()) {
1384                    // Users shouldn't try to interrupt the packet writer thread
1385                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1386                }
1387            }
1388            return packet;
1389        }
1390
        /**
         * Main loop of the writer thread, which is started by {@link #init()}.
         * <p>
         * Opens the stream and then writes elements taken from {@link #queue} until the writer is
         * done. On a non-instant shutdown the remaining queue content and the closing stream element
         * are written afterwards; on an instant shutdown with Stream Management enabled the remaining
         * stanzas are moved to the unacknowledged stanzas queue instead. {@link #shutdownDone} is
         * always reported as successful when the method finishes. An exception that was not caused by
         * the shutdown is handed to <code>notifyConnectionError</code> after that.
         * </p>
         */
        private void writePackets() {
            // Set if an exception occurred which has to be reported as connection error
            Exception writerException = null;
            try {
                openStream();
                initialOpenStreamSend.reportSuccess();
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // Either the queue was shut down or the thread got interrupted,
                        // re-evaluate the 'done' condition
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Loop to cope with wake-ups that happen before the deferral time is
                                // over and without the stop flag being set
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    // Stays null for stream elements that are not stanzas
                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enable'. Stanzas will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
                    }
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML(StreamOpen.CLIENT_NAMESPACE);
                    if (elementXml instanceof XmlStringBuilder) {
                        ((XmlStringBuilder) elementXml).write(writer, StreamOpen.CLIENT_NAMESPACE);
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush if there is nothing else queued, so that consecutive elements
                    // are written out together
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            writer.write(packet.toXML(null).toString());
                        }
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    writerException = e;
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            } finally {
                LOGGER.fine("Reporting shutdownDone success in writer thread");
                shutdownDone.reportSuccess();
            }
            // Delay notifyConnectionError after shutdownDone has been reported in the finally block.
            if (writerException != null) {
                notifyConnectionError(writerException);
            }
        }
1513
1514        private void drainWriterQueueToUnacknowledgedStanzas() {
1515            List<Element> elements = new ArrayList<>(queue.size());
1516            queue.drainTo(elements);
1517            for (Element element : elements) {
1518                if (element instanceof Stanza) {
1519                    unacknowledgedStanzas.add((Stanza) element);
1520                }
1521            }
1522        }
1523
1524        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1525            // Check if the stream element should be put to the unacknowledgedStanza
1526            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1527            // packet order is not stable at this point (sendStanzaInternal() can be
1528            // called concurrently).
1529            if (unacknowledgedStanzas != null && stanza != null) {
1530                // If the unacknowledgedStanza queue is nearly full, request an new ack
1531                // from the server in order to drain it
1532                if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
1533                    writer.write(AckRequest.INSTANCE.toXML(null).toString());
1534                    writer.flush();
1535                }
1536                try {
1537                    // It is important the we put the stanza in the unacknowledged stanza
1538                    // queue before we put it on the wire
1539                    unacknowledgedStanzas.put(stanza);
1540                }
1541                catch (InterruptedException e) {
1542                    throw new IllegalStateException(e);
1543                }
1544            }
1545        }
1546    }
1547
1548    /**
1549     * Set if Stream Management should be used by default for new connections.
1550     *
1551     * @param useSmDefault true to use Stream Management for new connections.
1552     */
1553    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1554        XMPPTCPConnection.useSmDefault = useSmDefault;
1555    }
1556
1557    /**
1558     * Set if Stream Management resumption should be used by default for new connections.
1559     *
1560     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1561     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1562     */
1563    @Deprecated
1564    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1565        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1566    }
1567
1568    /**
1569     * Set if Stream Management resumption should be used by default for new connections.
1570     *
1571     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1572     */
1573    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1574        if (useSmResumptionDefault) {
1575            // Also enable SM is resumption is enabled
1576            setUseStreamManagementDefault(useSmResumptionDefault);
1577        }
1578        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1579    }
1580
1581    /**
1582     * Set if Stream Management should be used if supported by the server.
1583     *
1584     * @param useSm true to use Stream Management.
1585     */
1586    public void setUseStreamManagement(boolean useSm) {
1587        this.useSm = useSm;
1588    }
1589
1590    /**
1591     * Set if Stream Management resumption should be used if supported by the server.
1592     *
1593     * @param useSmResumption true to use Stream Management resumption.
1594     */
1595    public void setUseStreamManagementResumption(boolean useSmResumption) {
1596        if (useSmResumption) {
1597            // Also enable SM is resumption is enabled
1598            setUseStreamManagement(useSmResumption);
1599        }
1600        this.useSmResumption = useSmResumption;
1601    }
1602
1603    /**
1604     * Set the preferred resumption time in seconds.
1605     * @param resumptionTime the preferred resumption time in seconds
1606     */
1607    public void setPreferredResumptionTime(int resumptionTime) {
1608        smClientMaxResumptionTime = resumptionTime;
1609    }
1610
1611    /**
1612     * Add a predicate for Stream Management acknowledgment requests.
1613     * <p>
1614     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1615     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1616     * </p>
1617     * <p>
1618     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1619     * </p>
1620     *
1621     * @param predicate the predicate to add.
1622     * @return if the predicate was not already active.
1623     */
1624    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1625        synchronized (requestAckPredicates) {
1626            return requestAckPredicates.add(predicate);
1627        }
1628    }
1629
1630    /**
1631     * Remove the given predicate for Stream Management acknowledgment request.
1632     * @param predicate the predicate to remove.
1633     * @return true if the predicate was removed.
1634     */
1635    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1636        synchronized (requestAckPredicates) {
1637            return requestAckPredicates.remove(predicate);
1638        }
1639    }
1640
1641    /**
1642     * Remove all predicates for Stream Management acknowledgment requests.
1643     */
1644    public void removeAllRequestAckPredicates() {
1645        synchronized (requestAckPredicates) {
1646            requestAckPredicates.clear();
1647        }
1648    }
1649
1650    /**
1651     * Send an unconditional Stream Management acknowledgement request to the server.
1652     *
1653     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1654     * @throws NotConnectedException if the connection is not connected.
1655     * @throws InterruptedException
1656     */
1657    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1658        if (!isSmEnabled()) {
1659            throw new StreamManagementException.StreamManagementNotEnabledException();
1660        }
1661        requestSmAcknowledgementInternal();
1662    }
1663
    /**
     * Hand an acknowledgement request to the packet writer, without checking if Stream Management
     * is enabled.
     *
     * @throws NotConnectedException if the writer is shut down and stream resumption is not possible.
     * @throws InterruptedException if the calling thread was interrupted while the request was put
     *         into the writer queue.
     */
    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(AckRequest.INSTANCE);
    }
1667
1668    /**
1669     * Send a unconditional Stream Management acknowledgment to the server.
1670     * <p>
1671     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1672     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1673     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1674     * </p>
1675     *
1676     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1677     * @throws NotConnectedException if the connection is not connected.
1678     * @throws InterruptedException
1679     */
1680    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1681        if (!isSmEnabled()) {
1682            throw new StreamManagementException.StreamManagementNotEnabledException();
1683        }
1684        sendSmAcknowledgementInternal();
1685    }
1686
    /**
     * Hand an acknowledgment carrying the current <code>clientHandledStanzasCount</code> to the packet
     * writer, without checking if Stream Management is enabled.
     *
     * @throws NotConnectedException if the writer is shut down and stream resumption is not possible.
     * @throws InterruptedException if the calling thread was interrupted while the acknowledgment was
     *         put into the writer queue.
     */
    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
    }
1690
1691    /**
1692     * Add a Stanza acknowledged listener.
1693     * <p>
1694     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1695     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1696     * possible.
1697     * </p>
1698     *
1699     * @param listener the listener to add.
1700     */
1701    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1702        stanzaAcknowledgedListeners.add(listener);
1703    }
1704
1705    /**
1706     * Remove the given Stanza acknowledged listener.
1707     *
1708     * @param listener the listener.
1709     * @return true if the listener was removed.
1710     */
1711    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1712        return stanzaAcknowledgedListeners.remove(listener);
1713    }
1714
1715    /**
1716     * Remove all stanza acknowledged listeners.
1717     */
1718    public void removeAllStanzaAcknowledgedListeners() {
1719        stanzaAcknowledgedListeners.clear();
1720    }
1721
1722    /**
1723     * Add a new Stanza ID acknowledged listener for the given ID.
1724     * <p>
1725     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1726     * automatically be removed after the listener was run.
1727     * </p>
1728     *
1729     * @param id the stanza ID.
1730     * @param listener the listener to invoke.
1731     * @return the previous listener for this stanza ID or null.
1732     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1733     */
1734    @SuppressWarnings("FutureReturnValueIgnored")
1735    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1736        // Prevent users from adding callbacks that will never get removed
1737        if (!smWasEnabledAtLeastOnce) {
1738            throw new StreamManagementException.StreamManagementNotEnabledException();
1739        }
1740        // Remove the listener after max. 3 hours
1741        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1742        schedule(new Runnable() {
1743            @Override
1744            public void run() {
1745                stanzaIdAcknowledgedListeners.remove(id);
1746            }
1747        }, removeAfterSeconds, TimeUnit.SECONDS);
1748        return stanzaIdAcknowledgedListeners.put(id, listener);
1749    }
1750
1751    /**
1752     * Remove the Stanza ID acknowledged listener for the given ID.
1753     *
1754     * @param id the stanza ID.
1755     * @return true if the listener was found and removed, false otherwise.
1756     */
1757    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1758        return stanzaIdAcknowledgedListeners.remove(id);
1759    }
1760
1761    /**
1762     * Removes all Stanza ID acknowledged listeners.
1763     */
1764    public void removeAllStanzaIdAcknowledgedListeners() {
1765        stanzaIdAcknowledgedListeners.clear();
1766    }
1767
1768    /**
1769     * Returns true if Stream Management is supported by the server.
1770     *
1771     * @return true if Stream Management is supported by the server.
1772     */
1773    public boolean isSmAvailable() {
1774        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1775    }
1776
1777    /**
1778     * Returns true if Stream Management was successfully negotiated with the server.
1779     *
1780     * @return true if Stream Management was negotiated.
1781     */
1782    public boolean isSmEnabled() {
1783        return smEnabledSyncPoint.wasSuccessful();
1784    }
1785
1786    /**
1787     * Returns true if the stream was successfully resumed with help of Stream Management.
1788     *
1789     * @return true if the stream was resumed.
1790     */
1791    public boolean streamWasResumed() {
1792        return smResumedSyncPoint.wasSuccessful();
1793    }
1794
1795    /**
1796     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1797     *
1798     * @return true if disconnected but resumption possible.
1799     */
1800    public boolean isDisconnectedButSmResumptionPossible() {
1801        return disconnectedButResumeable && isSmResumptionPossible();
1802    }
1803
1804    /**
1805     * Returns true if the stream is resumable.
1806     *
1807     * @return true if the stream is resumable.
1808     */
1809    public boolean isSmResumptionPossible() {
1810        // There is no resumable stream available
1811        if (smSessionId == null)
1812            return false;
1813
1814        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1815        // Seems like we are already reconnected, report true
1816        if (shutdownTimestamp == null) {
1817            return true;
1818        }
1819
1820        // See if resumption time is over
1821        long current = System.currentTimeMillis();
1822        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1823        if (current > shutdownTimestamp + maxResumptionMillies) {
1824            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1825            // resumption is possible
1826            return false;
1827        } else {
1828            return true;
1829        }
1830    }
1831
1832    /**
1833     * Drop the stream management state. Sets {@link #smSessionId} and
1834     * {@link #unacknowledgedStanzas} to <code>null</code>.
1835     */
1836    private void dropSmState() {
1837        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1838        // respective. No need to reset them here.
1839        smSessionId = null;
1840        unacknowledgedStanzas = null;
1841    }
1842
1843    /**
1844     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1845     * <p>
1846     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1847     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1848     * without checking for overflows before.
1849     * </p>
1850     *
1851     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1852     */
1853    public int getMaxSmResumptionTime() {
1854        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1855        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
1856        return Math.min(clientResumptionTime, serverResumptionTime);
1857    }
1858
1859    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1860        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1861        final List<Stanza> ackedStanzas = new ArrayList<>(
1862                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1863                                        : Integer.MAX_VALUE);
1864        for (long i = 0; i < ackedStanzasCount; i++) {
1865            Stanza ackedStanza = unacknowledgedStanzas.poll();
1866            // If the server ack'ed a stanza, then it must be in the
1867            // unacknowledged stanza queue. There can be no exception.
1868            if (ackedStanza == null) {
1869                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1870                                ackedStanzasCount, ackedStanzas);
1871            }
1872            ackedStanzas.add(ackedStanza);
1873        }
1874
1875        boolean atLeastOneStanzaAcknowledgedListener = false;
1876        if (!stanzaAcknowledgedListeners.isEmpty()) {
1877            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1878            atLeastOneStanzaAcknowledgedListener = true;
1879        }
1880        else {
1881            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1882            for (Stanza ackedStanza : ackedStanzas) {
1883                String id = ackedStanza.getStanzaId();
1884                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1885                    atLeastOneStanzaAcknowledgedListener = true;
1886                    break;
1887                }
1888            }
1889        }
1890
1891        // Only spawn a new thread if there is a chance that some listener is invoked
1892        if (atLeastOneStanzaAcknowledgedListener) {
1893            asyncGo(new Runnable() {
1894                @Override
1895                public void run() {
1896                    for (Stanza ackedStanza : ackedStanzas) {
1897                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1898                            try {
1899                                listener.processStanza(ackedStanza);
1900                            }
1901                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1902                                LOGGER.log(Level.FINER, "Received exception", e);
1903                            }
1904                        }
1905                        String id = ackedStanza.getStanzaId();
1906                        if (StringUtils.isNullOrEmpty(id)) {
1907                            continue;
1908                        }
1909                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1910                        if (listener != null) {
1911                            try {
1912                                listener.processStanza(ackedStanza);
1913                            }
1914                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1915                                LOGGER.log(Level.FINER, "Received exception", e);
1916                            }
1917                        }
1918                    }
1919                }
1920            });
1921        }
1922
1923        serverHandledStanzasCount = handledCount;
1924    }
1925
1926    /**
1927     * Set the default bundle and defer callback used for new connections.
1928     *
1929     * @param defaultBundleAndDeferCallback
1930     * @see BundleAndDeferCallback
1931     * @since 4.1
1932     */
1933    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1934        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
1935    }
1936
1937    /**
1938     * Set the bundle and defer callback used for this connection.
1939     * <p>
1940     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1941     * no longer get deferred.
1942     * </p>
1943     *
1944     * @param bundleAndDeferCallback the callback or <code>null</code>.
1945     * @see BundleAndDeferCallback
1946     * @since 4.1
1947     */
1948    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1949        this.bundleAndDeferCallback = bundleAndDeferCallback;
1950    }
1951
1952}