001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.OutputStream;
024import java.io.OutputStreamWriter;
025import java.io.Writer;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.net.Socket;
029import java.security.cert.CertificateException;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Iterator;
033import java.util.LinkedHashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.ArrayBlockingQueue;
039import java.util.concurrent.BlockingQueue;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentLinkedQueue;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047import javax.net.SocketFactory;
048import javax.net.ssl.HostnameVerifier;
049import javax.net.ssl.SSLSession;
050import javax.net.ssl.SSLSocket;
051
052import org.jivesoftware.smack.AbstractXMPPConnection;
053import org.jivesoftware.smack.ConnectionConfiguration;
054import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
055import org.jivesoftware.smack.ConnectionListener;
056import org.jivesoftware.smack.SmackConfiguration;
057import org.jivesoftware.smack.SmackException;
058import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
059import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
060import org.jivesoftware.smack.SmackException.ConnectionException;
061import org.jivesoftware.smack.SmackException.EndpointConnectionException;
062import org.jivesoftware.smack.SmackException.NotConnectedException;
063import org.jivesoftware.smack.SmackException.NotLoggedInException;
064import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
065import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
066import org.jivesoftware.smack.SmackFuture;
067import org.jivesoftware.smack.StanzaListener;
068import org.jivesoftware.smack.XMPPConnection;
069import org.jivesoftware.smack.XMPPException;
070import org.jivesoftware.smack.XMPPException.FailedNonzaException;
071import org.jivesoftware.smack.XMPPException.StreamErrorException;
072import org.jivesoftware.smack.compress.packet.Compress;
073import org.jivesoftware.smack.compress.packet.Compressed;
074import org.jivesoftware.smack.compression.XMPPInputOutputStream;
075import org.jivesoftware.smack.datatypes.UInt16;
076import org.jivesoftware.smack.filter.StanzaFilter;
077import org.jivesoftware.smack.internal.SmackTlsContext;
078import org.jivesoftware.smack.packet.Element;
079import org.jivesoftware.smack.packet.IQ;
080import org.jivesoftware.smack.packet.Message;
081import org.jivesoftware.smack.packet.Nonza;
082import org.jivesoftware.smack.packet.Presence;
083import org.jivesoftware.smack.packet.Stanza;
084import org.jivesoftware.smack.packet.StartTls;
085import org.jivesoftware.smack.packet.StreamError;
086import org.jivesoftware.smack.proxy.ProxyInfo;
087import org.jivesoftware.smack.sasl.packet.SaslNonza;
088import org.jivesoftware.smack.sm.SMUtils;
089import org.jivesoftware.smack.sm.StreamManagementException;
090import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
091import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
092import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
093import org.jivesoftware.smack.sm.packet.StreamManagement;
094import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
095import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
096import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
097import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
098import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
099import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
100import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
101import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
102import org.jivesoftware.smack.sm.predicates.Predicate;
103import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
104import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
105import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
106import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
107import org.jivesoftware.smack.util.Async;
108import org.jivesoftware.smack.util.CloseableUtil;
109import org.jivesoftware.smack.util.PacketParserUtils;
110import org.jivesoftware.smack.util.StringUtils;
111import org.jivesoftware.smack.util.TLSUtils;
112import org.jivesoftware.smack.util.XmlStringBuilder;
113import org.jivesoftware.smack.util.rce.RemoteConnectionException;
114import org.jivesoftware.smack.xml.SmackXmlParser;
115import org.jivesoftware.smack.xml.XmlPullParser;
116import org.jivesoftware.smack.xml.XmlPullParserException;
117
118import org.jxmpp.jid.impl.JidCreate;
119import org.jxmpp.jid.parts.Resourcepart;
120import org.jxmpp.stringprep.XmppStringprepException;
121import org.minidns.dnsname.DnsName;
122
123/**
124 * Creates a socket connection to an XMPP server. This is the default connection
125 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
126 *
127 * @see XMPPConnection
128 * @author Matt Tucker
129 */
130public class XMPPTCPConnection extends AbstractXMPPConnection {
131
132    private static final int QUEUE_SIZE = 500;
133    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
134
135    /**
136     * The socket which is used for this connection.
137     */
138    private Socket socket;
139
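    /**
     * Set by an instant shutdown if the stream can still be resumed via Stream Management; reset after a
     * successful login. While set, the "already connected" and "already logged in" checks do not throw.
     */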
143    private boolean disconnectedButResumeable = false;
144
145    private SSLSocket secureSocket;
146
147    /**
148     * Protected access level because of unit test purposes
149     */
150    protected final PacketWriter packetWriter = new PacketWriter();
151
152    /**
153     * Protected access level because of unit test purposes
154     */
155    protected final PacketReader packetReader = new PacketReader();
156
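    /**
     * Flag that {@code loginInternal} resets before authenticating and then waits on; it signals that the
     * stream features following the authentication have been received.
     */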
160    private boolean streamFeaturesAfterAuthenticationReceived;
161
162    /**
163     *
164     */
165    private boolean compressSyncPoint;
166
167    /**
168     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
170     */
171    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
172
173    /**
174     * The used bundle and defer callback.
175     * <p>
176     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
177     * having a 'volatile' read within the writer threads loop.
178     * </p>
179     */
180    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
181
182    private static boolean useSmDefault = true;
183
184    private static boolean useSmResumptionDefault = true;
185
186    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}.
190     */
191    private String smSessionId;
192
193    /**
194     * Represents the state of stream management resumption.
195     * <p>
196     * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread.
197     * </p>
198     */
199    private volatile SyncPointState smResumedSyncPoint;
200    private Failed smResumptionFailed;
201
202    /**
     * Represents the state of stream management.
204     * <p>
205     * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}.
206     * </p>
207     */
208    private volatile boolean smEnabledSyncPoint;
209
210    /**
211     * The client's preferred maximum resumption time in seconds.
212     */
213    private int smClientMaxResumptionTime = -1;
214
215    /**
216     * The server's preferred maximum resumption time in seconds.
217     */
218    private int smServerMaxResumptionTime = -1;
219
220    /**
221     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
222     */
223    private boolean useSm = useSmDefault;
224    private boolean useSmResumption = useSmResumptionDefault;
225
226    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
228     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
229     */
230    private long serverHandledStanzasCount = 0;
231
232    /**
233     * The counter for stanzas handled ("received") by the client.
234     * <p>
235     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
236     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
237     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
238     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
239     * </p>
240     */
241    private long clientHandledStanzasCount = 0;
242
243    private BlockingQueue<Stanza> unacknowledgedStanzas;
244
245    /**
246     * Set to true if Stream Management was at least once enabled for this connection.
247     */
248    private boolean smWasEnabledAtLeastOnce = false;
249
250    /**
     * These listeners are invoked for every stanza that got acknowledged.
252     * <p>
253     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
254     * themselves after they have been invoked.
255     * </p>
256     */
257    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();
258
259    /**
260     * These listeners are invoked for every stanza that got dropped.
261     * <p>
262     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
263     * themselves after they have been invoked.
264     * </p>
265     */
266    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();
267
268    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
270     * only be invoked once and automatically removed after that.
271     */
272    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();
273
274    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
279     * </p>
280     */
281    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();
282
283    @SuppressWarnings("HidingField")
284    private final XMPPTCPConnectionConfiguration config;
285
286    /**
287     * Creates a new XMPP connection over TCP (optionally using proxies).
288     * <p>
289     * Note that XMPPTCPConnection constructors do not establish a connection to the server
290     * and you must call {@link #connect()}.
291     * </p>
292     *
293     * @param config the connection configuration.
294     */
295    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
296        super(config);
297        this.config = config;
298        addConnectionListener(new ConnectionListener() {
299            @Override
300            public void connectionClosedOnError(Exception e) {
301                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
302                    dropSmState();
303                }
304            }
305        });
306
307        // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream
308        // is initiated.
309        buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install();
310    }
311
312    /**
313     * Creates a new XMPP connection over TCP.
314     * <p>
315     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
316     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
317     * constructor.
318     * </p>
319     *
320     * @param jid the bare JID used by the client.
321     * @param password the password or authentication token.
322     * @throws XmppStringprepException if the provided string is invalid.
323     */
324    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
325        this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build());
326    }
327
328    /**
329     * Creates a new XMPP connection over TCP.
330     * <p>
331     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
332     * you can get fine-grained control over connection settings using the
333     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
334     * </p>
335     * @param username TODO javadoc me please
336     * @param password TODO javadoc me please
337     * @param serviceName TODO javadoc me please
338     * @throws XmppStringprepException if the provided string is invalid.
339     */
340    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
341        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
342                                        JidCreate.domainBareFrom(serviceName)).build());
343    }
344
345    @Override
346    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
347        if (packetWriter == null) {
348            throw new NotConnectedException();
349        }
350        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
351    }
352
353    @Override
354    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
355        if (isConnected() && !disconnectedButResumeable) {
356            throw new AlreadyConnectedException();
357        }
358    }
359
360    @Override
361    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
362        if (isAuthenticated() && !disconnectedButResumeable) {
363            throw new AlreadyLoggedInException();
364        }
365    }
366
367    @Override
368    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
369        // Reset the flag in case it was set
370        disconnectedButResumeable = false;
371        super.afterSuccessfulLogin(resumed);
372    }
373
374    @Override
375    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
376                    SmackException, IOException, InterruptedException {
377        // Authenticate using SASL
378        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
379
380        streamFeaturesAfterAuthenticationReceived = false;
381        authenticate(username, password, config.getAuthzid(), sslSession);
382
383        // Wait for stream features after the authentication.
384        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
385        // renamed to "streamFeaturesAfterAuthenticationReceived".
386        waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
387
388        // If compression is enabled then request the server to use stream compression. XEP-170
389        // recommends to perform stream compression before resource binding.
390        maybeEnableCompression();
391
392        smResumedSyncPoint = SyncPointState.initial;
393        smResumptionFailed = null;
394        if (isSmResumptionPossible()) {
395            smResumedSyncPoint = SyncPointState.request_sent;
396            sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
397            waitForConditionOrConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
398            if (smResumedSyncPoint == SyncPointState.successful) {
399                // We successfully resumed the stream, be done here
400                afterSuccessfulLogin(true);
401                return;
402            }
403            // SM resumption failed, what Smack does here is to report success of
404            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
405            // normal resource binding can be tried.
406            assert smResumptionFailed != null;
407            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
408        }
409
410        // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
411        // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the
412        // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
413        smEnabledSyncPoint = false;
414
415        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
416        if (unacknowledgedStanzas != null) {
417            // There was a previous connection with SM enabled but that was either not resumable or
418            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
419            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
420            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
421            // XMPP session (There maybe was an enabled in a previous XMPP session of this
422            // connection instance though). This is used in writePackets to decide if stanzas should
423            // be added to the unacknowledged stanzas queue, because they have to be added right
424            // after the 'enable' stream element has been sent.
425            dropSmState();
426        }
427
428        // Now bind the resource. It is important to do this *after* we dropped an eventually
429        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
430        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
431        bindResourceAndEstablishSession(resource);
432
433        if (isSmAvailable() && useSm) {
434            // Remove what is maybe left from previously stream managed sessions
435            serverHandledStanzasCount = 0;
436            sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
437            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
438            // then this is a non recoverable error and we therefore throw an exception.
439            waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
440            synchronized (requestAckPredicates) {
441                if (requestAckPredicates.isEmpty()) {
442                    // Assure that we have at lest one predicate set up that so that we request acks
443                    // for the server and eventually flush some stanzas from the unacknowledged
444                    // stanza queue
445                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
446                }
447            }
448        }
449        // Inform client about failed resumption if possible, resend stanzas otherwise
450        // Process the stanzas synchronously so a client can re-queue them for transmission
451        // before it is informed about connection success
452        if (!stanzaDroppedListeners.isEmpty()) {
453            for (Stanza stanza : previouslyUnackedStanzas) {
454                for (StanzaListener listener : stanzaDroppedListeners) {
455                    try {
456                        listener.processStanza(stanza);
457                    }
458                    catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
459                        LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
460                    }
461                }
462            }
463        } else {
464            for (Stanza stanza : previouslyUnackedStanzas) {
465                sendStanzaInternal(stanza);
466            }
467        }
468
469        afterSuccessfulLogin(false);
470    }
471
472    @Override
473    public boolean isSecureConnection() {
474        return secureSocket != null;
475    }
476
477    /**
478     * Shuts the current connection down. After this method returns, the connection must be ready
479     * for re-use by connect.
480     */
481    @Override
482    protected void shutdown() {
483        if (isSmEnabled()) {
484            try {
485                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
486                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
487                sendSmAcknowledgementInternal();
488            } catch (InterruptedException | NotConnectedException e) {
489                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
490            }
491        }
492        shutdown(false);
493    }
494
495    @Override
496    public synchronized void instantShutdown() {
497        shutdown(true);
498    }
499
500    private void shutdown(boolean instant) {
501        // The writer thread may already been finished at this point, for example when the connection is in the
502        // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
503        // case.
504        if (!packetWriter.done()) {
505            // First shutdown the writer, this will result in a closing stream element getting send to
506            // the server
507            LOGGER.finer(packetWriter.threadName + " shutdown()");
508            packetWriter.shutdown(instant);
509            LOGGER.finer(packetWriter.threadName + " shutdown() returned");
510
511            if (!instant) {
512                waitForClosingStreamTagFromServer();
513            }
514        }
515
516        LOGGER.finer(packetReader.threadName + " shutdown()");
517        packetReader.shutdown();
518        LOGGER.finer(packetReader.threadName + " shutdown() returned");
519
520        CloseableUtil.maybeClose(socket, LOGGER);
521
522        setWasAuthenticated();
523
524        try {
525            boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running);
526            if (!readerAndWriterThreadsTermianted) {
527                LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: "
528                                + packetWriter.running + ", Reader running: " + packetReader.running);
529            } else {
530                LOGGER.fine("Reader and writer threads terminated");
531            }
532        } catch (InterruptedException e) {
533            LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
534        }
535
536        if (disconnectedButResumeable) {
537            return;
538        }
539
540        // If we are able to resume the stream, then don't set
541        // connected/authenticated/usingTLS to false since we like behave like we are still
542        // connected (e.g. sendStanza should not throw a NotConnectedException).
543        if (instant) {
544            disconnectedButResumeable = isSmResumptionPossible();
545            if (!disconnectedButResumeable) {
546                // Reset the stream management session id to null, since the stream is no longer resumable. Note that we
547                // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected.
548                smSessionId = null;
549            }
550        } else {
551            disconnectedButResumeable = false;
552
553            // Drop the stream management state if this is not an instant shutdown. We send
554            // a </stream> close tag and now the stream management state is no longer valid.
555            // This also prevents that we will potentially (re-)send any unavailable presence we
556            // may have send, because it got put into the unacknowledged queue and was not acknowledged before the
557            // connection terminated.
558            dropSmState();
559            // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
560            // information is available in the connectionClosedOnError() listeners.
561        }
562        authenticated = false;
563        connected = false;
564        secureSocket = null;
565        reader = null;
566        writer = null;
567
568        initState();
569    }
570
571    @Override
572    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
573        packetWriter.sendStreamElement(element);
574    }
575
576    @Override
577    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
578        packetWriter.sendStreamElement(packet);
579        if (isSmEnabled()) {
580            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
581                if (requestAckPredicate.accept(packet)) {
582                    requestSmAcknowledgementInternal();
583                    break;
584                }
585            }
586        }
587    }
588
589    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
590        RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config);
591
592        List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>();
593
594        SocketFactory socketFactory = config.getSocketFactory();
595        ProxyInfo proxyInfo = config.getProxyInfo();
596        int timeout = config.getConnectTimeout();
597        if (socketFactory == null) {
598            socketFactory = SocketFactory.getDefault();
599        }
600        for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
601            Iterator<? extends InetAddress> inetAddresses;
602            String host = endpoint.getHost().toString();
603            UInt16 portUint16 = endpoint.getPort();
604            int port = portUint16.intValue();
605            if (proxyInfo == null) {
606                inetAddresses = endpoint.getInetAddresses().iterator();
607                assert inetAddresses.hasNext();
608
609                innerloop: while (inetAddresses.hasNext()) {
610                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
611                    // re-usable after a failed connection attempt. See also SMACK-724.
612                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
613
614                    final InetAddress inetAddress = inetAddresses.next();
615                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
616                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
617                    socketFuture.connectAsync(inetSocketAddress, timeout);
618
619                    try {
620                        socket = socketFuture.getOrThrow();
621                    } catch (IOException e) {
622                        RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
623                                        endpoint, inetAddress, e);
624                        connectionExceptions.add(rce);
625                        if (inetAddresses.hasNext()) {
626                            continue innerloop;
627                        } else {
628                            break innerloop;
629                        }
630                    }
631                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
632                    // We found a host to connect to, return here
633                    this.host = host;
634                    this.port = portUint16;
635                    return;
636                }
637            } else {
638                // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy
639                // connection to every inet address of each connection endpoint.
640                socket = socketFactory.createSocket();
641                StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy");
642                final String hostAndPort = host + " at port " + port;
643                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
644                try {
645                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
646                } catch (IOException e) {
647                    CloseableUtil.maybeClose(socket, LOGGER);
648                    RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e);
649                    connectionExceptions.add(rce);
650                    continue;
651                }
652                LOGGER.finer("Established TCP connection to " + hostAndPort);
653                // We found a host to connect to, return here
654                this.host = host;
655                this.port = portUint16;
656                return;
657            }
658        }
659
660        // There are no more host addresses to try
661        // throw an exception and report all tried
662        // HostAddresses in the exception
663        throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
664    }
665
666    /**
667     * Initializes the connection by creating a stanza reader and writer and opening a
668     * XMPP stream to the server.
669     *
670     * @throws XMPPException if establishing a connection to the server fails.
671     * @throws SmackException if the server fails to respond back or if there is anther error.
672     * @throws IOException if an I/O error occurred.
673     * @throws InterruptedException if the calling thread was interrupted.
674     */
675    private void initConnection() throws IOException, InterruptedException {
676        compressionHandler = null;
677
678        // Set the reader and writer instance variables
679        initReaderAndWriter();
680
681        // Start the writer thread. This will open an XMPP stream to the server
682        packetWriter.init();
683        // Start the reader thread. The startup() method will block until we
684        // get an opening stream packet back from server
685        packetReader.init();
686    }
687
688    private void initReaderAndWriter() throws IOException {
689        InputStream is = socket.getInputStream();
690        OutputStream os = socket.getOutputStream();
691        if (compressionHandler != null) {
692            is = compressionHandler.getInputStream(is);
693            os = compressionHandler.getOutputStream(os);
694        }
695        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
696        writer = new OutputStreamWriter(os, "UTF-8");
697        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
698
699        // If debugging is enabled, we open a window and write out all network traffic.
700        initDebugger();
701    }
702
703    /**
704     * The server has indicated that TLS negotiation can start. We now need to secure the
705     * existing plain connection and perform a handshake. This method won't return until the
706     * connection has finished the handshake or an error occurred while securing the connection.
707     * @throws IOException if an I/O error occurred.
708     * @throws SecurityNotPossibleException if TLS is not possible.
709     * @throws CertificateException if there is an issue with the certificate.
710     */
711    @SuppressWarnings("LiteralClassName")
712    private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
713        SmackTlsContext smackTlsContext = getSmackTlsContext();
714
715        Socket plain = socket;
716        // Secure the plain connection
717        socket = smackTlsContext.sslContext.getSocketFactory().createSocket(plain,
718                config.getXMPPServiceDomain().toString(), plain.getPort(), true);
719
720        final SSLSocket sslSocket = (SSLSocket) socket;
721        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
722        // important (at least on certain platforms) and it seems to be a good idea anyways to
723        // prevent an accidental implicit handshake.
724        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
725
726        // Initialize the reader and writer with the new secured version
727        initReaderAndWriter();
728
729        // Proceed to do the handshake
730        sslSocket.startHandshake();
731
732        if (smackTlsContext.daneVerifier != null) {
733            smackTlsContext.daneVerifier.finish(sslSocket.getSession());
734        }
735
736        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
737        if (verifier == null) {
738                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
739        }
740
741        final String verifierHostname;
742        {
743            DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
744            // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
745            // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
746            // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
747            if (xmppServiceDomainDnsName != null) {
748                verifierHostname = xmppServiceDomainDnsName.ace;
749            }
750            else {
751                LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
752                                + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
753                verifierHostname = getXMPPServiceDomain().toString();
754            }
755        }
756
757        final boolean verificationSuccessful;
758        // Verify the TLS session.
759        verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
760        if (!verificationSuccessful) {
761            throw new CertificateException(
762                            "Hostname verification of certificate failed. Certificate does not authenticate "
763                                            + getXMPPServiceDomain());
764        }
765
766        // Set that TLS was successful
767        secureSocket = sslSocket;
768    }
769
770    /**
771     * Returns the compression handler that can be used for one compression methods offered by the server.
772     *
773     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
774     *
775     */
776    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
777        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
778                String method = handler.getCompressionMethod();
779                if (compression.getMethods().contains(method))
780                    return handler;
781        }
782        return null;
783    }
784
785    @Override
786    public boolean isUsingCompression() {
787        return compressionHandler != null && compressSyncPoint;
788    }
789
790    /**
791     * <p>
792     * Starts using stream compression that will compress network traffic. Traffic can be
793     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
794     * connection. However, the server and the client will need to use more CPU time in order to
795     * un/compress network data so under high load the server performance might be affected.
796     * </p>
797     * <p>
798     * Stream compression has to have been previously offered by the server. Currently only the
799     * zlib method is supported by the client. Stream compression negotiation has to be done
800     * before authentication took place.
801     * </p>
802     *
803     * @throws NotConnectedException if the XMPP connection is not connected.
804     * @throws SmackException if Smack detected an exceptional situation.
805     * @throws InterruptedException if the calling thread was interrupted.
806     * @throws XMPPException if an XMPP protocol error was received.
807     */
808    private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
809        if (!config.isCompressionEnabled()) {
810            return;
811        }
812
813        Compress.Feature compression = getFeature(Compress.Feature.class);
814        if (compression == null) {
815            // Server does not support compression
816            return;
817        }
818        // If stream compression was offered by the server and we want to use
819        // compression then send compression request to the server
820        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
821            compressSyncPoint = false;
822            sendNonza(new Compress(compressionHandler.getCompressionMethod()));
823            waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
824        } else {
825            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
826        }
827    }
828
829    /**
830     * Establishes a connection to the XMPP server. It basically
831     * creates and maintains a socket connection to the server.
832     * <p>
833     * Listeners will be preserved from a previous connection if the reconnection
834     * occurs after an abrupt termination.
835     * </p>
836     *
837     * @throws XMPPException if an error occurs while trying to establish the connection.
838     * @throws SmackException if Smack detected an exceptional situation.
839     * @throws IOException if an I/O error occurred.
840     * @throws InterruptedException if the calling thread was interrupted.
841     */
842    @Override
843    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
844        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
845        // there is an error establishing the connection
846        connectUsingConfiguration();
847
848        connected = true;
849
850        // We connected successfully to the servers TCP port
851        initConnection();
852
853        // TLS handled will be true either if TLS was established, or if it was not mandatory.
854        waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
855
856        // Wait with SASL auth until the SASL mechanisms have been received
857        waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
858    }
859
860    /**
861     * For unit testing purposes
862     *
863     * @param writer TODO javadoc me please
864     */
865    protected void setWriter(Writer writer) {
866        this.writer = writer;
867    }
868
869    @Override
870    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
871        StartTls startTlsFeature = getFeature(StartTls.class);
872        if (startTlsFeature != null) {
873            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
874                SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
875                currentSmackException = smackException;
876                notifyWaitingThreads();
877                throw smackException;
878            }
879
880            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
881                sendNonza(new StartTls());
882            } else {
883                tlsHandled = true;
884                notifyWaitingThreads();
885            }
886        } else {
887            tlsHandled = true;
888            notifyWaitingThreads();
889        }
890
891        if (isSaslAuthenticated()) {
892            // If we have received features after the SASL has been successfully completed, then we
893            // have also *maybe* received, as it is an optional feature, the compression feature
894            // from the server.
895            streamFeaturesAfterAuthenticationReceived = true;
896            notifyWaitingThreads();
897        }
898    }
899
900    private void resetParser() throws IOException {
901        try {
902            packetReader.parser = SmackXmlParser.newXmlParser(reader);
903        } catch (XmlPullParserException e) {
904            throw new IOException(e);
905        }
906   }
907
908    private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
909        sendStreamOpen();
910        resetParser();
911    }
912
    /**
     * Reads the incoming XML stream on a dedicated thread and dispatches every top-level element:
     * stanzas, stream features, TLS and compression negotiation results, Stream Management nonzas
     * and the closing stream element.
     */
    protected class PacketReader {

        // Name given to the reader thread, also used in the start/exit log messages.
        private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';

        // The parser currently in use. It is replaced by resetParser() of the enclosing class.
        XmlPullParser parser;

        // Set by shutdown(); makes the parse loop terminate and lets late exceptions be ignored.
        private volatile boolean done;

        // True from init() until the reader thread exits (or fails, see parsePackets()).
        private boolean running;

        /**
         * Initializes the reader in order to be used. The reader is initialized during the
         * first connection and when reconnecting after an abrupt disconnection.
         * <p>
         * This clears the 'done' flag, marks the reader as running and starts the reader thread,
         * which executes {@link #parsePackets()}.
         * </p>
         */
        void init() {
            done = false;

            running = true;
            Async.go(new Runnable() {
                @Override
                public void run() {
                    LOGGER.finer(threadName + " start");
                    try {
                        parsePackets();
                    } finally {
                        LOGGER.finer(threadName + " exit");
                        // Whatever the reason for leaving parsePackets(), the reader is no longer
                        // running. Wake up threads that may be waiting for exactly that.
                        running = false;
                        notifyWaitingThreads();
                    }
                }
            }, threadName);
         }

        /**
         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
         */
        void shutdown() {
            done = true;
        }

        /**
         * Parse top-level packets in order to process them further.
         * <p>
         * Opens the stream, then consumes parser events until 'done' is set, the closing stream
         * element arrives after we shut down the writer queue, or an exception occurs. Exceptions
         * are reported via notifyConnectionError() unless the writer queue or this reader was
         * already shut down.
         * </p>
         */
        private void parsePackets() {
            try {
                openStreamAndResetParser();
                XmlPullParser.Event eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case START_ELEMENT:
                        final String name = parser.getName();
                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                // Update the handled-stanzas counter even if processing threw.
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            onStreamOpen(parser);
                            break;
                        case "error":
                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
                            // Stream errors are non-recoverable, so throw an exception. It is handled by
                            // the catch block below, which reports it via notifyConnectionError().
                            throw new StreamErrorException(streamError);
                        case "features":
                            parseFeaturesAndNotify(parser);
                            break;
                        case "proceed":
                            // Secure the connection by negotiating TLS
                            proceedTLSReceived();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            break;
                        case "failure":
                            // A <failure/> element is disambiguated by its namespace.
                            String namespace = parser.getNamespace(null);
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
                                notifyWaitingThreads();
                                break;
                            default:
                                parseAndProcessNonza(parser);
                            }
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            // Notify that compression is being used
                            compressSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                smSessionId = enabled.getId();
                                // A resumable stream without a session id cannot be resumed later,
                                // treat this as an error.
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
                                    setCurrentConnectionExceptionAndNotify(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            // Counting of handled stanzas starts over once Stream Management is enabled.
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            if (smResumedSyncPoint == SyncPointState.request_sent) {
                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
                                // so is non-fatal as we can simply continue with resource binding in this case.
                                smResumptionFailed = failed;
                                notifyWaitingThreads();
                            } else {
                                // Not a reply to a resumption request: record it as connection exception.
                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
                                setCurrentConnectionExceptionAndNotify(xmppException);
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            // The server must resume the very session we asked for.
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // Mark SM as enabled
                            smEnabledSyncPoint = true;
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                            unacknowledgedStanzas.drainTo(stanzasToResend);
                            for (Stanza stanza : stanzasToResend) {
                                sendStanzaInternal(stanza);
                            }
                            // If there were stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            // Mark SM resumption as successful
                            smResumedSyncPoint = SyncPointState.successful;
                            notifyWaitingThreads();
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            ParseStreamManagement.ackRequest(parser);
                            // Only answer ack requests while Stream Management is enabled.
                            if (smEnabledSyncPoint) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                         default:
                             // Any other top-level element is handled as a generic nonza.
                             parseAndProcessNonza(parser);
                             break;
                        }
                        break;
                    case END_ELEMENT:
                        final String endTagName = parser.getName();
                        if ("stream".equals(endTagName)) {
                            // Only the closing element in the streams namespace ends the stream.
                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
                                break;
                            }

                            // Check if the queue was already shut down before reporting success on closing stream tag
                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
                            // did re-start the queue again, causing this reader to assume that the queue is not
                            // shutdown, which results in a call to disconnect().
                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
                            closingStreamReceived = true;
                            notifyWaitingThreads();

                            if (queueWasShutdown) {
                                // We received a closing stream element *after* we initiated the
                                // termination of the session by sending a closing stream element to
                                // the server first
                                return;
                            } else {
                                // We received a closing stream element from the server without us
                                // sending a closing stream element first. This means that the
                                // server wants to terminate the session, therefore disconnect
                                // the connection
                                LOGGER.info(XMPPTCPConnection.this
                                                + " received closing </stream> element."
                                                + " Server wants to terminate the connection, calling disconnect()");
                                // disconnect() is not invoked on the reader thread itself but handed
                                // over to ASYNC_BUT_ORDERED.
                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
                                    @Override
                                    public void run() {
                                        disconnect();
                                    }});
                            }
                        }
                        break;
                    case END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException.SmackMessageException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    default:
                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
                        break;
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
                // notifyConnectionError() below, even though run() also sets it to false. Therefore, do not remove this.
                running = false;

                // Names the component whose earlier shutdown makes this exception irrelevant, if any.
                String ignoreReasonThread = null;

                boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
                if (writerThreadWasShutDown) {
                    ignoreReasonThread = "writer";
                } else if (done) {
                    ignoreReasonThread = "reader";
                }

                if (ignoreReasonThread != null) {
                    LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
                    return;
                }

                // Close the connection and notify connection listeners of the error.
                notifyConnectionError(e);
            }
        }
    }
1168
1169    protected class PacketWriter {
        // Capacity of the outgoing element queue, taken from the constant of the enclosing class.
        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
        // Size constant for the queue of unacknowledged stanzas.
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
        // 30% of UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE, truncated to an int.
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);

        // Name given to the writer thread, also used in the start/exit log messages.
        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';

        // Elements waiting to be written. The queue can be shut down and started again.
        // NOTE(review): the 'true' argument is presumably the fairness flag - confirm against
        // ArrayBlockingQueueWithShutdown.
        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
                        QUEUE_SIZE, true);

        /**
         * If set, the stanza writer is shut down. Holds the time of the shutdown in milliseconds, as
         * recorded by {@link #shutdown(boolean)}, and is reset to <code>null</code> by {@link #init()}.
         */
        protected volatile Long shutdownTimestamp = null;

        // The 'instant' argument of the most recent shutdown(boolean) call.
        private volatile boolean instantShutdown;

        /**
         * True if some preconditions are given to start the bundle and defer mechanism.
         * <p>
         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
         * this field to true.
         * </p>
         */
        private boolean shouldBundleAndDefer;

        // True from init() until the writer thread exits.
        private boolean running;
1197
1198        /**
1199        * Initializes the writer in order to be used. It is called at the first connection and also
1200        * is invoked if the connection is disconnected by an error.
1201        */
1202        void init() {
1203            shutdownTimestamp = null;
1204
1205            if (unacknowledgedStanzas != null) {
1206                // It's possible that there are new stanzas in the writer queue that
1207                // came in while we were disconnected but resumable, drain those into
1208                // the unacknowledged queue so that they get resent now
1209                drainWriterQueueToUnacknowledgedStanzas();
1210            }
1211
1212            queue.start();
1213            running = true;
1214            Async.go(new Runnable() {
1215                @Override
1216                public void run() {
1217                    LOGGER.finer(threadName + " start");
1218                    try {
1219                        writePackets();
1220                    } finally {
1221                        LOGGER.finer(threadName + " exit");
1222                        running = false;
1223                        notifyWaitingThreads();
1224                    }
1225                }
1226            }, threadName);
1227        }
1228
1229        private boolean done() {
1230            return shutdownTimestamp != null;
1231        }
1232
1233        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1234            final boolean done = done();
1235            if (done) {
1236                final boolean smResumptionPossible = isSmResumptionPossible();
1237                // Don't throw a NotConnectedException is there is an resumable stream available
1238                if (!smResumptionPossible) {
1239                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1240                                    + " smResumptionPossible=" + smResumptionPossible);
1241                }
1242            }
1243        }
1244
1245        /**
1246         * Sends the specified element to the server.
1247         *
1248         * @param element the element to send.
1249         * @throws NotConnectedException if the XMPP connection is not connected.
1250         * @throws InterruptedException if the calling thread was interrupted.
1251         */
1252        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1253            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1254            try {
1255                queue.put(element);
1256            }
1257            catch (InterruptedException e) {
1258                // put() may throw an InterruptedException for two reasons:
1259                // 1. If the queue was shut down
1260                // 2. If the thread was interrupted
1261                // so we have to check which is the case
1262                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1263                // If the method above did not throw, then the sending thread was interrupted
1264                throw e;
1265            }
1266        }
1267
1268        /**
1269         * Shuts down the stanza writer. Once this method has been called, no further
1270         * packets will be written to the server.
1271         */
1272        void shutdown(boolean instant) {
1273            instantShutdown = instant;
1274            queue.shutdown();
1275            shutdownTimestamp = System.currentTimeMillis();
1276        }
1277
1278        /**
1279         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1280         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1281         * that case.
1282         *
1283         * @return the next element for writing or null.
1284         */
1285        private Element nextStreamElement() {
1286            // It is important the we check if the queue is empty before removing an element from it
1287            if (queue.isEmpty()) {
1288                shouldBundleAndDefer = true;
1289            }
1290            Element packet = null;
1291            try {
1292                packet = queue.take();
1293            }
1294            catch (InterruptedException e) {
1295                if (!queue.isShutdown()) {
1296                    // Users shouldn't try to interrupt the packet writer thread
1297                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1298                }
1299            }
1300            return packet;
1301        }
1302
        /**
         * The write loop of the stanza writer.
         * <p>
         * As long as <code>done()</code> returns <code>false</code>, elements are taken from the queue, optionally
         * deferred via the bundle and defer callback, recorded in the unacknowledged stanzas queue (stanzas only)
         * and written to <code>writer</code>. After the loop, a non-instant shutdown writes out the remaining queue
         * content followed by the closing stream element, while an instant shutdown with Stream Management enabled
         * drains the remaining stanzas into the unacknowledged stanzas queue.
         * </p>
         * <p>
         * Exceptions are passed to <code>notifyConnectionError()</code>, unless the writer is done or the queue was
         * shut down, in which case they are only logged.
         * </p>
         */
        private void writePackets() {
            try {
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // nextStreamElement() was interrupted, re-evaluate the done() condition.
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Wait until the deferring got stopped or the requested time has elapsed. The
                                // remaining time is recomputed after every wake-up.
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    // 'packet' stays null for elements that are not stanzas.
                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enabled'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
                    }
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
                    if (elementXml instanceof XmlStringBuilder) {
                        try {
                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
                        } catch (NullPointerException npe) {
                            // Log which element caused the NPE before re-throwing it.
                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
                            throw npe;
                        }
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush if there is nothing else queued for writing.
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            writer.write(packet.toXML().toString());
                        }
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                    // the reader and writer thread's 'running' value is false.
                    running = false;
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            }
        }
1422
1423        private void drainWriterQueueToUnacknowledgedStanzas() {
1424            List<Element> elements = new ArrayList<>(queue.size());
1425            queue.drainTo(elements);
1426            for (int i = 0; i < elements.size(); i++) {
1427                Element element = elements.get(i);
1428                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1429                if (unacknowledgedStanzas.remainingCapacity() == 0) {
1430                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
1431                            .newWith(i, elements, unacknowledgedStanzas);
1432                    LOGGER.log(Level.WARNING,
1433                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1434                    return;
1435                }
1436                if (element instanceof Stanza) {
1437                    unacknowledgedStanzas.add((Stanza) element);
1438                }
1439            }
1440        }
1441
1442        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1443            // Check if the stream element should be put to the unacknowledgedStanza
1444            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1445            // packet order is not stable at this point (sendStanzaInternal() can be
1446            // called concurrently).
1447            if (unacknowledgedStanzas != null && stanza != null) {
1448                // If the unacknowledgedStanza queue reaching its high water mark, request an new ack
1449                // from the server in order to drain it
1450                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
1451                    writer.write(AckRequest.INSTANCE.toXML().toString());
1452                }
1453
1454                try {
1455                    // It is important the we put the stanza in the unacknowledged stanza
1456                    // queue before we put it on the wire
1457                    unacknowledgedStanzas.put(stanza);
1458                }
1459                catch (InterruptedException e) {
1460                    throw new IllegalStateException(e);
1461                }
1462            }
1463        }
1464    }
1465
    /**
     * Set if Stream Management should be used by default for new connections.
     * <p>
     * The value is stored in the static <code>useSmDefault</code> field.
     * </p>
     *
     * @param useSmDefault true to use Stream Management for new connections.
     */
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
        XMPPTCPConnection.useSmDefault = useSmDefault;
    }
1474
    /**
     * Set if Stream Management resumption should be used by default for new connections.
     * <p>
     * The name of this method contains a typo ("Resumptiod"). It only delegates to
     * {@link #setUseStreamManagementResumptionDefault(boolean)}.
     * </p>
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
     */
    @Deprecated
    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
    }
1485
1486    /**
1487     * Set if Stream Management resumption should be used by default for new connections.
1488     *
1489     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1490     */
1491    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1492        if (useSmResumptionDefault) {
1493            // Also enable SM is resumption is enabled
1494            setUseStreamManagementDefault(useSmResumptionDefault);
1495        }
1496        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1497    }
1498
    /**
     * Set if Stream Management should be used if supported by the server.
     * <p>
     * The value is stored in the <code>useSm</code> field of this connection.
     * </p>
     *
     * @param useSm true to use Stream Management.
     */
    public void setUseStreamManagement(boolean useSm) {
        this.useSm = useSm;
    }
1507
1508    /**
1509     * Set if Stream Management resumption should be used if supported by the server.
1510     *
1511     * @param useSmResumption true to use Stream Management resumption.
1512     */
1513    public void setUseStreamManagementResumption(boolean useSmResumption) {
1514        if (useSmResumption) {
1515            // Also enable SM is resumption is enabled
1516            setUseStreamManagement(useSmResumption);
1517        }
1518        this.useSmResumption = useSmResumption;
1519    }
1520
    /**
     * Set the preferred resumption time in seconds.
     * <p>
     * The value is stored in <code>smClientMaxResumptionTime</code>. Note that
     * {@link #getMaxSmResumptionTime()} treats values less than or equal to zero as "no client limit set".
     * </p>
     *
     * @param resumptionTime the preferred resumption time in seconds
     */
    public void setPreferredResumptionTime(int resumptionTime) {
        smClientMaxResumptionTime = resumptionTime;
    }
1528
1529    /**
1530     * Add a predicate for Stream Management acknowledgment requests.
1531     * <p>
1532     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1533     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1534     * </p>
1535     * <p>
1536     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1537     * </p>
1538     *
1539     * @param predicate the predicate to add.
1540     * @return if the predicate was not already active.
1541     */
1542    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1543        synchronized (requestAckPredicates) {
1544            return requestAckPredicates.add(predicate);
1545        }
1546    }
1547
1548    /**
1549     * Remove the given predicate for Stream Management acknowledgment request.
1550     * @param predicate the predicate to remove.
1551     * @return true if the predicate was removed.
1552     */
1553    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1554        synchronized (requestAckPredicates) {
1555            return requestAckPredicates.remove(predicate);
1556        }
1557    }
1558
    /**
     * Remove all predicates for Stream Management acknowledgment requests.
     * <p>
     * The predicates are cleared while holding the monitor of <code>requestAckPredicates</code>, like the add
     * and remove methods do.
     * </p>
     */
    public void removeAllRequestAckPredicates() {
        synchronized (requestAckPredicates) {
            requestAckPredicates.clear();
        }
    }
1567
1568    /**
1569     * Send an unconditional Stream Management acknowledgement request to the server.
1570     *
1571     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1572     * @throws NotConnectedException if the connection is not connected.
1573     * @throws InterruptedException if the calling thread was interrupted.
1574     */
1575    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1576        if (!isSmEnabled()) {
1577            throw new StreamManagementException.StreamManagementNotEnabledException();
1578        }
1579        requestSmAcknowledgementInternal();
1580    }
1581
    /**
     * Hands an acknowledgement request to the packet writer, without checking whether Stream Management is
     * enabled.
     *
     * @throws NotConnectedException if the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(AckRequest.INSTANCE);
    }
1585
1586    /**
1587     * Send a unconditional Stream Management acknowledgment to the server.
1588     * <p>
1589     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1590     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1591     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1592     * </p>
1593     *
1594     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1595     * @throws NotConnectedException if the connection is not connected.
1596     * @throws InterruptedException if the calling thread was interrupted.
1597     */
1598    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1599        if (!isSmEnabled()) {
1600            throw new StreamManagementException.StreamManagementNotEnabledException();
1601        }
1602        sendSmAcknowledgementInternal();
1603    }
1604
    /**
     * Creates an acknowledgement carrying the current <code>clientHandledStanzasCount</code> and puts it into
     * the writer queue, unless that queue has already been shut down. Does not check whether Stream Management
     * is enabled.
     *
     * @throws NotConnectedException if the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount);
        // Do not put an ack to the queue if it has already been shut down. Some servers, like ejabberd, like to
        // request an ack even after we have sent a stream close (and hence the queue was shut down). If we would
        // not check here, then the ack would dangle around in the queue, and be sent on the next re-connection
        // attempt even before the stream open.
        packetWriter.queue.putIfNotShutdown(ackAnswer);
    }
1613
    /**
     * Add a Stanza acknowledged listener.
     * <p>
     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. They will not get
     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
     * possible.
     * </p>
     *
     * @param listener the listener to add.
     */
    public void addStanzaAcknowledgedListener(StanzaListener listener) {
        stanzaAcknowledgedListeners.add(listener);
    }
1627
    /**
     * Remove the given Stanza acknowledged listener.
     *
     * @param listener the listener to remove.
     * @return true if the listener was removed.
     */
    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
        return stanzaAcknowledgedListeners.remove(listener);
    }
1637
    /**
     * Remove all stanza acknowledged listeners.
     * <p>
     * Only the listeners added via {@link #addStanzaAcknowledgedListener(StanzaListener)} are removed; the
     * stanza <em>ID</em> acknowledged listeners are kept.
     * </p>
     */
    public void removeAllStanzaAcknowledgedListeners() {
        stanzaAcknowledgedListeners.clear();
    }
1644
    /**
     * Add a Stanza dropped listener.
     * <p>
     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not
     * get automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to
     * retransmit the Stanzas.
     * </p>
     *
     * @param listener the listener to add.
     * @since 4.3.3
     */
    public void addStanzaDroppedListener(StanzaListener listener) {
        stanzaDroppedListeners.add(listener);
    }
1659
    /**
     * Remove the given Stanza dropped listener.
     *
     * @param listener the listener to remove.
     * @return true if the listener was removed.
     * @since 4.3.3
     */
    public boolean removeStanzaDroppedListener(StanzaListener listener) {
        return stanzaDroppedListeners.remove(listener);
    }
1670
1671    /**
1672     * Add a new Stanza ID acknowledged listener for the given ID.
1673     * <p>
1674     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1675     * automatically be removed after the listener was run.
1676     * </p>
1677     *
1678     * @param id the stanza ID.
1679     * @param listener the listener to invoke.
1680     * @return the previous listener for this stanza ID or null.
1681     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1682     */
1683    @SuppressWarnings("FutureReturnValueIgnored")
1684    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1685        // Prevent users from adding callbacks that will never get removed
1686        if (!smWasEnabledAtLeastOnce) {
1687            throw new StreamManagementException.StreamManagementNotEnabledException();
1688        }
1689        // Remove the listener after max. 3 hours
1690        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1691        schedule(new Runnable() {
1692            @Override
1693            public void run() {
1694                stanzaIdAcknowledgedListeners.remove(id);
1695            }
1696        }, removeAfterSeconds, TimeUnit.SECONDS);
1697        return stanzaIdAcknowledgedListeners.put(id, listener);
1698    }
1699
    /**
     * Remove the Stanza ID acknowledged listener for the given ID.
     *
     * @param id the stanza ID.
     * @return the listener that was removed for the given ID; presumably <code>null</code> if no listener was
     *         registered for it (TODO confirm against the type of <code>stanzaIdAcknowledgedListeners</code>).
     */
    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
        return stanzaIdAcknowledgedListeners.remove(id);
    }
1709
    /**
     * Removes all Stanza ID acknowledged listeners, i.e. the listeners which were added via
     * {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)}.
     */
    public void removeAllStanzaIdAcknowledgedListeners() {
        stanzaIdAcknowledgedListeners.clear();
    }
1716
    /**
     * Returns true if Stream Management is supported by the server.
     * <p>
     * This checks whether the Stream Management feature element, qualified by the Stream Management namespace,
     * is among the features known to this connection.
     * </p>
     *
     * @return true if Stream Management is supported by the server.
     */
    public boolean isSmAvailable() {
        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
    }
1725
    /**
     * Returns true if Stream Management was successfully negotiated with the server.
     * <p>
     * The returned value is the current value of <code>smEnabledSyncPoint</code>.
     * </p>
     *
     * @return true if Stream Management was negotiated.
     */
    public boolean isSmEnabled() {
        return smEnabledSyncPoint;
    }
1734
    /**
     * Returns true if the stream was successfully resumed with help of Stream Management.
     * <p>
     * This is the case if <code>smResumedSyncPoint</code> is in the <code>successful</code> state.
     * </p>
     *
     * @return true if the stream was resumed.
     */
    public boolean streamWasResumed() {
        return smResumedSyncPoint == SyncPointState.successful;
    }
1743
    /**
     * Returns true if the connection is disconnected but a Stream resumption via Stream Management is possible.
     * <p>
     * Both conditions must hold: the connection is flagged as disconnected but resumable, and
     * {@link #isSmResumptionPossible()} returns true.
     * </p>
     *
     * @return true if disconnected but resumption possible.
     */
    public boolean isDisconnectedButSmResumptionPossible() {
        return disconnectedButResumeable && isSmResumptionPossible();
    }
1752
1753    /**
1754     * Returns true if the stream is resumable.
1755     *
1756     * @return true if the stream is resumable.
1757     */
1758    public boolean isSmResumptionPossible() {
1759        // There is no resumable stream available
1760        if (smSessionId == null)
1761            return false;
1762
1763        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1764        // Seems like we are already reconnected, report true
1765        if (shutdownTimestamp == null) {
1766            return true;
1767        }
1768
1769        // See if resumption time is over
1770        long current = System.currentTimeMillis();
1771        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1772        if (current > shutdownTimestamp + maxResumptionMillies) {
1773            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1774            // resumption is possible
1775            return false;
1776        } else {
1777            return true;
1778        }
1779    }
1780
    /**
     * Drop the stream management state. Sets {@link #smSessionId} and
     * {@link #unacknowledgedStanzas} to <code>null</code>.
     * <p>
     * Without a session ID, {@link #isSmResumptionPossible()} reports that no resumable stream is available.
     * </p>
     */
    private void dropSmState() {
        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
        // respectively. No need to reset them here.
        smSessionId = null;
        unacknowledgedStanzas = null;
    }
1791
1792    /**
1793     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1794     * <p>
1795     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1796     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1797     * without checking for overflows before.
1798     * </p>
1799     *
1800     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1801     */
1802    public int getMaxSmResumptionTime() {
1803        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1804        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
1805        return Math.min(clientResumptionTime, serverResumptionTime);
1806    }
1807
1808    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1809        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1810        final List<Stanza> ackedStanzas = new ArrayList<>(
1811                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1812                                        : Integer.MAX_VALUE);
1813        for (long i = 0; i < ackedStanzasCount; i++) {
1814            Stanza ackedStanza = unacknowledgedStanzas.poll();
1815            // If the server ack'ed a stanza, then it must be in the
1816            // unacknowledged stanza queue. There can be no exception.
1817            if (ackedStanza == null) {
1818                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1819                                ackedStanzasCount, ackedStanzas);
1820            }
1821            ackedStanzas.add(ackedStanza);
1822        }
1823
1824        boolean atLeastOneStanzaAcknowledgedListener = false;
1825        if (!stanzaAcknowledgedListeners.isEmpty()) {
1826            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1827            atLeastOneStanzaAcknowledgedListener = true;
1828        }
1829        else {
1830            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1831            for (Stanza ackedStanza : ackedStanzas) {
1832                String id = ackedStanza.getStanzaId();
1833                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1834                    atLeastOneStanzaAcknowledgedListener = true;
1835                    break;
1836                }
1837            }
1838        }
1839
1840        // Only spawn a new thread if there is a chance that some listener is invoked
1841        if (atLeastOneStanzaAcknowledgedListener) {
1842            asyncGo(new Runnable() {
1843                @Override
1844                public void run() {
1845                    for (Stanza ackedStanza : ackedStanzas) {
1846                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1847                            try {
1848                                listener.processStanza(ackedStanza);
1849                            }
1850                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1851                                LOGGER.log(Level.FINER, "Received exception", e);
1852                            }
1853                        }
1854                        String id = ackedStanza.getStanzaId();
1855                        if (StringUtils.isNullOrEmpty(id)) {
1856                            continue;
1857                        }
1858                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1859                        if (listener != null) {
1860                            try {
1861                                listener.processStanza(ackedStanza);
1862                            }
1863                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1864                                LOGGER.log(Level.FINER, "Received exception", e);
1865                            }
1866                        }
1867                    }
1868                }
1869            });
1870        }
1871
1872        serverHandledStanzasCount = handledCount;
1873    }
1874
    /**
     * Set the default bundle and defer callback used for new connections.
     * <p>
     * The callback is stored in the static <code>defaultBundleAndDeferCallback</code> field.
     * </p>
     *
     * @param defaultBundleAndDeferCallback the callback to use by default for new connections.
     * @see BundleAndDeferCallback
     * @since 4.1
     */
    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
    }
1885
    /**
     * Set the bundle and defer callback used for this connection.
     * <p>
     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
     * no longer get deferred.
     * </p>
     * <p>
     * Note the lower-case "and" in the name of this method.
     * </p>
     *
     * @param bundleAndDeferCallback the callback or <code>null</code>.
     * @see BundleAndDeferCallback
     * @since 4.1
     */
    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
        this.bundleAndDeferCallback = bundleAndDeferCallback;
    }
1900
1901}