001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.OutputStream;
024import java.io.OutputStreamWriter;
025import java.io.Writer;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.net.Socket;
029import java.security.cert.CertificateException;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Iterator;
033import java.util.LinkedHashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.ArrayBlockingQueue;
039import java.util.concurrent.BlockingQueue;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentLinkedQueue;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047import javax.net.SocketFactory;
048import javax.net.ssl.HostnameVerifier;
049import javax.net.ssl.SSLSession;
050import javax.net.ssl.SSLSocket;
051import javax.net.ssl.SSLSocketFactory;
052
053import org.jivesoftware.smack.AbstractXMPPConnection;
054import org.jivesoftware.smack.ConnectionConfiguration;
055import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
056import org.jivesoftware.smack.ConnectionListener;
057import org.jivesoftware.smack.SmackConfiguration;
058import org.jivesoftware.smack.SmackException;
059import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
060import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
061import org.jivesoftware.smack.SmackException.ConnectionException;
062import org.jivesoftware.smack.SmackException.EndpointConnectionException;
063import org.jivesoftware.smack.SmackException.NotConnectedException;
064import org.jivesoftware.smack.SmackException.NotLoggedInException;
065import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
066import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
067import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
068import org.jivesoftware.smack.SmackFuture;
069import org.jivesoftware.smack.StanzaListener;
070import org.jivesoftware.smack.XMPPConnection;
071import org.jivesoftware.smack.XMPPException;
072import org.jivesoftware.smack.XMPPException.FailedNonzaException;
073import org.jivesoftware.smack.XMPPException.StreamErrorException;
074import org.jivesoftware.smack.compress.packet.Compress;
075import org.jivesoftware.smack.compress.packet.Compressed;
076import org.jivesoftware.smack.compression.XMPPInputOutputStream;
077import org.jivesoftware.smack.datatypes.UInt16;
078import org.jivesoftware.smack.filter.StanzaFilter;
079import org.jivesoftware.smack.internal.SmackTlsContext;
080import org.jivesoftware.smack.packet.Element;
081import org.jivesoftware.smack.packet.IQ;
082import org.jivesoftware.smack.packet.Message;
083import org.jivesoftware.smack.packet.Presence;
084import org.jivesoftware.smack.packet.Stanza;
085import org.jivesoftware.smack.packet.StartTls;
086import org.jivesoftware.smack.packet.StreamError;
087import org.jivesoftware.smack.packet.StreamOpen;
088import org.jivesoftware.smack.packet.TopLevelStreamElement;
089import org.jivesoftware.smack.proxy.ProxyInfo;
090import org.jivesoftware.smack.sasl.packet.SaslNonza;
091import org.jivesoftware.smack.sm.SMUtils;
092import org.jivesoftware.smack.sm.StreamManagementException;
093import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
094import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
095import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
096import org.jivesoftware.smack.sm.packet.StreamManagement;
097import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
098import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
099import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
100import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
101import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
102import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
103import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
104import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
105import org.jivesoftware.smack.sm.predicates.Predicate;
106import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
107import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
108import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
109import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
110import org.jivesoftware.smack.util.Async;
111import org.jivesoftware.smack.util.CloseableUtil;
112import org.jivesoftware.smack.util.PacketParserUtils;
113import org.jivesoftware.smack.util.StringUtils;
114import org.jivesoftware.smack.util.TLSUtils;
115import org.jivesoftware.smack.util.XmlStringBuilder;
116import org.jivesoftware.smack.util.rce.RemoteConnectionException;
117import org.jivesoftware.smack.xml.SmackXmlParser;
118import org.jivesoftware.smack.xml.XmlPullParser;
119import org.jivesoftware.smack.xml.XmlPullParserException;
120
121import org.jxmpp.jid.impl.JidCreate;
122import org.jxmpp.jid.parts.Resourcepart;
123import org.jxmpp.stringprep.XmppStringprepException;
124import org.minidns.dnsname.DnsName;
125
126/**
127 * Creates a socket connection to an XMPP server. This is the default connection
128 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
129 *
130 * @see XMPPConnection
131 * @author Matt Tucker
132 */
133public class XMPPTCPConnection extends AbstractXMPPConnection {
134
    // NOTE(review): presumably a queue capacity; the usages are not visible in this part of the file - confirm.
    private static final int QUEUE_SIZE = 500;
    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());

    /**
     * The socket which is used for this connection.
     */
    private Socket socket;

    /**
     * Set by an instant shutdown when stream management resumption is possible and cleared again after a
     * successful login. While this flag is {@code true}, the "already connected" and "already logged in" checks do
     * not throw.
     */
    private boolean disconnectedButResumeable = false;

    /**
     * The TLS socket of this connection. A value of {@code null} means that the connection is not secured, see
     * {@link #isSecureConnection()}.
     */
    private SSLSocket secureSocket;

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketWriter packetWriter = new PacketWriter();

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketReader packetReader = new PacketReader();

    /**
     * Reset to {@code false} right before authentication is performed during login; the login then waits until
     * this flag becomes {@code true}.
     * NOTE(review): the code that sets the flag is not visible in this part of the file - presumably the reader
     * thread sets it when stream features arrive after authentication; confirm.
     */
    private boolean streamFeaturesAfterAuthenticationReceived;

    /**
     * NOTE(review): presumably signals the server's answer to a stream compression request; this field is not
     * used in the visible part of the file - confirm against its usages.
     */
    private boolean compressSyncPoint;

    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
    private static BundleAndDeferCallback defaultBundleAndDeferCallback;

    /**
     * The used bundle and defer callback.
     * <p>
     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
     * having a 'volatile' read within the writer threads loop.
     * </p>
     */
    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;

    /** Default for {@link #useSm} of newly created connections. */
    private static boolean useSmDefault = true;

    /** Default for {@link #useSmResumption} of newly created connections. */
    private static boolean useSmResumptionDefault = true;

    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}.
     */
    private String smSessionId;

    /**
     * Represents the state of stream management resumption.
     * <p>
     * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread.
     * </p>
     */
    private volatile SyncPointState smResumedSyncPoint;

    /**
     * Cleared before a resumption attempt during login; a non-null value ends the wait for the resumption result
     * and makes the login continue with the normal stream establishment.
     */
    private Failed smResumptionFailed;

    /**
     * Represents the state of stream management.
     * <p>
     * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}.
     * </p>
     */
    private volatile boolean smEnabledSyncPoint;

    /**
     * The client's preferred maximum resumption time in seconds.
     */
    private int smClientMaxResumptionTime = -1;

    /**
     * The server's preferred maximum resumption time in seconds.
     */
    private int smServerMaxResumptionTime = -1;

    /**
     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
     */
    private boolean useSm = useSmDefault;

    /** Passed to the stream management 'enable' element that is sent during login. */
    private boolean useSmResumption = useSmResumptionDefault;

    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
    private long serverHandledStanzasCount = 0;

    /**
     * The counter for stanzas handled ("received") by the client.
     * <p>
     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
     * </p>
     */
    private long clientHandledStanzasCount = 0;

    /**
     * The stanzas that were sent but not yet acknowledged by the server. The login code treats a {@code null}
     * value as "no previous stream management state to take over".
     */
    private BlockingQueue<Stanza> unacknowledgedStanzas;

    /**
     * Set to true if Stream Management was at least once enabled for this connection.
     */
    private boolean smWasEnabledAtLeastOnce = false;

    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for every stanza that got dropped.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and automatically removed after that.
     */
    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();

    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
     * </p>
     */
    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();

    /** The TCP specific configuration of this connection, as passed to the constructor. */
    @SuppressWarnings("HidingField")
    private final XMPPTCPConnectionConfiguration config;
