001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InputStreamReader; 023import java.io.OutputStream; 024import java.io.OutputStreamWriter; 025import java.io.Writer; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.net.Socket; 029import java.security.cert.CertificateException; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.Iterator; 033import java.util.LinkedHashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.ArrayBlockingQueue; 039import java.util.concurrent.BlockingQueue; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentLinkedQueue; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.logging.Level; 045import java.util.logging.Logger; 046 047import javax.net.SocketFactory; 048import javax.net.ssl.HostnameVerifier; 049import javax.net.ssl.SSLSession; 050import javax.net.ssl.SSLSocket; 051import javax.net.ssl.SSLSocketFactory; 052 053import org.jivesoftware.smack.AbstractXMPPConnection; 054import org.jivesoftware.smack.ConnectionConfiguration; 055import 
org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 056import org.jivesoftware.smack.ConnectionListener; 057import org.jivesoftware.smack.SmackConfiguration; 058import org.jivesoftware.smack.SmackException; 059import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 060import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 061import org.jivesoftware.smack.SmackException.ConnectionException; 062import org.jivesoftware.smack.SmackException.EndpointConnectionException; 063import org.jivesoftware.smack.SmackException.NotConnectedException; 064import org.jivesoftware.smack.SmackException.NotLoggedInException; 065import org.jivesoftware.smack.SmackException.SecurityNotPossibleException; 066import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 067import org.jivesoftware.smack.SmackFuture; 068import org.jivesoftware.smack.StanzaListener; 069import org.jivesoftware.smack.XMPPConnection; 070import org.jivesoftware.smack.XMPPException; 071import org.jivesoftware.smack.XMPPException.FailedNonzaException; 072import org.jivesoftware.smack.XMPPException.StreamErrorException; 073import org.jivesoftware.smack.compress.packet.Compress; 074import org.jivesoftware.smack.compress.packet.Compressed; 075import org.jivesoftware.smack.compression.XMPPInputOutputStream; 076import org.jivesoftware.smack.datatypes.UInt16; 077import org.jivesoftware.smack.filter.StanzaFilter; 078import org.jivesoftware.smack.internal.SmackTlsContext; 079import org.jivesoftware.smack.packet.Element; 080import org.jivesoftware.smack.packet.IQ; 081import org.jivesoftware.smack.packet.Message; 082import org.jivesoftware.smack.packet.Nonza; 083import org.jivesoftware.smack.packet.Presence; 084import org.jivesoftware.smack.packet.Stanza; 085import org.jivesoftware.smack.packet.StartTls; 086import org.jivesoftware.smack.packet.StreamError; 087import org.jivesoftware.smack.packet.StreamOpen; 088import org.jivesoftware.smack.proxy.ProxyInfo; 089import 
org.jivesoftware.smack.sasl.packet.SaslNonza; 090import org.jivesoftware.smack.sm.SMUtils; 091import org.jivesoftware.smack.sm.StreamManagementException; 092import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 093import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 094import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 095import org.jivesoftware.smack.sm.packet.StreamManagement; 096import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 097import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 098import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 099import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 100import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 101import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 102import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 103import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 104import org.jivesoftware.smack.sm.predicates.Predicate; 105import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 106import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints; 107import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint; 108import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 109import org.jivesoftware.smack.util.Async; 110import org.jivesoftware.smack.util.CloseableUtil; 111import org.jivesoftware.smack.util.PacketParserUtils; 112import org.jivesoftware.smack.util.StringUtils; 113import org.jivesoftware.smack.util.TLSUtils; 114import org.jivesoftware.smack.util.XmlStringBuilder; 115import org.jivesoftware.smack.util.rce.RemoteConnectionException; 116import org.jivesoftware.smack.xml.SmackXmlParser; 117import org.jivesoftware.smack.xml.XmlPullParser; 118import org.jivesoftware.smack.xml.XmlPullParserException; 119 120import 
org.jxmpp.jid.impl.JidCreate; 121import org.jxmpp.jid.parts.Resourcepart; 122import org.jxmpp.stringprep.XmppStringprepException; 123import org.minidns.dnsname.DnsName; 124 125/** 126 * Creates a socket connection to an XMPP server. This is the default connection 127 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 128 * 129 * @see XMPPConnection 130 * @author Matt Tucker 131 */ 132public class XMPPTCPConnection extends AbstractXMPPConnection { 133 134 private static final int QUEUE_SIZE = 500; 135 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 136 137 /** 138 * The socket which is used for this connection. 139 */ 140 private Socket socket; 141 142 /** 143 * 144 */ 145 private boolean disconnectedButResumeable = false; 146 147 private SSLSocket secureSocket; 148 149 /** 150 * Protected access level because of unit test purposes 151 */ 152 protected final PacketWriter packetWriter = new PacketWriter(); 153 154 /** 155 * Protected access level because of unit test purposes 156 */ 157 protected final PacketReader packetReader = new PacketReader(); 158 159 /** 160 * 161 */ 162 private boolean streamFeaturesAfterAuthenticationReceived; 163 164 /** 165 * 166 */ 167 private boolean compressSyncPoint; 168 169 /** 170 * The default bundle and defer callback, used for new connections. 171 * @see bundleAndDeferCallback 172 */ 173 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 174 175 /** 176 * The used bundle and defer callback. 177 * <p> 178 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 179 * having a 'volatile' read within the writer threads loop. 
180 * </p> 181 */ 182 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 183 184 private static boolean useSmDefault = true; 185 186 private static boolean useSmResumptionDefault = true; 187 188 /** 189 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 190 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 191 * {@link #unacknowledgedStanzas}. 192 */ 193 private String smSessionId; 194 195 /** 196 * Represents the state of stream management resumption. 197 * <p> 198 * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread. 199 * </p> 200 */ 201 private volatile SyncPointState smResumedSyncPoint; 202 private Failed smResumptionFailed; 203 204 /** 205 * Represents the state of stream magement. 206 * <p> 207 * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}. 208 * </p> 209 */ 210 private volatile boolean smEnabledSyncPoint; 211 212 /** 213 * The client's preferred maximum resumption time in seconds. 214 */ 215 private int smClientMaxResumptionTime = -1; 216 217 /** 218 * The server's preferred maximum resumption time in seconds. 219 */ 220 private int smServerMaxResumptionTime = -1; 221 222 /** 223 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 224 */ 225 private boolean useSm = useSmDefault; 226 private boolean useSmResumption = useSmResumptionDefault; 227 228 /** 229 * The counter that the server sends the client about it's current height. For example, if the server sends 230 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 231 */ 232 private long serverHandledStanzasCount = 0; 233 234 /** 235 * The counter for stanzas handled ("received") by the client. 236 * <p> 237 * Note that we don't need to synchronize this counter. 
Although JLS 17.7 states that reads and writes to longs are 238 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since 239 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 240 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 241 * </p> 242 */ 243 private long clientHandledStanzasCount = 0; 244 245 private BlockingQueue<Stanza> unacknowledgedStanzas; 246 247 /** 248 * Set to true if Stream Management was at least once enabled for this connection. 249 */ 250 private boolean smWasEnabledAtLeastOnce = false; 251 252 /** 253 * This listeners are invoked for every stanza that got acknowledged. 254 * <p> 255 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 256 * themselves after they have been invoked. 257 * </p> 258 */ 259 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>(); 260 261 /** 262 * These listeners are invoked for every stanza that got dropped. 263 * <p> 264 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 265 * themselves after they have been invoked. 266 * </p> 267 */ 268 private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>(); 269 270 /** 271 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 272 * only be invoked once and automatically removed after that. 273 */ 274 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>(); 275 276 /** 277 * Predicates that determine if an stream management ack should be requested from the server. 278 * <p> 279 * We use a linked hash set here, so that the order how the predicates are added matches the 280 * order in which they are invoked in order to determine if an ack request should be send or not. 
281 * </p> 282 */ 283 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>(); 284 285 @SuppressWarnings("HidingField") 286 private final XMPPTCPConnectionConfiguration config; 287 288 /** 289 * Creates a new XMPP connection over TCP (optionally using proxies). 290 * <p> 291 * Note that XMPPTCPConnection constructors do not establish a connection to the server 292 * and you must call {@link #connect()}. 293 * </p> 294 * 295 * @param config the connection configuration. 296 */ 297 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 298 super(config); 299 this.config = config; 300 addConnectionListener(new ConnectionListener() { 301 @Override 302 public void connectionClosedOnError(Exception e) { 303 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) { 304 dropSmState(); 305 } 306 } 307 }); 308 309 // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream 310 // is initiated. 311 buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install(); 312 } 313 314 /** 315 * Creates a new XMPP connection over TCP. 316 * <p> 317 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 318 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 319 * constructor. 320 * </p> 321 * 322 * @param jid the bare JID used by the client. 323 * @param password the password or authentication token. 324 * @throws XmppStringprepException if the provided string is invalid. 325 */ 326 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 327 this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build()); 328 } 329 330 /** 331 * Creates a new XMPP connection over TCP. 332 * <p> 333 * This is the simplest constructor for connecting to an XMPP server. 
Alternatively, 334 * you can get fine-grained control over connection settings using the 335 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 336 * </p> 337 * @param username TODO javadoc me please 338 * @param password TODO javadoc me please 339 * @param serviceName TODO javadoc me please 340 * @throws XmppStringprepException if the provided string is invalid. 341 */ 342 public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { 343 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( 344 JidCreate.domainBareFrom(serviceName)).build()); 345 } 346 347 @Override 348 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 349 if (packetWriter == null) { 350 throw new NotConnectedException(); 351 } 352 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 353 } 354 355 @Override 356 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 357 if (isConnected() && !disconnectedButResumeable) { 358 throw new AlreadyConnectedException(); 359 } 360 } 361 362 @Override 363 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 364 if (isAuthenticated() && !disconnectedButResumeable) { 365 throw new AlreadyLoggedInException(); 366 } 367 } 368 369 @Override 370 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 371 // Reset the flag in case it was set 372 disconnectedButResumeable = false; 373 super.afterSuccessfulLogin(resumed); 374 } 375 376 @Override 377 protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 378 SmackException, IOException, InterruptedException { 379 // Authenticate using SASL 380 SSLSession sslSession = secureSocket != null ? 
secureSocket.getSession() : null; 381 382 streamFeaturesAfterAuthenticationReceived = false; 383 authenticate(username, password, config.getAuthzid(), sslSession); 384 385 // Wait for stream features after the authentication. 386 // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be 387 // renamed to "streamFeaturesAfterAuthenticationReceived". 388 waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server"); 389 390 // If compression is enabled then request the server to use stream compression. XEP-170 391 // recommends to perform stream compression before resource binding. 392 maybeEnableCompression(); 393 394 smResumedSyncPoint = SyncPointState.initial; 395 smResumptionFailed = null; 396 if (isSmResumptionPossible()) { 397 smResumedSyncPoint = SyncPointState.request_sent; 398 sendNonza(new Resume(clientHandledStanzasCount, smSessionId)); 399 waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream"); 400 if (smResumedSyncPoint == SyncPointState.successful) { 401 // We successfully resumed the stream, be done here 402 afterSuccessfulLogin(true); 403 return; 404 } 405 // SM resumption failed, what Smack does here is to report success of 406 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 407 // normal resource binding can be tried. 408 assert smResumptionFailed != null; 409 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed); 410 } 411 412 // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case, 413 // mark SM as not enabled. 
Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the 414 // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state. 415 smEnabledSyncPoint = false; 416 417 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 418 if (unacknowledgedStanzas != null) { 419 // There was a previous connection with SM enabled but that was either not resumable or 420 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 421 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 422 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 423 // XMPP session (There maybe was an enabled in a previous XMPP session of this 424 // connection instance though). This is used in writePackets to decide if stanzas should 425 // be added to the unacknowledged stanzas queue, because they have to be added right 426 // after the 'enable' stream element has been sent. 427 dropSmState(); 428 } 429 430 // Now bind the resource. It is important to do this *after* we dropped an eventually 431 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 432 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 433 bindResourceAndEstablishSession(resource); 434 435 if (isSmAvailable() && useSm) { 436 // Remove what is maybe left from previously stream managed sessions 437 serverHandledStanzasCount = 0; 438 sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime)); 439 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 440 // then this is a non recoverable error and we therefore throw an exception. 
441 waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement"); 442 synchronized (requestAckPredicates) { 443 if (requestAckPredicates.isEmpty()) { 444 // Assure that we have at lest one predicate set up that so that we request acks 445 // for the server and eventually flush some stanzas from the unacknowledged 446 // stanza queue 447 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 448 } 449 } 450 } 451 // Inform client about failed resumption if possible, resend stanzas otherwise 452 // Process the stanzas synchronously so a client can re-queue them for transmission 453 // before it is informed about connection success 454 if (!stanzaDroppedListeners.isEmpty()) { 455 for (Stanza stanza : previouslyUnackedStanzas) { 456 for (StanzaListener listener : stanzaDroppedListeners) { 457 try { 458 listener.processStanza(stanza); 459 } 460 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 461 LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e); 462 } 463 } 464 } 465 } else { 466 for (Stanza stanza : previouslyUnackedStanzas) { 467 sendStanzaInternal(stanza); 468 } 469 } 470 471 afterSuccessfulLogin(false); 472 } 473 474 @Override 475 public boolean isSecureConnection() { 476 return secureSocket != null; 477 } 478 479 /** 480 * Shuts the current connection down. After this method returns, the connection must be ready 481 * for re-use by connect. 482 */ 483 @Override 484 protected void shutdown() { 485 if (isSmEnabled()) { 486 try { 487 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 488 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 
489 sendSmAcknowledgementInternal(); 490 } catch (InterruptedException | NotConnectedException e) { 491 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 492 } 493 } 494 shutdown(false); 495 } 496 497 @Override 498 public synchronized void instantShutdown() { 499 shutdown(true); 500 } 501 502 private void shutdown(boolean instant) { 503 // The writer thread may already been finished at this point, for example when the connection is in the 504 // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this 505 // case. 506 if (!packetWriter.done()) { 507 // First shutdown the writer, this will result in a closing stream element getting send to 508 // the server 509 LOGGER.finer(packetWriter.threadName + " shutdown()"); 510 packetWriter.shutdown(instant); 511 LOGGER.finer(packetWriter.threadName + " shutdown() returned"); 512 513 if (!instant) { 514 waitForClosingStreamTagFromServer(); 515 } 516 } 517 518 LOGGER.finer(packetReader.threadName + " shutdown()"); 519 packetReader.shutdown(); 520 LOGGER.finer(packetReader.threadName + " shutdown() returned"); 521 522 CloseableUtil.maybeClose(socket, LOGGER); 523 524 setWasAuthenticated(); 525 526 try { 527 boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running); 528 if (!readerAndWriterThreadsTermianted) { 529 LOGGER.severe("Reader and/or writer threads did not terminate timely. 
Writer running: " 530 + packetWriter.running + ", Reader running: " + packetReader.running); 531 } else { 532 LOGGER.fine("Reader and writer threads terminated"); 533 } 534 } catch (InterruptedException e) { 535 LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e); 536 } 537 538 if (disconnectedButResumeable) { 539 return; 540 } 541 542 // If we are able to resume the stream, then don't set 543 // connected/authenticated/usingTLS to false since we like to behave like we are still 544 // connected (e.g. sendStanza should not throw a NotConnectedException). 545 if (instant) { 546 disconnectedButResumeable = isSmResumptionPossible(); 547 if (!disconnectedButResumeable) { 548 // Reset the stream management session id to null, since the stream is no longer resumable. Note that we 549 // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected. 550 smSessionId = null; 551 } 552 } else { 553 disconnectedButResumeable = false; 554 555 // Drop the stream management state if this is not an instant shutdown. We send 556 // a </stream> close tag and now the stream management state is no longer valid. 557 // This also prevents that we will potentially (re-)send any unavailable presence we 558 // may have send, because it got put into the unacknowledged queue and was not acknowledged before the 559 // connection terminated. 560 dropSmState(); 561 // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the 562 // information is available in the connectionClosedOnError() listeners. 
563 } 564 authenticated = false; 565 connected = false; 566 secureSocket = null; 567 reader = null; 568 writer = null; 569 570 initState(); 571 } 572 573 @Override 574 public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { 575 packetWriter.sendStreamElement(element); 576 } 577 578 @Override 579 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { 580 packetWriter.sendStreamElement(packet); 581 if (isSmEnabled()) { 582 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 583 if (requestAckPredicate.accept(packet)) { 584 requestSmAcknowledgementInternal(); 585 break; 586 } 587 } 588 } 589 } 590 591 private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException { 592 RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config); 593 594 List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>(); 595 596 SocketFactory socketFactory = config.getSocketFactory(); 597 ProxyInfo proxyInfo = config.getProxyInfo(); 598 int timeout = config.getConnectTimeout(); 599 if (socketFactory == null) { 600 socketFactory = SocketFactory.getDefault(); 601 } 602 for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) { 603 Iterator<? extends InetAddress> inetAddresses; 604 String host = endpoint.getHost().toString(); 605 UInt16 portUint16 = endpoint.getPort(); 606 int port = portUint16.intValue(); 607 if (proxyInfo == null) { 608 inetAddresses = endpoint.getInetAddresses().iterator(); 609 assert inetAddresses.hasNext(); 610 611 innerloop: while (inetAddresses.hasNext()) { 612 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 613 // re-usable after a failed connection attempt. See also SMACK-724. 
614 SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory); 615 616 final InetAddress inetAddress = inetAddresses.next(); 617 final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port); 618 LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress); 619 socketFuture.connectAsync(inetSocketAddress, timeout); 620 621 try { 622 socket = socketFuture.getOrThrow(); 623 } catch (IOException e) { 624 RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>( 625 endpoint, inetAddress, e); 626 connectionExceptions.add(rce); 627 if (inetAddresses.hasNext()) { 628 continue innerloop; 629 } else { 630 break innerloop; 631 } 632 } 633 LOGGER.finer("Established TCP connection to " + inetSocketAddress); 634 // We found a host to connect to, return here 635 this.host = host; 636 this.port = portUint16; 637 return; 638 } 639 } else { 640 // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy 641 // connection to every inet address of each connection endpoint. 
642 socket = socketFactory.createSocket(); 643 StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy"); 644 final String hostAndPort = host + " at port " + port; 645 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 646 try { 647 proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 648 } catch (IOException e) { 649 CloseableUtil.maybeClose(socket, LOGGER); 650 RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e); 651 connectionExceptions.add(rce); 652 continue; 653 } 654 LOGGER.finer("Established TCP connection to " + hostAndPort); 655 // We found a host to connect to, return here 656 this.host = host; 657 this.port = portUint16; 658 return; 659 } 660 } 661 662 // There are no more host addresses to try 663 // throw an exception and report all tried 664 // HostAddresses in the exception 665 throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions); 666 } 667 668 /** 669 * Initializes the connection by creating a stanza reader and writer and opening a 670 * XMPP stream to the server. 671 * 672 * @throws IOException if an I/O error occurred. 673 * @throws InterruptedException if the calling thread was interrupted. 674 */ 675 private void initConnection() throws IOException, InterruptedException { 676 compressionHandler = null; 677 678 // Set the reader and writer instance variables 679 initReaderAndWriter(); 680 681 // Start the writer thread. This will open an XMPP stream to the server 682 packetWriter.init(); 683 // Start the reader thread. 
The startup() method will block until we 684 // get an opening stream packet back from server 685 packetReader.init(); 686 } 687 688 private void initReaderAndWriter() throws IOException { 689 InputStream is = socket.getInputStream(); 690 OutputStream os = socket.getOutputStream(); 691 if (compressionHandler != null) { 692 is = compressionHandler.getInputStream(is); 693 os = compressionHandler.getOutputStream(os); 694 } 695 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 696 writer = new OutputStreamWriter(os, "UTF-8"); 697 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 698 699 // If debugging is enabled, we open a window and write out all network traffic. 700 initDebugger(); 701 } 702 703 /** 704 * The server has indicated that TLS negotiation can start. We now need to secure the 705 * existing plain connection and perform a handshake. This method won't return until the 706 * connection has finished the handshake or an error occurred while securing the connection. 707 * @throws IOException if an I/O error occurred. 708 * @throws SecurityNotPossibleException if TLS is not possible. 709 * @throws CertificateException if there is an issue with the certificate. 710 */ 711 @SuppressWarnings("LiteralClassName") 712 private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException { 713 SmackTlsContext smackTlsContext = getSmackTlsContext(); 714 715 Socket plain = socket; 716 int port = plain.getPort(); 717 String xmppServiceDomainString = config.getXMPPServiceDomain().toString(); 718 SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory(); 719 // Secure the plain connection 720 socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true); 721 722 final SSLSocket sslSocket = (SSLSocket) socket; 723 // Immediately set the enabled SSL protocols and ciphers. 
See SMACK-712 why this is 724 // important (at least on certain platforms) and it seems to be a good idea anyways to 725 // prevent an accidental implicit handshake. 726 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 727 728 // Initialize the reader and writer with the new secured version 729 initReaderAndWriter(); 730 731 // Proceed to do the handshake 732 sslSocket.startHandshake(); 733 734 if (smackTlsContext.daneVerifier != null) { 735 smackTlsContext.daneVerifier.finish(sslSocket.getSession()); 736 } 737 738 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 739 if (verifier == null) { 740 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 741 } 742 743 final String verifierHostname; 744 { 745 DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible(); 746 // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII 747 // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint. 748 // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1 749 if (xmppServiceDomainDnsName != null) { 750 verifierHostname = xmppServiceDomainDnsName.ace; 751 } 752 else { 753 LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain() 754 + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail."); 755 verifierHostname = getXMPPServiceDomain().toString(); 756 } 757 } 758 759 final boolean verificationSuccessful; 760 // Verify the TLS session. 761 verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession()); 762 if (!verificationSuccessful) { 763 throw new CertificateException( 764 "Hostname verification of certificate failed. 
Certificate does not authenticate " 765 + getXMPPServiceDomain()); 766 } 767 768 // Set that TLS was successful 769 secureSocket = sslSocket; 770 } 771 772 /** 773 * Returns the compression handler that can be used for one compression methods offered by the server. 774 * 775 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 776 * 777 */ 778 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 779 for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) { 780 String method = handler.getCompressionMethod(); 781 if (compression.getMethods().contains(method)) 782 return handler; 783 } 784 return null; 785 } 786 787 @Override 788 public boolean isUsingCompression() { 789 return compressionHandler != null && compressSyncPoint; 790 } 791 792 /** 793 * <p> 794 * Starts using stream compression that will compress network traffic. Traffic can be 795 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 796 * connection. However, the server and the client will need to use more CPU time in order to 797 * un/compress network data so under high load the server performance might be affected. 798 * </p> 799 * <p> 800 * Stream compression has to have been previously offered by the server. Currently only the 801 * zlib method is supported by the client. Stream compression negotiation has to be done 802 * before authentication took place. 803 * </p> 804 * 805 * @throws NotConnectedException if the XMPP connection is not connected. 806 * @throws SmackException if Smack detected an exceptional situation. 807 * @throws InterruptedException if the calling thread was interrupted. 808 * @throws XMPPException if an XMPP protocol error was received. 
809 */ 810 private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException { 811 if (!config.isCompressionEnabled()) { 812 return; 813 } 814 815 Compress.Feature compression = getFeature(Compress.Feature.class); 816 if (compression == null) { 817 // Server does not support compression 818 return; 819 } 820 // If stream compression was offered by the server and we want to use 821 // compression then send compression request to the server 822 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 823 compressSyncPoint = false; 824 sendNonza(new Compress(compressionHandler.getCompressionMethod())); 825 waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression"); 826 } else { 827 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 828 } 829 } 830 831 /** 832 * Establishes a connection to the XMPP server. It basically 833 * creates and maintains a socket connection to the server. 834 * <p> 835 * Listeners will be preserved from a previous connection if the reconnection 836 * occurs after an abrupt termination. 837 * </p> 838 * 839 * @throws XMPPException if an error occurs while trying to establish the connection. 840 * @throws SmackException if Smack detected an exceptional situation. 841 * @throws IOException if an I/O error occurred. 842 * @throws InterruptedException if the calling thread was interrupted. 843 */ 844 @Override 845 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 846 // Establishes the TCP connection to the server and does setup the reader and writer. 
Throws an exception if 847 // there is an error establishing the connection 848 connectUsingConfiguration(); 849 850 connected = true; 851 852 // We connected successfully to the servers TCP port 853 initConnection(); 854 855 // TLS handled will be true either if TLS was established, or if it was not mandatory. 856 waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS"); 857 858 // Wait with SASL auth until the SASL mechanisms have been received 859 waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server"); 860 } 861 862 /** 863 * For unit testing purposes 864 * 865 * @param writer TODO javadoc me please 866 */ 867 protected void setWriter(Writer writer) { 868 this.writer = writer; 869 } 870 871 @Override 872 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException { 873 StartTls startTlsFeature = getFeature(StartTls.class); 874 if (startTlsFeature != null) { 875 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 876 SecurityRequiredByServerException smackException = new SecurityRequiredByServerException(); 877 currentSmackException = smackException; 878 notifyWaitingThreads(); 879 throw smackException; 880 } 881 882 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 883 sendNonza(new StartTls()); 884 } else { 885 tlsHandled = true; 886 notifyWaitingThreads(); 887 } 888 } else { 889 tlsHandled = true; 890 notifyWaitingThreads(); 891 } 892 893 if (isSaslAuthenticated()) { 894 // If we have received features after the SASL has been successfully completed, then we 895 // have also *maybe* received, as it is an optional feature, the compression feature 896 // from the server. 
897 streamFeaturesAfterAuthenticationReceived = true; 898 notifyWaitingThreads(); 899 } 900 } 901 902 private void resetParser() throws IOException { 903 try { 904 packetReader.parser = SmackXmlParser.newXmlParser(reader); 905 } catch (XmlPullParserException e) { 906 throw new IOException(e); 907 } 908 } 909 910 private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException { 911 sendStreamOpen(); 912 resetParser(); 913 } 914 915 protected class PacketReader { 916 917 private final String threadName = "Smack Reader (" + getConnectionCounter() + ')'; 918 919 XmlPullParser parser; 920 921 private volatile boolean done; 922 923 private boolean running; 924 925 /** 926 * Initializes the reader in order to be used. The reader is initialized during the 927 * first connection and when reconnecting due to an abruptly disconnection. 928 */ 929 void init() { 930 done = false; 931 932 running = true; 933 Async.go(new Runnable() { 934 @Override 935 public void run() { 936 LOGGER.finer(threadName + " start"); 937 try { 938 parsePackets(); 939 } finally { 940 LOGGER.finer(threadName + " exit"); 941 running = false; 942 notifyWaitingThreads(); 943 } 944 } 945 }, threadName); 946 } 947 948 /** 949 * Shuts the stanza reader down. This method simply sets the 'done' flag to true. 950 */ 951 void shutdown() { 952 done = true; 953 } 954 955 /** 956 * Parse top-level packets in order to process them further. 
*/
        private void parsePackets() {
            // Reader loop: open the stream, then dispatch on every parser event until 'done' is set,
            // the closing stream element is received after our own shutdown, or an exception occurs.
            try {
                openStreamAndResetParser();
                XmlPullParser.Event eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case START_ELEMENT:
                        final String name = parser.getName();
                        final String namespace = parser.getNamespace();

                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                // The handled counter is advanced even if processing the stanza threw.
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            // Only a 'stream' element in the streams namespace is treated as stream open.
                            if (StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(namespace)) {
                                onStreamOpen(parser);
                            }
                            break;
                        case "error":
                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
                            // Stream errors are non recoverable, throw this exception. Also note that this will set
                            // this exception as current connection exception and notify any waiting threads.
                            throw new StreamErrorException(streamError);
                        case "features":
                            parseFeaturesAndNotify(parser);
                            break;
                        case "proceed":
                            // Secure the connection by negotiating TLS
                            proceedTLSReceived();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            break;
                        case "failure":
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
                                notifyWaitingThreads();
                                break;
                            default:
                                // Any other 'failure' element is handled as a generic nonza.
                                parseAndProcessNonza(parser);
                            }
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            // Notify that compression is being used
                            compressSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                smSessionId = enabled.getId();
                                // A resumable stream without a session id can not be resumed later: treat as error.
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
                                    setCurrentConnectionExceptionAndNotify(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            // Stream Management was (re-)enabled: reset the counter and publish the new state.
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            if (smResumedSyncPoint == SyncPointState.request_sent) {
                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
                                // so is non-fatal as we can simply continue with resource binding in this case.
                                smResumptionFailed = failed;
                                notifyWaitingThreads();
                            } else {
                                // Not a response to a resume request: report it as the current connection exception.
                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
                                setCurrentConnectionExceptionAndNotify(xmppException);
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            // The resumed stream must be the one we asked for.
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // Mark SM as enabled
                            smEnabledSyncPoint = true;
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                            unacknowledgedStanzas.drainTo(stanzasToResend);
                            for (Stanza stanza : stanzasToResend) {
                                sendStanzaInternal(stanza);
                            }
                            // If there were stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            // Mark SM resumption as successful
                            smResumedSyncPoint = SyncPointState.successful;
                            notifyWaitingThreads();
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            // Consume the element, then answer only if Stream Management is enabled.
                            ParseStreamManagement.ackRequest(parser);
                            if (smEnabledSyncPoint) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                        default:
                            // Everything else at the top level is handled as a generic nonza.
                            parseAndProcessNonza(parser);
                            break;
                        }
                        break;
                    case END_ELEMENT:
                        final String endTagName = parser.getName();
                        if ("stream".equals(endTagName)) {
                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
                                break;
                            }

                            // Check if the queue was already shut down before reporting success on closing stream tag
                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
                            // did re-start the queue again, causing this writer to assume that the queue is not
                            // shutdown, which results in a call to disconnect().
                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
                            closingStreamReceived = true;
                            notifyWaitingThreads();

                            if (queueWasShutdown) {
                                // We received a closing stream element *after* we initiated the
                                // termination of the session by sending a closing stream element to
                                // the server first
                                return;
                            } else {
                                // We received a closing stream element from the server without us
                                // sending a closing stream element first. This means that the
                                // server wants to terminate the session, therefore disconnect
                                // the connection
                                LOGGER.info(XMPPTCPConnection.this
                                                + " received closing </stream> element."
                                                + " Server wants to terminate the connection, calling disconnect()");
                                // disconnect() is not called on this reader thread but handed to ASYNC_BUT_ORDERED.
                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
                                    @Override
                                    public void run() {
                                        disconnect();
                                    }});
                            }
                        }
                        break;
                    case END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException.SmackMessageException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    default:
                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
                        break;
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
                // notifyConnectionError() below, even though run() also sets it to false. Therefore, do not remove this.
                running = false;

                // Exceptions are ignored if either the writer queue or this reader was already shut down.
                String ignoreReasonThread = null;

                boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
                if (writerThreadWasShutDown) {
                    ignoreReasonThread = "writer";
                } else if (done) {
                    ignoreReasonThread = "reader";
                }

                if (ignoreReasonThread != null) {
                    LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
                    return;
                }

                // Close the connection and notify connection listeners of the error.
                notifyConnectionError(e);
            }
        }
    }

    protected class PacketWriter {
        // Capacity of the writer queue, taken from the enclosing connection class.
        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
        // Capacity of the queue holding stanzas not yet acknowledged via Stream Management.
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
        // 30% of the unacknowledged stanzas queue size, truncated to an int (307).
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);

        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';

        // Queue of stream elements waiting to be written by the writer thread.
        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
                        QUEUE_SIZE, true);

        /**
         * If set, the stanza writer is shut down
         */
        protected volatile Long shutdownTimestamp = null;

        // Set by shutdown(boolean); when true the writer skips flushing the queue and closing the stream.
        private volatile boolean instantShutdown;

        /**
         * True if some preconditions are given to start the bundle and defer mechanism.
         * <p>
         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
         * this field to true.
         * </p>
         */
        private boolean shouldBundleAndDefer;

        // True from init() until the writer thread exits.
        private boolean running;

        /**
         * Initializes the writer in order to be used. It is called at the first connection and also
         * is invoked if the connection is disconnected by an error.
*/
        void init() {
            shutdownTimestamp = null;

            if (unacknowledgedStanzas != null) {
                // It's possible that there are new stanzas in the writer queue that
                // came in while we were disconnected but resumable, drain those into
                // the unacknowledged queue so that they get resent now
                drainWriterQueueToUnacknowledgedStanzas();
            }

            queue.start();
            running = true;
            // Run writePackets() on its own thread; 'running' is reset and waiters are notified when it exits.
            Async.go(new Runnable() {
                @Override
                public void run() {
                    LOGGER.finer(threadName + " start");
                    try {
                        writePackets();
                    } finally {
                        LOGGER.finer(threadName + " exit");
                        running = false;
                        notifyWaitingThreads();
                    }
                }
            }, threadName);
        }

        /**
         * Returns whether the writer was shut down, i.e. whether a shutdown timestamp is set.
         *
         * @return true if {@code shutdownTimestamp} is not null.
         */
        private boolean done() {
            return shutdownTimestamp != null;
        }

        /**
         * Throws if the writer was shut down and there is no resumable Stream Management stream.
         *
         * @throws NotConnectedException if the writer is done and SM resumption is not possible.
         */
        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
            final boolean done = done();
            if (done) {
                final boolean smResumptionPossible = isSmResumptionPossible();
                // Don't throw a NotConnectedException if there is a resumable stream available
                if (!smResumptionPossible) {
                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
                                    + " smResumptionPossible=" + smResumptionPossible);
                }
            }
        }

        /**
         * Sends the specified element to the server.
         *
         * @param element the element to send.
         * @throws NotConnectedException if the XMPP connection is not connected.
         * @throws InterruptedException if the calling thread was interrupted.
         */
        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
            try {
                queue.put(element);
            }
            catch (InterruptedException e) {
                // put() may throw an InterruptedException for two reasons:
                // 1. If the queue was shut down
                // 2. If the thread was interrupted
                // so we have to check which is the case
                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
                // If the method above did not throw, then the sending thread was interrupted
                throw e;
            }
        }

        /**
         * Shuts down the stanza writer. Once this method has been called, no further
         * packets will be written to the server.
         *
         * @param instant if true, the writer thread neither flushes the queue nor writes the closing
         *        stream element when it terminates.
         */
        void shutdown(boolean instant) {
            instantShutdown = instant;
            queue.shutdown();
            shutdownTimestamp = System.currentTimeMillis();
        }

        /**
         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
         * that case.
         *
         * @return the next element for writing or null.
         */
        private Element nextStreamElement() {
            // It is important that we check if the queue is empty before removing an element from it
            if (queue.isEmpty()) {
                shouldBundleAndDefer = true;
            }
            Element packet = null;
            try {
                packet = queue.take();
            }
            catch (InterruptedException e) {
                // The interrupt is only logged when the queue is not shut down; null is returned either way.
                if (!queue.isShutdown()) {
                    // Users shouldn't try to interrupt the packet writer thread
                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
                }
            }
            return packet;
        }

        /**
         * Body of the writer thread: writes queued elements until the writer is shut down, then, unless
         * an instant shutdown was requested, flushes the remaining queue and writes the closing stream
         * element.
         */
        private void writePackets() {
            try {
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // No element (queue shut down or interrupt): re-check the done condition.
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Wait until the flag is set or the requested time elapsed; the remaining
                                // time is recomputed after every wake-up.
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enabled'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
                    }
                    // No-op for non-stanza elements, since 'packet' is null then.
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
                    if (elementXml instanceof XmlStringBuilder) {
                        try {
                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
                        } catch (NullPointerException npe) {
                            // Log which element caused the NPE, then propagate it unchanged.
                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
                            throw npe;
                        }
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush once there is nothing more queued.
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            writer.write(packet.toXML().toString());
                        }
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                    // the reader and writer thread's 'running' value is false.
                    running = false;
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            }
        }

        /**
         * Moves all elements of the writer queue which are stanzas into the unacknowledged stanzas
         * queue. Stops with a logged warning as soon as the unacknowledged stanzas queue is full.
         */
        private void drainWriterQueueToUnacknowledgedStanzas() {
            List<Element> elements = new ArrayList<>(queue.size());
            queue.drainTo(elements);
            for (int i = 0; i < elements.size(); i++) {
                Element element = elements.get(i);
                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
                if (unacknowledgedStanzas.remainingCapacity() == 0) {
                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
                                    .newWith(i, elements, unacknowledgedStanzas);
                    LOGGER.log(Level.WARNING,
                                    "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
                    return;
                }
                // Non-stanza elements are dropped here.
                if (element instanceof Stanza) {
                    unacknowledgedStanzas.add((Stanza) element);
                }
            }
        }

        /**
         * Puts the given stanza into the unacknowledged stanzas queue, if that queue exists and the
         * stanza is not null. Writes an ack request first when the queue size is exactly at the high
         * water mark.
         *
         * @param stanza the stanza about to be written, may be null.
         * @throws IOException if writing the ack request failed.
         */
        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
            // Check if the stream element should be put to the unacknowledgedStanza
            // queue. Note that we can not do the put() in sendStanzaInternal() and the
            // packet order is not stable at this point (sendStanzaInternal() can be
            // called concurrently).
            if (unacknowledgedStanzas != null && stanza != null) {
                // If the unacknowledgedStanza queue is reaching its high water mark, request a new ack
                // from the server in order to drain it
                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
                    writer.write(AckRequest.INSTANCE.toXML().toString());
                }

                try {
                    // It is important that we put the stanza in the unacknowledged stanza
                    // queue before we put it on the wire
                    unacknowledgedStanzas.put(stanza);
                }
                catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    /**
     * Set if Stream Management should be used by default for new connections.
     *
     * @param useSmDefault true to use Stream Management for new connections.
     */
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
        XMPPTCPConnection.useSmDefault = useSmDefault;
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     * This misspelled variant only delegates to {@link #setUseStreamManagementResumptionDefault(boolean)}.
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
     */
    @Deprecated
    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1495 */ 1496 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1497 if (useSmResumptionDefault) { 1498 // Also enable SM is resumption is enabled 1499 setUseStreamManagementDefault(useSmResumptionDefault); 1500 } 1501 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1502 } 1503 1504 /** 1505 * Set if Stream Management should be used if supported by the server. 1506 * 1507 * @param useSm true to use Stream Management. 1508 */ 1509 public void setUseStreamManagement(boolean useSm) { 1510 this.useSm = useSm; 1511 } 1512 1513 /** 1514 * Set if Stream Management resumption should be used if supported by the server. 1515 * 1516 * @param useSmResumption true to use Stream Management resumption. 1517 */ 1518 public void setUseStreamManagementResumption(boolean useSmResumption) { 1519 if (useSmResumption) { 1520 // Also enable SM is resumption is enabled 1521 setUseStreamManagement(useSmResumption); 1522 } 1523 this.useSmResumption = useSmResumption; 1524 } 1525 1526 /** 1527 * Set the preferred resumption time in seconds. 1528 * @param resumptionTime the preferred resumption time in seconds 1529 */ 1530 public void setPreferredResumptionTime(int resumptionTime) { 1531 smClientMaxResumptionTime = resumptionTime; 1532 } 1533 1534 /** 1535 * Add a predicate for Stream Management acknowledgment requests. 1536 * <p> 1537 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1538 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1539 * </p> 1540 * <p> 1541 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1542 * </p> 1543 * 1544 * @param predicate the predicate to add. 1545 * @return if the predicate was not already active. 
1546 */ 1547 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1548 synchronized (requestAckPredicates) { 1549 return requestAckPredicates.add(predicate); 1550 } 1551 } 1552 1553 /** 1554 * Remove the given predicate for Stream Management acknowledgment request. 1555 * @param predicate the predicate to remove. 1556 * @return true if the predicate was removed. 1557 */ 1558 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1559 synchronized (requestAckPredicates) { 1560 return requestAckPredicates.remove(predicate); 1561 } 1562 } 1563 1564 /** 1565 * Remove all predicates for Stream Management acknowledgment requests. 1566 */ 1567 public void removeAllRequestAckPredicates() { 1568 synchronized (requestAckPredicates) { 1569 requestAckPredicates.clear(); 1570 } 1571 } 1572 1573 /** 1574 * Send an unconditional Stream Management acknowledgement request to the server. 1575 * 1576 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1577 * @throws NotConnectedException if the connection is not connected. 1578 * @throws InterruptedException if the calling thread was interrupted. 1579 */ 1580 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1581 if (!isSmEnabled()) { 1582 throw new StreamManagementException.StreamManagementNotEnabledException(); 1583 } 1584 requestSmAcknowledgementInternal(); 1585 } 1586 1587 private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1588 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1589 } 1590 1591 /** 1592 * Send a unconditional Stream Management acknowledgment to the server. 1593 * <p> 1594 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. 
Acks</a>: 1595 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1596 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1597 * </p> 1598 * 1599 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1600 * @throws NotConnectedException if the connection is not connected. 1601 * @throws InterruptedException if the calling thread was interrupted. 1602 */ 1603 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1604 if (!isSmEnabled()) { 1605 throw new StreamManagementException.StreamManagementNotEnabledException(); 1606 } 1607 sendSmAcknowledgementInternal(); 1608 } 1609 1610 private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1611 AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount); 1612 // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request 1613 // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here, 1614 // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the 1615 // stream open. 1616 packetWriter.queue.putIfNotShutdown(ackAnswer); 1617 } 1618 1619 /** 1620 * Add a Stanza acknowledged listener. 1621 * <p> 1622 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1623 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1624 * possible. 1625 * </p> 1626 * 1627 * @param listener the listener to add. 1628 */ 1629 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1630 stanzaAcknowledgedListeners.add(listener); 1631 } 1632 1633 /** 1634 * Remove the given Stanza acknowledged listener. 
1635 * 1636 * @param listener the listener. 1637 * @return true if the listener was removed. 1638 */ 1639 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1640 return stanzaAcknowledgedListeners.remove(listener); 1641 } 1642 1643 /** 1644 * Remove all stanza acknowledged listeners. 1645 */ 1646 public void removeAllStanzaAcknowledgedListeners() { 1647 stanzaAcknowledgedListeners.clear(); 1648 } 1649 1650 /** 1651 * Add a Stanza dropped listener. 1652 * <p> 1653 * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get 1654 * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit 1655 * the Stanzas. 1656 * </p> 1657 * 1658 * @param listener the listener to add. 1659 * @since 4.3.3 1660 */ 1661 public void addStanzaDroppedListener(StanzaListener listener) { 1662 stanzaDroppedListeners.add(listener); 1663 } 1664 1665 /** 1666 * Remove the given Stanza dropped listener. 1667 * 1668 * @param listener the listener. 1669 * @return true if the listener was removed. 1670 * @since 4.3.3 1671 */ 1672 public boolean removeStanzaDroppedListener(StanzaListener listener) { 1673 return stanzaDroppedListeners.remove(listener); 1674 } 1675 1676 /** 1677 * Add a new Stanza ID acknowledged listener for the given ID. 1678 * <p> 1679 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1680 * automatically be removed after the listener was run. 1681 * </p> 1682 * 1683 * @param id the stanza ID. 1684 * @param listener the listener to invoke. 1685 * @return the previous listener for this stanza ID or null. 1686 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 
1687 */ 1688 @SuppressWarnings("FutureReturnValueIgnored") 1689 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1690 // Prevent users from adding callbacks that will never get removed 1691 if (!smWasEnabledAtLeastOnce) { 1692 throw new StreamManagementException.StreamManagementNotEnabledException(); 1693 } 1694 // Remove the listener after max. 3 hours 1695 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60); 1696 schedule(new Runnable() { 1697 @Override 1698 public void run() { 1699 stanzaIdAcknowledgedListeners.remove(id); 1700 } 1701 }, removeAfterSeconds, TimeUnit.SECONDS); 1702 return stanzaIdAcknowledgedListeners.put(id, listener); 1703 } 1704 1705 /** 1706 * Remove the Stanza ID acknowledged listener for the given ID. 1707 * 1708 * @param id the stanza ID. 1709 * @return true if the listener was found and removed, false otherwise. 1710 */ 1711 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1712 return stanzaIdAcknowledgedListeners.remove(id); 1713 } 1714 1715 /** 1716 * Removes all Stanza ID acknowledged listeners. 1717 */ 1718 public void removeAllStanzaIdAcknowledgedListeners() { 1719 stanzaIdAcknowledgedListeners.clear(); 1720 } 1721 1722 /** 1723 * Returns true if Stream Management is supported by the server. 1724 * 1725 * @return true if Stream Management is supported by the server. 1726 */ 1727 public boolean isSmAvailable() { 1728 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1729 } 1730 1731 /** 1732 * Returns true if Stream Management was successfully negotiated with the server. 1733 * 1734 * @return true if Stream Management was negotiated. 1735 */ 1736 public boolean isSmEnabled() { 1737 return smEnabledSyncPoint; 1738 } 1739 1740 /** 1741 * Returns true if the stream was successfully resumed with help of Stream Management. 
1742 * 1743 * @return true if the stream was resumed. 1744 */ 1745 public boolean streamWasResumed() { 1746 return smResumedSyncPoint == SyncPointState.successful; 1747 } 1748 1749 /** 1750 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1751 * 1752 * @return true if disconnected but resumption possible. 1753 */ 1754 public boolean isDisconnectedButSmResumptionPossible() { 1755 return disconnectedButResumeable && isSmResumptionPossible(); 1756 } 1757 1758 /** 1759 * Returns true if the stream is resumable. 1760 * 1761 * @return true if the stream is resumable. 1762 */ 1763 public boolean isSmResumptionPossible() { 1764 // There is no resumable stream available 1765 if (smSessionId == null) 1766 return false; 1767 1768 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1769 // Seems like we are already reconnected, report true 1770 if (shutdownTimestamp == null) { 1771 return true; 1772 } 1773 1774 // See if resumption time is over 1775 long current = System.currentTimeMillis(); 1776 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1777 if (current > shutdownTimestamp + maxResumptionMillies) { 1778 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1779 // resumption is possible 1780 return false; 1781 } else { 1782 return true; 1783 } 1784 } 1785 1786 /** 1787 * Drop the stream management state. Sets {@link #smSessionId} and 1788 * {@link #unacknowledgedStanzas} to <code>null</code>. 1789 */ 1790 private void dropSmState() { 1791 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1792 // respective. No need to reset them here. 1793 smSessionId = null; 1794 unacknowledgedStanzas = null; 1795 } 1796 1797 /** 1798 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 
1799 * <p> 1800 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1801 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1802 * without checking for overflows before. 1803 * </p> 1804 * 1805 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1806 */ 1807 public int getMaxSmResumptionTime() { 1808 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1809 int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE; 1810 return Math.min(clientResumptionTime, serverResumptionTime); 1811 } 1812 1813 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1814 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1815 final List<Stanza> ackedStanzas = new ArrayList<>( 1816 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1817 : Integer.MAX_VALUE); 1818 for (long i = 0; i < ackedStanzasCount; i++) { 1819 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1820 // If the server ack'ed a stanza, then it must be in the 1821 // unacknowledged stanza queue. There can be no exception. 
1822 if (ackedStanza == null) { 1823 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1824 ackedStanzasCount, ackedStanzas); 1825 } 1826 ackedStanzas.add(ackedStanza); 1827 } 1828 1829 boolean atLeastOneStanzaAcknowledgedListener = false; 1830 if (!stanzaAcknowledgedListeners.isEmpty()) { 1831 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1832 atLeastOneStanzaAcknowledgedListener = true; 1833 } 1834 else { 1835 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1836 for (Stanza ackedStanza : ackedStanzas) { 1837 String id = ackedStanza.getStanzaId(); 1838 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1839 atLeastOneStanzaAcknowledgedListener = true; 1840 break; 1841 } 1842 } 1843 } 1844 1845 // Only spawn a new thread if there is a chance that some listener is invoked 1846 if (atLeastOneStanzaAcknowledgedListener) { 1847 asyncGo(new Runnable() { 1848 @Override 1849 public void run() { 1850 for (Stanza ackedStanza : ackedStanzas) { 1851 for (StanzaListener listener : stanzaAcknowledgedListeners) { 1852 try { 1853 listener.processStanza(ackedStanza); 1854 } 1855 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1856 LOGGER.log(Level.FINER, "Received exception", e); 1857 } 1858 } 1859 String id = ackedStanza.getStanzaId(); 1860 if (StringUtils.isNullOrEmpty(id)) { 1861 continue; 1862 } 1863 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1864 if (listener != null) { 1865 try { 1866 listener.processStanza(ackedStanza); 1867 } 1868 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1869 LOGGER.log(Level.FINER, "Received exception", e); 1870 } 1871 } 1872 } 1873 } 1874 }); 1875 } 1876 1877 serverHandledStanzasCount = handledCount; 1878 } 1879 1880 /** 1881 * Set the default bundle and defer callback used for new connections. 
1882 * 1883 * @param defaultBundleAndDeferCallback TODO javadoc me please 1884 * @see BundleAndDeferCallback 1885 * @since 4.1 1886 */ 1887 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1888 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1889 } 1890 1891 /** 1892 * Set the bundle and defer callback used for this connection. 1893 * <p> 1894 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1895 * no longer get deferred. 1896 * </p> 1897 * 1898 * @param bundleAndDeferCallback the callback or <code>null</code>. 1899 * @see BundleAndDeferCallback 1900 * @since 4.1 1901 */ 1902 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1903 this.bundleAndDeferCallback = bundleAndDeferCallback; 1904 } 1905 1906}