001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.ByteArrayInputStream;
021import java.io.FileInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.OutputStream;
026import java.io.OutputStreamWriter;
027import java.io.Writer;
028import java.lang.reflect.Constructor;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.net.Socket;
032import java.security.KeyManagementException;
033import java.security.KeyStore;
034import java.security.KeyStoreException;
035import java.security.NoSuchAlgorithmException;
036import java.security.NoSuchProviderException;
037import java.security.Provider;
038import java.security.SecureRandom;
039import java.security.Security;
040import java.security.UnrecoverableKeyException;
041import java.security.cert.CertificateException;
042import java.util.ArrayList;
043import java.util.Collection;
044import java.util.Iterator;
045import java.util.LinkedHashSet;
046import java.util.LinkedList;
047import java.util.List;
048import java.util.Map;
049import java.util.Set;
050import java.util.concurrent.ArrayBlockingQueue;
051import java.util.concurrent.BlockingQueue;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentLinkedQueue;
054import java.util.concurrent.Semaphore;
055import java.util.concurrent.TimeUnit;
056import java.util.concurrent.atomic.AtomicBoolean;
057import java.util.logging.Level;
058import java.util.logging.Logger;
059
060import javax.net.SocketFactory;
061import javax.net.ssl.HostnameVerifier;
062import javax.net.ssl.KeyManager;
063import javax.net.ssl.KeyManagerFactory;
064import javax.net.ssl.SSLContext;
065import javax.net.ssl.SSLSession;
066import javax.net.ssl.SSLSocket;
067import javax.net.ssl.TrustManager;
068import javax.net.ssl.X509TrustManager;
069import javax.security.auth.callback.Callback;
070import javax.security.auth.callback.CallbackHandler;
071import javax.security.auth.callback.PasswordCallback;
072
073import org.jivesoftware.smack.AbstractConnectionListener;
074import org.jivesoftware.smack.AbstractXMPPConnection;
075import org.jivesoftware.smack.ConnectionConfiguration;
076import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode;
077import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
078import org.jivesoftware.smack.SmackConfiguration;
079import org.jivesoftware.smack.SmackException;
080import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
081import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
082import org.jivesoftware.smack.SmackException.ConnectionException;
083import org.jivesoftware.smack.SmackException.NoResponseException;
084import org.jivesoftware.smack.SmackException.NotConnectedException;
085import org.jivesoftware.smack.SmackException.NotLoggedInException;
086import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
087import org.jivesoftware.smack.SmackException.SmackWrappedException;
088import org.jivesoftware.smack.SmackFuture;
089import org.jivesoftware.smack.StanzaListener;
090import org.jivesoftware.smack.SynchronizationPoint;
091import org.jivesoftware.smack.XMPPConnection;
092import org.jivesoftware.smack.XMPPException;
093import org.jivesoftware.smack.XMPPException.FailedNonzaException;
094import org.jivesoftware.smack.XMPPException.StreamErrorException;
095import org.jivesoftware.smack.compress.packet.Compress;
096import org.jivesoftware.smack.compress.packet.Compressed;
097import org.jivesoftware.smack.compression.XMPPInputOutputStream;
098import org.jivesoftware.smack.filter.StanzaFilter;
099import org.jivesoftware.smack.packet.Element;
100import org.jivesoftware.smack.packet.IQ;
101import org.jivesoftware.smack.packet.Message;
102import org.jivesoftware.smack.packet.Nonza;
103import org.jivesoftware.smack.packet.Presence;
104import org.jivesoftware.smack.packet.Stanza;
105import org.jivesoftware.smack.packet.StartTls;
106import org.jivesoftware.smack.packet.StreamError;
107import org.jivesoftware.smack.packet.StreamOpen;
108import org.jivesoftware.smack.proxy.ProxyInfo;
109import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
110import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
111import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
112import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
113import org.jivesoftware.smack.sm.SMUtils;
114import org.jivesoftware.smack.sm.StreamManagementException;
115import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
116import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
117import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
118import org.jivesoftware.smack.sm.packet.StreamManagement;
119import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
120import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
121import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
122import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
123import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
124import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
125import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
126import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
127import org.jivesoftware.smack.sm.predicates.Predicate;
128import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
129import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
130import org.jivesoftware.smack.util.Async;
131import org.jivesoftware.smack.util.DNSUtil;
132import org.jivesoftware.smack.util.PacketParserUtils;
133import org.jivesoftware.smack.util.StringUtils;
134import org.jivesoftware.smack.util.TLSUtils;
135import org.jivesoftware.smack.util.XmlStringBuilder;
136import org.jivesoftware.smack.util.dns.HostAddress;
137import org.jivesoftware.smack.util.dns.SmackDaneProvider;
138import org.jivesoftware.smack.util.dns.SmackDaneVerifier;
139
140import org.jxmpp.jid.impl.JidCreate;
141import org.jxmpp.jid.parts.Resourcepart;
142import org.jxmpp.stringprep.XmppStringprepException;
143import org.jxmpp.util.XmppStringUtils;
144import org.minidns.dnsname.DnsName;
145import org.xmlpull.v1.XmlPullParser;
146import org.xmlpull.v1.XmlPullParserException;
147
148/**
149 * Creates a socket connection to an XMPP server. This is the default connection
150 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
151 *
152 * @see XMPPConnection
153 * @author Matt Tucker
154 */
155public class XMPPTCPConnection extends AbstractXMPPConnection {
156
157    private static final int QUEUE_SIZE = 500;
158    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
159
160    /**
161     * The socket which is used for this connection.
162     */
163    private Socket socket;
164
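    /**
     * Set when an instant shutdown leaves behind a stream that may still be resumed. While this flag is set, the
     * connection is not reported as "already connected" or "already logged in". It is cleared again after a
     * successful login.
     */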
168    private boolean disconnectedButResumeable = false;
169
170    private SSLSocket secureSocket;
171
172    private final Semaphore readerWriterSemaphore = new Semaphore(2);
173
174    /**
175     * Protected access level because of unit test purposes
176     */
177    protected final PacketWriter packetWriter = new PacketWriter();
178
179    /**
180     * Protected access level because of unit test purposes
181     */
182    protected final PacketReader packetReader = new PacketReader();
183
184    private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>(
185                    this, "initial open stream element send to server");
186
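    /**
     * Synchronization point for the stream features that are received after authentication, which may announce
     * stream compression. It is awaited during login before stream compression is attempted.
     */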
190    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
191                    this, "stream compression feature");
192
    /**
     * Synchronization point for the stream compression negotiation.
     */
196    private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>(
197                    this, "stream compression");
198
199    /**
200     * A synchronization point which is successful if this connection has received the closing
201     * stream element from the remote end-point, i.e. the server.
202     */
203    private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>(
204                    this, "stream closing element received");
205
    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
210    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
211
212    /**
213     * The used bundle and defer callback.
214     * <p>
215     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
216     * having a 'volatile' read within the writer threads loop.
217     * </p>
218     */
219    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
220
221    private static boolean useSmDefault = true;
222
223    private static boolean useSmResumptionDefault = true;
224
225    /**
226     * The stream ID of the stream that is currently resumable, ie. the stream we hold the state
227     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
228     * {@link #unacknowledgedStanzas}.
229     */
230    private String smSessionId;
231
232    private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>(
233                    this, "stream resumed element");
234
235    private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>(
236                    this, "stream enabled element");
237
238    /**
239     * The client's preferred maximum resumption time in seconds.
240     */
241    private int smClientMaxResumptionTime = -1;
242
243    /**
244     * The server's preferred maximum resumption time in seconds.
245     */
246    private int smServerMaxResumptionTime = -1;
247
248    /**
249     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
250     */
251    private boolean useSm = useSmDefault;
252    private boolean useSmResumption = useSmResumptionDefault;
253
    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
258    private long serverHandledStanzasCount = 0;
259
260    /**
261     * The counter for stanzas handled ("received") by the client.
262     * <p>
263     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
264     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
265     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
266     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
267     * </p>
268     */
269    private long clientHandledStanzasCount = 0;
270
271    private BlockingQueue<Stanza> unacknowledgedStanzas;
272
273    /**
274     * Set to true if Stream Management was at least once enabled for this connection.
275     */
276    private boolean smWasEnabledAtLeastOnce = false;
277
    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
285    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();
286
287    /**
288     * These listeners are invoked for every stanza that got dropped.
289     * <p>
290     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
291     * themselves after they have been invoked.
292     * </p>
293     */
294    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();
295
    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and are automatically removed after that.
     */
300    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();
301
    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
     * </p>
     */
309    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();
310
311    @SuppressWarnings("HidingField")
312    private final XMPPTCPConnectionConfiguration config;
313
314    /**
315     * Creates a new XMPP connection over TCP (optionally using proxies).
316     * <p>
317     * Note that XMPPTCPConnection constructors do not establish a connection to the server
318     * and you must call {@link #connect()}.
319     * </p>
320     *
321     * @param config the connection configuration.
322     */
323    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
324        super(config);
325        this.config = config;
326        addConnectionListener(new AbstractConnectionListener() {
327            @Override
328            public void connectionClosedOnError(Exception e) {
329                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
330                    dropSmState();
331                }
332            }
333        });
334    }
335
336    /**
337     * Creates a new XMPP connection over TCP.
338     * <p>
339     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
340     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
341     * constructor.
342     * </p>
343     *
344     * @param jid the bare JID used by the client.
345     * @param password the password or authentication token.
346     * @throws XmppStringprepException
347     */
348    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
349        this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
350    }
351
352    /**
353     * Creates a new XMPP connection over TCP.
354     * <p>
355     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
356     * you can get fine-grained control over connection settings using the
357     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
358     * </p>
359     * @param username
360     * @param password
361     * @param serviceName
362     * @throws XmppStringprepException
363     */
364    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
365        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
366                                        JidCreate.domainBareFrom(serviceName)).build());
367    }
368
369    @Override
370    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
371        if (packetWriter == null) {
372            throw new NotConnectedException();
373        }
374        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
375    }
376
377    @Override
378    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
379        if (isConnected() && !disconnectedButResumeable) {
380            throw new AlreadyConnectedException();
381        }
382    }
383
384    @Override
385    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
386        if (isAuthenticated() && !disconnectedButResumeable) {
387            throw new AlreadyLoggedInException();
388        }
389    }
390
391    @Override
392    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
393        // Reset the flag in case it was set
394        disconnectedButResumeable = false;
395        super.afterSuccessfulLogin(resumed);
396    }
397
398    @Override
399    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
400                    SmackException, IOException, InterruptedException {
401        // Authenticate using SASL
402        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
403        saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession);
404
405        // Wait for stream features after the authentication.
406        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
407        // renamed to "streamFeaturesAfterAuthenticationReceived".
408        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
409
410        // If compression is enabled then request the server to use stream compression. XEP-170
411        // recommends to perform stream compression before resource binding.
412        maybeEnableCompression();
413
414        if (isSmResumptionPossible()) {
415            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
416            if (smResumedSyncPoint.wasSuccessful()) {
417                // We successfully resumed the stream, be done here
418                afterSuccessfulLogin(true);
419                return;
420            }
421            // SM resumption failed, what Smack does here is to report success of
422            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
423            // normal resource binding can be tried.
424            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
425        }
426
427        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
428        if (unacknowledgedStanzas != null) {
429            // There was a previous connection with SM enabled but that was either not resumable or
430            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
431            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
432            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
433            // XMPP session (There maybe was an enabled in a previous XMPP session of this
434            // connection instance though). This is used in writePackets to decide if stanzas should
435            // be added to the unacknowledged stanzas queue, because they have to be added right
436            // after the 'enable' stream element has been sent.
437            dropSmState();
438        }
439
440        // Now bind the resource. It is important to do this *after* we dropped an eventually
441        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
442        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
443        bindResourceAndEstablishSession(resource);
444
445        if (isSmAvailable() && useSm) {
446            // Remove what is maybe left from previously stream managed sessions
447            serverHandledStanzasCount = 0;
448            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
449            // then this is a non recoverable error and we therefore throw an exception.
450            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
451            synchronized (requestAckPredicates) {
452                if (requestAckPredicates.isEmpty()) {
453                    // Assure that we have at lest one predicate set up that so that we request acks
454                    // for the server and eventually flush some stanzas from the unacknowledged
455                    // stanza queue
456                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
457                }
458            }
459        }
460        // Inform client about failed resumption if possible, resend stanzas otherwise
461        // Process the stanzas synchronously so a client can re-queue them for transmission
462        // before it is informed about connection success
463        if (!stanzaDroppedListeners.isEmpty()) {
464            for (Stanza stanza : previouslyUnackedStanzas) {
465                for (StanzaListener listener : stanzaDroppedListeners) {
466                    try {
467                        listener.processStanza(stanza);
468                    }
469                    catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
470                        LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
471                    }
472                }
473            }
474        } else {
475            for (Stanza stanza : previouslyUnackedStanzas) {
476                sendStanzaInternal(stanza);
477            }
478        }
479
480        afterSuccessfulLogin(false);
481    }
482
483    @Override
484    public boolean isSecureConnection() {
485        return secureSocket != null;
486    }
487
488    /**
489     * Shuts the current connection down. After this method returns, the connection must be ready
490     * for re-use by connect.
491     */
492    @Override
493    protected void shutdown() {
494        if (isSmEnabled()) {
495            try {
496                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
497                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
498                sendSmAcknowledgementInternal();
499            } catch (InterruptedException | NotConnectedException e) {
500                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
501            }
502        }
503        shutdown(false);
504    }
505
506    @Override
507    public synchronized void instantShutdown() {
508        shutdown(true);
509    }
510
511    private void shutdown(boolean instant) {
512        // First shutdown the writer, this will result in a closing stream element getting send to
513        // the server
514        LOGGER.finer("PacketWriter shutdown()");
515        packetWriter.shutdown(instant);
516        LOGGER.finer("PacketWriter has been shut down");
517
518        if (!instant) {
519            try {
520                // After we send the closing stream element, check if there was already a
521                // closing stream element sent by the server or wait with a timeout for a
522                // closing stream element to be received from the server.
523                @SuppressWarnings("unused")
524                Exception res = closingStreamReceived.checkIfSuccessOrWait();
525            } catch (InterruptedException | NoResponseException e) {
526                LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
527            }
528        }
529
530        LOGGER.finer("PacketReader shutdown()");
531        packetReader.shutdown();
532        LOGGER.finer("PacketReader has been shut down");
533
534        final Socket socket = this.socket;
535        if (socket != null && socket.isConnected()) {
536            try {
537                socket.close();
538            } catch (Exception e) {
539                LOGGER.log(Level.WARNING, "shutdown", e);
540            }
541        }
542
543        setWasAuthenticated();
544
545        // Wait for reader and writer threads to be terminated.
546        readerWriterSemaphore.acquireUninterruptibly(2);
547        readerWriterSemaphore.release(2);
548
549        if (disconnectedButResumeable) {
550            return;
551        }
552
553        // If we are able to resume the stream, then don't set
554        // connected/authenticated/usingTLS to false since we like behave like we are still
555        // connected (e.g. sendStanza should not throw a NotConnectedException).
556        if (isSmResumptionPossible() && instant) {
557            disconnectedButResumeable = true;
558        } else {
559            disconnectedButResumeable = false;
560            // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
561            // stream tag, there is no longer a stream to resume.
562            smSessionId = null;
563            // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
564            // information is available in the connectionClosedOnError() listeners.
565        }
566        authenticated = false;
567        connected = false;
568        secureSocket = null;
569        reader = null;
570        writer = null;
571
572        initState();
573    }
574
575    @Override
576    protected void initState() {
577        super.initState();
578        maybeCompressFeaturesReceived.init();
579        compressSyncPoint.init();
580        smResumedSyncPoint.init();
581        smEnabledSyncPoint.init();
582        initialOpenStreamSend.init();
583    }
584
585    @Override
586    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
587        packetWriter.sendStreamElement(element);
588    }
589
590    @Override
591    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
592        packetWriter.sendStreamElement(packet);
593        if (isSmEnabled()) {
594            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
595                if (requestAckPredicate.accept(packet)) {
596                    requestSmAcknowledgementInternal();
597                    break;
598                }
599            }
600        }
601    }
602
603    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
604        List<HostAddress> failedAddresses = populateHostAddresses();
605        SocketFactory socketFactory = config.getSocketFactory();
606        ProxyInfo proxyInfo = config.getProxyInfo();
607        int timeout = config.getConnectTimeout();
608        if (socketFactory == null) {
609            socketFactory = SocketFactory.getDefault();
610        }
611        for (HostAddress hostAddress : hostAddresses) {
612            Iterator<InetAddress> inetAddresses;
613            String host = hostAddress.getHost();
614            int port = hostAddress.getPort();
615            if (proxyInfo == null) {
616                inetAddresses = hostAddress.getInetAddresses().iterator();
617                assert (inetAddresses.hasNext());
618
619                innerloop: while (inetAddresses.hasNext()) {
620                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
621                    // re-usable after a failed connection attempt. See also SMACK-724.
622                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
623
624                    final InetAddress inetAddress = inetAddresses.next();
625                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
626                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
627                    socketFuture.connectAsync(inetSocketAddress, timeout);
628
629                    try {
630                        socket = socketFuture.getOrThrow();
631                    } catch (IOException e) {
632                        hostAddress.setException(inetAddress, e);
633                        if (inetAddresses.hasNext()) {
634                            continue innerloop;
635                        } else {
636                            break innerloop;
637                        }
638                    }
639                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
640                    // We found a host to connect to, return here
641                    this.host = host;
642                    this.port = port;
643                    return;
644                }
645                failedAddresses.add(hostAddress);
646            } else {
647                socket = socketFactory.createSocket();
648                StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy");
649                final String hostAndPort = host + " at port " + port;
650                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
651                try {
652                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
653                } catch (IOException e) {
654                    hostAddress.setException(e);
655                    failedAddresses.add(hostAddress);
656                    continue;
657                }
658                LOGGER.finer("Established TCP connection to " + hostAndPort);
659                // We found a host to connect to, return here
660                this.host = host;
661                this.port = port;
662                return;
663            }
664        }
665        // There are no more host addresses to try
666        // throw an exception and report all tried
667        // HostAddresses in the exception
668        throw ConnectionException.from(failedAddresses);
669    }
670
671    /**
672     * Initializes the connection by creating a stanza reader and writer and opening a
673     * XMPP stream to the server.
674     *
675     * @throws XMPPException if establishing a connection to the server fails.
676     * @throws SmackException if the server fails to respond back or if there is anther error.
677     * @throws IOException
678     * @throws InterruptedException
679     */
680    private void initConnection() throws IOException, InterruptedException {
681        compressionHandler = null;
682
683        // Set the reader and writer instance variables
684        initReaderAndWriter();
685
686        int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits();
687        if (availableReaderWriterSemaphorePermits < 2) {
688            Object[] logObjects = new Object[] {
689                            this,
690                            availableReaderWriterSemaphorePermits,
691            };
692            LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects);
693        }
694        readerWriterSemaphore.acquire(2);
695        // Start the writer thread. This will open an XMPP stream to the server
696        packetWriter.init();
697        // Start the reader thread. The startup() method will block until we
698        // get an opening stream packet back from server
699        packetReader.init();
700    }
701
702    private void initReaderAndWriter() throws IOException {
703        InputStream is = socket.getInputStream();
704        OutputStream os = socket.getOutputStream();
705        if (compressionHandler != null) {
706            is = compressionHandler.getInputStream(is);
707            os = compressionHandler.getOutputStream(os);
708        }
709        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
710        writer = new OutputStreamWriter(os, "UTF-8");
711        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
712
713        // If debugging is enabled, we open a window and write out all network traffic.
714        initDebugger();
715    }
716
717    /**
718     * The server has indicated that TLS negotiation can start. We now need to secure the
719     * existing plain connection and perform a handshake. This method won't return until the
720     * connection has finished the handshake or an error occurred while securing the connection.
721     * @throws IOException
722     * @throws CertificateException
723     * @throws NoSuchAlgorithmException
724     * @throws NoSuchProviderException
725     * @throws KeyStoreException
726     * @throws UnrecoverableKeyException
727     * @throws KeyManagementException
728     * @throws SmackException
729     * @throws Exception if an exception occurs.
730     */
731    @SuppressWarnings("LiteralClassName")
732    private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
733        SmackDaneVerifier daneVerifier = null;
734
735        if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) {
736            SmackDaneProvider daneProvider = DNSUtil.getDaneProvider();
737            if (daneProvider == null) {
738                throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured");
739            }
740            daneVerifier = daneProvider.newInstance();
741            if (daneVerifier == null) {
742                throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier");
743            }
744        }
745
746        SSLContext context = this.config.getCustomSSLContext();
747        KeyStore ks = null;
748        PasswordCallback pcb = null;
749
750        if (context == null) {
751            final String keyStoreType = config.getKeystoreType();
752            final CallbackHandler callbackHandler = config.getCallbackHandler();
753            final String keystorePath = config.getKeystorePath();
754            if ("PKCS11".equals(keyStoreType)) {
755                try {
756                    Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
757                    String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library();
758                    ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8));
759                    Provider p = (Provider) c.newInstance(config);
760                    Security.addProvider(p);
761                    ks = KeyStore.getInstance("PKCS11",p);
762                    pcb = new PasswordCallback("PKCS11 Password: ",false);
763                    callbackHandler.handle(new Callback[] {pcb});
764                    ks.load(null,pcb.getPassword());
765                }
766                catch (Exception e) {
767                    LOGGER.log(Level.WARNING, "Exception", e);
768                    ks = null;
769                }
770            }
771            else if ("Apple".equals(keyStoreType)) {
772                ks = KeyStore.getInstance("KeychainStore","Apple");
773                ks.load(null,null);
774                // pcb = new PasswordCallback("Apple Keychain",false);
775                // pcb.setPassword(null);
776            }
777            else if (keyStoreType != null) {
778                ks = KeyStore.getInstance(keyStoreType);
779                if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) {
780                    try {
781                        pcb = new PasswordCallback("Keystore Password: ", false);
782                        callbackHandler.handle(new Callback[] { pcb });
783                        ks.load(new FileInputStream(keystorePath), pcb.getPassword());
784                    }
785                    catch (Exception e) {
786                        LOGGER.log(Level.WARNING, "Exception", e);
787                        ks = null;
788                    }
789                } else {
790                    ks.load(null, null);
791                }
792            }
793
794            KeyManager[] kms = null;
795
796            if (ks != null) {
797                String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
798                KeyManagerFactory kmf = null;
799                try {
800                    kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm);
801                }
802                catch (NoSuchAlgorithmException e) {
803                    LOGGER.log(Level.FINE, "Could get the default KeyManagerFactory for the '"
804                                    + keyManagerFactoryAlgorithm + "' algorithm", e);
805                }
806                if (kmf != null) {
807                    try {
808                        if (pcb == null) {
809                            kmf.init(ks, null);
810                        }
811                        else {
812                            kmf.init(ks, pcb.getPassword());
813                            pcb.clearPassword();
814                        }
815                        kms = kmf.getKeyManagers();
816                    }
817                    catch (NullPointerException npe) {
818                        LOGGER.log(Level.WARNING, "NullPointerException", npe);
819                    }
820                }
821            }
822
823            // If the user didn't specify a SSLContext, use the default one
824            context = SSLContext.getInstance("TLS");
825
826            final SecureRandom secureRandom = new java.security.SecureRandom();
827            X509TrustManager customTrustManager = config.getCustomX509TrustManager();
828
829            if (daneVerifier != null) {
830                // User requested DANE verification.
831                daneVerifier.init(context, kms, customTrustManager, secureRandom);
832            } else {
833                TrustManager[] customTrustManagers = null;
834                if (customTrustManager != null) {
835                    customTrustManagers = new TrustManager[] { customTrustManager };
836                }
837                context.init(kms, customTrustManagers, secureRandom);
838            }
839        }
840
841        Socket plain = socket;
842        // Secure the plain connection
843        socket = context.getSocketFactory().createSocket(plain,
844                config.getXMPPServiceDomain().toString(), plain.getPort(), true);
845
846        final SSLSocket sslSocket = (SSLSocket) socket;
847        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
848        // important (at least on certain platforms) and it seems to be a good idea anyways to
849        // prevent an accidental implicit handshake.
850        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
851
852        // Initialize the reader and writer with the new secured version
853        initReaderAndWriter();
854
855        // Proceed to do the handshake
856        sslSocket.startHandshake();
857
858        if (daneVerifier != null) {
859            daneVerifier.finish(sslSocket);
860        }
861
862        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
863        if (verifier == null) {
864                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
865        }
866
867        final String verifierHostname;
868        {
869            DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
870            // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
871            // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
872            // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
873            if (xmppServiceDomainDnsName != null) {
874                verifierHostname = xmppServiceDomainDnsName.ace;
875            }
876            else {
877                LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
878                                + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
879                verifierHostname = getXMPPServiceDomain().toString();
880            }
881        }
882
883        final boolean verificationSuccessful;
884        // Verify the TLS session.
885        verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
886        if (!verificationSuccessful) {
887            throw new CertificateException(
888                            "Hostname verification of certificate failed. Certificate does not authenticate "
889                                            + getXMPPServiceDomain());
890        }
891
892        // Set that TLS was successful
893        secureSocket = sslSocket;
894    }
895
896    /**
897     * Returns the compression handler that can be used for one compression methods offered by the server.
898     *
899     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
900     *
901     */
902    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
903        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
904                String method = handler.getCompressionMethod();
905                if (compression.getMethods().contains(method))
906                    return handler;
907        }
908        return null;
909    }
910
911    @Override
912    public boolean isUsingCompression() {
913        return compressionHandler != null && compressSyncPoint.wasSuccessful();
914    }
915
916    /**
917     * <p>
918     * Starts using stream compression that will compress network traffic. Traffic can be
919     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
920     * connection. However, the server and the client will need to use more CPU time in order to
921     * un/compress network data so under high load the server performance might be affected.
922     * </p>
923     * <p>
924     * Stream compression has to have been previously offered by the server. Currently only the
925     * zlib method is supported by the client. Stream compression negotiation has to be done
926     * before authentication took place.
927     * </p>
928     *
929     * @throws NotConnectedException
930     * @throws SmackException
931     * @throws NoResponseException
932     * @throws InterruptedException
933     */
934    private void maybeEnableCompression() throws SmackException, InterruptedException {
935        if (!config.isCompressionEnabled()) {
936            return;
937        }
938
939        Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
940        if (compression == null) {
941            // Server does not support compression
942            return;
943        }
944        // If stream compression was offered by the server and we want to use
945        // compression then send compression request to the server
946        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
947            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
948        } else {
949            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
950        }
951    }
952
953    /**
954     * Establishes a connection to the XMPP server. It basically
955     * creates and maintains a socket connection to the server.
956     * <p>
957     * Listeners will be preserved from a previous connection if the reconnection
958     * occurs after an abrupt termination.
959     * </p>
960     *
961     * @throws XMPPException if an error occurs while trying to establish the connection.
962     * @throws SmackException
963     * @throws IOException
964     * @throws InterruptedException
965     */
966    @Override
967    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
968        closingStreamReceived.init();
969        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
970        // there is an error establishing the connection
971        connectUsingConfiguration();
972
973        // We connected successfully to the servers TCP port
974        initConnection();
975
976        // TLS handled will be successful either if TLS was established, or if it was not mandatory.
977        tlsHandled.checkIfSuccessOrWaitOrThrow();
978
979        // Wait with SASL auth until the SASL mechanisms have been received
980        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
981    }
982
983    /**
984     * Sends out a notification that there was an error with the connection
985     * and closes the connection. Also prints the stack trace of the given exception
986     *
987     * @param e the exception that causes the connection close event.
988     */
989    private void notifyConnectionError(final Exception e) {
990        ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
991            @Override
992            public void run() {
993                // Listeners were already notified of the exception, return right here.
994                if (packetReader.done || packetWriter.done()) return;
995
996                // Report the failure outside the synchronized block, so that a thread waiting within a synchronized
997                // function like connect() throws the wrapped exception.
998                SmackWrappedException smackWrappedException = new SmackWrappedException(e);
999                tlsHandled.reportGenericFailure(smackWrappedException);
1000                saslFeatureReceived.reportGenericFailure(smackWrappedException);
1001                maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException);
1002                lastFeaturesReceived.reportGenericFailure(smackWrappedException);
1003
1004                synchronized (XMPPTCPConnection.this) {
1005                    // Within this synchronized block, either *both* reader and writer threads must be terminated, or
1006                    // none.
1007                    assert ((packetReader.done && packetWriter.done())
1008                            || (!packetReader.done && !packetWriter.done()));
1009
1010                    // Closes the connection temporary. A reconnection is possible
1011                    // Note that a connection listener of XMPPTCPConnection will drop the SM state in
1012                    // case the Exception is a StreamErrorException.
1013                    instantShutdown();
1014                }
1015
1016                Async.go(new Runnable() {
1017                    @Override
1018                    public void run() {
1019                        // Notify connection listeners of the error.
1020                        callConnectionClosedOnErrorListener(e);
1021                    }
1022                }, XMPPTCPConnection.this + " callConnectionClosedOnErrorListener()");
1023            }
1024        });
1025    }
1026
1027    /**
1028     * For unit testing purposes
1029     *
1030     * @param writer
1031     */
1032    protected void setWriter(Writer writer) {
1033        this.writer = writer;
1034    }
1035
1036    @Override
1037    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
1038        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
1039        if (startTlsFeature != null) {
1040            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
1041                SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
1042                tlsHandled.reportFailure(smackException);
1043                throw smackException;
1044            }
1045
1046            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
1047                sendNonza(new StartTls());
1048            } else {
1049                tlsHandled.reportSuccess();
1050            }
1051        } else {
1052            tlsHandled.reportSuccess();
1053        }
1054
1055        if (getSASLAuthentication().authenticationSuccessful()) {
1056            // If we have received features after the SASL has been successfully completed, then we
1057            // have also *maybe* received, as it is an optional feature, the compression feature
1058            // from the server.
1059            maybeCompressFeaturesReceived.reportSuccess();
1060        }
1061    }
1062
1063    /**
1064     * Resets the parser using the latest connection's reader. Resetting the parser is necessary
1065     * when the plain connection has been secured or when a new opening stream element is going
1066     * to be sent by the server.
1067     *
1068     * @throws SmackException if the parser could not be reset.
1069     * @throws InterruptedException
1070     */
1071    void openStream() throws SmackException, InterruptedException {
1072        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
1073        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
1074        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
1075        // response from the server (see e.g. RFC 6120 § 9.1.1 Step 2.)
1076        CharSequence to = getXMPPServiceDomain();
1077        CharSequence from = null;
1078        CharSequence localpart = config.getUsername();
1079        if (localpart != null) {
1080            from = XmppStringUtils.completeJidFrom(localpart, to);
1081        }
1082        String id = getStreamId();
1083        sendNonza(new StreamOpen(to, from, id));
1084        try {
1085            packetReader.parser = PacketParserUtils.newXmppParser(reader);
1086        }
1087        catch (XmlPullParserException e) {
1088            throw new SmackException(e);
1089        }
1090    }
1091
1092    protected class PacketReader {
1093
1094        private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';
1095
1096        XmlPullParser parser;
1097
1098        private volatile boolean done;
1099
1100        /**
1101         * Initializes the reader in order to be used. The reader is initialized during the
1102         * first connection and when reconnecting due to an abruptly disconnection.
1103         */
1104        void init() {
1105            done = false;
1106
1107            Async.go(new Runnable() {
1108                @Override
1109                public void run() {
1110                    LOGGER.finer(threadName + " start");
1111                    try {
1112                        parsePackets();
1113                    } finally {
1114                        LOGGER.finer(threadName + " exit");
1115                        XMPPTCPConnection.this.readerWriterSemaphore.release();
1116                    }
1117                }
1118            }, threadName);
1119         }
1120
1121        /**
1122         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
1123         */
1124        void shutdown() {
1125            done = true;
1126        }
1127
1128        /**
1129         * Parse top-level packets in order to process them further.
1130         */
1131        private void parsePackets() {
1132            try {
1133                initialOpenStreamSend.checkIfSuccessOrWait();
1134                int eventType = parser.getEventType();
1135                while (!done) {
1136                    switch (eventType) {
1137                    case XmlPullParser.START_TAG:
1138                        final String name = parser.getName();
1139                        switch (name) {
1140                        case Message.ELEMENT:
1141                        case IQ.IQ_ELEMENT:
1142                        case Presence.ELEMENT:
1143                            try {
1144                                parseAndProcessStanza(parser);
1145                            } finally {
1146                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
1147                            }
1148                            break;
1149                        case "stream":
1150                            // We found an opening stream.
1151                            if ("jabber:client".equals(parser.getNamespace(null))) {
1152                                streamId = parser.getAttributeValue("", "id");
1153                                String reportedServerDomain = parser.getAttributeValue("", "from");
1154                                assert (config.getXMPPServiceDomain().equals(reportedServerDomain));
1155                            }
1156                            break;
1157                        case "error":
1158                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
1159                            saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
1160                            // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
1161                            // point to report the error, which is checked immediately after tlsHandled in
1162                            // connectInternal().
1163                            tlsHandled.reportSuccess();
1164                            throw new StreamErrorException(streamError);
1165                        case "features":
1166                            parseFeatures(parser);
1167                            break;
1168                        case "proceed":
1169                            try {
1170                                // Secure the connection by negotiating TLS
1171                                proceedTLSReceived();
1172                                // Send a new opening stream to the server
1173                                openStream();
1174                            }
1175                            catch (Exception e) {
1176                                SmackException smackException = new SmackException(e);
1177                                tlsHandled.reportFailure(smackException);
1178                                throw e;
1179                            }
1180                            break;
1181                        case "failure":
1182                            String namespace = parser.getNamespace(null);
1183                            switch (namespace) {
1184                            case "urn:ietf:params:xml:ns:xmpp-tls":
1185                                // TLS negotiation has failed. The server will close the connection
1186                                // TODO Parse failure stanza
1187                                throw new SmackException("TLS negotiation has failed");
1188                            case "http://jabber.org/protocol/compress":
1189                                // Stream compression has been denied. This is a recoverable
1190                                // situation. It is still possible to authenticate and
1191                                // use the connection but using an uncompressed connection
1192                                // TODO Parse failure stanza
1193                                compressSyncPoint.reportFailure(new SmackException(
1194                                                "Could not establish compression"));
1195                                break;
1196                            case SaslStreamElements.NAMESPACE:
1197                                // SASL authentication has failed. The server may close the connection
1198                                // depending on the number of retries
1199                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
1200                                getSASLAuthentication().authenticationFailed(failure);
1201                                break;
1202                            }
1203                            break;
1204                        case Challenge.ELEMENT:
1205                            // The server is challenging the SASL authentication made by the client
1206                            String challengeData = parser.nextText();
1207                            getSASLAuthentication().challengeReceived(challengeData);
1208                            break;
1209                        case Success.ELEMENT:
1210                            Success success = new Success(parser.nextText());
1211                            // We now need to bind a resource for the connection
1212                            // Open a new stream and wait for the response
1213                            openStream();
1214                            // The SASL authentication with the server was successful. The next step
1215                            // will be to bind the resource
1216                            getSASLAuthentication().authenticated(success);
1217                            break;
1218                        case Compressed.ELEMENT:
1219                            // Server confirmed that it's possible to use stream compression. Start
1220                            // stream compression
1221                            // Initialize the reader and writer with the new compressed version
1222                            initReaderAndWriter();
1223                            // Send a new opening stream to the server
1224                            openStream();
1225                            // Notify that compression is being used
1226                            compressSyncPoint.reportSuccess();
1227                            break;
1228                        case Enabled.ELEMENT:
1229                            Enabled enabled = ParseStreamManagement.enabled(parser);
1230                            if (enabled.isResumeSet()) {
1231                                smSessionId = enabled.getId();
1232                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1233                                    SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received");
1234                                    smEnabledSyncPoint.reportFailure(xmppException);
1235                                    throw xmppException;
1236                                }
1237                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
1238                            } else {
1239                                // Mark this a non-resumable stream by setting smSessionId to null
1240                                smSessionId = null;
1241                            }
1242                            clientHandledStanzasCount = 0;
1243                            smWasEnabledAtLeastOnce = true;
1244                            smEnabledSyncPoint.reportSuccess();
1245                            LOGGER.fine("Stream Management (XEP-198): successfully enabled");
1246                            break;
1247                        case Failed.ELEMENT:
1248                            Failed failed = ParseStreamManagement.failed(parser);
1249                            FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
1250                            // If only XEP-198 would specify different failure elements for the SM
1251                            // enable and SM resume failure case. But this is not the case, so we
1252                            // need to determine if this is a 'Failed' response for either 'Enable'
1253                            // or 'Resume'.
1254                            if (smResumedSyncPoint.requestSent()) {
1255                                smResumedSyncPoint.reportFailure(xmppException);
1256                            }
1257                            else {
1258                                if (!smEnabledSyncPoint.requestSent()) {
1259                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
1260                                }
1261                                smEnabledSyncPoint.reportFailure(new SmackException(xmppException));
1262                                // Report success for last lastFeaturesReceived so that in case a
1263                                // failed resumption, we can continue with normal resource binding.
1264                                // See text of XEP-198 5. below Example 11.
1265                                lastFeaturesReceived.reportSuccess();
1266                            }
1267                            break;
1268                        case Resumed.ELEMENT:
1269                            Resumed resumed = ParseStreamManagement.resumed(parser);
1270                            if (!smSessionId.equals(resumed.getPrevId())) {
1271                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1272                            }
1273                            // Mark SM as enabled
1274                            smEnabledSyncPoint.reportSuccess();
1275                            // First, drop the stanzas already handled by the server
1276                            processHandledCount(resumed.getHandledCount());
1277                            // Then re-send what is left in the unacknowledged queue
1278                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
1279                            unacknowledgedStanzas.drainTo(stanzasToResend);
1280                            for (Stanza stanza : stanzasToResend) {
1281                                sendStanzaInternal(stanza);
1282                            }
1283                            // If there where stanzas resent, then request a SM ack for them.
1284                            // Writer's sendStreamElement() won't do it automatically based on
1285                            // predicates.
1286                            if (!stanzasToResend.isEmpty()) {
1287                                requestSmAcknowledgementInternal();
1288                            }
1289                            // Mark SM resumption as successful
1290                            smResumedSyncPoint.reportSuccess();
1291                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
1292                            break;
1293                        case AckAnswer.ELEMENT:
1294                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1295                            processHandledCount(ackAnswer.getHandledCount());
1296                            break;
1297                        case AckRequest.ELEMENT:
1298                            ParseStreamManagement.ackRequest(parser);
1299                            if (smEnabledSyncPoint.wasSuccessful()) {
1300                                sendSmAcknowledgementInternal();
1301                            } else {
1302                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1303                            }
1304                            break;
1305                         default:
1306                             LOGGER.warning("Unknown top level stream element: " + name);
1307                             break;
1308                        }
1309                        break;
1310                    case XmlPullParser.END_TAG:
1311                        final String endTagName = parser.getName();
1312                        if ("stream".equals(endTagName)) {
1313                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
1314                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
1315                                break;
1316                            }
1317
1318                            // Check if the queue was already shut down before reporting success on closing stream tag
1319                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
1320                            // did re-start the queue again, causing this writer to assume that the queue is not
1321                            // shutdown, which results in a call to disconnect().
1322                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
1323                            closingStreamReceived.reportSuccess();
1324
1325                            if (queueWasShutdown) {
1326                                // We received a closing stream element *after* we initiated the
1327                                // termination of the session by sending a closing stream element to
1328                                // the server first
1329                                return;
1330                            } else {
1331                                // We received a closing stream element from the server without us
1332                                // sending a closing stream element first. This means that the
1333                                // server wants to terminate the session, therefore disconnect
1334                                // the connection
1335                                LOGGER.info(XMPPTCPConnection.this
1336                                                + " received closing </stream> element."
1337                                                + " Server wants to terminate the connection, calling disconnect()");
1338                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
1339                                    @Override
1340                                    public void run() {
1341                                        disconnect();
1342                                    }});
1343                            }
1344                        }
1345                        break;
1346                    case XmlPullParser.END_DOCUMENT:
1347                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1348                        // closing stream element before.
1349                        throw new SmackException(
1350                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1351                    }
1352                    eventType = parser.next();
1353                }
1354            }
1355            catch (Exception e) {
1356                closingStreamReceived.reportFailure(e);
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
1359                if (!(done || packetWriter.queue.isShutdown())) {
1360                    // Close the connection and notify connection listeners of the
1361                    // error.
1362                    notifyConnectionError(e);
1363                }
1364            }
1365        }
1366    }
1367
1368    protected class PacketWriter {
1369        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1370
1371        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';
1372
1373        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1374                        QUEUE_SIZE, true);
1375
1376        /**
1377         * Needs to be protected for unit testing purposes.
1378         */
1379        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>(
1380                        XMPPTCPConnection.this, "shutdown completed");
1381
1382        /**
1383         * If set, the stanza writer is shut down
1384         */
1385        protected volatile Long shutdownTimestamp = null;
1386
1387        private volatile boolean instantShutdown;
1388
1389        /**
1390         * True if some preconditions are given to start the bundle and defer mechanism.
1391         * <p>
1392         * This will likely get set to true right after the start of the writer thread, because
1393         * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
1394         * this field to true.
1395         * </p>
1396         */
1397        private boolean shouldBundleAndDefer;
1398
1399        /**
1400        * Initializes the writer in order to be used. It is called at the first connection and also
1401        * is invoked if the connection is disconnected by an error.
1402        */
1403        void init() {
1404            shutdownDone.init();
1405            shutdownTimestamp = null;
1406
1407            if (unacknowledgedStanzas != null) {
1408                // It's possible that there are new stanzas in the writer queue that
1409                // came in while we were disconnected but resumable, drain those into
1410                // the unacknowledged queue so that they get resent now
1411                drainWriterQueueToUnacknowledgedStanzas();
1412            }
1413
1414            queue.start();
1415            Async.go(new Runnable() {
1416                @Override
1417                public void run() {
1418                    LOGGER.finer(threadName + " start");
1419                    try {
1420                        writePackets();
1421                    } finally {
1422                        LOGGER.finer(threadName + " exit");
1423                        XMPPTCPConnection.this.readerWriterSemaphore.release();
1424                    }
1425                }
1426            }, threadName);
1427        }
1428
1429        private boolean done() {
1430            return shutdownTimestamp != null;
1431        }
1432
1433        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1434            final boolean done = done();
1435            if (done) {
1436                final boolean smResumptionPossible = isSmResumptionPossible();
1437                // Don't throw a NotConnectedException is there is an resumable stream available
1438                if (!smResumptionPossible) {
1439                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1440                                    + " smResumptionPossible=" + smResumptionPossible);
1441                }
1442            }
1443        }
1444
1445        /**
1446         * Sends the specified element to the server.
1447         *
1448         * @param element the element to send.
1449         * @throws NotConnectedException
1450         * @throws InterruptedException
1451         */
1452        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1453            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1454            try {
1455                queue.put(element);
1456            }
1457            catch (InterruptedException e) {
1458                // put() may throw an InterruptedException for two reasons:
1459                // 1. If the queue was shut down
1460                // 2. If the thread was interrupted
1461                // so we have to check which is the case
1462                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1463                // If the method above did not throw, then the sending thread was interrupted
1464                throw e;
1465            }
1466        }
1467
1468        /**
1469         * Shuts down the stanza writer. Once this method has been called, no further
1470         * packets will be written to the server.
1471         * @throws InterruptedException
1472         */
1473        void shutdown(boolean instant) {
1474            instantShutdown = instant;
1475            queue.shutdown();
1476            shutdownTimestamp = System.currentTimeMillis();
1477            if (shutdownDone.isNotInInitialState()) {
1478                try {
1479                    shutdownDone.checkIfSuccessOrWait();
1480                } catch (NoResponseException | InterruptedException e) {
1481                    LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
1482                }
1483            }
1484        }
1485
1486        /**
1487         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1488         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1489         * that case.
1490         *
1491         * @return the next element for writing or null.
1492         */
1493        private Element nextStreamElement() {
1494            // It is important the we check if the queue is empty before removing an element from it
1495            if (queue.isEmpty()) {
1496                shouldBundleAndDefer = true;
1497            }
1498            Element packet = null;
1499            try {
1500                packet = queue.take();
1501            }
1502            catch (InterruptedException e) {
1503                if (!queue.isShutdown()) {
1504                    // Users shouldn't try to interrupt the packet writer thread
1505                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1506                }
1507            }
1508            return packet;
1509        }
1510
1511        private void writePackets() {
1512            Exception writerException = null;
1513            try {
1514                openStream();
1515                initialOpenStreamSend.reportSuccess();
1516                // Write out packets from the queue.
1517                while (!done()) {
1518                    Element element = nextStreamElement();
1519                    if (element == null) {
1520                        continue;
1521                    }
1522
1523                    // Get a local version of the bundle and defer callback, in case it's unset
1524                    // between the null check and the method invocation
1525                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
1526                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
1527                    // empty), then we could wait a bit for further stanzas attempting to decrease
1528                    // our energy consumption
1529                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
1530                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
1531                        // queue is empty again.
1532                        shouldBundleAndDefer = false;
1533                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
1534                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
1535                                        bundlingAndDeferringStopped));
1536                        if (bundleAndDeferMillis > 0) {
1537                            long remainingWait = bundleAndDeferMillis;
1538                            final long waitStart = System.currentTimeMillis();
1539                            synchronized (bundlingAndDeferringStopped) {
1540                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
1541                                    bundlingAndDeferringStopped.wait(remainingWait);
1542                                    remainingWait = bundleAndDeferMillis
1543                                                    - (System.currentTimeMillis() - waitStart);
1544                                }
1545                            }
1546                        }
1547                    }
1548
1549                    Stanza packet = null;
1550                    if (element instanceof Stanza) {
1551                        packet = (Stanza) element;
1552                    }
1553                    else if (element instanceof Enable) {
1554                        // The client needs to add messages to the unacknowledged stanzas queue
1555                        // right after it sent 'enabled'. Stanza will be added once
1556                        // unacknowledgedStanzas is not null.
1557                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
1558                    }
1559                    maybeAddToUnacknowledgedStanzas(packet);
1560
1561                    CharSequence elementXml = element.toXML(StreamOpen.CLIENT_NAMESPACE);
1562                    if (elementXml instanceof XmlStringBuilder) {
1563                        ((XmlStringBuilder) elementXml).write(writer, StreamOpen.CLIENT_NAMESPACE);
1564                    }
1565                    else {
1566                        writer.write(elementXml.toString());
1567                    }
1568
1569                    if (queue.isEmpty()) {
1570                        writer.flush();
1571                    }
1572                    if (packet != null) {
1573                        firePacketSendingListeners(packet);
1574                    }
1575                }
1576                if (!instantShutdown) {
1577                    // Flush out the rest of the queue.
1578                    try {
1579                        while (!queue.isEmpty()) {
1580                            Element packet = queue.remove();
1581                            if (packet instanceof Stanza) {
1582                                Stanza stanza = (Stanza) packet;
1583                                maybeAddToUnacknowledgedStanzas(stanza);
1584                            }
1585                            writer.write(packet.toXML(null).toString());
1586                        }
1587                        writer.flush();
1588                    }
1589                    catch (Exception e) {
1590                        LOGGER.log(Level.WARNING,
1591                                        "Exception flushing queue during shutdown, ignore and continue",
1592                                        e);
1593                    }
1594
1595                    // Close the stream.
1596                    try {
1597                        writer.write("</stream:stream>");
1598                        writer.flush();
1599                    }
1600                    catch (Exception e) {
1601                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
1602                    }
1603
1604                    // Delete the queue contents (hopefully nothing is left).
1605                    queue.clear();
1606                } else if (instantShutdown && isSmEnabled()) {
1607                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
1608                    // into the unacknowledgedStanzas queue
1609                    drainWriterQueueToUnacknowledgedStanzas();
1610                }
1611                // Do *not* close the writer here, as it will cause the socket
1612                // to get closed. But we may want to receive further stanzas
1613                // until the closing stream tag is received. The socket will be
1614                // closed in shutdown().
1615            }
1616            catch (Exception e) {
1617                // The exception can be ignored if the the connection is 'done'
1618                // or if the it was caused because the socket got closed
1619                if (!(done() || queue.isShutdown())) {
1620                    writerException = e;
1621                } else {
1622                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
1623                }
1624            } finally {
1625                LOGGER.fine("Reporting shutdownDone success in writer thread");
1626                shutdownDone.reportSuccess();
1627            }
1628            // Delay notifyConnectionError after shutdownDone has been reported in the finally block.
1629            if (writerException != null) {
1630                notifyConnectionError(writerException);
1631            }
1632        }
1633
1634        private void drainWriterQueueToUnacknowledgedStanzas() {
1635            List<Element> elements = new ArrayList<>(queue.size());
1636            queue.drainTo(elements);
1637            for (int i = 0; i < elements.size(); i++) {
1638                Element element = elements.get(i);
1639                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1640                if (unacknowledgedStanzas.remainingCapacity() == 0) {
1641                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
1642                            .newWith(i, elements, unacknowledgedStanzas);
1643                    LOGGER.log(Level.WARNING,
1644                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1645                    return;
1646                }
1647                if (element instanceof Stanza) {
1648                    unacknowledgedStanzas.add((Stanza) element);
1649                }
1650            }
1651        }
1652
1653        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1654            // Check if the stream element should be put to the unacknowledgedStanza
1655            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1656            // packet order is not stable at this point (sendStanzaInternal() can be
1657            // called concurrently).
1658            if (unacknowledgedStanzas != null && stanza != null) {
1659                // If the unacknowledgedStanza queue is nearly full, request an new ack
1660                // from the server in order to drain it
1661                if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
1662                    writer.write(AckRequest.INSTANCE.toXML(null).toString());
1663                    writer.flush();
1664                }
1665                try {
1666                    // It is important the we put the stanza in the unacknowledged stanza
1667                    // queue before we put it on the wire
1668                    unacknowledgedStanzas.put(stanza);
1669                }
1670                catch (InterruptedException e) {
1671                    throw new IllegalStateException(e);
1672                }
1673            }
1674        }
1675    }
1676
1677    /**
1678     * Set if Stream Management should be used by default for new connections.
1679     *
1680     * @param useSmDefault true to use Stream Management for new connections.
1681     */
1682    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1683        XMPPTCPConnection.useSmDefault = useSmDefault;
1684    }
1685
1686    /**
1687     * Set if Stream Management resumption should be used by default for new connections.
1688     *
1689     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1690     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1691     */
1692    @Deprecated
1693    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1694        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1695    }
1696
1697    /**
1698     * Set if Stream Management resumption should be used by default for new connections.
1699     *
1700     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1701     */
1702    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1703        if (useSmResumptionDefault) {
1704            // Also enable SM is resumption is enabled
1705            setUseStreamManagementDefault(useSmResumptionDefault);
1706        }
1707        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1708    }
1709
1710    /**
1711     * Set if Stream Management should be used if supported by the server.
1712     *
1713     * @param useSm true to use Stream Management.
1714     */
1715    public void setUseStreamManagement(boolean useSm) {
1716        this.useSm = useSm;
1717    }
1718
1719    /**
1720     * Set if Stream Management resumption should be used if supported by the server.
1721     *
1722     * @param useSmResumption true to use Stream Management resumption.
1723     */
1724    public void setUseStreamManagementResumption(boolean useSmResumption) {
1725        if (useSmResumption) {
1726            // Also enable SM is resumption is enabled
1727            setUseStreamManagement(useSmResumption);
1728        }
1729        this.useSmResumption = useSmResumption;
1730    }
1731
1732    /**
1733     * Set the preferred resumption time in seconds.
1734     * @param resumptionTime the preferred resumption time in seconds
1735     */
1736    public void setPreferredResumptionTime(int resumptionTime) {
1737        smClientMaxResumptionTime = resumptionTime;
1738    }
1739
1740    /**
1741     * Add a predicate for Stream Management acknowledgment requests.
1742     * <p>
1743     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1744     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1745     * </p>
1746     * <p>
1747     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1748     * </p>
1749     *
1750     * @param predicate the predicate to add.
1751     * @return if the predicate was not already active.
1752     */
1753    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1754        synchronized (requestAckPredicates) {
1755            return requestAckPredicates.add(predicate);
1756        }
1757    }
1758
1759    /**
1760     * Remove the given predicate for Stream Management acknowledgment request.
1761     * @param predicate the predicate to remove.
1762     * @return true if the predicate was removed.
1763     */
1764    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1765        synchronized (requestAckPredicates) {
1766            return requestAckPredicates.remove(predicate);
1767        }
1768    }
1769
1770    /**
1771     * Remove all predicates for Stream Management acknowledgment requests.
1772     */
1773    public void removeAllRequestAckPredicates() {
1774        synchronized (requestAckPredicates) {
1775            requestAckPredicates.clear();
1776        }
1777    }
1778
1779    /**
1780     * Send an unconditional Stream Management acknowledgement request to the server.
1781     *
1782     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1783     * @throws NotConnectedException if the connection is not connected.
1784     * @throws InterruptedException
1785     */
1786    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1787        if (!isSmEnabled()) {
1788            throw new StreamManagementException.StreamManagementNotEnabledException();
1789        }
1790        requestSmAcknowledgementInternal();
1791    }
1792
    /**
     * Hands an ack request over to the packet writer without checking if Stream Management is enabled.
     *
     * @throws NotConnectedException if the packet writer refuses the element because the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted while queueing the request.
     */
    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(AckRequest.INSTANCE);
    }
1796
1797    /**
1798     * Send a unconditional Stream Management acknowledgment to the server.
1799     * <p>
1800     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management § 4. Acks</a>:
1801     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1802     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1803     * </p>
1804     *
1805     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1806     * @throws NotConnectedException if the connection is not connected.
1807     * @throws InterruptedException
1808     */
1809    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1810        if (!isSmEnabled()) {
1811            throw new StreamManagementException.StreamManagementNotEnabledException();
1812        }
1813        sendSmAcknowledgementInternal();
1814    }
1815
    /**
     * Hands an ack answer carrying the current value of clientHandledStanzasCount over to the packet writer,
     * without checking if Stream Management is enabled.
     *
     * @throws NotConnectedException if the packet writer refuses the element because the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted while queueing the acknowledgment.
     */
    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
    }
1819
1820    /**
1821     * Add a Stanza acknowledged listener.
1822     * <p>
1823     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1824     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1825     * possible.
1826     * </p>
1827     *
1828     * @param listener the listener to add.
1829     */
1830    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1831        stanzaAcknowledgedListeners.add(listener);
1832    }
1833
1834    /**
1835     * Remove the given Stanza acknowledged listener.
1836     *
1837     * @param listener the listener.
1838     * @return true if the listener was removed.
1839     */
1840    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1841        return stanzaAcknowledgedListeners.remove(listener);
1842    }
1843
1844    /**
1845     * Remove all stanza acknowledged listeners.
1846     */
1847    public void removeAllStanzaAcknowledgedListeners() {
1848        stanzaAcknowledgedListeners.clear();
1849    }
1850
1851    /**
1852     * Add a Stanza dropped listener.
1853     * <p>
1854     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get
1855     * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit
1856     * the Stanzas.
1857     * </p>
1858     *
1859     * @param listener the listener to add.
1860     * @since 4.3.3
1861     */
1862    public void addStanzaDroppedListener(StanzaListener listener) {
1863        stanzaDroppedListeners.add(listener);
1864    }
1865
1866    /**
1867     * Remove the given Stanza dropped listener.
1868     *
1869     * @param listener the listener.
1870     * @return true if the listener was removed.
1871     * @since 4.3.3
1872     */
1873    public boolean removeStanzaDroppedListener(StanzaListener listener) {
1874        return stanzaDroppedListeners.remove(listener);
1875    }
1876
1877    /**
1878     * Add a new Stanza ID acknowledged listener for the given ID.
1879     * <p>
1880     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1881     * automatically be removed after the listener was run.
1882     * </p>
1883     *
1884     * @param id the stanza ID.
1885     * @param listener the listener to invoke.
1886     * @return the previous listener for this stanza ID or null.
1887     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1888     */
1889    @SuppressWarnings("FutureReturnValueIgnored")
1890    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1891        // Prevent users from adding callbacks that will never get removed
1892        if (!smWasEnabledAtLeastOnce) {
1893            throw new StreamManagementException.StreamManagementNotEnabledException();
1894        }
1895        // Remove the listener after max. 3 hours
1896        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1897        schedule(new Runnable() {
1898            @Override
1899            public void run() {
1900                stanzaIdAcknowledgedListeners.remove(id);
1901            }
1902        }, removeAfterSeconds, TimeUnit.SECONDS);
1903        return stanzaIdAcknowledgedListeners.put(id, listener);
1904    }
1905
1906    /**
1907     * Remove the Stanza ID acknowledged listener for the given ID.
1908     *
1909     * @param id the stanza ID.
1910     * @return true if the listener was found and removed, false otherwise.
1911     */
1912    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1913        return stanzaIdAcknowledgedListeners.remove(id);
1914    }
1915
1916    /**
1917     * Removes all Stanza ID acknowledged listeners.
1918     */
1919    public void removeAllStanzaIdAcknowledgedListeners() {
1920        stanzaIdAcknowledgedListeners.clear();
1921    }
1922
1923    /**
1924     * Returns true if Stream Management is supported by the server.
1925     *
1926     * @return true if Stream Management is supported by the server.
1927     */
1928    public boolean isSmAvailable() {
1929        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1930    }
1931
1932    /**
1933     * Returns true if Stream Management was successfully negotiated with the server.
1934     *
1935     * @return true if Stream Management was negotiated.
1936     */
1937    public boolean isSmEnabled() {
1938        return smEnabledSyncPoint.wasSuccessful();
1939    }
1940
1941    /**
1942     * Returns true if the stream was successfully resumed with help of Stream Management.
1943     *
1944     * @return true if the stream was resumed.
1945     */
1946    public boolean streamWasResumed() {
1947        return smResumedSyncPoint.wasSuccessful();
1948    }
1949
1950    /**
1951     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1952     *
1953     * @return true if disconnected but resumption possible.
1954     */
1955    public boolean isDisconnectedButSmResumptionPossible() {
1956        return disconnectedButResumeable && isSmResumptionPossible();
1957    }
1958
1959    /**
1960     * Returns true if the stream is resumable.
1961     *
1962     * @return true if the stream is resumable.
1963     */
1964    public boolean isSmResumptionPossible() {
1965        // There is no resumable stream available
1966        if (smSessionId == null)
1967            return false;
1968
1969        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1970        // Seems like we are already reconnected, report true
1971        if (shutdownTimestamp == null) {
1972            return true;
1973        }
1974
1975        // See if resumption time is over
1976        long current = System.currentTimeMillis();
1977        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1978        if (current > shutdownTimestamp + maxResumptionMillies) {
1979            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1980            // resumption is possible
1981            return false;
1982        } else {
1983            return true;
1984        }
1985    }
1986
1987    /**
1988     * Drop the stream management state. Sets {@link #smSessionId} and
1989     * {@link #unacknowledgedStanzas} to <code>null</code>.
1990     */
    private void dropSmState() {
        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
        // respective. No need to reset them here.
        // Forget the session ID; isSmResumptionPossible() reports false while it is null.
        smSessionId = null;
        // Discard the reference to the queue of stanzas that have not been acknowledged yet.
        unacknowledgedStanzas = null;
    }
1997
1998    /**
     * Get the maximum time in seconds during which a managed stream can be resumed.
2000     * <p>
2001     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
2002     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
2003     * without checking for overflows before.
2004     * </p>
2005     *
2006     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
2007     */
2008    public int getMaxSmResumptionTime() {
2009        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
2010        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
2011        return Math.min(clientResumptionTime, serverResumptionTime);
2012    }
2013
2014    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
2015        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
2016        final List<Stanza> ackedStanzas = new ArrayList<>(
2017                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
2018                                        : Integer.MAX_VALUE);
2019        for (long i = 0; i < ackedStanzasCount; i++) {
2020            Stanza ackedStanza = unacknowledgedStanzas.poll();
2021            // If the server ack'ed a stanza, then it must be in the
2022            // unacknowledged stanza queue. There can be no exception.
2023            if (ackedStanza == null) {
2024                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
2025                                ackedStanzasCount, ackedStanzas);
2026            }
2027            ackedStanzas.add(ackedStanza);
2028        }
2029
2030        boolean atLeastOneStanzaAcknowledgedListener = false;
2031        if (!stanzaAcknowledgedListeners.isEmpty()) {
2032            // If stanzaAcknowledgedListeners is not empty, the we have at least one
2033            atLeastOneStanzaAcknowledgedListener = true;
2034        }
2035        else {
2036            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
2037            for (Stanza ackedStanza : ackedStanzas) {
2038                String id = ackedStanza.getStanzaId();
2039                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
2040                    atLeastOneStanzaAcknowledgedListener = true;
2041                    break;
2042                }
2043            }
2044        }
2045
2046        // Only spawn a new thread if there is a chance that some listener is invoked
2047        if (atLeastOneStanzaAcknowledgedListener) {
2048            asyncGo(new Runnable() {
2049                @Override
2050                public void run() {
2051                    for (Stanza ackedStanza : ackedStanzas) {
2052                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
2053                            try {
2054                                listener.processStanza(ackedStanza);
2055                            }
2056                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
2057                                LOGGER.log(Level.FINER, "Received exception", e);
2058                            }
2059                        }
2060                        String id = ackedStanza.getStanzaId();
2061                        if (StringUtils.isNullOrEmpty(id)) {
2062                            continue;
2063                        }
2064                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
2065                        if (listener != null) {
2066                            try {
2067                                listener.processStanza(ackedStanza);
2068                            }
2069                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
2070                                LOGGER.log(Level.FINER, "Received exception", e);
2071                            }
2072                        }
2073                    }
2074                }
2075            });
2076        }
2077
2078        serverHandledStanzasCount = handledCount;
2079    }
2080
2081    /**
2082     * Set the default bundle and defer callback used for new connections.
2083     *
     * @param defaultBundleAndDeferCallback the default callback to use for new connections.
2085     * @see BundleAndDeferCallback
2086     * @since 4.1
2087     */
    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
        // The parameter shadows the static field of the same name, hence the explicit class qualifier.
        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
    }
2091
2092    /**
2093     * Set the bundle and defer callback used for this connection.
2094     * <p>
2095     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
2096     * no longer get deferred.
2097     * </p>
2098     *
2099     * @param bundleAndDeferCallback the callback or <code>null</code>.
2100     * @see BundleAndDeferCallback
2101     * @since 4.1
2102     */
    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
        // The argument is stored as-is; no null check is performed, so null is assigned like any other value.
        this.bundleAndDeferCallback = bundleAndDeferCallback;
    }
2106
2107}