XMPPTCPConnection.java

  1. /**
  2.  *
  3.  * Copyright 2003-2007 Jive Software.
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smack.tcp;

  18. import java.io.BufferedReader;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.InputStreamReader;
  22. import java.io.OutputStream;
  23. import java.io.OutputStreamWriter;
  24. import java.io.Writer;
  25. import java.net.InetAddress;
  26. import java.net.InetSocketAddress;
  27. import java.net.Socket;
  28. import java.security.cert.CertificateException;
  29. import java.util.ArrayList;
  30. import java.util.Collection;
  31. import java.util.Iterator;
  32. import java.util.LinkedHashSet;
  33. import java.util.LinkedList;
  34. import java.util.List;
  35. import java.util.Map;
  36. import java.util.Set;
  37. import java.util.concurrent.ArrayBlockingQueue;
  38. import java.util.concurrent.BlockingQueue;
  39. import java.util.concurrent.ConcurrentHashMap;
  40. import java.util.concurrent.ConcurrentLinkedQueue;
  41. import java.util.concurrent.TimeUnit;
  42. import java.util.concurrent.atomic.AtomicBoolean;
  43. import java.util.logging.Level;
  44. import java.util.logging.Logger;

  45. import javax.net.SocketFactory;
  46. import javax.net.ssl.HostnameVerifier;
  47. import javax.net.ssl.SSLSession;
  48. import javax.net.ssl.SSLSocket;
  49. import javax.net.ssl.SSLSocketFactory;

  50. import org.jivesoftware.smack.AbstractXMPPConnection;
  51. import org.jivesoftware.smack.ConnectionConfiguration;
  52. import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
  53. import org.jivesoftware.smack.ConnectionListener;
  54. import org.jivesoftware.smack.SmackConfiguration;
  55. import org.jivesoftware.smack.SmackException;
  56. import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
  57. import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
  58. import org.jivesoftware.smack.SmackException.ConnectionException;
  59. import org.jivesoftware.smack.SmackException.EndpointConnectionException;
  60. import org.jivesoftware.smack.SmackException.NotConnectedException;
  61. import org.jivesoftware.smack.SmackException.NotLoggedInException;
  62. import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
  63. import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
  64. import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
  65. import org.jivesoftware.smack.SmackFuture;
  66. import org.jivesoftware.smack.StanzaListener;
  67. import org.jivesoftware.smack.XMPPConnection;
  68. import org.jivesoftware.smack.XMPPException;
  69. import org.jivesoftware.smack.XMPPException.FailedNonzaException;
  70. import org.jivesoftware.smack.XMPPException.StreamErrorException;
  71. import org.jivesoftware.smack.compress.packet.Compress;
  72. import org.jivesoftware.smack.compress.packet.Compressed;
  73. import org.jivesoftware.smack.compression.XMPPInputOutputStream;
  74. import org.jivesoftware.smack.datatypes.UInt16;
  75. import org.jivesoftware.smack.filter.StanzaFilter;
  76. import org.jivesoftware.smack.internal.SmackTlsContext;
  77. import org.jivesoftware.smack.packet.Element;
  78. import org.jivesoftware.smack.packet.IQ;
  79. import org.jivesoftware.smack.packet.Message;
  80. import org.jivesoftware.smack.packet.Presence;
  81. import org.jivesoftware.smack.packet.Stanza;
  82. import org.jivesoftware.smack.packet.StartTls;
  83. import org.jivesoftware.smack.packet.StreamError;
  84. import org.jivesoftware.smack.packet.StreamOpen;
  85. import org.jivesoftware.smack.packet.TopLevelStreamElement;
  86. import org.jivesoftware.smack.proxy.ProxyInfo;
  87. import org.jivesoftware.smack.sasl.packet.SaslNonza;
  88. import org.jivesoftware.smack.sm.SMUtils;
  89. import org.jivesoftware.smack.sm.StreamManagementException;
  90. import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
  91. import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
  92. import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
  93. import org.jivesoftware.smack.sm.packet.StreamManagement;
  94. import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
  95. import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
  96. import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
  97. import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
  98. import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
  99. import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
  100. import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
  101. import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
  102. import org.jivesoftware.smack.sm.predicates.Predicate;
  103. import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
  104. import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
  105. import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
  106. import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
  107. import org.jivesoftware.smack.util.Async;
  108. import org.jivesoftware.smack.util.CloseableUtil;
  109. import org.jivesoftware.smack.util.PacketParserUtils;
  110. import org.jivesoftware.smack.util.StringUtils;
  111. import org.jivesoftware.smack.util.TLSUtils;
  112. import org.jivesoftware.smack.util.XmlStringBuilder;
  113. import org.jivesoftware.smack.util.rce.RemoteConnectionException;
  114. import org.jivesoftware.smack.xml.SmackXmlParser;
  115. import org.jivesoftware.smack.xml.XmlPullParser;
  116. import org.jivesoftware.smack.xml.XmlPullParserException;

  117. import org.jxmpp.jid.impl.JidCreate;
  118. import org.jxmpp.jid.parts.Resourcepart;
  119. import org.jxmpp.stringprep.XmppStringprepException;
  120. import org.minidns.dnsname.DnsName;

  121. /**
  122.  * Creates a socket connection to an XMPP server. This is the default connection
  123.  * to an XMPP server and is specified in the XMPP Core (RFC 6120).
  124.  *
  125.  * @see XMPPConnection
  126.  * @author Matt Tucker
  127.  */
  128. public class XMPPTCPConnection extends AbstractXMPPConnection {

    /**
     * NOTE(review): presumably the capacity of the outgoing element queue; the constant is not referenced in the
     * visible part of this file, confirm against the writer implementation.
     */
    private static final int QUEUE_SIZE = 500;
    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());

    /**
     * The socket which is used for this connection.
     */
    private Socket socket;

    /**
     * Set by an instant shutdown if the stream could still be resumed via Stream Management. While this flag is
     * {@code true}, the "already connected" and "already logged in" checks do not throw. It is reset to
     * {@code false} after a successful login and by a non-instant shutdown.
     */
    private boolean disconnectedButResumeable = false;

    /**
     * The TLS socket of this connection. {@link #isSecureConnection()} reports {@code true} while this field is
     * non-null; the field is reset to {@code null} on shutdown.
     */
    private SSLSocket secureSocket;

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketWriter packetWriter = new PacketWriter();

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketReader packetReader = new PacketReader();

    /**
     * Reset to {@code false} right before authenticating; the login then waits until this flag becomes
     * {@code true}. NOTE(review): presumably set once the stream features following the authentication have been
     * received, the code setting it is not in the visible part of this file.
     */
    private boolean streamFeaturesAfterAuthenticationReceived;

    /**
     * NOTE(review): presumably signals that the stream compression negotiation has completed; the field is not
     * referenced in the visible part of this file, confirm against its users.
     */
    private boolean compressSyncPoint;

    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
    private static BundleAndDeferCallback defaultBundleAndDeferCallback;

    /**
     * The used bundle and defer callback.
     * <p>
     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
     * having a 'volatile' read within the writer threads loop.
     * </p>
     */
    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;

    /**
     * The initial value of {@link #useSm} for newly created connections.
     */
    private static boolean useSmDefault = true;

    /**
     * The initial value of {@link #useSmResumption} for newly created connections.
     */
    private static boolean useSmResumptionDefault = true;

    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}.
     */
    private String smSessionId;

    /**
     * Represents the state of stream management resumption.
     * <p>
     * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread.
     * </p>
     */
    private volatile SyncPointState smResumedSyncPoint;

    /**
     * Reset to {@code null} before every resumption attempt. The login stops waiting for the resumption result as
     * soon as this field is non-null and then continues with the normal stream establishment.
     */
    private Failed smResumptionFailed;

    /**
     * Represents the state of stream management.
     * <p>
     * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}.
     * </p>
     */
    private volatile boolean smEnabledSyncPoint;

    /**
     * The client's preferred maximum resumption time in seconds.
     */
    private int smClientMaxResumptionTime = -1;

    /**
     * The server's preferred maximum resumption time in seconds.
     */
    private int smServerMaxResumptionTime = -1;

    /**
     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
     */
    private boolean useSm = useSmDefault;

    /**
     * Passed as first argument to the 'enable' nonza that is sent when Stream Management gets enabled.
     * NOTE(review): presumably requests that the enabled stream is resumable, confirm against {@code Enable}.
     */
    private boolean useSmResumption = useSmResumptionDefault;

    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
    private long serverHandledStanzasCount = 0;

    /**
     * The counter for stanzas handled ("received") by the client.
     * <p>
     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
     * </p>
     */
    private long clientHandledStanzasCount = 0;

    /**
     * The stanzas which are not yet acknowledged by the server. If this queue is non-null when a login does not
     * resume the previous stream, its content is drained and either handed to the stanza dropped listeners or
     * sent again.
     */
    private BlockingQueue<Stanza> unacknowledgedStanzas;

    /**
     * Set to true if Stream Management was at least once enabled for this connection.
     */
    private boolean smWasEnabledAtLeastOnce = false;

    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for every stanza that got dropped.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and automatically removed after that.
     */
    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();

    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order how the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
     * </p>
     */
    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();

    /**
     * The TCP specific configuration of this connection, as handed to the constructor.
     * NOTE(review): the "HidingField" suppression suggests that the superclass declares a field of the same name.
     */
    @SuppressWarnings("HidingField")
    private final XMPPTCPConnectionConfiguration config;

  256.     /**
  257.      * Creates a new XMPP connection over TCP (optionally using proxies).
  258.      * <p>
  259.      * Note that XMPPTCPConnection constructors do not establish a connection to the server
  260.      * and you must call {@link #connect()}.
  261.      * </p>
  262.      *
  263.      * @param config the connection configuration.
  264.      */
  265.     public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
  266.         super(config);
  267.         this.config = config;
  268.         addConnectionListener(new ConnectionListener() {
  269.             @Override
  270.             public void connectionClosedOnError(Exception e) {
  271.                 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
  272.                     dropSmState();
  273.                 }
  274.             }
  275.         });

  276.         // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream
  277.         // is initiated.
  278.         buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install();
  279.     }

  280.     /**
  281.      * Creates a new XMPP connection over TCP.
  282.      * <p>
  283.      * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
  284.      * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
  285.      * constructor.
  286.      * </p>
  287.      *
  288.      * @param jid the bare JID used by the client.
  289.      * @param password the password or authentication token.
  290.      * @throws XmppStringprepException if the provided string is invalid.
  291.      */
  292.     public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
  293.         this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build());
  294.     }

  295.     /**
  296.      * Creates a new XMPP connection over TCP.
  297.      * <p>
  298.      * This is the simplest constructor for connecting to an XMPP server. Alternatively,
  299.      * you can get fine-grained control over connection settings using the
  300.      * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
  301.      * </p>
  302.      * @param username TODO javadoc me please
  303.      * @param password TODO javadoc me please
  304.      * @param serviceName TODO javadoc me please
  305.      * @throws XmppStringprepException if the provided string is invalid.
  306.      */
  307.     public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
  308.         this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
  309.                                         JidCreate.domainBareFrom(serviceName)).build());
  310.     }

  311.     @Override
  312.     protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
  313.         if (packetWriter == null) {
  314.             throw new NotConnectedException();
  315.         }
  316.         packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
  317.     }

  318.     @Override
  319.     protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
  320.         if (isConnected() && !disconnectedButResumeable) {
  321.             throw new AlreadyConnectedException();
  322.         }
  323.     }

  324.     @Override
  325.     protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
  326.         if (isAuthenticated() && !disconnectedButResumeable) {
  327.             throw new AlreadyLoggedInException();
  328.         }
  329.     }

  330.     @Override
  331.     protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
  332.         // Reset the flag in case it was set
  333.         disconnectedButResumeable = false;
  334.         super.afterSuccessfulLogin(resumed);
  335.     }

  336.     @Override
  337.     protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
  338.                     SmackException, IOException, InterruptedException {
  339.         // Authenticate using SASL
  340.         SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;

  341.         streamFeaturesAfterAuthenticationReceived = false;
  342.         authenticate(username, password, config.getAuthzid(), sslSession);

  343.         // Wait for stream features after the authentication.
  344.         // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
  345.         // renamed to "streamFeaturesAfterAuthenticationReceived".
  346.         waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");

  347.         // If compression is enabled then request the server to use stream compression. XEP-170
  348.         // recommends to perform stream compression before resource binding.
  349.         maybeEnableCompression();

  350.         smResumedSyncPoint = SyncPointState.initial;
  351.         smResumptionFailed = null;
  352.         if (isSmResumptionPossible()) {
  353.             smResumedSyncPoint = SyncPointState.request_sent;
  354.             sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
  355.             waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
  356.             if (smResumedSyncPoint == SyncPointState.successful) {
  357.                 // We successfully resumed the stream, be done here
  358.                 afterSuccessfulLogin(true);
  359.                 return;
  360.             }
  361.             // SM resumption failed, what Smack does here is to report success of
  362.             // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
  363.             // normal resource binding can be tried.
  364.             assert smResumptionFailed != null;
  365.             LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
  366.         }

  367.         // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
  368.         // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the
  369.         // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
  370.         smEnabledSyncPoint = false;

  371.         List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
  372.         if (unacknowledgedStanzas != null) {
  373.             // There was a previous connection with SM enabled but that was either not resumable or
  374.             // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
  375.             unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
  376.             // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
  377.             // XMPP session (There maybe was an enabled in a previous XMPP session of this
  378.             // connection instance though). This is used in writePackets to decide if stanzas should
  379.             // be added to the unacknowledged stanzas queue, because they have to be added right
  380.             // after the 'enable' stream element has been sent.
  381.             dropSmState();
  382.         }

  383.         // Now bind the resource. It is important to do this *after* we dropped an eventually
  384.         // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
  385.         // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
  386.         bindResourceAndEstablishSession(resource);

  387.         if (isSmAvailable() && useSm) {
  388.             // Remove what is maybe left from previously stream managed sessions
  389.             serverHandledStanzasCount = 0;
  390.             sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
  391.             // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
  392.             // then this is a non recoverable error and we therefore throw an exception.
  393.             waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
  394.             synchronized (requestAckPredicates) {
  395.                 if (requestAckPredicates.isEmpty()) {
  396.                     // Assure that we have at lest one predicate set up that so that we request acks
  397.                     // for the server and eventually flush some stanzas from the unacknowledged
  398.                     // stanza queue
  399.                     requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
  400.                 }
  401.             }
  402.         }
  403.         // Inform client about failed resumption if possible, resend stanzas otherwise
  404.         // Process the stanzas synchronously so a client can re-queue them for transmission
  405.         // before it is informed about connection success
  406.         if (!stanzaDroppedListeners.isEmpty()) {
  407.             for (Stanza stanza : previouslyUnackedStanzas) {
  408.                 for (StanzaListener listener : stanzaDroppedListeners) {
  409.                     try {
  410.                         listener.processStanza(stanza);
  411.                     }
  412.                     catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
  413.                         LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
  414.                     }
  415.                 }
  416.             }
  417.         } else {
  418.             for (Stanza stanza : previouslyUnackedStanzas) {
  419.                 sendInternal(stanza);
  420.             }
  421.         }

  422.         afterSuccessfulLogin(false);
  423.     }

  424.     @Override
  425.     public boolean isSecureConnection() {
  426.         return secureSocket != null;
  427.     }

  428.     /**
  429.      * Shuts the current connection down. After this method returns, the connection must be ready
  430.      * for re-use by connect.
  431.      */
  432.     @Override
  433.     protected void shutdown() {
  434.         if (isSmEnabled()) {
  435.             try {
  436.                 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
  437.                 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
  438.                 sendSmAcknowledgementInternal();
  439.             } catch (InterruptedException | NotConnectedException e) {
  440.                 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
  441.             }
  442.         }
  443.         shutdown(false);
  444.     }

  445.     @Override
  446.     public synchronized void instantShutdown() {
  447.         shutdown(true);
  448.     }

  449.     private void shutdown(boolean instant) {
  450.         // The writer thread may already been finished at this point, for example when the connection is in the
  451.         // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
  452.         // case.
  453.         if (!packetWriter.done()) {
  454.             // First shutdown the writer, this will result in a closing stream element getting send to
  455.             // the server
  456.             LOGGER.finer(packetWriter.threadName + " shutdown()");
  457.             packetWriter.shutdown(instant);
  458.             LOGGER.finer(packetWriter.threadName + " shutdown() returned");

  459.             if (!instant) {
  460.                 waitForClosingStreamTagFromServer();
  461.             }
  462.         }

  463.         LOGGER.finer(packetReader.threadName + " shutdown()");
  464.         packetReader.shutdown();
  465.         LOGGER.finer(packetReader.threadName + " shutdown() returned");

  466.         CloseableUtil.maybeClose(socket, LOGGER);

  467.         setWasAuthenticated();

  468.         try {
  469.             boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running);
  470.             if (!readerAndWriterThreadsTermianted) {
  471.                 LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: "
  472.                                 + packetWriter.running + ", Reader running: " + packetReader.running);
  473.             } else {
  474.                 LOGGER.fine("Reader and writer threads terminated");
  475.             }
  476.         } catch (InterruptedException e) {
  477.             LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
  478.         }

  479.         if (disconnectedButResumeable) {
  480.             return;
  481.         }

  482.         // If we are able to resume the stream, then don't set
  483.         // connected/authenticated/usingTLS to false since we like to behave like we are still
  484.         // connected (e.g. sendStanza should not throw a NotConnectedException).
  485.         if (instant) {
  486.             disconnectedButResumeable = isSmResumptionPossible();
  487.             if (!disconnectedButResumeable) {
  488.                 // Reset the stream management session id to null, since the stream is no longer resumable. Note that we
  489.                 // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected.
  490.                 smSessionId = null;
  491.             }
  492.         } else {
  493.             disconnectedButResumeable = false;

  494.             // Drop the stream management state if this is not an instant shutdown. We send
  495.             // a </stream> close tag and now the stream management state is no longer valid.
  496.             // This also prevents that we will potentially (re-)send any unavailable presence we
  497.             // may have send, because it got put into the unacknowledged queue and was not acknowledged before the
  498.             // connection terminated.
  499.             dropSmState();
  500.             // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
  501.             // information is available in the connectionClosedOnError() listeners.
  502.         }
  503.         authenticated = false;
  504.         connected = false;
  505.         secureSocket = null;
  506.         reader = null;
  507.         writer = null;

  508.         initState();
  509.     }

  510.     private interface SmAckAction<E extends Exception> {
  511.         void run() throws NotConnectedException, E;
  512.     }

  513.     private <E extends Exception> void requestSmAckIfNecessary(TopLevelStreamElement element,
  514.                     SmAckAction<E> smAckAction) throws NotConnectedException, E {
  515.         if (!isSmEnabled())
  516.             return;

  517.         if (element instanceof Stanza) {
  518.             Stanza stanza = (Stanza) element;
  519.             for (StanzaFilter requestAckPredicate : requestAckPredicates) {
  520.                 if (requestAckPredicate.accept(stanza)) {
  521.                     smAckAction.run();
  522.                     break;
  523.                 }
  524.             }
  525.         }
  526.     }

  527.     @Override
  528.     protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
  529.         packetWriter.sendStreamElement(element);
  530.         requestSmAckIfNecessary(element, () -> requestSmAcknowledgementInternal());
  531.     }

  532.     @Override
  533.     protected void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException {
  534.         packetWriter.sendNonBlocking(element);
  535.         requestSmAckIfNecessary(element, () -> requestSmAcknowledgementNonBlockingInternal());
  536.     }

  537.     private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
  538.         RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config);

  539.         List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>();

  540.         SocketFactory socketFactory = config.getSocketFactory();
  541.         ProxyInfo proxyInfo = config.getProxyInfo();
  542.         int timeout = config.getConnectTimeout();
  543.         if (socketFactory == null) {
  544.             socketFactory = SocketFactory.getDefault();
  545.         }
  546.         for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
  547.             Iterator<? extends InetAddress> inetAddresses;
  548.             String host = endpoint.getHost().toString();
  549.             UInt16 portUint16 = endpoint.getPort();
  550.             int port = portUint16.intValue();
  551.             if (proxyInfo == null) {
  552.                 inetAddresses = endpoint.getInetAddresses().iterator();
  553.                 assert inetAddresses.hasNext();

  554.                 innerloop: while (inetAddresses.hasNext()) {
  555.                     // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
  556.                     // re-usable after a failed connection attempt. See also SMACK-724.
  557.                     SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);

  558.                     final InetAddress inetAddress = inetAddresses.next();
  559.                     final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
  560.                     LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
  561.                     socketFuture.connectAsync(inetSocketAddress, timeout);

  562.                     try {
  563.                         socket = socketFuture.getOrThrow();
  564.                     } catch (IOException e) {
  565.                         RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
  566.                                         endpoint, inetAddress, e);
  567.                         connectionExceptions.add(rce);
  568.                         if (inetAddresses.hasNext()) {
  569.                             continue innerloop;
  570.                         } else {
  571.                             break innerloop;
  572.                         }
  573.                     }
  574.                     LOGGER.finer("Established TCP connection to " + inetSocketAddress);
  575.                     // We found a host to connect to, return here
  576.                     this.host = host;
  577.                     this.port = portUint16;
  578.                     return;
  579.                 }
  580.             } else {
  581.                 // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy
  582.                 // connection to every inet address of each connection endpoint.
  583.                 socket = socketFactory.createSocket();
  584.                 StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy");
  585.                 final String hostAndPort = host + " at port " + port;
  586.                 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
  587.                 try {
  588.                     proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
  589.                 } catch (IOException e) {
  590.                     CloseableUtil.maybeClose(socket, LOGGER);
  591.                     RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e);
  592.                     connectionExceptions.add(rce);
  593.                     continue;
  594.                 }
  595.                 LOGGER.finer("Established TCP connection to " + hostAndPort);
  596.                 // We found a host to connect to, return here
  597.                 this.host = host;
  598.                 this.port = portUint16;
  599.                 return;
  600.             }
  601.         }

  602.         // There are no more host addresses to try
  603.         // throw an exception and report all tried
  604.         // HostAddresses in the exception
  605.         throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
  606.     }

  607.     /**
  608.      * Initializes the connection by creating a stanza reader and writer and opening a
  609.      * XMPP stream to the server.
  610.      *
  611.      * @throws IOException if an I/O error occurred.
  612.      * @throws InterruptedException if the calling thread was interrupted.
  613.      */
  614.     private void initConnection() throws IOException, InterruptedException {
  615.         compressionHandler = null;

  616.         // Set the reader and writer instance variables
  617.         initReaderAndWriter();

  618.         // Start the writer thread. This will open an XMPP stream to the server
  619.         packetWriter.init();
  620.         // Start the reader thread. The startup() method will block until we
  621.         // get an opening stream packet back from server
  622.         packetReader.init();
  623.     }

  624.     private void initReaderAndWriter() throws IOException {
  625.         InputStream is = socket.getInputStream();
  626.         OutputStream os = socket.getOutputStream();
  627.         if (compressionHandler != null) {
  628.             is = compressionHandler.getInputStream(is);
  629.             os = compressionHandler.getOutputStream(os);
  630.         }
  631.         // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
  632.         writer = new OutputStreamWriter(os, "UTF-8");
  633.         reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));

  634.         // If debugging is enabled, we open a window and write out all network traffic.
  635.         initDebugger();
  636.     }

  637.     /**
  638.      * The server has indicated that TLS negotiation can start. We now need to secure the
  639.      * existing plain connection and perform a handshake. This method won't return until the
  640.      * connection has finished the handshake or an error occurred while securing the connection.
  641.      * @throws IOException if an I/O error occurred.
  642.      * @throws SecurityNotPossibleException if TLS is not possible.
  643.      * @throws CertificateException if there is an issue with the certificate.
  644.      */
  645.     @SuppressWarnings("LiteralClassName")
  646.     private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
  647.         SmackTlsContext smackTlsContext = getSmackTlsContext();

  648.         Socket plain = socket;
  649.         int port = plain.getPort();
  650.         String xmppServiceDomainString = config.getXMPPServiceDomain().toString();
  651.         SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory();
  652.         // Secure the plain connection
  653.         socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true);

  654.         final SSLSocket sslSocket = (SSLSocket) socket;
  655.         // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
  656.         // important (at least on certain platforms) and it seems to be a good idea anyways to
  657.         // prevent an accidental implicit handshake.
  658.         TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());

  659.         // Initialize the reader and writer with the new secured version
  660.         initReaderAndWriter();

  661.         // Proceed to do the handshake
  662.         sslSocket.startHandshake();

  663.         if (smackTlsContext.daneVerifier != null) {
  664.             smackTlsContext.daneVerifier.finish(sslSocket.getSession());
  665.         }

  666.         final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
  667.         if (verifier == null) {
  668.                 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
  669.         }

  670.         final String verifierHostname;
  671.         {
  672.             DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
  673.             // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
  674.             // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
  675.             // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
  676.             if (xmppServiceDomainDnsName != null) {
  677.                 verifierHostname = xmppServiceDomainDnsName.ace;
  678.             }
  679.             else {
  680.                 LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
  681.                                 + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
  682.                 verifierHostname = getXMPPServiceDomain().toString();
  683.             }
  684.         }

  685.         final boolean verificationSuccessful;
  686.         // Verify the TLS session.
  687.         verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
  688.         if (!verificationSuccessful) {
  689.             throw new CertificateException(
  690.                             "Hostname verification of certificate failed. Certificate does not authenticate "
  691.                                             + getXMPPServiceDomain());
  692.         }

  693.         // Set that TLS was successful
  694.         secureSocket = sslSocket;
  695.     }

  696.     /**
  697.      * Returns the compression handler that can be used for one compression methods offered by the server.
  698.      *
  699.      * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
  700.      *
  701.      */
  702.     private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
  703.         for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
  704.                 String method = handler.getCompressionMethod();
  705.                 if (compression.getMethods().contains(method))
  706.                     return handler;
  707.         }
  708.         return null;
  709.     }

  710.     @Override
  711.     public boolean isUsingCompression() {
  712.         return compressionHandler != null && compressSyncPoint;
  713.     }

  714.     /**
  715.      * <p>
  716.      * Starts using stream compression that will compress network traffic. Traffic can be
  717.      * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
  718.      * connection. However, the server and the client will need to use more CPU time in order to
  719.      * un/compress network data so under high load the server performance might be affected.
  720.      * </p>
  721.      * <p>
  722.      * Stream compression has to have been previously offered by the server. Currently only the
  723.      * zlib method is supported by the client. Stream compression negotiation has to be done
  724.      * before authentication took place.
  725.      * </p>
  726.      *
  727.      * @throws NotConnectedException if the XMPP connection is not connected.
  728.      * @throws SmackException if Smack detected an exceptional situation.
  729.      * @throws InterruptedException if the calling thread was interrupted.
  730.      * @throws XMPPException if an XMPP protocol error was received.
  731.      */
  732.     private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
  733.         if (!config.isCompressionEnabled()) {
  734.             return;
  735.         }

  736.         Compress.Feature compression = getFeature(Compress.Feature.class);
  737.         if (compression == null) {
  738.             // Server does not support compression
  739.             return;
  740.         }
  741.         // If stream compression was offered by the server and we want to use
  742.         // compression then send compression request to the server
  743.         if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
  744.             compressSyncPoint = false;
  745.             sendNonza(new Compress(compressionHandler.getCompressionMethod()));
  746.             waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
  747.         } else {
  748.             LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
  749.         }
  750.     }

  751.     /**
  752.      * Establishes a connection to the XMPP server. It basically
  753.      * creates and maintains a socket connection to the server.
  754.      * <p>
  755.      * Listeners will be preserved from a previous connection if the reconnection
  756.      * occurs after an abrupt termination.
  757.      * </p>
  758.      *
  759.      * @throws XMPPException if an error occurs while trying to establish the connection.
  760.      * @throws SmackException if Smack detected an exceptional situation.
  761.      * @throws IOException if an I/O error occurred.
  762.      * @throws InterruptedException if the calling thread was interrupted.
  763.      */
  764.     @Override
  765.     protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
  766.         // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
  767.         // there is an error establishing the connection
  768.         connectUsingConfiguration();

  769.         connected = true;

  770.         // We connected successfully to the servers TCP port
  771.         initConnection();

  772.         // TLS handled will be true either if TLS was established, or if it was not mandatory.
  773.         waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");

  774.         // Wait with SASL auth until the SASL mechanisms have been received
  775.         waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
  776.     }

  777.     /**
  778.      * For unit testing purposes
  779.      *
  780.      * @param writer TODO javadoc me please
  781.      */
  782.     protected void setWriter(Writer writer) {
  783.         this.writer = writer;
  784.     }

  785.     @Override
  786.     protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
  787.         StartTls startTlsFeature = getFeature(StartTls.class);
  788.         if (startTlsFeature != null) {
  789.             if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
  790.                 SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
  791.                 currentSmackException = smackException;
  792.                 notifyWaitingThreads();
  793.                 throw smackException;
  794.             }

  795.             if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
  796.                 sendNonza(new StartTls());
  797.             } else {
  798.                 tlsHandled = true;
  799.                 notifyWaitingThreads();
  800.             }
  801.         } else {
  802.             tlsHandled = true;
  803.             notifyWaitingThreads();
  804.         }

  805.         if (isSaslAuthenticated()) {
  806.             // If we have received features after the SASL has been successfully completed, then we
  807.             // have also *maybe* received, as it is an optional feature, the compression feature
  808.             // from the server.
  809.             streamFeaturesAfterAuthenticationReceived = true;
  810.             notifyWaitingThreads();
  811.         }
  812.     }

  813.     private void resetParser() throws IOException {
  814.         try {
  815.             packetReader.parser = SmackXmlParser.newXmlParser(reader);
  816.         } catch (XmlPullParserException e) {
  817.             throw new IOException(e);
  818.         }
  819.    }

  820.     private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
  821.         sendStreamOpen();
  822.         resetParser();
  823.     }

  824.     protected class PacketReader {

  825.         private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';

  826.         XmlPullParser parser;

  827.         private volatile boolean done;

  828.         private boolean running;

  829.         /**
  830.          * Initializes the reader in order to be used. The reader is initialized during the
  831.          * first connection and when reconnecting due to an abruptly disconnection.
  832.          */
  833.         void init() {
  834.             done = false;

  835.             running = true;
  836.             Async.go(new Runnable() {
  837.                 @Override
  838.                 public void run() {
  839.                     LOGGER.finer(threadName + " start");
  840.                     try {
  841.                         parsePackets();
  842.                     } finally {
  843.                         LOGGER.finer(threadName + " exit");
  844.                         running = false;
  845.                         notifyWaitingThreads();
  846.                     }
  847.                 }
  848.             }, threadName);
  849.          }

  850.         /**
  851.          * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
  852.          */
  853.         void shutdown() {
  854.             done = true;
  855.         }

  856.         /**
  857.          * Parse top-level packets in order to process them further.
  858.          */
  859.         private void parsePackets() {
  860.             try {
  861.                 openStreamAndResetParser();
  862.                 XmlPullParser.Event eventType = parser.getEventType();
  863.                 while (!done) {
  864.                     switch (eventType) {
  865.                     case START_ELEMENT:
  866.                         final String name = parser.getName();
  867.                         final String namespace = parser.getNamespace();

  868.                         switch (name) {
  869.                         case Message.ELEMENT:
  870.                         case IQ.IQ_ELEMENT:
  871.                         case Presence.ELEMENT:
  872.                             try {
  873.                                 parseAndProcessStanza(parser);
  874.                             } finally {
  875.                                 clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
  876.                             }
  877.                             break;
  878.                         case "stream":
  879.                             if (StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(namespace)) {
  880.                                 onStreamOpen(parser);
  881.                             }
  882.                             break;
  883.                         case "error":
  884.                             StreamError streamError = PacketParserUtils.parseStreamError(parser);
  885.                             // Stream errors are non recoverable, throw this exceptions. Also note that this will set
  886.                             // this exception as current connection exceptions and notify any waiting threads.
  887.                             throw new StreamErrorException(streamError);
  888.                         case "features":
  889.                             parseFeaturesAndNotify(parser);
  890.                             break;
  891.                         case "proceed":
  892.                             // Secure the connection by negotiating TLS
  893.                             proceedTLSReceived();
  894.                             // Send a new opening stream to the server
  895.                             openStreamAndResetParser();
  896.                             break;
  897.                         case "failure":
  898.                             switch (namespace) {
  899.                             case "urn:ietf:params:xml:ns:xmpp-tls":
  900.                                 // TLS negotiation has failed. The server will close the connection
  901.                                 // TODO Parse failure stanza
  902.                                 throw new SmackException.SmackMessageException("TLS negotiation has failed");
  903.                             case "http://jabber.org/protocol/compress":
  904.                                 // Stream compression has been denied. This is a recoverable
  905.                                 // situation. It is still possible to authenticate and
  906.                                 // use the connection but using an uncompressed connection
  907.                                 // TODO Parse failure stanza
  908.                                 currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
  909.                                 notifyWaitingThreads();
  910.                                 break;
  911.                             default:
  912.                                 parseAndProcessNonza(parser);
  913.                             }
  914.                             break;
  915.                         case Compressed.ELEMENT:
  916.                             // Server confirmed that it's possible to use stream compression. Start
  917.                             // stream compression
  918.                             // Initialize the reader and writer with the new compressed version
  919.                             initReaderAndWriter();
  920.                             // Send a new opening stream to the server
  921.                             openStreamAndResetParser();
  922.                             // Notify that compression is being used
  923.                             compressSyncPoint = true;
  924.                             notifyWaitingThreads();
  925.                             break;
  926.                         case Enabled.ELEMENT:
  927.                             Enabled enabled = ParseStreamManagement.enabled(parser);
  928.                             if (enabled.isResumeSet()) {
  929.                                 smSessionId = enabled.getId();
  930.                                 if (StringUtils.isNullOrEmpty(smSessionId)) {
  931.                                     SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
  932.                                     setCurrentConnectionExceptionAndNotify(xmppException);
  933.                                     throw xmppException;
  934.                                 }
  935.                                 smServerMaxResumptionTime = enabled.getMaxResumptionTime();
  936.                             } else {
  937.                                 // Mark this a non-resumable stream by setting smSessionId to null
  938.                                 smSessionId = null;
  939.                             }
  940.                             clientHandledStanzasCount = 0;
  941.                             smWasEnabledAtLeastOnce = true;
  942.                             smEnabledSyncPoint = true;
  943.                             notifyWaitingThreads();
  944.                             break;
  945.                         case Failed.ELEMENT:
  946.                             Failed failed = ParseStreamManagement.failed(parser);
  947.                             if (smResumedSyncPoint == SyncPointState.request_sent) {
  948.                                 // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
  949.                                 // so is non-fatal as we can simply continue with resource binding in this case.
  950.                                 smResumptionFailed = failed;
  951.                                 notifyWaitingThreads();
  952.                             } else {
  953.                                 FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
  954.                                 setCurrentConnectionExceptionAndNotify(xmppException);
  955.                             }
  956.                             break;
  957.                         case Resumed.ELEMENT:
  958.                             Resumed resumed = ParseStreamManagement.resumed(parser);
  959.                             if (!smSessionId.equals(resumed.getPrevId())) {
  960.                                 throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
  961.                             }
  962.                             // Mark SM as enabled
  963.                             smEnabledSyncPoint = true;
  964.                             // First, drop the stanzas already handled by the server
  965.                             processHandledCount(resumed.getHandledCount());
  966.                             // Then re-send what is left in the unacknowledged queue
  967.                             List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
  968.                             unacknowledgedStanzas.drainTo(stanzasToResend);
  969.                             for (Stanza stanza : stanzasToResend) {
  970.                                 XMPPTCPConnection.this.sendInternal(stanza);
  971.                             }
  972.                             // If there where stanzas resent, then request a SM ack for them.
  973.                             // Writer's sendStreamElement() won't do it automatically based on
  974.                             // predicates.
  975.                             if (!stanzasToResend.isEmpty()) {
  976.                                 requestSmAcknowledgementInternal();
  977.                             }
  978.                             // Mark SM resumption as successful
  979.                             smResumedSyncPoint = SyncPointState.successful;
  980.                             notifyWaitingThreads();
  981.                             break;
  982.                         case AckAnswer.ELEMENT:
  983.                             AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
  984.                             processHandledCount(ackAnswer.getHandledCount());
  985.                             break;
  986.                         case AckRequest.ELEMENT:
  987.                             ParseStreamManagement.ackRequest(parser);
  988.                             if (smEnabledSyncPoint) {
  989.                                 sendSmAcknowledgementInternal();
  990.                             } else {
  991.                                 LOGGER.warning("SM Ack Request received while SM is not enabled");
  992.                             }
  993.                             break;
  994.                          default:
  995.                              parseAndProcessNonza(parser);
  996.                              break;
  997.                         }
  998.                         break;
  999.                     case END_ELEMENT:
  1000.                         final String endTagName = parser.getName();
  1001.                         if ("stream".equals(endTagName)) {
  1002.                             if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
  1003.                                 LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
  1004.                                 break;
  1005.                             }

  1006.                             // Check if the queue was already shut down before reporting success on closing stream tag
  1007.                             // received. This avoids a race if there is a disconnect(), followed by a connect(), which
  1008.                             // did re-start the queue again, causing this writer to assume that the queue is not
  1009.                             // shutdown, which results in a call to disconnect().
  1010.                             final boolean queueWasShutdown = packetWriter.queue.isShutdown();
  1011.                             closingStreamReceived = true;
  1012.                             notifyWaitingThreads();

  1013.                             if (queueWasShutdown) {
  1014.                                 // We received a closing stream element *after* we initiated the
  1015.                                 // termination of the session by sending a closing stream element to
  1016.                                 // the server first
  1017.                                 return;
  1018.                             } else {
  1019.                                 // We received a closing stream element from the server without us
  1020.                                 // sending a closing stream element first. This means that the
  1021.                                 // server wants to terminate the session, therefore disconnect
  1022.                                 // the connection
  1023.                                 LOGGER.info(XMPPTCPConnection.this
  1024.                                                 + " received closing </stream> element."
  1025.                                                 + " Server wants to terminate the connection, calling disconnect()");
  1026.                                 ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
  1027.                                     @Override
  1028.                                     public void run() {
  1029.                                         disconnect();
  1030.                                     }});
  1031.                             }
  1032.                         }
  1033.                         break;
  1034.                     case END_DOCUMENT:
  1035.                         // END_DOCUMENT only happens in an error case, as otherwise we would see a
  1036.                         // closing stream element before.
  1037.                         throw new SmackException.SmackMessageException(
  1038.                                         "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
  1039.                     default:
  1040.                         // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
  1041.                         break;
  1042.                     }
  1043.                     eventType = parser.next();
  1044.                 }
  1045.             }
  1046.             catch (Exception e) {
  1047.                 // Set running to false since this thread will exit here and notifyConnectionError() will wait until
  1048.                 // the reader and writer thread's 'running' value is false. Hence, we need to set it to false before calling
  1049.                 // notifyConnectionError() below, even though run() also sets it to false. Therefore, do not remove this.
  1050.                 running = false;

  1051.                 String ignoreReasonThread = null;

  1052.                 boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
  1053.                 if (writerThreadWasShutDown) {
  1054.                     ignoreReasonThread = "writer";
  1055.                 } else if (done) {
  1056.                     ignoreReasonThread = "reader";
  1057.                 }

  1058.                 if (ignoreReasonThread != null) {
  1059.                     LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
  1060.                     return;
  1061.                 }

  1062.                 // Close the connection and notify connection listeners of the error.
  1063.                 notifyConnectionError(e);
  1064.             }
  1065.         }
  1066.     }

  1067.     protected class PacketWriter {
  1068.         public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
  1069.         public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
  1070.         public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);

  1071.         private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';

  1072.         private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
  1073.                         QUEUE_SIZE, true);

  1074.         /**
  1075.          * If set, the stanza writer is shut down
  1076.          */
  1077.         protected volatile Long shutdownTimestamp = null;

  1078.         private volatile boolean instantShutdown;

  1079.         /**
  1080.          * True if some preconditions are given to start the bundle and defer mechanism.
  1081.          * <p>
  1082.          * This will likely get set to true right after the start of the writer thread, because
  1083.          * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
  1084.          * this field to true.
  1085.          * </p>
  1086.          */
  1087.         private boolean shouldBundleAndDefer;

  1088.         private boolean running;

  1089.         /**
  1090.         * Initializes the writer in order to be used. It is called at the first connection and also
  1091.         * is invoked if the connection is disconnected by an error.
  1092.         */
  1093.         void init() {
  1094.             shutdownTimestamp = null;

  1095.             if (unacknowledgedStanzas != null) {
  1096.                 // It's possible that there are new stanzas in the writer queue that
  1097.                 // came in while we were disconnected but resumable, drain those into
  1098.                 // the unacknowledged queue so that they get resent now
  1099.                 drainWriterQueueToUnacknowledgedStanzas();
  1100.             }

  1101.             queue.start();
  1102.             running = true;
  1103.             Async.go(new Runnable() {
  1104.                 @Override
  1105.                 public void run() {
  1106.                     LOGGER.finer(threadName + " start");
  1107.                     try {
  1108.                         writePackets();
  1109.                     } finally {
  1110.                         LOGGER.finer(threadName + " exit");
  1111.                         running = false;
  1112.                         notifyWaitingThreads();
  1113.                     }
  1114.                 }
  1115.             }, threadName);
  1116.         }

  1117.         private boolean done() {
  1118.             return shutdownTimestamp != null;
  1119.         }

  1120.         protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
  1121.             final boolean done = done();
  1122.             if (done) {
  1123.                 final boolean smResumptionPossible = isSmResumptionPossible();
  1124.                 // Don't throw a NotConnectedException is there is an resumable stream available
  1125.                 if (!smResumptionPossible) {
  1126.                     throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
  1127.                                     + " smResumptionPossible=" + smResumptionPossible);
  1128.                 }
  1129.             }
  1130.         }

  1131.         /**
  1132.          * Sends the specified element to the server.
  1133.          *
  1134.          * @param element the element to send.
  1135.          * @throws NotConnectedException if the XMPP connection is not connected.
  1136.          * @throws InterruptedException if the calling thread was interrupted.
  1137.          */
  1138.         protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
  1139.             throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
  1140.             try {
  1141.                 queue.put(element);
  1142.             }
  1143.             catch (InterruptedException e) {
  1144.                 // put() may throw an InterruptedException for two reasons:
  1145.                 // 1. If the queue was shut down
  1146.                 // 2. If the thread was interrupted
  1147.                 // so we have to check which is the case
  1148.                 throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
  1149.                 // If the method above did not throw, then the sending thread was interrupted
  1150.                 throw e;
  1151.             }
  1152.         }

  1153.         /**
  1154.          * Sends the specified element to the server.
  1155.          *
  1156.          * @param element the element to send.
  1157.          * @throws NotConnectedException if the XMPP connection is not connected.
  1158.          * @throws OutgoingQueueFullException if there is no space in the outgoing queue.
  1159.          */
  1160.         protected void sendNonBlocking(Element element) throws NotConnectedException, OutgoingQueueFullException {
  1161.             throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
  1162.             boolean enqueued = queue.offer(element);
  1163.             if (!enqueued) {
  1164.                 throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
  1165.                 throw new OutgoingQueueFullException();
  1166.             }
  1167.         }

  1168.         /**
  1169.          * Shuts down the stanza writer. Once this method has been called, no further
  1170.          * packets will be written to the server.
  1171.          */
  1172.         void shutdown(boolean instant) {
  1173.             instantShutdown = instant;
  1174.             queue.shutdown();
  1175.             shutdownTimestamp = System.currentTimeMillis();
  1176.         }

  1177.         /**
  1178.          * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
  1179.          * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
  1180.          * that case.
  1181.          *
  1182.          * @return the next element for writing or null.
  1183.          */
  1184.         private Element nextStreamElement() {
  1185.             // It is important the we check if the queue is empty before removing an element from it
  1186.             if (queue.isEmpty()) {
  1187.                 shouldBundleAndDefer = true;
  1188.             }
  1189.             Element packet = null;
  1190.             try {
  1191.                 packet = queue.take();
  1192.             }
  1193.             catch (InterruptedException e) {
  1194.                 if (!queue.isShutdown()) {
  1195.                     // Users shouldn't try to interrupt the packet writer thread
  1196.                     LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
  1197.                 }
  1198.             }
  1199.             return packet;
  1200.         }

  1201.         private void writePackets() {
  1202.             try {
  1203.                 // Write out packets from the queue.
  1204.                 while (!done()) {
  1205.                     Element element = nextStreamElement();
  1206.                     if (element == null) {
  1207.                         continue;
  1208.                     }

  1209.                     // Get a local version of the bundle and defer callback, in case it's unset
  1210.                     // between the null check and the method invocation
  1211.                     final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
  1212.                     // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
  1213.                     // empty), then we could wait a bit for further stanzas attempting to decrease
  1214.                     // our energy consumption
  1215.                     if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
  1216.                         // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
  1217.                         // queue is empty again.
  1218.                         shouldBundleAndDefer = false;
  1219.                         final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
  1220.                         final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
  1221.                                         bundlingAndDeferringStopped));
  1222.                         if (bundleAndDeferMillis > 0) {
  1223.                             long remainingWait = bundleAndDeferMillis;
  1224.                             final long waitStart = System.currentTimeMillis();
  1225.                             synchronized (bundlingAndDeferringStopped) {
  1226.                                 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
  1227.                                     bundlingAndDeferringStopped.wait(remainingWait);
  1228.                                     remainingWait = bundleAndDeferMillis
  1229.                                                     - (System.currentTimeMillis() - waitStart);
  1230.                                 }
  1231.                             }
  1232.                         }
  1233.                     }

  1234.                     Stanza packet = null;
  1235.                     if (element instanceof Stanza) {
  1236.                         packet = (Stanza) element;
  1237.                     }
  1238.                     else if (element instanceof Enable) {
  1239.                         // The client needs to add messages to the unacknowledged stanzas queue
  1240.                         // right after it sent 'enabled'. Stanza will be added once
  1241.                         // unacknowledgedStanzas is not null.
  1242.                         unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
  1243.                     }
  1244.                     maybeAddToUnacknowledgedStanzas(packet);

  1245.                     CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
  1246.                     if (elementXml instanceof XmlStringBuilder) {
  1247.                         try {
  1248.                             ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
  1249.                         } catch (NullPointerException npe) {
  1250.                             LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
  1251.                             throw npe;
  1252.                         }
  1253.                     }
  1254.                     else {
  1255.                         writer.write(elementXml.toString());
  1256.                     }

  1257.                     if (queue.isEmpty()) {
  1258.                         writer.flush();
  1259.                     }
  1260.                     if (packet != null) {
  1261.                         firePacketSendingListeners(packet);
  1262.                     }
  1263.                 }
  1264.                 if (!instantShutdown) {
  1265.                     // Flush out the rest of the queue.
  1266.                     try {
  1267.                         while (!queue.isEmpty()) {
  1268.                             Element packet = queue.remove();
  1269.                             if (packet instanceof Stanza) {
  1270.                                 Stanza stanza = (Stanza) packet;
  1271.                                 maybeAddToUnacknowledgedStanzas(stanza);
  1272.                             }
  1273.                             writer.write(packet.toXML().toString());
  1274.                         }
  1275.                     }
  1276.                     catch (Exception e) {
  1277.                         LOGGER.log(Level.WARNING,
  1278.                                         "Exception flushing queue during shutdown, ignore and continue",
  1279.                                         e);
  1280.                     }

  1281.                     // Close the stream.
  1282.                     try {
  1283.                         writer.write("</stream:stream>");
  1284.                         writer.flush();
  1285.                     }
  1286.                     catch (Exception e) {
  1287.                         LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
  1288.                     }

  1289.                     // Delete the queue contents (hopefully nothing is left).
  1290.                     queue.clear();
  1291.                 } else if (instantShutdown && isSmEnabled()) {
  1292.                     // This was an instantShutdown and SM is enabled, drain all remaining stanzas
  1293.                     // into the unacknowledgedStanzas queue
  1294.                     drainWriterQueueToUnacknowledgedStanzas();
  1295.                 }
  1296.                 // Do *not* close the writer here, as it will cause the socket
  1297.                 // to get closed. But we may want to receive further stanzas
  1298.                 // until the closing stream tag is received. The socket will be
  1299.                 // closed in shutdown().
  1300.             }
  1301.             catch (Exception e) {
  1302.                 // The exception can be ignored if the connection is 'done'
  1303.                 // or if the it was caused because the socket got closed
  1304.                 if (!(done() || queue.isShutdown())) {
  1305.                     // Set running to false since this thread will exit here and notifyConnectionError() will wait until
  1306.                     // the reader and writer thread's 'running' value is false.
  1307.                     running = false;
  1308.                     notifyConnectionError(e);
  1309.                 } else {
  1310.                     LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
  1311.                 }
  1312.             }
  1313.         }

  1314.         private void drainWriterQueueToUnacknowledgedStanzas() {
  1315.             List<Element> elements = new ArrayList<>(queue.size());
  1316.             queue.drainTo(elements);
  1317.             for (int i = 0; i < elements.size(); i++) {
  1318.                 Element element = elements.get(i);
  1319.                 // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
  1320.                 if (unacknowledgedStanzas.remainingCapacity() == 0) {
  1321.                     StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
  1322.                             .newWith(i, elements, unacknowledgedStanzas);
  1323.                     LOGGER.log(Level.WARNING,
  1324.                             "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
  1325.                     return;
  1326.                 }
  1327.                 if (element instanceof Stanza) {
  1328.                     unacknowledgedStanzas.add((Stanza) element);
  1329.                 }
  1330.             }
  1331.         }

  1332.         private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
  1333.             // Check if the stream element should be put to the unacknowledgedStanza
  1334.             // queue. Note that we can not do the put() in sendStanzaInternal() and the
  1335.             // packet order is not stable at this point (sendStanzaInternal() can be
  1336.             // called concurrently).
  1337.             if (unacknowledgedStanzas != null && stanza != null) {
  1338.                 // If the unacknowledgedStanza queue reaching its high water mark, request an new ack
  1339.                 // from the server in order to drain it
  1340.                 if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
  1341.                     writer.write(AckRequest.INSTANCE.toXML().toString());
  1342.                 }

  1343.                 try {
  1344.                     // It is important the we put the stanza in the unacknowledged stanza
  1345.                     // queue before we put it on the wire
  1346.                     unacknowledgedStanzas.put(stanza);
  1347.                 }
  1348.                 catch (InterruptedException e) {
  1349.                     throw new IllegalStateException(e);
  1350.                 }
  1351.             }
  1352.         }
  1353.     }

  1354.     /**
  1355.      * Set if Stream Management should be used by default for new connections.
  1356.      *
  1357.      * @param useSmDefault true to use Stream Management for new connections.
  1358.      */
  1359.     public static void setUseStreamManagementDefault(boolean useSmDefault) {
  1360.         XMPPTCPConnection.useSmDefault = useSmDefault;
  1361.     }

  1362.     /**
  1363.      * Set if Stream Management resumption should be used by default for new connections.
  1364.      *
  1365.      * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
  1366.      * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
  1367.      */
  1368.     @Deprecated
  1369.     public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
  1370.         setUseStreamManagementResumptionDefault(useSmResumptionDefault);
  1371.     }

  1372.     /**
  1373.      * Set if Stream Management resumption should be used by default for new connections.
  1374.      *
  1375.      * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
  1376.      */
  1377.     public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
  1378.         if (useSmResumptionDefault) {
  1379.             // Also enable SM is resumption is enabled
  1380.             setUseStreamManagementDefault(useSmResumptionDefault);
  1381.         }
  1382.         XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
  1383.     }

  1384.     /**
  1385.      * Set if Stream Management should be used if supported by the server.
  1386.      *
  1387.      * @param useSm true to use Stream Management.
  1388.      */
  1389.     public void setUseStreamManagement(boolean useSm) {
  1390.         this.useSm = useSm;
  1391.     }

  1392.     /**
  1393.      * Set if Stream Management resumption should be used if supported by the server.
  1394.      *
  1395.      * @param useSmResumption true to use Stream Management resumption.
  1396.      */
  1397.     public void setUseStreamManagementResumption(boolean useSmResumption) {
  1398.         if (useSmResumption) {
  1399.             // Also enable SM is resumption is enabled
  1400.             setUseStreamManagement(useSmResumption);
  1401.         }
  1402.         this.useSmResumption = useSmResumption;
  1403.     }

  1404.     /**
  1405.      * Set the preferred resumption time in seconds.
  1406.      * @param resumptionTime the preferred resumption time in seconds
  1407.      */
  1408.     public void setPreferredResumptionTime(int resumptionTime) {
  1409.         smClientMaxResumptionTime = resumptionTime;
  1410.     }

  1411.     /**
  1412.      * Add a predicate for Stream Management acknowledgment requests.
  1413.      * <p>
  1414.      * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
  1415.      * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
  1416.      * </p>
  1417.      * <p>
  1418.      * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
  1419.      * </p>
  1420.      *
  1421.      * @param predicate the predicate to add.
  1422.      * @return if the predicate was not already active.
  1423.      */
  1424.     public boolean addRequestAckPredicate(StanzaFilter predicate) {
  1425.         synchronized (requestAckPredicates) {
  1426.             return requestAckPredicates.add(predicate);
  1427.         }
  1428.     }

  1429.     /**
  1430.      * Remove the given predicate for Stream Management acknowledgment request.
  1431.      * @param predicate the predicate to remove.
  1432.      * @return true if the predicate was removed.
  1433.      */
  1434.     public boolean removeRequestAckPredicate(StanzaFilter predicate) {
  1435.         synchronized (requestAckPredicates) {
  1436.             return requestAckPredicates.remove(predicate);
  1437.         }
  1438.     }

  1439.     /**
  1440.      * Remove all predicates for Stream Management acknowledgment requests.
  1441.      */
  1442.     public void removeAllRequestAckPredicates() {
  1443.         synchronized (requestAckPredicates) {
  1444.             requestAckPredicates.clear();
  1445.         }
  1446.     }

  1447.     /**
  1448.      * Send an unconditional Stream Management acknowledgement request to the server.
  1449.      *
  1450.      * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
  1451.      * @throws NotConnectedException if the connection is not connected.
  1452.      * @throws InterruptedException if the calling thread was interrupted.
  1453.      */
  1454.     public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
  1455.         if (!isSmEnabled()) {
  1456.             throw new StreamManagementException.StreamManagementNotEnabledException();
  1457.         }
  1458.         requestSmAcknowledgementInternal();
  1459.     }

  1460.     private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
  1461.         packetWriter.sendStreamElement(AckRequest.INSTANCE);
  1462.     }

  1463.     private void requestSmAcknowledgementNonBlockingInternal() throws NotConnectedException, OutgoingQueueFullException {
  1464.         packetWriter.sendNonBlocking(AckRequest.INSTANCE);
  1465.     }

  1466.     /**
  1467.      * Send a unconditional Stream Management acknowledgment to the server.
  1468.      * <p>
  1469.      * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management § 4. Acks</a>:
  1470.      * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
  1471.      * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
  1472.      * </p>
  1473.      *
  1474.      * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
  1475.      * @throws NotConnectedException if the connection is not connected.
  1476.      * @throws InterruptedException if the calling thread was interrupted.
  1477.      */
  1478.     public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
  1479.         if (!isSmEnabled()) {
  1480.             throw new StreamManagementException.StreamManagementNotEnabledException();
  1481.         }
  1482.         sendSmAcknowledgementInternal();
  1483.     }

  1484.     private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
  1485.         AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount);
  1486.         // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request
  1487.         // an ack even after we have sent a stream close (and hence the queue was shutdown). If we would not check here,
  1488.         // then the ack would dangle around in the queue, and be sent on the next re-connection attempt even before the
  1489.         // stream open.
  1490.         packetWriter.queue.putIfNotShutdown(ackAnswer);
  1491.     }

  1492.     /**
  1493.      * Add a Stanza acknowledged listener.
  1494.      * <p>
  1495.      * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
  1496.      * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
  1497.      * possible.
  1498.      * </p>
  1499.      *
  1500.      * @param listener the listener to add.
  1501.      */
  1502.     public void addStanzaAcknowledgedListener(StanzaListener listener) {
  1503.         stanzaAcknowledgedListeners.add(listener);
  1504.     }

  1505.     /**
  1506.      * Remove the given Stanza acknowledged listener.
  1507.      *
  1508.      * @param listener the listener.
  1509.      * @return true if the listener was removed.
  1510.      */
  1511.     public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
  1512.         return stanzaAcknowledgedListeners.remove(listener);
  1513.     }

  1514.     /**
  1515.      * Remove all stanza acknowledged listeners.
  1516.      */
  1517.     public void removeAllStanzaAcknowledgedListeners() {
  1518.         stanzaAcknowledgedListeners.clear();
  1519.     }

  1520.     /**
  1521.      * Add a Stanza dropped listener.
  1522.      * <p>
  1523.      * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get
  1524.      * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit
  1525.      * the Stanzas.
  1526.      * </p>
  1527.      *
  1528.      * @param listener the listener to add.
  1529.      * @since 4.3.3
  1530.      */
  1531.     public void addStanzaDroppedListener(StanzaListener listener) {
  1532.         stanzaDroppedListeners.add(listener);
  1533.     }

  1534.     /**
  1535.      * Remove the given Stanza dropped listener.
  1536.      *
  1537.      * @param listener the listener.
  1538.      * @return true if the listener was removed.
  1539.      * @since 4.3.3
  1540.      */
  1541.     public boolean removeStanzaDroppedListener(StanzaListener listener) {
  1542.         return stanzaDroppedListeners.remove(listener);
  1543.     }

  1544.     /**
  1545.      * Add a new Stanza ID acknowledged listener for the given ID.
  1546.      * <p>
  1547.      * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
  1548.      * automatically be removed after the listener was run.
  1549.      * </p>
  1550.      *
  1551.      * @param id the stanza ID.
  1552.      * @param listener the listener to invoke.
  1553.      * @return the previous listener for this stanza ID or null.
  1554.      * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
  1555.      */
  1556.     @SuppressWarnings("FutureReturnValueIgnored")
  1557.     public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
  1558.         // Prevent users from adding callbacks that will never get removed
  1559.         if (!smWasEnabledAtLeastOnce) {
  1560.             throw new StreamManagementException.StreamManagementNotEnabledException();
  1561.         }
  1562.         // Remove the listener after max. 3 hours
  1563.         final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
  1564.         schedule(new Runnable() {
  1565.             @Override
  1566.             public void run() {
  1567.                 stanzaIdAcknowledgedListeners.remove(id);
  1568.             }
  1569.         }, removeAfterSeconds, TimeUnit.SECONDS);
  1570.         return stanzaIdAcknowledgedListeners.put(id, listener);
  1571.     }

  1572.     /**
  1573.      * Remove the Stanza ID acknowledged listener for the given ID.
  1574.      *
  1575.      * @param id the stanza ID.
  1576.      * @return true if the listener was found and removed, false otherwise.
  1577.      */
  1578.     public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
  1579.         return stanzaIdAcknowledgedListeners.remove(id);
  1580.     }

  1581.     /**
  1582.      * Removes all Stanza ID acknowledged listeners.
  1583.      */
  1584.     public void removeAllStanzaIdAcknowledgedListeners() {
  1585.         stanzaIdAcknowledgedListeners.clear();
  1586.     }

  1587.     /**
  1588.      * Returns true if Stream Management is supported by the server.
  1589.      *
  1590.      * @return true if Stream Management is supported by the server.
  1591.      */
  1592.     public boolean isSmAvailable() {
  1593.         return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
  1594.     }

  1595.     /**
  1596.      * Returns true if Stream Management was successfully negotiated with the server.
  1597.      *
  1598.      * @return true if Stream Management was negotiated.
  1599.      */
  1600.     public boolean isSmEnabled() {
  1601.         return smEnabledSyncPoint;
  1602.     }

  1603.     /**
  1604.      * Returns true if the stream was successfully resumed with help of Stream Management.
  1605.      *
  1606.      * @return true if the stream was resumed.
  1607.      */
  1608.     public boolean streamWasResumed() {
  1609.         return smResumedSyncPoint == SyncPointState.successful;
  1610.     }

  1611.     /**
  1612.      * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
  1613.      *
  1614.      * @return true if disconnected but resumption possible.
  1615.      */
  1616.     public boolean isDisconnectedButSmResumptionPossible() {
  1617.         return disconnectedButResumeable && isSmResumptionPossible();
  1618.     }

  1619.     /**
  1620.      * Returns true if the stream is resumable.
  1621.      *
  1622.      * @return true if the stream is resumable.
  1623.      */
  1624.     public boolean isSmResumptionPossible() {
  1625.         // There is no resumable stream available
  1626.         if (smSessionId == null)
  1627.             return false;

  1628.         final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
  1629.         // Seems like we are already reconnected, report true
  1630.         if (shutdownTimestamp == null) {
  1631.             return true;
  1632.         }

  1633.         // See if resumption time is over
  1634.         long current = System.currentTimeMillis();
  1635.         long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
  1636.         if (current > shutdownTimestamp + maxResumptionMillies) {
  1637.             // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
  1638.             // resumption is possible
  1639.             return false;
  1640.         } else {
  1641.             return true;
  1642.         }
  1643.     }

  1644.     /**
  1645.      * Drop the stream management state. Sets {@link #smSessionId} and
  1646.      * {@link #unacknowledgedStanzas} to <code>null</code>.
  1647.      */
  1648.     private void dropSmState() {
  1649.         // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
  1650.         // respective. No need to reset them here.
  1651.         smSessionId = null;
  1652.         unacknowledgedStanzas = null;
  1653.     }

  1654.     /**
  1655.      * Get the maximum resumption time in seconds after which a managed stream can be resumed.
  1656.      * <p>
  1657.      * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
  1658.      * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
  1659.      * without checking for overflows before.
  1660.      * </p>
  1661.      *
  1662.      * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
  1663.      */
  1664.     public int getMaxSmResumptionTime() {
  1665.         int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
  1666.         int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
  1667.         return Math.min(clientResumptionTime, serverResumptionTime);
  1668.     }

  1669.     private void processHandledCount(long handledCount) throws StreamManagementCounterError {
  1670.         long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
  1671.         final List<Stanza> ackedStanzas = new ArrayList<>(
  1672.                         ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
  1673.                                         : Integer.MAX_VALUE);
  1674.         for (long i = 0; i < ackedStanzasCount; i++) {
  1675.             Stanza ackedStanza = unacknowledgedStanzas.poll();
  1676.             // If the server ack'ed a stanza, then it must be in the
  1677.             // unacknowledged stanza queue. There can be no exception.
  1678.             if (ackedStanza == null) {
  1679.                 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
  1680.                                 ackedStanzasCount, ackedStanzas);
  1681.             }
  1682.             ackedStanzas.add(ackedStanza);
  1683.         }

  1684.         boolean atLeastOneStanzaAcknowledgedListener = false;
  1685.         if (!stanzaAcknowledgedListeners.isEmpty()) {
  1686.             // If stanzaAcknowledgedListeners is not empty, the we have at least one
  1687.             atLeastOneStanzaAcknowledgedListener = true;
  1688.         }
  1689.         else {
  1690.             // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
  1691.             for (Stanza ackedStanza : ackedStanzas) {
  1692.                 String id = ackedStanza.getStanzaId();
  1693.                 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
  1694.                     atLeastOneStanzaAcknowledgedListener = true;
  1695.                     break;
  1696.                 }
  1697.             }
  1698.         }

  1699.         // Only spawn a new thread if there is a chance that some listener is invoked
  1700.         if (atLeastOneStanzaAcknowledgedListener) {
  1701.             asyncGo(new Runnable() {
  1702.                 @Override
  1703.                 public void run() {
  1704.                     for (Stanza ackedStanza : ackedStanzas) {
  1705.                         for (StanzaListener listener : stanzaAcknowledgedListeners) {
  1706.                             try {
  1707.                                 listener.processStanza(ackedStanza);
  1708.                             }
  1709.                             catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
  1710.                                 LOGGER.log(Level.FINER, "Received exception", e);
  1711.                             }
  1712.                         }
  1713.                         String id = ackedStanza.getStanzaId();
  1714.                         if (StringUtils.isNullOrEmpty(id)) {
  1715.                             continue;
  1716.                         }
  1717.                         StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
  1718.                         if (listener != null) {
  1719.                             try {
  1720.                                 listener.processStanza(ackedStanza);
  1721.                             }
  1722.                             catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
  1723.                                 LOGGER.log(Level.FINER, "Received exception", e);
  1724.                             }
  1725.                         }
  1726.                     }
  1727.                 }
  1728.             });
  1729.         }

  1730.         serverHandledStanzasCount = handledCount;
  1731.     }

  1732.     /**
  1733.      * Set the default bundle and defer callback used for new connections.
  1734.      *
  1735.      * @param defaultBundleAndDeferCallback TODO javadoc me please
  1736.      * @see BundleAndDeferCallback
  1737.      * @since 4.1
  1738.      */
  1739.     public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
  1740.         XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
  1741.     }

  1742.     /**
  1743.      * Set the bundle and defer callback used for this connection.
  1744.      * <p>
  1745.      * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
  1746.      * no longer get deferred.
  1747.      * </p>
  1748.      *
  1749.      * @param bundleAndDeferCallback the callback or <code>null</code>.
  1750.      * @see BundleAndDeferCallback
  1751.      * @since 4.1
  1752.      */
  1753.     public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
  1754.         this.bundleAndDeferCallback = bundleAndDeferCallback;
  1755.     }


  1756.     /**
  1757.      * Returns the local address currently in use for this connection.
  1758.      *
  1759.      * @return the local address
  1760.      */
  1761.     @Override
  1762.     public InetAddress getLocalAddress() {
  1763.         final Socket socket = this.socket;
  1764.         if (socket == null) return null;

  1765.         InetAddress localAddress = socket.getLocalAddress();
  1766.         if (localAddress.isAnyLocalAddress()) return null;

  1767.         return localAddress;
  1768.     }
  1769. }