288
289    /**
290     * Creates a new XMPP connection over TCP (optionally using proxies).
291     * <p>
292     * Note that XMPPTCPConnection constructors do not establish a connection to the server
293     * and you must call {@link #connect()}.
294     * </p>
295     *
296     * @param config the connection configuration.
297     */
298    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
299        super(config);
300        this.config = config;
301        addConnectionListener(new ConnectionListener() {
302            @Override
303            public void connectionClosedOnError(Exception e) {
304                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
305                    dropSmState();
306                }
307            }
308        });
309
310        // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream
311        // is initiated.
312        buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install();
313    }
314
315    /**
316     * Creates a new XMPP connection over TCP.
317     * <p>
318     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
319     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
320     * constructor.
321     * </p>
322     *
323     * @param jid the bare JID used by the client.
324     * @param password the password or authentication token.
325     * @throws XmppStringprepException if the provided string is invalid.
326     */
327    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
328        this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build());
329    }
330
331    /**
332     * Creates a new XMPP connection over TCP.
333     * <p>
334     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
335     * you can get fine-grained control over connection settings using the
336     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
337     * </p>
338     * @param username TODO javadoc me please
339     * @param password TODO javadoc me please
340     * @param serviceName TODO javadoc me please
341     * @throws XmppStringprepException if the provided string is invalid.
342     */
343    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
344        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
345                                        JidCreate.domainBareFrom(serviceName)).build());
346    }
347
348    @Override
349    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
350        if (packetWriter == null) {
351            throw new NotConnectedException();
352        }
353        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
354    }
355
356    @Override
357    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
358        if (isConnected() && !disconnectedButResumeable) {
359            throw new AlreadyConnectedException();
360        }
361    }
362
363    @Override
364    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
365        if (isAuthenticated() && !disconnectedButResumeable) {
366            throw new AlreadyLoggedInException();
367        }
368    }
369
370    @Override
371    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
372        // Reset the flag in case it was set
373        disconnectedButResumeable = false;
374        super.afterSuccessfulLogin(resumed);
375    }
376
    /**
     * Performs the login: authenticates, optionally enables compression, then either resumes a previous stream
     * management (SM) session or binds the resource, enables SM if available and wanted, and finally takes care
     * of the stanzas that were left unacknowledged by a previous SM session.
     *
     * @param username passed to the authentication step.
     * @param password passed to the authentication step.
     * @param resource the resource that is bound if no previous stream could be resumed.
     * @throws XMPPException declared by the invoked login steps.
     * @throws SmackException declared by the invoked login steps, e.g. if a waited-for condition is not met.
     * @throws IOException declared by the invoked login steps.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    @Override
    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                    SmackException, IOException, InterruptedException {
        // Authenticate using SASL. The TLS session (if any) is handed to the authentication step.
        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;

        // Reset the flag *before* authenticating, since we wait for it to become true right afterwards.
        streamFeaturesAfterAuthenticationReceived = false;
        authenticate(username, password, config.getAuthzid(), sslSession);

        // Wait for stream features after the authentication.
        // Note that the wait message below still speaks of "compress features", although the awaited flag is
        // about the stream features received after authentication in general.
        waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");

        // If compression is enabled then request the server to use stream compression. XEP-170
        // recommends to perform stream compression before resource binding.
        maybeEnableCompression();

        // Start every login with a clean resumption state.
        smResumedSyncPoint = SyncPointState.initial;
        smResumptionFailed = null;
        if (isSmResumptionPossible()) {
            smResumedSyncPoint = SyncPointState.request_sent;
            sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
            // The wait ends on either outcome: successful resumption or a recorded failure.
            waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
            if (smResumedSyncPoint == SyncPointState.successful) {
                // We successfully resumed the stream, be done here
                afterSuccessfulLogin(true);
                return;
            }
            // SM resumption failed. This is not treated as an error: we fall through so that the normal
            // resource binding can be tried.
            assert smResumptionFailed != null;
            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
        }

        // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
        // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the
        // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
        smEnabledSyncPoint = false;

        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
        if (unacknowledgedStanzas != null) {
            // There was a previous connection with SM enabled but that was either not resumable or
            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
            // XMPP session (There maybe was an enabled in a previous XMPP session of this
            // connection instance though). This is used in writePackets to decide if stanzas should
            // be added to the unacknowledged stanzas queue, because they have to be added right
            // after the 'enable' stream element has been sent.
            dropSmState();
        }

        // Now bind the resource. It is important to do this *after* we dropped an eventually
        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
        bindResourceAndEstablishSession(resource);

        if (isSmAvailable() && useSm) {
            // Remove what is maybe left from previously stream managed sessions
            serverHandledStanzasCount = 0;
            sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
            // then this is a non recoverable error and we therefore throw an exception.
            waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
            synchronized (requestAckPredicates) {
                if (requestAckPredicates.isEmpty()) {
                    // Assure that we have at least one predicate set up so that we request acks
                    // from the server and eventually flush some stanzas from the unacknowledged
                    // stanza queue
                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
                }
            }
        }
        // Inform client about failed resumption if possible, resend stanzas otherwise
        // Process the stanzas synchronously so a client can re-queue them for transmission
        // before it is informed about connection success
        if (!stanzaDroppedListeners.isEmpty()) {
            for (Stanza stanza : previouslyUnackedStanzas) {
                for (StanzaListener listener : stanzaDroppedListeners) {
                    try {
                        listener.processStanza(stanza);
                    }
                    catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
                        // A failing listener must not prevent the remaining listeners and stanzas from being
                        // processed, hence the exception is only logged.
                        // NOTE(review): an InterruptedException is swallowed here without restoring the
                        // interrupt status of the thread - confirm that this is intended.
                        LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
                    }
                }
            }
        } else {
            // Nobody is interested in dropped stanzas, so simply send them again.
            for (Stanza stanza : previouslyUnackedStanzas) {
                sendInternal(stanza);
            }
        }

        afterSuccessfulLogin(false);
    }
474
475    @Override
476    public boolean isSecureConnection() {
477        return secureSocket != null;
478    }
479
480    /**
481     * Shuts the current connection down. After this method returns, the connection must be ready
482     * for re-use by connect.
483     */
484    @Override
485    protected void shutdown() {
486        if (isSmEnabled()) {
487            try {
488                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
489                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
490                sendSmAcknowledgementInternal();
491            } catch (InterruptedException | NotConnectedException e) {
492                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
493            }
494        }
495        shutdown(false);
496    }
497
498    @Override
499    public synchronized void instantShutdown() {
500        shutdown(true);
501    }
502
    /**
     * Shuts down the writer and the reader, closes the socket, waits for both threads to terminate and finally
     * resets the connection state, unless the connection is (or becomes) disconnected-but-resumable.
     *
     * @param instant if {@code true}, the closing stream tag from the server is not awaited and the stream
     *        management state is kept when a resumption is possible; if {@code false}, the stream management
     *        state is dropped.
     */
    private void shutdown(boolean instant) {
        // The writer thread may already been finished at this point, for example when the connection is in the
        // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
        // case.
        if (!packetWriter.done()) {
            // First shutdown the writer, this will result in a closing stream element getting send to
            // the server
            LOGGER.finer(packetWriter.threadName + " shutdown()");
            packetWriter.shutdown(instant);
            LOGGER.finer(packetWriter.threadName + " shutdown() returned");

            if (!instant) {
                waitForClosingStreamTagFromServer();
            }
        }

        LOGGER.finer(packetReader.threadName + " shutdown()");
        packetReader.shutdown();
        LOGGER.finer(packetReader.threadName + " shutdown() returned");

        // Close the socket only after both the writer and the reader were told to shut down.
        CloseableUtil.maybeClose(socket, LOGGER);

        setWasAuthenticated();

        try {
            // Wait until neither the writer nor the reader reports itself as running.
            boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running);
            if (!readerAndWriterThreadsTermianted) {
                LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: "
                                + packetWriter.running + ", Reader running: " + packetReader.running);
            } else {
                LOGGER.fine("Reader and writer threads terminated");
            }
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt status of the thread is not restored here - confirm that this is intended.
            LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
        }

        // Already in the disconnected-but-resumable state (set by an earlier instant shutdown): keep the
        // remaining connection state untouched.
        if (disconnectedButResumeable) {
            return;
        }

        // If we are able to resume the stream, then don't set
        // connected/authenticated/usingTLS to false since we like to behave like we are still
        // connected (e.g. sendStanza should not throw a NotConnectedException).
        // NOTE(review): the state reset at the end of this method is nevertheless performed in the same call
        // that sets disconnectedButResumeable; only later calls return early above - confirm against the
        // comment above.
        if (instant) {
            disconnectedButResumeable = isSmResumptionPossible();
            if (!disconnectedButResumeable) {
                // Reset the stream management session id to null, since the stream is no longer resumable. Note that we
                // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected.
                smSessionId = null;
            }
        } else {
            disconnectedButResumeable = false;

            // Drop the stream management state if this is not an instant shutdown. We send
            // a </stream> close tag and now the stream management state is no longer valid.
            // This also prevents that we will potentially (re-)send any unavailable presence we
            // may have send, because it got put into the unacknowledged queue and was not acknowledged before the
            // connection terminated.
            dropSmState();
            // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
            // information is available in the connectionClosedOnError() listeners.
        }
        authenticated = false;
        connected = false;
        secureSocket = null;
        reader = null;
        writer = null;

        initState();
    }
