001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.ByteArrayInputStream; 021import java.io.FileInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.io.OutputStream; 026import java.io.OutputStreamWriter; 027import java.io.Writer; 028import java.lang.reflect.Constructor; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.net.Socket; 032import java.security.KeyManagementException; 033import java.security.KeyStore; 034import java.security.KeyStoreException; 035import java.security.NoSuchAlgorithmException; 036import java.security.NoSuchProviderException; 037import java.security.Provider; 038import java.security.SecureRandom; 039import java.security.Security; 040import java.security.UnrecoverableKeyException; 041import java.security.cert.CertificateException; 042import java.util.ArrayList; 043import java.util.Collection; 044import java.util.Iterator; 045import java.util.LinkedHashSet; 046import java.util.LinkedList; 047import java.util.List; 048import java.util.Map; 049import java.util.Set; 050import java.util.concurrent.ArrayBlockingQueue; 051import java.util.concurrent.BlockingQueue; 052import java.util.concurrent.ConcurrentHashMap; 053import 
java.util.concurrent.ConcurrentLinkedQueue; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.atomic.AtomicBoolean; 056import java.util.logging.Level; 057import java.util.logging.Logger; 058 059import javax.net.SocketFactory; 060import javax.net.ssl.HostnameVerifier; 061import javax.net.ssl.KeyManager; 062import javax.net.ssl.KeyManagerFactory; 063import javax.net.ssl.SSLContext; 064import javax.net.ssl.SSLSession; 065import javax.net.ssl.SSLSocket; 066import javax.net.ssl.TrustManager; 067import javax.net.ssl.X509TrustManager; 068import javax.security.auth.callback.Callback; 069import javax.security.auth.callback.CallbackHandler; 070import javax.security.auth.callback.PasswordCallback; 071 072import org.jivesoftware.smack.AbstractConnectionListener; 073import org.jivesoftware.smack.AbstractXMPPConnection; 074import org.jivesoftware.smack.ConnectionConfiguration; 075import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode; 076import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 077import org.jivesoftware.smack.SmackConfiguration; 078import org.jivesoftware.smack.SmackException; 079import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 080import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 081import org.jivesoftware.smack.SmackException.ConnectionException; 082import org.jivesoftware.smack.SmackException.NoResponseException; 083import org.jivesoftware.smack.SmackException.NotConnectedException; 084import org.jivesoftware.smack.SmackException.NotLoggedInException; 085import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 086import org.jivesoftware.smack.StanzaListener; 087import org.jivesoftware.smack.SynchronizationPoint; 088import org.jivesoftware.smack.XMPPConnection; 089import org.jivesoftware.smack.XMPPException; 090import org.jivesoftware.smack.XMPPException.FailedNonzaException; 091import org.jivesoftware.smack.XMPPException.StreamErrorException; 092import 
org.jivesoftware.smack.compress.packet.Compress; 093import org.jivesoftware.smack.compress.packet.Compressed; 094import org.jivesoftware.smack.compression.XMPPInputOutputStream; 095import org.jivesoftware.smack.filter.StanzaFilter; 096import org.jivesoftware.smack.packet.Element; 097import org.jivesoftware.smack.packet.IQ; 098import org.jivesoftware.smack.packet.Message; 099import org.jivesoftware.smack.packet.Nonza; 100import org.jivesoftware.smack.packet.Presence; 101import org.jivesoftware.smack.packet.Stanza; 102import org.jivesoftware.smack.packet.StartTls; 103import org.jivesoftware.smack.packet.StreamError; 104import org.jivesoftware.smack.packet.StreamOpen; 105import org.jivesoftware.smack.proxy.ProxyInfo; 106import org.jivesoftware.smack.sasl.packet.SaslStreamElements; 107import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge; 108import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure; 109import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success; 110import org.jivesoftware.smack.sm.SMUtils; 111import org.jivesoftware.smack.sm.StreamManagementException; 112import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 113import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 114import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 115import org.jivesoftware.smack.sm.packet.StreamManagement; 116import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 117import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 118import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 119import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 120import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 121import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 122import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 123import 
org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 124import org.jivesoftware.smack.sm.predicates.Predicate; 125import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 126import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 127import org.jivesoftware.smack.util.Async; 128import org.jivesoftware.smack.util.DNSUtil; 129import org.jivesoftware.smack.util.PacketParserUtils; 130import org.jivesoftware.smack.util.StringUtils; 131import org.jivesoftware.smack.util.TLSUtils; 132import org.jivesoftware.smack.util.XmlStringBuilder; 133import org.jivesoftware.smack.util.dns.HostAddress; 134import org.jivesoftware.smack.util.dns.SmackDaneProvider; 135import org.jivesoftware.smack.util.dns.SmackDaneVerifier; 136 137import org.jxmpp.jid.impl.JidCreate; 138import org.jxmpp.jid.parts.Resourcepart; 139import org.jxmpp.stringprep.XmppStringprepException; 140import org.jxmpp.util.XmppStringUtils; 141import org.xmlpull.v1.XmlPullParser; 142import org.xmlpull.v1.XmlPullParserException; 143 144/** 145 * Creates a socket connection to an XMPP server. This is the default connection 146 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 147 * 148 * @see XMPPConnection 149 * @author Matt Tucker 150 */ 151public class XMPPTCPConnection extends AbstractXMPPConnection { 152 153 private static final int QUEUE_SIZE = 500; 154 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 155 156 /** 157 * The socket which is used for this connection. 
     */
    private Socket socket;

    /**
     * Set to {@code true} by an instant shutdown while stream resumption is still possible, and reset to
     * {@code false} after a successful login. While it is set, the "already connected" and "already logged in"
     * checks of this class do not throw, and a further shutdown returns immediately.
     */
    private boolean disconnectedButResumeable = false;

    /**
     * The TLS secured socket. It is assigned after the TLS handshake and the hostname verification succeeded and
     * reset to {@code null} on shutdown; {@link #isSecureConnection()} reports whether it is set.
     */
    private SSLSocket secureSocket;

    /**
     * Protected access level because of unit test purposes
     */
    protected PacketWriter packetWriter;

    /**
     * Protected access level because of unit test purposes
     */
    protected PacketReader packetReader;

    /**
     * Synchronization point for the initial open stream element sent to the server. It is re-initialized on
     * shutdown.
     */
    private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>(
                    this, "initial open stream element send to server");

    /**
     * Synchronization point which {@link #maybeEnableCompression()} waits on before it looks up the stream
     * compression feature. NOTE(review): presumably reported once the stream features have been parsed; the
     * reporting side is not part of this section, please confirm.
     */
    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
                    this, "stream compression feature");

    /**
     * Synchronization point used to send the compress request and to wait for the server's answer.
     * {@link #isUsingCompression()} only reports {@code true} if this point was successful.
     */
    private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>(
                    this, "stream compression");

    /**
     * A synchronization point which is successful if this connection has received the closing
     * stream element from the remote end-point, i.e. the server.
     */
    private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>(
                    this, "stream closing element received");

    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
    private static BundleAndDeferCallback defaultBundleAndDeferCallback;

    /**
     * The used bundle and defer callback.
     * <p>
     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
     * having a 'volatile' read within the writer threads loop.
     * </p>
     */
    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;

    /**
     * The initial value of {@link #useSm} for new connections.
     */
    private static boolean useSmDefault = true;

    /**
     * The initial value of {@link #useSmResumption} for new connections.
     */
    private static boolean useSmResumptionDefault = true;

    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}.
     */
    private String smSessionId;

    /**
     * Synchronization point used during login to send the stream management 'resume' element and to wait for
     * the server's response.
     */
    private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>(
                    this, "stream resumed element");

    /**
     * Synchronization point used during login to send the stream management 'enable' element and to wait for
     * the server's response.
     */
    private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>(
                    this, "stream enabled element");

    /**
     * The client's preferred maximum resumption time in seconds.
     */
    private int smClientMaxResumptionTime = -1;

    /**
     * The server's preferred maximum resumption time in seconds.
     */
    private int smServerMaxResumptionTime = -1;

    /**
     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
     */
    private boolean useSm = useSmDefault;

    // Passed to the 'enable' element when stream management gets enabled during login.
    private boolean useSmResumption = useSmResumptionDefault;

    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
    private long serverHandledStanzasCount = 0;

    /**
     * The counter for stanzas handled ("received") by the client.
     * <p>
     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
     * </p>
     */
    private long clientHandledStanzasCount = 0;

    /**
     * The stanzas which were sent but not yet acknowledged by the server. The login code treats {@code null} as
     * the signal that no 'enable' element was sent in the current XMPP session.
     */
    private BlockingQueue<Stanza> unacknowledgedStanzas;

    /**
     * Set to true if Stream Management was at least once enabled for this connection.
     */
    private boolean smWasEnabledAtLeastOnce = false;

    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and automatically removed after that.
     */
    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();

    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order how the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
     * </p>
     */
    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();

    // The TCP specific configuration; the very same object that is handed to the superclass constructor.
    @SuppressWarnings("HidingField")
    private final XMPPTCPConnectionConfiguration config;

    /**
     * Creates a new XMPP connection over TCP (optionally using proxies).
     * <p>
     * Note that XMPPTCPConnection constructors do not establish a connection to the server
     * and you must call {@link #connect()}.
     * </p>
     *
     * @param config the connection configuration.
307 */ 308 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 309 super(config); 310 this.config = config; 311 addConnectionListener(new AbstractConnectionListener() { 312 @Override 313 public void connectionClosedOnError(Exception e) { 314 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) { 315 dropSmState(); 316 } 317 } 318 }); 319 } 320 321 /** 322 * Creates a new XMPP connection over TCP. 323 * <p> 324 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 325 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 326 * constructor. 327 * </p> 328 * 329 * @param jid the bare JID used by the client. 330 * @param password the password or authentication token. 331 * @throws XmppStringprepException 332 */ 333 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 334 this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); 335 } 336 337 /** 338 * Creates a new XMPP connection over TCP. 339 * <p> 340 * This is the simplest constructor for connecting to an XMPP server. Alternatively, 341 * you can get fine-grained control over connection settings using the 342 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 
     * </p>
     * @param username the username used for authentication.
     * @param password the password or authentication token.
     * @param serviceName the XMPP service domain; it is converted with {@link JidCreate#domainBareFrom(String)}.
     * @throws XmppStringprepException if {@code serviceName} can not be converted into a domain bare JID.
     */
    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
                                        JidCreate.domainBareFrom(serviceName)).build());
    }

    /**
     * Throws if stanzas can currently not be sent: either because no packet writer exists yet, or because the
     * packet writer decides so (writer is done and stream resumption is not possible).
     *
     * @throws NotConnectedException if there is no packet writer or the packet writer reports "not connected".
     */
    @Override
    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
        if (packetWriter == null) {
            throw new NotConnectedException();
        }
        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
    }

    /**
     * Throws if this connection is connected, unless it is merely in the "disconnected but resumable" state.
     *
     * @throws AlreadyConnectedException if connected and not waiting for a stream resumption.
     */
    @Override
    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
        if (isConnected() && !disconnectedButResumeable) {
            throw new AlreadyConnectedException();
        }
    }

    /**
     * Throws if this connection is authenticated, unless it is merely in the "disconnected but resumable" state.
     *
     * @throws AlreadyLoggedInException if authenticated and not waiting for a stream resumption.
     */
    @Override
    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
        if (isAuthenticated() && !disconnectedButResumeable) {
            throw new AlreadyLoggedInException();
        }
    }

    /**
     * Clears the "disconnected but resumable" flag and then delegates to the superclass.
     *
     * @param resumed whether the login resumed a previous stream; passed on unchanged to the superclass.
     * @throws NotConnectedException propagated from the superclass implementation.
     * @throws InterruptedException propagated from the superclass implementation.
     */
    @Override
    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
        // Reset the flag in case it was set
        disconnectedButResumeable = false;
        super.afterSuccessfulLogin(resumed);
    }

    /**
     * Performs the login: SASL authentication, optional stream compression, an attempt to resume a previous
     * stream management session and, if nothing was resumed, resource binding followed by enabling stream
     * management and re-sending the stanzas which were left unacknowledged by a previous session.
     * <p>
     * The order of the steps matters, see the comments in the method body.
     * </p>
     *
     * @param username the username handed to the SASL authentication.
     * @param password the password handed to the SASL authentication.
     * @param resource the resource to bind if the stream was not resumed.
     * @throws XMPPException propagated from the authentication, binding or stream management steps.
     * @throws SmackException propagated from the authentication, compression, binding or stream management steps.
     * @throws IOException propagated from the authentication or binding steps.
     * @throws InterruptedException if the calling thread was interrupted while waiting for the server.
     */
    @Override
    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                    SmackException, IOException, InterruptedException {
        // Authenticate using SASL
        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
        saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession);

        // If compression is enabled then request the server to use stream compression. XEP-170
        // recommends to perform stream compression before resource binding.
        maybeEnableCompression();

        if (isSmResumptionPossible()) {
            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
            if (smResumedSyncPoint.wasSuccessful()) {
                // We successfully resumed the stream, be done here
                afterSuccessfulLogin(true);
                return;
            }
            // SM resumption failed, what Smack does here is to report success of
            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
            // normal resource binding can be tried.
            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
        }

        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
        if (unacknowledgedStanzas != null) {
            // There was a previous connection with SM enabled but that was either not resumable or
            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
            // XMPP session (There maybe was an enabled in a previous XMPP session of this
            // connection instance though). This is used in writePackets to decide if stanzas should
            // be added to the unacknowledged stanzas queue, because they have to be added right
            // after the 'enable' stream element has been sent.
            dropSmState();
        }

        // Now bind the resource. It is important to do this *after* we dropped an eventually
        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
        bindResourceAndEstablishSession(resource);

        if (isSmAvailable() && useSm) {
            // Remove what is maybe left from previously stream managed sessions
            serverHandledStanzasCount = 0;
            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
            // then this is a non recoverable error and we therefore throw an exception.
            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
            synchronized (requestAckPredicates) {
                if (requestAckPredicates.isEmpty()) {
                    // Assure that we have at least one predicate set up so that we request acks
                    // from the server and eventually flush some stanzas from the unacknowledged
                    // stanza queue
                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
                }
            }
        }
        // (Re-)send the stanzas *after* we tried to enable SM
        for (Stanza stanza : previouslyUnackedStanzas) {
            sendStanzaInternal(stanza);
        }

        afterSuccessfulLogin(false);
    }

    /**
     * Reports whether this connection is TLS secured, i.e. whether the secured socket is set.
     *
     * @return {@code true} if the secured socket is not {@code null}.
     */
    @Override
    public boolean isSecureConnection() {
        return secureSocket != null;
    }

    /**
     * Shuts the current connection down. After this method returns, the connection must be ready
     * for re-use by connect.
     * <p>
     * If stream management is enabled, a last acknowledgement is sent on a best-effort basis before the
     * (non-instant) shutdown is performed; a failure to send it is only logged.
     * </p>
     */
    @Override
    protected void shutdown() {
        if (isSmEnabled()) {
            try {
                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
                sendSmAcknowledgementInternal();
            } catch (InterruptedException | NotConnectedException e) {
                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
            }
        }
        shutdown(false);
    }

    /**
     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
473 */ 474 public synchronized void instantShutdown() { 475 shutdown(true); 476 } 477 478 private void shutdown(boolean instant) { 479 if (disconnectedButResumeable) { 480 return; 481 } 482 483 // First shutdown the writer, this will result in a closing stream element getting send to 484 // the server 485 if (packetWriter != null) { 486 LOGGER.finer("PacketWriter shutdown()"); 487 packetWriter.shutdown(instant); 488 } 489 LOGGER.finer("PacketWriter has been shut down"); 490 491 if (!instant) { 492 try { 493 // After we send the closing stream element, check if there was already a 494 // closing stream element sent by the server or wait with a timeout for a 495 // closing stream element to be received from the server. 496 @SuppressWarnings("unused") 497 Exception res = closingStreamReceived.checkIfSuccessOrWait(); 498 } catch (InterruptedException | NoResponseException e) { 499 LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); 500 } 501 } 502 503 if (packetReader != null) { 504 LOGGER.finer("PacketReader shutdown()"); 505 packetReader.shutdown(); 506 } 507 LOGGER.finer("PacketReader has been shut down"); 508 509 try { 510 socket.close(); 511 } catch (Exception e) { 512 LOGGER.log(Level.WARNING, "shutdown", e); 513 } 514 515 setWasAuthenticated(); 516 // If we are able to resume the stream, then don't set 517 // connected/authenticated/usingTLS to false since we like behave like we are still 518 // connected (e.g. sendStanza should not throw a NotConnectedException). 519 if (isSmResumptionPossible() && instant) { 520 disconnectedButResumeable = true; 521 } else { 522 disconnectedButResumeable = false; 523 // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing 524 // stream tag, there is no longer a stream to resume. 
525 smSessionId = null; 526 } 527 authenticated = false; 528 connected = false; 529 secureSocket = null; 530 reader = null; 531 writer = null; 532 533 maybeCompressFeaturesReceived.init(); 534 compressSyncPoint.init(); 535 smResumedSyncPoint.init(); 536 smEnabledSyncPoint.init(); 537 initialOpenStreamSend.init(); 538 } 539 540 @Override 541 public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { 542 packetWriter.sendStreamElement(element); 543 } 544 545 @Override 546 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { 547 packetWriter.sendStreamElement(packet); 548 if (isSmEnabled()) { 549 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 550 if (requestAckPredicate.accept(packet)) { 551 requestSmAcknowledgementInternal(); 552 break; 553 } 554 } 555 } 556 } 557 558 private void connectUsingConfiguration() throws ConnectionException, IOException { 559 List<HostAddress> failedAddresses = populateHostAddresses(); 560 SocketFactory socketFactory = config.getSocketFactory(); 561 ProxyInfo proxyInfo = config.getProxyInfo(); 562 int timeout = config.getConnectTimeout(); 563 if (socketFactory == null) { 564 socketFactory = SocketFactory.getDefault(); 565 } 566 for (HostAddress hostAddress : hostAddresses) { 567 Iterator<InetAddress> inetAddresses; 568 String host = hostAddress.getHost(); 569 int port = hostAddress.getPort(); 570 if (proxyInfo == null) { 571 inetAddresses = hostAddress.getInetAddresses().iterator(); 572 assert (inetAddresses.hasNext()); 573 574 innerloop: while (inetAddresses.hasNext()) { 575 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 576 // re-usable after a failed connection attempt. See also SMACK-724. 
577 socket = socketFactory.createSocket(); 578 579 final InetAddress inetAddress = inetAddresses.next(); 580 final String inetAddressAndPort = inetAddress + " at port " + port; 581 LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort); 582 try { 583 socket.connect(new InetSocketAddress(inetAddress, port), timeout); 584 } catch (Exception e) { 585 hostAddress.setException(inetAddress, e); 586 if (inetAddresses.hasNext()) { 587 continue innerloop; 588 } else { 589 break innerloop; 590 } 591 } 592 LOGGER.finer("Established TCP connection to " + inetAddressAndPort); 593 // We found a host to connect to, return here 594 this.host = host; 595 this.port = port; 596 return; 597 } 598 failedAddresses.add(hostAddress); 599 } else { 600 socket = socketFactory.createSocket(); 601 StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy"); 602 final String hostAndPort = host + " at port " + port; 603 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 604 try { 605 proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 606 } catch (IOException e) { 607 hostAddress.setException(e); 608 continue; 609 } 610 LOGGER.finer("Established TCP connection to " + hostAndPort); 611 // We found a host to connect to, return here 612 this.host = host; 613 this.port = port; 614 return; 615 } 616 } 617 // There are no more host addresses to try 618 // throw an exception and report all tried 619 // HostAddresses in the exception 620 throw ConnectionException.from(failedAddresses); 621 } 622 623 /** 624 * Initializes the connection by creating a stanza reader and writer and opening a 625 * XMPP stream to the server. 626 * 627 * @throws XMPPException if establishing a connection to the server fails. 628 * @throws SmackException if the server fails to respond back or if there is anther error. 
629 * @throws IOException 630 */ 631 private void initConnection() throws IOException { 632 boolean isFirstInitialization = packetReader == null || packetWriter == null; 633 compressionHandler = null; 634 635 // Set the reader and writer instance variables 636 initReaderAndWriter(); 637 638 if (isFirstInitialization) { 639 packetWriter = new PacketWriter(); 640 packetReader = new PacketReader(); 641 } 642 // Start the writer thread. This will open an XMPP stream to the server 643 packetWriter.init(); 644 // Start the reader thread. The startup() method will block until we 645 // get an opening stream packet back from server 646 packetReader.init(); 647 } 648 649 private void initReaderAndWriter() throws IOException { 650 InputStream is = socket.getInputStream(); 651 OutputStream os = socket.getOutputStream(); 652 if (compressionHandler != null) { 653 is = compressionHandler.getInputStream(is); 654 os = compressionHandler.getOutputStream(os); 655 } 656 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 657 writer = new OutputStreamWriter(os, "UTF-8"); 658 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 659 660 // If debugging is enabled, we open a window and write out all network traffic. 661 initDebugger(); 662 } 663 664 /** 665 * The server has indicated that TLS negotiation can start. We now need to secure the 666 * existing plain connection and perform a handshake. This method won't return until the 667 * connection has finished the handshake or an error occurred while securing the connection. 668 * @throws IOException 669 * @throws CertificateException 670 * @throws NoSuchAlgorithmException 671 * @throws NoSuchProviderException 672 * @throws KeyStoreException 673 * @throws UnrecoverableKeyException 674 * @throws KeyManagementException 675 * @throws SmackException 676 * @throws Exception if an exception occurs. 
677 */ 678 @SuppressWarnings("LiteralClassName") 679 private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException { 680 SmackDaneVerifier daneVerifier = null; 681 682 if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) { 683 SmackDaneProvider daneProvider = DNSUtil.getDaneProvider(); 684 if (daneProvider == null) { 685 throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured"); 686 } 687 daneVerifier = daneProvider.newInstance(); 688 if (daneVerifier == null) { 689 throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier"); 690 } 691 } 692 693 SSLContext context = this.config.getCustomSSLContext(); 694 KeyStore ks = null; 695 PasswordCallback pcb = null; 696 697 if (context == null) { 698 final String keyStoreType = config.getKeystoreType(); 699 final CallbackHandler callbackHandler = config.getCallbackHandler(); 700 final String keystorePath = config.getKeystorePath(); 701 if ("PKCS11".equals(keyStoreType)) { 702 try { 703 Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); 704 String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library(); 705 ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8)); 706 Provider p = (Provider) c.newInstance(config); 707 Security.addProvider(p); 708 ks = KeyStore.getInstance("PKCS11",p); 709 pcb = new PasswordCallback("PKCS11 Password: ",false); 710 callbackHandler.handle(new Callback[] {pcb}); 711 ks.load(null,pcb.getPassword()); 712 } 713 catch (Exception e) { 714 LOGGER.log(Level.WARNING, "Exception", e); 715 ks = null; 716 } 717 } 718 else if ("Apple".equals(keyStoreType)) { 719 ks = KeyStore.getInstance("KeychainStore","Apple"); 720 ks.load(null,null); 721 // pcb = new PasswordCallback("Apple 
Keychain",false); 722 // pcb.setPassword(null); 723 } 724 else if (keyStoreType != null) { 725 ks = KeyStore.getInstance(keyStoreType); 726 if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) { 727 try { 728 pcb = new PasswordCallback("Keystore Password: ", false); 729 callbackHandler.handle(new Callback[] { pcb }); 730 ks.load(new FileInputStream(keystorePath), pcb.getPassword()); 731 } 732 catch (Exception e) { 733 LOGGER.log(Level.WARNING, "Exception", e); 734 ks = null; 735 } 736 } else { 737 ks.load(null, null); 738 } 739 } 740 741 KeyManager[] kms = null; 742 743 if (ks != null) { 744 String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm(); 745 KeyManagerFactory kmf = null; 746 try { 747 kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm); 748 } 749 catch (NoSuchAlgorithmException e) { 750 LOGGER.log(Level.FINE, "Could get the default KeyManagerFactory for the '" 751 + keyManagerFactoryAlgorithm + "' algorithm", e); 752 } 753 if (kmf != null) { 754 try { 755 if (pcb == null) { 756 kmf.init(ks, null); 757 } 758 else { 759 kmf.init(ks, pcb.getPassword()); 760 pcb.clearPassword(); 761 } 762 kms = kmf.getKeyManagers(); 763 } 764 catch (NullPointerException npe) { 765 LOGGER.log(Level.WARNING, "NullPointerException", npe); 766 } 767 } 768 } 769 770 // If the user didn't specify a SSLContext, use the default one 771 context = SSLContext.getInstance("TLS"); 772 773 final SecureRandom secureRandom = new java.security.SecureRandom(); 774 X509TrustManager customTrustManager = config.getCustomX509TrustManager(); 775 776 if (daneVerifier != null) { 777 // User requested DANE verification. 
778 daneVerifier.init(context, kms, customTrustManager, secureRandom); 779 } else { 780 TrustManager[] customTrustManagers = null; 781 if (customTrustManager != null) { 782 customTrustManagers = new TrustManager[] { customTrustManager }; 783 } 784 context.init(kms, customTrustManagers, secureRandom); 785 } 786 } 787 788 Socket plain = socket; 789 // Secure the plain connection 790 socket = context.getSocketFactory().createSocket(plain, 791 config.getXMPPServiceDomain().toString(), plain.getPort(), true); 792 793 final SSLSocket sslSocket = (SSLSocket) socket; 794 // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is 795 // important (at least on certain platforms) and it seems to be a good idea anyways to 796 // prevent an accidental implicit handshake. 797 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 798 799 // Initialize the reader and writer with the new secured version 800 initReaderAndWriter(); 801 802 // Proceed to do the handshake 803 sslSocket.startHandshake(); 804 805 if (daneVerifier != null) { 806 daneVerifier.finish(sslSocket); 807 } 808 809 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 810 if (verifier == null) { 811 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 812 } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) { 813 throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain()); 814 } 815 816 // Set that TLS was successful 817 secureSocket = sslSocket; 818 } 819 820 /** 821 * Returns the compression handler that can be used for one compression methods offered by the server. 
822 * 823 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 824 * 825 */ 826 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 827 for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) { 828 String method = handler.getCompressionMethod(); 829 if (compression.getMethods().contains(method)) 830 return handler; 831 } 832 return null; 833 } 834 835 @Override 836 public boolean isUsingCompression() { 837 return compressionHandler != null && compressSyncPoint.wasSuccessful(); 838 } 839 840 /** 841 * <p> 842 * Starts using stream compression that will compress network traffic. Traffic can be 843 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 844 * connection. However, the server and the client will need to use more CPU time in order to 845 * un/compress network data so under high load the server performance might be affected. 846 * </p> 847 * <p> 848 * Stream compression has to have been previously offered by the server. Currently only the 849 * zlib method is supported by the client. Stream compression negotiation has to be done 850 * before authentication took place. 
851 * </p> 852 * 853 * @throws NotConnectedException 854 * @throws SmackException 855 * @throws NoResponseException 856 * @throws InterruptedException 857 */ 858 private void maybeEnableCompression() throws SmackException, InterruptedException { 859 if (!config.isCompressionEnabled()) { 860 return; 861 } 862 maybeCompressFeaturesReceived.checkIfSuccessOrWait(); 863 Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); 864 if (compression == null) { 865 // Server does not support compression 866 return; 867 } 868 // If stream compression was offered by the server and we want to use 869 // compression then send compression request to the server 870 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 871 compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); 872 } else { 873 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 874 } 875 } 876 877 /** 878 * Establishes a connection to the XMPP server. It basically 879 * creates and maintains a socket connection to the server. 880 * <p> 881 * Listeners will be preserved from a previous connection if the reconnection 882 * occurs after an abrupt termination. 883 * </p> 884 * 885 * @throws XMPPException if an error occurs while trying to establish the connection. 886 * @throws SmackException 887 * @throws IOException 888 * @throws InterruptedException 889 */ 890 @Override 891 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 892 closingStreamReceived.init(); 893 // Establishes the TCP connection to the server and does setup the reader and writer. 
Throws an exception if 894 // there is an error establishing the connection 895 connectUsingConfiguration(); 896 897 // We connected successfully to the servers TCP port 898 initConnection(); 899 900 // TLS handled will be successful either if TLS was established, or if it was not mandatory. 901 tlsHandled.checkIfSuccessOrWaitOrThrow(); 902 903 // Wait with SASL auth until the SASL mechanisms have been received 904 saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); 905 } 906 907 /** 908 * Sends out a notification that there was an error with the connection 909 * and closes the connection. Also prints the stack trace of the given exception 910 * 911 * @param e the exception that causes the connection close event. 912 */ 913 private synchronized void notifyConnectionError(Exception e) { 914 // Listeners were already notified of the exception, return right here. 915 if ((packetReader == null || packetReader.done) && 916 (packetWriter == null || packetWriter.done())) return; 917 918 // Closes the connection temporary. A reconnection is possible 919 // Note that a connection listener of XMPPTCPConnection will drop the SM state in 920 // case the Exception is a StreamErrorException. 921 instantShutdown(); 922 923 // Notify connection listeners of the error. 
924 callConnectionClosedOnErrorListener(e); 925 } 926 927 /** 928 * For unit testing purposes 929 * 930 * @param writer 931 */ 932 protected void setWriter(Writer writer) { 933 this.writer = writer; 934 } 935 936 @Override 937 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException { 938 StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE); 939 if (startTlsFeature != null) { 940 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 941 SmackException smackException = new SecurityRequiredByServerException(); 942 tlsHandled.reportFailure(smackException); 943 notifyConnectionError(smackException); 944 return; 945 } 946 947 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 948 sendNonza(new StartTls()); 949 } else { 950 tlsHandled.reportSuccess(); 951 } 952 } else { 953 tlsHandled.reportSuccess(); 954 } 955 956 if (getSASLAuthentication().authenticationSuccessful()) { 957 // If we have received features after the SASL has been successfully completed, then we 958 // have also *maybe* received, as it is an optional feature, the compression feature 959 // from the server. 960 maybeCompressFeaturesReceived.reportSuccess(); 961 } 962 } 963 964 /** 965 * Resets the parser using the latest connection's reader. Resetting the parser is necessary 966 * when the plain connection has been secured or when a new opening stream element is going 967 * to be sent by the server. 968 * 969 * @throws SmackException if the parser could not be reset. 970 * @throws InterruptedException 971 */ 972 void openStream() throws SmackException, InterruptedException { 973 // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as 974 // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external 975 // mechanism is used to determine the local entity (user). 
/**
 * Reads the incoming XML stream in a dedicated thread and dispatches every top-level stream
 * element: stanzas, stream features, TLS/SASL/compression negotiation elements and Stream
 * Management (XEP-198) elements.
 */
protected class PacketReader {

    /** The parser the read loop pulls events from. It is replaced by openStream(). */
    XmlPullParser parser;

    /** Set to true to make the read loop terminate. */
    private volatile boolean done;

    /**
     * Initializes the reader in order to be used. The reader is initialized during the
     * first connection and when reconnecting due to an abrupt disconnection.
     */
    void init() {
        done = false;

        // Run the read loop asynchronously, the thread name carries the connection counter.
        Async.go(new Runnable() {
            @Override
            public void run() {
                parsePackets();
            }
        }, "Smack Reader (" + getConnectionCounter() + ")");
    }

    /**
     * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
     */
    void shutdown() {
        done = true;
    }

    /**
     * Parse top-level packets in order to process them further.
     * <p>
     * The loop runs until 'done' is set, until the closing stream tag arrives after the writer
     * queue was shut down, or until an exception is thrown. Any exception is reported to the
     * closingStreamReceived synchronization point. Connection listeners are only notified if
     * neither the reader is done nor the writer queue is shut down.
     * </p>
     */
    private void parsePackets() {
        try {
            // Do not start parsing before the writer has sent the initial stream open element.
            initialOpenStreamSend.checkIfSuccessOrWait();
            int eventType = parser.getEventType();
            while (!done) {
                switch (eventType) {
                case XmlPullParser.START_TAG:
                    final String name = parser.getName();
                    switch (name) {
                    case Message.ELEMENT:
                    case IQ.IQ_ELEMENT:
                    case Presence.ELEMENT:
                        try {
                            parseAndProcessStanza(parser);
                        } finally {
                            // Count the stanza as handled even if processing it threw.
                            clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                        }
                        break;
                    case "stream":
                        // We found an opening stream.
                        if ("jabber:client".equals(parser.getNamespace(null))) {
                            streamId = parser.getAttributeValue("", "id");
                            String reportedServerDomain = parser.getAttributeValue("", "from");
                            // NOTE(review): this compares the configured service domain object with a
                            // String, which only holds if that type's equals() accepts strings - confirm.
                            assert (config.getXMPPServiceDomain().equals(reportedServerDomain));
                        }
                        break;
                    case "error":
                        StreamError streamError = PacketParserUtils.parseStreamError(parser);
                        saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
                        // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
                        // point to report the error, which is checked immediately after tlsHandled in
                        // connectInternal().
                        tlsHandled.reportSuccess();
                        throw new StreamErrorException(streamError);
                    case "features":
                        parseFeatures(parser);
                        break;
                    case "proceed":
                        try {
                            // Secure the connection by negotiating TLS
                            proceedTLSReceived();
                            // Send a new opening stream to the server
                            openStream();
                        }
                        catch (Exception e) {
                            // Report the failure to whoever waits for TLS, then let the outer
                            // catch block handle the original exception.
                            SmackException smackException = new SmackException(e);
                            tlsHandled.reportFailure(smackException);
                            throw e;
                        }
                        break;
                    case "failure":
                        // The namespace tells which negotiation failed. Other namespaces are ignored.
                        String namespace = parser.getNamespace(null);
                        switch (namespace) {
                        case "urn:ietf:params:xml:ns:xmpp-tls":
                            // TLS negotiation has failed. The server will close the connection
                            // TODO Parse failure stanza
                            throw new SmackException("TLS negotiation has failed");
                        case "http://jabber.org/protocol/compress":
                            // Stream compression has been denied. This is a recoverable
                            // situation. It is still possible to authenticate and
                            // use the connection but using an uncompressed connection
                            // TODO Parse failure stanza
                            compressSyncPoint.reportFailure(new SmackException(
                                            "Could not establish compression"));
                            break;
                        case SaslStreamElements.NAMESPACE:
                            // SASL authentication has failed. The server may close the connection
                            // depending on the number of retries
                            final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
                            getSASLAuthentication().authenticationFailed(failure);
                            break;
                        }
                        break;
                    case Challenge.ELEMENT:
                        // The server is challenging the SASL authentication made by the client
                        String challengeData = parser.nextText();
                        getSASLAuthentication().challengeReceived(challengeData);
                        break;
                    case Success.ELEMENT:
                        Success success = new Success(parser.nextText());
                        // We now need to bind a resource for the connection
                        // Open a new stream and wait for the response
                        openStream();
                        // The SASL authentication with the server was successful. The next step
                        // will be to bind the resource
                        getSASLAuthentication().authenticated(success);
                        break;
                    case Compressed.ELEMENT:
                        // Server confirmed that it's possible to use stream compression. Start
                        // stream compression
                        // Initialize the reader and writer with the new compressed version
                        initReaderAndWriter();
                        // Send a new opening stream to the server
                        openStream();
                        // Notify that compression is being used
                        compressSyncPoint.reportSuccess();
                        break;
                    case Enabled.ELEMENT:
                        Enabled enabled = ParseStreamManagement.enabled(parser);
                        if (enabled.isResumeSet()) {
                            smSessionId = enabled.getId();
                            if (StringUtils.isNullOrEmpty(smSessionId)) {
                                // A resumable stream needs a session id, otherwise it could never be resumed.
                                SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received");
                                smEnabledSyncPoint.reportFailure(xmppException);
                                throw xmppException;
                            }
                            smServerMaxResumptionTime = enabled.getMaxResumptionTime();
                        } else {
                            // Mark this a non-resumable stream by setting smSessionId to null
                            smSessionId = null;
                        }
                        // Stanza counting starts from zero once SM got enabled.
                        clientHandledStanzasCount = 0;
                        smWasEnabledAtLeastOnce = true;
                        smEnabledSyncPoint.reportSuccess();
                        LOGGER.fine("Stream Management (XEP-198): successfully enabled");
                        break;
                    case Failed.ELEMENT:
                        Failed failed = ParseStreamManagement.failed(parser);
                        FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
                        // If only XEP-198 would specify different failure elements for the SM
                        // enable and SM resume failure case. But this is not the case, so we
                        // need to determine if this is a 'Failed' response for either 'Enable'
                        // or 'Resume'.
                        if (smResumedSyncPoint.requestSent()) {
                            smResumedSyncPoint.reportFailure(xmppException);
                        }
                        else {
                            if (!smEnabledSyncPoint.requestSent()) {
                                throw new IllegalStateException("Failed element received but SM was not previously enabled");
                            }
                            smEnabledSyncPoint.reportFailure(new SmackException(xmppException));
                            // Report success for last lastFeaturesReceived so that in case a
                            // failed resumption, we can continue with normal resource binding.
                            // See text of XEP-198 5. below Example 11.
                            lastFeaturesReceived.reportSuccess();
                        }
                        break;
                    case Resumed.ELEMENT:
                        Resumed resumed = ParseStreamManagement.resumed(parser);
                        // NOTE(review): smSessionId is set to null for non-resumable streams (see the
                        // Enabled branch), so this call would throw a NullPointerException if a
                        // 'resumed' element arrives in that state - confirm this can not happen.
                        if (!smSessionId.equals(resumed.getPrevId())) {
                            throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                        }
                        // Mark SM as enabled
                        smEnabledSyncPoint.reportSuccess();
                        // First, drop the stanzas already handled by the server
                        processHandledCount(resumed.getHandledCount());
                        // Then re-send what is left in the unacknowledged queue
                        List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                        unacknowledgedStanzas.drainTo(stanzasToResend);
                        for (Stanza stanza : stanzasToResend) {
                            sendStanzaInternal(stanza);
                        }
                        // If there were stanzas resent, then request a SM ack for them.
                        // Writer's sendStreamElement() won't do it automatically based on
                        // predicates.
                        if (!stanzasToResend.isEmpty()) {
                            requestSmAcknowledgementInternal();
                        }
                        // Mark SM resumption as successful
                        smResumedSyncPoint.reportSuccess();
                        LOGGER.fine("Stream Management (XEP-198): Stream resumed");
                        break;
                    case AckAnswer.ELEMENT:
                        AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                        processHandledCount(ackAnswer.getHandledCount());
                        break;
                    case AckRequest.ELEMENT:
                        ParseStreamManagement.ackRequest(parser);
                        // Only answer ack requests if SM was actually negotiated.
                        if (smEnabledSyncPoint.wasSuccessful()) {
                            sendSmAcknowledgementInternal();
                        } else {
                            LOGGER.warning("SM Ack Request received while SM is not enabled");
                        }
                        break;
                    default:
                        LOGGER.warning("Unknown top level stream element: " + name);
                        break;
                    }
                    break;
                case XmlPullParser.END_TAG:
                    final String endTagName = parser.getName();
                    if ("stream".equals(endTagName)) {
                        if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
                            LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace());
                            break;
                        }

                        // Check if the queue was already shut down before reporting success on closing stream tag
                        // received. This avoids a race if there is a disconnect(), followed by a connect(), which
                        // did re-start the queue again, causing this writer to assume that the queue is not
                        // shutdown, which results in a call to disconnect().
                        final boolean queueWasShutdown = packetWriter.queue.isShutdown();
                        closingStreamReceived.reportSuccess();

                        if (queueWasShutdown) {
                            // We received a closing stream element *after* we initiated the
                            // termination of the session by sending a closing stream element to
                            // the server first
                            return;
                        } else {
                            // We received a closing stream element from the server without us
                            // sending a closing stream element first. This means that the
                            // server wants to terminate the session, therefore disconnect
                            // the connection
                            LOGGER.info(XMPPTCPConnection.this
                                            + " received closing </stream> element."
                                            + " Server wants to terminate the connection, calling disconnect()");
                            disconnect();
                        }
                    }
                    break;
                case XmlPullParser.END_DOCUMENT:
                    // END_DOCUMENT only happens in an error case, as otherwise we would see a
                    // closing stream element before.
                    throw new SmackException(
                                    "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                }
                eventType = parser.next();
            }
        }
        catch (Exception e) {
            // Whoever waits for the closing stream element must not wait any longer.
            closingStreamReceived.reportFailure(e);
            // The exception can be ignored if the connection is 'done'
            // or if it was caused because the socket got closed
            if (!(done || packetWriter.queue.isShutdown())) {
                // Close the connection and notify connection listeners of the
                // error.
                notifyConnectionError(e);
            }
        }
    }
}
1215 final boolean queueWasShutdown = packetWriter.queue.isShutdown(); 1216 closingStreamReceived.reportSuccess(); 1217 1218 if (queueWasShutdown) { 1219 // We received a closing stream element *after* we initiated the 1220 // termination of the session by sending a closing stream element to 1221 // the server first 1222 return; 1223 } else { 1224 // We received a closing stream element from the server without us 1225 // sending a closing stream element first. This means that the 1226 // server wants to terminate the session, therefore disconnect 1227 // the connection 1228 LOGGER.info(XMPPTCPConnection.this 1229 + " received closing </stream> element." 1230 + " Server wants to terminate the connection, calling disconnect()"); 1231 disconnect(); 1232 } 1233 } 1234 break; 1235 case XmlPullParser.END_DOCUMENT: 1236 // END_DOCUMENT only happens in an error case, as otherwise we would see a 1237 // closing stream element before. 1238 throw new SmackException( 1239 "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element"); 1240 } 1241 eventType = parser.next(); 1242 } 1243 } 1244 catch (Exception e) { 1245 closingStreamReceived.reportFailure(e); 1246 // The exception can be ignored if the the connection is 'done' 1247 // or if the it was caused because the socket got closed 1248 if (!(done || packetWriter.queue.isShutdown())) { 1249 // Close the connection and notify connection listeners of the 1250 // error. 1251 notifyConnectionError(e); 1252 } 1253 } 1254 } 1255 } 1256 1257 protected class PacketWriter { 1258 public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE; 1259 1260 private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>( 1261 QUEUE_SIZE, true); 1262 1263 /** 1264 * Needs to be protected for unit testing purposes. 
1265 */ 1266 protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>( 1267 XMPPTCPConnection.this, "shutdown completed"); 1268 1269 /** 1270 * If set, the stanza writer is shut down 1271 */ 1272 protected volatile Long shutdownTimestamp = null; 1273 1274 private volatile boolean instantShutdown; 1275 1276 /** 1277 * True if some preconditions are given to start the bundle and defer mechanism. 1278 * <p> 1279 * This will likely get set to true right after the start of the writer thread, because 1280 * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set 1281 * this field to true. 1282 * </p> 1283 */ 1284 private boolean shouldBundleAndDefer; 1285 1286 /** 1287 * Initializes the writer in order to be used. It is called at the first connection and also 1288 * is invoked if the connection is disconnected by an error. 1289 */ 1290 void init() { 1291 shutdownDone.init(); 1292 shutdownTimestamp = null; 1293 1294 if (unacknowledgedStanzas != null) { 1295 // It's possible that there are new stanzas in the writer queue that 1296 // came in while we were disconnected but resumable, drain those into 1297 // the unacknowledged queue so that they get resent now 1298 drainWriterQueueToUnacknowledgedStanzas(); 1299 } 1300 1301 queue.start(); 1302 Async.go(new Runnable() { 1303 @Override 1304 public void run() { 1305 writePackets(); 1306 } 1307 }, "Smack Writer (" + getConnectionCounter() + ")"); 1308 } 1309 1310 private boolean done() { 1311 return shutdownTimestamp != null; 1312 } 1313 1314 protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { 1315 final boolean done = done(); 1316 if (done) { 1317 final boolean smResumptionPossible = isSmResumptionPossible(); 1318 // Don't throw a NotConnectedException is there is an resumable stream available 1319 if (!smResumptionPossible) { 1320 throw new NotConnectedException(XMPPTCPConnection.this, 
"done=" + done 1321 + " smResumptionPossible=" + smResumptionPossible); 1322 } 1323 } 1324 } 1325 1326 /** 1327 * Sends the specified element to the server. 1328 * 1329 * @param element the element to send. 1330 * @throws NotConnectedException 1331 * @throws InterruptedException 1332 */ 1333 protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { 1334 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1335 try { 1336 queue.put(element); 1337 } 1338 catch (InterruptedException e) { 1339 // put() may throw an InterruptedException for two reasons: 1340 // 1. If the queue was shut down 1341 // 2. If the thread was interrupted 1342 // so we have to check which is the case 1343 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1344 // If the method above did not throw, then the sending thread was interrupted 1345 throw e; 1346 } 1347 } 1348 1349 /** 1350 * Shuts down the stanza writer. Once this method has been called, no further 1351 * packets will be written to the server. 1352 * @throws InterruptedException 1353 */ 1354 void shutdown(boolean instant) { 1355 instantShutdown = instant; 1356 queue.shutdown(); 1357 shutdownTimestamp = System.currentTimeMillis(); 1358 try { 1359 shutdownDone.checkIfSuccessOrWait(); 1360 } 1361 catch (NoResponseException | InterruptedException e) { 1362 LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); 1363 } 1364 } 1365 1366 /** 1367 * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a 1368 * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in 1369 * that case. 1370 * 1371 * @return the next element for writing or null. 
1372 */ 1373 private Element nextStreamElement() { 1374 // It is important the we check if the queue is empty before removing an element from it 1375 if (queue.isEmpty()) { 1376 shouldBundleAndDefer = true; 1377 } 1378 Element packet = null; 1379 try { 1380 packet = queue.take(); 1381 } 1382 catch (InterruptedException e) { 1383 if (!queue.isShutdown()) { 1384 // Users shouldn't try to interrupt the packet writer thread 1385 LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e); 1386 } 1387 } 1388 return packet; 1389 } 1390 1391 private void writePackets() { 1392 Exception writerException = null; 1393 try { 1394 openStream(); 1395 initialOpenStreamSend.reportSuccess(); 1396 // Write out packets from the queue. 1397 while (!done()) { 1398 Element element = nextStreamElement(); 1399 if (element == null) { 1400 continue; 1401 } 1402 1403 // Get a local version of the bundle and defer callback, in case it's unset 1404 // between the null check and the method invocation 1405 final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback; 1406 // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is 1407 // empty), then we could wait a bit for further stanzas attempting to decrease 1408 // our energy consumption 1409 if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) { 1410 // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the 1411 // queue is empty again. 
1412 shouldBundleAndDefer = false; 1413 final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean(); 1414 final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer( 1415 bundlingAndDeferringStopped)); 1416 if (bundleAndDeferMillis > 0) { 1417 long remainingWait = bundleAndDeferMillis; 1418 final long waitStart = System.currentTimeMillis(); 1419 synchronized (bundlingAndDeferringStopped) { 1420 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) { 1421 bundlingAndDeferringStopped.wait(remainingWait); 1422 remainingWait = bundleAndDeferMillis 1423 - (System.currentTimeMillis() - waitStart); 1424 } 1425 } 1426 } 1427 } 1428 1429 Stanza packet = null; 1430 if (element instanceof Stanza) { 1431 packet = (Stanza) element; 1432 } 1433 else if (element instanceof Enable) { 1434 // The client needs to add messages to the unacknowledged stanzas queue 1435 // right after it sent 'enabled'. Stanza will be added once 1436 // unacknowledgedStanzas is not null. 1437 unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE); 1438 } 1439 maybeAddToUnacknowledgedStanzas(packet); 1440 1441 CharSequence elementXml = element.toXML(StreamOpen.CLIENT_NAMESPACE); 1442 if (elementXml instanceof XmlStringBuilder) { 1443 ((XmlStringBuilder) elementXml).write(writer, StreamOpen.CLIENT_NAMESPACE); 1444 } 1445 else { 1446 writer.write(elementXml.toString()); 1447 } 1448 1449 if (queue.isEmpty()) { 1450 writer.flush(); 1451 } 1452 if (packet != null) { 1453 firePacketSendingListeners(packet); 1454 } 1455 } 1456 if (!instantShutdown) { 1457 // Flush out the rest of the queue. 
1458 try { 1459 while (!queue.isEmpty()) { 1460 Element packet = queue.remove(); 1461 if (packet instanceof Stanza) { 1462 Stanza stanza = (Stanza) packet; 1463 maybeAddToUnacknowledgedStanzas(stanza); 1464 } 1465 writer.write(packet.toXML(null).toString()); 1466 } 1467 writer.flush(); 1468 } 1469 catch (Exception e) { 1470 LOGGER.log(Level.WARNING, 1471 "Exception flushing queue during shutdown, ignore and continue", 1472 e); 1473 } 1474 1475 // Close the stream. 1476 try { 1477 writer.write("</stream:stream>"); 1478 writer.flush(); 1479 } 1480 catch (Exception e) { 1481 LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); 1482 } 1483 1484 // Delete the queue contents (hopefully nothing is left). 1485 queue.clear(); 1486 } else if (instantShutdown && isSmEnabled()) { 1487 // This was an instantShutdown and SM is enabled, drain all remaining stanzas 1488 // into the unacknowledgedStanzas queue 1489 drainWriterQueueToUnacknowledgedStanzas(); 1490 } 1491 // Do *not* close the writer here, as it will cause the socket 1492 // to get closed. But we may want to receive further stanzas 1493 // until the closing stream tag is received. The socket will be 1494 // closed in shutdown(). 1495 } 1496 catch (Exception e) { 1497 // The exception can be ignored if the the connection is 'done' 1498 // or if the it was caused because the socket got closed 1499 if (!(done() || queue.isShutdown())) { 1500 writerException = e; 1501 } else { 1502 LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); 1503 } 1504 } finally { 1505 LOGGER.fine("Reporting shutdownDone success in writer thread"); 1506 shutdownDone.reportSuccess(); 1507 } 1508 // Delay notifyConnectionError after shutdownDone has been reported in the finally block. 
1509 if (writerException != null) { 1510 notifyConnectionError(writerException); 1511 } 1512 } 1513 1514 private void drainWriterQueueToUnacknowledgedStanzas() { 1515 List<Element> elements = new ArrayList<>(queue.size()); 1516 queue.drainTo(elements); 1517 for (Element element : elements) { 1518 if (element instanceof Stanza) { 1519 unacknowledgedStanzas.add((Stanza) element); 1520 } 1521 } 1522 } 1523 1524 private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException { 1525 // Check if the stream element should be put to the unacknowledgedStanza 1526 // queue. Note that we can not do the put() in sendStanzaInternal() and the 1527 // packet order is not stable at this point (sendStanzaInternal() can be 1528 // called concurrently). 1529 if (unacknowledgedStanzas != null && stanza != null) { 1530 // If the unacknowledgedStanza queue is nearly full, request an new ack 1531 // from the server in order to drain it 1532 if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { 1533 writer.write(AckRequest.INSTANCE.toXML(null).toString()); 1534 writer.flush(); 1535 } 1536 try { 1537 // It is important the we put the stanza in the unacknowledged stanza 1538 // queue before we put it on the wire 1539 unacknowledgedStanzas.put(stanza); 1540 } 1541 catch (InterruptedException e) { 1542 throw new IllegalStateException(e); 1543 } 1544 } 1545 } 1546 } 1547 1548 /** 1549 * Set if Stream Management should be used by default for new connections. 1550 * 1551 * @param useSmDefault true to use Stream Management for new connections. 1552 */ 1553 public static void setUseStreamManagementDefault(boolean useSmDefault) { 1554 XMPPTCPConnection.useSmDefault = useSmDefault; 1555 } 1556 1557 /** 1558 * Set if Stream Management resumption should be used by default for new connections. 1559 * 1560 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 
1561 * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead. 1562 */ 1563 @Deprecated 1564 public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { 1565 setUseStreamManagementResumptionDefault(useSmResumptionDefault); 1566 } 1567 1568 /** 1569 * Set if Stream Management resumption should be used by default for new connections. 1570 * 1571 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1572 */ 1573 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1574 if (useSmResumptionDefault) { 1575 // Also enable SM is resumption is enabled 1576 setUseStreamManagementDefault(useSmResumptionDefault); 1577 } 1578 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1579 } 1580 1581 /** 1582 * Set if Stream Management should be used if supported by the server. 1583 * 1584 * @param useSm true to use Stream Management. 1585 */ 1586 public void setUseStreamManagement(boolean useSm) { 1587 this.useSm = useSm; 1588 } 1589 1590 /** 1591 * Set if Stream Management resumption should be used if supported by the server. 1592 * 1593 * @param useSmResumption true to use Stream Management resumption. 1594 */ 1595 public void setUseStreamManagementResumption(boolean useSmResumption) { 1596 if (useSmResumption) { 1597 // Also enable SM is resumption is enabled 1598 setUseStreamManagement(useSmResumption); 1599 } 1600 this.useSmResumption = useSmResumption; 1601 } 1602 1603 /** 1604 * Set the preferred resumption time in seconds. 1605 * @param resumptionTime the preferred resumption time in seconds 1606 */ 1607 public void setPreferredResumptionTime(int resumptionTime) { 1608 smClientMaxResumptionTime = resumptionTime; 1609 } 1610 1611 /** 1612 * Add a predicate for Stream Management acknowledgment requests. 
1613 * <p> 1614 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1615 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1616 * </p> 1617 * <p> 1618 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1619 * </p> 1620 * 1621 * @param predicate the predicate to add. 1622 * @return if the predicate was not already active. 1623 */ 1624 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1625 synchronized (requestAckPredicates) { 1626 return requestAckPredicates.add(predicate); 1627 } 1628 } 1629 1630 /** 1631 * Remove the given predicate for Stream Management acknowledgment request. 1632 * @param predicate the predicate to remove. 1633 * @return true if the predicate was removed. 1634 */ 1635 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1636 synchronized (requestAckPredicates) { 1637 return requestAckPredicates.remove(predicate); 1638 } 1639 } 1640 1641 /** 1642 * Remove all predicates for Stream Management acknowledgment requests. 1643 */ 1644 public void removeAllRequestAckPredicates() { 1645 synchronized (requestAckPredicates) { 1646 requestAckPredicates.clear(); 1647 } 1648 } 1649 1650 /** 1651 * Send an unconditional Stream Management acknowledgement request to the server. 1652 * 1653 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1654 * @throws NotConnectedException if the connection is not connected. 
1655 * @throws InterruptedException 1656 */ 1657 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1658 if (!isSmEnabled()) { 1659 throw new StreamManagementException.StreamManagementNotEnabledException(); 1660 } 1661 requestSmAcknowledgementInternal(); 1662 } 1663 1664 private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1665 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1666 } 1667 1668 /** 1669 * Send a unconditional Stream Management acknowledgment to the server. 1670 * <p> 1671 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>: 1672 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1673 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1674 * </p> 1675 * 1676 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1677 * @throws NotConnectedException if the connection is not connected. 1678 * @throws InterruptedException 1679 */ 1680 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1681 if (!isSmEnabled()) { 1682 throw new StreamManagementException.StreamManagementNotEnabledException(); 1683 } 1684 sendSmAcknowledgementInternal(); 1685 } 1686 1687 private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1688 packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); 1689 } 1690 1691 /** 1692 * Add a Stanza acknowledged listener. 1693 * <p> 1694 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1695 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1696 * possible. 
1697 * </p> 1698 * 1699 * @param listener the listener to add. 1700 */ 1701 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1702 stanzaAcknowledgedListeners.add(listener); 1703 } 1704 1705 /** 1706 * Remove the given Stanza acknowledged listener. 1707 * 1708 * @param listener the listener. 1709 * @return true if the listener was removed. 1710 */ 1711 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1712 return stanzaAcknowledgedListeners.remove(listener); 1713 } 1714 1715 /** 1716 * Remove all stanza acknowledged listeners. 1717 */ 1718 public void removeAllStanzaAcknowledgedListeners() { 1719 stanzaAcknowledgedListeners.clear(); 1720 } 1721 1722 /** 1723 * Add a new Stanza ID acknowledged listener for the given ID. 1724 * <p> 1725 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1726 * automatically be removed after the listener was run. 1727 * </p> 1728 * 1729 * @param id the stanza ID. 1730 * @param listener the listener to invoke. 1731 * @return the previous listener for this stanza ID or null. 1732 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1733 */ 1734 @SuppressWarnings("FutureReturnValueIgnored") 1735 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1736 // Prevent users from adding callbacks that will never get removed 1737 if (!smWasEnabledAtLeastOnce) { 1738 throw new StreamManagementException.StreamManagementNotEnabledException(); 1739 } 1740 // Remove the listener after max. 
3 hours 1741 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60); 1742 schedule(new Runnable() { 1743 @Override 1744 public void run() { 1745 stanzaIdAcknowledgedListeners.remove(id); 1746 } 1747 }, removeAfterSeconds, TimeUnit.SECONDS); 1748 return stanzaIdAcknowledgedListeners.put(id, listener); 1749 } 1750 1751 /** 1752 * Remove the Stanza ID acknowledged listener for the given ID. 1753 * 1754 * @param id the stanza ID. 1755 * @return true if the listener was found and removed, false otherwise. 1756 */ 1757 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1758 return stanzaIdAcknowledgedListeners.remove(id); 1759 } 1760 1761 /** 1762 * Removes all Stanza ID acknowledged listeners. 1763 */ 1764 public void removeAllStanzaIdAcknowledgedListeners() { 1765 stanzaIdAcknowledgedListeners.clear(); 1766 } 1767 1768 /** 1769 * Returns true if Stream Management is supported by the server. 1770 * 1771 * @return true if Stream Management is supported by the server. 1772 */ 1773 public boolean isSmAvailable() { 1774 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1775 } 1776 1777 /** 1778 * Returns true if Stream Management was successfully negotiated with the server. 1779 * 1780 * @return true if Stream Management was negotiated. 1781 */ 1782 public boolean isSmEnabled() { 1783 return smEnabledSyncPoint.wasSuccessful(); 1784 } 1785 1786 /** 1787 * Returns true if the stream was successfully resumed with help of Stream Management. 1788 * 1789 * @return true if the stream was resumed. 1790 */ 1791 public boolean streamWasResumed() { 1792 return smResumedSyncPoint.wasSuccessful(); 1793 } 1794 1795 /** 1796 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1797 * 1798 * @return true if disconnected but resumption possible. 
1799 */ 1800 public boolean isDisconnectedButSmResumptionPossible() { 1801 return disconnectedButResumeable && isSmResumptionPossible(); 1802 } 1803 1804 /** 1805 * Returns true if the stream is resumable. 1806 * 1807 * @return true if the stream is resumable. 1808 */ 1809 public boolean isSmResumptionPossible() { 1810 // There is no resumable stream available 1811 if (smSessionId == null) 1812 return false; 1813 1814 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1815 // Seems like we are already reconnected, report true 1816 if (shutdownTimestamp == null) { 1817 return true; 1818 } 1819 1820 // See if resumption time is over 1821 long current = System.currentTimeMillis(); 1822 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1823 if (current > shutdownTimestamp + maxResumptionMillies) { 1824 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1825 // resumption is possible 1826 return false; 1827 } else { 1828 return true; 1829 } 1830 } 1831 1832 /** 1833 * Drop the stream management state. Sets {@link #smSessionId} and 1834 * {@link #unacknowledgedStanzas} to <code>null</code>. 1835 */ 1836 private void dropSmState() { 1837 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1838 // respective. No need to reset them here. 1839 smSessionId = null; 1840 unacknowledgedStanzas = null; 1841 } 1842 1843 /** 1844 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 1845 * <p> 1846 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1847 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1848 * without checking for overflows before. 1849 * </p> 1850 * 1851 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 
1852 */ 1853 public int getMaxSmResumptionTime() { 1854 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1855 int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE; 1856 return Math.min(clientResumptionTime, serverResumptionTime); 1857 } 1858 1859 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1860 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1861 final List<Stanza> ackedStanzas = new ArrayList<>( 1862 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1863 : Integer.MAX_VALUE); 1864 for (long i = 0; i < ackedStanzasCount; i++) { 1865 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1866 // If the server ack'ed a stanza, then it must be in the 1867 // unacknowledged stanza queue. There can be no exception. 1868 if (ackedStanza == null) { 1869 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1870 ackedStanzasCount, ackedStanzas); 1871 } 1872 ackedStanzas.add(ackedStanza); 1873 } 1874 1875 boolean atLeastOneStanzaAcknowledgedListener = false; 1876 if (!stanzaAcknowledgedListeners.isEmpty()) { 1877 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1878 atLeastOneStanzaAcknowledgedListener = true; 1879 } 1880 else { 1881 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1882 for (Stanza ackedStanza : ackedStanzas) { 1883 String id = ackedStanza.getStanzaId(); 1884 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1885 atLeastOneStanzaAcknowledgedListener = true; 1886 break; 1887 } 1888 } 1889 } 1890 1891 // Only spawn a new thread if there is a chance that some listener is invoked 1892 if (atLeastOneStanzaAcknowledgedListener) { 1893 asyncGo(new Runnable() { 1894 @Override 1895 public void run() { 1896 for (Stanza ackedStanza : ackedStanzas) { 1897 for 
(StanzaListener listener : stanzaAcknowledgedListeners) { 1898 try { 1899 listener.processStanza(ackedStanza); 1900 } 1901 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1902 LOGGER.log(Level.FINER, "Received exception", e); 1903 } 1904 } 1905 String id = ackedStanza.getStanzaId(); 1906 if (StringUtils.isNullOrEmpty(id)) { 1907 continue; 1908 } 1909 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1910 if (listener != null) { 1911 try { 1912 listener.processStanza(ackedStanza); 1913 } 1914 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1915 LOGGER.log(Level.FINER, "Received exception", e); 1916 } 1917 } 1918 } 1919 } 1920 }); 1921 } 1922 1923 serverHandledStanzasCount = handledCount; 1924 } 1925 1926 /** 1927 * Set the default bundle and defer callback used for new connections. 1928 * 1929 * @param defaultBundleAndDeferCallback 1930 * @see BundleAndDeferCallback 1931 * @since 4.1 1932 */ 1933 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1934 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1935 } 1936 1937 /** 1938 * Set the bundle and defer callback used for this connection. 1939 * <p> 1940 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1941 * no longer get deferred. 1942 * </p> 1943 * 1944 * @param bundleAndDeferCallback the callback or <code>null</code>. 1945 * @see BundleAndDeferCallback 1946 * @since 4.1 1947 */ 1948 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1949 this.bundleAndDeferCallback = bundleAndDeferCallback; 1950 } 1951 1952}