XMPPTCPConnection.java

  1. /**
  2.  *
  3.  * Copyright 2003-2007 Jive Software.
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smack.tcp;

  18. import org.jivesoftware.smack.AbstractXMPPConnection;
  19. import org.jivesoftware.smack.ConnectionConfiguration;
  20. import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
  21. import org.jivesoftware.smack.ConnectionCreationListener;
  22. import org.jivesoftware.smack.StanzaListener;
  23. import org.jivesoftware.smack.SmackConfiguration;
  24. import org.jivesoftware.smack.SmackException;
  25. import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
  26. import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
  27. import org.jivesoftware.smack.SmackException.NoResponseException;
  28. import org.jivesoftware.smack.SmackException.NotConnectedException;
  29. import org.jivesoftware.smack.SmackException.ConnectionException;
  30. import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
  31. import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
  32. import org.jivesoftware.smack.SmackException.SecurityRequiredException;
  33. import org.jivesoftware.smack.SynchronizationPoint;
  34. import org.jivesoftware.smack.XMPPException.StreamErrorException;
  35. import org.jivesoftware.smack.XMPPConnection;
  36. import org.jivesoftware.smack.XMPPException;
  37. import org.jivesoftware.smack.XMPPException.XMPPErrorException;
  38. import org.jivesoftware.smack.compress.packet.Compressed;
  39. import org.jivesoftware.smack.compression.XMPPInputOutputStream;
  40. import org.jivesoftware.smack.filter.StanzaFilter;
  41. import org.jivesoftware.smack.compress.packet.Compress;
  42. import org.jivesoftware.smack.packet.Element;
  43. import org.jivesoftware.smack.packet.IQ;
  44. import org.jivesoftware.smack.packet.Message;
  45. import org.jivesoftware.smack.packet.StreamOpen;
  46. import org.jivesoftware.smack.packet.Stanza;
  47. import org.jivesoftware.smack.packet.Presence;
  48. import org.jivesoftware.smack.packet.StartTls;
  49. import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
  50. import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
  51. import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
  52. import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
  53. import org.jivesoftware.smack.sm.SMUtils;
  54. import org.jivesoftware.smack.sm.StreamManagementException;
  55. import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
  56. import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
  57. import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
  58. import org.jivesoftware.smack.sm.packet.StreamManagement;
  59. import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
  60. import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
  61. import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
  62. import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
  63. import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
  64. import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
  65. import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
  66. import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
  67. import org.jivesoftware.smack.sm.predicates.Predicate;
  68. import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
  69. import org.jivesoftware.smack.packet.PlainStreamElement;
  70. import org.jivesoftware.smack.packet.XMPPError;
  71. import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
  72. import org.jivesoftware.smack.util.Async;
  73. import org.jivesoftware.smack.util.PacketParserUtils;
  74. import org.jivesoftware.smack.util.StringUtils;
  75. import org.jivesoftware.smack.util.TLSUtils;
  76. import org.jivesoftware.smack.util.dns.HostAddress;
  77. import org.jxmpp.jid.impl.JidCreate;
  78. import org.jxmpp.stringprep.XmppStringprepException;
  79. import org.jxmpp.util.XmppStringUtils;
  80. import org.xmlpull.v1.XmlPullParser;
  81. import org.xmlpull.v1.XmlPullParserException;

  82. import javax.net.SocketFactory;
  83. import javax.net.ssl.HostnameVerifier;
  84. import javax.net.ssl.KeyManager;
  85. import javax.net.ssl.KeyManagerFactory;
  86. import javax.net.ssl.SSLContext;
  87. import javax.net.ssl.SSLSocket;
  88. import javax.security.auth.callback.Callback;
  89. import javax.security.auth.callback.PasswordCallback;

  90. import java.io.BufferedReader;
  91. import java.io.ByteArrayInputStream;
  92. import java.io.FileInputStream;
  93. import java.io.IOException;
  94. import java.io.InputStream;
  95. import java.io.InputStreamReader;
  96. import java.io.OutputStream;
  97. import java.io.OutputStreamWriter;
  98. import java.io.Writer;
  99. import java.lang.reflect.Constructor;
  100. import java.net.InetAddress;
  101. import java.net.InetSocketAddress;
  102. import java.net.Socket;
  103. import java.net.UnknownHostException;
  104. import java.security.KeyManagementException;
  105. import java.security.KeyStore;
  106. import java.security.KeyStoreException;
  107. import java.security.NoSuchAlgorithmException;
  108. import java.security.NoSuchProviderException;
  109. import java.security.Provider;
  110. import java.security.Security;
  111. import java.security.UnrecoverableKeyException;
  112. import java.security.cert.CertificateException;
  113. import java.util.ArrayList;
  114. import java.util.Arrays;
  115. import java.util.Collection;
  116. import java.util.Iterator;
  117. import java.util.LinkedHashSet;
  118. import java.util.LinkedList;
  119. import java.util.List;
  120. import java.util.Map;
  121. import java.util.Set;
  122. import java.util.concurrent.ArrayBlockingQueue;
  123. import java.util.concurrent.BlockingQueue;
  124. import java.util.concurrent.ConcurrentHashMap;
  125. import java.util.concurrent.ConcurrentLinkedQueue;
  126. import java.util.concurrent.TimeUnit;
  127. import java.util.concurrent.atomic.AtomicBoolean;
  128. import java.util.logging.Level;
  129. import java.util.logging.Logger;

  130. /**
  131.  * Creates a socket connection to an XMPP server. This is the default connection
  132.  * to an XMPP server and is specified in the XMPP Core (RFC 6120).
  133.  *
  134.  * @see XMPPConnection
  135.  * @author Matt Tucker
  136.  */
  137. public class XMPPTCPConnection extends AbstractXMPPConnection {

  138.     private static final int QUEUE_SIZE = 500;
  139.     private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());

  140.     /**
  141.      * The socket which is used for this connection.
  142.      */
  143.     private Socket socket;

  144.     /**
  145.      *
  146.      */
  147.     private boolean disconnectedButResumeable = false;

  148.     /**
  149.      * Flag to indicate if the socket was closed intentionally by Smack.
  150.      * <p>
  151.      * This boolean flag is used concurrently, therefore it is marked volatile.
  152.      * </p>
  153.      */
  154.     private volatile boolean socketClosed = false;

  155.     private boolean usingTLS = false;

  156.     /**
  157.      * Protected access level because of unit test purposes
  158.      */
  159.     protected PacketWriter packetWriter;

  160.     /**
  161.      * Protected access level because of unit test purposes
  162.      */
  163.     protected PacketReader packetReader;

  164.     private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<Exception>(this);

  165.     /**
  166.      *
  167.      */
  168.     private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
  169.                     this);

  170.     /**
  171.      *
  172.      */
  173.     private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>(
  174.                     this);

  175.     /**
  176.      * The default bundle and defer callback, used for new connections.
  177.      * @see bundleAndDeferCallback
  178.      */
  179.     private static BundleAndDeferCallback defaultBundleAndDeferCallback;

  180.     /**
  181.      * The used bundle and defer callback.
  182.      * <p>
  183.      * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
  184.      * having a 'volatile' read within the writer threads loop.
  185.      * </p>
  186.      */
  187.     private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;

  188.     private static boolean useSmDefault = false;

  189.     private static boolean useSmResumptionDefault = true;

  190.     /**
  191.      * The stream ID of the stream that is currently resumable, ie. the stream we hold the state
  192.      * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
  193.      * {@link #unacknowledgedStanzas}.
  194.      */
  195.     private String smSessionId;

  196.     private final SynchronizationPoint<XMPPException> smResumedSyncPoint = new SynchronizationPoint<XMPPException>(
  197.                     this);

  198.     private final SynchronizationPoint<XMPPException> smEnabledSyncPoint = new SynchronizationPoint<XMPPException>(
  199.                     this);

  200.     /**
  201.      * The client's preferred maximum resumption time in seconds.
  202.      */
  203.     private int smClientMaxResumptionTime = -1;

  204.     /**
  205.      * The server's preferred maximum resumption time in seconds.
  206.      */
  207.     private int smServerMaxResumptimTime = -1;

  208.     /**
  209.      * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
  210.      */
  211.     private boolean useSm = useSmDefault;
  212.     private boolean useSmResumption = useSmResumptionDefault;

  213.     /**
  214.      * The counter that the server sends the client about it's current height. For example, if the server sends
  215.      * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
  216.      */
  217.     private long serverHandledStanzasCount = 0;

  218.     /**
  219.      * The counter for stanzas handled ("received") by the client.
  220.      * <p>
  221.      * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
  222.      * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
  223.      * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
  224.      * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
  225.      * </p>
  226.      */
  227.     private long clientHandledStanzasCount = 0;

  228.     private BlockingQueue<Stanza> unacknowledgedStanzas;

  229.     /**
  230.      * Set to true if Stream Management was at least once enabled for this connection.
  231.      */
  232.     private boolean smWasEnabledAtLeastOnce = false;

  233.     /**
  234.      * This listeners are invoked for every stanza that got acknowledged.
  235.      * <p>
  236.      * We use a {@link ConccurrentLinkedQueue} here in order to allow the listeners to remove
  237.      * themselves after they have been invoked.
  238.      * </p>
  239.      */
  240.     private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<StanzaListener>();

  241.     /**
  242.      * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will
  243.      * only be invoked once and automatically removed after that.
  244.      */
  245.     private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, StanzaListener>();

  246.     /**
  247.      * Predicates that determine if an stream management ack should be requested from the server.
  248.      * <p>
  249.      * We use a linked hash set here, so that the order how the predicates are added matches the
  250.      * order in which they are invoked in order to determine if an ack request should be send or not.
  251.      * </p>
  252.      */
  253.     private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<StanzaFilter>();

  254.     private final XMPPTCPConnectionConfiguration config;

  255.     /**
  256.      * Creates a new XMPP connection over TCP (optionally using proxies).
  257.      * <p>
  258.      * Note that XMPPTCPConnection constructors do not establish a connection to the server
  259.      * and you must call {@link #connect()}.
  260.      * </p>
  261.      *
  262.      * @param config the connection configuration.
  263.      */
  264.     public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
  265.         super(config);
  266.         this.config = config;
  267.     }

  268.     /**
  269.      * Creates a new XMPP connection over TCP.
  270.      * <p>
  271.      * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
  272.      * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
  273.      * constructor.
  274.      * </p>
  275.      *
  276.      * @param jid the bare JID used by the client.
  277.      * @param password the password or authentication token.
  278.      * @throws XmppStringprepException
  279.      */
  280.     public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
  281.         this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
  282.     }

  283.     /**
  284.      * Creates a new XMPP connection over TCP.
  285.      * <p>
  286.      * This is the simplest constructor for connecting to an XMPP server. Alternatively,
  287.      * you can get fine-grained control over connection settings using the
  288.      * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
  289.      * </p>
  290.      * @param username
  291.      * @param password
  292.      * @param serviceName
  293.      * @throws XmppStringprepException
  294.      */
  295.     public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
  296.         this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setServiceName(
  297.                                         JidCreate.domainBareFrom(serviceName)).build());
  298.     }

  299.     @Override
  300.     protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
  301.         if (packetWriter == null) {
  302.             throw new NotConnectedException();
  303.         }
  304.         packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
  305.     }

  306.     @Override
  307.     protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
  308.         if (isConnected() && !disconnectedButResumeable) {
  309.             throw new AlreadyConnectedException();
  310.         }
  311.     }

  312.     @Override
  313.     protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
  314.         if (isAuthenticated() && !disconnectedButResumeable) {
  315.             throw new AlreadyLoggedInException();
  316.         }
  317.     }

  318.     @Override
  319.     protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
  320.         // Reset the flag in case it was set
  321.         disconnectedButResumeable = false;
  322.         super.afterSuccessfulLogin(resumed);
  323.     }

  324.     @Override
  325.     protected synchronized void loginNonAnonymously(String username, String password, String resource) throws XMPPException, SmackException, IOException, InterruptedException {
  326.         if (saslAuthentication.hasNonAnonymousAuthentication()) {
  327.             // Authenticate using SASL
  328.             if (password != null) {
  329.                 saslAuthentication.authenticate(username, password, resource);
  330.             }
  331.             else {
  332.                 saslAuthentication.authenticate(resource, config.getCallbackHandler());
  333.             }
  334.         } else {
  335.             throw new SmackException("No non-anonymous SASL authentication mechanism available");
  336.         }

  337.         // If compression is enabled then request the server to use stream compression. XEP-170
  338.         // recommends to perform stream compression before resource binding.
  339.         if (config.isCompressionEnabled()) {
  340.             useCompression();
  341.         }

  342.         if (isSmResumptionPossible()) {
  343.             smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
  344.             if (smResumedSyncPoint.wasSuccessful()) {
  345.                 // We successfully resumed the stream, be done here
  346.                 afterSuccessfulLogin(true);
  347.                 return;
  348.             }
  349.             // SM resumption failed, what Smack does here is to report success of
  350.             // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
  351.             // normal resource binding can be tried.
  352.             LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
  353.         }

  354.         bindResourceAndEstablishSession(resource);

  355.         List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
  356.         if (unacknowledgedStanzas != null) {
  357.             // There was a previous connection with SM enabled but that was either not resumable or
  358.             // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
  359.             unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
  360.             // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
  361.             // XMPP session (There maybe was an enabled in a previous XMPP session of this
  362.             // connection instance though). This is used in writePackets to decide if stanzas should
  363.             // be added to the unacknowledged stanzas queue, because they have to be added right
  364.             // after the 'enable' stream element has been sent.
  365.             unacknowledgedStanzas = null;
  366.         }
  367.         if (isSmAvailable() && useSm) {
  368.             // Remove what is maybe left from previously stream managed sessions
  369.             serverHandledStanzasCount = 0;
  370.             // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
  371.             // then this is a non recoverable error and we therefore throw an exception.
  372.             smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
  373.             synchronized (requestAckPredicates) {
  374.                 if (requestAckPredicates.isEmpty()) {
  375.                     // Assure that we have at lest one predicate set up that so that we request acks
  376.                     // for the server and eventually flush some stanzas from the unacknowledged
  377.                     // stanza queue
  378.                     requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
  379.                 }
  380.             }
  381.         }
  382.         // (Re-)send the stanzas *after* we tried to enable SM
  383.         for (Stanza stanza : previouslyUnackedStanzas) {
  384.             sendStanzaInternal(stanza);
  385.         }

  386.         afterSuccessfulLogin(false);
  387.     }

  388.     @Override
  389.     public synchronized void loginAnonymously() throws XMPPException, SmackException, IOException, InterruptedException {
  390.         // Wait with SASL auth until the SASL mechanisms have been received
  391.         saslFeatureReceived.checkIfSuccessOrWaitOrThrow();

  392.         if (saslAuthentication.hasAnonymousAuthentication()) {
  393.             saslAuthentication.authenticateAnonymously();
  394.         }
  395.         else {
  396.             throw new SmackException("No anonymous SASL authentication mechanism available");
  397.         }

  398.         // If compression is enabled then request the server to use stream compression
  399.         if (config.isCompressionEnabled()) {
  400.             useCompression();
  401.         }

  402.         bindResourceAndEstablishSession(null);

  403.         afterSuccessfulLogin(false);
  404.     }

  405.     @Override
  406.     public boolean isSecureConnection() {
  407.         return usingTLS;
  408.     }

  409.     public boolean isSocketClosed() {
  410.         return socketClosed;
  411.     }

  412.     /**
  413.      * Shuts the current connection down. After this method returns, the connection must be ready
  414.      * for re-use by connect.
  415.      */
  416.     @Override
  417.     protected void shutdown() {
  418.         if (isSmEnabled()) {
  419.             try {
  420.                 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
  421.                 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
  422.                 sendSmAcknowledgementInternal();
  423.             } catch (InterruptedException | NotConnectedException e) {
  424.                 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
  425.             }
  426.         }
  427.         shutdown(false);
  428.     }

  429.     /**
  430.      * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
  431.      */
  432.     public synchronized void instantShutdown() {
  433.         shutdown(true);
  434.     }

  435.     private void shutdown(boolean instant) {
  436.         if (disconnectedButResumeable) {
  437.             return;
  438.         }
  439.         if (packetReader != null) {
  440.                 packetReader.shutdown();
  441.         }
  442.         if (packetWriter != null) {
  443.                 packetWriter.shutdown(instant);
  444.         }

  445.         // Set socketClosed to true. This will cause the PacketReader
  446.         // and PacketWriter to ignore any Exceptions that are thrown
  447.         // because of a read/write from/to a closed stream.
  448.         // It is *important* that this is done before socket.close()!
  449.         socketClosed = true;
  450.         try {
  451.                 socket.close();
  452.         } catch (Exception e) {
  453.                 LOGGER.log(Level.WARNING, "shutdown", e);
  454.         }

  455.         setWasAuthenticated();
  456.         // If we are able to resume the stream, then don't set
  457.         // connected/authenticated/usingTLS to false since we like behave like we are still
  458.         // connected (e.g. sendStanza should not throw a NotConnectedException).
  459.         if (isSmResumptionPossible() && instant) {
  460.             disconnectedButResumeable = true;
  461.         } else {
  462.             disconnectedButResumeable = false;
  463.             // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
  464.             // stream tag, there is no longer a stream to resume.
  465.             smSessionId = null;
  466.         }
  467.         authenticated = false;
  468.         connected = false;
  469.         usingTLS = false;
  470.         reader = null;
  471.         writer = null;

  472.         maybeCompressFeaturesReceived.init();
  473.         compressSyncPoint.init();
  474.         smResumedSyncPoint.init();
  475.         smEnabledSyncPoint.init();
  476.         initalOpenStreamSend.init();
  477.     }

  478.     @Override
  479.     public void send(PlainStreamElement element) throws NotConnectedException, InterruptedException {
  480.         packetWriter.sendStreamElement(element);
  481.     }

  482.     @Override
  483.     protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
  484.         packetWriter.sendStreamElement(packet);
  485.         if (isSmEnabled()) {
  486.             for (StanzaFilter requestAckPredicate : requestAckPredicates) {
  487.                 if (requestAckPredicate.accept(packet)) {
  488.                     requestSmAcknowledgementInternal();
  489.                     break;
  490.                 }
  491.             }
  492.         }
  493.     }

  494.     private void connectUsingConfiguration() throws IOException, ConnectionException {
  495.         List<HostAddress> failedAddresses = populateHostAddresses();
  496.         SocketFactory socketFactory = config.getSocketFactory();
  497.         if (socketFactory == null) {
  498.             socketFactory = SocketFactory.getDefault();
  499.         }
  500.         for (HostAddress hostAddress : hostAddresses) {
  501.             String host = hostAddress.getFQDN();
  502.             int port = hostAddress.getPort();
  503.             socket = socketFactory.createSocket();
  504.             try {
  505.                 Iterator<InetAddress> inetAddresses = Arrays.asList(InetAddress.getAllByName(host)).iterator();
  506.                 if (!inetAddresses.hasNext()) {
  507.                     // This should not happen
  508.                     LOGGER.warning("InetAddress.getAllByName() returned empty result array.");
  509.                     throw new UnknownHostException(host);
  510.                 }
  511.                 innerloop: while (inetAddresses.hasNext()) {
  512.                     final InetAddress inetAddress = inetAddresses.next();
  513.                     final String inetAddressAndPort = inetAddress + " at port " + port;
  514.                     LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort);
  515.                     try {
  516.                         socket.connect(new InetSocketAddress(inetAddress, port), config.getConnectTimeout());
  517.                     } catch (Exception e) {
  518.                         if (inetAddresses.hasNext()) {
  519.                             continue innerloop;
  520.                         } else {
  521.                             throw e;
  522.                         }
  523.                     }
  524.                     LOGGER.finer("Established TCP connection to " + inetAddressAndPort);
  525.                     // We found a host to connect to, return here
  526.                     this.host = host;
  527.                     this.port = port;
  528.                     return;
  529.                 }
  530.             }
  531.             catch (Exception e) {
  532.                 hostAddress.setException(e);
  533.                 failedAddresses.add(hostAddress);
  534.             }
  535.         }
  536.         // There are no more host addresses to try
  537.         // throw an exception and report all tried
  538.         // HostAddresses in the exception
  539.         throw ConnectionException.from(failedAddresses);
  540.     }

  541.     /**
  542.      * Initializes the connection by creating a packet reader and writer and opening a
  543.      * XMPP stream to the server.
  544.      *
  545.      * @throws XMPPException if establishing a connection to the server fails.
  546.      * @throws SmackException if the server failes to respond back or if there is anther error.
  547.      * @throws IOException
  548.      */
  549.     private void initConnection() throws IOException {
  550.         boolean isFirstInitialization = packetReader == null || packetWriter == null;
  551.         compressionHandler = null;

  552.         // Set the reader and writer instance variables
  553.         initReaderAndWriter();

  554.         if (isFirstInitialization) {
  555.             packetWriter = new PacketWriter();
  556.             packetReader = new PacketReader();

  557.             // If debugging is enabled, we should start the thread that will listen for
  558.             // all packets and then log them.
  559.             if (config.isDebuggerEnabled()) {
  560.                 addAsyncStanzaListener(debugger.getReaderListener(), null);
  561.                 if (debugger.getWriterListener() != null) {
  562.                     addPacketSendingListener(debugger.getWriterListener(), null);
  563.                 }
  564.             }
  565.         }
  566.         // Start the packet writer. This will open an XMPP stream to the server
  567.         packetWriter.init();
  568.         // Start the packet reader. The startup() method will block until we
  569.         // get an opening stream packet back from server
  570.         packetReader.init();

  571.         if (isFirstInitialization) {
  572.             // Notify listeners that a new connection has been established
  573.             for (ConnectionCreationListener listener : getConnectionCreationListeners()) {
  574.                 listener.connectionCreated(this);
  575.             }
  576.         }
  577.     }

  578.     private void initReaderAndWriter() throws IOException {
  579.         InputStream is = socket.getInputStream();
  580.         OutputStream os = socket.getOutputStream();
  581.         if (compressionHandler != null) {
  582.             is = compressionHandler.getInputStream(is);
  583.             os = compressionHandler.getOutputStream(os);
  584.         }
  585.         // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
  586.         writer = new OutputStreamWriter(os, "UTF-8");
  587.         reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));

  588.         // If debugging is enabled, we open a window and write out all network traffic.
  589.         initDebugger();
  590.     }

  591.     /**
  592.      * The server has indicated that TLS negotiation can start. We now need to secure the
  593.      * existing plain connection and perform a handshake. This method won't return until the
  594.      * connection has finished the handshake or an error occurred while securing the connection.
  595.      * @throws IOException
  596.      * @throws CertificateException
  597.      * @throws NoSuchAlgorithmException
  598.      * @throws NoSuchProviderException
  599.      * @throws KeyStoreException
  600.      * @throws UnrecoverableKeyException
  601.      * @throws KeyManagementException
  602.      * @throws SmackException
  603.      * @throws Exception if an exception occurs.
  604.      */
  605.     private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
  606.         SSLContext context = this.config.getCustomSSLContext();
  607.         KeyStore ks = null;
  608.         KeyManager[] kms = null;
  609.         PasswordCallback pcb = null;

  610.         if(config.getCallbackHandler() == null) {
  611.            ks = null;
  612.         } else if (context == null) {
  613.             if(config.getKeystoreType().equals("NONE")) {
  614.                 ks = null;
  615.                 pcb = null;
  616.             }
  617.             else if(config.getKeystoreType().equals("PKCS11")) {
  618.                 try {
  619.                     Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
  620.                     String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library();
  621.                     ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes());
  622.                     Provider p = (Provider)c.newInstance(config);
  623.                     Security.addProvider(p);
  624.                     ks = KeyStore.getInstance("PKCS11",p);
  625.                     pcb = new PasswordCallback("PKCS11 Password: ",false);
  626.                     this.config.getCallbackHandler().handle(new Callback[]{pcb});
  627.                     ks.load(null,pcb.getPassword());
  628.                 }
  629.                 catch (Exception e) {
  630.                     ks = null;
  631.                     pcb = null;
  632.                 }
  633.             }
  634.             else if(config.getKeystoreType().equals("Apple")) {
  635.                 ks = KeyStore.getInstance("KeychainStore","Apple");
  636.                 ks.load(null,null);
  637.                 //pcb = new PasswordCallback("Apple Keychain",false);
  638.                 //pcb.setPassword(null);
  639.             }
  640.             else {
  641.                 ks = KeyStore.getInstance(config.getKeystoreType());
  642.                 try {
  643.                     pcb = new PasswordCallback("Keystore Password: ",false);
  644.                     config.getCallbackHandler().handle(new Callback[]{pcb});
  645.                     ks.load(new FileInputStream(config.getKeystorePath()), pcb.getPassword());
  646.                 }
  647.                 catch(Exception e) {
  648.                     ks = null;
  649.                     pcb = null;
  650.                 }
  651.             }
  652.             KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
  653.             try {
  654.                 if(pcb == null) {
  655.                     kmf.init(ks,null);
  656.                 } else {
  657.                     kmf.init(ks,pcb.getPassword());
  658.                     pcb.clearPassword();
  659.                 }
  660.                 kms = kmf.getKeyManagers();
  661.             } catch (NullPointerException npe) {
  662.                 kms = null;
  663.             }
  664.         }

  665.         // If the user didn't specify a SSLContext, use the default one
  666.         if (context == null) {
  667.             context = SSLContext.getInstance("TLS");
  668.             context.init(kms, null, new java.security.SecureRandom());
  669.         }
  670.         Socket plain = socket;
  671.         // Secure the plain connection
  672.         socket = context.getSocketFactory().createSocket(plain,
  673.                 host, plain.getPort(), true);
  674.         // Initialize the reader and writer with the new secured version
  675.         initReaderAndWriter();

  676.         final SSLSocket sslSocket = (SSLSocket) socket;
  677.         TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());

  678.         // Proceed to do the handshake
  679.         sslSocket.startHandshake();

  680.         final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
  681.         if (verifier == null) {
  682.                 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
  683.         } else if (!verifier.verify(getServiceName().toString(), sslSocket.getSession())) {
  684.             throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getServiceName());
  685.         }

  686.         // Set that TLS was successful
  687.         usingTLS = true;
  688.     }

  689.     /**
  690.      * Returns the compression handler that can be used for one compression methods offered by the server.
  691.      *
  692.      * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
  693.      *
  694.      */
  695.     private XMPPInputOutputStream maybeGetCompressionHandler() {
  696.         Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
  697.         if (compression == null) {
  698.             // Server does not support compression
  699.             return null;
  700.         }
  701.         for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) {
  702.                 String method = handler.getCompressionMethod();
  703.                 if (compression.getMethods().contains(method))
  704.                     return handler;
  705.         }
  706.         return null;
  707.     }

  708.     @Override
  709.     public boolean isUsingCompression() {
  710.         return compressionHandler != null && compressSyncPoint.wasSuccessful();
  711.     }

  712.     /**
  713.      * <p>
  714.      * Starts using stream compression that will compress network traffic. Traffic can be
  715.      * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
  716.      * connection. However, the server and the client will need to use more CPU time in order to
  717.      * un/compress network data so under high load the server performance might be affected.
  718.      * </p>
  719.      * <p>
  720.      * Stream compression has to have been previously offered by the server. Currently only the
  721.      * zlib method is supported by the client. Stream compression negotiation has to be done
  722.      * before authentication took place.
  723.      * </p>
  724.      *
  725.      * @throws NotConnectedException
  726.      * @throws XMPPException
  727.      * @throws NoResponseException
  728.      * @throws InterruptedException
  729.      */
  730.     private void useCompression() throws NotConnectedException, NoResponseException, XMPPException, InterruptedException {
  731.         maybeCompressFeaturesReceived.checkIfSuccessOrWait();
  732.         // If stream compression was offered by the server and we want to use
  733.         // compression then send compression request to the server
  734.         if ((compressionHandler = maybeGetCompressionHandler()) != null) {
  735.             compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
  736.         } else {
  737.             LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
  738.         }
  739.     }

  740.     /**
  741.      * Establishes a connection to the XMPP server and performs an automatic login
  742.      * only if the previous connection state was logged (authenticated). It basically
  743.      * creates and maintains a socket connection to the server.<p>
  744.      * <p/>
  745.      * Listeners will be preserved from a previous connection if the reconnection
  746.      * occurs after an abrupt termination.
  747.      *
  748.      * @throws XMPPException if an error occurs while trying to establish the connection.
  749.      * @throws SmackException
  750.      * @throws IOException
  751.      * @throws InterruptedException
  752.      */
  753.     @Override
  754.     protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
  755.         // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
  756.         // there is an error establishing the connection
  757.         connectUsingConfiguration();

  758.         // We connected successfully to the servers TCP port
  759.         socketClosed = false;
  760.         initConnection();

  761.         // Wait with SASL auth until the SASL mechanisms have been received
  762.         saslFeatureReceived.checkIfSuccessOrWaitOrThrow();

  763.         // Make note of the fact that we're now connected.
  764.         connected = true;
  765.         callConnectionConnectedListener();

  766.         // Automatically makes the login if the user was previously connected successfully
  767.         // to the server and the connection was terminated abruptly
  768.         if (wasAuthenticated) {
  769.             login();
  770.             notifyReconnection();
  771.         }
  772.     }

  773.     /**
  774.      * Sends out a notification that there was an error with the connection
  775.      * and closes the connection. Also prints the stack trace of the given exception
  776.      *
  777.      * @param e the exception that causes the connection close event.
  778.      */
  779.     private synchronized void notifyConnectionError(Exception e) {
  780.         // Listeners were already notified of the exception, return right here.
  781.         if ((packetReader == null || packetReader.done) &&
  782.                 (packetWriter == null || packetWriter.done())) return;

  783.         // Closes the connection temporary. A reconnection is possible
  784.         instantShutdown();

  785.         // Notify connection listeners of the error.
  786.         callConnectionClosedOnErrorListener(e);
  787.     }

  788.     /**
  789.      * For unit testing purposes
  790.      *
  791.      * @param writer
  792.      */
  793.     protected void setWriter(Writer writer) {
  794.         this.writer = writer;
  795.     }

  796.     @Override
  797.     protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException {
  798.         StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
  799.         if (startTlsFeature != null) {
  800.             if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
  801.                 notifyConnectionError(new SecurityRequiredByServerException());
  802.                 return;
  803.             }

  804.             if (config.getSecurityMode() == ConnectionConfiguration.SecurityMode.disabled) {
  805.                 // Do not secure the connection using TLS since TLS was disabled
  806.                 return;
  807.             }
  808.             send(new StartTls());
  809.         }
  810.         // If TLS is required but the server doesn't offer it, disconnect
  811.         // from the server and throw an error. First check if we've already negotiated TLS
  812.         // and are secure, however (features get parsed a second time after TLS is established).
  813.         if (!isSecureConnection() && startTlsFeature == null
  814.                         && getConfiguration().getSecurityMode() == SecurityMode.required) {
  815.             throw new SecurityRequiredByClientException();
  816.         }

  817.         if (getSASLAuthentication().authenticationSuccessful()) {
  818.             // If we have received features after the SASL has been successfully completed, then we
  819.             // have also *maybe* received, as it is an optional feature, the compression feature
  820.             // from the server.
  821.             maybeCompressFeaturesReceived.reportSuccess();
  822.         }
  823.     }

  824.     /**
  825.      * Resets the parser using the latest connection's reader. Reseting the parser is necessary
  826.      * when the plain connection has been secured or when a new opening stream element is going
  827.      * to be sent by the server.
  828.      *
  829.      * @throws SmackException if the parser could not be reset.
  830.      * @throws InterruptedException
  831.      */
  832.     void openStream() throws SmackException, InterruptedException {
  833.         // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
  834.         // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
  835.         // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
  836.         // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.)
  837.         CharSequence to = getServiceName();
  838.         CharSequence from = null;
  839.         CharSequence localpart = config.getUsername();
  840.         if (localpart != null) {
  841.             from = XmppStringUtils.completeJidFrom(localpart, to);
  842.         }
  843.         String id = getStreamId();
  844.         send(new StreamOpen(to, from, id));
  845.         try {
  846.             packetReader.parser = PacketParserUtils.newXmppParser(reader);
  847.         }
  848.         catch (XmlPullParserException e) {
  849.             throw new SmackException(e);
  850.         }
  851.     }

  852.     protected class PacketReader {

  853.         XmlPullParser parser;

  854.         private volatile boolean done;

  855.         /**
  856.          * Initializes the reader in order to be used. The reader is initialized during the
  857.          * first connection and when reconnecting due to an abruptly disconnection.
  858.          */
  859.         void init() {
  860.             done = false;

  861.             Async.go(new Runnable() {
  862.                 public void run() {
  863.                     parsePackets();
  864.                 }
  865.             }, "Smack Packet Reader (" + getConnectionCounter() + ")");
  866.          }

  867.         /**
  868.          * Shuts the packet reader down. This method simply sets the 'done' flag to true.
  869.          */
  870.         void shutdown() {
  871.             done = true;
  872.         }

  873.         /**
  874.          * Parse top-level packets in order to process them further.
  875.          *
  876.          * @param thread the thread that is being used by the reader to parse incoming packets.
  877.          */
  878.         private void parsePackets() {
  879.             try {
  880.                 initalOpenStreamSend.checkIfSuccessOrWait();
  881.                 int eventType = parser.getEventType();
  882.                 while (!done) {
  883.                     switch (eventType) {
  884.                     case XmlPullParser.START_TAG:
  885.                         final String name = parser.getName();
  886.                         switch (name) {
  887.                         case Message.ELEMENT:
  888.                         case IQ.IQ_ELEMENT:
  889.                         case Presence.ELEMENT:
  890.                             try {
  891.                                 parseAndProcessStanza(parser);
  892.                             } finally {
  893.                                 clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
  894.                             }
  895.                             break;
  896.                         case "stream":
  897.                             // We found an opening stream.
  898.                             if ("jabber:client".equals(parser.getNamespace(null))) {
  899.                                 streamId = parser.getAttributeValue("", "id");
  900.                                 String reportedServiceName = parser.getAttributeValue("", "from");
  901.                                 assert(reportedServiceName.equals(config.getServiceName()));
  902.                             }
  903.                             break;
  904.                         case "error":
  905.                             throw new StreamErrorException(PacketParserUtils.parseStreamError(parser));
  906.                         case "features":
  907.                             parseFeatures(parser);
  908.                             break;
  909.                         case "proceed":
  910.                             try {
  911.                                 // Secure the connection by negotiating TLS
  912.                                 proceedTLSReceived();
  913.                                 // Send a new opening stream to the server
  914.                                 openStream();
  915.                             }
  916.                             catch (Exception e) {
  917.                                 // We report any failure regarding TLS in the second stage of XMPP
  918.                                 // connection establishment, namely the SASL authentication
  919.                                 saslFeatureReceived.reportFailure(new SmackException(e));
  920.                                 throw e;
  921.                             }
  922.                             break;
  923.                         case "failure":
  924.                             String namespace = parser.getNamespace(null);
  925.                             switch (namespace) {
  926.                             case "urn:ietf:params:xml:ns:xmpp-tls":
  927.                                 // TLS negotiation has failed. The server will close the connection
  928.                                 // TODO Parse failure stanza
  929.                                 throw new XMPPErrorException("TLS negotiation has failed", null);
  930.                             case "http://jabber.org/protocol/compress":
  931.                                 // Stream compression has been denied. This is a recoverable
  932.                                 // situation. It is still possible to authenticate and
  933.                                 // use the connection but using an uncompressed connection
  934.                                 // TODO Parse failure stanza
  935.                                 compressSyncPoint.reportFailure(new XMPPErrorException(
  936.                                                 "Could not establish compression", null));
  937.                                 break;
  938.                             case SaslStreamElements.NAMESPACE:
  939.                                 // SASL authentication has failed. The server may close the connection
  940.                                 // depending on the number of retries
  941.                                 final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
  942.                                 getSASLAuthentication().authenticationFailed(failure);
  943.                                 break;
  944.                             }
  945.                             break;
  946.                         case Challenge.ELEMENT:
  947.                             // The server is challenging the SASL authentication made by the client
  948.                             String challengeData = parser.nextText();
  949.                             getSASLAuthentication().challengeReceived(challengeData);
  950.                             break;
  951.                         case Success.ELEMENT:
  952.                             Success success = new Success(parser.nextText());
  953.                             // We now need to bind a resource for the connection
  954.                             // Open a new stream and wait for the response
  955.                             openStream();
  956.                             // The SASL authentication with the server was successful. The next step
  957.                             // will be to bind the resource
  958.                             getSASLAuthentication().authenticated(success);
  959.                             break;
  960.                         case Compressed.ELEMENT:
  961.                             // Server confirmed that it's possible to use stream compression. Start
  962.                             // stream compression
  963.                             // Initialize the reader and writer with the new compressed version
  964.                             initReaderAndWriter();
  965.                             // Send a new opening stream to the server
  966.                             openStream();
  967.                             // Notify that compression is being used
  968.                             compressSyncPoint.reportSuccess();
  969.                             break;
  970.                         case Enabled.ELEMENT:
  971.                             Enabled enabled = ParseStreamManagement.enabled(parser);
  972.                             if (enabled.isResumeSet()) {
  973.                                 smSessionId = enabled.getId();
  974.                                 if (StringUtils.isNullOrEmpty(smSessionId)) {
  975.                                     XMPPErrorException xmppException = new XMPPErrorException(
  976.                                                     "Stream Management 'enabled' element with resume attribute but without session id received",
  977.                                                     new XMPPError(
  978.                                                                     XMPPError.Condition.bad_request));
  979.                                     smEnabledSyncPoint.reportFailure(xmppException);
  980.                                     throw xmppException;
  981.                                 }
  982.                                 smServerMaxResumptimTime = enabled.getMaxResumptionTime();
  983.                             } else {
  984.                                 // Mark this a non-resumable stream by setting smSessionId to null
  985.                                 smSessionId = null;
  986.                             }
  987.                             clientHandledStanzasCount = 0;
  988.                             smWasEnabledAtLeastOnce = true;
  989.                             smEnabledSyncPoint.reportSuccess();
  990.                             LOGGER.fine("Stream Management (XEP-198): succesfully enabled");
  991.                             break;
  992.                         case Failed.ELEMENT:
  993.                             Failed failed = ParseStreamManagement.failed(parser);
  994.                             XMPPError xmppError = new XMPPError(failed.getXMPPErrorCondition());
  995.                             XMPPException xmppException = new XMPPErrorException("Stream Management failed", xmppError);
  996.                             // If only XEP-198 would specify different failure elements for the SM
  997.                             // enable and SM resume failure case. But this is not the case, so we
  998.                             // need to determine if this is a 'Failed' response for either 'Enable'
  999.                             // or 'Resume'.
  1000.                             if (smResumedSyncPoint.requestSent()) {
  1001.                                 smResumedSyncPoint.reportFailure(xmppException);
  1002.                             }
  1003.                             else {
  1004.                                 if (!smEnabledSyncPoint.requestSent()) {
  1005.                                     throw new IllegalStateException("Failed element received but SM was not previously enabled");
  1006.                                 }
  1007.                                 smEnabledSyncPoint.reportFailure(xmppException);
  1008.                                 // Report success for last lastFeaturesReceived so that in case a
  1009.                                 // failed resumption, we can continue with normal resource binding.
  1010.                                 // See text of XEP-198 5. below Example 11.
  1011.                                 lastFeaturesReceived.reportSuccess();
  1012.                             }
  1013.                             break;
  1014.                         case Resumed.ELEMENT:
  1015.                             Resumed resumed = ParseStreamManagement.resumed(parser);
  1016.                             if (!smSessionId.equals(resumed.getPrevId())) {
  1017.                                 throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
  1018.                             }
  1019.                             // First, drop the stanzas already handled by the server
  1020.                             processHandledCount(resumed.getHandledCount());
  1021.                             // Then re-send what is left in the unacknowledged queue
  1022.                             List<Stanza> stanzasToResend = new LinkedList<Stanza>();
  1023.                             stanzasToResend.addAll(unacknowledgedStanzas);
  1024.                             for (Stanza stanza : stanzasToResend) {
  1025.                                 packetWriter.sendStreamElement(stanza);
  1026.                             }
  1027.                             smResumedSyncPoint.reportSuccess();
  1028.                             smEnabledSyncPoint.reportSuccess();
  1029.                             LOGGER.fine("Stream Management (XEP-198): Stream resumed");
  1030.                             break;
  1031.                         case AckAnswer.ELEMENT:
  1032.                             AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
  1033.                             processHandledCount(ackAnswer.getHandledCount());
  1034.                             break;
  1035.                         case AckRequest.ELEMENT:
  1036.                             ParseStreamManagement.ackRequest(parser);
  1037.                             if (smEnabledSyncPoint.wasSuccessful()) {
  1038.                                 sendSmAcknowledgementInternal();
  1039.                             } else {
  1040.                                 LOGGER.warning("SM Ack Request received while SM is not enabled");
  1041.                             }
  1042.                             break;
  1043.                          default:
  1044.                              LOGGER.warning("Unknown top level stream element: " + name);
  1045.                              break;
  1046.                         }
  1047.                         break;
  1048.                     case XmlPullParser.END_TAG:
  1049.                         if (parser.getName().equals("stream")) {
  1050.                             // Disconnect the connection
  1051.                             disconnect();
  1052.                         }
  1053.                         break;
  1054.                     case XmlPullParser.END_DOCUMENT:
  1055.                         // END_DOCUMENT only happens in an error case, as otherwise we would see a
  1056.                         // closing stream element before.
  1057.                         throw new SmackException(
  1058.                                         "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
  1059.                     }
  1060.                     eventType = parser.next();
  1061.                 }
  1062.             }
  1063.             catch (Exception e) {
  1064.                 // The exception can be ignored if the the connection is 'done'
  1065.                 // or if the it was caused because the socket got closed
  1066.                 if (!(done || isSocketClosed())) {
  1067.                     // Close the connection and notify connection listeners of the
  1068.                     // error.
  1069.                     notifyConnectionError(e);
  1070.                 }
  1071.             }
  1072.         }
  1073.     }

  1074.     protected class PacketWriter {
  1075.         public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;

  1076.         private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<Element>(
  1077.                         QUEUE_SIZE, true);

  1078.         /**
  1079.          * Needs to be protected for unit testing purposes.
  1080.          */
  1081.         protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>(
  1082.                         XMPPTCPConnection.this);

  1083.         /**
  1084.          * If set, the packet writer is shut down
  1085.          */
  1086.         protected volatile Long shutdownTimestamp = null;

  1087.         private volatile boolean instantShutdown;

  1088.         /**
  1089.          * True if some preconditions are given to start the bundle and defer mechanism.
  1090.          * <p>
  1091.          * This will likely get set to true right after the start of the writer thread, because
  1092.          * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
  1093.          * this field to true.
  1094.          * </p>
  1095.          */
  1096.         private boolean shouldBundleAndDefer;

  1097.         /**
  1098.         * Initializes the writer in order to be used. It is called at the first connection and also
  1099.         * is invoked if the connection is disconnected by an error.
  1100.         */
  1101.         void init() {
  1102.             shutdownDone.init();
  1103.             shutdownTimestamp = null;

  1104.             if (unacknowledgedStanzas != null) {
  1105.                 // It's possible that there are new stanzas in the writer queue that
  1106.                 // came in while we were disconnected but resumable, drain those into
  1107.                 // the unacknowledged queue so that they get resent now
  1108.                 drainWriterQueueToUnacknowledgedStanzas();
  1109.             }

  1110.             queue.start();
  1111.             Async.go(new Runnable() {
  1112.                 @Override
  1113.                 public void run() {
  1114.                     writePackets();
  1115.                 }
  1116.             }, "Smack Packet Writer (" + getConnectionCounter() + ")");
  1117.         }

  1118.         private boolean done() {
  1119.             return shutdownTimestamp != null;
  1120.         }

  1121.         protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
  1122.             if (done() && !isSmResumptionPossible()) {
  1123.                 // Don't throw a NotConnectedException is there is an resumable stream available
  1124.                 throw new NotConnectedException();
  1125.             }
  1126.         }

  1127.         /**
  1128.          * Sends the specified element to the server.
  1129.          *
  1130.          * @param element the element to send.
  1131.          * @throws NotConnectedException
  1132.          * @throws InterruptedException
  1133.          */
  1134.         protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
  1135.             throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
  1136.             try {
  1137.                 queue.put(element);
  1138.             }
  1139.             catch (InterruptedException e) {
  1140.                 // put() may throw an InterruptedException for two reasons:
  1141.                 // 1. If the queue was shut down
  1142.                 // 2. If the thread was interrupted
  1143.                 // so we have to check which is the case
  1144.                 throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
  1145.                 // If the method above did not throw, then the sending thread was interrupted
  1146.                 throw e;
  1147.             }
  1148.         }

  1149.         /**
  1150.          * Shuts down the packet writer. Once this method has been called, no further
  1151.          * packets will be written to the server.
  1152.          */
  1153.         void shutdown(boolean instant) {
  1154.             instantShutdown = instant;
  1155.             shutdownTimestamp = System.currentTimeMillis();
  1156.             queue.shutdown();
  1157.             try {
  1158.                 shutdownDone.checkIfSuccessOrWait();
  1159.             }
  1160.             catch (NoResponseException e) {
  1161.                 LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
  1162.             }
  1163.         }

  1164.         /**
  1165.          * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
  1166.          * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
  1167.          * that case.
  1168.          *
  1169.          * @return the next element for writing or null.
  1170.          */
  1171.         private Element nextStreamElement() {
  1172.             // It is important the we check if the queue is empty before removing an element from it
  1173.             if (queue.isEmpty()) {
  1174.                 shouldBundleAndDefer = true;
  1175.             }
  1176.             Element packet = null;
  1177.             try {
  1178.                 packet = queue.take();
  1179.             }
  1180.             catch (InterruptedException e) {
  1181.                 if (!queue.isShutdown()) {
  1182.                     // Users shouldn't try to interrupt the packet writer thread
  1183.                     LOGGER.log(Level.WARNING, "Packet writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
  1184.                 }
  1185.             }
  1186.             return packet;
  1187.         }

  1188.         private void writePackets() {
  1189.             try {
  1190.                 openStream();
  1191.                 initalOpenStreamSend.reportSuccess();
  1192.                 // Write out packets from the queue.
  1193.                 while (!done()) {
  1194.                     Element element = nextStreamElement();
  1195.                     if (element == null) {
  1196.                         continue;
  1197.                     }

  1198.                     // Get a local version of the bundle and defer callback, in case it's unset
  1199.                     // between the null check and the method invocation
  1200.                     final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
  1201.                     // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
  1202.                     // empty), then we could wait a bit for further stanzas attempting to decrease
  1203.                     // our energy consumption
  1204.                     if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
  1205.                         // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
  1206.                         // queue is empty again.
  1207.                         shouldBundleAndDefer = false;
  1208.                         final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
  1209.                         final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
  1210.                                         bundlingAndDeferringStopped));
  1211.                         if (bundleAndDeferMillis > 0) {
  1212.                             long remainingWait = bundleAndDeferMillis;
  1213.                             final long waitStart = System.currentTimeMillis();
  1214.                             synchronized (bundlingAndDeferringStopped) {
  1215.                                 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
  1216.                                     bundlingAndDeferringStopped.wait(remainingWait);
  1217.                                     remainingWait = bundleAndDeferMillis
  1218.                                                     - (System.currentTimeMillis() - waitStart);
  1219.                                 }
  1220.                             }
  1221.                         }
  1222.                     }

  1223.                     Stanza packet = null;
  1224.                     if (element instanceof Stanza) {
  1225.                         packet = (Stanza) element;
  1226.                     }
  1227.                     else if (element instanceof Enable) {
  1228.                         // The client needs to add messages to the unacknowledged stanzas queue
  1229.                         // right after it sent 'enabled'. Stanza will be added once
  1230.                         // unacknowledgedStanzas is not null.
  1231.                         unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
  1232.                     }
  1233.                     // Check if the stream element should be put to the unacknowledgedStanza
  1234.                     // queue. Note that we can not do the put() in sendStanzaInternal() and the
  1235.                     // packet order is not stable at this point (sendStanzaInternal() can be
  1236.                     // called concurrently).
  1237.                     if (unacknowledgedStanzas != null && packet != null) {
  1238.                         // If the unacknowledgedStanza queue is nearly full, request an new ack
  1239.                         // from the server in order to drain it
  1240.                         if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
  1241.                             writer.write(AckRequest.INSTANCE.toXML().toString());
  1242.                             writer.flush();
  1243.                         }
  1244.                         try {
  1245.                             // It is important the we put the stanza in the unacknowledged stanza
  1246.                             // queue before we put it on the wire
  1247.                             unacknowledgedStanzas.put(packet);
  1248.                         }
  1249.                         catch (InterruptedException e) {
  1250.                             throw new IllegalStateException(e);
  1251.                         }
  1252.                     }
  1253.                     writer.write(element.toXML().toString());
  1254.                     if (queue.isEmpty()) {
  1255.                         writer.flush();
  1256.                     }
  1257.                     if (packet != null) {
  1258.                         firePacketSendingListeners(packet);
  1259.                     }
  1260.                 }
  1261.                 if (!instantShutdown) {
  1262.                     // Flush out the rest of the queue.
  1263.                     try {
  1264.                         while (!queue.isEmpty()) {
  1265.                             Element packet = queue.remove();
  1266.                             writer.write(packet.toXML().toString());
  1267.                         }
  1268.                         writer.flush();
  1269.                     }
  1270.                     catch (Exception e) {
  1271.                         LOGGER.log(Level.WARNING,
  1272.                                         "Exception flushing queue during shutdown, ignore and continue",
  1273.                                         e);
  1274.                     }

  1275.                     // Close the stream.
  1276.                     try {
  1277.                         writer.write("</stream:stream>");
  1278.                         writer.flush();
  1279.                     }
  1280.                     catch (Exception e) {
  1281.                         LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
  1282.                     }
  1283.                     // Delete the queue contents (hopefully nothing is left).
  1284.                     queue.clear();
  1285.                 } else if (instantShutdown && isSmEnabled()) {
  1286.                     // This was an instantShutdown and SM is enabled, drain all remaining stanzas
  1287.                     // into the unacknowledgedStanzas queue
  1288.                     drainWriterQueueToUnacknowledgedStanzas();
  1289.                 }

  1290.                 try {
  1291.                     writer.close();
  1292.                 }
  1293.                 catch (Exception e) {
  1294.                     // Do nothing
  1295.                 }

  1296.             }
  1297.             catch (Exception e) {
  1298.                 // The exception can be ignored if the the connection is 'done'
  1299.                 // or if the it was caused because the socket got closed
  1300.                 if (!(done() || isSocketClosed())) {
  1301.                     notifyConnectionError(e);
  1302.                 } else {
  1303.                     LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
  1304.                 }
  1305.             } finally {
  1306.                 LOGGER.fine("Reporting shutdownDone success in writer thread");
  1307.                 shutdownDone.reportSuccess();
  1308.             }
  1309.         }

  1310.         private void drainWriterQueueToUnacknowledgedStanzas() {
  1311.             List<Element> elements = new ArrayList<Element>(queue.size());
  1312.             queue.drainTo(elements);
  1313.             for (Element element : elements) {
  1314.                 if (element instanceof Stanza) {
  1315.                     unacknowledgedStanzas.add((Stanza) element);
  1316.                 }
  1317.             }
  1318.         }
  1319.     }

  1320.     /**
  1321.      * Set if Stream Management should be used by default for new connections.
  1322.      *
  1323.      * @param useSmDefault true to use Stream Management for new connections.
  1324.      */
  1325.     public static void setUseStreamManagementDefault(boolean useSmDefault) {
  1326.         XMPPTCPConnection.useSmDefault = useSmDefault;
  1327.     }

  1328.     /**
  1329.      * Set if Stream Management resumption should be used by default for new connections.
  1330.      *
  1331.      * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
  1332.      */
  1333.     public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
  1334.         if (useSmResumptionDefault) {
  1335.             // Also enable SM is resumption is enabled
  1336.             setUseStreamManagementDefault(useSmResumptionDefault);
  1337.         }
  1338.         XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
  1339.     }

  1340.     /**
  1341.      * Set if Stream Management should be used if supported by the server.
  1342.      *
  1343.      * @param useSm true to use Stream Management.
  1344.      */
  1345.     public void setUseStreamManagement(boolean useSm) {
  1346.         this.useSm = useSm;
  1347.     }

  1348.     /**
  1349.      * Set if Stream Management resumption should be used if supported by the server.
  1350.      *
  1351.      * @param useSmResumption true to use Stream Management resumption.
  1352.      */
  1353.     public void setUseStreamManagementResumption(boolean useSmResumption) {
  1354.         if (useSmResumption) {
  1355.             // Also enable SM is resumption is enabled
  1356.             setUseStreamManagement(useSmResumption);
  1357.         }
  1358.         this.useSmResumption = useSmResumption;
  1359.     }

  1360.     /**
  1361.      * Set the preferred resumption time in seconds.
  1362.      * @param resumptionTime the preferred resumption time in seconds
  1363.      */
  1364.     public void setPreferredResumptionTime(int resumptionTime) {
  1365.         smClientMaxResumptionTime = resumptionTime;
  1366.     }

  1367.     /**
  1368.      * Add a predicate for Stream Management acknowledgment requests.
  1369.      * <p>
  1370.      * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
  1371.      * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
  1372.      * </p>
  1373.      * <p>
  1374.      * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
  1375.      * </p>
  1376.      *
  1377.      * @param predicate the predicate to add.
  1378.      * @return if the predicate was not already active.
  1379.      */
  1380.     public boolean addRequestAckPredicate(StanzaFilter predicate) {
  1381.         synchronized (requestAckPredicates) {
  1382.             return requestAckPredicates.add(predicate);
  1383.         }
  1384.     }

  1385.     /**
  1386.      * Remove the given predicate for Stream Management acknowledgment request.
  1387.      * @param predicate the predicate to remove.
  1388.      * @return true if the predicate was removed.
  1389.      */
  1390.     public boolean removeRequestAckPredicate(StanzaFilter predicate) {
  1391.         synchronized (requestAckPredicates) {
  1392.             return requestAckPredicates.remove(predicate);
  1393.         }
  1394.     }

  1395.     /**
  1396.      * Remove all predicates for Stream Management acknowledgment requests.
  1397.      */
  1398.     public void removeAllRequestAckPredicates() {
  1399.         synchronized (requestAckPredicates) {
  1400.             requestAckPredicates.clear();
  1401.         }
  1402.     }

  1403.     /**
  1404.      * Send an unconditional Stream Management acknowledgement request to the server.
  1405.      *
  1406.      * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled.
  1407.      * @throws NotConnectedException if the connection is not connected.
  1408.      * @throws InterruptedException
  1409.      */
  1410.     public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
  1411.         if (!isSmEnabled()) {
  1412.             throw new StreamManagementException.StreamManagementNotEnabledException();
  1413.         }
  1414.         requestSmAcknowledgementInternal();
  1415.     }

  1416.     private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
  1417.         packetWriter.sendStreamElement(AckRequest.INSTANCE);
  1418.     }

  1419.     /**
  1420.      * Send a unconditional Stream Management acknowledgment to the server.
  1421.      * <p>
  1422.      * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
  1423.      * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas,
  1424.      * or after a certain period of time), even if it has not received an <r/> element from the other party."
  1425.      * </p>
  1426.      *
  1427.      * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
  1428.      * @throws NotConnectedException if the connection is not connected.
  1429.      * @throws InterruptedException
  1430.      */
  1431.     public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
  1432.         if (!isSmEnabled()) {
  1433.             throw new StreamManagementException.StreamManagementNotEnabledException();
  1434.         }
  1435.         sendSmAcknowledgementInternal();
  1436.     }

  1437.     private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
  1438.         packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
  1439.     }

  1440.     /**
  1441.      * Add a Stanza acknowledged listener.
  1442.      * <p>
  1443.      * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
  1444.      * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
  1445.      * possible.
  1446.      * </p>
  1447.      *
  1448.      * @param listener the listener to add.
  1449.      */
  1450.     public void addStanzaAcknowledgedListener(StanzaListener listener) {
  1451.         stanzaAcknowledgedListeners.add(listener);
  1452.     }

  1453.     /**
  1454.      * Remove the given Stanza acknowledged listener.
  1455.      *
  1456.      * @param listener the listener.
  1457.      * @return true if the listener was removed.
  1458.      */
  1459.     public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
  1460.         return stanzaAcknowledgedListeners.remove(listener);
  1461.     }

  1462.     /**
  1463.      * Remove all stanza acknowledged listeners.
  1464.      */
  1465.     public void removeAllStanzaAcknowledgedListeners() {
  1466.         stanzaAcknowledgedListeners.clear();
  1467.     }

  1468.     /**
  1469.      * Add a new Stanza ID acknowledged listener for the given ID.
  1470.      * <p>
  1471.      * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
  1472.      * automatically be removed after the listener was run.
  1473.      * </p>
  1474.      *
  1475.      * @param id the stanza ID.
  1476.      * @param listener the listener to invoke.
  1477.      * @return the previous listener for this stanza ID or null.
  1478.      * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
  1479.      */
  1480.     public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
  1481.         // Prevent users from adding callbacks that will never get removed
  1482.         if (!smWasEnabledAtLeastOnce) {
  1483.             throw new StreamManagementException.StreamManagementNotEnabledException();
  1484.         }
  1485.         // Remove the listener after max. 12 hours
  1486.         final int removeAfterSeconds = Math.min(getMaxSmResumptionTime() + 60, 12 * 60 * 60);
  1487.         schedule(new Runnable() {
  1488.             @Override
  1489.             public void run() {
  1490.                 stanzaIdAcknowledgedListeners.remove(id);
  1491.             }
  1492.         }, removeAfterSeconds, TimeUnit.SECONDS);
  1493.         return stanzaIdAcknowledgedListeners.put(id, listener);
  1494.     }

  1495.     /**
  1496.      * Remove the Stanza ID acknowledged listener for the given ID.
  1497.      *
  1498.      * @param id the stanza ID.
  1499.      * @return true if the listener was found and removed, false otherwise.
  1500.      */
  1501.     public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
  1502.         return stanzaIdAcknowledgedListeners.remove(id);
  1503.     }

  1504.     /**
  1505.      * Removes all Stanza ID acknowledged listeners.
  1506.      */
  1507.     public void removeAllStanzaIdAcknowledgedListeners() {
  1508.         stanzaIdAcknowledgedListeners.clear();
  1509.     }

  1510.     /**
  1511.      * Returns true if Stream Management is supported by the server.
  1512.      *
  1513.      * @return true if Stream Management is supported by the server.
  1514.      */
  1515.     public boolean isSmAvailable() {
  1516.         return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
  1517.     }

  1518.     /**
  1519.      * Returns true if Stream Management was successfully negotiated with the server.
  1520.      *
  1521.      * @return true if Stream Management was negotiated.
  1522.      */
  1523.     public boolean isSmEnabled() {
  1524.         return smEnabledSyncPoint.wasSuccessful();
  1525.     }

  1526.     /**
  1527.      * Returns true if the stream was successfully resumed with help of Stream Management.
  1528.      *
  1529.      * @return true if the stream was resumed.
  1530.      */
  1531.     public boolean streamWasResumed() {
  1532.         return smResumedSyncPoint.wasSuccessful();
  1533.     }

  1534.     /**
  1535.      * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
  1536.      *
  1537.      * @return true if disconnected but resumption possible.
  1538.      */
  1539.     public boolean isDisconnectedButSmResumptionPossible() {
  1540.         return disconnectedButResumeable && isSmResumptionPossible();
  1541.     }

  1542.     /**
  1543.      * Returns true if the stream is resumable.
  1544.      *
  1545.      * @return true if the stream is resumable.
  1546.      */
  1547.     public boolean isSmResumptionPossible() {
  1548.         // There is no resumable stream available
  1549.         if (smSessionId == null)
  1550.             return false;

  1551.         final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
  1552.         // Seems like we are already reconnected, report true
  1553.         if (shutdownTimestamp == null) {
  1554.             return true;
  1555.         }

  1556.         // See if resumption time is over
  1557.         long current = System.currentTimeMillis();
  1558.         long maxResumptionMillies = getMaxSmResumptionTime() * 1000;
  1559.         if (shutdownTimestamp + maxResumptionMillies > current) {
  1560.             return false;
  1561.         } else {
  1562.             return true;
  1563.         }
  1564.     }

  1565.     /**
  1566.      * Get the maximum resumption time in seconds after which a managed stream can be resumed.
  1567.      *
  1568.      * @return the maximum resumption time in seconds.
  1569.      */
  1570.     public int getMaxSmResumptionTime() {
  1571.         int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
  1572.         int serverResumptionTime = smServerMaxResumptimTime > 0 ? smServerMaxResumptimTime : Integer.MAX_VALUE;
  1573.         return Math.min(clientResumptionTime, serverResumptionTime);
  1574.     }

  1575.     private void processHandledCount(long handledCount) throws NotConnectedException, StreamManagementCounterError {
  1576.         long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
  1577.         final List<Stanza> ackedStanzas = new ArrayList<Stanza>(
  1578.                         handledCount <= Integer.MAX_VALUE ? (int) handledCount
  1579.                                         : Integer.MAX_VALUE);
  1580.         for (long i = 0; i < ackedStanzasCount; i++) {
  1581.             Stanza ackedStanza = unacknowledgedStanzas.poll();
  1582.             // If the server ack'ed a stanza, then it must be in the
  1583.             // unacknowledged stanza queue. There can be no exception.
  1584.             if (ackedStanza == null) {
  1585.                 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
  1586.                                 ackedStanzasCount, ackedStanzas);
  1587.             }
  1588.             ackedStanzas.add(ackedStanza);
  1589.         }

  1590.         boolean atLeastOneStanzaAcknowledgedListener = false;
  1591.         if (!stanzaAcknowledgedListeners.isEmpty()) {
  1592.             // If stanzaAcknowledgedListeners is not empty, the we have at least one
  1593.             atLeastOneStanzaAcknowledgedListener = true;
  1594.         }
  1595.         else {
  1596.             // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
  1597.             for (Stanza ackedStanza : ackedStanzas) {
  1598.                 String id = ackedStanza.getStanzaId();
  1599.                 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
  1600.                     atLeastOneStanzaAcknowledgedListener = true;
  1601.                     break;
  1602.                 }
  1603.             }
  1604.         }

  1605.         // Only spawn a new thread if there is a chance that some listener is invoked
  1606.         if (atLeastOneStanzaAcknowledgedListener) {
  1607.             asyncGo(new Runnable() {
  1608.                 @Override
  1609.                 public void run() {
  1610.                     for (Stanza ackedStanza : ackedStanzas) {
  1611.                         for (StanzaListener listener : stanzaAcknowledgedListeners) {
  1612.                             try {
  1613.                                 listener.processPacket(ackedStanza);
  1614.                             }
  1615.                             catch (InterruptedException | NotConnectedException e) {
  1616.                                 LOGGER.log(Level.FINER, "Received exception", e);
  1617.                             }
  1618.                         }
  1619.                         String id = ackedStanza.getStanzaId();
  1620.                         if (StringUtils.isNullOrEmpty(id)) {
  1621.                             continue;
  1622.                         }
  1623.                         StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
  1624.                         if (listener != null) {
  1625.                             try {
  1626.                                 listener.processPacket(ackedStanza);
  1627.                             }
  1628.                             catch (InterruptedException | NotConnectedException e) {
  1629.                                 LOGGER.log(Level.FINER, "Received exception", e);
  1630.                             }
  1631.                         }
  1632.                     }
  1633.                 }
  1634.             });
  1635.         }

  1636.         serverHandledStanzasCount = handledCount;
  1637.     }

  1638.     /**
  1639.      * Set the default bundle and defer callback used for new connections.
  1640.      *
  1641.      * @param defaultBundleAndDeferCallback
  1642.      * @see BundleAndDeferCallback
  1643.      * @since 4.1
  1644.      */
  1645.     public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
  1646.         XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
  1647.     }

  1648.     /**
  1649.      * Set the bundle and defer callback used for this connection.
  1650.      * <p>
  1651.      * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
  1652.      * no longer get deferred.
  1653.      * </p>
  1654.      *
  1655.      * @param bundleAndDeferCallback the callback or <code>null</code>.
  1656.      * @see BundleAndDeferCallback
  1657.      * @since 4.1
  1658.      */
  1659.     public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
  1660.         this.bundleAndDeferCallback = bundleAndDeferCallback;
  1661.     }

  1662. }