573
574    private interface SmAckAction<E extends Exception> {
575        void run() throws NotConnectedException, E;
576    }
577
578    private <E extends Exception> void requestSmAckIfNecessary(TopLevelStreamElement element,
579                    SmAckAction<E> smAckAction) throws NotConnectedException, E {
580        if (!isSmEnabled())
581            return;
582
583        if (element instanceof Stanza) {
584            Stanza stanza = (Stanza) element;
585            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
586                if (requestAckPredicate.accept(stanza)) {
587                    smAckAction.run();
588                    break;
589                }
590            }
591        }
592    }
593
594    @Override
595    protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
596        packetWriter.sendStreamElement(element);
597        requestSmAckIfNecessary(element, () -> requestSmAcknowledgementInternal());
598    }
599
600    @Override
601    protected void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException {
602        packetWriter.sendNonBlocking(element);
603        requestSmAckIfNecessary(element, () -> requestSmAcknowledgementNonBlockingInternal());
604    }
605
606    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
607        RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config);
608
609        List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>();
610
611        SocketFactory socketFactory = config.getSocketFactory();
612        ProxyInfo proxyInfo = config.getProxyInfo();
613        int timeout = config.getConnectTimeout();
614        if (socketFactory == null) {
615            socketFactory = SocketFactory.getDefault();
616        }
617        for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
618            Iterator<? extends InetAddress> inetAddresses;
619            String host = endpoint.getHost().toString();
620            UInt16 portUint16 = endpoint.getPort();
621            int port = portUint16.intValue();
622            if (proxyInfo == null) {
623                inetAddresses = endpoint.getInetAddresses().iterator();
624                assert inetAddresses.hasNext();
625
626                innerloop: while (inetAddresses.hasNext()) {
627                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
628                    // re-usable after a failed connection attempt. See also SMACK-724.
629                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
630
631                    final InetAddress inetAddress = inetAddresses.next();
632                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
633                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
634                    socketFuture.connectAsync(inetSocketAddress, timeout);
635
636                    try {
637                        socket = socketFuture.getOrThrow();
638                    } catch (IOException e) {
639                        RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
640                                        endpoint, inetAddress, e);
641                        connectionExceptions.add(rce);
642                        if (inetAddresses.hasNext()) {
643                            continue innerloop;
644                        } else {
645                            break innerloop;
646                        }
647                    }
648                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
649                    // We found a host to connect to, return here
650                    this.host = host;
651                    this.port = portUint16;
652                    return;
653                }
654            } else {
655                // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy
656                // connection to every inet address of each connection endpoint.
657                socket = socketFactory.createSocket();
658                StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy");
659                final String hostAndPort = host + " at port " + port;
660                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
661                try {
662                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
663                } catch (IOException e) {
664                    CloseableUtil.maybeClose(socket, LOGGER);
665                    RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e);
666                    connectionExceptions.add(rce);
667                    continue;
668                }
669                LOGGER.finer("Established TCP connection to " + hostAndPort);
670                // We found a host to connect to, return here
671                this.host = host;
672                this.port = portUint16;
673                return;
674            }
675        }
676
677        // There are no more host addresses to try
678        // throw an exception and report all tried
679        // HostAddresses in the exception
680        throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
681    }
682
683    /**
684     * Initializes the connection by creating a stanza reader and writer and opening a
685     * XMPP stream to the server.
686     *
687     * @throws IOException if an I/O error occurred.
688     * @throws InterruptedException if the calling thread was interrupted.
689     */
690    private void initConnection() throws IOException, InterruptedException {
691        compressionHandler = null;
692
693        // Set the reader and writer instance variables
694        initReaderAndWriter();
695
696        // Start the writer thread. This will open an XMPP stream to the server
697        packetWriter.init();
698        // Start the reader thread. The startup() method will block until we
699        // get an opening stream packet back from server
700        packetReader.init();
701    }
702
703    private void initReaderAndWriter() throws IOException {
704        InputStream is = socket.getInputStream();
705        OutputStream os = socket.getOutputStream();
706        if (compressionHandler != null) {
707            is = compressionHandler.getInputStream(is);
708            os = compressionHandler.getOutputStream(os);
709        }
710        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
711        writer = new OutputStreamWriter(os, "UTF-8");
712        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
713
714        // If debugging is enabled, we open a window and write out all network traffic.
715        initDebugger();
716    }
717
718    /**
719     * The server has indicated that TLS negotiation can start. We now need to secure the
720     * existing plain connection and perform a handshake. This method won't return until the
721     * connection has finished the handshake or an error occurred while securing the connection.
722     * @throws IOException if an I/O error occurred.
723     * @throws SecurityNotPossibleException if TLS is not possible.
724     * @throws CertificateException if there is an issue with the certificate.
725     */
726    @SuppressWarnings("LiteralClassName")
727    private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
728        SmackTlsContext smackTlsContext = getSmackTlsContext();
729
730        Socket plain = socket;
731        int port = plain.getPort();
732        String xmppServiceDomainString = config.getXMPPServiceDomain().toString();
733        SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory();
734        // Secure the plain connection
735        socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true);
736
737        final SSLSocket sslSocket = (SSLSocket) socket;
738        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
739        // important (at least on certain platforms) and it seems to be a good idea anyways to
740        // prevent an accidental implicit handshake.
741        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
742
743        // Initialize the reader and writer with the new secured version
744        initReaderAndWriter();
745
746        // Proceed to do the handshake
747        sslSocket.startHandshake();
748
749        if (smackTlsContext.daneVerifier != null) {
750            smackTlsContext.daneVerifier.finish(sslSocket.getSession());
751        }
752
753        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
754        if (verifier == null) {
755                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
756        }
757
758        final String verifierHostname;
759        {
760            DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
761            // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
762            // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
763            // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
764            if (xmppServiceDomainDnsName != null) {
765                verifierHostname = xmppServiceDomainDnsName.ace;
766            }
767            else {
768                LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
769                                + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
770                verifierHostname = getXMPPServiceDomain().toString();
771            }
772        }
773
774        final boolean verificationSuccessful;
775        // Verify the TLS session.
776        verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
777        if (!verificationSuccessful) {
778            throw new CertificateException(
779                            "Hostname verification of certificate failed. Certificate does not authenticate "
780                                            + getXMPPServiceDomain());
781        }
782
783        // Set that TLS was successful
784        secureSocket = sslSocket;
785    }
786
787    /**
788     * Returns the compression handler that can be used for one compression methods offered by the server.
789     *
790     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
791     *
792     */
793    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
794        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
795                String method = handler.getCompressionMethod();
796                if (compression.getMethods().contains(method))
797                    return handler;
798        }
799        return null;
800    }
801
802    @Override
803    public boolean isUsingCompression() {
804        return compressionHandler != null && compressSyncPoint;
805    }
806
807    /**
808     * <p>
809     * Starts using stream compression that will compress network traffic. Traffic can be
810     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
811     * connection. However, the server and the client will need to use more CPU time in order to
812     * un/compress network data so under high load the server performance might be affected.
813     * </p>
814     * <p>
815     * Stream compression has to have been previously offered by the server. Currently only the
816     * zlib method is supported by the client. Stream compression negotiation has to be done
817     * before authentication took place.
818     * </p>
819     *
820     * @throws NotConnectedException if the XMPP connection is not connected.
821     * @throws SmackException if Smack detected an exceptional situation.
822     * @throws InterruptedException if the calling thread was interrupted.
823     * @throws XMPPException if an XMPP protocol error was received.
824     */
825    private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
826        if (!config.isCompressionEnabled()) {
827            return;
828        }
829
830        Compress.Feature compression = getFeature(Compress.Feature.class);
831        if (compression == null) {
832            // Server does not support compression
833            return;
834        }
835        // If stream compression was offered by the server and we want to use
836        // compression then send compression request to the server
837        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
838            compressSyncPoint = false;
839            sendNonza(new Compress(compressionHandler.getCompressionMethod()));
840            waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
841        } else {
842            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
843        }
844    }
845
846    /**
847     * Establishes a connection to the XMPP server. It basically
848     * creates and maintains a socket connection to the server.
849     * <p>
850     * Listeners will be preserved from a previous connection if the reconnection
851     * occurs after an abrupt termination.
852     * </p>
853     *
854     * @throws XMPPException if an error occurs while trying to establish the connection.
855     * @throws SmackException if Smack detected an exceptional situation.
856     * @throws IOException if an I/O error occurred.
857     * @throws InterruptedException if the calling thread was interrupted.
858     */
859    @Override
860    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
861        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
862        // there is an error establishing the connection
863        connectUsingConfiguration();
864
865        connected = true;
866
867        // We connected successfully to the servers TCP port
868        initConnection();
869
870        // TLS handled will be true either if TLS was established, or if it was not mandatory.
871        waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
872
873        // Wait with SASL auth until the SASL mechanisms have been received
874        waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
875    }
876
877    /**
878     * For unit testing purposes
879     *
880     * @param writer TODO javadoc me please
881     */
882    protected void setWriter(Writer writer) {
883        this.writer = writer;
884    }
885
886    @Override
887    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
888        StartTls startTlsFeature = getFeature(StartTls.class);
889        if (startTlsFeature != null) {
890            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
891                SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
892                currentSmackException = smackException;
893                notifyWaitingThreads();
894                throw smackException;
895            }
896
897            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
898                sendNonza(new StartTls());
899            } else {
900                tlsHandled = true;
901                notifyWaitingThreads();
902            }
903        } else {
904            tlsHandled = true;
905            notifyWaitingThreads();
906        }
907
908        if (isSaslAuthenticated()) {
909            // If we have received features after the SASL has been successfully completed, then we
910            // have also *maybe* received, as it is an optional feature, the compression feature
911            // from the server.
912            streamFeaturesAfterAuthenticationReceived = true;
913            notifyWaitingThreads();
914        }
915    }
916
917    private void resetParser() throws IOException {
918        try {
919            packetReader.parser = SmackXmlParser.newXmlParser(reader);
920        } catch (XmlPullParserException e) {
921            throw new IOException(e);
922        }
923   }
924
925    private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
926        sendStreamOpen();
927        resetParser();
928    }
929
930    protected class PacketReader {
931
932        private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';
933
934        XmlPullParser parser;
935
936        private volatile boolean done;
937
938        private boolean running;
939
940        /**
941         * Initializes the reader in order to be used. The reader is initialized during the
942         * first connection and when reconnecting due to an abruptly disconnection.
943         */
944        void init() {
945            done = false;
946
947            running = true;
948            Async.go(new Runnable() {
949                @Override
950                public void run() {
951                    LOGGER.finer(threadName + " start");
952                    try {
953                        parsePackets();
954                    } finally {
955                        LOGGER.finer(threadName + " exit");
956                        running = false;
957                        notifyWaitingThreads();
958                    }
959                }
960            }, threadName);
961         }
962
963        /**
964         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
965         */
966        void shutdown() {
967            done = true;
968        }
969
970        /**
971         * Parse top-level packets in order to process them further.
972         */
973        private void parsePackets() {
974            try {
975                openStreamAndResetParser();
976                XmlPullParser.Event eventType = parser.getEventType();
977                while (!done) {
978                    switch (eventType) {
979                    case START_ELEMENT:
980                        final String name = parser.getName();
981                        final String namespace = parser.getNamespace();
982
983                        switch (name) {
984                        case Message.ELEMENT:
985                        case IQ.IQ_ELEMENT:
986                        case Presence.ELEMENT:
987                            try {
988                                parseAndProcessStanza(parser);
989                            } finally {
990                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
991                            }
992                            break;
993                        case "stream":
994                            if (StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(namespace)) {
995                                onStreamOpen(parser);
996                            }
997                            break;
998                        case "error":
999                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
1000                            // Stream errors are non recoverable, throw this exceptions. Also note that this will set
1001                            // this exception as current connection exceptions and notify any waiting threads.
1002                            throw new StreamErrorException(streamError);
1003                        case "features":
1004                            parseFeaturesAndNotify(parser);
1005                            break;
1006                        case "proceed":
1007                            // Secure the connection by negotiating TLS
1008                            proceedTLSReceived();
1009                            // Send a new opening stream to the server
1010                            openStreamAndResetParser();
1011                            break;
1012                        case "failure":
1013                            switch (namespace) {
1014                            case "urn:ietf:params:xml:ns:xmpp-tls":
1015                                // TLS negotiation has failed. The server will close the connection
1016                                // TODO Parse failure stanza
1017                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
1018                            case "http://jabber.org/protocol/compress":
1019                                // Stream compression has been denied. This is a recoverable
1020                                // situation. It is still possible to authenticate and
1021                                // use the connection but using an uncompressed connection
1022                                // TODO Parse failure stanza
1023                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
1024                                notifyWaitingThreads();
1025                                break;
1026                            default:
1027                                parseAndProcessNonza(parser);
1028                            }
1029                            break;
1030                        case Compressed.ELEMENT:
1031                            // Server confirmed that it's possible to use stream compression. Start
1032                            // stream compression
1033                            // Initialize the reader and writer with the new compressed version
1034                            initReaderAndWriter();
1035                            // Send a new opening stream to the server
1036                            openStreamAndResetParser();
1037                            // Notify that compression is being used
1038                            compressSyncPoint = true;
1039                            notifyWaitingThreads();
1040                            break;
1041                        case Enabled.ELEMENT:
1042                            Enabled enabled = ParseStreamManagement.enabled(parser);
1043                            if (enabled.isResumeSet()) {
1044                                smSessionId = enabled.getId();
1045                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1046                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
1047                                    setCurrentConnectionExceptionAndNotify(xmppException);
1048                                    throw xmppException;
1049                                }
1050                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
1051                            } else {
1052                                // Mark this a non-resumable stream by setting smSessionId to null
1053                                smSessionId = null;
1054                            }
1055                            clientHandledStanzasCount = 0;
1056                            smWasEnabledAtLeastOnce = true;
1057                            smEnabledSyncPoint = true;
1058                            notifyWaitingThreads();
1059                            break;
1060                        case Failed.ELEMENT:
1061                            Failed failed = ParseStreamManagement.failed(parser);
1062                            if (smResumedSyncPoint == SyncPointState.request_sent) {
1063                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
1064                                // so is non-fatal as we can simply continue with resource binding in this case.
1065                                smResumptionFailed = failed;
1066                                notifyWaitingThreads();
1067                            } else {
1068                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
1069                                setCurrentConnectionExceptionAndNotify(xmppException);
1070                            }
1071                            break;
1072                        case Resumed.ELEMENT:
1073                            Resumed resumed = ParseStreamManagement.resumed(parser);
1074                            if (!smSessionId.equals(resumed.getPrevId())) {
1075                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1076                            }
1077                            // Mark SM as enabled
1078                            smEnabledSyncPoint = true;
1079                            // First, drop the stanzas already handled by the server
1080                            processHandledCount(resumed.getHandledCount());
1081                            // Then re-send what is left in the unacknowledged queue
1082                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
1083                            unacknowledgedStanzas.drainTo(stanzasToResend);
1084                            for (Stanza stanza : stanzasToResend) {
1085                                XMPPTCPConnection.this.sendInternal(stanza);
1086                            }
1087                            // If there where stanzas resent, then request a SM ack for them.
1088                            // Writer's sendStreamElement() won't do it automatically based on
1089                            // predicates.
1090                            if (!stanzasToResend.isEmpty()) {
1091                                requestSmAcknowledgementInternal();
1092                            }
1093                            // Mark SM resumption as successful
1094                            smResumedSyncPoint = SyncPointState.successful;
1095                            notifyWaitingThreads();
1096                            break;
1097                        case AckAnswer.ELEMENT:
1098                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1099                            processHandledCount(ackAnswer.getHandledCount());
1100                            break;
1101                        case AckRequest.ELEMENT:
1102                            ParseStreamManagement.ackRequest(parser);
1103                            if (smEnabledSyncPoint) {
1104                                sendSmAcknowledgementInternal();
1105                            } else {
1106                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1107                            }
1108                            break;
1109                         default:
1110                             parseAndProcessNonza(parser);
1111                             break;
1112                        }
1113                        break;
1114                    case END_ELEMENT:
1115                        final String endTagName = parser.getName();
1116                        if ("stream".equals(endTagName)) {
1117                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
1118                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
1119                                break;
1120                            }
1121
1122                            // Check if the queue was already shut down before reporting success on closing stream tag
1123                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
1124                            // did re-start the queue again, causing this writer to assume that the queue is not
1125                            // shutdown, which results in a call to disconnect().
1126                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
1127                            closingStreamReceived = true;
1128                            notifyWaitingThreads();
1129
1130                            if (queueWasShutdown) {
1131                                // We received a closing stream element *after* we initiated the
1132                                // termination of the session by sending a closing stream element to
1133                                // the server first
1134                                return;
1135                            } else {
1136                                // We received a closing stream element from the server without us
1137                                // sending a closing stream element first. This means that the
1138                                // server wants to terminate the session, therefore disconnect
1139                                // the connection
1140                                LOGGER.info(XMPPTCPConnection.this
1141                                                + " received closing </stream> element."
1142                                                + " Server wants to terminate the connection, calling disconnect()");
1143                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
1144                                    @Override
1145                                    public void run() {
1146                                        disconnect();
1147                                    }});
1148                            }
1149                        }
1150                        break;
1151                    case END_DOCUMENT:
1152                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1153                        // closing stream element before.
1154                        throw new SmackException.SmackMessageException(
1155                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1156                    default:
1157                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
1158                        break;
1159                    }
1160                    eventType = parser.next();
1161                }
1162            }
1163            catch (Exception e) {
1164                // Set running to false since this thread will exit here and notifyConnectionError() will wait until
1165                // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
1166                // notifyConnetctionError() below, even though run() also sets it to false. Therefore, do not remove this.
1167                running = false;
1168
1169                String ignoreReasonThread = null;
1170
1171                boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
1172                if (writerThreadWasShutDown) {
1173                    ignoreReasonThread = "writer";
1174                } else if (done) {
1175                    ignoreReasonThread = "reader";
1176                }
1177
1178                if (ignoreReasonThread != null) {
1179                    LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
1180                    return;
1181                }
1182
1183                // Close the connection and notify connection listeners of the error.
1184                notifyConnectionError(e);
1185            }
1186        }
1187    }
1188
1189    protected class PacketWriter {
1190        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1191        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
1192        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
1193
1194        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';
1195
1196        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1197                        QUEUE_SIZE, true);
1198
1199        /**
1200         * If set, the stanza writer is shut down
1201         */
1202        protected volatile Long shutdownTimestamp = null;
1203
1204        private volatile boolean instantShutdown;
1205
        /**
         * True if some preconditions are given to start the bundle and defer mechanism.
         * <p>
         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
         * this field to true.
         * </p>
         */
