001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InputStreamReader; 023import java.io.OutputStream; 024import java.io.OutputStreamWriter; 025import java.io.Writer; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.net.Socket; 029import java.security.cert.CertificateException; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.Iterator; 033import java.util.LinkedHashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.ArrayBlockingQueue; 039import java.util.concurrent.BlockingQueue; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentLinkedQueue; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.logging.Level; 045import java.util.logging.Logger; 046 047import javax.net.SocketFactory; 048import javax.net.ssl.HostnameVerifier; 049import javax.net.ssl.SSLSession; 050import javax.net.ssl.SSLSocket; 051import javax.net.ssl.SSLSocketFactory; 052 053import org.jivesoftware.smack.AbstractXMPPConnection; 054import org.jivesoftware.smack.ConnectionConfiguration; 055import 
org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 056import org.jivesoftware.smack.ConnectionListener; 057import org.jivesoftware.smack.SmackConfiguration; 058import org.jivesoftware.smack.SmackException; 059import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 060import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 061import org.jivesoftware.smack.SmackException.ConnectionException; 062import org.jivesoftware.smack.SmackException.EndpointConnectionException; 063import org.jivesoftware.smack.SmackException.NotConnectedException; 064import org.jivesoftware.smack.SmackException.NotLoggedInException; 065import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; 066import org.jivesoftware.smack.SmackException.SecurityNotPossibleException; 067import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 068import org.jivesoftware.smack.SmackFuture; 069import org.jivesoftware.smack.StanzaListener; 070import org.jivesoftware.smack.XMPPConnection; 071import org.jivesoftware.smack.XMPPException; 072import org.jivesoftware.smack.XMPPException.FailedNonzaException; 073import org.jivesoftware.smack.XMPPException.StreamErrorException; 074import org.jivesoftware.smack.compress.packet.Compress; 075import org.jivesoftware.smack.compress.packet.Compressed; 076import org.jivesoftware.smack.compression.XMPPInputOutputStream; 077import org.jivesoftware.smack.datatypes.UInt16; 078import org.jivesoftware.smack.filter.StanzaFilter; 079import org.jivesoftware.smack.internal.SmackTlsContext; 080import org.jivesoftware.smack.packet.Element; 081import org.jivesoftware.smack.packet.IQ; 082import org.jivesoftware.smack.packet.Message; 083import org.jivesoftware.smack.packet.Presence; 084import org.jivesoftware.smack.packet.Stanza; 085import org.jivesoftware.smack.packet.StartTls; 086import org.jivesoftware.smack.packet.StreamError; 087import org.jivesoftware.smack.packet.StreamOpen; 088import 
org.jivesoftware.smack.packet.TopLevelStreamElement; 089import org.jivesoftware.smack.proxy.ProxyInfo; 090import org.jivesoftware.smack.sasl.packet.SaslNonza; 091import org.jivesoftware.smack.sm.SMUtils; 092import org.jivesoftware.smack.sm.StreamManagementException; 093import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 094import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 095import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 096import org.jivesoftware.smack.sm.packet.StreamManagement; 097import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 098import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 099import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 100import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 101import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 102import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 103import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 104import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 105import org.jivesoftware.smack.sm.predicates.Predicate; 106import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 107import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints; 108import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint; 109import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 110import org.jivesoftware.smack.util.Async; 111import org.jivesoftware.smack.util.CloseableUtil; 112import org.jivesoftware.smack.util.PacketParserUtils; 113import org.jivesoftware.smack.util.StringUtils; 114import org.jivesoftware.smack.util.TLSUtils; 115import org.jivesoftware.smack.util.XmlStringBuilder; 116import org.jivesoftware.smack.util.rce.RemoteConnectionException; 117import org.jivesoftware.smack.xml.SmackXmlParser; 118import 
org.jivesoftware.smack.xml.XmlPullParser; 119import org.jivesoftware.smack.xml.XmlPullParserException; 120 121import org.jxmpp.jid.impl.JidCreate; 122import org.jxmpp.jid.parts.Resourcepart; 123import org.jxmpp.stringprep.XmppStringprepException; 124import org.minidns.dnsname.DnsName; 125 126/** 127 * Creates a socket connection to an XMPP server. This is the default connection 128 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 129 * 130 * @see XMPPConnection 131 * @author Matt Tucker 132 */ 133public class XMPPTCPConnection extends AbstractXMPPConnection { 134 135 private static final int QUEUE_SIZE = 500; 136 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 137 138 /** 139 * The socket which is used for this connection. 140 */ 141 private Socket socket; 142 143 /** 144 * 145 */ 146 private boolean disconnectedButResumeable = false; 147 148 private SSLSocket secureSocket; 149 150 /** 151 * Protected access level because of unit test purposes 152 */ 153 protected final PacketWriter packetWriter = new PacketWriter(); 154 155 /** 156 * Protected access level because of unit test purposes 157 */ 158 protected final PacketReader packetReader = new PacketReader(); 159 160 /** 161 * 162 */ 163 private boolean streamFeaturesAfterAuthenticationReceived; 164 165 /** 166 * 167 */ 168 private boolean compressSyncPoint; 169 170 /** 171 * The default bundle and defer callback, used for new connections. 172 * @see bundleAndDeferCallback 173 */ 174 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 175 176 /** 177 * The used bundle and defer callback. 178 * <p> 179 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 180 * having a 'volatile' read within the writer threads loop. 
181 * </p> 182 */ 183 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 184 185 private static boolean useSmDefault = true; 186 187 private static boolean useSmResumptionDefault = true; 188 189 /** 190 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 191 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 192 * {@link #unacknowledgedStanzas}. 193 */ 194 private String smSessionId; 195 196 /** 197 * Represents the state of stream management resumption. 198 * <p> 199 * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread. 200 * </p> 201 */ 202 private volatile SyncPointState smResumedSyncPoint; 203 private Failed smResumptionFailed; 204 205 /** 206 * Represents the state of stream magement. 207 * <p> 208 * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}. 209 * </p> 210 */ 211 private volatile boolean smEnabledSyncPoint; 212 213 /** 214 * The client's preferred maximum resumption time in seconds. 215 */ 216 private int smClientMaxResumptionTime = -1; 217 218 /** 219 * The server's preferred maximum resumption time in seconds. 220 */ 221 private int smServerMaxResumptionTime = -1; 222 223 /** 224 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 225 */ 226 private boolean useSm = useSmDefault; 227 private boolean useSmResumption = useSmResumptionDefault; 228 229 /** 230 * The counter that the server sends the client about it's current height. For example, if the server sends 231 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 232 */ 233 private long serverHandledStanzasCount = 0; 234 235 /** 236 * The counter for stanzas handled ("received") by the client. 237 * <p> 238 * Note that we don't need to synchronize this counter. 
Although JLS 17.7 states that reads and writes to longs are 239 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since 240 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 241 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 242 * </p> 243 */ 244 private long clientHandledStanzasCount = 0; 245 246 private BlockingQueue<Stanza> unacknowledgedStanzas; 247 248 /** 249 * Set to true if Stream Management was at least once enabled for this connection. 250 */ 251 private boolean smWasEnabledAtLeastOnce = false; 252 253 /** 254 * This listeners are invoked for every stanza that got acknowledged. 255 * <p> 256 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 257 * themselves after they have been invoked. 258 * </p> 259 */ 260 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>(); 261 262 /** 263 * These listeners are invoked for every stanza that got dropped. 264 * <p> 265 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 266 * themselves after they have been invoked. 267 * </p> 268 */ 269 private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>(); 270 271 /** 272 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 273 * only be invoked once and automatically removed after that. 274 */ 275 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>(); 276 277 /** 278 * Predicates that determine if an stream management ack should be requested from the server. 279 * <p> 280 * We use a linked hash set here, so that the order how the predicates are added matches the 281 * order in which they are invoked in order to determine if an ack request should be send or not. 
282 * </p> 283 */ 284 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>(); 285 286 @SuppressWarnings("HidingField") 287 private final XMPPTCPConnectionConfiguration config; 288 289 /** 290 * Creates a new XMPP connection over TCP (optionally using proxies). 291 * <p> 292 * Note that XMPPTCPConnection constructors do not establish a connection to the server 293 * and you must call {@link #connect()}. 294 * </p> 295 * 296 * @param config the connection configuration. 297 */ 298 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 299 super(config); 300 this.config = config; 301 addConnectionListener(new ConnectionListener() { 302 @Override 303 public void connectionClosedOnError(Exception e) { 304 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) { 305 dropSmState(); 306 } 307 } 308 }); 309 310 // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream 311 // is initiated. 312 buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install(); 313 } 314 315 /** 316 * Creates a new XMPP connection over TCP. 317 * <p> 318 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 319 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 320 * constructor. 321 * </p> 322 * 323 * @param jid the bare JID used by the client. 324 * @param password the password or authentication token. 325 * @throws XmppStringprepException if the provided string is invalid. 326 */ 327 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 328 this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build()); 329 } 330 331 /** 332 * Creates a new XMPP connection over TCP. 333 * <p> 334 * This is the simplest constructor for connecting to an XMPP server. 
Alternatively, 335 * you can get fine-grained control over connection settings using the 336 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 337 * </p> 338 * @param username TODO javadoc me please 339 * @param password TODO javadoc me please 340 * @param serviceName TODO javadoc me please 341 * @throws XmppStringprepException if the provided string is invalid. 342 */ 343 public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { 344 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( 345 JidCreate.domainBareFrom(serviceName)).build()); 346 } 347 348 @Override 349 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 350 if (packetWriter == null) { 351 throw new NotConnectedException(); 352 } 353 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 354 } 355 356 @Override 357 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 358 if (isConnected() && !disconnectedButResumeable) { 359 throw new AlreadyConnectedException(); 360 } 361 } 362 363 @Override 364 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 365 if (isAuthenticated() && !disconnectedButResumeable) { 366 throw new AlreadyLoggedInException(); 367 } 368 } 369 370 @Override 371 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 372 // Reset the flag in case it was set 373 disconnectedButResumeable = false; 374 super.afterSuccessfulLogin(resumed); 375 } 376 377 @Override 378 protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 379 SmackException, IOException, InterruptedException { 380 // Authenticate using SASL 381 SSLSession sslSession = secureSocket != null ? 
secureSocket.getSession() : null; 382 383 streamFeaturesAfterAuthenticationReceived = false; 384 authenticate(username, password, config.getAuthzid(), sslSession); 385 386 // Wait for stream features after the authentication. 387 // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be 388 // renamed to "streamFeaturesAfterAuthenticationReceived". 389 waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server"); 390 391 // If compression is enabled then request the server to use stream compression. XEP-170 392 // recommends to perform stream compression before resource binding. 393 maybeEnableCompression(); 394 395 smResumedSyncPoint = SyncPointState.initial; 396 smResumptionFailed = null; 397 if (isSmResumptionPossible()) { 398 smResumedSyncPoint = SyncPointState.request_sent; 399 sendNonza(new Resume(clientHandledStanzasCount, smSessionId)); 400 waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream"); 401 if (smResumedSyncPoint == SyncPointState.successful) { 402 // We successfully resumed the stream, be done here 403 afterSuccessfulLogin(true); 404 return; 405 } 406 // SM resumption failed, what Smack does here is to report success of 407 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 408 // normal resource binding can be tried. 409 assert smResumptionFailed != null; 410 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed); 411 } 412 413 // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case, 414 // mark SM as not enabled. 
Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the 415 // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state. 416 smEnabledSyncPoint = false; 417 418 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 419 if (unacknowledgedStanzas != null) { 420 // There was a previous connection with SM enabled but that was either not resumable or 421 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 422 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 423 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 424 // XMPP session (There maybe was an enabled in a previous XMPP session of this 425 // connection instance though). This is used in writePackets to decide if stanzas should 426 // be added to the unacknowledged stanzas queue, because they have to be added right 427 // after the 'enable' stream element has been sent. 428 dropSmState(); 429 } 430 431 // Now bind the resource. It is important to do this *after* we dropped an eventually 432 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 433 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 434 bindResourceAndEstablishSession(resource); 435 436 if (isSmAvailable() && useSm) { 437 // Remove what is maybe left from previously stream managed sessions 438 serverHandledStanzasCount = 0; 439 sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime)); 440 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 441 // then this is a non recoverable error and we therefore throw an exception. 
442 waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement"); 443 synchronized (requestAckPredicates) { 444 if (requestAckPredicates.isEmpty()) { 445 // Assure that we have at lest one predicate set up that so that we request acks 446 // for the server and eventually flush some stanzas from the unacknowledged 447 // stanza queue 448 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 449 } 450 } 451 } 452 // Inform client about failed resumption if possible, resend stanzas otherwise 453 // Process the stanzas synchronously so a client can re-queue them for transmission 454 // before it is informed about connection success 455 if (!stanzaDroppedListeners.isEmpty()) { 456 for (Stanza stanza : previouslyUnackedStanzas) { 457 for (StanzaListener listener : stanzaDroppedListeners) { 458 try { 459 listener.processStanza(stanza); 460 } 461 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 462 LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e); 463 } 464 } 465 } 466 } else { 467 for (Stanza stanza : previouslyUnackedStanzas) { 468 sendInternal(stanza); 469 } 470 } 471 472 afterSuccessfulLogin(false); 473 } 474 475 @Override 476 public boolean isSecureConnection() { 477 return secureSocket != null; 478 } 479 480 /** 481 * Shuts the current connection down. After this method returns, the connection must be ready 482 * for re-use by connect. 483 */ 484 @Override 485 protected void shutdown() { 486 if (isSmEnabled()) { 487 try { 488 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 489 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 
490 sendSmAcknowledgementInternal(); 491 } catch (InterruptedException | NotConnectedException e) { 492 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 493 } 494 } 495 shutdown(false); 496 } 497 498 @Override 499 public synchronized void instantShutdown() { 500 shutdown(true); 501 } 502 503 private void shutdown(boolean instant) { 504 // The writer thread may already been finished at this point, for example when the connection is in the 505 // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this 506 // case. 507 if (!packetWriter.done()) { 508 // First shutdown the writer, this will result in a closing stream element getting send to 509 // the server 510 LOGGER.finer(packetWriter.threadName + " shutdown()"); 511 packetWriter.shutdown(instant); 512 LOGGER.finer(packetWriter.threadName + " shutdown() returned"); 513 514 if (!instant) { 515 waitForClosingStreamTagFromServer(); 516 } 517 } 518 519 LOGGER.finer(packetReader.threadName + " shutdown()"); 520 packetReader.shutdown(); 521 LOGGER.finer(packetReader.threadName + " shutdown() returned"); 522 523 CloseableUtil.maybeClose(socket, LOGGER); 524 525 setWasAuthenticated(); 526 527 try { 528 boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running); 529 if (!readerAndWriterThreadsTermianted) { 530 LOGGER.severe("Reader and/or writer threads did not terminate timely. 
Writer running: " 531 + packetWriter.running + ", Reader running: " + packetReader.running); 532 } else { 533 LOGGER.fine("Reader and writer threads terminated"); 534 } 535 } catch (InterruptedException e) { 536 LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e); 537 } 538 539 if (disconnectedButResumeable) { 540 return; 541 } 542 543 // If we are able to resume the stream, then don't set 544 // connected/authenticated/usingTLS to false since we like to behave like we are still 545 // connected (e.g. sendStanza should not throw a NotConnectedException). 546 if (instant) { 547 disconnectedButResumeable = isSmResumptionPossible(); 548 if (!disconnectedButResumeable) { 549 // Reset the stream management session id to null, since the stream is no longer resumable. Note that we 550 // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected. 551 smSessionId = null; 552 } 553 } else { 554 disconnectedButResumeable = false; 555 556 // Drop the stream management state if this is not an instant shutdown. We send 557 // a </stream> close tag and now the stream management state is no longer valid. 558 // This also prevents that we will potentially (re-)send any unavailable presence we 559 // may have send, because it got put into the unacknowledged queue and was not acknowledged before the 560 // connection terminated. 561 dropSmState(); 562 // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the 563 // information is available in the connectionClosedOnError() listeners. 
564 } 565 authenticated = false; 566 connected = false; 567 secureSocket = null; 568 reader = null; 569 writer = null; 570 571 initState(); 572 } 573 574 private interface SmAckAction<E extends Exception> { 575 void run() throws NotConnectedException, E; 576 } 577 578 private <E extends Exception> void requestSmAckIfNecessary(TopLevelStreamElement element, 579 SmAckAction<E> smAckAction) throws NotConnectedException, E { 580 if (!isSmEnabled()) 581 return; 582 583 if (element instanceof Stanza) { 584 Stanza stanza = (Stanza) element; 585 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 586 if (requestAckPredicate.accept(stanza)) { 587 smAckAction.run(); 588 break; 589 } 590 } 591 } 592 } 593 594 @Override 595 protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException { 596 packetWriter.sendStreamElement(element); 597 requestSmAckIfNecessary(element, () -> requestSmAcknowledgementInternal()); 598 } 599 600 @Override 601 protected void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException { 602 packetWriter.sendNonBlocking(element); 603 requestSmAckIfNecessary(element, () -> requestSmAcknowledgementNonBlockingInternal()); 604 } 605 606 private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException { 607 RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config); 608 609 List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>(); 610 611 SocketFactory socketFactory = config.getSocketFactory(); 612 ProxyInfo proxyInfo = config.getProxyInfo(); 613 int timeout = config.getConnectTimeout(); 614 if (socketFactory == null) { 615 socketFactory = SocketFactory.getDefault(); 616 } 617 for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) { 618 Iterator<? 
extends InetAddress> inetAddresses; 619 String host = endpoint.getHost().toString(); 620 UInt16 portUint16 = endpoint.getPort(); 621 int port = portUint16.intValue(); 622 if (proxyInfo == null) { 623 inetAddresses = endpoint.getInetAddresses().iterator(); 624 assert inetAddresses.hasNext(); 625 626 innerloop: while (inetAddresses.hasNext()) { 627 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 628 // re-usable after a failed connection attempt. See also SMACK-724. 629 SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory); 630 631 final InetAddress inetAddress = inetAddresses.next(); 632 final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port); 633 LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress); 634 socketFuture.connectAsync(inetSocketAddress, timeout); 635 636 try { 637 socket = socketFuture.getOrThrow(); 638 } catch (IOException e) { 639 RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>( 640 endpoint, inetAddress, e); 641 connectionExceptions.add(rce); 642 if (inetAddresses.hasNext()) { 643 continue innerloop; 644 } else { 645 break innerloop; 646 } 647 } 648 LOGGER.finer("Established TCP connection to " + inetSocketAddress); 649 // We found a host to connect to, return here 650 this.host = host; 651 this.port = portUint16; 652 return; 653 } 654 } else { 655 // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy 656 // connection to every inet address of each connection endpoint. 
657 socket = socketFactory.createSocket(); 658 StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy"); 659 final String hostAndPort = host + " at port " + port; 660 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 661 try { 662 proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 663 } catch (IOException e) { 664 CloseableUtil.maybeClose(socket, LOGGER); 665 RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e); 666 connectionExceptions.add(rce); 667 continue; 668 } 669 LOGGER.finer("Established TCP connection to " + hostAndPort); 670 // We found a host to connect to, return here 671 this.host = host; 672 this.port = portUint16; 673 return; 674 } 675 } 676 677 // There are no more host addresses to try 678 // throw an exception and report all tried 679 // HostAddresses in the exception 680 throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions); 681 } 682 683 /** 684 * Initializes the connection by creating a stanza reader and writer and opening a 685 * XMPP stream to the server. 686 * 687 * @throws IOException if an I/O error occurred. 688 * @throws InterruptedException if the calling thread was interrupted. 689 */ 690 private void initConnection() throws IOException, InterruptedException { 691 compressionHandler = null; 692 693 // Set the reader and writer instance variables 694 initReaderAndWriter(); 695 696 // Start the writer thread. This will open an XMPP stream to the server 697 packetWriter.init(); 698 // Start the reader thread. 
The startup() method will block until we 699 // get an opening stream packet back from server 700 packetReader.init(); 701 } 702 703 private void initReaderAndWriter() throws IOException { 704 InputStream is = socket.getInputStream(); 705 OutputStream os = socket.getOutputStream(); 706 if (compressionHandler != null) { 707 is = compressionHandler.getInputStream(is); 708 os = compressionHandler.getOutputStream(os); 709 } 710 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 711 writer = new OutputStreamWriter(os, "UTF-8"); 712 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 713 714 // If debugging is enabled, we open a window and write out all network traffic. 715 initDebugger(); 716 } 717 718 /** 719 * The server has indicated that TLS negotiation can start. We now need to secure the 720 * existing plain connection and perform a handshake. This method won't return until the 721 * connection has finished the handshake or an error occurred while securing the connection. 722 * @throws IOException if an I/O error occurred. 723 * @throws SecurityNotPossibleException if TLS is not possible. 724 * @throws CertificateException if there is an issue with the certificate. 725 */ 726 @SuppressWarnings("LiteralClassName") 727 private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException { 728 SmackTlsContext smackTlsContext = getSmackTlsContext(); 729 730 Socket plain = socket; 731 int port = plain.getPort(); 732 String xmppServiceDomainString = config.getXMPPServiceDomain().toString(); 733 SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory(); 734 // Secure the plain connection 735 socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true); 736 737 final SSLSocket sslSocket = (SSLSocket) socket; 738 // Immediately set the enabled SSL protocols and ciphers. 
See SMACK-712 why this is 739 // important (at least on certain platforms) and it seems to be a good idea anyways to 740 // prevent an accidental implicit handshake. 741 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 742 743 // Initialize the reader and writer with the new secured version 744 initReaderAndWriter(); 745 746 // Proceed to do the handshake 747 sslSocket.startHandshake(); 748 749 if (smackTlsContext.daneVerifier != null) { 750 smackTlsContext.daneVerifier.finish(sslSocket.getSession()); 751 } 752 753 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 754 if (verifier == null) { 755 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 756 } 757 758 final String verifierHostname; 759 { 760 DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible(); 761 // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII 762 // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint. 763 // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1 764 if (xmppServiceDomainDnsName != null) { 765 verifierHostname = xmppServiceDomainDnsName.ace; 766 } 767 else { 768 LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain() 769 + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail."); 770 verifierHostname = getXMPPServiceDomain().toString(); 771 } 772 } 773 774 final boolean verificationSuccessful; 775 // Verify the TLS session. 776 verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession()); 777 if (!verificationSuccessful) { 778 throw new CertificateException( 779 "Hostname verification of certificate failed. 
Certificate does not authenticate " 780 + getXMPPServiceDomain()); 781 } 782 783 // Set that TLS was successful 784 secureSocket = sslSocket; 785 } 786 787 /** 788 * Returns the compression handler that can be used for one compression methods offered by the server. 789 * 790 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 791 * 792 */ 793 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 794 for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) { 795 String method = handler.getCompressionMethod(); 796 if (compression.getMethods().contains(method)) 797 return handler; 798 } 799 return null; 800 } 801 802 @Override 803 public boolean isUsingCompression() { 804 return compressionHandler != null && compressSyncPoint; 805 } 806 807 /** 808 * <p> 809 * Starts using stream compression that will compress network traffic. Traffic can be 810 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 811 * connection. However, the server and the client will need to use more CPU time in order to 812 * un/compress network data so under high load the server performance might be affected. 813 * </p> 814 * <p> 815 * Stream compression has to have been previously offered by the server. Currently only the 816 * zlib method is supported by the client. Stream compression negotiation has to be done 817 * before authentication took place. 818 * </p> 819 * 820 * @throws NotConnectedException if the XMPP connection is not connected. 821 * @throws SmackException if Smack detected an exceptional situation. 822 * @throws InterruptedException if the calling thread was interrupted. 823 * @throws XMPPException if an XMPP protocol error was received. 
824 */ 825 private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException { 826 if (!config.isCompressionEnabled()) { 827 return; 828 } 829 830 Compress.Feature compression = getFeature(Compress.Feature.class); 831 if (compression == null) { 832 // Server does not support compression 833 return; 834 } 835 // If stream compression was offered by the server and we want to use 836 // compression then send compression request to the server 837 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 838 compressSyncPoint = false; 839 sendNonza(new Compress(compressionHandler.getCompressionMethod())); 840 waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression"); 841 } else { 842 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 843 } 844 } 845 846 /** 847 * Establishes a connection to the XMPP server. It basically 848 * creates and maintains a socket connection to the server. 849 * <p> 850 * Listeners will be preserved from a previous connection if the reconnection 851 * occurs after an abrupt termination. 852 * </p> 853 * 854 * @throws XMPPException if an error occurs while trying to establish the connection. 855 * @throws SmackException if Smack detected an exceptional situation. 856 * @throws IOException if an I/O error occurred. 857 * @throws InterruptedException if the calling thread was interrupted. 858 */ 859 @Override 860 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 861 // Establishes the TCP connection to the server and does setup the reader and writer. 
Throws an exception if 862 // there is an error establishing the connection 863 connectUsingConfiguration(); 864 865 connected = true; 866 867 // We connected successfully to the servers TCP port 868 initConnection(); 869 870 // TLS handled will be true either if TLS was established, or if it was not mandatory. 871 waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS"); 872 873 // Wait with SASL auth until the SASL mechanisms have been received 874 waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server"); 875 } 876 877 /** 878 * For unit testing purposes 879 * 880 * @param writer TODO javadoc me please 881 */ 882 protected void setWriter(Writer writer) { 883 this.writer = writer; 884 } 885 886 @Override 887 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException { 888 StartTls startTlsFeature = getFeature(StartTls.class); 889 if (startTlsFeature != null) { 890 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 891 SecurityRequiredByServerException smackException = new SecurityRequiredByServerException(); 892 currentSmackException = smackException; 893 notifyWaitingThreads(); 894 throw smackException; 895 } 896 897 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 898 sendNonza(new StartTls()); 899 } else { 900 tlsHandled = true; 901 notifyWaitingThreads(); 902 } 903 } else { 904 tlsHandled = true; 905 notifyWaitingThreads(); 906 } 907 908 if (isSaslAuthenticated()) { 909 // If we have received features after the SASL has been successfully completed, then we 910 // have also *maybe* received, as it is an optional feature, the compression feature 911 // from the server. 
912 streamFeaturesAfterAuthenticationReceived = true; 913 notifyWaitingThreads(); 914 } 915 } 916 917 private void resetParser() throws IOException { 918 try { 919 packetReader.parser = SmackXmlParser.newXmlParser(reader); 920 } catch (XmlPullParserException e) { 921 throw new IOException(e); 922 } 923 } 924 925 private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException { 926 sendStreamOpen(); 927 resetParser(); 928 } 929 930 protected class PacketReader { 931 932 private final String threadName = "Smack Reader (" + getConnectionCounter() + ')'; 933 934 XmlPullParser parser; 935 936 private volatile boolean done; 937 938 private boolean running; 939 940 /** 941 * Initializes the reader in order to be used. The reader is initialized during the 942 * first connection and when reconnecting due to an abruptly disconnection. 943 */ 944 void init() { 945 done = false; 946 947 running = true; 948 Async.go(new Runnable() { 949 @Override 950 public void run() { 951 LOGGER.finer(threadName + " start"); 952 try { 953 parsePackets(); 954 } finally { 955 LOGGER.finer(threadName + " exit"); 956 running = false; 957 notifyWaitingThreads(); 958 } 959 } 960 }, threadName); 961 } 962 963 /** 964 * Shuts the stanza reader down. This method simply sets the 'done' flag to true. 965 */ 966 void shutdown() { 967 done = true; 968 } 969 970 /** 971 * Parse top-level packets in order to process them further. 
*/
        private void parsePackets() {
            try {
                // Open the stream first, then pull events until 'done' is set or an exception ends the loop.
                openStreamAndResetParser();
                XmlPullParser.Event eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case START_ELEMENT:
                        final String name = parser.getName();
                        final String namespace = parser.getNamespace();

                        // Dispatch on the element name; anything unknown is handled as nonza.
                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                // Count the stanza as handled even if parsing/processing threw.
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            if (StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(namespace)) {
                                onStreamOpen(parser);
                            }
                            break;
                        case "error":
                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
                            // Stream errors are not recoverable, so throw this exception. Also note that this will set
                            // this exception as current connection exception and notify any waiting threads.
                            throw new StreamErrorException(streamError);
                        case "features":
                            parseFeaturesAndNotify(parser);
                            break;
                        case "proceed":
                            // Secure the connection by negotiating TLS
                            proceedTLSReceived();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            break;
                        case "failure":
                            // The namespace tells which negotiation failed.
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
                                notifyWaitingThreads();
                                break;
                            default:
                                parseAndProcessNonza(parser);
                            }
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStreamAndResetParser();
                            // Notify that compression is being used
                            compressSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                smSessionId = enabled.getId();
                                // A resumable stream without session id cannot be resumed later, treat it as an error.
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
                                    setCurrentConnectionExceptionAndNotify(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            // The handled-stanza counter starts from zero for the newly enabled SM session.
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint = true;
                            notifyWaitingThreads();
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            if (smResumedSyncPoint == SyncPointState.request_sent) {
                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
                                // so is non-fatal as we can simply continue with resource binding in this case.
                                smResumptionFailed = failed;
                                notifyWaitingThreads();
                            } else {
                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
                                setCurrentConnectionExceptionAndNotify(xmppException);
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            // NOTE(review): this dereferences smSessionId, which the Enabled branch above may set to
                            // null - presumably <resumed/> is only received after a resume request with a known
                            // session id; confirm.
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // Mark SM as enabled
                            smEnabledSyncPoint = true;
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                            unacknowledgedStanzas.drainTo(stanzasToResend);
                            for (Stanza stanza : stanzasToResend) {
                                XMPPTCPConnection.this.sendInternal(stanza);
                            }
                            // If there were stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            // Mark SM resumption as successful
                            smResumedSyncPoint = SyncPointState.successful;
                            notifyWaitingThreads();
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            ParseStreamManagement.ackRequest(parser);
                            if (smEnabledSyncPoint) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                        default:
                            parseAndProcessNonza(parser);
                            break;
                        }
                        break;
                    case END_ELEMENT:
                        final String endTagName = parser.getName();
                        if ("stream".equals(endTagName)) {
                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
                                LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace());
                                break;
                            }

                            // Check if the queue was already shut down before reporting success on closing stream tag
                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
                            // did re-start the queue again, causing this writer to assume that the queue is not
                            // shutdown, which results in a call to disconnect().
                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
                            closingStreamReceived = true;
                            notifyWaitingThreads();

                            if (queueWasShutdown) {
                                // We received a closing stream element *after* we initiated the
                                // termination of the session by sending a closing stream element to
                                // the server first
                                return;
                            } else {
                                // We received a closing stream element from the server without us
                                // sending a closing stream element first. This means that the
                                // server wants to terminate the session, therefore disconnect
                                // the connection
                                LOGGER.info(XMPPTCPConnection.this
                                                + " received closing </stream> element."
                                                + " Server wants to terminate the connection, calling disconnect()");
                                // disconnect() is handed off instead of being called directly from the reader.
                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
                                    @Override
                                    public void run() {
                                        disconnect();
                                    }});
                            }
                        }
                        break;
                    case END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException.SmackMessageException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    default:
                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
                        break;
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
                // notifyConnectionError() below, even though run() also sets it to false. Therefore, do not remove this.
                running = false;

                // An exception after either the writer queue or this reader was shut down is expected and only logged.
                String ignoreReasonThread = null;

                boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
                if (writerThreadWasShutDown) {
                    ignoreReasonThread = "writer";
                } else if (done) {
                    ignoreReasonThread = "reader";
                }

                if (ignoreReasonThread != null) {
                    LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
                    return;
                }

                // Close the connection and notify connection listeners of the error.
                notifyConnectionError(e);
            }
        }
    }

    /**
     * Writes queued stream elements to the connection's writer.
     */
    protected class PacketWriter {
        /** Capacity of the outgoing element queue; mirrors {@code XMPPTCPConnection.QUEUE_SIZE}. */
        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
        /** Capacity used when the queue of unacknowledged stanzas is created. */
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
        /** Fill level (30% of the capacity) of the unacknowledged stanzas queue at which an ack request is written. */
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);

        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';

        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
                        QUEUE_SIZE, true);

        /**
         * If set, the stanza writer is shut down
         */
        protected volatile Long shutdownTimestamp = null;

        private volatile boolean instantShutdown;

        /**
         * True if some preconditions are given to start the bundle and defer mechanism.
         * <p>
         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
         * this field to true.
         * </p>
         */
        private boolean shouldBundleAndDefer;

        private boolean running;

        /**
         * Initializes the writer in order to be used. It is called at the first connection and also
         * is invoked if the connection is disconnected by an error.
1221 */ 1222 void init() { 1223 shutdownTimestamp = null; 1224 1225 if (unacknowledgedStanzas != null) { 1226 // It's possible that there are new stanzas in the writer queue that 1227 // came in while we were disconnected but resumable, drain those into 1228 // the unacknowledged queue so that they get resent now 1229 drainWriterQueueToUnacknowledgedStanzas(); 1230 } 1231 1232 queue.start(); 1233 running = true; 1234 Async.go(new Runnable() { 1235 @Override 1236 public void run() { 1237 LOGGER.finer(threadName + " start"); 1238 try { 1239 writePackets(); 1240 } finally { 1241 LOGGER.finer(threadName + " exit"); 1242 running = false; 1243 notifyWaitingThreads(); 1244 } 1245 } 1246 }, threadName); 1247 } 1248 1249 private boolean done() { 1250 return shutdownTimestamp != null; 1251 } 1252 1253 protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { 1254 final boolean done = done(); 1255 if (done) { 1256 final boolean smResumptionPossible = isSmResumptionPossible(); 1257 // Don't throw a NotConnectedException is there is an resumable stream available 1258 if (!smResumptionPossible) { 1259 throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done 1260 + " smResumptionPossible=" + smResumptionPossible); 1261 } 1262 } 1263 } 1264 1265 /** 1266 * Sends the specified element to the server. 1267 * 1268 * @param element the element to send. 1269 * @throws NotConnectedException if the XMPP connection is not connected. 1270 * @throws InterruptedException if the calling thread was interrupted. 1271 */ 1272 protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { 1273 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1274 try { 1275 queue.put(element); 1276 } 1277 catch (InterruptedException e) { 1278 // put() may throw an InterruptedException for two reasons: 1279 // 1. If the queue was shut down 1280 // 2. 
If the thread was interrupted 1281 // so we have to check which is the case 1282 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1283 // If the method above did not throw, then the sending thread was interrupted 1284 throw e; 1285 } 1286 } 1287 1288 /** 1289 * Sends the specified element to the server. 1290 * 1291 * @param element the element to send. 1292 * @throws NotConnectedException if the XMPP connection is not connected. 1293 * @throws OutgoingQueueFullException if there is no space in the outgoing queue. 1294 */ 1295 protected void sendNonBlocking(Element element) throws NotConnectedException, OutgoingQueueFullException { 1296 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1297 boolean enqueued = queue.offer(element); 1298 if (!enqueued) { 1299 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1300 throw new OutgoingQueueFullException(); 1301 } 1302 } 1303 1304 /** 1305 * Shuts down the stanza writer. Once this method has been called, no further 1306 * packets will be written to the server. 1307 */ 1308 void shutdown(boolean instant) { 1309 instantShutdown = instant; 1310 queue.shutdown(); 1311 shutdownTimestamp = System.currentTimeMillis(); 1312 } 1313 1314 /** 1315 * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a 1316 * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in 1317 * that case. 1318 * 1319 * @return the next element for writing or null. 
*/
        private Element nextStreamElement() {
            // It is important that we check if the queue is empty before removing an element from it
            if (queue.isEmpty()) {
                shouldBundleAndDefer = true;
            }
            Element packet = null;
            try {
                packet = queue.take();
            }
            catch (InterruptedException e) {
                // The interrupt is only logged when the queue was not shut down; null is returned either way.
                if (!queue.isShutdown()) {
                    // Users shouldn't try to interrupt the packet writer thread
                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
                }
            }
            return packet;
        }

        /**
         * Writes queued elements to the writer until the writer is done, then performs the shutdown work:
         * unless this is an instant shutdown, the remaining queue content and the closing stream element are
         * written; on an instant shutdown with Stream Management enabled, the remaining queue content is moved
         * to the unacknowledged stanzas queue.
         */
        private void writePackets() {
            try {
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // Nothing was taken (queue shut down or interrupt): re-check the done condition.
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Loop so that an early wake-up only shortens the wait to the remaining time.
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    // 'packet' stays null for everything that is not a stanza.
                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enabled'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
                    }
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
                    if (elementXml instanceof XmlStringBuilder) {
                        try {
                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
                        } catch (NullPointerException npe) {
                            // Log which element caused the NPE before propagating it.
                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
                            throw npe;
                        }
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush when there is nothing else queued.
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            // NOTE(review): unlike the loop above, this uses the no-argument toXML() instead of
                            // toXML(outgoingStreamXmlEnvironment) - confirm that this is intended.
                            writer.write(packet.toXML().toString());
                        }
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
                    // the reader and writer thread's 'running' value is false.
                    running = false;
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            }
        }

        /**
         * Moves the stanzas currently in the writer queue to the unacknowledged stanzas queue. Elements that
         * are not stanzas are removed from the writer queue but not added. If the unacknowledged stanzas
         * queue runs full, a warning is logged and the remaining drained elements are not added.
         */
        private void drainWriterQueueToUnacknowledgedStanzas() {
            List<Element> elements = new ArrayList<>(queue.size());
            queue.drainTo(elements);
            for (int i = 0; i < elements.size(); i++) {
                Element element = elements.get(i);
                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
                if (unacknowledgedStanzas.remainingCapacity() == 0) {
                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
                                    .newWith(i, elements, unacknowledgedStanzas);
                    LOGGER.log(Level.WARNING,
                                    "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
                    return;
                }
                if (element instanceof Stanza) {
                    unacknowledgedStanzas.add((Stanza) element);
                }
            }
        }

        /**
         * Puts the given stanza into the unacknowledged stanzas queue, if that queue exists and the stanza
         * is not null. When the queue size equals the high water mark, an ack request is written first.
         *
         * @param stanza the stanza about to be written, or null.
         * @throws IOException if writing the ack request failed.
         */
        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
            // Check if the stream element should be put to the unacknowledgedStanza
            // queue. Note that we can not do the put() in sendStanzaInternal() and the
            // packet order is not stable at this point (sendStanzaInternal() can be
            // called concurrently).
            if (unacknowledgedStanzas != null && stanza != null) {
                // If the unacknowledgedStanza queue is reaching its high water mark, request a new ack
                // from the server in order to drain it
                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
                    writer.write(AckRequest.INSTANCE.toXML().toString());
                }

                try {
                    // It is important that we put the stanza in the unacknowledged stanza
                    // queue before we put it on the wire
                    unacknowledgedStanzas.put(stanza);
                }
                catch (InterruptedException e) {
                    // NOTE(review): the interrupt status of the thread is not restored here - confirm intended.
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    /**
     * Set if Stream Management should be used by default for new connections.
     *
     * @param useSmDefault true to use Stream Management for new connections.
     */
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
        XMPPTCPConnection.useSmDefault = useSmDefault;
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     * This misspelled variant only delegates to {@link #setUseStreamManagementResumptionDefault(boolean)}.
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
     */
    @Deprecated
    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1526 */ 1527 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1528 if (useSmResumptionDefault) { 1529 // Also enable SM is resumption is enabled 1530 setUseStreamManagementDefault(useSmResumptionDefault); 1531 } 1532 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1533 } 1534 1535 /** 1536 * Set if Stream Management should be used if supported by the server. 1537 * 1538 * @param useSm true to use Stream Management. 1539 */ 1540 public void setUseStreamManagement(boolean useSm) { 1541 this.useSm = useSm; 1542 } 1543 1544 /** 1545 * Set if Stream Management resumption should be used if supported by the server. 1546 * 1547 * @param useSmResumption true to use Stream Management resumption. 1548 */ 1549 public void setUseStreamManagementResumption(boolean useSmResumption) { 1550 if (useSmResumption) { 1551 // Also enable SM is resumption is enabled 1552 setUseStreamManagement(useSmResumption); 1553 } 1554 this.useSmResumption = useSmResumption; 1555 } 1556 1557 /** 1558 * Set the preferred resumption time in seconds. 1559 * @param resumptionTime the preferred resumption time in seconds 1560 */ 1561 public void setPreferredResumptionTime(int resumptionTime) { 1562 smClientMaxResumptionTime = resumptionTime; 1563 } 1564 1565 /** 1566 * Add a predicate for Stream Management acknowledgment requests. 1567 * <p> 1568 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1569 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1570 * </p> 1571 * <p> 1572 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1573 * </p> 1574 * 1575 * @param predicate the predicate to add. 1576 * @return if the predicate was not already active. 
1577 */ 1578 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1579 synchronized (requestAckPredicates) { 1580 return requestAckPredicates.add(predicate); 1581 } 1582 } 1583 1584 /** 1585 * Remove the given predicate for Stream Management acknowledgment request. 1586 * @param predicate the predicate to remove. 1587 * @return true if the predicate was removed. 1588 */ 1589 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1590 synchronized (requestAckPredicates) { 1591 return requestAckPredicates.remove(predicate); 1592 } 1593 } 1594 1595 /** 1596 * Remove all predicates for Stream Management acknowledgment requests. 1597 */ 1598 public void removeAllRequestAckPredicates() { 1599 synchronized (requestAckPredicates) { 1600 requestAckPredicates.clear(); 1601 } 1602 } 1603 1604 /** 1605 * Send an unconditional Stream Management acknowledgement request to the server. 1606 * 1607 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1608 * @throws NotConnectedException if the connection is not connected. 1609 * @throws InterruptedException if the calling thread was interrupted. 1610 */ 1611 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1612 if (!isSmEnabled()) { 1613 throw new StreamManagementException.StreamManagementNotEnabledException(); 1614 } 1615 requestSmAcknowledgementInternal(); 1616 } 1617 1618 private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1619 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1620 } 1621 1622 private void requestSmAcknowledgementNonBlockingInternal() throws NotConnectedException, OutgoingQueueFullException { 1623 packetWriter.sendNonBlocking(AckRequest.INSTANCE); 1624 } 1625 1626 /** 1627 * Send a unconditional Stream Management acknowledgment to the server. 
1628 * <p> 1629 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>: 1630 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1631 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1632 * </p> 1633 * 1634 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1635 * @throws NotConnectedException if the connection is not connected. 1636 * @throws InterruptedException if the calling thread was interrupted. 1637 */ 1638 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1639 if (!isSmEnabled()) { 1640 throw new StreamManagementException.StreamManagementNotEnabledException(); 1641 } 1642 sendSmAcknowledgementInternal(); 1643 } 1644 1645 private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1646 AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount); 1647 // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request 1648 // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here, 1649 // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the 1650 // stream open. 1651 packetWriter.queue.putIfNotShutdown(ackAnswer); 1652 } 1653 1654 /** 1655 * Add a Stanza acknowledged listener. 1656 * <p> 1657 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1658 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1659 * possible. 1660 * </p> 1661 * 1662 * @param listener the listener to add. 
1663 */ 1664 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1665 stanzaAcknowledgedListeners.add(listener); 1666 } 1667 1668 /** 1669 * Remove the given Stanza acknowledged listener. 1670 * 1671 * @param listener the listener. 1672 * @return true if the listener was removed. 1673 */ 1674 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1675 return stanzaAcknowledgedListeners.remove(listener); 1676 } 1677 1678 /** 1679 * Remove all stanza acknowledged listeners. 1680 */ 1681 public void removeAllStanzaAcknowledgedListeners() { 1682 stanzaAcknowledgedListeners.clear(); 1683 } 1684 1685 /** 1686 * Add a Stanza dropped listener. 1687 * <p> 1688 * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get 1689 * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit 1690 * the Stanzas. 1691 * </p> 1692 * 1693 * @param listener the listener to add. 1694 * @since 4.3.3 1695 */ 1696 public void addStanzaDroppedListener(StanzaListener listener) { 1697 stanzaDroppedListeners.add(listener); 1698 } 1699 1700 /** 1701 * Remove the given Stanza dropped listener. 1702 * 1703 * @param listener the listener. 1704 * @return true if the listener was removed. 1705 * @since 4.3.3 1706 */ 1707 public boolean removeStanzaDroppedListener(StanzaListener listener) { 1708 return stanzaDroppedListeners.remove(listener); 1709 } 1710 1711 /** 1712 * Add a new Stanza ID acknowledged listener for the given ID. 1713 * <p> 1714 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1715 * automatically be removed after the listener was run. 1716 * </p> 1717 * 1718 * @param id the stanza ID. 1719 * @param listener the listener to invoke. 1720 * @return the previous listener for this stanza ID or null. 1721 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 
1722 */ 1723 @SuppressWarnings("FutureReturnValueIgnored") 1724 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1725 // Prevent users from adding callbacks that will never get removed 1726 if (!smWasEnabledAtLeastOnce) { 1727 throw new StreamManagementException.StreamManagementNotEnabledException(); 1728 } 1729 // Remove the listener after max. 3 hours 1730 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60); 1731 schedule(new Runnable() { 1732 @Override 1733 public void run() { 1734 stanzaIdAcknowledgedListeners.remove(id); 1735 } 1736 }, removeAfterSeconds, TimeUnit.SECONDS); 1737 return stanzaIdAcknowledgedListeners.put(id, listener); 1738 } 1739 1740 /** 1741 * Remove the Stanza ID acknowledged listener for the given ID. 1742 * 1743 * @param id the stanza ID. 1744 * @return true if the listener was found and removed, false otherwise. 1745 */ 1746 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1747 return stanzaIdAcknowledgedListeners.remove(id); 1748 } 1749 1750 /** 1751 * Removes all Stanza ID acknowledged listeners. 1752 */ 1753 public void removeAllStanzaIdAcknowledgedListeners() { 1754 stanzaIdAcknowledgedListeners.clear(); 1755 } 1756 1757 /** 1758 * Returns true if Stream Management is supported by the server. 1759 * 1760 * @return true if Stream Management is supported by the server. 1761 */ 1762 public boolean isSmAvailable() { 1763 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1764 } 1765 1766 /** 1767 * Returns true if Stream Management was successfully negotiated with the server. 1768 * 1769 * @return true if Stream Management was negotiated. 1770 */ 1771 public boolean isSmEnabled() { 1772 return smEnabledSyncPoint; 1773 } 1774 1775 /** 1776 * Returns true if the stream was successfully resumed with help of Stream Management. 
1777 * 1778 * @return true if the stream was resumed. 1779 */ 1780 public boolean streamWasResumed() { 1781 return smResumedSyncPoint == SyncPointState.successful; 1782 } 1783 1784 /** 1785 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1786 * 1787 * @return true if disconnected but resumption possible. 1788 */ 1789 public boolean isDisconnectedButSmResumptionPossible() { 1790 return disconnectedButResumeable && isSmResumptionPossible(); 1791 } 1792 1793 /** 1794 * Returns true if the stream is resumable. 1795 * 1796 * @return true if the stream is resumable. 1797 */ 1798 public boolean isSmResumptionPossible() { 1799 // There is no resumable stream available 1800 if (smSessionId == null) 1801 return false; 1802 1803 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1804 // Seems like we are already reconnected, report true 1805 if (shutdownTimestamp == null) { 1806 return true; 1807 } 1808 1809 // See if resumption time is over 1810 long current = System.currentTimeMillis(); 1811 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1812 if (current > shutdownTimestamp + maxResumptionMillies) { 1813 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1814 // resumption is possible 1815 return false; 1816 } else { 1817 return true; 1818 } 1819 } 1820 1821 /** 1822 * Drop the stream management state. Sets {@link #smSessionId} and 1823 * {@link #unacknowledgedStanzas} to <code>null</code>. 1824 */ 1825 private void dropSmState() { 1826 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1827 // respective. No need to reset them here. 1828 smSessionId = null; 1829 unacknowledgedStanzas = null; 1830 } 1831 1832 /** 1833 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 
1834 * <p> 1835 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1836 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1837 * without checking for overflows before. 1838 * </p> 1839 * 1840 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1841 */ 1842 public int getMaxSmResumptionTime() { 1843 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1844 int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE; 1845 return Math.min(clientResumptionTime, serverResumptionTime); 1846 } 1847 1848 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1849 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1850 final List<Stanza> ackedStanzas = new ArrayList<>( 1851 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1852 : Integer.MAX_VALUE); 1853 for (long i = 0; i < ackedStanzasCount; i++) { 1854 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1855 // If the server ack'ed a stanza, then it must be in the 1856 // unacknowledged stanza queue. There can be no exception. 
1857 if (ackedStanza == null) { 1858 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1859 ackedStanzasCount, ackedStanzas); 1860 } 1861 ackedStanzas.add(ackedStanza); 1862 } 1863 1864 boolean atLeastOneStanzaAcknowledgedListener = false; 1865 if (!stanzaAcknowledgedListeners.isEmpty()) { 1866 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1867 atLeastOneStanzaAcknowledgedListener = true; 1868 } 1869 else { 1870 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1871 for (Stanza ackedStanza : ackedStanzas) { 1872 String id = ackedStanza.getStanzaId(); 1873 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1874 atLeastOneStanzaAcknowledgedListener = true; 1875 break; 1876 } 1877 } 1878 } 1879 1880 // Only spawn a new thread if there is a chance that some listener is invoked 1881 if (atLeastOneStanzaAcknowledgedListener) { 1882 asyncGo(new Runnable() { 1883 @Override 1884 public void run() { 1885 for (Stanza ackedStanza : ackedStanzas) { 1886 for (StanzaListener listener : stanzaAcknowledgedListeners) { 1887 try { 1888 listener.processStanza(ackedStanza); 1889 } 1890 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1891 LOGGER.log(Level.FINER, "Received exception", e); 1892 } 1893 } 1894 String id = ackedStanza.getStanzaId(); 1895 if (StringUtils.isNullOrEmpty(id)) { 1896 continue; 1897 } 1898 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1899 if (listener != null) { 1900 try { 1901 listener.processStanza(ackedStanza); 1902 } 1903 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1904 LOGGER.log(Level.FINER, "Received exception", e); 1905 } 1906 } 1907 } 1908 } 1909 }); 1910 } 1911 1912 serverHandledStanzasCount = handledCount; 1913 } 1914 1915 /** 1916 * Set the default bundle and defer callback used for new connections. 
1917 * 1918 * @param defaultBundleAndDeferCallback TODO javadoc me please 1919 * @see BundleAndDeferCallback 1920 * @since 4.1 1921 */ 1922 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1923 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1924 } 1925 1926 /** 1927 * Set the bundle and defer callback used for this connection. 1928 * <p> 1929 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1930 * no longer get deferred. 1931 * </p> 1932 * 1933 * @param bundleAndDeferCallback the callback or <code>null</code>. 1934 * @see BundleAndDeferCallback 1935 * @since 4.1 1936 */ 1937 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1938 this.bundleAndDeferCallback = bundleAndDeferCallback; 1939 } 1940 1941 1942 /** 1943 * Returns the local address currently in use for this connection. 1944 * 1945 * @return the local address 1946 */ 1947 @Override 1948 public InetAddress getLocalAddress() { 1949 final Socket socket = this.socket; 1950 if (socket == null) return null; 1951 1952 InetAddress localAddress = socket.getLocalAddress(); 1953 if (localAddress.isAnyLocalAddress()) return null; 1954 1955 return localAddress; 1956 } 1957}