001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import org.jivesoftware.smack.AbstractXMPPConnection; 020import org.jivesoftware.smack.ConnectionConfiguration; 021import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 022import org.jivesoftware.smack.ConnectionCreationListener; 023import org.jivesoftware.smack.StanzaListener; 024import org.jivesoftware.smack.SmackConfiguration; 025import org.jivesoftware.smack.SmackException; 026import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 027import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 028import org.jivesoftware.smack.SmackException.NoResponseException; 029import org.jivesoftware.smack.SmackException.NotConnectedException; 030import org.jivesoftware.smack.SmackException.ConnectionException; 031import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; 032import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 033import org.jivesoftware.smack.SmackException.SecurityRequiredException; 034import org.jivesoftware.smack.SynchronizationPoint; 035import org.jivesoftware.smack.XMPPException.StreamErrorException; 036import org.jivesoftware.smack.XMPPConnection; 037import org.jivesoftware.smack.XMPPException; 038import org.jivesoftware.smack.XMPPException.XMPPErrorException; 039import 
org.jivesoftware.smack.compress.packet.Compressed; 040import org.jivesoftware.smack.compression.XMPPInputOutputStream; 041import org.jivesoftware.smack.filter.StanzaFilter; 042import org.jivesoftware.smack.compress.packet.Compress; 043import org.jivesoftware.smack.packet.Element; 044import org.jivesoftware.smack.packet.IQ; 045import org.jivesoftware.smack.packet.Message; 046import org.jivesoftware.smack.packet.StreamOpen; 047import org.jivesoftware.smack.packet.Stanza; 048import org.jivesoftware.smack.packet.Presence; 049import org.jivesoftware.smack.packet.StartTls; 050import org.jivesoftware.smack.sasl.packet.SaslStreamElements; 051import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge; 052import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure; 053import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success; 054import org.jivesoftware.smack.sm.SMUtils; 055import org.jivesoftware.smack.sm.StreamManagementException; 056import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 057import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 058import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 059import org.jivesoftware.smack.sm.packet.StreamManagement; 060import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 061import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 062import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 063import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 064import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 065import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 066import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 067import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 068import org.jivesoftware.smack.sm.predicates.Predicate; 069import 
org.jivesoftware.smack.sm.provider.ParseStreamManagement; 070import org.jivesoftware.smack.packet.PlainStreamElement; 071import org.jivesoftware.smack.packet.XMPPError; 072import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 073import org.jivesoftware.smack.util.Async; 074import org.jivesoftware.smack.util.PacketParserUtils; 075import org.jivesoftware.smack.util.StringUtils; 076import org.jivesoftware.smack.util.TLSUtils; 077import org.jivesoftware.smack.util.dns.HostAddress; 078import org.jxmpp.util.XmppStringUtils; 079import org.xmlpull.v1.XmlPullParser; 080import org.xmlpull.v1.XmlPullParserException; 081 082import javax.net.SocketFactory; 083import javax.net.ssl.HostnameVerifier; 084import javax.net.ssl.KeyManager; 085import javax.net.ssl.KeyManagerFactory; 086import javax.net.ssl.SSLContext; 087import javax.net.ssl.SSLSocket; 088import javax.security.auth.callback.Callback; 089import javax.security.auth.callback.PasswordCallback; 090 091import java.io.BufferedReader; 092import java.io.ByteArrayInputStream; 093import java.io.FileInputStream; 094import java.io.IOException; 095import java.io.InputStream; 096import java.io.InputStreamReader; 097import java.io.OutputStream; 098import java.io.OutputStreamWriter; 099import java.io.Writer; 100import java.lang.reflect.Constructor; 101import java.net.InetAddress; 102import java.net.InetSocketAddress; 103import java.net.Socket; 104import java.net.UnknownHostException; 105import java.security.KeyManagementException; 106import java.security.KeyStore; 107import java.security.KeyStoreException; 108import java.security.NoSuchAlgorithmException; 109import java.security.NoSuchProviderException; 110import java.security.Provider; 111import java.security.Security; 112import java.security.UnrecoverableKeyException; 113import java.security.cert.CertificateException; 114import java.util.ArrayList; 115import java.util.Arrays; 116import java.util.Collection; 117import java.util.Iterator; 118import java.util.LinkedHashSet; 
119import java.util.LinkedList; 120import java.util.List; 121import java.util.Map; 122import java.util.Set; 123import java.util.concurrent.ArrayBlockingQueue; 124import java.util.concurrent.BlockingQueue; 125import java.util.concurrent.ConcurrentHashMap; 126import java.util.concurrent.ConcurrentLinkedQueue; 127import java.util.concurrent.TimeUnit; 128import java.util.concurrent.atomic.AtomicBoolean; 129import java.util.logging.Level; 130import java.util.logging.Logger; 131 132/** 133 * Creates a socket connection to an XMPP server. This is the default connection 134 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 135 * 136 * @see XMPPConnection 137 * @author Matt Tucker 138 */ 139public class XMPPTCPConnection extends AbstractXMPPConnection { 140 141 private static final int QUEUE_SIZE = 500; 142 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 143 144 /** 145 * The socket which is used for this connection. 146 */ 147 private Socket socket; 148 149 /** 150 * 151 */ 152 private boolean disconnectedButResumeable = false; 153 154 /** 155 * Flag to indicate if the socket was closed intentionally by Smack. 156 * <p> 157 * This boolean flag is used concurrently, therefore it is marked volatile. 
158 * </p> 159 */ 160 private volatile boolean socketClosed = false; 161 162 private boolean usingTLS = false; 163 164 /** 165 * Protected access level because of unit test purposes 166 */ 167 protected PacketWriter packetWriter; 168 169 /** 170 * Protected access level because of unit test purposes 171 */ 172 protected PacketReader packetReader; 173 174 private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<Exception>(this); 175 176 /** 177 * 178 */ 179 private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>( 180 this); 181 182 /** 183 * 184 */ 185 private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>( 186 this); 187 188 /** 189 * The default bundle and defer callback, used for new connections. 190 * @see bundleAndDeferCallback 191 */ 192 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 193 194 /** 195 * The used bundle and defer callback. 196 * <p> 197 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 198 * having a 'volatile' read within the writer threads loop. 199 * </p> 200 */ 201 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 202 203 private static boolean useSmDefault = false; 204 205 private static boolean useSmResumptionDefault = true; 206 207 /** 208 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 209 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 210 * {@link #unacknowledgedStanzas}. 
211 */ 212 private String smSessionId; 213 214 private final SynchronizationPoint<XMPPException> smResumedSyncPoint = new SynchronizationPoint<XMPPException>( 215 this); 216 217 private final SynchronizationPoint<XMPPException> smEnabledSyncPoint = new SynchronizationPoint<XMPPException>( 218 this); 219 220 /** 221 * The client's preferred maximum resumption time in seconds. 222 */ 223 private int smClientMaxResumptionTime = -1; 224 225 /** 226 * The server's preferred maximum resumption time in seconds. 227 */ 228 private int smServerMaxResumptimTime = -1; 229 230 /** 231 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 232 */ 233 private boolean useSm = useSmDefault; 234 private boolean useSmResumption = useSmResumptionDefault; 235 236 /** 237 * The counter that the server sends the client about it's current height. For example, if the server sends 238 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 239 */ 240 private long serverHandledStanzasCount = 0; 241 242 /** 243 * The counter for stanzas handled ("received") by the client. 244 * <p> 245 * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are 246 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since 247 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 248 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 249 * </p> 250 */ 251 private long clientHandledStanzasCount = 0; 252 253 private BlockingQueue<Stanza> unacknowledgedStanzas; 254 255 /** 256 * Set to true if Stream Management was at least once enabled for this connection. 
257 */ 258 private boolean smWasEnabledAtLeastOnce = false; 259 260 /** 261 * This listeners are invoked for every stanza that got acknowledged. 262 * <p> 263 * We use a {@link ConccurrentLinkedQueue} here in order to allow the listeners to remove 264 * themselves after they have been invoked. 265 * </p> 266 */ 267 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<StanzaListener>(); 268 269 /** 270 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 271 * only be invoked once and automatically removed after that. 272 */ 273 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, StanzaListener>(); 274 275 /** 276 * Predicates that determine if an stream management ack should be requested from the server. 277 * <p> 278 * We use a linked hash set here, so that the order how the predicates are added matches the 279 * order in which they are invoked in order to determine if an ack request should be send or not. 280 * </p> 281 */ 282 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<StanzaFilter>(); 283 284 private final XMPPTCPConnectionConfiguration config; 285 286 /** 287 * Creates a new XMPP connection over TCP (optionally using proxies). 288 * <p> 289 * Note that XMPPTCPConnection constructors do not establish a connection to the server 290 * and you must call {@link #connect()}. 291 * </p> 292 * 293 * @param config the connection configuration. 294 */ 295 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 296 super(config); 297 this.config = config; 298 } 299 300 /** 301 * Creates a new XMPP connection over TCP. 302 * <p> 303 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 304 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 305 * constructor. 
306 * </p> 307 * 308 * @param jid the bare JID used by the client. 309 * @param password the password or authentication token. 310 */ 311 public XMPPTCPConnection(CharSequence jid, String password) { 312 this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); 313 } 314 315 /** 316 * Creates a new XMPP connection over TCP. 317 * <p> 318 * This is the simplest constructor for connecting to an XMPP server. Alternatively, 319 * you can get fine-grained control over connection settings using the 320 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 321 * </p> 322 * @param username 323 * @param password 324 * @param serviceName 325 */ 326 public XMPPTCPConnection(CharSequence username, String password, String serviceName) { 327 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setServiceName( 328 serviceName).build()); 329 } 330 331 @Override 332 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 333 if (packetWriter == null) { 334 throw new NotConnectedException(); 335 } 336 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 337 } 338 339 @Override 340 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 341 if (isConnected() && !disconnectedButResumeable) { 342 throw new AlreadyConnectedException(); 343 } 344 } 345 346 @Override 347 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 348 if (isAuthenticated() && !disconnectedButResumeable) { 349 throw new AlreadyLoggedInException(); 350 } 351 } 352 353 @Override 354 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException { 355 // Reset the flag in case it was set 356 disconnectedButResumeable = false; 357 super.afterSuccessfulLogin(resumed); 358 } 359 360 @Override 361 protected synchronized void loginNonAnonymously(String username, 
String password, String resource) throws XMPPException, SmackException, IOException { 362 if (saslAuthentication.hasNonAnonymousAuthentication()) { 363 // Authenticate using SASL 364 if (password != null) { 365 saslAuthentication.authenticate(username, password, resource); 366 } 367 else { 368 saslAuthentication.authenticate(resource, config.getCallbackHandler()); 369 } 370 } else { 371 throw new SmackException("No non-anonymous SASL authentication mechanism available"); 372 } 373 374 // If compression is enabled then request the server to use stream compression. XEP-170 375 // recommends to perform stream compression before resource binding. 376 if (config.isCompressionEnabled()) { 377 useCompression(); 378 } 379 380 if (isSmResumptionPossible()) { 381 smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); 382 if (smResumedSyncPoint.wasSuccessful()) { 383 // We successfully resumed the stream, be done here 384 afterSuccessfulLogin(true); 385 return; 386 } 387 // SM resumption failed, what Smack does here is to report success of 388 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 389 // normal resource binding can be tried. 390 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process"); 391 } 392 393 bindResourceAndEstablishSession(resource); 394 395 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 396 if (unacknowledgedStanzas != null) { 397 // There was a previous connection with SM enabled but that was either not resumable or 398 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 399 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 400 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 401 // XMPP session (There maybe was an enabled in a previous XMPP session of this 402 // connection instance though). 
This is used in writePackets to decide if stanzas should 403 // be added to the unacknowledged stanzas queue, because they have to be added right 404 // after the 'enable' stream element has been sent. 405 unacknowledgedStanzas = null; 406 } 407 if (isSmAvailable() && useSm) { 408 // Remove what is maybe left from previously stream managed sessions 409 serverHandledStanzasCount = 0; 410 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 411 // then this is a non recoverable error and we therefore throw an exception. 412 smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime)); 413 synchronized (requestAckPredicates) { 414 if (requestAckPredicates.isEmpty()) { 415 // Assure that we have at lest one predicate set up that so that we request acks 416 // for the server and eventually flush some stanzas from the unacknowledged 417 // stanza queue 418 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 419 } 420 } 421 } 422 // (Re-)send the stanzas *after* we tried to enable SM 423 for (Stanza stanza : previouslyUnackedStanzas) { 424 sendStanzaInternal(stanza); 425 } 426 427 afterSuccessfulLogin(false); 428 } 429 430 @Override 431 public synchronized void loginAnonymously() throws XMPPException, SmackException, IOException { 432 // Wait with SASL auth until the SASL mechanisms have been received 433 saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); 434 435 if (saslAuthentication.hasAnonymousAuthentication()) { 436 saslAuthentication.authenticateAnonymously(); 437 } 438 else { 439 throw new SmackException("No anonymous SASL authentication mechanism available"); 440 } 441 442 // If compression is enabled then request the server to use stream compression 443 if (config.isCompressionEnabled()) { 444 useCompression(); 445 } 446 447 bindResourceAndEstablishSession(null); 448 449 afterSuccessfulLogin(false); 450 } 451 452 @Override 453 public boolean isSecureConnection() { 454 
return usingTLS; 455 } 456 457 public boolean isSocketClosed() { 458 return socketClosed; 459 } 460 461 /** 462 * Shuts the current connection down. After this method returns, the connection must be ready 463 * for re-use by connect. 464 */ 465 @Override 466 protected void shutdown() { 467 if (isSmEnabled()) { 468 try { 469 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 470 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 471 sendSmAcknowledgementInternal(); 472 } catch (NotConnectedException e) { 473 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 474 } 475 } 476 shutdown(false); 477 } 478 479 /** 480 * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza. 481 */ 482 public synchronized void instantShutdown() { 483 shutdown(true); 484 } 485 486 private void shutdown(boolean instant) { 487 if (disconnectedButResumeable) { 488 return; 489 } 490 if (packetReader != null) { 491 packetReader.shutdown(); 492 } 493 if (packetWriter != null) { 494 packetWriter.shutdown(instant); 495 } 496 497 // Set socketClosed to true. This will cause the PacketReader 498 // and PacketWriter to ignore any Exceptions that are thrown 499 // because of a read/write from/to a closed stream. 500 // It is *important* that this is done before socket.close()! 501 socketClosed = true; 502 try { 503 socket.close(); 504 } catch (Exception e) { 505 LOGGER.log(Level.WARNING, "shutdown", e); 506 } 507 508 setWasAuthenticated(); 509 // If we are able to resume the stream, then don't set 510 // connected/authenticated/usingTLS to false since we like behave like we are still 511 // connected (e.g. sendStanza should not throw a NotConnectedException). 
512 if (isSmResumptionPossible() && instant) { 513 disconnectedButResumeable = true; 514 } else { 515 disconnectedButResumeable = false; 516 // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing 517 // stream tag, there is no longer a stream to resume. 518 smSessionId = null; 519 } 520 authenticated = false; 521 connected = false; 522 usingTLS = false; 523 reader = null; 524 writer = null; 525 526 maybeCompressFeaturesReceived.init(); 527 compressSyncPoint.init(); 528 smResumedSyncPoint.init(); 529 smEnabledSyncPoint.init(); 530 initalOpenStreamSend.init(); 531 } 532 533 @Override 534 public void send(PlainStreamElement element) throws NotConnectedException { 535 packetWriter.sendStreamElement(element); 536 } 537 538 @Override 539 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException { 540 packetWriter.sendStreamElement(packet); 541 if (isSmEnabled()) { 542 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 543 if (requestAckPredicate.accept(packet)) { 544 requestSmAcknowledgementInternal(); 545 break; 546 } 547 } 548 } 549 } 550 551 private void connectUsingConfiguration() throws IOException, ConnectionException { 552 List<HostAddress> failedAddresses = populateHostAddresses(); 553 SocketFactory socketFactory = config.getSocketFactory(); 554 if (socketFactory == null) { 555 socketFactory = SocketFactory.getDefault(); 556 } 557 for (HostAddress hostAddress : hostAddresses) { 558 String host = hostAddress.getFQDN(); 559 int port = hostAddress.getPort(); 560 socket = socketFactory.createSocket(); 561 try { 562 Iterator<InetAddress> inetAddresses = Arrays.asList(InetAddress.getAllByName(host)).iterator(); 563 if (!inetAddresses.hasNext()) { 564 // This should not happen 565 LOGGER.warning("InetAddress.getAllByName() returned empty result array."); 566 throw new UnknownHostException(host); 567 } 568 innerloop: while (inetAddresses.hasNext()) { 569 final InetAddress 
inetAddress = inetAddresses.next(); 570 final String inetAddressAndPort = inetAddress + " at port " + port; 571 LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort); 572 try { 573 socket.connect(new InetSocketAddress(inetAddress, port), config.getConnectTimeout()); 574 } catch (Exception e) { 575 if (inetAddresses.hasNext()) { 576 continue innerloop; 577 } else { 578 throw e; 579 } 580 } 581 LOGGER.finer("Established TCP connection to " + inetAddressAndPort); 582 // We found a host to connect to, return here 583 this.host = host; 584 this.port = port; 585 return; 586 } 587 } 588 catch (Exception e) { 589 hostAddress.setException(e); 590 failedAddresses.add(hostAddress); 591 } 592 } 593 // There are no more host addresses to try 594 // throw an exception and report all tried 595 // HostAddresses in the exception 596 throw ConnectionException.from(failedAddresses); 597 } 598 599 /** 600 * Initializes the connection by creating a stanza(/packet) reader and writer and opening a 601 * XMPP stream to the server. 602 * 603 * @throws XMPPException if establishing a connection to the server fails. 604 * @throws SmackException if the server failes to respond back or if there is anther error. 605 * @throws IOException 606 */ 607 private void initConnection() throws IOException { 608 boolean isFirstInitialization = packetReader == null || packetWriter == null; 609 compressionHandler = null; 610 611 // Set the reader and writer instance variables 612 initReaderAndWriter(); 613 614 if (isFirstInitialization) { 615 packetWriter = new PacketWriter(); 616 packetReader = new PacketReader(); 617 618 // If debugging is enabled, we should start the thread that will listen for 619 // all packets and then log them. 
620 if (config.isDebuggerEnabled()) { 621 addAsyncStanzaListener(debugger.getReaderListener(), null); 622 if (debugger.getWriterListener() != null) { 623 addPacketSendingListener(debugger.getWriterListener(), null); 624 } 625 } 626 } 627 // Start the packet writer. This will open an XMPP stream to the server 628 packetWriter.init(); 629 // Start the packet reader. The startup() method will block until we 630 // get an opening stream packet back from server 631 packetReader.init(); 632 633 if (isFirstInitialization) { 634 // Notify listeners that a new connection has been established 635 for (ConnectionCreationListener listener : getConnectionCreationListeners()) { 636 listener.connectionCreated(this); 637 } 638 } 639 } 640 641 private void initReaderAndWriter() throws IOException { 642 InputStream is = socket.getInputStream(); 643 OutputStream os = socket.getOutputStream(); 644 if (compressionHandler != null) { 645 is = compressionHandler.getInputStream(is); 646 os = compressionHandler.getOutputStream(os); 647 } 648 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 649 writer = new OutputStreamWriter(os, "UTF-8"); 650 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 651 652 // If debugging is enabled, we open a window and write out all network traffic. 653 initDebugger(); 654 } 655 656 /** 657 * The server has indicated that TLS negotiation can start. We now need to secure the 658 * existing plain connection and perform a handshake. This method won't return until the 659 * connection has finished the handshake or an error occurred while securing the connection. 660 * @throws IOException 661 * @throws CertificateException 662 * @throws NoSuchAlgorithmException 663 * @throws NoSuchProviderException 664 * @throws KeyStoreException 665 * @throws UnrecoverableKeyException 666 * @throws KeyManagementException 667 * @throws SmackException 668 * @throws Exception if an exception occurs. 
669 */ 670 private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException { 671 SSLContext context = this.config.getCustomSSLContext(); 672 KeyStore ks = null; 673 KeyManager[] kms = null; 674 PasswordCallback pcb = null; 675 676 if(config.getCallbackHandler() == null) { 677 ks = null; 678 } else if (context == null) { 679 if(config.getKeystoreType().equals("NONE")) { 680 ks = null; 681 pcb = null; 682 } 683 else if(config.getKeystoreType().equals("PKCS11")) { 684 try { 685 Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); 686 String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library(); 687 ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes()); 688 Provider p = (Provider)c.newInstance(config); 689 Security.addProvider(p); 690 ks = KeyStore.getInstance("PKCS11",p); 691 pcb = new PasswordCallback("PKCS11 Password: ",false); 692 this.config.getCallbackHandler().handle(new Callback[]{pcb}); 693 ks.load(null,pcb.getPassword()); 694 } 695 catch (Exception e) { 696 ks = null; 697 pcb = null; 698 } 699 } 700 else if(config.getKeystoreType().equals("Apple")) { 701 ks = KeyStore.getInstance("KeychainStore","Apple"); 702 ks.load(null,null); 703 //pcb = new PasswordCallback("Apple Keychain",false); 704 //pcb.setPassword(null); 705 } 706 else { 707 ks = KeyStore.getInstance(config.getKeystoreType()); 708 try { 709 pcb = new PasswordCallback("Keystore Password: ",false); 710 config.getCallbackHandler().handle(new Callback[]{pcb}); 711 ks.load(new FileInputStream(config.getKeystorePath()), pcb.getPassword()); 712 } 713 catch(Exception e) { 714 ks = null; 715 pcb = null; 716 } 717 } 718 KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); 719 try { 720 if(pcb == null) { 721 kmf.init(ks,null); 722 } else { 723 
kmf.init(ks,pcb.getPassword()); 724 pcb.clearPassword(); 725 } 726 kms = kmf.getKeyManagers(); 727 } catch (NullPointerException npe) { 728 kms = null; 729 } 730 } 731 732 // If the user didn't specify a SSLContext, use the default one 733 if (context == null) { 734 context = SSLContext.getInstance("TLS"); 735 context.init(kms, null, new java.security.SecureRandom()); 736 } 737 Socket plain = socket; 738 // Secure the plain connection 739 socket = context.getSocketFactory().createSocket(plain, 740 host, plain.getPort(), true); 741 // Initialize the reader and writer with the new secured version 742 initReaderAndWriter(); 743 744 final SSLSocket sslSocket = (SSLSocket) socket; 745 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 746 747 // Proceed to do the handshake 748 sslSocket.startHandshake(); 749 750 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 751 if (verifier == null) { 752 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 753 } else if (!verifier.verify(getServiceName(), sslSocket.getSession())) { 754 throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getServiceName()); 755 } 756 757 // Set that TLS was successful 758 usingTLS = true; 759 } 760 761 /** 762 * Returns the compression handler that can be used for one compression methods offered by the server. 
763 * 764 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 765 * 766 */ 767 private XMPPInputOutputStream maybeGetCompressionHandler() { 768 Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); 769 if (compression == null) { 770 // Server does not support compression 771 return null; 772 } 773 for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) { 774 String method = handler.getCompressionMethod(); 775 if (compression.getMethods().contains(method)) 776 return handler; 777 } 778 return null; 779 } 780 781 @Override 782 public boolean isUsingCompression() { 783 return compressionHandler != null && compressSyncPoint.wasSuccessful(); 784 } 785 786 /** 787 * <p> 788 * Starts using stream compression that will compress network traffic. Traffic can be 789 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 790 * connection. However, the server and the client will need to use more CPU time in order to 791 * un/compress network data so under high load the server performance might be affected. 792 * </p> 793 * <p> 794 * Stream compression has to have been previously offered by the server. Currently only the 795 * zlib method is supported by the client. Stream compression negotiation has to be done 796 * before authentication took place. 
* </p>
     *
     * @throws NotConnectedException
     * @throws XMPPException
     * @throws NoResponseException
     */
    private void useCompression() throws NotConnectedException, NoResponseException, XMPPException {
        // Block until we know whether the (optional) compression feature was announced
        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
        // If stream compression was offered by the server and we want to use
        // compression then send compression request to the server
        if ((compressionHandler = maybeGetCompressionHandler()) != null) {
            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
        } else {
            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
        }
    }

    /**
     * Establishes a connection to the XMPP server and performs an automatic login
     * only if the previous connection state was logged in (authenticated). It basically
     * creates and maintains a socket connection to the server.
     * <p>
     * Listeners will be preserved from a previous connection if the reconnection
     * occurs after an abrupt termination.
     * </p>
     *
     * @throws XMPPException if an error occurs while trying to establish the connection.
     * @throws SmackException
     * @throws IOException
     */
    @Override
    protected void connectInternal() throws SmackException, IOException, XMPPException {
        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
        // there is an error establishing the connection
        connectUsingConfiguration();

        // We connected successfully to the servers TCP port
        socketClosed = false;
        initConnection();

        // Wait with SASL auth until the SASL mechanisms have been received
        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();

        // Make note of the fact that we're now connected.
        connected = true;
        callConnectionConnectedListener();

        // Automatically makes the login if the user was previously connected successfully
        // to the server and the connection was terminated abruptly
        if (wasAuthenticated) {
            login();
            notifyReconnection();
        }
    }

    /**
     * Shuts the connection down instantly and notifies the connection listeners that the
     * connection was closed because of the given exception. Does nothing if both the reader
     * and the writer are absent or already done, since in that case the listeners were
     * already notified.
     *
     * @param e the exception that causes the connection close event.
     */
    private synchronized void notifyConnectionError(Exception e) {
        // Listeners were already notified of the exception, return right here.
        if ((packetReader == null || packetReader.done) &&
                (packetWriter == null || packetWriter.done())) return;

        // Closes the connection temporarily. A reconnection is possible
        instantShutdown();

        // Notify connection listeners of the error.
        callConnectionClosedOnErrorListener(e);
    }

    /**
     * For unit testing purposes
     *
     * @param writer the writer to use for outgoing data.
     */
    protected void setWriter(Writer writer) {
        this.writer = writer;
    }

    /**
     * Reacts to a freshly parsed set of stream features: requests STARTTLS when it is offered
     * and security is not disabled, enforces the configured security mode, and marks the
     * "maybe compression feature received" synchronization point once SASL authentication
     * has succeeded.
     *
     * @throws SecurityRequiredException if the client requires TLS, the connection is not secure and
     *         the server does not offer STARTTLS.
     * @throws NotConnectedException
     */
    @Override
    protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException {
        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
        if (startTlsFeature != null) {
            // The server insists on TLS but the client disabled it: report this as a connection
            // error (note that no exception is thrown to the caller in this case)
            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
                notifyConnectionError(new SecurityRequiredByServerException());
                return;
            }

            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
                send(new StartTls());
            }
        }
        // If TLS is required but the server doesn't offer it, disconnect
        // from the server and throw an error. First check if we've already negotiated TLS
        // and are secure, however (features get parsed a second time after TLS is established).
        if (!isSecureConnection() && startTlsFeature == null
                        && getConfiguration().getSecurityMode() == SecurityMode.required) {
            throw new SecurityRequiredByClientException();
        }

        if (getSASLAuthentication().authenticationSuccessful()) {
            // If we have received features after the SASL has been successfully completed, then we
            // have also *maybe* received, as it is an optional feature, the compression feature
            // from the server.
            maybeCompressFeaturesReceived.reportSuccess();
        }
    }

    /**
     * Sends a new opening stream element to the server and resets the parser using the latest
     * connection's reader. Resetting the parser is necessary when the plain connection has been
     * secured or when a new opening stream element is going to be sent by the server.
     *
     * @throws SmackException if the parser could not be reset.
     */
    void openStream() throws SmackException {
        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
        // response from the server (see e.g. RFC 6120 § 9.1.1 Step 2.)
        CharSequence to = getServiceName();
        CharSequence from = null;
        CharSequence localpart = config.getUsername();
        if (localpart != null) {
            from = XmppStringUtils.completeJidFrom(localpart, to);
        }
        String id = getStreamId();
        send(new StreamOpen(to, from, id));
        try {
            packetReader.parser = PacketParserUtils.newXmppParser(reader);
        }
        catch (XmlPullParserException e) {
            throw new SmackException(e);
        }
    }

    /**
     * Reads top-level stream elements from the server on a dedicated thread and dispatches
     * them (stanzas, stream features, TLS/SASL/compression negotiation results and
     * Stream Management elements).
     */
    protected class PacketReader {

        /** The pull parser currently in use; replaced by {@code openStream()} on every stream restart. */
        XmlPullParser parser;

        /** Set to true by {@link #shutdown()} to make the parse loop terminate. */
        private volatile boolean done;

        /**
         * Initializes the reader in order to be used. The reader is initialized during the
         * first connection and when reconnecting due to an abrupt disconnection.
         */
        void init() {
            done = false;

            // Run the parse loop asynchronously under a descriptive thread name
            Async.go(new Runnable() {
                public void run() {
                    parsePackets();
                }
            }, "Smack Packet Reader (" + getConnectionCounter() + ")");
        }

        /**
         * Shuts the stanza(/packet) reader down. This method simply sets the 'done' flag to true.
         */
        void shutdown() {
            done = true;
        }

        /**
         * Parse top-level packets in order to process them further. Runs until the 'done' flag
         * is set or an exception occurs; exceptions are reported via
         * {@code notifyConnectionError()} unless the reader is done or the socket got closed.
         */
        private void parsePackets() {
            try {
                // Do not start parsing before the writer thread has sent the initial stream open
                initalOpenStreamSend.checkIfSuccessOrWait();
                int eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case XmlPullParser.START_TAG:
                        final String name = parser.getName();
                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                // Count the stanza as handled even if processing it threw
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            // We found an opening stream.
                            if ("jabber:client".equals(parser.getNamespace(null))) {
                                streamId = parser.getAttributeValue("", "id");
                                String reportedServiceName = parser.getAttributeValue("", "from");
                                // NOTE(review): if the server omits the 'from' attribute, reportedServiceName is
                                // presumably null and this throws a NullPointerException when assertions are
                                // enabled - confirm whether that is intended.
                                assert(reportedServiceName.equals(config.getServiceName()));
                            }
                            break;
                        case "error":
                            throw new StreamErrorException(PacketParserUtils.parseStreamError(parser));
                        case "features":
                            parseFeatures(parser);
                            break;
                        case "proceed":
                            try {
                                // Secure the connection by negotiating TLS
                                proceedTLSReceived();
                                // Send a new opening stream to the server
                                openStream();
                            }
                            catch (Exception e) {
                                // We report any failure regarding TLS in the second stage of XMPP
                                // connection establishment, namely the SASL authentication
                                saslFeatureReceived.reportFailure(new SmackException(e));
                                throw e;
                            }
                            break;
                        case "failure":
                            // 'failure' is used by several protocols, the namespace tells them apart
                            String namespace = parser.getNamespace(null);
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new XMPPErrorException("TLS negotiation has failed", null);
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                compressSyncPoint.reportFailure(new XMPPErrorException(
                                                "Could not establish compression", null));
                                break;
                            case SaslStreamElements.NAMESPACE:
                                // SASL authentication has failed. The server may close the connection
                                // depending on the number of retries
                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
                                getSASLAuthentication().authenticationFailed(failure);
                                break;
                            }
                            break;
                        case Challenge.ELEMENT:
                            // The server is challenging the SASL authentication made by the client
                            String challengeData = parser.nextText();
                            getSASLAuthentication().challengeReceived(challengeData);
                            break;
                        case Success.ELEMENT:
                            Success success = new Success(parser.nextText());
                            // We now need to bind a resource for the connection
                            // Open a new stream and wait for the response
                            openStream();
                            // The SASL authentication with the server was successful. The next step
                            // will be to bind the resource
                            getSASLAuthentication().authenticated(success);
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStream();
                            // Notify that compression is being used
                            compressSyncPoint.reportSuccess();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                smSessionId = enabled.getId();
                                // A resumable stream without a session id cannot be resumed later: treat as error
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    XMPPErrorException xmppException = new XMPPErrorException(
                                                    "Stream Management 'enabled' element with resume attribute but without session id received",
                                                    new XMPPError(
                                                                    XMPPError.Condition.bad_request));
                                    smEnabledSyncPoint.reportFailure(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptimTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            // Stanza counting starts anew with the freshly enabled SM session
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint.reportSuccess();
                            LOGGER.fine("Stream Management (XEP-198): succesfully enabled");
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            XMPPError xmppError = new XMPPError(failed.getXMPPErrorCondition());
                            XMPPException xmppException = new XMPPErrorException("Stream Management failed", xmppError);
                            // If only XEP-198 would specify different failure elements for the SM
                            // enable and SM resume failure case. But this is not the case, so we
                            // need to determine if this is a 'Failed' response for either 'Enable'
                            // or 'Resume'.
                            if (smResumedSyncPoint.requestSent()) {
                                smResumedSyncPoint.reportFailure(xmppException);
                            }
                            else {
                                if (!smEnabledSyncPoint.requestSent()) {
                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
                                }
                                smEnabledSyncPoint.reportFailure(xmppException);
                                // Report success for last lastFeaturesReceived so that in case a
                                // failed resumption, we can continue with normal resource binding.
                                // See text of XEP-198 5. below Example 11.
                                lastFeaturesReceived.reportSuccess();
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new LinkedList<Stanza>();
                            stanzasToResend.addAll(unacknowledgedStanzas);
                            for (Stanza stanza : stanzasToResend) {
                                packetWriter.sendStreamElement(stanza);
                            }
                            smResumedSyncPoint.reportSuccess();
                            smEnabledSyncPoint.reportSuccess();
                            // If there were stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            ParseStreamManagement.ackRequest(parser);
                            if (smEnabledSyncPoint.wasSuccessful()) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                        default:
                            LOGGER.warning("Unknown top level stream element: " + name);
                            break;
                        }
                        break;
                    case XmlPullParser.END_TAG:
                        if (parser.getName().equals("stream")) {
                            // Disconnect the connection
                            disconnect();
                        }
                        break;
                    case XmlPullParser.END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done || isSocketClosed())) {
                    // Close the connection and notify connection listeners of the
                    // error.
                    notifyConnectionError(e);
                }
            }
        }
    }

    /**
     * Takes stream elements from a bounded queue and writes them to the connection's writer
     * on a dedicated thread. Also maintains the queue of unacknowledged stanzas used by
     * Stream Management.
     */
    protected class PacketWriter {
        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;

        /** Elements waiting to be written by the writer thread. */
        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<Element>(
                        QUEUE_SIZE, true);

        /**
         * Needs to be protected for unit testing purposes.
         */
        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>(
                        XMPPTCPConnection.this);

        /**
         * If set, the stanza(/packet) writer is shut down
         */
        protected volatile Long shutdownTimestamp = null;

        /** True if the last shutdown was requested as "instant", see {@link #shutdown(boolean)}. */
        private volatile boolean instantShutdown;

        /**
         * True if some preconditions are given to start the bundle and defer mechanism.
         * <p>
         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
         * this field to true.
         * </p>
         */
        private boolean shouldBundleAndDefer;

        /**
         * Initializes the writer in order to be used. It is called at the first connection and also
         * is invoked if the connection is disconnected by an error.
         */
        void init() {
            shutdownDone.init();
            shutdownTimestamp = null;

            if (unacknowledgedStanzas != null) {
                // It's possible that there are new stanzas in the writer queue that
                // came in while we were disconnected but resumable, drain those into
                // the unacknowledged queue so that they get resent now
                drainWriterQueueToUnacknowledgedStanzas();
            }

            queue.start();
            Async.go(new Runnable() {
                @Override
                public void run() {
                    writePackets();
                }
            }, "Smack Packet Writer (" + getConnectionCounter() + ")");
        }

        /**
         * Returns true if the writer was shut down, i.e. if a shutdown timestamp is set.
         */
        private boolean done() {
            return shutdownTimestamp != null;
        }

        /**
         * Throws if the writer is shut down and the stream can not be resumed via Stream Management.
         *
         * @throws NotConnectedException if the writer is done and no stream resumption is possible.
         */
        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
            if (done() && !isSmResumptionPossible()) {
                // Don't throw a NotConnectedException if there is a resumable stream available
                throw new NotConnectedException();
            }
        }

        /**
         * Sends the specified element to the server. Blocks while the queue is full; an interrupt
         * while blocking is logged and the put is retried.
         *
         * @param element the element to send.
         * @throws NotConnectedException
         */
        protected void sendStreamElement(Element element) throws NotConnectedException {
            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();

            boolean enqueued = false;
            while (!enqueued) {
                try {
                    queue.put(element);
                    enqueued = true;
                }
                catch (InterruptedException e) {
                    throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
                    // If the method above did not throw, then the sending thread was interrupted
                    // TODO in a later version of Smack the InterruptedException should be thrown to
                    // allow users to interrupt a sending thread that is currently blocking because
                    // the queue is full.
                    LOGGER.log(Level.WARNING, "Sending thread was interrupted", e);
                }
            }
        }

        /**
         * Shuts down the stanza(/packet) writer. Once this method has been called, no further
         * packets will be written to the server. Waits for the writer thread to report that its
         * shutdown is done.
         *
         * @param instant if true, the writer thread neither flushes the remaining queue nor writes the
         *        closing stream element.
         */
        void shutdown(boolean instant) {
            instantShutdown = instant;
            shutdownTimestamp = System.currentTimeMillis();
            queue.shutdown();
            try {
                shutdownDone.checkIfSuccessOrWait();
            }
            catch (NoResponseException e) {
                LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
            }
        }

        /**
         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
         * that case.
         *
         * @return the next element for writing or null.
         */
        private Element nextStreamElement() {
            // It is important that we check if the queue is empty before removing an element from it
            if (queue.isEmpty()) {
                shouldBundleAndDefer = true;
            }
            Element packet = null;
            try {
                packet = queue.take();
            }
            catch (InterruptedException e) {
                if (!queue.isShutdown()) {
                    // Users shouldn't try to interrupt the packet writer thread
                    LOGGER.log(Level.WARNING, "Packet writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
                }
            }
            return packet;
        }

        /**
         * The body of the writer thread: opens the stream, then writes queued elements until the
         * writer is shut down. On a non-instant shutdown the remaining queue is flushed and the
         * closing stream element is written. Always reports {@link #shutdownDone} as successful
         * when it returns.
         */
        private void writePackets() {
            try {
                openStream();
                // Unblock the reader thread, which waits for the initial stream open to be sent
                initalOpenStreamSend.reportSuccess();
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Loop to guard against wake-ups before the deferral period is over
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enabled'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
                    }
                    // Check if the stream element should be put to the unacknowledgedStanza
                    // queue. Note that we can not do the put() in sendStanzaInternal() and the
                    // packet order is not stable at this point (sendStanzaInternal() can be
                    // called concurrently).
                    if (unacknowledgedStanzas != null && packet != null) {
                        // If the unacknowledgedStanza queue is nearly full, request a new ack
                        // from the server in order to drain it
                        // NOTE(review): this is an exact equality against a double; it only triggers when
                        // the size is precisely 0.8 * QUEUE_SIZE and never if that product is not a whole
                        // number - confirm this is intended.
                        if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
                            writer.write(AckRequest.INSTANCE.toXML().toString());
                            writer.flush();
                        }
                        try {
                            // It is important that we put the stanza in the unacknowledged stanza
                            // queue before we put it on the wire
                            unacknowledgedStanzas.put(packet);
                        }
                        catch (InterruptedException e) {
                            throw new IllegalStateException(e);
                        }
                    }
                    writer.write(element.toXML().toString());
                    // Only flush when there is nothing else queued, so consecutive elements get batched
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            writer.write(packet.toXML().toString());
                        }
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }
                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }

                try {
                    writer.close();
                }
                catch (Exception e) {
                    // Do nothing
                }

            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || isSocketClosed())) {
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            } finally {
                // Always release a thread waiting in shutdown(), no matter how the loop ended
                LOGGER.fine("Reporting shutdownDone success in writer thread");
                shutdownDone.reportSuccess();
            }
        }

        /**
         * Moves all elements out of the writer queue and adds those that are stanzas to the
         * unacknowledged stanzas queue. Non-stanza elements are dropped.
         */
        private void drainWriterQueueToUnacknowledgedStanzas() {
            List<Element> elements = new ArrayList<Element>(queue.size());
            queue.drainTo(elements);
            for (Element element : elements) {
                if (element instanceof Stanza) {
                    unacknowledgedStanzas.add((Stanza) element);
                }
            }
        }
    }

    /**
     * Set if Stream Management should be used by default for new connections.
     *
     * @param useSmDefault true to use Stream Management for new connections.
     */
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
        XMPPTCPConnection.useSmDefault = useSmDefault;
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1453 */ 1454 public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { 1455 if (useSmResumptionDefault) { 1456 // Also enable SM is resumption is enabled 1457 setUseStreamManagementDefault(useSmResumptionDefault); 1458 } 1459 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1460 } 1461 1462 /** 1463 * Set if Stream Management should be used if supported by the server. 1464 * 1465 * @param useSm true to use Stream Management. 1466 */ 1467 public void setUseStreamManagement(boolean useSm) { 1468 this.useSm = useSm; 1469 } 1470 1471 /** 1472 * Set if Stream Management resumption should be used if supported by the server. 1473 * 1474 * @param useSmResumption true to use Stream Management resumption. 1475 */ 1476 public void setUseStreamManagementResumption(boolean useSmResumption) { 1477 if (useSmResumption) { 1478 // Also enable SM is resumption is enabled 1479 setUseStreamManagement(useSmResumption); 1480 } 1481 this.useSmResumption = useSmResumption; 1482 } 1483 1484 /** 1485 * Set the preferred resumption time in seconds. 1486 * @param resumptionTime the preferred resumption time in seconds 1487 */ 1488 public void setPreferredResumptionTime(int resumptionTime) { 1489 smClientMaxResumptionTime = resumptionTime; 1490 } 1491 1492 /** 1493 * Add a predicate for Stream Management acknowledgment requests. 1494 * <p> 1495 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1496 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1497 * </p> 1498 * <p> 1499 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1500 * </p> 1501 * 1502 * @param predicate the predicate to add. 1503 * @return if the predicate was not already active. 
1504 */ 1505 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1506 synchronized (requestAckPredicates) { 1507 return requestAckPredicates.add(predicate); 1508 } 1509 } 1510 1511 /** 1512 * Remove the given predicate for Stream Management acknowledgment request. 1513 * @param predicate the predicate to remove. 1514 * @return true if the predicate was removed. 1515 */ 1516 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1517 synchronized (requestAckPredicates) { 1518 return requestAckPredicates.remove(predicate); 1519 } 1520 } 1521 1522 /** 1523 * Remove all predicates for Stream Management acknowledgment requests. 1524 */ 1525 public void removeAllRequestAckPredicates() { 1526 synchronized (requestAckPredicates) { 1527 requestAckPredicates.clear(); 1528 } 1529 } 1530 1531 /** 1532 * Send an unconditional Stream Management acknowledgement request to the server. 1533 * 1534 * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled. 1535 * @throws NotConnectedException if the connection is not connected. 1536 */ 1537 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { 1538 if (!isSmEnabled()) { 1539 throw new StreamManagementException.StreamManagementNotEnabledException(); 1540 } 1541 requestSmAcknowledgementInternal(); 1542 } 1543 1544 private void requestSmAcknowledgementInternal() throws NotConnectedException { 1545 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1546 } 1547 1548 /** 1549 * Send a unconditional Stream Management acknowledgment to the server. 1550 * <p> 1551 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>: 1552 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1553 * or after a certain period of time), even if it has not received an <r/> element from the other party." 
1554 * </p> 1555 * 1556 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1557 * @throws NotConnectedException if the connection is not connected. 1558 */ 1559 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { 1560 if (!isSmEnabled()) { 1561 throw new StreamManagementException.StreamManagementNotEnabledException(); 1562 } 1563 sendSmAcknowledgementInternal(); 1564 } 1565 1566 private void sendSmAcknowledgementInternal() throws NotConnectedException { 1567 packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); 1568 } 1569 1570 /** 1571 * Add a Stanza acknowledged listener. 1572 * <p> 1573 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1574 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1575 * possible. 1576 * </p> 1577 * 1578 * @param listener the listener to add. 1579 */ 1580 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1581 stanzaAcknowledgedListeners.add(listener); 1582 } 1583 1584 /** 1585 * Remove the given Stanza acknowledged listener. 1586 * 1587 * @param listener the listener. 1588 * @return true if the listener was removed. 1589 */ 1590 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1591 return stanzaAcknowledgedListeners.remove(listener); 1592 } 1593 1594 /** 1595 * Remove all stanza acknowledged listeners. 1596 */ 1597 public void removeAllStanzaAcknowledgedListeners() { 1598 stanzaAcknowledgedListeners.clear(); 1599 } 1600 1601 /** 1602 * Add a new Stanza ID acknowledged listener for the given ID. 1603 * <p> 1604 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1605 * automatically be removed after the listener was run. 1606 * </p> 1607 * 1608 * @param id the stanza ID. 1609 * @param listener the listener to invoke. 
1610 * @return the previous listener for this stanza ID or null. 1611 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1612 */ 1613 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1614 // Prevent users from adding callbacks that will never get removed 1615 if (!smWasEnabledAtLeastOnce) { 1616 throw new StreamManagementException.StreamManagementNotEnabledException(); 1617 } 1618 // Remove the listener after max. 12 hours 1619 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 12 * 60 * 60); 1620 schedule(new Runnable() { 1621 @Override 1622 public void run() { 1623 stanzaIdAcknowledgedListeners.remove(id); 1624 } 1625 }, removeAfterSeconds, TimeUnit.SECONDS); 1626 return stanzaIdAcknowledgedListeners.put(id, listener); 1627 } 1628 1629 /** 1630 * Remove the Stanza ID acknowledged listener for the given ID. 1631 * 1632 * @param id the stanza ID. 1633 * @return true if the listener was found and removed, false otherwise. 1634 */ 1635 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1636 return stanzaIdAcknowledgedListeners.remove(id); 1637 } 1638 1639 /** 1640 * Removes all Stanza ID acknowledged listeners. 1641 */ 1642 public void removeAllStanzaIdAcknowledgedListeners() { 1643 stanzaIdAcknowledgedListeners.clear(); 1644 } 1645 1646 /** 1647 * Returns true if Stream Management is supported by the server. 1648 * 1649 * @return true if Stream Management is supported by the server. 1650 */ 1651 public boolean isSmAvailable() { 1652 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1653 } 1654 1655 /** 1656 * Returns true if Stream Management was successfully negotiated with the server. 1657 * 1658 * @return true if Stream Management was negotiated. 
1659 */ 1660 public boolean isSmEnabled() { 1661 return smEnabledSyncPoint.wasSuccessful(); 1662 } 1663 1664 /** 1665 * Returns true if the stream was successfully resumed with help of Stream Management. 1666 * 1667 * @return true if the stream was resumed. 1668 */ 1669 public boolean streamWasResumed() { 1670 return smResumedSyncPoint.wasSuccessful(); 1671 } 1672 1673 /** 1674 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1675 * 1676 * @return true if disconnected but resumption possible. 1677 */ 1678 public boolean isDisconnectedButSmResumptionPossible() { 1679 return disconnectedButResumeable && isSmResumptionPossible(); 1680 } 1681 1682 /** 1683 * Returns true if the stream is resumable. 1684 * 1685 * @return true if the stream is resumable. 1686 */ 1687 public boolean isSmResumptionPossible() { 1688 // There is no resumable stream available 1689 if (smSessionId == null) 1690 return false; 1691 1692 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1693 // Seems like we are already reconnected, report true 1694 if (shutdownTimestamp == null) { 1695 return true; 1696 } 1697 1698 // See if resumption time is over 1699 long current = System.currentTimeMillis(); 1700 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1701 if (current > shutdownTimestamp + maxResumptionMillies) { 1702 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1703 // resumption is possible 1704 return false; 1705 } else { 1706 return true; 1707 } 1708 } 1709 1710 /** 1711 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 1712 * <p> 1713 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1714 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1715 * without checking for overflows before. 
1716 * </p> 1717 * 1718 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1719 */ 1720 public int getMaxSmResumptionTime() { 1721 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1722 int serverResumptionTime = smServerMaxResumptimTime > 0 ? smServerMaxResumptimTime : Integer.MAX_VALUE; 1723 return Math.min(clientResumptionTime, serverResumptionTime); 1724 } 1725 1726 private void processHandledCount(long handledCount) throws NotConnectedException, StreamManagementCounterError { 1727 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1728 final List<Stanza> ackedStanzas = new ArrayList<Stanza>( 1729 handledCount <= Integer.MAX_VALUE ? (int) handledCount 1730 : Integer.MAX_VALUE); 1731 for (long i = 0; i < ackedStanzasCount; i++) { 1732 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1733 // If the server ack'ed a stanza, then it must be in the 1734 // unacknowledged stanza queue. There can be no exception. 
1735 if (ackedStanza == null) { 1736 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1737 ackedStanzasCount, ackedStanzas); 1738 } 1739 ackedStanzas.add(ackedStanza); 1740 } 1741 1742 boolean atLeastOneStanzaAcknowledgedListener = false; 1743 if (!stanzaAcknowledgedListeners.isEmpty()) { 1744 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1745 atLeastOneStanzaAcknowledgedListener = true; 1746 } 1747 else { 1748 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1749 for (Stanza ackedStanza : ackedStanzas) { 1750 String id = ackedStanza.getStanzaId(); 1751 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1752 atLeastOneStanzaAcknowledgedListener = true; 1753 break; 1754 } 1755 } 1756 } 1757 1758 // Only spawn a new thread if there is a chance that some listener is invoked 1759 if (atLeastOneStanzaAcknowledgedListener) { 1760 asyncGo(new Runnable() { 1761 @Override 1762 public void run() { 1763 for (Stanza ackedStanza : ackedStanzas) { 1764 for (StanzaListener listener : stanzaAcknowledgedListeners) { 1765 try { 1766 listener.processPacket(ackedStanza); 1767 } 1768 catch (NotConnectedException e) { 1769 LOGGER.log(Level.FINER, "Received not connected exception", e); 1770 } 1771 } 1772 String id = ackedStanza.getStanzaId(); 1773 if (StringUtils.isNullOrEmpty(id)) { 1774 continue; 1775 } 1776 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1777 if (listener != null) { 1778 try { 1779 listener.processPacket(ackedStanza); 1780 } 1781 catch (NotConnectedException e) { 1782 LOGGER.log(Level.FINER, "Received not connected exception", e); 1783 } 1784 } 1785 } 1786 } 1787 }); 1788 } 1789 1790 serverHandledStanzasCount = handledCount; 1791 } 1792 1793 /** 1794 * Set the default bundle and defer callback used for new connections. 
1795 * 1796 * @param defaultBundleAndDeferCallback 1797 * @see BundleAndDeferCallback 1798 * @since 4.1 1799 */ 1800 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1801 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1802 } 1803 1804 /** 1805 * Set the bundle and defer callback used for this connection. 1806 * <p> 1807 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1808 * no longer get deferred. 1809 * </p> 1810 * 1811 * @param bundleAndDeferCallback the callback or <code>null</code>. 1812 * @see BundleAndDeferCallback 1813 * @since 4.1 1814 */ 1815 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1816 this.bundleAndDeferCallback = bundleAndDeferCallback; 1817 } 1818 1819}