001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InputStreamReader; 023import java.io.OutputStream; 024import java.io.OutputStreamWriter; 025import java.io.Writer; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.net.Socket; 029import java.security.cert.CertificateException; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.Iterator; 033import java.util.LinkedHashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.ArrayBlockingQueue; 039import java.util.concurrent.BlockingQueue; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentLinkedQueue; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.logging.Level; 045import java.util.logging.Logger; 046 047import javax.net.SocketFactory; 048import javax.net.ssl.HostnameVerifier; 049import javax.net.ssl.SSLSession; 050import javax.net.ssl.SSLSocket; 051import javax.net.ssl.SSLSocketFactory; 052 053import org.jivesoftware.smack.AbstractXMPPConnection; 054import org.jivesoftware.smack.ConnectionConfiguration; 055import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 056import org.jivesoftware.smack.ConnectionListener; 057import org.jivesoftware.smack.SmackConfiguration; 058import org.jivesoftware.smack.SmackException; 059import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 060import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 061import org.jivesoftware.smack.SmackException.ConnectionException; 062import org.jivesoftware.smack.SmackException.EndpointConnectionException; 063import org.jivesoftware.smack.SmackException.NotConnectedException; 064import org.jivesoftware.smack.SmackException.NotLoggedInException; 065import org.jivesoftware.smack.SmackException.SecurityNotPossibleException; 066import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 067import org.jivesoftware.smack.SmackFuture; 068import org.jivesoftware.smack.StanzaListener; 069import org.jivesoftware.smack.XMPPConnection; 070import org.jivesoftware.smack.XMPPException; 071import org.jivesoftware.smack.XMPPException.FailedNonzaException; 072import org.jivesoftware.smack.XMPPException.StreamErrorException; 073import org.jivesoftware.smack.compress.packet.Compress; 074import org.jivesoftware.smack.compress.packet.Compressed; 075import org.jivesoftware.smack.compression.XMPPInputOutputStream; 076import org.jivesoftware.smack.datatypes.UInt16; 077import org.jivesoftware.smack.filter.StanzaFilter; 078import org.jivesoftware.smack.internal.SmackTlsContext; 079import org.jivesoftware.smack.packet.Element; 080import org.jivesoftware.smack.packet.IQ; 081import org.jivesoftware.smack.packet.Message; 082import org.jivesoftware.smack.packet.Nonza; 083import org.jivesoftware.smack.packet.Presence; 084import org.jivesoftware.smack.packet.Stanza; 085import org.jivesoftware.smack.packet.StartTls; 086import org.jivesoftware.smack.packet.StreamError; 087import org.jivesoftware.smack.proxy.ProxyInfo; 088import org.jivesoftware.smack.sasl.packet.SaslNonza; 089import org.jivesoftware.smack.sm.SMUtils; 090import org.jivesoftware.smack.sm.StreamManagementException; 091import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 092import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 093import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 094import org.jivesoftware.smack.sm.packet.StreamManagement; 095import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 096import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 097import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 098import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 099import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 100import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 101import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 102import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 103import org.jivesoftware.smack.sm.predicates.Predicate; 104import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 105import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints; 106import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint; 107import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 108import org.jivesoftware.smack.util.Async; 109import org.jivesoftware.smack.util.CloseableUtil; 110import org.jivesoftware.smack.util.PacketParserUtils; 111import org.jivesoftware.smack.util.StringUtils; 112import org.jivesoftware.smack.util.TLSUtils; 113import org.jivesoftware.smack.util.XmlStringBuilder; 114import org.jivesoftware.smack.util.rce.RemoteConnectionException; 115import org.jivesoftware.smack.xml.SmackXmlParser; 116import org.jivesoftware.smack.xml.XmlPullParser; 117import org.jivesoftware.smack.xml.XmlPullParserException; 118 119import org.jxmpp.jid.impl.JidCreate; 120import org.jxmpp.jid.parts.Resourcepart; 121import org.jxmpp.stringprep.XmppStringprepException; 122import org.minidns.dnsname.DnsName; 123 124/** 125 * Creates a socket connection to an XMPP server. This is the default connection 126 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 127 * 128 * @see XMPPConnection 129 * @author Matt Tucker 130 */ 131public class XMPPTCPConnection extends AbstractXMPPConnection { 132 133 private static final int QUEUE_SIZE = 500; 134 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 135 136 /** 137 * The socket which is used for this connection. 138 */ 139 private Socket socket; 140 141 /** 142 * 143 */ 144 private boolean disconnectedButResumeable = false; 145 146 private SSLSocket secureSocket; 147 148 /** 149 * Protected access level because of unit test purposes 150 */ 151 protected final PacketWriter packetWriter = new PacketWriter(); 152 153 /** 154 * Protected access level because of unit test purposes 155 */ 156 protected final PacketReader packetReader = new PacketReader(); 157 158 /** 159 * 160 */ 161 private boolean streamFeaturesAfterAuthenticationReceived; 162 163 /** 164 * 165 */ 166 private boolean compressSyncPoint; 167 168 /** 169 * The default bundle and defer callback, used for new connections. 170 * @see bundleAndDeferCallback 171 */ 172 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 173 174 /** 175 * The used bundle and defer callback. 176 * <p> 177 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 178 * having a 'volatile' read within the writer threads loop. 179 * </p> 180 */ 181 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 182 183 private static boolean useSmDefault = true; 184 185 private static boolean useSmResumptionDefault = true; 186 187 /** 188 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 189 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 190 * {@link #unFailedNonzaExceptionacknowledgedStanzas}. 191 */ 192 private String smSessionId; 193 194 /** 195 * Represents the state of stream management resumption. 196 * <p> 197 * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread. 198 * </p> 199 */ 200 private volatile SyncPointState smResumedSyncPoint; 201 private Failed smResumptionFailed; 202 203 /** 204 * Represents the state of stream magement. 205 * <p> 206 * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}. 207 * </p> 208 */ 209 private volatile boolean smEnabledSyncPoint; 210 211 /** 212 * The client's preferred maximum resumption time in seconds. 213 */ 214 private int smClientMaxResumptionTime = -1; 215 216 /** 217 * The server's preferred maximum resumption time in seconds. 218 */ 219 private int smServerMaxResumptionTime = -1; 220 221 /** 222 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 223 */ 224 private boolean useSm = useSmDefault; 225 private boolean useSmResumption = useSmResumptionDefault; 226 227 /** 228 * The counter that the server sends the client about it's current height. For example, if the server sends 229 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 230 */ 231 private long serverHandledStanzasCount = 0; 232 233 /** 234 * The counter for stanzas handled ("received") by the client. 235 * <p> 236 * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are 237 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since 238 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 239 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 240 * </p> 241 */ 242 private long clientHandledStanzasCount = 0; 243 244 private BlockingQueue<Stanza> unacknowledgedStanzas; 245 246 /** 247 * Set to true if Stream Management was at least once enabled for this connection. 248 */ 249 private boolean smWasEnabledAtLeastOnce = false; 250 251 /** 252 * This listeners are invoked for every stanza that got acknowledged. 253 * <p> 254 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 255 * themselves after they have been invoked. 256 * </p> 257 */ 258 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>(); 259 260 /** 261 * These listeners are invoked for every stanza that got dropped. 262 * <p> 263 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 264 * themselves after they have been invoked. 265 * </p> 266 */ 267 private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>(); 268 269 /** 270 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 271 * only be invoked once and automatically removed after that. 272 */ 273 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>(); 274 275 /** 276 * Predicates that determine if an stream management ack should be requested from the server. 277 * <p> 278 * We use a linked hash set here, so that the order how the predicates are added matches the 279 * order in which they are invoked in order to determine if an ack request should be send or not. 280 * </p> 281 */ 282 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>(); 283 284 @SuppressWarnings("HidingField") 285 private final XMPPTCPConnectionConfiguration config; 286 287 /** 288 * Creates a new XMPP connection over TCP (optionally using proxies). 289 * <p> 290 * Note that XMPPTCPConnection constructors do not establish a connection to the server 291 * and you must call {@link #connect()}. 292 * </p> 293 * 294 * @param config the connection configuration. 295 */ 296 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 297 super(config); 298 this.config = config; 299 addConnectionListener(new ConnectionListener() { 300 @Override 301 public void connectionClosedOnError(Exception e) { 302 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) { 303 dropSmState(); 304 } 305 } 306 }); 307 308 // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream 309 // is initiated. 310 buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install(); 311 } 312 313 /** 314 * Creates a new XMPP connection over TCP. 315 * <p> 316 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 317 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 318 * constructor. 319 * </p> 320 * 321 * @param jid the bare JID used by the client. 322 * @param password the password or authentication token. 323 * @throws XmppStringprepException if the provided string is invalid. 324 */ 325 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 326 this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build()); 327 } 328 329 /** 330 * Creates a new XMPP connection over TCP. 331 * <p> 332 * This is the simplest constructor for connecting to an XMPP server. Alternatively, 333 * you can get fine-grained control over connection settings using the 334 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 335 * </p> 336 * @param username TODO javadoc me please 337 * @param password TODO javadoc me please 338 * @param serviceName TODO javadoc me please 339 * @throws XmppStringprepException if the provided string is invalid. 340 */ 341 public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { 342 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( 343 JidCreate.domainBareFrom(serviceName)).build()); 344 } 345 346 @Override 347 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 348 if (packetWriter == null) { 349 throw new NotConnectedException(); 350 } 351 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 352 } 353 354 @Override 355 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 356 if (isConnected() && !disconnectedButResumeable) { 357 throw new AlreadyConnectedException(); 358 } 359 } 360 361 @Override 362 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 363 if (isAuthenticated() && !disconnectedButResumeable) { 364 throw new AlreadyLoggedInException(); 365 } 366 } 367 368 @Override 369 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 370 // Reset the flag in case it was set 371 disconnectedButResumeable = false; 372 super.afterSuccessfulLogin(resumed); 373 } 374 375 @Override 376 protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 377 SmackException, IOException, InterruptedException { 378 // Authenticate using SASL 379 SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null; 380 381 streamFeaturesAfterAuthenticationReceived = false; 382 authenticate(username, password, config.getAuthzid(), sslSession); 383 384 // Wait for stream features after the authentication. 385 // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be 386 // renamed to "streamFeaturesAfterAuthenticationReceived". 387 waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server"); 388 389 // If compression is enabled then request the server to use stream compression. XEP-170 390 // recommends to perform stream compression before resource binding. 391 maybeEnableCompression(); 392 393 smResumedSyncPoint = SyncPointState.initial; 394 smResumptionFailed = null; 395 if (isSmResumptionPossible()) { 396 smResumedSyncPoint = SyncPointState.request_sent; 397 sendNonza(new Resume(clientHandledStanzasCount, smSessionId)); 398 waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream"); 399 if (smResumedSyncPoint == SyncPointState.successful) { 400 // We successfully resumed the stream, be done here 401 afterSuccessfulLogin(true); 402 return; 403 } 404 // SM resumption failed, what Smack does here is to report success of 405 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 406 // normal resource binding can be tried. 407 assert smResumptionFailed != null; 408 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed); 409 } 410 411 // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case, 412 // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the 413 // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state. 414 smEnabledSyncPoint = false; 415 416 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 417 if (unacknowledgedStanzas != null) { 418 // There was a previous connection with SM enabled but that was either not resumable or 419 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 420 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 421 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 422 // XMPP session (There maybe was an enabled in a previous XMPP session of this 423 // connection instance though). This is used in writePackets to decide if stanzas should 424 // be added to the unacknowledged stanzas queue, because they have to be added right 425 // after the 'enable' stream element has been sent. 426 dropSmState(); 427 } 428 429 // Now bind the resource. It is important to do this *after* we dropped an eventually 430 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 431 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 432 bindResourceAndEstablishSession(resource); 433 434 if (isSmAvailable() && useSm) { 435 // Remove what is maybe left from previously stream managed sessions 436 serverHandledStanzasCount = 0; 437 sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime)); 438 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 439 // then this is a non recoverable error and we therefore throw an exception. 440 waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement"); 441 synchronized (requestAckPredicates) { 442 if (requestAckPredicates.isEmpty()) { 443 // Assure that we have at lest one predicate set up that so that we request acks 444 // for the server and eventually flush some stanzas from the unacknowledged 445 // stanza queue 446 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 447 } 448 } 449 } 450 // Inform client about failed resumption if possible, resend stanzas otherwise 451 // Process the stanzas synchronously so a client can re-queue them for transmission 452 // before it is informed about connection success 453 if (!stanzaDroppedListeners.isEmpty()) { 454 for (Stanza stanza : previouslyUnackedStanzas) { 455 for (StanzaListener listener : stanzaDroppedListeners) { 456 try { 457 listener.processStanza(stanza); 458 } 459 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 460 LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e); 461 } 462 } 463 } 464 } else { 465 for (Stanza stanza : previouslyUnackedStanzas) { 466 sendStanzaInternal(stanza); 467 } 468 } 469 470 afterSuccessfulLogin(false); 471 } 472 473 @Override 474 public boolean isSecureConnection() { 475 return secureSocket != null; 476 } 477 478 /** 479 * Shuts the current connection down. After this method returns, the connection must be ready 480 * for re-use by connect. 481 */ 482 @Override 483 protected void shutdown() { 484 if (isSmEnabled()) { 485 try { 486 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 487 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 488 sendSmAcknowledgementInternal(); 489 } catch (InterruptedException | NotConnectedException e) { 490 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 491 } 492 } 493 shutdown(false); 494 } 495 496 @Override 497 public synchronized void instantShutdown() { 498 shutdown(true); 499 } 500 501 private void shutdown(boolean instant) { 502 // The writer thread may already been finished at this point, for example when the connection is in the 503 // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this 504 // case. 505 if (!packetWriter.done()) { 506 // First shutdown the writer, this will result in a closing stream element getting send to 507 // the server 508 LOGGER.finer(packetWriter.threadName + " shutdown()"); 509 packetWriter.shutdown(instant); 510 LOGGER.finer(packetWriter.threadName + " shutdown() returned"); 511 512 if (!instant) { 513 waitForClosingStreamTagFromServer(); 514 } 515 } 516 517 LOGGER.finer(packetReader.threadName + " shutdown()"); 518 packetReader.shutdown(); 519 LOGGER.finer(packetReader.threadName + " shutdown() returned"); 520 521 CloseableUtil.maybeClose(socket, LOGGER); 522 523 setWasAuthenticated(); 524 525 try { 526 boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running); 527 if (!readerAndWriterThreadsTermianted) { 528 LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: " 529 + packetWriter.running + ", Reader running: " + packetReader.running); 530 } else { 531 LOGGER.fine("Reader and writer threads terminated"); 532 } 533 } catch (InterruptedException e) { 534 LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e); 535 } 536 537 if (disconnectedButResumeable) { 538 return; 539 } 540 541 // If we are able to resume the stream, then don't set 542 // connected/authenticated/usingTLS to false since we like behave like we are still 543 // connected (e.g. sendStanza should not throw a NotConnectedException). 544 if (instant) { 545 disconnectedButResumeable = isSmResumptionPossible(); 546 if (!disconnectedButResumeable) { 547 // Reset the stream management session id to null, since the stream is no longer resumable. Note that we 548 // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected. 549 smSessionId = null; 550 } 551 } else { 552 disconnectedButResumeable = false; 553 554 // Drop the stream management state if this is not an instant shutdown. We send 555 // a </stream> close tag and now the stream management state is no longer valid. 556 // This also prevents that we will potentially (re-)send any unavailable presence we 557 // may have send, because it got put into the unacknowledged queue and was not acknowledged before the 558 // connection terminated. 559 dropSmState(); 560 // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the 561 // information is available in the connectionClosedOnError() listeners. 562 } 563 authenticated = false; 564 connected = false; 565 secureSocket = null; 566 reader = null; 567 writer = null; 568 569 initState(); 570 } 571 572 @Override 573 public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { 574 packetWriter.sendStreamElement(element); 575 } 576 577 @Override 578 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { 579 packetWriter.sendStreamElement(packet); 580 if (isSmEnabled()) { 581 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 582 if (requestAckPredicate.accept(packet)) { 583 requestSmAcknowledgementInternal(); 584 break; 585 } 586 } 587 } 588 } 589 590 private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException { 591 RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config); 592 593 List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>(); 594 595 SocketFactory socketFactory = config.getSocketFactory(); 596 ProxyInfo proxyInfo = config.getProxyInfo(); 597 int timeout = config.getConnectTimeout(); 598 if (socketFactory == null) { 599 socketFactory = SocketFactory.getDefault(); 600 } 601 for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) { 602 Iterator<? extends InetAddress> inetAddresses; 603 String host = endpoint.getHost().toString(); 604 UInt16 portUint16 = endpoint.getPort(); 605 int port = portUint16.intValue(); 606 if (proxyInfo == null) { 607 inetAddresses = endpoint.getInetAddresses().iterator(); 608 assert inetAddresses.hasNext(); 609 610 innerloop: while (inetAddresses.hasNext()) { 611 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 612 // re-usable after a failed connection attempt. See also SMACK-724. 613 SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory); 614 615 final InetAddress inetAddress = inetAddresses.next(); 616 final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port); 617 LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress); 618 socketFuture.connectAsync(inetSocketAddress, timeout); 619 620 try { 621 socket = socketFuture.getOrThrow(); 622 } catch (IOException e) { 623 RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>( 624 endpoint, inetAddress, e); 625 connectionExceptions.add(rce); 626 if (inetAddresses.hasNext()) { 627 continue innerloop; 628 } else { 629 break innerloop; 630 } 631 } 632 LOGGER.finer("Established TCP connection to " + inetSocketAddress); 633 // We found a host to connect to, return here 634 this.host = host; 635 this.port = portUint16; 636 return; 637 } 638 } else { 639 // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy 640 // connection to every inet address of each connection endpoint. 641 socket = socketFactory.createSocket(); 642 StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy"); 643 final String hostAndPort = host + " at port " + port; 644 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 645 try { 646 proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 647 } catch (IOException e) { 648 CloseableUtil.maybeClose(socket, LOGGER); 649 RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e); 650 connectionExceptions.add(rce); 651 continue; 652 } 653 LOGGER.finer("Established TCP connection to " + hostAndPort); 654 // We found a host to connect to, return here 655 this.host = host; 656 this.port = portUint16; 657 return; 658 } 659 } 660 661 // There are no more host addresses to try 662 // throw an exception and report all tried 663 // HostAddresses in the exception 664 throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions); 665 } 666 667 /** 668 * Initializes the connection by creating a stanza reader and writer and opening a 669 * XMPP stream to the server. 670 * 671 * @throws XMPPException if establishing a connection to the server fails. 672 * @throws SmackException if the server fails to respond back or if there is anther error. 673 * @throws IOException if an I/O error occurred. 674 * @throws InterruptedException if the calling thread was interrupted. 675 */ 676 private void initConnection() throws IOException, InterruptedException { 677 compressionHandler = null; 678 679 // Set the reader and writer instance variables 680 initReaderAndWriter(); 681 682 // Start the writer thread. This will open an XMPP stream to the server 683 packetWriter.init(); 684 // Start the reader thread. The startup() method will block until we 685 // get an opening stream packet back from server 686 packetReader.init(); 687 } 688 689 private void initReaderAndWriter() throws IOException { 690 InputStream is = socket.getInputStream(); 691 OutputStream os = socket.getOutputStream(); 692 if (compressionHandler != null) { 693 is = compressionHandler.getInputStream(is); 694 os = compressionHandler.getOutputStream(os); 695 } 696 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 697 writer = new OutputStreamWriter(os, "UTF-8"); 698 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 699 700 // If debugging is enabled, we open a window and write out all network traffic. 701 initDebugger(); 702 } 703 704 /** 705 * The server has indicated that TLS negotiation can start. We now need to secure the 706 * existing plain connection and perform a handshake. This method won't return until the 707 * connection has finished the handshake or an error occurred while securing the connection. 708 * @throws IOException if an I/O error occurred. 709 * @throws SecurityNotPossibleException if TLS is not possible. 710 * @throws CertificateException if there is an issue with the certificate. 711 */ 712 @SuppressWarnings("LiteralClassName") 713 private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException { 714 SmackTlsContext smackTlsContext = getSmackTlsContext(); 715 716 Socket plain = socket; 717 int port = plain.getPort(); 718 String xmppServiceDomainString = config.getXMPPServiceDomain().toString(); 719 SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory(); 720 // Secure the plain connection 721 socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true); 722 723 final SSLSocket sslSocket = (SSLSocket) socket; 724 // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is 725 // important (at least on certain platforms) and it seems to be a good idea anyways to 726 // prevent an accidental implicit handshake. 727 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 728 729 // Initialize the reader and writer with the new secured version 730 initReaderAndWriter(); 731 732 // Proceed to do the handshake 733 sslSocket.startHandshake(); 734 735 if (smackTlsContext.daneVerifier != null) { 736 smackTlsContext.daneVerifier.finish(sslSocket.getSession()); 737 } 738 739 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 740 if (verifier == null) { 741 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 742 } 743 744 final String verifierHostname; 745 { 746 DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible(); 747 // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII 748 // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint. 749 // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1 750 if (xmppServiceDomainDnsName != null) { 751 verifierHostname = xmppServiceDomainDnsName.ace; 752 } 753 else { 754 LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain() 755 + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail."); 756 verifierHostname = getXMPPServiceDomain().toString(); 757 } 758 } 759 760 final boolean verificationSuccessful; 761 // Verify the TLS session. 762 verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession()); 763 if (!verificationSuccessful) { 764 throw new CertificateException( 765 "Hostname verification of certificate failed. Certificate does not authenticate " 766 + getXMPPServiceDomain()); 767 } 768 769 // Set that TLS was successful 770 secureSocket = sslSocket; 771 } 772 773 /** 774 * Returns the compression handler that can be used for one compression methods offered by the server. 775 * 776 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 777 * 778 */ 779 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 780 for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) { 781 String method = handler.getCompressionMethod(); 782 if (compression.getMethods().contains(method)) 783 return handler; 784 } 785 return null; 786 } 787 788 @Override 789 public boolean isUsingCompression() { 790 return compressionHandler != null && compressSyncPoint; 791 } 792 793 /** 794 * <p> 795 * Starts using stream compression that will compress network traffic. Traffic can be 796 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 797 * connection. However, the server and the client will need to use more CPU time in order to 798 * un/compress network data so under high load the server performance might be affected. 799 * </p> 800 * <p> 801 * Stream compression has to have been previously offered by the server. Currently only the 802 * zlib method is supported by the client. Stream compression negotiation has to be done 803 * before authentication took place. 804 * </p> 805 * 806 * @throws NotConnectedException if the XMPP connection is not connected. 807 * @throws SmackException if Smack detected an exceptional situation. 808 * @throws InterruptedException if the calling thread was interrupted. 809 * @throws XMPPException if an XMPP protocol error was received. 810 */ 811 private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException { 812 if (!config.isCompressionEnabled()) { 813 return; 814 } 815 816 Compress.Feature compression = getFeature(Compress.Feature.class); 817 if (compression == null) { 818 // Server does not support compression 819 return; 820 } 821 // If stream compression was offered by the server and we want to use 822 // compression then send compression request to the server 823 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 824 compressSyncPoint = false; 825 sendNonza(new Compress(compressionHandler.getCompressionMethod())); 826 waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression"); 827 } else { 828 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 829 } 830 } 831 832 /** 833 * Establishes a connection to the XMPP server. It basically 834 * creates and maintains a socket connection to the server. 835 * <p> 836 * Listeners will be preserved from a previous connection if the reconnection 837 * occurs after an abrupt termination. 838 * </p> 839 * 840 * @throws XMPPException if an error occurs while trying to establish the connection. 841 * @throws SmackException if Smack detected an exceptional situation. 842 * @throws IOException if an I/O error occurred. 843 * @throws InterruptedException if the calling thread was interrupted. 844 */ 845 @Override 846 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 847 // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if 848 // there is an error establishing the connection 849 connectUsingConfiguration(); 850 851 connected = true; 852 853 // We connected successfully to the servers TCP port 854 initConnection(); 855 856 // TLS handled will be true either if TLS was established, or if it was not mandatory. 857 waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS"); 858 859 // Wait with SASL auth until the SASL mechanisms have been received 860 waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server"); 861 } 862 863 /** 864 * For unit testing purposes 865 * 866 * @param writer TODO javadoc me please 867 */ 868 protected void setWriter(Writer writer) { 869 this.writer = writer; 870 } 871 872 @Override 873 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException { 874 StartTls startTlsFeature = getFeature(StartTls.class); 875 if (startTlsFeature != null) { 876 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 877 SecurityRequiredByServerException smackException = new SecurityRequiredByServerException(); 878 currentSmackException = smackException; 879 notifyWaitingThreads(); 880 throw smackException; 881 } 882 883 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 884 sendNonza(new StartTls()); 885 } else { 886 tlsHandled = true; 887 notifyWaitingThreads(); 888 } 889 } else { 890 tlsHandled = true; 891 notifyWaitingThreads(); 892 } 893 894 if (isSaslAuthenticated()) { 895 // If we have received features after the SASL has been successfully completed, then we 896 // have also *maybe* received, as it is an optional feature, the compression feature 897 // from the server. 898 streamFeaturesAfterAuthenticationReceived = true; 899 notifyWaitingThreads(); 900 } 901 } 902 903 private void resetParser() throws IOException { 904 try { 905 packetReader.parser = SmackXmlParser.newXmlParser(reader); 906 } catch (XmlPullParserException e) { 907 throw new IOException(e); 908 } 909 } 910 911 private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException { 912 sendStreamOpen(); 913 resetParser(); 914 } 915 916 protected class PacketReader { 917 918 private final String threadName = "Smack Reader (" + getConnectionCounter() + ')'; 919 920 XmlPullParser parser; 921 922 private volatile boolean done; 923 924 private boolean running; 925 926 /** 927 * Initializes the reader in order to be used. The reader is initialized during the 928 * first connection and when reconnecting due to an abruptly disconnection. 929 */ 930 void init() { 931 done = false; 932 933 running = true; 934 Async.go(new Runnable() { 935 @Override 936 public void run() { 937 LOGGER.finer(threadName + " start"); 938 try { 939 parsePackets(); 940 } finally { 941 LOGGER.finer(threadName + " exit"); 942 running = false; 943 notifyWaitingThreads(); 944 } 945 } 946 }, threadName); 947 } 948 949 /** 950 * Shuts the stanza reader down. This method simply sets the 'done' flag to true. 951 */ 952 void shutdown() { 953 done = true; 954 } 955 956 /** 957 * Parse top-level packets in order to process them further. 958 */ 959 private void parsePackets() { 960 try { 961 openStreamAndResetParser(); 962 XmlPullParser.Event eventType = parser.getEventType(); 963 while (!done) { 964 switch (eventType) { 965 case START_ELEMENT: 966 final String name = parser.getName(); 967 switch (name) { 968 case Message.ELEMENT: 969 case IQ.IQ_ELEMENT: 970 case Presence.ELEMENT: 971 try { 972 parseAndProcessStanza(parser); 973 } finally { 974 clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount); 975 } 976 break; 977 case "stream": 978 onStreamOpen(parser); 979 break; 980 case "error": 981 StreamError streamError = PacketParserUtils.parseStreamError(parser); 982 // Stream errors are non recoverable, throw this exceptions. Also note that this will set 983 // this exception as current connection exceptions and notify any waiting threads. 984 throw new StreamErrorException(streamError); 985 case "features": 986 parseFeaturesAndNotify(parser); 987 break; 988 case "proceed": 989 // Secure the connection by negotiating TLS 990 proceedTLSReceived(); 991 // Send a new opening stream to the server 992 openStreamAndResetParser(); 993 break; 994 case "failure": 995 String namespace = parser.getNamespace(null); 996 switch (namespace) { 997 case "urn:ietf:params:xml:ns:xmpp-tls": 998 // TLS negotiation has failed. The server will close the connection 999 // TODO Parse failure stanza 1000 throw new SmackException.SmackMessageException("TLS negotiation has failed"); 1001 case "http://jabber.org/protocol/compress": 1002 // Stream compression has been denied. This is a recoverable 1003 // situation. It is still possible to authenticate and 1004 // use the connection but using an uncompressed connection 1005 // TODO Parse failure stanza 1006 currentSmackException = new SmackException.SmackMessageException("Could not establish compression"); 1007 notifyWaitingThreads(); 1008 break; 1009 default: 1010 parseAndProcessNonza(parser); 1011 } 1012 break; 1013 case Compressed.ELEMENT: 1014 // Server confirmed that it's possible to use stream compression. Start 1015 // stream compression 1016 // Initialize the reader and writer with the new compressed version 1017 initReaderAndWriter(); 1018 // Send a new opening stream to the server 1019 openStreamAndResetParser(); 1020 // Notify that compression is being used 1021 compressSyncPoint = true; 1022 notifyWaitingThreads(); 1023 break; 1024 case Enabled.ELEMENT: 1025 Enabled enabled = ParseStreamManagement.enabled(parser); 1026 if (enabled.isResumeSet()) { 1027 smSessionId = enabled.getId(); 1028 if (StringUtils.isNullOrEmpty(smSessionId)) { 1029 SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received"); 1030 setCurrentConnectionExceptionAndNotify(xmppException); 1031 throw xmppException; 1032 } 1033 smServerMaxResumptionTime = enabled.getMaxResumptionTime(); 1034 } else { 1035 // Mark this a non-resumable stream by setting smSessionId to null 1036 smSessionId = null; 1037 } 1038 clientHandledStanzasCount = 0; 1039 smWasEnabledAtLeastOnce = true; 1040 smEnabledSyncPoint = true; 1041 notifyWaitingThreads(); 1042 break; 1043 case Failed.ELEMENT: 1044 Failed failed = ParseStreamManagement.failed(parser); 1045 if (smResumedSyncPoint == SyncPointState.request_sent) { 1046 // This is a <failed/> nonza in a response to resuming a previous stream, failure to do 1047 // so is non-fatal as we can simply continue with resource binding in this case. 1048 smResumptionFailed = failed; 1049 notifyWaitingThreads(); 1050 } else { 1051 FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition()); 1052 setCurrentConnectionExceptionAndNotify(xmppException); 1053 } 1054 break; 1055 case Resumed.ELEMENT: 1056 Resumed resumed = ParseStreamManagement.resumed(parser); 1057 if (!smSessionId.equals(resumed.getPrevId())) { 1058 throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId()); 1059 } 1060 // Mark SM as enabled 1061 smEnabledSyncPoint = true; 1062 // First, drop the stanzas already handled by the server 1063 processHandledCount(resumed.getHandledCount()); 1064 // Then re-send what is left in the unacknowledged queue 1065 List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size()); 1066 unacknowledgedStanzas.drainTo(stanzasToResend); 1067 for (Stanza stanza : stanzasToResend) { 1068 sendStanzaInternal(stanza); 1069 } 1070 // If there where stanzas resent, then request a SM ack for them. 1071 // Writer's sendStreamElement() won't do it automatically based on 1072 // predicates. 1073 if (!stanzasToResend.isEmpty()) { 1074 requestSmAcknowledgementInternal(); 1075 } 1076 // Mark SM resumption as successful 1077 smResumedSyncPoint = SyncPointState.successful; 1078 notifyWaitingThreads(); 1079 break; 1080 case AckAnswer.ELEMENT: 1081 AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser); 1082 processHandledCount(ackAnswer.getHandledCount()); 1083 break; 1084 case AckRequest.ELEMENT: 1085 ParseStreamManagement.ackRequest(parser); 1086 if (smEnabledSyncPoint) { 1087 sendSmAcknowledgementInternal(); 1088 } else { 1089 LOGGER.warning("SM Ack Request received while SM is not enabled"); 1090 } 1091 break; 1092 default: 1093 parseAndProcessNonza(parser); 1094 break; 1095 } 1096 break; 1097 case END_ELEMENT: 1098 final String endTagName = parser.getName(); 1099 if ("stream".equals(endTagName)) { 1100 if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) { 1101 LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace()); 1102 break; 1103 } 1104 1105 // Check if the queue was already shut down before reporting success on closing stream tag 1106 // received. This avoids a race if there is a disconnect(), followed by a connect(), which 1107 // did re-start the queue again, causing this writer to assume that the queue is not 1108 // shutdown, which results in a call to disconnect(). 1109 final boolean queueWasShutdown = packetWriter.queue.isShutdown(); 1110 closingStreamReceived = true; 1111 notifyWaitingThreads(); 1112 1113 if (queueWasShutdown) { 1114 // We received a closing stream element *after* we initiated the 1115 // termination of the session by sending a closing stream element to 1116 // the server first 1117 return; 1118 } else { 1119 // We received a closing stream element from the server without us 1120 // sending a closing stream element first. This means that the 1121 // server wants to terminate the session, therefore disconnect 1122 // the connection 1123 LOGGER.info(XMPPTCPConnection.this 1124 + " received closing </stream> element." 1125 + " Server wants to terminate the connection, calling disconnect()"); 1126 ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() { 1127 @Override 1128 public void run() { 1129 disconnect(); 1130 }}); 1131 } 1132 } 1133 break; 1134 case END_DOCUMENT: 1135 // END_DOCUMENT only happens in an error case, as otherwise we would see a 1136 // closing stream element before. 1137 throw new SmackException.SmackMessageException( 1138 "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element"); 1139 default: 1140 // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement. 1141 break; 1142 } 1143 eventType = parser.next(); 1144 } 1145 } 1146 catch (Exception e) { 1147 // Set running to false since this thread will exit here and notifyConnectionError() will wait until 1148 // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling 1149 // notifyConnetctionError() below, even though run() also sets it to false. Therefore, do not remove this. 1150 running = false; 1151 1152 String ignoreReasonThread = null; 1153 1154 boolean writerThreadWasShutDown = packetWriter.queue.isShutdown(); 1155 if (writerThreadWasShutDown) { 1156 ignoreReasonThread = "writer"; 1157 } else if (done) { 1158 ignoreReasonThread = "reader"; 1159 } 1160 1161 if (ignoreReasonThread != null) { 1162 LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down"); 1163 return; 1164 } 1165 1166 // Close the connection and notify connection listeners of the error. 1167 notifyConnectionError(e); 1168 } 1169 } 1170 } 1171 1172 protected class PacketWriter { 1173 public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE; 1174 public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024; 1175 public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE); 1176 1177 private final String threadName = "Smack Writer (" + getConnectionCounter() + ')'; 1178 1179 private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>( 1180 QUEUE_SIZE, true); 1181 1182 /** 1183 * If set, the stanza writer is shut down 1184 */ 1185 protected volatile Long shutdownTimestamp = null; 1186 1187 private volatile boolean instantShutdown; 1188 1189 /** 1190 * True if some preconditions are given to start the bundle and defer mechanism. 1191 * <p> 1192 * This will likely get set to true right after the start of the writer thread, because 1193 * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set 1194 * this field to true. 1195 * </p> 1196 */ 1197 private boolean shouldBundleAndDefer; 1198 1199 private boolean running; 1200 1201 /** 1202 * Initializes the writer in order to be used. It is called at the first connection and also 1203 * is invoked if the connection is disconnected by an error. 1204 */ 1205 void init() { 1206 shutdownTimestamp = null; 1207 1208 if (unacknowledgedStanzas != null) { 1209 // It's possible that there are new stanzas in the writer queue that 1210 // came in while we were disconnected but resumable, drain those into 1211 // the unacknowledged queue so that they get resent now 1212 drainWriterQueueToUnacknowledgedStanzas(); 1213 } 1214 1215 queue.start(); 1216 running = true; 1217 Async.go(new Runnable() { 1218 @Override 1219 public void run() { 1220 LOGGER.finer(threadName + " start"); 1221 try { 1222 writePackets(); 1223 } finally { 1224 LOGGER.finer(threadName + " exit"); 1225 running = false; 1226 notifyWaitingThreads(); 1227 } 1228 } 1229 }, threadName); 1230 } 1231 1232 private boolean done() { 1233 return shutdownTimestamp != null; 1234 } 1235 1236 protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { 1237 final boolean done = done(); 1238 if (done) { 1239 final boolean smResumptionPossible = isSmResumptionPossible(); 1240 // Don't throw a NotConnectedException is there is an resumable stream available 1241 if (!smResumptionPossible) { 1242 throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done 1243 + " smResumptionPossible=" + smResumptionPossible); 1244 } 1245 } 1246 } 1247 1248 /** 1249 * Sends the specified element to the server. 1250 * 1251 * @param element the element to send. 1252 * @throws NotConnectedException if the XMPP connection is not connected. 1253 * @throws InterruptedException if the calling thread was interrupted. 1254 */ 1255 protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { 1256 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1257 try { 1258 queue.put(element); 1259 } 1260 catch (InterruptedException e) { 1261 // put() may throw an InterruptedException for two reasons: 1262 // 1. If the queue was shut down 1263 // 2. If the thread was interrupted 1264 // so we have to check which is the case 1265 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1266 // If the method above did not throw, then the sending thread was interrupted 1267 throw e; 1268 } 1269 } 1270 1271 /** 1272 * Shuts down the stanza writer. Once this method has been called, no further 1273 * packets will be written to the server. 1274 */ 1275 void shutdown(boolean instant) { 1276 instantShutdown = instant; 1277 queue.shutdown(); 1278 shutdownTimestamp = System.currentTimeMillis(); 1279 } 1280 1281 /** 1282 * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a 1283 * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in 1284 * that case. 1285 * 1286 * @return the next element for writing or null. 1287 */ 1288 private Element nextStreamElement() { 1289 // It is important the we check if the queue is empty before removing an element from it 1290 if (queue.isEmpty()) { 1291 shouldBundleAndDefer = true; 1292 } 1293 Element packet = null; 1294 try { 1295 packet = queue.take(); 1296 } 1297 catch (InterruptedException e) { 1298 if (!queue.isShutdown()) { 1299 // Users shouldn't try to interrupt the packet writer thread 1300 LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e); 1301 } 1302 } 1303 return packet; 1304 } 1305 1306 private void writePackets() { 1307 try { 1308 // Write out packets from the queue. 1309 while (!done()) { 1310 Element element = nextStreamElement(); 1311 if (element == null) { 1312 continue; 1313 } 1314 1315 // Get a local version of the bundle and defer callback, in case it's unset 1316 // between the null check and the method invocation 1317 final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback; 1318 // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is 1319 // empty), then we could wait a bit for further stanzas attempting to decrease 1320 // our energy consumption 1321 if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) { 1322 // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the 1323 // queue is empty again. 1324 shouldBundleAndDefer = false; 1325 final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean(); 1326 final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer( 1327 bundlingAndDeferringStopped)); 1328 if (bundleAndDeferMillis > 0) { 1329 long remainingWait = bundleAndDeferMillis; 1330 final long waitStart = System.currentTimeMillis(); 1331 synchronized (bundlingAndDeferringStopped) { 1332 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) { 1333 bundlingAndDeferringStopped.wait(remainingWait); 1334 remainingWait = bundleAndDeferMillis 1335 - (System.currentTimeMillis() - waitStart); 1336 } 1337 } 1338 } 1339 } 1340 1341 Stanza packet = null; 1342 if (element instanceof Stanza) { 1343 packet = (Stanza) element; 1344 } 1345 else if (element instanceof Enable) { 1346 // The client needs to add messages to the unacknowledged stanzas queue 1347 // right after it sent 'enabled'. Stanza will be added once 1348 // unacknowledgedStanzas is not null. 1349 unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE); 1350 } 1351 maybeAddToUnacknowledgedStanzas(packet); 1352 1353 CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment); 1354 if (elementXml instanceof XmlStringBuilder) { 1355 try { 1356 ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment); 1357 } catch (NullPointerException npe) { 1358 LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe); 1359 throw npe; 1360 } 1361 } 1362 else { 1363 writer.write(elementXml.toString()); 1364 } 1365 1366 if (queue.isEmpty()) { 1367 writer.flush(); 1368 } 1369 if (packet != null) { 1370 firePacketSendingListeners(packet); 1371 } 1372 } 1373 if (!instantShutdown) { 1374 // Flush out the rest of the queue. 1375 try { 1376 while (!queue.isEmpty()) { 1377 Element packet = queue.remove(); 1378 if (packet instanceof Stanza) { 1379 Stanza stanza = (Stanza) packet; 1380 maybeAddToUnacknowledgedStanzas(stanza); 1381 } 1382 writer.write(packet.toXML().toString()); 1383 } 1384 } 1385 catch (Exception e) { 1386 LOGGER.log(Level.WARNING, 1387 "Exception flushing queue during shutdown, ignore and continue", 1388 e); 1389 } 1390 1391 // Close the stream. 1392 try { 1393 writer.write("</stream:stream>"); 1394 writer.flush(); 1395 } 1396 catch (Exception e) { 1397 LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); 1398 } 1399 1400 // Delete the queue contents (hopefully nothing is left). 1401 queue.clear(); 1402 } else if (instantShutdown && isSmEnabled()) { 1403 // This was an instantShutdown and SM is enabled, drain all remaining stanzas 1404 // into the unacknowledgedStanzas queue 1405 drainWriterQueueToUnacknowledgedStanzas(); 1406 } 1407 // Do *not* close the writer here, as it will cause the socket 1408 // to get closed. But we may want to receive further stanzas 1409 // until the closing stream tag is received. The socket will be 1410 // closed in shutdown(). 1411 } 1412 catch (Exception e) { 1413 // The exception can be ignored if the the connection is 'done' 1414 // or if the it was caused because the socket got closed 1415 if (!(done() || queue.isShutdown())) { 1416 // Set running to false since this thread will exit here and notifyConnectionError() will wait until 1417 // the reader and writer thread's 'running' value is false. 1418 running = false; 1419 notifyConnectionError(e); 1420 } else { 1421 LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); 1422 } 1423 } 1424 } 1425 1426 private void drainWriterQueueToUnacknowledgedStanzas() { 1427 List<Element> elements = new ArrayList<>(queue.size()); 1428 queue.drainTo(elements); 1429 for (int i = 0; i < elements.size(); i++) { 1430 Element element = elements.get(i); 1431 // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844. 1432 if (unacknowledgedStanzas.remainingCapacity() == 0) { 1433 StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException 1434 .newWith(i, elements, unacknowledgedStanzas); 1435 LOGGER.log(Level.WARNING, 1436 "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception); 1437 return; 1438 } 1439 if (element instanceof Stanza) { 1440 unacknowledgedStanzas.add((Stanza) element); 1441 } 1442 } 1443 } 1444 1445 private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException { 1446 // Check if the stream element should be put to the unacknowledgedStanza 1447 // queue. Note that we can not do the put() in sendStanzaInternal() and the 1448 // packet order is not stable at this point (sendStanzaInternal() can be 1449 // called concurrently). 1450 if (unacknowledgedStanzas != null && stanza != null) { 1451 // If the unacknowledgedStanza queue reaching its high water mark, request an new ack 1452 // from the server in order to drain it 1453 if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) { 1454 writer.write(AckRequest.INSTANCE.toXML().toString()); 1455 } 1456 1457 try { 1458 // It is important the we put the stanza in the unacknowledged stanza 1459 // queue before we put it on the wire 1460 unacknowledgedStanzas.put(stanza); 1461 } 1462 catch (InterruptedException e) { 1463 throw new IllegalStateException(e); 1464 } 1465 } 1466 } 1467 } 1468 1469 /** 1470 * Set if Stream Management should be used by default for new connections. 1471 * 1472 * @param useSmDefault true to use Stream Management for new connections. 1473 */ 1474 public static void setUseStreamManagementDefault(boolean useSmDefault) { 1475 XMPPTCPConnection.useSmDefault = useSmDefault; 1476 } 1477 1478 /** 1479 * Set if Stream Management resumption should be used by default for new connections. 1480 * 1481 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1482 * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead. 1483 */ 1484 @Deprecated 1485 public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { 1486 setUseStreamManagementResumptionDefault(useSmResumptionDefault); 1487 } 1488 1489 /** 1490 * Set if Stream Management resumption should be used by default for new connections. 1491 * 1492 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1493 */ 1494 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1495 if (useSmResumptionDefault) { 1496 // Also enable SM is resumption is enabled 1497 setUseStreamManagementDefault(useSmResumptionDefault); 1498 } 1499 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1500 } 1501 1502 /** 1503 * Set if Stream Management should be used if supported by the server. 1504 * 1505 * @param useSm true to use Stream Management. 1506 */ 1507 public void setUseStreamManagement(boolean useSm) { 1508 this.useSm = useSm; 1509 } 1510 1511 /** 1512 * Set if Stream Management resumption should be used if supported by the server. 1513 * 1514 * @param useSmResumption true to use Stream Management resumption. 1515 */ 1516 public void setUseStreamManagementResumption(boolean useSmResumption) { 1517 if (useSmResumption) { 1518 // Also enable SM is resumption is enabled 1519 setUseStreamManagement(useSmResumption); 1520 } 1521 this.useSmResumption = useSmResumption; 1522 } 1523 1524 /** 1525 * Set the preferred resumption time in seconds. 1526 * @param resumptionTime the preferred resumption time in seconds 1527 */ 1528 public void setPreferredResumptionTime(int resumptionTime) { 1529 smClientMaxResumptionTime = resumptionTime; 1530 } 1531 1532 /** 1533 * Add a predicate for Stream Management acknowledgment requests. 1534 * <p> 1535 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1536 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1537 * </p> 1538 * <p> 1539 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1540 * </p> 1541 * 1542 * @param predicate the predicate to add. 1543 * @return if the predicate was not already active. 1544 */ 1545 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1546 synchronized (requestAckPredicates) { 1547 return requestAckPredicates.add(predicate); 1548 } 1549 } 1550 1551 /** 1552 * Remove the given predicate for Stream Management acknowledgment request. 1553 * @param predicate the predicate to remove. 1554 * @return true if the predicate was removed. 1555 */ 1556 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1557 synchronized (requestAckPredicates) { 1558 return requestAckPredicates.remove(predicate); 1559 } 1560 } 1561 1562 /** 1563 * Remove all predicates for Stream Management acknowledgment requests. 1564 */ 1565 public void removeAllRequestAckPredicates() { 1566 synchronized (requestAckPredicates) { 1567 requestAckPredicates.clear(); 1568 } 1569 } 1570 1571 /** 1572 * Send an unconditional Stream Management acknowledgement request to the server. 1573 * 1574 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1575 * @throws NotConnectedException if the connection is not connected. 1576 * @throws InterruptedException if the calling thread was interrupted. 1577 */ 1578 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1579 if (!isSmEnabled()) { 1580 throw new StreamManagementException.StreamManagementNotEnabledException(); 1581 } 1582 requestSmAcknowledgementInternal(); 1583 } 1584 1585 private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1586 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1587 } 1588 1589 /** 1590 * Send a unconditional Stream Management acknowledgment to the server. 1591 * <p> 1592 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>: 1593 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1594 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1595 * </p> 1596 * 1597 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1598 * @throws NotConnectedException if the connection is not connected. 1599 * @throws InterruptedException if the calling thread was interrupted. 1600 */ 1601 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1602 if (!isSmEnabled()) { 1603 throw new StreamManagementException.StreamManagementNotEnabledException(); 1604 } 1605 sendSmAcknowledgementInternal(); 1606 } 1607 1608 private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1609 AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount); 1610 // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request 1611 // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here, 1612 // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the 1613 // stream open. 1614 packetWriter.queue.putIfNotShutdown(ackAnswer); 1615 } 1616 1617 /** 1618 * Add a Stanza acknowledged listener. 1619 * <p> 1620 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1621 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1622 * possible. 1623 * </p> 1624 * 1625 * @param listener the listener to add. 1626 */ 1627 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1628 stanzaAcknowledgedListeners.add(listener); 1629 } 1630 1631 /** 1632 * Remove the given Stanza acknowledged listener. 1633 * 1634 * @param listener the listener. 1635 * @return true if the listener was removed. 1636 */ 1637 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1638 return stanzaAcknowledgedListeners.remove(listener); 1639 } 1640 1641 /** 1642 * Remove all stanza acknowledged listeners. 1643 */ 1644 public void removeAllStanzaAcknowledgedListeners() { 1645 stanzaAcknowledgedListeners.clear(); 1646 } 1647 1648 /** 1649 * Add a Stanza dropped listener. 1650 * <p> 1651 * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get 1652 * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit 1653 * the Stanzas. 1654 * </p> 1655 * 1656 * @param listener the listener to add. 1657 * @since 4.3.3 1658 */ 1659 public void addStanzaDroppedListener(StanzaListener listener) { 1660 stanzaDroppedListeners.add(listener); 1661 } 1662 1663 /** 1664 * Remove the given Stanza dropped listener. 1665 * 1666 * @param listener the listener. 1667 * @return true if the listener was removed. 1668 * @since 4.3.3 1669 */ 1670 public boolean removeStanzaDroppedListener(StanzaListener listener) { 1671 return stanzaDroppedListeners.remove(listener); 1672 } 1673 1674 /** 1675 * Add a new Stanza ID acknowledged listener for the given ID. 1676 * <p> 1677 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1678 * automatically be removed after the listener was run. 1679 * </p> 1680 * 1681 * @param id the stanza ID. 1682 * @param listener the listener to invoke. 1683 * @return the previous listener for this stanza ID or null. 1684 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1685 */ 1686 @SuppressWarnings("FutureReturnValueIgnored") 1687 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1688 // Prevent users from adding callbacks that will never get removed 1689 if (!smWasEnabledAtLeastOnce) { 1690 throw new StreamManagementException.StreamManagementNotEnabledException(); 1691 } 1692 // Remove the listener after max. 3 hours 1693 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60); 1694 schedule(new Runnable() { 1695 @Override 1696 public void run() { 1697 stanzaIdAcknowledgedListeners.remove(id); 1698 } 1699 }, removeAfterSeconds, TimeUnit.SECONDS); 1700 return stanzaIdAcknowledgedListeners.put(id, listener); 1701 } 1702 1703 /** 1704 * Remove the Stanza ID acknowledged listener for the given ID. 1705 * 1706 * @param id the stanza ID. 1707 * @return true if the listener was found and removed, false otherwise. 1708 */ 1709 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1710 return stanzaIdAcknowledgedListeners.remove(id); 1711 } 1712 1713 /** 1714 * Removes all Stanza ID acknowledged listeners. 1715 */ 1716 public void removeAllStanzaIdAcknowledgedListeners() { 1717 stanzaIdAcknowledgedListeners.clear(); 1718 } 1719 1720 /** 1721 * Returns true if Stream Management is supported by the server. 1722 * 1723 * @return true if Stream Management is supported by the server. 1724 */ 1725 public boolean isSmAvailable() { 1726 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1727 } 1728 1729 /** 1730 * Returns true if Stream Management was successfully negotiated with the server. 1731 * 1732 * @return true if Stream Management was negotiated. 1733 */ 1734 public boolean isSmEnabled() { 1735 return smEnabledSyncPoint; 1736 } 1737 1738 /** 1739 * Returns true if the stream was successfully resumed with help of Stream Management. 1740 * 1741 * @return true if the stream was resumed. 1742 */ 1743 public boolean streamWasResumed() { 1744 return smResumedSyncPoint == SyncPointState.successful; 1745 } 1746 1747 /** 1748 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1749 * 1750 * @return true if disconnected but resumption possible. 1751 */ 1752 public boolean isDisconnectedButSmResumptionPossible() { 1753 return disconnectedButResumeable && isSmResumptionPossible(); 1754 } 1755 1756 /** 1757 * Returns true if the stream is resumable. 1758 * 1759 * @return true if the stream is resumable. 1760 */ 1761 public boolean isSmResumptionPossible() { 1762 // There is no resumable stream available 1763 if (smSessionId == null) 1764 return false; 1765 1766 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1767 // Seems like we are already reconnected, report true 1768 if (shutdownTimestamp == null) { 1769 return true; 1770 } 1771 1772 // See if resumption time is over 1773 long current = System.currentTimeMillis(); 1774 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1775 if (current > shutdownTimestamp + maxResumptionMillies) { 1776 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1777 // resumption is possible 1778 return false; 1779 } else { 1780 return true; 1781 } 1782 } 1783 1784 /** 1785 * Drop the stream management state. Sets {@link #smSessionId} and 1786 * {@link #unacknowledgedStanzas} to <code>null</code>. 1787 */ 1788 private void dropSmState() { 1789 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1790 // respective. No need to reset them here. 1791 smSessionId = null; 1792 unacknowledgedStanzas = null; 1793 } 1794 1795 /** 1796 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 1797 * <p> 1798 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1799 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1800 * without checking for overflows before. 1801 * </p> 1802 * 1803 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1804 */ 1805 public int getMaxSmResumptionTime() { 1806 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1807 int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE; 1808 return Math.min(clientResumptionTime, serverResumptionTime); 1809 } 1810 1811 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1812 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1813 final List<Stanza> ackedStanzas = new ArrayList<>( 1814 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1815 : Integer.MAX_VALUE); 1816 for (long i = 0; i < ackedStanzasCount; i++) { 1817 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1818 // If the server ack'ed a stanza, then it must be in the 1819 // unacknowledged stanza queue. There can be no exception. 1820 if (ackedStanza == null) { 1821 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1822 ackedStanzasCount, ackedStanzas); 1823 } 1824 ackedStanzas.add(ackedStanza); 1825 } 1826 1827 boolean atLeastOneStanzaAcknowledgedListener = false; 1828 if (!stanzaAcknowledgedListeners.isEmpty()) { 1829 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1830 atLeastOneStanzaAcknowledgedListener = true; 1831 } 1832 else { 1833 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1834 for (Stanza ackedStanza : ackedStanzas) { 1835 String id = ackedStanza.getStanzaId(); 1836 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1837 atLeastOneStanzaAcknowledgedListener = true; 1838 break; 1839 } 1840 } 1841 } 1842 1843 // Only spawn a new thread if there is a chance that some listener is invoked 1844 if (atLeastOneStanzaAcknowledgedListener) { 1845 asyncGo(new Runnable() { 1846 @Override 1847 public void run() { 1848 for (Stanza ackedStanza : ackedStanzas) { 1849 for (StanzaListener listener : stanzaAcknowledgedListeners) { 1850 try { 1851 listener.processStanza(ackedStanza); 1852 } 1853 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1854 LOGGER.log(Level.FINER, "Received exception", e); 1855 } 1856 } 1857 String id = ackedStanza.getStanzaId(); 1858 if (StringUtils.isNullOrEmpty(id)) { 1859 continue; 1860 } 1861 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1862 if (listener != null) { 1863 try { 1864 listener.processStanza(ackedStanza); 1865 } 1866 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1867 LOGGER.log(Level.FINER, "Received exception", e); 1868 } 1869 } 1870 } 1871 } 1872 }); 1873 } 1874 1875 serverHandledStanzasCount = handledCount; 1876 } 1877 1878 /** 1879 * Set the default bundle and defer callback used for new connections. 1880 * 1881 * @param defaultBundleAndDeferCallback TODO javadoc me please 1882 * @see BundleAndDeferCallback 1883 * @since 4.1 1884 */ 1885 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1886 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1887 } 1888 1889 /** 1890 * Set the bundle and defer callback used for this connection. 1891 * <p> 1892 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1893 * no longer get deferred. 1894 * </p> 1895 * 1896 * @param bundleAndDeferCallback the callback or <code>null</code>. 1897 * @see BundleAndDeferCallback 1898 * @since 4.1 1899 */ 1900 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1901 this.bundleAndDeferCallback = bundleAndDeferCallback; 1902 } 1903 1904}