1214        private boolean shouldBundleAndDefer;
1215
1216        private boolean running;
1217
1218        /**
1219        * Initializes the writer in order to be used. It is called at the first connection and also
1220        * is invoked if the connection is disconnected by an error.
1221        */
1222        void init() {
1223            shutdownTimestamp = null;
1224
1225            if (unacknowledgedStanzas != null) {
1226                // It's possible that there are new stanzas in the writer queue that
1227                // came in while we were disconnected but resumable, drain those into
1228                // the unacknowledged queue so that they get resent now
1229                drainWriterQueueToUnacknowledgedStanzas();
1230            }
1231
1232            queue.start();
1233            running = true;
1234            Async.go(new Runnable() {
1235                @Override
1236                public void run() {
1237                    LOGGER.finer(threadName + " start");
1238                    try {
1239                        writePackets();
1240                    } finally {
1241                        LOGGER.finer(threadName + " exit");
1242                        running = false;
1243                        notifyWaitingThreads();
1244                    }
1245                }
1246            }, threadName);
1247        }
1248
1249        private boolean done() {
1250            return shutdownTimestamp != null;
1251        }
1252
1253        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1254            final boolean done = done();
1255            if (done) {
1256                final boolean smResumptionPossible = isSmResumptionPossible();
1257                // Don't throw a NotConnectedException is there is an resumable stream available
1258                if (!smResumptionPossible) {
1259                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1260                                    + " smResumptionPossible=" + smResumptionPossible);
1261                }
1262            }
1263        }
1264
1265        /**
1266         * Sends the specified element to the server.
1267         *
1268         * @param element the element to send.
1269         * @throws NotConnectedException if the XMPP connection is not connected.
1270         * @throws InterruptedException if the calling thread was interrupted.
1271         */
1272        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1273            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1274            try {
1275                queue.put(element);
1276            }
1277            catch (InterruptedException e) {
1278                // put() may throw an InterruptedException for two reasons:
1279                // 1. If the queue was shut down
1280                // 2. If the thread was interrupted
1281                // so we have to check which is the case
1282                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1283                // If the method above did not throw, then the sending thread was interrupted
1284                throw e;
1285            }
1286        }
1287
1288        /**
1289         * Sends the specified element to the server.
1290         *
1291         * @param element the element to send.
1292         * @throws NotConnectedException if the XMPP connection is not connected.
1293         * @throws OutgoingQueueFullException if there is no space in the outgoing queue.
1294         */
1295        protected void sendNonBlocking(Element element) throws NotConnectedException, OutgoingQueueFullException {
1296            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1297            boolean enqueued = queue.offer(element);
1298            if (!enqueued) {
1299                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1300                throw new OutgoingQueueFullException();
1301            }
1302        }
1303
        /**
         * Initiates the shutdown of the stanza writer: shuts down the outgoing queue and records the shutdown
         * timestamp, after which {@link #done()} reports true.
         *
         * @param instant stored in <code>instantShutdown</code>; if true, the writer neither flushes the remaining
         *        queue elements nor writes the closing stream element when it terminates (see
         *        <code>writePackets()</code>).
         */
        void shutdown(boolean instant) {
            // Must be set before the writer loop terminates, as it selects the shutdown path in writePackets()
            instantShutdown = instant;
            queue.shutdown();
            // From here on done() returns true
            shutdownTimestamp = System.currentTimeMillis();
        }
