001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InputStreamReader; 023import java.io.OutputStream; 024import java.io.OutputStreamWriter; 025import java.io.Writer; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.net.Socket; 029import java.security.cert.CertificateException; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.Iterator; 033import java.util.LinkedHashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.ArrayBlockingQueue; 039import java.util.concurrent.BlockingQueue; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentLinkedQueue; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.logging.Level; 045import java.util.logging.Logger; 046 047import javax.net.SocketFactory; 048import javax.net.ssl.HostnameVerifier; 049import javax.net.ssl.SSLSession; 050import javax.net.ssl.SSLSocket; 051 052import org.jivesoftware.smack.AbstractXMPPConnection; 053import org.jivesoftware.smack.ConnectionConfiguration; 054import 
org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 055import org.jivesoftware.smack.ConnectionListener; 056import org.jivesoftware.smack.SmackConfiguration; 057import org.jivesoftware.smack.SmackException; 058import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 059import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 060import org.jivesoftware.smack.SmackException.ConnectionException; 061import org.jivesoftware.smack.SmackException.EndpointConnectionException; 062import org.jivesoftware.smack.SmackException.NotConnectedException; 063import org.jivesoftware.smack.SmackException.NotLoggedInException; 064import org.jivesoftware.smack.SmackException.SecurityNotPossibleException; 065import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 066import org.jivesoftware.smack.SmackFuture; 067import org.jivesoftware.smack.StanzaListener; 068import org.jivesoftware.smack.XMPPConnection; 069import org.jivesoftware.smack.XMPPException; 070import org.jivesoftware.smack.XMPPException.FailedNonzaException; 071import org.jivesoftware.smack.XMPPException.StreamErrorException; 072import org.jivesoftware.smack.compress.packet.Compress; 073import org.jivesoftware.smack.compress.packet.Compressed; 074import org.jivesoftware.smack.compression.XMPPInputOutputStream; 075import org.jivesoftware.smack.datatypes.UInt16; 076import org.jivesoftware.smack.filter.StanzaFilter; 077import org.jivesoftware.smack.internal.SmackTlsContext; 078import org.jivesoftware.smack.packet.Element; 079import org.jivesoftware.smack.packet.IQ; 080import org.jivesoftware.smack.packet.Message; 081import org.jivesoftware.smack.packet.Nonza; 082import org.jivesoftware.smack.packet.Presence; 083import org.jivesoftware.smack.packet.Stanza; 084import org.jivesoftware.smack.packet.StartTls; 085import org.jivesoftware.smack.packet.StreamError; 086import org.jivesoftware.smack.proxy.ProxyInfo; 087import org.jivesoftware.smack.sasl.packet.SaslNonza; 
088import org.jivesoftware.smack.sm.SMUtils; 089import org.jivesoftware.smack.sm.StreamManagementException; 090import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 091import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 092import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 093import org.jivesoftware.smack.sm.packet.StreamManagement; 094import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 095import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 096import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 097import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 098import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 099import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 100import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 101import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 102import org.jivesoftware.smack.sm.predicates.Predicate; 103import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 104import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints; 105import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint; 106import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 107import org.jivesoftware.smack.util.Async; 108import org.jivesoftware.smack.util.CloseableUtil; 109import org.jivesoftware.smack.util.PacketParserUtils; 110import org.jivesoftware.smack.util.StringUtils; 111import org.jivesoftware.smack.util.TLSUtils; 112import org.jivesoftware.smack.util.XmlStringBuilder; 113import org.jivesoftware.smack.util.rce.RemoteConnectionException; 114import org.jivesoftware.smack.xml.SmackXmlParser; 115import org.jivesoftware.smack.xml.XmlPullParser; 116import org.jivesoftware.smack.xml.XmlPullParserException; 117 118import org.jxmpp.jid.impl.JidCreate; 119import 
org.jxmpp.jid.parts.Resourcepart; 120import org.jxmpp.stringprep.XmppStringprepException; 121import org.minidns.dnsname.DnsName; 122 123/** 124 * Creates a socket connection to an XMPP server. This is the default connection 125 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 126 * 127 * @see XMPPConnection 128 * @author Matt Tucker 129 */ 130public class XMPPTCPConnection extends AbstractXMPPConnection { 131 132 private static final int QUEUE_SIZE = 500; 133 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 134 135 /** 136 * The socket which is used for this connection. 137 */ 138 private Socket socket; 139 140 /** 141 * 142 */ 143 private boolean disconnectedButResumeable = false; 144 145 private SSLSocket secureSocket; 146 147 /** 148 * Protected access level because of unit test purposes 149 */ 150 protected final PacketWriter packetWriter = new PacketWriter(); 151 152 /** 153 * Protected access level because of unit test purposes 154 */ 155 protected final PacketReader packetReader = new PacketReader(); 156 157 /** 158 * 159 */ 160 private boolean streamFeaturesAfterAuthenticationReceived; 161 162 /** 163 * 164 */ 165 private boolean compressSyncPoint; 166 167 /** 168 * The default bundle and defer callback, used for new connections. 169 * @see bundleAndDeferCallback 170 */ 171 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 172 173 /** 174 * The used bundle and defer callback. 175 * <p> 176 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 177 * having a 'volatile' read within the writer threads loop. 178 * </p> 179 */ 180 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 181 182 private static boolean useSmDefault = true; 183 184 private static boolean useSmResumptionDefault = true; 185 186 /** 187 * The stream ID of the stream that is currently resumable, ie. 
the stream we hold the state 188 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 189 * {@link #unFailedNonzaExceptionacknowledgedStanzas}. 190 */ 191 private String smSessionId; 192 193 /** 194 * Represents the state of stream management resumption. 195 * <p> 196 * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread. 197 * </p> 198 */ 199 private volatile SyncPointState smResumedSyncPoint; 200 private Failed smResumptionFailed; 201 202 /** 203 * Represents the state of stream magement. 204 * <p> 205 * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}. 206 * </p> 207 */ 208 private volatile boolean smEnabledSyncPoint; 209 210 /** 211 * The client's preferred maximum resumption time in seconds. 212 */ 213 private int smClientMaxResumptionTime = -1; 214 215 /** 216 * The server's preferred maximum resumption time in seconds. 217 */ 218 private int smServerMaxResumptionTime = -1; 219 220 /** 221 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 222 */ 223 private boolean useSm = useSmDefault; 224 private boolean useSmResumption = useSmResumptionDefault; 225 226 /** 227 * The counter that the server sends the client about it's current height. For example, if the server sends 228 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 229 */ 230 private long serverHandledStanzasCount = 0; 231 232 /** 233 * The counter for stanzas handled ("received") by the client. 234 * <p> 235 * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are 236 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. 
And since 237 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 238 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 239 * </p> 240 */ 241 private long clientHandledStanzasCount = 0; 242 243 private BlockingQueue<Stanza> unacknowledgedStanzas; 244 245 /** 246 * Set to true if Stream Management was at least once enabled for this connection. 247 */ 248 private boolean smWasEnabledAtLeastOnce = false; 249 250 /** 251 * This listeners are invoked for every stanza that got acknowledged. 252 * <p> 253 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 254 * themselves after they have been invoked. 255 * </p> 256 */ 257 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>(); 258 259 /** 260 * These listeners are invoked for every stanza that got dropped. 261 * <p> 262 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 263 * themselves after they have been invoked. 264 * </p> 265 */ 266 private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>(); 267 268 /** 269 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 270 * only be invoked once and automatically removed after that. 271 */ 272 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>(); 273 274 /** 275 * Predicates that determine if an stream management ack should be requested from the server. 276 * <p> 277 * We use a linked hash set here, so that the order how the predicates are added matches the 278 * order in which they are invoked in order to determine if an ack request should be send or not. 
279 * </p> 280 */ 281 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>(); 282 283 @SuppressWarnings("HidingField") 284 private final XMPPTCPConnectionConfiguration config; 285 286 /** 287 * Creates a new XMPP connection over TCP (optionally using proxies). 288 * <p> 289 * Note that XMPPTCPConnection constructors do not establish a connection to the server 290 * and you must call {@link #connect()}. 291 * </p> 292 * 293 * @param config the connection configuration. 294 */ 295 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 296 super(config); 297 this.config = config; 298 addConnectionListener(new ConnectionListener() { 299 @Override 300 public void connectionClosedOnError(Exception e) { 301 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) { 302 dropSmState(); 303 } 304 } 305 }); 306 307 // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream 308 // is initiated. 309 buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install(); 310 } 311 312 /** 313 * Creates a new XMPP connection over TCP. 314 * <p> 315 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 316 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 317 * constructor. 318 * </p> 319 * 320 * @param jid the bare JID used by the client. 321 * @param password the password or authentication token. 322 * @throws XmppStringprepException if the provided string is invalid. 323 */ 324 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 325 this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build()); 326 } 327 328 /** 329 * Creates a new XMPP connection over TCP. 330 * <p> 331 * This is the simplest constructor for connecting to an XMPP server. 
Alternatively, 332 * you can get fine-grained control over connection settings using the 333 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 334 * </p> 335 * @param username TODO javadoc me please 336 * @param password TODO javadoc me please 337 * @param serviceName TODO javadoc me please 338 * @throws XmppStringprepException if the provided string is invalid. 339 */ 340 public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { 341 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( 342 JidCreate.domainBareFrom(serviceName)).build()); 343 } 344 345 @Override 346 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 347 if (packetWriter == null) { 348 throw new NotConnectedException(); 349 } 350 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 351 } 352 353 @Override 354 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 355 if (isConnected() && !disconnectedButResumeable) { 356 throw new AlreadyConnectedException(); 357 } 358 } 359 360 @Override 361 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 362 if (isAuthenticated() && !disconnectedButResumeable) { 363 throw new AlreadyLoggedInException(); 364 } 365 } 366 367 @Override 368 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 369 // Reset the flag in case it was set 370 disconnectedButResumeable = false; 371 super.afterSuccessfulLogin(resumed); 372 } 373 374 @Override 375 protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 376 SmackException, IOException, InterruptedException { 377 // Authenticate using SASL 378 SSLSession sslSession = secureSocket != null ? 
secureSocket.getSession() : null; 379 380 streamFeaturesAfterAuthenticationReceived = false; 381 authenticate(username, password, config.getAuthzid(), sslSession); 382 383 // Wait for stream features after the authentication. 384 // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be 385 // renamed to "streamFeaturesAfterAuthenticationReceived". 386 waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server"); 387 388 // If compression is enabled then request the server to use stream compression. XEP-170 389 // recommends to perform stream compression before resource binding. 390 maybeEnableCompression(); 391 392 smResumedSyncPoint = SyncPointState.initial; 393 smResumptionFailed = null; 394 if (isSmResumptionPossible()) { 395 smResumedSyncPoint = SyncPointState.request_sent; 396 sendNonza(new Resume(clientHandledStanzasCount, smSessionId)); 397 waitForConditionOrConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream"); 398 if (smResumedSyncPoint == SyncPointState.successful) { 399 // We successfully resumed the stream, be done here 400 afterSuccessfulLogin(true); 401 return; 402 } 403 // SM resumption failed, what Smack does here is to report success of 404 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 405 // normal resource binding can be tried. 406 assert smResumptionFailed != null; 407 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed); 408 } 409 410 // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case, 411 // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the 412 // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state. 
413 smEnabledSyncPoint = false; 414 415 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 416 if (unacknowledgedStanzas != null) { 417 // There was a previous connection with SM enabled but that was either not resumable or 418 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 419 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 420 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 421 // XMPP session (There maybe was an enabled in a previous XMPP session of this 422 // connection instance though). This is used in writePackets to decide if stanzas should 423 // be added to the unacknowledged stanzas queue, because they have to be added right 424 // after the 'enable' stream element has been sent. 425 dropSmState(); 426 } 427 428 // Now bind the resource. It is important to do this *after* we dropped an eventually 429 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 430 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 431 bindResourceAndEstablishSession(resource); 432 433 if (isSmAvailable() && useSm) { 434 // Remove what is maybe left from previously stream managed sessions 435 serverHandledStanzasCount = 0; 436 sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime)); 437 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 438 // then this is a non recoverable error and we therefore throw an exception. 
439 waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement"); 440 synchronized (requestAckPredicates) { 441 if (requestAckPredicates.isEmpty()) { 442 // Assure that we have at lest one predicate set up that so that we request acks 443 // for the server and eventually flush some stanzas from the unacknowledged 444 // stanza queue 445 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 446 } 447 } 448 } 449 // Inform client about failed resumption if possible, resend stanzas otherwise 450 // Process the stanzas synchronously so a client can re-queue them for transmission 451 // before it is informed about connection success 452 if (!stanzaDroppedListeners.isEmpty()) { 453 for (Stanza stanza : previouslyUnackedStanzas) { 454 for (StanzaListener listener : stanzaDroppedListeners) { 455 try { 456 listener.processStanza(stanza); 457 } 458 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 459 LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e); 460 } 461 } 462 } 463 } else { 464 for (Stanza stanza : previouslyUnackedStanzas) { 465 sendStanzaInternal(stanza); 466 } 467 } 468 469 afterSuccessfulLogin(false); 470 } 471 472 @Override 473 public boolean isSecureConnection() { 474 return secureSocket != null; 475 } 476 477 /** 478 * Shuts the current connection down. After this method returns, the connection must be ready 479 * for re-use by connect. 480 */ 481 @Override 482 protected void shutdown() { 483 if (isSmEnabled()) { 484 try { 485 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 486 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 
487 sendSmAcknowledgementInternal(); 488 } catch (InterruptedException | NotConnectedException e) { 489 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 490 } 491 } 492 shutdown(false); 493 } 494 495 @Override 496 public synchronized void instantShutdown() { 497 shutdown(true); 498 } 499 500 private void shutdown(boolean instant) { 501 // The writer thread may already been finished at this point, for example when the connection is in the 502 // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this 503 // case. 504 if (!packetWriter.done()) { 505 // First shutdown the writer, this will result in a closing stream element getting send to 506 // the server 507 LOGGER.finer(packetWriter.threadName + " shutdown()"); 508 packetWriter.shutdown(instant); 509 LOGGER.finer(packetWriter.threadName + " shutdown() returned"); 510 511 if (!instant) { 512 waitForClosingStreamTagFromServer(); 513 } 514 } 515 516 LOGGER.finer(packetReader.threadName + " shutdown()"); 517 packetReader.shutdown(); 518 LOGGER.finer(packetReader.threadName + " shutdown() returned"); 519 520 CloseableUtil.maybeClose(socket, LOGGER); 521 522 setWasAuthenticated(); 523 524 try { 525 boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running); 526 if (!readerAndWriterThreadsTermianted) { 527 LOGGER.severe("Reader and/or writer threads did not terminate timely. 
Writer running: " 528 + packetWriter.running + ", Reader running: " + packetReader.running); 529 } else { 530 LOGGER.fine("Reader and writer threads terminated"); 531 } 532 } catch (InterruptedException e) { 533 LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e); 534 } 535 536 if (disconnectedButResumeable) { 537 return; 538 } 539 540 // If we are able to resume the stream, then don't set 541 // connected/authenticated/usingTLS to false since we like behave like we are still 542 // connected (e.g. sendStanza should not throw a NotConnectedException). 543 if (instant) { 544 disconnectedButResumeable = isSmResumptionPossible(); 545 if (!disconnectedButResumeable) { 546 // Reset the stream management session id to null, since the stream is no longer resumable. Note that we 547 // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected. 548 smSessionId = null; 549 } 550 } else { 551 disconnectedButResumeable = false; 552 553 // Drop the stream management state if this is not an instant shutdown. We send 554 // a </stream> close tag and now the stream management state is no longer valid. 555 // This also prevents that we will potentially (re-)send any unavailable presence we 556 // may have send, because it got put into the unacknowledged queue and was not acknowledged before the 557 // connection terminated. 558 dropSmState(); 559 // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the 560 // information is available in the connectionClosedOnError() listeners. 
561 } 562 authenticated = false; 563 connected = false; 564 secureSocket = null; 565 reader = null; 566 writer = null; 567 568 initState(); 569 } 570 571 @Override 572 public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { 573 packetWriter.sendStreamElement(element); 574 } 575 576 @Override 577 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { 578 packetWriter.sendStreamElement(packet); 579 if (isSmEnabled()) { 580 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 581 if (requestAckPredicate.accept(packet)) { 582 requestSmAcknowledgementInternal(); 583 break; 584 } 585 } 586 } 587 } 588 589 private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException { 590 RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config); 591 592 List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>(); 593 594 SocketFactory socketFactory = config.getSocketFactory(); 595 ProxyInfo proxyInfo = config.getProxyInfo(); 596 int timeout = config.getConnectTimeout(); 597 if (socketFactory == null) { 598 socketFactory = SocketFactory.getDefault(); 599 } 600 for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) { 601 Iterator<? extends InetAddress> inetAddresses; 602 String host = endpoint.getHost().toString(); 603 UInt16 portUint16 = endpoint.getPort(); 604 int port = portUint16.intValue(); 605 if (proxyInfo == null) { 606 inetAddresses = endpoint.getInetAddresses().iterator(); 607 assert inetAddresses.hasNext(); 608 609 innerloop: while (inetAddresses.hasNext()) { 610 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 611 // re-usable after a failed connection attempt. See also SMACK-724. 
612 SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory); 613 614 final InetAddress inetAddress = inetAddresses.next(); 615 final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port); 616 LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress); 617 socketFuture.connectAsync(inetSocketAddress, timeout); 618 619 try { 620 socket = socketFuture.getOrThrow(); 621 } catch (IOException e) { 622 RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>( 623 endpoint, inetAddress, e); 624 connectionExceptions.add(rce); 625 if (inetAddresses.hasNext()) { 626 continue innerloop; 627 } else { 628 break innerloop; 629 } 630 } 631 LOGGER.finer("Established TCP connection to " + inetSocketAddress); 632 // We found a host to connect to, return here 633 this.host = host; 634 this.port = portUint16; 635 return; 636 } 637 } else { 638 // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy 639 // connection to every inet address of each connection endpoint. 
640 socket = socketFactory.createSocket(); 641 StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy"); 642 final String hostAndPort = host + " at port " + port; 643 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 644 try { 645 proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 646 } catch (IOException e) { 647 CloseableUtil.maybeClose(socket, LOGGER); 648 RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e); 649 connectionExceptions.add(rce); 650 continue; 651 } 652 LOGGER.finer("Established TCP connection to " + hostAndPort); 653 // We found a host to connect to, return here 654 this.host = host; 655 this.port = portUint16; 656 return; 657 } 658 } 659 660 // There are no more host addresses to try 661 // throw an exception and report all tried 662 // HostAddresses in the exception 663 throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions); 664 } 665 666 /** 667 * Initializes the connection by creating a stanza reader and writer and opening a 668 * XMPP stream to the server. 669 * 670 * @throws XMPPException if establishing a connection to the server fails. 671 * @throws SmackException if the server fails to respond back or if there is anther error. 672 * @throws IOException if an I/O error occurred. 673 * @throws InterruptedException if the calling thread was interrupted. 674 */ 675 private void initConnection() throws IOException, InterruptedException { 676 compressionHandler = null; 677 678 // Set the reader and writer instance variables 679 initReaderAndWriter(); 680 681 // Start the writer thread. This will open an XMPP stream to the server 682 packetWriter.init(); 683 // Start the reader thread. 
The startup() method will block until we 684 // get an opening stream packet back from server 685 packetReader.init(); 686 } 687 688 private void initReaderAndWriter() throws IOException { 689 InputStream is = socket.getInputStream(); 690 OutputStream os = socket.getOutputStream(); 691 if (compressionHandler != null) { 692 is = compressionHandler.getInputStream(is); 693 os = compressionHandler.getOutputStream(os); 694 } 695 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 696 writer = new OutputStreamWriter(os, "UTF-8"); 697 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 698 699 // If debugging is enabled, we open a window and write out all network traffic. 700 initDebugger(); 701 } 702 703 /** 704 * The server has indicated that TLS negotiation can start. We now need to secure the 705 * existing plain connection and perform a handshake. This method won't return until the 706 * connection has finished the handshake or an error occurred while securing the connection. 707 * @throws IOException if an I/O error occurred. 708 * @throws SecurityNotPossibleException if TLS is not possible. 709 * @throws CertificateException if there is an issue with the certificate. 710 */ 711 @SuppressWarnings("LiteralClassName") 712 private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException { 713 SmackTlsContext smackTlsContext = getSmackTlsContext(); 714 715 Socket plain = socket; 716 // Secure the plain connection 717 socket = smackTlsContext.sslContext.getSocketFactory().createSocket(plain, 718 config.getXMPPServiceDomain().toString(), plain.getPort(), true); 719 720 final SSLSocket sslSocket = (SSLSocket) socket; 721 // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is 722 // important (at least on certain platforms) and it seems to be a good idea anyways to 723 // prevent an accidental implicit handshake. 
724 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 725 726 // Initialize the reader and writer with the new secured version 727 initReaderAndWriter(); 728 729 // Proceed to do the handshake 730 sslSocket.startHandshake(); 731 732 if (smackTlsContext.daneVerifier != null) { 733 smackTlsContext.daneVerifier.finish(sslSocket.getSession()); 734 } 735 736 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 737 if (verifier == null) { 738 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 739 } 740 741 final String verifierHostname; 742 { 743 DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible(); 744 // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII 745 // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint. 746 // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1 747 if (xmppServiceDomainDnsName != null) { 748 verifierHostname = xmppServiceDomainDnsName.ace; 749 } 750 else { 751 LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain() 752 + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail."); 753 verifierHostname = getXMPPServiceDomain().toString(); 754 } 755 } 756 757 final boolean verificationSuccessful; 758 // Verify the TLS session. 759 verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession()); 760 if (!verificationSuccessful) { 761 throw new CertificateException( 762 "Hostname verification of certificate failed. Certificate does not authenticate " 763 + getXMPPServiceDomain()); 764 } 765 766 // Set that TLS was successful 767 secureSocket = sslSocket; 768 } 769 770 /** 771 * Returns the compression handler that can be used for one compression methods offered by the server. 
772 * 773 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 774 * 775 */ 776 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 777 for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) { 778 String method = handler.getCompressionMethod(); 779 if (compression.getMethods().contains(method)) 780 return handler; 781 } 782 return null; 783 } 784 785 @Override 786 public boolean isUsingCompression() { 787 return compressionHandler != null && compressSyncPoint; 788 } 789 790 /** 791 * <p> 792 * Starts using stream compression that will compress network traffic. Traffic can be 793 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 794 * connection. However, the server and the client will need to use more CPU time in order to 795 * un/compress network data so under high load the server performance might be affected. 796 * </p> 797 * <p> 798 * Stream compression has to have been previously offered by the server. Currently only the 799 * zlib method is supported by the client. Stream compression negotiation has to be done 800 * before authentication took place. 801 * </p> 802 * 803 * @throws NotConnectedException if the XMPP connection is not connected. 804 * @throws SmackException if Smack detected an exceptional situation. 805 * @throws InterruptedException if the calling thread was interrupted. 806 * @throws XMPPException if an XMPP protocol error was received. 
807 */ 808 private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException { 809 if (!config.isCompressionEnabled()) { 810 return; 811 } 812 813 Compress.Feature compression = getFeature(Compress.Feature.class); 814 if (compression == null) { 815 // Server does not support compression 816 return; 817 } 818 // If stream compression was offered by the server and we want to use 819 // compression then send compression request to the server 820 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 821 compressSyncPoint = false; 822 sendNonza(new Compress(compressionHandler.getCompressionMethod())); 823 waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression"); 824 } else { 825 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 826 } 827 } 828 829 /** 830 * Establishes a connection to the XMPP server. It basically 831 * creates and maintains a socket connection to the server. 832 * <p> 833 * Listeners will be preserved from a previous connection if the reconnection 834 * occurs after an abrupt termination. 835 * </p> 836 * 837 * @throws XMPPException if an error occurs while trying to establish the connection. 838 * @throws SmackException if Smack detected an exceptional situation. 839 * @throws IOException if an I/O error occurred. 840 * @throws InterruptedException if the calling thread was interrupted. 841 */ 842 @Override 843 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 844 // Establishes the TCP connection to the server and does setup the reader and writer. 
Throws an exception if 845 // there is an error establishing the connection 846 connectUsingConfiguration(); 847 848 connected = true; 849 850 // We connected successfully to the servers TCP port 851 initConnection(); 852 853 // TLS handled will be true either if TLS was established, or if it was not mandatory. 854 waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS"); 855 856 // Wait with SASL auth until the SASL mechanisms have been received 857 waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server"); 858 } 859 860 /** 861 * For unit testing purposes 862 * 863 * @param writer TODO javadoc me please 864 */ 865 protected void setWriter(Writer writer) { 866 this.writer = writer; 867 } 868 869 @Override 870 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException { 871 StartTls startTlsFeature = getFeature(StartTls.class); 872 if (startTlsFeature != null) { 873 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 874 SecurityRequiredByServerException smackException = new SecurityRequiredByServerException(); 875 currentSmackException = smackException; 876 notifyWaitingThreads(); 877 throw smackException; 878 } 879 880 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 881 sendNonza(new StartTls()); 882 } else { 883 tlsHandled = true; 884 notifyWaitingThreads(); 885 } 886 } else { 887 tlsHandled = true; 888 notifyWaitingThreads(); 889 } 890 891 if (isSaslAuthenticated()) { 892 // If we have received features after the SASL has been successfully completed, then we 893 // have also *maybe* received, as it is an optional feature, the compression feature 894 // from the server. 
895 streamFeaturesAfterAuthenticationReceived = true; 896 notifyWaitingThreads(); 897 } 898 } 899 900 private void resetParser() throws IOException { 901 try { 902 packetReader.parser = SmackXmlParser.newXmlParser(reader); 903 } catch (XmlPullParserException e) { 904 throw new IOException(e); 905 } 906 } 907 908 private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException { 909 sendStreamOpen(); 910 resetParser(); 911 } 912 913 protected class PacketReader { 914 915 private final String threadName = "Smack Reader (" + getConnectionCounter() + ')'; 916 917 XmlPullParser parser; 918 919 private volatile boolean done; 920 921 private boolean running; 922 923 /** 924 * Initializes the reader in order to be used. The reader is initialized during the 925 * first connection and when reconnecting due to an abruptly disconnection. 926 */ 927 void init() { 928 done = false; 929 930 running = true; 931 Async.go(new Runnable() { 932 @Override 933 public void run() { 934 LOGGER.finer(threadName + " start"); 935 try { 936 parsePackets(); 937 } finally { 938 LOGGER.finer(threadName + " exit"); 939 running = false; 940 notifyWaitingThreads(); 941 } 942 } 943 }, threadName); 944 } 945 946 /** 947 * Shuts the stanza reader down. This method simply sets the 'done' flag to true. 948 */ 949 void shutdown() { 950 done = true; 951 } 952 953 /** 954 * Parse top-level packets in order to process them further. 
* <p>
         * Opens the stream and then pulls parser events until the 'done' flag is set, a closing stream element
         * arrives after the writer queue was already shut down, or an exception is thrown. An exception is
         * ignored if the writer queue was already shut down or 'done' is set; otherwise it is handed to
         * {@code notifyConnectionError()}.
         * </p>
         */
        private void parsePackets() {
            try {
                openStreamAndResetParser();
                XmlPullParser.Event eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case START_ELEMENT:
                        final String name = parser.getName();
                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            // Stanzas: the handled counter is incremented in 'finally', i.e. even if parsing
                            // or processing of the stanza throws.
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            onStreamOpen(parser);
                            break;
                        case "error":
                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
                            // Stream errors are non recoverable, throw this exceptions. Also note that this will set
                            // this exception as current connection exceptions and notify any waiting threads.
                            throw new StreamErrorException(streamError);
                        case "features":
                            parseFeaturesAndNotify(parser);
                            break;
                        case "proceed":
                            // Secure the connection by negotiating TLS
                            proceedTLSReceived();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            break;
                        case "failure":
                            // The namespace decides which negotiation failed.
                            String namespace = parser.getNamespace(null);
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
                                notifyWaitingThreads();
                                break;
                            default:
                                parseAndProcessNonza(parser);
                            }
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            // Notify that compression is being used
                            compressSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                smSessionId = enabled.getId();
                                // A resumable stream without a session id can not be resumed later, treat it as error.
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
                                    setCurrentConnectionExceptionAndNotify(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            // Stream Management is now active: restart the handled counter and wake up waiters.
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            if (smResumedSyncPoint == SyncPointState.request_sent) {
                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
                                // so is non-fatal as we can simply continue with resource binding in this case.
                                smResumptionFailed = failed;
                                notifyWaitingThreads();
                            } else {
                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
                                setCurrentConnectionExceptionAndNotify(xmppException);
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            // The server must resume exactly the stream we asked for.
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // Mark SM as enabled
                            smEnabledSyncPoint = true;
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                            unacknowledgedStanzas.drainTo(stanzasToResend);
                            for (Stanza stanza : stanzasToResend) {
                                sendStanzaInternal(stanza);
                            }
                            // If there were stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            // Mark SM resumption as successful
                            smResumedSyncPoint = SyncPointState.successful;
                            notifyWaitingThreads();
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            ParseStreamManagement.ackRequest(parser);
                            // Only answer ack requests while Stream Management is enabled.
                            if (smEnabledSyncPoint) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                        default:
                            parseAndProcessNonza(parser);
                            break;
                        }
                        break;
                    case END_ELEMENT:
                        final String endTagName = parser.getName();
                        if ("stream".equals(endTagName)) {
                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
                                LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace());
                                break;
                            }

                            // Check if the queue was already shut down before reporting success on closing stream tag
                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
                            // did re-start the queue again, causing this writer to assume that the queue is not
                            // shutdown, which results in a call to disconnect().
                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
                            closingStreamReceived = true;
                            notifyWaitingThreads();

                            if (queueWasShutdown) {
                                // We received a closing stream element *after* we initiated the
                                // termination of the session by sending a closing stream element to
                                // the server first
                                return;
                            } else {
                                // We received a closing stream element from the server without us
                                // sending a closing stream element first. This means that the
                                // server wants to terminate the session, therefore disconnect
                                // the connection
                                LOGGER.info(XMPPTCPConnection.this
                                                + " received closing </stream> element."
                                                + " Server wants to terminate the connection, calling disconnect()");
                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
                                    @Override
                                    public void run() {
                                        disconnect();
                                    }});
                            }
                        }
                        break;
                    case END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException.SmackMessageException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    default:
                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
                        break;
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
                // notifyConnectionError() below, even though run() also sets it to false. Therefore, do not remove this.
                running = false;

                // Non-null if the exception should be ignored; names the part that was already shut down.
                String ignoreReasonThread = null;

                boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
                if (writerThreadWasShutDown) {
                    ignoreReasonThread = "writer";
                } else if (done) {
                    ignoreReasonThread = "reader";
                }

                if (ignoreReasonThread != null) {
                    LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
                    return;
                }

                // Close the connection and notify connection listeners of the error.
                notifyConnectionError(e);
            }
        }
    }

    /**
     * Writes queued stream elements to the connection's writer on a dedicated thread.
     */
    protected class PacketWriter {
        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
        // High water mark: 30% of the unacknowledged stanzas queue size.
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);

        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';

        // NOTE(review): the second constructor argument presumably selects fair ordering - confirm against
        // ArrayBlockingQueueWithShutdown.
        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
                        QUEUE_SIZE, true);

        /**
         * If set, the stanza writer is shut down
         */
        protected volatile Long shutdownTimestamp = null;

        private volatile boolean instantShutdown;

        /**
         * True if some preconditions are given to start the bundle and defer mechanism.
         * <p>
         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
         * this field to true.
         * </p>
         */
        private boolean shouldBundleAndDefer;

        private boolean running;

        /**
         * Initializes the writer in order to be used. It is called at the first connection and also
         * is invoked if the connection is disconnected by an error.
* <p>
         * Clears the shutdown timestamp, moves elements still waiting in the writer queue to the unacknowledged
         * stanzas queue (if that queue exists), restarts the writer queue and starts the writing loop
         * asynchronously.
         * </p>
         */
        void init() {
            shutdownTimestamp = null;

            if (unacknowledgedStanzas != null) {
                // It's possible that there are new stanzas in the writer queue that
                // came in while we were disconnected but resumable, drain those into
                // the unacknowledged queue so that they get resent now
                drainWriterQueueToUnacknowledgedStanzas();
            }

            queue.start();
            running = true;
            Async.go(new Runnable() {
                @Override
                public void run() {
                    LOGGER.finer(threadName + " start");
                    try {
                        writePackets();
                    } finally {
                        // Report the exit and wake up waiting threads however the loop ended.
                        LOGGER.finer(threadName + " exit");
                        running = false;
                        notifyWaitingThreads();
                    }
                }
            }, threadName);
        }

        /**
         * Tells whether the writer was shut down.
         *
         * @return true if a shutdown timestamp is set.
         */
        private boolean done() {
            return shutdownTimestamp != null;
        }

        /**
         * Throws if the writer was shut down and the stream can not be resumed.
         *
         * @throws NotConnectedException if the writer is done and Stream Management resumption is not possible.
         */
        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
            final boolean done = done();
            if (done) {
                final boolean smResumptionPossible = isSmResumptionPossible();
                // Don't throw a NotConnectedException if there is a resumable stream available
                if (!smResumptionPossible) {
                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
                                    + " smResumptionPossible=" + smResumptionPossible);
                }
            }
        }

        /**
         * Sends the specified element to the server.
         *
         * @param element the element to send.
         * @throws NotConnectedException if the XMPP connection is not connected.
         * @throws InterruptedException if the calling thread was interrupted.
         */
        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
            try {
                queue.put(element);
            }
            catch (InterruptedException e) {
                // put() may throw an InterruptedException for two reasons:
                // 1. If the queue was shut down
                // 2. If the thread was interrupted
                // so we have to check which is the case
                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
                // If the method above did not throw, then the sending thread was interrupted
                throw e;
            }
        }

        /**
         * Shuts down the stanza writer. Once this method has been called, no further
         * packets will be written to the server.
         *
         * @param instant stored in {@code instantShutdown}; if true, writePackets() skips flushing the queue
         *        and writing the closing stream element.
         */
        void shutdown(boolean instant) {
            instantShutdown = instant;
            queue.shutdown();
            shutdownTimestamp = System.currentTimeMillis();
        }

        /**
         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
         * that case.
         *
         * @return the next element for writing or null.
         */
        private Element nextStreamElement() {
            // It is important that we check if the queue is empty before removing an element from it
            if (queue.isEmpty()) {
                shouldBundleAndDefer = true;
            }
            Element packet = null;
            try {
                packet = queue.take();
            }
            catch (InterruptedException e) {
                // The interrupt is deliberately not propagated: null is returned and the caller re-checks done().
                if (!queue.isShutdown()) {
                    // Users shouldn't try to interrupt the packet writer thread
                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
                }
            }
            return packet;
        }

        /**
         * The writing loop: takes elements from the queue and writes them to the writer until the writer is
         * done. Afterwards, unless this was an instant shutdown, the remaining queue content and the closing
         * stream element are written. Exceptions are reported via {@code notifyConnectionError()} unless the
         * writer is done or the queue was shut down.
         */
        private void writePackets() {
            try {
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Wait until the deferring was stopped or the requested time has elapsed;
                                // the remaining time is recomputed after every wake-up.
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enable'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
                    }
                    // No-op if 'packet' is null, i.e. if the element is not a stanza.
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
                    if (elementXml instanceof XmlStringBuilder) {
                        try {
                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
                        } catch (NullPointerException npe) {
                            // Log which element caused the NPE, then let it propagate.
                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
                            throw npe;
                        }
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush once there is nothing further queued.
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            writer.write(packet.toXML().toString());
                        }
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                    // the reader and writer thread's 'running' value is false.
                    running = false;
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            }
        }

        /**
         * Empties the writer queue and appends the contained stanzas to the unacknowledged stanzas queue.
         * Elements that are not stanzas are dropped. Stops with a warning as soon as the unacknowledged
         * stanzas queue has no capacity left.
         */
        private void drainWriterQueueToUnacknowledgedStanzas() {
            List<Element> elements = new ArrayList<>(queue.size());
            queue.drainTo(elements);
            for (int i = 0; i < elements.size(); i++) {
                Element element = elements.get(i);
                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
                if (unacknowledgedStanzas.remainingCapacity() == 0) {
                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
                                    .newWith(i, elements, unacknowledgedStanzas);
                    LOGGER.log(Level.WARNING,
                                    "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
                    return;
                }
                if (element instanceof Stanza) {
                    unacknowledgedStanzas.add((Stanza) element);
                }
            }
        }

        /**
         * Puts the given stanza into the unacknowledged stanzas queue, if that queue exists and the stanza is
         * not null. Writes an ack request directly to the writer first if the queue size equals the high
         * water mark.
         *
         * @param stanza the stanza about to be written, or null.
         * @throws IOException if writing the ack request failed.
         * @throws IllegalStateException if the thread was interrupted while putting the stanza into the queue.
         */
        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
            // Check if the stream element should be put to the unacknowledgedStanza
            // queue. Note that we can not do the put() in sendStanzaInternal() and the
            // packet order is not stable at this point (sendStanzaInternal() can be
            // called concurrently).
            if (unacknowledgedStanzas != null && stanza != null) {
                // If the unacknowledgedStanza queue is reaching its high water mark, request a new ack
                // from the server in order to drain it
                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
                    writer.write(AckRequest.INSTANCE.toXML().toString());
                }

                try {
                    // It is important that we put the stanza in the unacknowledged stanza
                    // queue before we put it on the wire
                    unacknowledgedStanzas.put(stanza);
                }
                catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    /**
     * Set if Stream Management should be used by default for new connections.
     *
     * @param useSmDefault true to use Stream Management for new connections.
     */
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
        XMPPTCPConnection.useSmDefault = useSmDefault;
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
     */
    @Deprecated
    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
        // Kept for backwards compatibility (misspelled name), simply delegates.
        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1490 */ 1491 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1492 if (useSmResumptionDefault) { 1493 // Also enable SM is resumption is enabled 1494 setUseStreamManagementDefault(useSmResumptionDefault); 1495 } 1496 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1497 } 1498 1499 /** 1500 * Set if Stream Management should be used if supported by the server. 1501 * 1502 * @param useSm true to use Stream Management. 1503 */ 1504 public void setUseStreamManagement(boolean useSm) { 1505 this.useSm = useSm; 1506 } 1507 1508 /** 1509 * Set if Stream Management resumption should be used if supported by the server. 1510 * 1511 * @param useSmResumption true to use Stream Management resumption. 1512 */ 1513 public void setUseStreamManagementResumption(boolean useSmResumption) { 1514 if (useSmResumption) { 1515 // Also enable SM is resumption is enabled 1516 setUseStreamManagement(useSmResumption); 1517 } 1518 this.useSmResumption = useSmResumption; 1519 } 1520 1521 /** 1522 * Set the preferred resumption time in seconds. 1523 * @param resumptionTime the preferred resumption time in seconds 1524 */ 1525 public void setPreferredResumptionTime(int resumptionTime) { 1526 smClientMaxResumptionTime = resumptionTime; 1527 } 1528 1529 /** 1530 * Add a predicate for Stream Management acknowledgment requests. 1531 * <p> 1532 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1533 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1534 * </p> 1535 * <p> 1536 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1537 * </p> 1538 * 1539 * @param predicate the predicate to add. 1540 * @return if the predicate was not already active. 
1541 */ 1542 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1543 synchronized (requestAckPredicates) { 1544 return requestAckPredicates.add(predicate); 1545 } 1546 } 1547 1548 /** 1549 * Remove the given predicate for Stream Management acknowledgment request. 1550 * @param predicate the predicate to remove. 1551 * @return true if the predicate was removed. 1552 */ 1553 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1554 synchronized (requestAckPredicates) { 1555 return requestAckPredicates.remove(predicate); 1556 } 1557 } 1558 1559 /** 1560 * Remove all predicates for Stream Management acknowledgment requests. 1561 */ 1562 public void removeAllRequestAckPredicates() { 1563 synchronized (requestAckPredicates) { 1564 requestAckPredicates.clear(); 1565 } 1566 } 1567 1568 /** 1569 * Send an unconditional Stream Management acknowledgement request to the server. 1570 * 1571 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1572 * @throws NotConnectedException if the connection is not connected. 1573 * @throws InterruptedException if the calling thread was interrupted. 1574 */ 1575 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1576 if (!isSmEnabled()) { 1577 throw new StreamManagementException.StreamManagementNotEnabledException(); 1578 } 1579 requestSmAcknowledgementInternal(); 1580 } 1581 1582 private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1583 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1584 } 1585 1586 /** 1587 * Send a unconditional Stream Management acknowledgment to the server. 1588 * <p> 1589 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. 
Acks</a>: 1590 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1591 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1592 * </p> 1593 * 1594 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1595 * @throws NotConnectedException if the connection is not connected. 1596 * @throws InterruptedException if the calling thread was interrupted. 1597 */ 1598 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1599 if (!isSmEnabled()) { 1600 throw new StreamManagementException.StreamManagementNotEnabledException(); 1601 } 1602 sendSmAcknowledgementInternal(); 1603 } 1604 1605 private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1606 AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount); 1607 // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request 1608 // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here, 1609 // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the 1610 // stream open. 1611 packetWriter.queue.putIfNotShutdown(ackAnswer); 1612 } 1613 1614 /** 1615 * Add a Stanza acknowledged listener. 1616 * <p> 1617 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1618 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1619 * possible. 1620 * </p> 1621 * 1622 * @param listener the listener to add. 1623 */ 1624 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1625 stanzaAcknowledgedListeners.add(listener); 1626 } 1627 1628 /** 1629 * Remove the given Stanza acknowledged listener. 
1630 * 1631 * @param listener the listener. 1632 * @return true if the listener was removed. 1633 */ 1634 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1635 return stanzaAcknowledgedListeners.remove(listener); 1636 } 1637 1638 /** 1639 * Remove all stanza acknowledged listeners. 1640 */ 1641 public void removeAllStanzaAcknowledgedListeners() { 1642 stanzaAcknowledgedListeners.clear(); 1643 } 1644 1645 /** 1646 * Add a Stanza dropped listener. 1647 * <p> 1648 * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get 1649 * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit 1650 * the Stanzas. 1651 * </p> 1652 * 1653 * @param listener the listener to add. 1654 * @since 4.3.3 1655 */ 1656 public void addStanzaDroppedListener(StanzaListener listener) { 1657 stanzaDroppedListeners.add(listener); 1658 } 1659 1660 /** 1661 * Remove the given Stanza dropped listener. 1662 * 1663 * @param listener the listener. 1664 * @return true if the listener was removed. 1665 * @since 4.3.3 1666 */ 1667 public boolean removeStanzaDroppedListener(StanzaListener listener) { 1668 return stanzaDroppedListeners.remove(listener); 1669 } 1670 1671 /** 1672 * Add a new Stanza ID acknowledged listener for the given ID. 1673 * <p> 1674 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1675 * automatically be removed after the listener was run. 1676 * </p> 1677 * 1678 * @param id the stanza ID. 1679 * @param listener the listener to invoke. 1680 * @return the previous listener for this stanza ID or null. 1681 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 
1682 */ 1683 @SuppressWarnings("FutureReturnValueIgnored") 1684 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1685 // Prevent users from adding callbacks that will never get removed 1686 if (!smWasEnabledAtLeastOnce) { 1687 throw new StreamManagementException.StreamManagementNotEnabledException(); 1688 } 1689 // Remove the listener after max. 3 hours 1690 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60); 1691 schedule(new Runnable() { 1692 @Override 1693 public void run() { 1694 stanzaIdAcknowledgedListeners.remove(id); 1695 } 1696 }, removeAfterSeconds, TimeUnit.SECONDS); 1697 return stanzaIdAcknowledgedListeners.put(id, listener); 1698 } 1699 1700 /** 1701 * Remove the Stanza ID acknowledged listener for the given ID. 1702 * 1703 * @param id the stanza ID. 1704 * @return true if the listener was found and removed, false otherwise. 1705 */ 1706 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1707 return stanzaIdAcknowledgedListeners.remove(id); 1708 } 1709 1710 /** 1711 * Removes all Stanza ID acknowledged listeners. 1712 */ 1713 public void removeAllStanzaIdAcknowledgedListeners() { 1714 stanzaIdAcknowledgedListeners.clear(); 1715 } 1716 1717 /** 1718 * Returns true if Stream Management is supported by the server. 1719 * 1720 * @return true if Stream Management is supported by the server. 1721 */ 1722 public boolean isSmAvailable() { 1723 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1724 } 1725 1726 /** 1727 * Returns true if Stream Management was successfully negotiated with the server. 1728 * 1729 * @return true if Stream Management was negotiated. 1730 */ 1731 public boolean isSmEnabled() { 1732 return smEnabledSyncPoint; 1733 } 1734 1735 /** 1736 * Returns true if the stream was successfully resumed with help of Stream Management. 
1737 * 1738 * @return true if the stream was resumed. 1739 */ 1740 public boolean streamWasResumed() { 1741 return smResumedSyncPoint == SyncPointState.successful; 1742 } 1743 1744 /** 1745 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1746 * 1747 * @return true if disconnected but resumption possible. 1748 */ 1749 public boolean isDisconnectedButSmResumptionPossible() { 1750 return disconnectedButResumeable && isSmResumptionPossible(); 1751 } 1752 1753 /** 1754 * Returns true if the stream is resumable. 1755 * 1756 * @return true if the stream is resumable. 1757 */ 1758 public boolean isSmResumptionPossible() { 1759 // There is no resumable stream available 1760 if (smSessionId == null) 1761 return false; 1762 1763 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1764 // Seems like we are already reconnected, report true 1765 if (shutdownTimestamp == null) { 1766 return true; 1767 } 1768 1769 // See if resumption time is over 1770 long current = System.currentTimeMillis(); 1771 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1772 if (current > shutdownTimestamp + maxResumptionMillies) { 1773 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1774 // resumption is possible 1775 return false; 1776 } else { 1777 return true; 1778 } 1779 } 1780 1781 /** 1782 * Drop the stream management state. Sets {@link #smSessionId} and 1783 * {@link #unacknowledgedStanzas} to <code>null</code>. 1784 */ 1785 private void dropSmState() { 1786 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1787 // respective. No need to reset them here. 1788 smSessionId = null; 1789 unacknowledgedStanzas = null; 1790 } 1791 1792 /** 1793 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 
1794 * <p> 1795 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1796 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1797 * without checking for overflows before. 1798 * </p> 1799 * 1800 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1801 */ 1802 public int getMaxSmResumptionTime() { 1803 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1804 int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE; 1805 return Math.min(clientResumptionTime, serverResumptionTime); 1806 } 1807 1808 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1809 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1810 final List<Stanza> ackedStanzas = new ArrayList<>( 1811 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1812 : Integer.MAX_VALUE); 1813 for (long i = 0; i < ackedStanzasCount; i++) { 1814 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1815 // If the server ack'ed a stanza, then it must be in the 1816 // unacknowledged stanza queue. There can be no exception. 
1817 if (ackedStanza == null) { 1818 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1819 ackedStanzasCount, ackedStanzas); 1820 } 1821 ackedStanzas.add(ackedStanza); 1822 } 1823 1824 boolean atLeastOneStanzaAcknowledgedListener = false; 1825 if (!stanzaAcknowledgedListeners.isEmpty()) { 1826 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1827 atLeastOneStanzaAcknowledgedListener = true; 1828 } 1829 else { 1830 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1831 for (Stanza ackedStanza : ackedStanzas) { 1832 String id = ackedStanza.getStanzaId(); 1833 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1834 atLeastOneStanzaAcknowledgedListener = true; 1835 break; 1836 } 1837 } 1838 } 1839 1840 // Only spawn a new thread if there is a chance that some listener is invoked 1841 if (atLeastOneStanzaAcknowledgedListener) { 1842 asyncGo(new Runnable() { 1843 @Override 1844 public void run() { 1845 for (Stanza ackedStanza : ackedStanzas) { 1846 for (StanzaListener listener : stanzaAcknowledgedListeners) { 1847 try { 1848 listener.processStanza(ackedStanza); 1849 } 1850 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1851 LOGGER.log(Level.FINER, "Received exception", e); 1852 } 1853 } 1854 String id = ackedStanza.getStanzaId(); 1855 if (StringUtils.isNullOrEmpty(id)) { 1856 continue; 1857 } 1858 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1859 if (listener != null) { 1860 try { 1861 listener.processStanza(ackedStanza); 1862 } 1863 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1864 LOGGER.log(Level.FINER, "Received exception", e); 1865 } 1866 } 1867 } 1868 } 1869 }); 1870 } 1871 1872 serverHandledStanzasCount = handledCount; 1873 } 1874 1875 /** 1876 * Set the default bundle and defer callback used for new connections. 
1877 * 1878 * @param defaultBundleAndDeferCallback TODO javadoc me please 1879 * @see BundleAndDeferCallback 1880 * @since 4.1 1881 */ 1882 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1883 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1884 } 1885 1886 /** 1887 * Set the bundle and defer callback used for this connection. 1888 * <p> 1889 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1890 * no longer get deferred. 1891 * </p> 1892 * 1893 * @param bundleAndDeferCallback the callback or <code>null</code>. 1894 * @see BundleAndDeferCallback 1895 * @since 4.1 1896 */ 1897 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1898 this.bundleAndDeferCallback = bundleAndDeferCallback; 1899 } 1900 1901}