1313
1314        /**
1315         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1316         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1317         * that case.
1318         *
1319         * @return the next element for writing or null.
1320         */
1321        private Element nextStreamElement() {
1322            // It is important the we check if the queue is empty before removing an element from it
1323            if (queue.isEmpty()) {
1324                shouldBundleAndDefer = true;
1325            }
1326            Element packet = null;
1327            try {
1328                packet = queue.take();
1329            }
1330            catch (InterruptedException e) {
1331                if (!queue.isShutdown()) {
1332                    // Users shouldn't try to interrupt the packet writer thread
1333                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1334                }
1335            }
1336            return packet;
1337        }
1338
        /**
         * The main loop of the writer: takes elements from the queue and writes them to the <code>writer</code>
         * until {@link #done()} reports true, and then performs the shutdown work.
         * <p>
         * On a regular (non-instant) shutdown the elements remaining in the queue are written, followed by the
         * closing stream element. On an instant shutdown with Stream Management enabled, the remaining elements
         * are drained into the unacknowledged stanzas queue instead.
         * </p>
         * <p>
         * An exception is reported via <code>notifyConnectionError()</code>, unless the writer is done or the queue
         * was shut down, in which case it is only logged.
         * </p>
         */
        private void writePackets() {
            try {
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // No element was taken (queue shut down or interrupt), so re-check the 'done' condition
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Loop, since wait() may return before the deadline: keep waiting until the flag
                                // is set or the full defer time has elapsed
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    // 'packet' stays null for every element that is not a stanza
                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enable'. Stanzas will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
                    }
                    // Does nothing for a null packet or while unacknowledgedStanzas is null
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
                    if (elementXml instanceof XmlStringBuilder) {
                        try {
                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
                        } catch (NullPointerException npe) {
                            // Log which element caused the NPE before propagating it
                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
                            throw npe;
                        }
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush once there is nothing further queued for writing
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    // NOTE(review): unlike the loop above, this path uses toXML() without the stream's XML
                    // environment and does not invoke firePacketSendingListeners() - confirm this is intended.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            writer.write(packet.toXML().toString());
                        }
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                    // the reader and writer thread's 'running' value is false.
                    running = false;
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            }
        }
1458
1459        private void drainWriterQueueToUnacknowledgedStanzas() {
1460            List<Element> elements = new ArrayList<>(queue.size());
1461            queue.drainTo(elements);
1462            for (int i = 0; i < elements.size(); i++) {
1463                Element element = elements.get(i);
1464                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1465                if (unacknowledgedStanzas.remainingCapacity() == 0) {
1466                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
1467                            .newWith(i, elements, unacknowledgedStanzas);
1468                    LOGGER.log(Level.WARNING,
1469                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1470                    return;
1471                }
1472                if (element instanceof Stanza) {
1473                    unacknowledgedStanzas.add((Stanza) element);
1474                }
1475            }
1476        }
1477
1478        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1479            // Check if the stream element should be put to the unacknowledgedStanza
1480            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1481            // packet order is not stable at this point (sendStanzaInternal() can be
1482            // called concurrently).
1483            if (unacknowledgedStanzas != null && stanza != null) {
1484                // If the unacknowledgedStanza queue reaching its high water mark, request an new ack
1485                // from the server in order to drain it
1486                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
1487                    writer.write(AckRequest.INSTANCE.toXML().toString());
1488                }
1489
1490                try {
1491                    // It is important the we put the stanza in the unacknowledged stanza
1492                    // queue before we put it on the wire
1493                    unacknowledgedStanzas.put(stanza);
1494                }
1495                catch (InterruptedException e) {
1496                    throw new IllegalStateException(e);
1497                }
1498            }
1499        }
1500    }
1501
    /**
     * Set whether Stream Management should be used by default for new connections. This only changes the static
     * default value.
     *
     * @param useSmDefault true to use Stream Management for new connections.
     */
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
        XMPPTCPConnection.useSmDefault = useSmDefault;
    }
1510
    /**
     * Set whether Stream Management resumption should be used by default for new connections.
     * <p>
     * The name of this method contains a typo ("Resumptiod"). It simply delegates to
     * {@link #setUseStreamManagementResumptionDefault(boolean)}.
     * </p>
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
     */
    @Deprecated
    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
    }
1521
1522    /**
1523     * Set if Stream Management resumption should be used by default for new connections.
1524     *
1525     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1526     */
1527    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1528        if (useSmResumptionDefault) {
1529            // Also enable SM is resumption is enabled
1530            setUseStreamManagementDefault(useSmResumptionDefault);
1531        }
1532        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1533    }
1534
    /**
     * Set whether Stream Management should be used for this connection if it is supported by the server.
     *
     * @param useSm true to use Stream Management.
     */
    public void setUseStreamManagement(boolean useSm) {
        this.useSm = useSm;
    }
1543
1544    /**
1545     * Set if Stream Management resumption should be used if supported by the server.
1546     *
1547     * @param useSmResumption true to use Stream Management resumption.
1548     */
1549    public void setUseStreamManagementResumption(boolean useSmResumption) {
1550        if (useSmResumption) {
1551            // Also enable SM is resumption is enabled
1552            setUseStreamManagement(useSmResumption);
1553        }
1554        this.useSmResumption = useSmResumption;
1555    }
1556
    /**
     * Set the preferred resumption time in seconds.
     * <p>
     * The value is stored as the client's maximum resumption time. {@link #getMaxSmResumptionTime()} treats a
     * value that is not greater than zero as "no limit set by the client".
     * </p>
     *
     * @param resumptionTime the preferred resumption time in seconds
     */
    public void setPreferredResumptionTime(int resumptionTime) {
        smClientMaxResumptionTime = resumptionTime;
    }
1564
    /**
     * Add a predicate for Stream Management acknowledgment requests.
     * <p>
     * Those predicates are used to determine when a Stream Management acknowledgement request is sent to the
     * server. Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code>
     * package.
     * </p>
     * <p>
     * If no predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
     * </p>
     *
     * @param predicate the predicate to add.
     * @return true if the predicate was not already active.
     */
    public boolean addRequestAckPredicate(StanzaFilter predicate) {
        // All accesses to requestAckPredicates in this section are guarded by its own monitor
        synchronized (requestAckPredicates) {
            return requestAckPredicates.add(predicate);
        }
    }
1583
    /**
     * Remove the given predicate for Stream Management acknowledgment requests.
     *
     * @param predicate the predicate to remove.
     * @return true if the predicate was removed.
     */
    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
        // All accesses to requestAckPredicates in this section are guarded by its own monitor
        synchronized (requestAckPredicates) {
            return requestAckPredicates.remove(predicate);
        }
    }
1594
    /**
     * Remove all predicates for Stream Management acknowledgment requests.
     */
    public void removeAllRequestAckPredicates() {
        // All accesses to requestAckPredicates in this section are guarded by its own monitor
        synchronized (requestAckPredicates) {
            requestAckPredicates.clear();
        }
    }
1603
1604    /**
1605     * Send an unconditional Stream Management acknowledgement request to the server.
1606     *
1607     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1608     * @throws NotConnectedException if the connection is not connected.
1609     * @throws InterruptedException if the calling thread was interrupted.
1610     */
1611    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1612        if (!isSmEnabled()) {
1613            throw new StreamManagementException.StreamManagementNotEnabledException();
1614        }
1615        requestSmAcknowledgementInternal();
1616    }
1617
    /**
     * Hands an ack request to the packet writer via <code>sendStreamElement()</code>, without checking whether
     * Stream Management is enabled.
     *
     * @throws NotConnectedException if the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(AckRequest.INSTANCE);
    }
1621
    /**
     * Hands an ack request to the packet writer via <code>sendNonBlocking()</code>, without checking whether
     * Stream Management is enabled.
     *
     * @throws NotConnectedException if the connection is not connected.
     * @throws OutgoingQueueFullException if there is no space in the outgoing queue.
     */
    private void requestSmAcknowledgementNonBlockingInternal() throws NotConnectedException, OutgoingQueueFullException {
        packetWriter.sendNonBlocking(AckRequest.INSTANCE);
    }
1625
1626    /**
1627     * Send a unconditional Stream Management acknowledgment to the server.
1628     * <p>
1629     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1630     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1631     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1632     * </p>
1633     *
1634     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1635     * @throws NotConnectedException if the connection is not connected.
1636     * @throws InterruptedException if the calling thread was interrupted.
1637     */
1638    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1639        if (!isSmEnabled()) {
1640            throw new StreamManagementException.StreamManagementNotEnabledException();
1641        }
1642        sendSmAcknowledgementInternal();
1643    }
1644
1645    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1646        AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount);
1647        // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request
1648        // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here,
1649        // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the
1650        // stream open.
1651        packetWriter.queue.putIfNotShutdown(ackAnswer);
1652    }
1653
    /**
     * Add a Stanza acknowledged listener.
     * <p>
     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. They will not get
     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
     * possible.
     * </p>
     *
     * @param listener the listener to add.
     */
    public void addStanzaAcknowledgedListener(StanzaListener listener) {
        stanzaAcknowledgedListeners.add(listener);
    }
1667
    /**
     * Remove the given Stanza acknowledged listener.
     *
     * @param listener the listener to remove.
     * @return true if the listener was removed.
     */
    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
        return stanzaAcknowledgedListeners.remove(listener);
    }
1677
    /**
     * Remove all Stanza acknowledged listeners. Listeners registered for a specific stanza ID are kept in a
     * separate collection and are not affected by this method.
     */
    public void removeAllStanzaAcknowledgedListeners() {
        stanzaAcknowledgedListeners.clear();
    }
1684
    /**
     * Add a Stanza dropped listener.
     * <p>
     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not
     * get automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to
     * retransmit the Stanzas.
     * </p>
     *
     * @param listener the listener to add.
     * @since 4.3.3
     */
    public void addStanzaDroppedListener(StanzaListener listener) {
        stanzaDroppedListeners.add(listener);
    }
1699
    /**
     * Remove the given Stanza dropped listener.
     *
     * @param listener the listener to remove.
     * @return true if the listener was removed.
     * @since 4.3.3
     */
    public boolean removeStanzaDroppedListener(StanzaListener listener) {
        return stanzaDroppedListeners.remove(listener);
    }
1710
1711    /**
1712     * Add a new Stanza ID acknowledged listener for the given ID.
1713     * <p>
1714     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1715     * automatically be removed after the listener was run.
1716     * </p>
1717     *
1718     * @param id the stanza ID.
1719     * @param listener the listener to invoke.
1720     * @return the previous listener for this stanza ID or null.
1721     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1722     */
1723    @SuppressWarnings("FutureReturnValueIgnored")
1724    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1725        // Prevent users from adding callbacks that will never get removed
1726        if (!smWasEnabledAtLeastOnce) {
1727            throw new StreamManagementException.StreamManagementNotEnabledException();
1728        }
1729        // Remove the listener after max. 3 hours
1730        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1731        schedule(new Runnable() {
1732            @Override
1733            public void run() {
1734                stanzaIdAcknowledgedListeners.remove(id);
1735            }
1736        }, removeAfterSeconds, TimeUnit.SECONDS);
1737        return stanzaIdAcknowledgedListeners.put(id, listener);
1738    }
1739
    /**
     * Remove the Stanza ID acknowledged listener for the given ID.
     *
     * @param id the stanza ID.
     * @return the listener that was removed for the given stanza ID, or null if there was none.
     */
    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
        return stanzaIdAcknowledgedListeners.remove(id);
    }
1749
    /**
     * Removes all Stanza ID acknowledged listeners. Listeners added with
     * {@link #addStanzaAcknowledgedListener(StanzaListener)} are kept in a separate collection and are not
     * affected by this method.
     */
    public void removeAllStanzaIdAcknowledgedListeners() {
        stanzaIdAcknowledgedListeners.clear();
    }
1756
    /**
     * Returns true if Stream Management is supported by the server, i.e. if the connection has the Stream
     * Management stream feature.
     *
     * @return true if Stream Management is supported by the server.
     */
    public boolean isSmAvailable() {
        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
    }
1765
    /**
     * Returns true if Stream Management was successfully negotiated with the server. This reports the current
     * value of <code>smEnabledSyncPoint</code>.
     *
     * @return true if Stream Management was negotiated.
     */
    public boolean isSmEnabled() {
        return smEnabledSyncPoint;
    }
1774
    /**
     * Returns true if the stream was successfully resumed with the help of Stream Management, i.e. if
     * <code>smResumedSyncPoint</code> is in the state <code>successful</code>.
     *
     * @return true if the stream was resumed.
     */
    public boolean streamWasResumed() {
        return smResumedSyncPoint == SyncPointState.successful;
    }
1783
    /**
     * Returns true if the connection is disconnected but a stream resumption via Stream Management is possible.
     *
     * @return true if disconnected but resumption possible.
     * @see #isSmResumptionPossible()
     */
    public boolean isDisconnectedButSmResumptionPossible() {
        return disconnectedButResumeable && isSmResumptionPossible();
    }
1792
1793    /**
1794     * Returns true if the stream is resumable.
1795     *
1796     * @return true if the stream is resumable.
1797     */
1798    public boolean isSmResumptionPossible() {
1799        // There is no resumable stream available
1800        if (smSessionId == null)
1801            return false;
1802
1803        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1804        // Seems like we are already reconnected, report true
1805        if (shutdownTimestamp == null) {
1806            return true;
1807        }
1808
1809        // See if resumption time is over
1810        long current = System.currentTimeMillis();
1811        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1812        if (current > shutdownTimestamp + maxResumptionMillies) {
1813            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1814            // resumption is possible
1815            return false;
1816        } else {
1817            return true;
1818        }
1819    }
1820
    /**
     * Drop the stream management state. Sets {@link #smSessionId} and
     * {@link #unacknowledgedStanzas} to <code>null</code>.
     * <p>
     * Without a session ID, {@link #isSmResumptionPossible()} reports false.
     * </p>
     */
    private void dropSmState() {
        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
        // respectively. No need to reset them here.
        smSessionId = null;
        unacknowledgedStanzas = null;
    }
1831
1832    /**
1833     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1834     * <p>
1835     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1836     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1837     * without checking for overflows before.
1838     * </p>
1839     *
1840     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1841     */
1842    public int getMaxSmResumptionTime() {
1843        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1844        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
1845        return Math.min(clientResumptionTime, serverResumptionTime);
1846    }
1847
1848    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1849        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1850        final List<Stanza> ackedStanzas = new ArrayList<>(
1851                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1852                                        : Integer.MAX_VALUE);
1853        for (long i = 0; i < ackedStanzasCount; i++) {
1854            Stanza ackedStanza = unacknowledgedStanzas.poll();
1855            // If the server ack'ed a stanza, then it must be in the
1856            // unacknowledged stanza queue. There can be no exception.
1857            if (ackedStanza == null) {
1858                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1859                                ackedStanzasCount, ackedStanzas);
1860            }
1861            ackedStanzas.add(ackedStanza);
1862        }
1863
1864        boolean atLeastOneStanzaAcknowledgedListener = false;
1865        if (!stanzaAcknowledgedListeners.isEmpty()) {
1866            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1867            atLeastOneStanzaAcknowledgedListener = true;
1868        }
1869        else {
1870            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1871            for (Stanza ackedStanza : ackedStanzas) {
1872                String id = ackedStanza.getStanzaId();
1873                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1874                    atLeastOneStanzaAcknowledgedListener = true;
1875                    break;
1876                }
1877            }
1878        }
1879
1880        // Only spawn a new thread if there is a chance that some listener is invoked
1881        if (atLeastOneStanzaAcknowledgedListener) {
1882            asyncGo(new Runnable() {
1883                @Override
1884                public void run() {
1885                    for (Stanza ackedStanza : ackedStanzas) {
1886                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1887                            try {
1888                                listener.processStanza(ackedStanza);
1889                            }
1890                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1891                                LOGGER.log(Level.FINER, "Received exception", e);
1892                            }
1893                        }
1894                        String id = ackedStanza.getStanzaId();
1895                        if (StringUtils.isNullOrEmpty(id)) {
1896                            continue;
1897                        }
1898                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1899                        if (listener != null) {
1900                            try {
1901                                listener.processStanza(ackedStanza);
1902                            }
1903                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1904                                LOGGER.log(Level.FINER, "Received exception", e);
1905                            }
1906                        }
1907                    }
1908                }
1909            });
1910        }
1911
1912        serverHandledStanzasCount = handledCount;
1913    }
1914
    /**
     * Set the default bundle and defer callback used for new connections.
     *
     * @param defaultBundleAndDeferCallback the callback to store as the static default.
     * @see BundleAndDeferCallback
     * @since 4.1
     */
    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
    }
1925
    /**
     * Set the bundle and defer callback used for this connection.
     * <p>
     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
     * no longer get deferred.
     * </p>
     *
     * @param bundleAndDeferCallback the callback or <code>null</code>.
     * @see BundleAndDeferCallback
     * @since 4.1
     */
    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
        this.bundleAndDeferCallback = bundleAndDeferCallback;
    }
1940
1941
1942    /**
1943     * Returns the local address currently in use for this connection.
1944     *
1945     * @return the local address
1946     */
1947    @Override
1948    public InetAddress getLocalAddress() {
1949        final Socket socket = this.socket;
1950        if (socket == null) return null;
1951
1952        InetAddress localAddress = socket.getLocalAddress();
1953        if (localAddress.isAnyLocalAddress()) return null;
1954
1955        return localAddress;
1956    }
1957}