001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import org.jivesoftware.smack.AbstractConnectionListener; 020import org.jivesoftware.smack.AbstractXMPPConnection; 021import org.jivesoftware.smack.ConnectionConfiguration; 022import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 023import org.jivesoftware.smack.ConnectionCreationListener; 024import org.jivesoftware.smack.StanzaListener; 025import org.jivesoftware.smack.SmackConfiguration; 026import org.jivesoftware.smack.SmackException; 027import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 028import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 029import org.jivesoftware.smack.SmackException.NoResponseException; 030import org.jivesoftware.smack.SmackException.NotConnectedException; 031import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; 032import org.jivesoftware.smack.SmackException.ConnectionException; 033import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 034import org.jivesoftware.smack.SynchronizationPoint; 035import org.jivesoftware.smack.XMPPException.StreamErrorException; 036import org.jivesoftware.smack.XMPPConnection; 037import org.jivesoftware.smack.XMPPException; 038import org.jivesoftware.smack.XMPPException.XMPPErrorException; 039import org.jivesoftware.smack.compress.packet.Compressed; 040import org.jivesoftware.smack.compression.XMPPInputOutputStream; 041import org.jivesoftware.smack.filter.StanzaFilter; 042import org.jivesoftware.smack.compress.packet.Compress; 043import org.jivesoftware.smack.packet.Element; 044import org.jivesoftware.smack.packet.IQ; 045import org.jivesoftware.smack.packet.Message; 046import org.jivesoftware.smack.packet.StreamOpen; 047import org.jivesoftware.smack.packet.Stanza; 048import org.jivesoftware.smack.packet.Presence; 049import org.jivesoftware.smack.packet.StartTls; 050import org.jivesoftware.smack.sasl.packet.SaslStreamElements; 051import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge; 052import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure; 053import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success; 054import org.jivesoftware.smack.sm.SMUtils; 055import org.jivesoftware.smack.sm.StreamManagementException; 056import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 057import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 058import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 059import org.jivesoftware.smack.sm.packet.StreamManagement; 060import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 061import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 062import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 063import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 064import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 065import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 066import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 067import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 068import org.jivesoftware.smack.sm.predicates.Predicate; 069import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 070import org.jivesoftware.smack.packet.PlainStreamElement; 071import org.jivesoftware.smack.packet.XMPPError; 072import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 073import org.jivesoftware.smack.util.Async; 074import org.jivesoftware.smack.util.PacketParserUtils; 075import org.jivesoftware.smack.util.StringUtils; 076import org.jivesoftware.smack.util.TLSUtils; 077import org.jivesoftware.smack.util.dns.HostAddress; 078import org.jxmpp.util.XmppStringUtils; 079import org.xmlpull.v1.XmlPullParser; 080import org.xmlpull.v1.XmlPullParserException; 081 082import javax.net.SocketFactory; 083import javax.net.ssl.HostnameVerifier; 084import javax.net.ssl.KeyManager; 085import javax.net.ssl.KeyManagerFactory; 086import javax.net.ssl.SSLContext; 087import javax.net.ssl.SSLSocket; 088import javax.security.auth.callback.Callback; 089import javax.security.auth.callback.PasswordCallback; 090 091import java.io.BufferedReader; 092import java.io.ByteArrayInputStream; 093import java.io.FileInputStream; 094import java.io.IOException; 095import java.io.InputStream; 096import java.io.InputStreamReader; 097import java.io.OutputStream; 098import java.io.OutputStreamWriter; 099import java.io.Writer; 100import java.lang.reflect.Constructor; 101import java.net.InetAddress; 102import java.net.InetSocketAddress; 103import java.net.Socket; 104import java.net.UnknownHostException; 105import java.security.KeyManagementException; 106import java.security.KeyStore; 107import java.security.KeyStoreException; 108import java.security.NoSuchAlgorithmException; 109import java.security.NoSuchProviderException; 110import java.security.Provider; 111import java.security.Security; 112import java.security.UnrecoverableKeyException; 113import java.security.cert.CertificateException; 114import java.util.ArrayList; 115import java.util.Arrays; 116import java.util.Collection; 117import java.util.Iterator; 118import java.util.LinkedHashSet; 119import java.util.LinkedList; 120import java.util.List; 121import java.util.Map; 122import java.util.Set; 123import java.util.concurrent.ArrayBlockingQueue; 124import java.util.concurrent.BlockingQueue; 125import java.util.concurrent.ConcurrentHashMap; 126import java.util.concurrent.ConcurrentLinkedQueue; 127import java.util.concurrent.TimeUnit; 128import java.util.concurrent.atomic.AtomicBoolean; 129import java.util.logging.Level; 130import java.util.logging.Logger; 131 132/** 133 * Creates a socket connection to an XMPP server. This is the default connection 134 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 135 * 136 * @see XMPPConnection 137 * @author Matt Tucker 138 */ 139public class XMPPTCPConnection extends AbstractXMPPConnection { 140 141 private static final int QUEUE_SIZE = 500; 142 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 143 144 /** 145 * The socket which is used for this connection. 146 */ 147 private Socket socket; 148 149 /** 150 * 151 */ 152 private boolean disconnectedButResumeable = false; 153 154 /** 155 * Flag to indicate if the socket was closed intentionally by Smack. 156 * <p> 157 * This boolean flag is used concurrently, therefore it is marked volatile. 158 * </p> 159 */ 160 private volatile boolean socketClosed = false; 161 162 private boolean usingTLS = false; 163 164 /** 165 * Protected access level because of unit test purposes 166 */ 167 protected PacketWriter packetWriter; 168 169 /** 170 * Protected access level because of unit test purposes 171 */ 172 protected PacketReader packetReader; 173 174 private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<Exception>(this); 175 176 /** 177 * 178 */ 179 private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>( 180 this); 181 182 /** 183 * 184 */ 185 private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>( 186 this); 187 188 /** 189 * The default bundle and defer callback, used for new connections. 190 * @see bundleAndDeferCallback 191 */ 192 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 193 194 /** 195 * The used bundle and defer callback. 196 * <p> 197 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 198 * having a 'volatile' read within the writer threads loop. 199 * </p> 200 */ 201 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 202 203 private static boolean useSmDefault = false; 204 205 private static boolean useSmResumptionDefault = true; 206 207 /** 208 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 209 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 210 * {@link #unacknowledgedStanzas}. 211 */ 212 private String smSessionId; 213 214 private final SynchronizationPoint<XMPPException> smResumedSyncPoint = new SynchronizationPoint<XMPPException>( 215 this); 216 217 private final SynchronizationPoint<XMPPException> smEnabledSyncPoint = new SynchronizationPoint<XMPPException>( 218 this); 219 220 /** 221 * The client's preferred maximum resumption time in seconds. 222 */ 223 private int smClientMaxResumptionTime = -1; 224 225 /** 226 * The server's preferred maximum resumption time in seconds. 227 */ 228 private int smServerMaxResumptimTime = -1; 229 230 /** 231 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 232 */ 233 private boolean useSm = useSmDefault; 234 private boolean useSmResumption = useSmResumptionDefault; 235 236 /** 237 * The counter that the server sends the client about it's current height. For example, if the server sends 238 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 239 */ 240 private long serverHandledStanzasCount = 0; 241 242 /** 243 * The counter for stanzas handled ("received") by the client. 244 * <p> 245 * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are 246 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since 247 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 248 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 249 * </p> 250 */ 251 private long clientHandledStanzasCount = 0; 252 253 private BlockingQueue<Stanza> unacknowledgedStanzas; 254 255 /** 256 * Set to true if Stream Management was at least once enabled for this connection. 257 */ 258 private boolean smWasEnabledAtLeastOnce = false; 259 260 /** 261 * This listeners are invoked for every stanza that got acknowledged. 262 * <p> 263 * We use a {@link ConccurrentLinkedQueue} here in order to allow the listeners to remove 264 * themselves after they have been invoked. 265 * </p> 266 */ 267 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<StanzaListener>(); 268 269 /** 270 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 271 * only be invoked once and automatically removed after that. 272 */ 273 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, StanzaListener>(); 274 275 /** 276 * Predicates that determine if an stream management ack should be requested from the server. 277 * <p> 278 * We use a linked hash set here, so that the order how the predicates are added matches the 279 * order in which they are invoked in order to determine if an ack request should be send or not. 280 * </p> 281 */ 282 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<StanzaFilter>(); 283 284 private final XMPPTCPConnectionConfiguration config; 285 286 /** 287 * Creates a new XMPP connection over TCP (optionally using proxies). 288 * <p> 289 * Note that XMPPTCPConnection constructors do not establish a connection to the server 290 * and you must call {@link #connect()}. 291 * </p> 292 * 293 * @param config the connection configuration. 294 */ 295 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 296 super(config); 297 this.config = config; 298 addConnectionListener(new AbstractConnectionListener() { 299 @Override 300 public void connectionClosedOnError(Exception e) { 301 if (e instanceof XMPPException.StreamErrorException) { 302 dropSmState(); 303 } 304 } 305 }); 306 } 307 308 /** 309 * Creates a new XMPP connection over TCP. 310 * <p> 311 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 312 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 313 * constructor. 314 * </p> 315 * 316 * @param jid the bare JID used by the client. 317 * @param password the password or authentication token. 318 */ 319 public XMPPTCPConnection(CharSequence jid, String password) { 320 this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); 321 } 322 323 /** 324 * Creates a new XMPP connection over TCP. 325 * <p> 326 * This is the simplest constructor for connecting to an XMPP server. Alternatively, 327 * you can get fine-grained control over connection settings using the 328 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 329 * </p> 330 * @param username 331 * @param password 332 * @param serviceName 333 */ 334 public XMPPTCPConnection(CharSequence username, String password, String serviceName) { 335 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setServiceName( 336 serviceName).build()); 337 } 338 339 @Override 340 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 341 if (packetWriter == null) { 342 throw new NotConnectedException(); 343 } 344 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 345 } 346 347 @Override 348 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 349 if (isConnected() && !disconnectedButResumeable) { 350 throw new AlreadyConnectedException(); 351 } 352 } 353 354 @Override 355 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 356 if (isAuthenticated() && !disconnectedButResumeable) { 357 throw new AlreadyLoggedInException(); 358 } 359 } 360 361 @Override 362 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException { 363 // Reset the flag in case it was set 364 disconnectedButResumeable = false; 365 super.afterSuccessfulLogin(resumed); 366 } 367 368 @Override 369 protected synchronized void loginNonAnonymously(String username, String password, String resource) throws XMPPException, SmackException, IOException { 370 if (saslAuthentication.hasNonAnonymousAuthentication()) { 371 // Authenticate using SASL 372 if (password != null) { 373 saslAuthentication.authenticate(username, password, resource); 374 } 375 else { 376 saslAuthentication.authenticate(resource, config.getCallbackHandler()); 377 } 378 } else { 379 throw new SmackException("No non-anonymous SASL authentication mechanism available"); 380 } 381 382 // If compression is enabled then request the server to use stream compression. XEP-170 383 // recommends to perform stream compression before resource binding. 384 if (config.isCompressionEnabled()) { 385 useCompression(); 386 } 387 388 if (isSmResumptionPossible()) { 389 smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); 390 if (smResumedSyncPoint.wasSuccessful()) { 391 // We successfully resumed the stream, be done here 392 afterSuccessfulLogin(true); 393 return; 394 } 395 // SM resumption failed, what Smack does here is to report success of 396 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 397 // normal resource binding can be tried. 398 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process"); 399 } 400 401 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 402 if (unacknowledgedStanzas != null) { 403 // There was a previous connection with SM enabled but that was either not resumable or 404 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 405 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 406 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 407 // XMPP session (There maybe was an enabled in a previous XMPP session of this 408 // connection instance though). This is used in writePackets to decide if stanzas should 409 // be added to the unacknowledged stanzas queue, because they have to be added right 410 // after the 'enable' stream element has been sent. 411 dropSmState(); 412 } 413 414 // Now bind the resource. It is important to do this *after* we dropped an eventually 415 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 416 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 417 bindResourceAndEstablishSession(resource); 418 419 if (isSmAvailable() && useSm) { 420 // Remove what is maybe left from previously stream managed sessions 421 serverHandledStanzasCount = 0; 422 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 423 // then this is a non recoverable error and we therefore throw an exception. 424 smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime)); 425 synchronized (requestAckPredicates) { 426 if (requestAckPredicates.isEmpty()) { 427 // Assure that we have at lest one predicate set up that so that we request acks 428 // for the server and eventually flush some stanzas from the unacknowledged 429 // stanza queue 430 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 431 } 432 } 433 } 434 // (Re-)send the stanzas *after* we tried to enable SM 435 for (Stanza stanza : previouslyUnackedStanzas) { 436 sendStanzaInternal(stanza); 437 } 438 439 afterSuccessfulLogin(false); 440 } 441 442 @Override 443 public synchronized void loginAnonymously() throws XMPPException, SmackException, IOException { 444 // Wait with SASL auth until the SASL mechanisms have been received 445 saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); 446 447 if (saslAuthentication.hasAnonymousAuthentication()) { 448 saslAuthentication.authenticateAnonymously(); 449 } 450 else { 451 throw new SmackException("No anonymous SASL authentication mechanism available"); 452 } 453 454 // If compression is enabled then request the server to use stream compression 455 if (config.isCompressionEnabled()) { 456 useCompression(); 457 } 458 459 bindResourceAndEstablishSession(null); 460 461 afterSuccessfulLogin(false); 462 } 463 464 @Override 465 public boolean isSecureConnection() { 466 return usingTLS; 467 } 468 469 public boolean isSocketClosed() { 470 return socketClosed; 471 } 472 473 /** 474 * Shuts the current connection down. After this method returns, the connection must be ready 475 * for re-use by connect. 476 */ 477 @Override 478 protected void shutdown() { 479 if (isSmEnabled()) { 480 try { 481 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 482 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 483 sendSmAcknowledgementInternal(); 484 } catch (NotConnectedException e) { 485 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 486 } 487 } 488 shutdown(false); 489 } 490 491 /** 492 * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza. 493 */ 494 public synchronized void instantShutdown() { 495 shutdown(true); 496 } 497 498 private void shutdown(boolean instant) { 499 if (disconnectedButResumeable) { 500 return; 501 } 502 if (packetReader != null) { 503 packetReader.shutdown(); 504 } 505 if (packetWriter != null) { 506 packetWriter.shutdown(instant); 507 } 508 509 // Set socketClosed to true. This will cause the PacketReader 510 // and PacketWriter to ignore any Exceptions that are thrown 511 // because of a read/write from/to a closed stream. 512 // It is *important* that this is done before socket.close()! 513 socketClosed = true; 514 try { 515 socket.close(); 516 } catch (Exception e) { 517 LOGGER.log(Level.WARNING, "shutdown", e); 518 } 519 520 setWasAuthenticated(); 521 // If we are able to resume the stream, then don't set 522 // connected/authenticated/usingTLS to false since we like behave like we are still 523 // connected (e.g. sendStanza should not throw a NotConnectedException). 524 if (isSmResumptionPossible() && instant) { 525 disconnectedButResumeable = true; 526 } else { 527 disconnectedButResumeable = false; 528 // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing 529 // stream tag, there is no longer a stream to resume. 530 smSessionId = null; 531 } 532 authenticated = false; 533 connected = false; 534 usingTLS = false; 535 reader = null; 536 writer = null; 537 538 maybeCompressFeaturesReceived.init(); 539 compressSyncPoint.init(); 540 smResumedSyncPoint.init(); 541 smEnabledSyncPoint.init(); 542 initalOpenStreamSend.init(); 543 } 544 545 @Override 546 public void send(PlainStreamElement element) throws NotConnectedException { 547 packetWriter.sendStreamElement(element); 548 } 549 550 @Override 551 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException { 552 packetWriter.sendStreamElement(packet); 553 if (isSmEnabled()) { 554 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 555 if (requestAckPredicate.accept(packet)) { 556 requestSmAcknowledgementInternal(); 557 break; 558 } 559 } 560 } 561 } 562 563 private void connectUsingConfiguration() throws IOException, ConnectionException { 564 List<HostAddress> failedAddresses = populateHostAddresses(); 565 SocketFactory socketFactory = config.getSocketFactory(); 566 if (socketFactory == null) { 567 socketFactory = SocketFactory.getDefault(); 568 } 569 for (HostAddress hostAddress : hostAddresses) { 570 String host = hostAddress.getFQDN(); 571 int port = hostAddress.getPort(); 572 socket = socketFactory.createSocket(); 573 try { 574 Iterator<InetAddress> inetAddresses = Arrays.asList(InetAddress.getAllByName(host)).iterator(); 575 if (!inetAddresses.hasNext()) { 576 // This should not happen 577 LOGGER.warning("InetAddress.getAllByName() returned empty result array."); 578 throw new UnknownHostException(host); 579 } 580 innerloop: while (inetAddresses.hasNext()) { 581 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 582 // re-usable after a failed connection attempt. See also SMACK-724. 583 socket = socketFactory.createSocket(); 584 585 final InetAddress inetAddress = inetAddresses.next(); 586 final String inetAddressAndPort = inetAddress + " at port " + port; 587 LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort); 588 try { 589 socket.connect(new InetSocketAddress(inetAddress, port), config.getConnectTimeout()); 590 } catch (Exception e) { 591 if (inetAddresses.hasNext()) { 592 continue innerloop; 593 } else { 594 throw e; 595 } 596 } 597 LOGGER.finer("Established TCP connection to " + inetAddressAndPort); 598 // We found a host to connect to, return here 599 this.host = host; 600 this.port = port; 601 return; 602 } 603 } 604 catch (Exception e) { 605 hostAddress.setException(e); 606 failedAddresses.add(hostAddress); 607 } 608 } 609 // There are no more host addresses to try 610 // throw an exception and report all tried 611 // HostAddresses in the exception 612 throw ConnectionException.from(failedAddresses); 613 } 614 615 /** 616 * Initializes the connection by creating a stanza(/packet) reader and writer and opening a 617 * XMPP stream to the server. 618 * 619 * @throws XMPPException if establishing a connection to the server fails. 620 * @throws SmackException if the server failes to respond back or if there is anther error. 621 * @throws IOException 622 */ 623 private void initConnection() throws IOException { 624 boolean isFirstInitialization = packetReader == null || packetWriter == null; 625 compressionHandler = null; 626 627 // Set the reader and writer instance variables 628 initReaderAndWriter(); 629 630 if (isFirstInitialization) { 631 packetWriter = new PacketWriter(); 632 packetReader = new PacketReader(); 633 634 // If debugging is enabled, we should start the thread that will listen for 635 // all packets and then log them. 636 if (config.isDebuggerEnabled()) { 637 addAsyncStanzaListener(debugger.getReaderListener(), null); 638 if (debugger.getWriterListener() != null) { 639 addPacketSendingListener(debugger.getWriterListener(), null); 640 } 641 } 642 } 643 // Start the packet writer. This will open an XMPP stream to the server 644 packetWriter.init(); 645 // Start the packet reader. The startup() method will block until we 646 // get an opening stream packet back from server 647 packetReader.init(); 648 649 if (isFirstInitialization) { 650 // Notify listeners that a new connection has been established 651 for (ConnectionCreationListener listener : getConnectionCreationListeners()) { 652 listener.connectionCreated(this); 653 } 654 } 655 } 656 657 private void initReaderAndWriter() throws IOException { 658 InputStream is = socket.getInputStream(); 659 OutputStream os = socket.getOutputStream(); 660 if (compressionHandler != null) { 661 is = compressionHandler.getInputStream(is); 662 os = compressionHandler.getOutputStream(os); 663 } 664 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 665 writer = new OutputStreamWriter(os, "UTF-8"); 666 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 667 668 // If debugging is enabled, we open a window and write out all network traffic. 669 initDebugger(); 670 } 671 672 /** 673 * The server has indicated that TLS negotiation can start. We now need to secure the 674 * existing plain connection and perform a handshake. This method won't return until the 675 * connection has finished the handshake or an error occurred while securing the connection. 676 * @throws IOException 677 * @throws CertificateException 678 * @throws NoSuchAlgorithmException 679 * @throws NoSuchProviderException 680 * @throws KeyStoreException 681 * @throws UnrecoverableKeyException 682 * @throws KeyManagementException 683 * @throws SmackException 684 * @throws Exception if an exception occurs. 685 */ 686 private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException { 687 SSLContext context = this.config.getCustomSSLContext(); 688 KeyStore ks = null; 689 KeyManager[] kms = null; 690 PasswordCallback pcb = null; 691 692 if(config.getCallbackHandler() == null) { 693 ks = null; 694 } else if (context == null) { 695 if(config.getKeystoreType().equals("NONE")) { 696 ks = null; 697 pcb = null; 698 } 699 else if(config.getKeystoreType().equals("PKCS11")) { 700 try { 701 Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); 702 String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library(); 703 ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes()); 704 Provider p = (Provider)c.newInstance(config); 705 Security.addProvider(p); 706 ks = KeyStore.getInstance("PKCS11",p); 707 pcb = new PasswordCallback("PKCS11 Password: ",false); 708 this.config.getCallbackHandler().handle(new Callback[]{pcb}); 709 ks.load(null,pcb.getPassword()); 710 } 711 catch (Exception e) { 712 ks = null; 713 pcb = null; 714 } 715 } 716 else if(config.getKeystoreType().equals("Apple")) { 717 ks = KeyStore.getInstance("KeychainStore","Apple"); 718 ks.load(null,null); 719 //pcb = new PasswordCallback("Apple Keychain",false); 720 //pcb.setPassword(null); 721 } 722 else { 723 ks = KeyStore.getInstance(config.getKeystoreType()); 724 try { 725 pcb = new PasswordCallback("Keystore Password: ",false); 726 config.getCallbackHandler().handle(new Callback[]{pcb}); 727 ks.load(new FileInputStream(config.getKeystorePath()), pcb.getPassword()); 728 } 729 catch(Exception e) { 730 ks = null; 731 pcb = null; 732 } 733 } 734 KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); 735 try { 736 if(pcb == null) { 737 kmf.init(ks,null); 738 } else { 739 kmf.init(ks,pcb.getPassword()); 740 pcb.clearPassword(); 741 } 742 kms = kmf.getKeyManagers(); 743 } catch (NullPointerException npe) { 744 kms = null; 745 } 746 } 747 748 // If the user didn't specify a SSLContext, use the default one 749 if (context == null) { 750 context = SSLContext.getInstance("TLS"); 751 context.init(kms, null, new java.security.SecureRandom()); 752 } 753 Socket plain = socket; 754 // Secure the plain connection 755 socket = context.getSocketFactory().createSocket(plain, 756 host, plain.getPort(), true); 757 758 final SSLSocket sslSocket = (SSLSocket) socket; 759 // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is 760 // important (at least on certain platforms) and it seems to be a good idea anyways to 761 // prevent an accidental implicit handshake. 762 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 763 764 // Initialize the reader and writer with the new secured version 765 initReaderAndWriter(); 766 767 // Proceed to do the handshake 768 sslSocket.startHandshake(); 769 770 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 771 if (verifier == null) { 772 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 773 } else if (!verifier.verify(getServiceName(), sslSocket.getSession())) { 774 throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getServiceName()); 775 } 776 777 // Set that TLS was successful 778 usingTLS = true; 779 } 780 781 /** 782 * Returns the compression handler that can be used for one compression methods offered by the server. 783 * 784 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 785 * 786 */ 787 private XMPPInputOutputStream maybeGetCompressionHandler() { 788 Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); 789 if (compression == null) { 790 // Server does not support compression 791 return null; 792 } 793 for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) { 794 String method = handler.getCompressionMethod(); 795 if (compression.getMethods().contains(method)) 796 return handler; 797 } 798 return null; 799 } 800 801 @Override 802 public boolean isUsingCompression() { 803 return compressionHandler != null && compressSyncPoint.wasSuccessful(); 804 } 805 806 /** 807 * <p> 808 * Starts using stream compression that will compress network traffic. Traffic can be 809 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 810 * connection. However, the server and the client will need to use more CPU time in order to 811 * un/compress network data so under high load the server performance might be affected. 812 * </p> 813 * <p> 814 * Stream compression has to have been previously offered by the server. Currently only the 815 * zlib method is supported by the client. Stream compression negotiation has to be done 816 * before authentication took place. 817 * </p> 818 * 819 * @throws NotConnectedException 820 * @throws XMPPException 821 * @throws NoResponseException 822 */ 823 private void useCompression() throws NotConnectedException, NoResponseException, XMPPException { 824 maybeCompressFeaturesReceived.checkIfSuccessOrWait(); 825 // If stream compression was offered by the server and we want to use 826 // compression then send compression request to the server 827 if ((compressionHandler = maybeGetCompressionHandler()) != null) { 828 compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); 829 } else { 830 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 831 } 832 } 833 834 /** 835 * Establishes a connection to the XMPP server and performs an automatic login 836 * only if the previous connection state was logged (authenticated). It basically 837 * creates and maintains a socket connection to the server.<p> 838 * <p/> 839 * Listeners will be preserved from a previous connection if the reconnection 840 * occurs after an abrupt termination. 841 * 842 * @throws XMPPException if an error occurs while trying to establish the connection. 843 * @throws SmackException 844 * @throws IOException 845 */ 846 @Override 847 protected void connectInternal() throws SmackException, IOException, XMPPException { 848 // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if 849 // there is an error establishing the connection 850 connectUsingConfiguration(); 851 852 // We connected successfully to the servers TCP port 853 socketClosed = false; 854 initConnection(); 855 856 // Wait with SASL auth until the SASL mechanisms have been received 857 saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); 858 859 // If TLS is required but the server doesn't offer it, disconnect 860 // from the server and throw an error. First check if we've already negotiated TLS 861 // and are secure, however (features get parsed a second time after TLS is established). 862 if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) { 863 shutdown(); 864 throw new SecurityRequiredByClientException(); 865 } 866 867 // Make note of the fact that we're now connected. 868 connected = true; 869 callConnectionConnectedListener(); 870 871 // Automatically makes the login if the user was previously connected successfully 872 // to the server and the connection was terminated abruptly 873 if (wasAuthenticated) { 874 login(); 875 notifyReconnection(); 876 } 877 } 878 879 /** 880 * Sends out a notification that there was an error with the connection 881 * and closes the connection. Also prints the stack trace of the given exception 882 * 883 * @param e the exception that causes the connection close event. 884 */ 885 private synchronized void notifyConnectionError(Exception e) { 886 // Listeners were already notified of the exception, return right here. 887 if ((packetReader == null || packetReader.done) && 888 (packetWriter == null || packetWriter.done())) return; 889 890 // Closes the connection temporary. A reconnection is possible 891 instantShutdown(); 892 893 // Notify connection listeners of the error. 894 callConnectionClosedOnErrorListener(e); 895 } 896 897 /** 898 * For unit testing purposes 899 * 900 * @param writer 901 */ 902 protected void setWriter(Writer writer) { 903 this.writer = writer; 904 } 905 906 @Override 907 protected void afterFeaturesReceived() throws NotConnectedException { 908 StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE); 909 if (startTlsFeature != null) { 910 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 911 notifyConnectionError(new SecurityRequiredByServerException()); 912 return; 913 } 914 915 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 916 send(new StartTls()); 917 } 918 } 919 920 if (getSASLAuthentication().authenticationSuccessful()) { 921 // If we have received features after the SASL has been successfully completed, then we 922 // have also *maybe* received, as it is an optional feature, the compression feature 923 // from the server. 924 maybeCompressFeaturesReceived.reportSuccess(); 925 } 926 } 927 928 /** 929 * Resets the parser using the latest connection's reader. Reseting the parser is necessary 930 * when the plain connection has been secured or when a new opening stream element is going 931 * to be sent by the server. 932 * 933 * @throws SmackException if the parser could not be reset. 934 */ 935 void openStream() throws SmackException { 936 // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as 937 // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external 938 // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first 939 // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.) 940 CharSequence to = getServiceName(); 941 CharSequence from = null; 942 CharSequence localpart = config.getUsername(); 943 if (localpart != null) { 944 from = XmppStringUtils.completeJidFrom(localpart, to); 945 } 946 String id = getStreamId(); 947 send(new StreamOpen(to, from, id)); 948 try { 949 packetReader.parser = PacketParserUtils.newXmppParser(reader); 950 } 951 catch (XmlPullParserException e) { 952 throw new SmackException(e); 953 } 954 } 955 956 protected class PacketReader { 957 958 XmlPullParser parser; 959 960 private volatile boolean done; 961 962 /** 963 * Initializes the reader in order to be used. The reader is initialized during the 964 * first connection and when reconnecting due to an abruptly disconnection. 965 */ 966 void init() { 967 done = false; 968 969 Async.go(new Runnable() { 970 public void run() { 971 parsePackets(); 972 } 973 }, "Smack Packet Reader (" + getConnectionCounter() + ")"); 974 } 975 976 /** 977 * Shuts the stanza(/packet) reader down. This method simply sets the 'done' flag to true. 978 */ 979 void shutdown() { 980 done = true; 981 } 982 983 /** 984 * Parse top-level packets in order to process them further. 985 * 986 * @param thread the thread that is being used by the reader to parse incoming packets. 987 */ 988 private void parsePackets() { 989 try { 990 initalOpenStreamSend.checkIfSuccessOrWait(); 991 int eventType = parser.getEventType(); 992 while (!done) { 993 switch (eventType) { 994 case XmlPullParser.START_TAG: 995 final String name = parser.getName(); 996 switch (name) { 997 case Message.ELEMENT: 998 case IQ.IQ_ELEMENT: 999 case Presence.ELEMENT: 1000 try { 1001 parseAndProcessStanza(parser); 1002 } finally { 1003 clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount); 1004 } 1005 break; 1006 case "stream": 1007 // We found an opening stream. 1008 if ("jabber:client".equals(parser.getNamespace(null))) { 1009 streamId = parser.getAttributeValue("", "id"); 1010 String reportedServiceName = parser.getAttributeValue("", "from"); 1011 assert(reportedServiceName.equals(config.getServiceName())); 1012 } 1013 break; 1014 case "error": 1015 throw new StreamErrorException(PacketParserUtils.parseStreamError(parser)); 1016 case "features": 1017 parseFeatures(parser); 1018 break; 1019 case "proceed": 1020 try { 1021 // Secure the connection by negotiating TLS 1022 proceedTLSReceived(); 1023 // Send a new opening stream to the server 1024 openStream(); 1025 } 1026 catch (Exception e) { 1027 // We report any failure regarding TLS in the second stage of XMPP 1028 // connection establishment, namely the SASL authentication 1029 saslFeatureReceived.reportFailure(new SmackException(e)); 1030 throw e; 1031 } 1032 break; 1033 case "failure": 1034 String namespace = parser.getNamespace(null); 1035 switch (namespace) { 1036 case "urn:ietf:params:xml:ns:xmpp-tls": 1037 // TLS negotiation has failed. The server will close the connection 1038 // TODO Parse failure stanza 1039 throw new XMPPErrorException("TLS negotiation has failed", null); 1040 case "http://jabber.org/protocol/compress": 1041 // Stream compression has been denied. This is a recoverable 1042 // situation. It is still possible to authenticate and 1043 // use the connection but using an uncompressed connection 1044 // TODO Parse failure stanza 1045 compressSyncPoint.reportFailure(new XMPPErrorException( 1046 "Could not establish compression", null)); 1047 break; 1048 case SaslStreamElements.NAMESPACE: 1049 // SASL authentication has failed. The server may close the connection 1050 // depending on the number of retries 1051 final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser); 1052 getSASLAuthentication().authenticationFailed(failure); 1053 break; 1054 } 1055 break; 1056 case Challenge.ELEMENT: 1057 // The server is challenging the SASL authentication made by the client 1058 String challengeData = parser.nextText(); 1059 getSASLAuthentication().challengeReceived(challengeData); 1060 break; 1061 case Success.ELEMENT: 1062 Success success = new Success(parser.nextText()); 1063 // We now need to bind a resource for the connection 1064 // Open a new stream and wait for the response 1065 openStream(); 1066 // The SASL authentication with the server was successful. The next step 1067 // will be to bind the resource 1068 getSASLAuthentication().authenticated(success); 1069 break; 1070 case Compressed.ELEMENT: 1071 // Server confirmed that it's possible to use stream compression. Start 1072 // stream compression 1073 // Initialize the reader and writer with the new compressed version 1074 initReaderAndWriter(); 1075 // Send a new opening stream to the server 1076 openStream(); 1077 // Notify that compression is being used 1078 compressSyncPoint.reportSuccess(); 1079 break; 1080 case Enabled.ELEMENT: 1081 Enabled enabled = ParseStreamManagement.enabled(parser); 1082 if (enabled.isResumeSet()) { 1083 smSessionId = enabled.getId(); 1084 if (StringUtils.isNullOrEmpty(smSessionId)) { 1085 XMPPErrorException xmppException = new XMPPErrorException( 1086 "Stream Management 'enabled' element with resume attribute but without session id received", 1087 new XMPPError( 1088 XMPPError.Condition.bad_request)); 1089 smEnabledSyncPoint.reportFailure(xmppException); 1090 throw xmppException; 1091 } 1092 smServerMaxResumptimTime = enabled.getMaxResumptionTime(); 1093 } else { 1094 // Mark this a non-resumable stream by setting smSessionId to null 1095 smSessionId = null; 1096 } 1097 clientHandledStanzasCount = 0; 1098 smWasEnabledAtLeastOnce = true; 1099 smEnabledSyncPoint.reportSuccess(); 1100 LOGGER.fine("Stream Management (XEP-198): succesfully enabled"); 1101 break; 1102 case Failed.ELEMENT: 1103 Failed failed = ParseStreamManagement.failed(parser); 1104 XMPPError xmppError = new XMPPError(failed.getXMPPErrorCondition()); 1105 XMPPException xmppException = new XMPPErrorException("Stream Management failed", xmppError); 1106 // If only XEP-198 would specify different failure elements for the SM 1107 // enable and SM resume failure case. But this is not the case, so we 1108 // need to determine if this is a 'Failed' response for either 'Enable' 1109 // or 'Resume'. 1110 if (smResumedSyncPoint.requestSent()) { 1111 smResumedSyncPoint.reportFailure(xmppException); 1112 } 1113 else { 1114 if (!smEnabledSyncPoint.requestSent()) { 1115 throw new IllegalStateException("Failed element received but SM was not previously enabled"); 1116 } 1117 smEnabledSyncPoint.reportFailure(xmppException); 1118 // Report success for last lastFeaturesReceived so that in case a 1119 // failed resumption, we can continue with normal resource binding. 1120 // See text of XEP-198 5. below Example 11. 1121 lastFeaturesReceived.reportSuccess(); 1122 } 1123 break; 1124 case Resumed.ELEMENT: 1125 Resumed resumed = ParseStreamManagement.resumed(parser); 1126 if (!smSessionId.equals(resumed.getPrevId())) { 1127 throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId()); 1128 } 1129 // Mark SM as enabled and resumption as successful. 1130 smResumedSyncPoint.reportSuccess(); 1131 smEnabledSyncPoint.reportSuccess(); 1132 // First, drop the stanzas already handled by the server 1133 processHandledCount(resumed.getHandledCount()); 1134 // Then re-send what is left in the unacknowledged queue 1135 List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size()); 1136 unacknowledgedStanzas.drainTo(stanzasToResend); 1137 for (Stanza stanza : stanzasToResend) { 1138 sendStanzaInternal(stanza); 1139 } 1140 // If there where stanzas resent, then request a SM ack for them. 1141 // Writer's sendStreamElement() won't do it automatically based on 1142 // predicates. 1143 if (!stanzasToResend.isEmpty()) { 1144 requestSmAcknowledgementInternal(); 1145 } 1146 LOGGER.fine("Stream Management (XEP-198): Stream resumed"); 1147 break; 1148 case AckAnswer.ELEMENT: 1149 AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser); 1150 processHandledCount(ackAnswer.getHandledCount()); 1151 break; 1152 case AckRequest.ELEMENT: 1153 ParseStreamManagement.ackRequest(parser); 1154 if (smEnabledSyncPoint.wasSuccessful()) { 1155 sendSmAcknowledgementInternal(); 1156 } else { 1157 LOGGER.warning("SM Ack Request received while SM is not enabled"); 1158 } 1159 break; 1160 default: 1161 LOGGER.warning("Unknown top level stream element: " + name); 1162 break; 1163 } 1164 break; 1165 case XmlPullParser.END_TAG: 1166 if (parser.getName().equals("stream")) { 1167 // Disconnect the connection 1168 disconnect(); 1169 } 1170 break; 1171 case XmlPullParser.END_DOCUMENT: 1172 // END_DOCUMENT only happens in an error case, as otherwise we would see a 1173 // closing stream element before. 1174 throw new SmackException( 1175 "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element"); 1176 } 1177 eventType = parser.next(); 1178 } 1179 } 1180 catch (Exception e) { 1181 // The exception can be ignored if the the connection is 'done' 1182 // or if the it was caused because the socket got closed 1183 if (!(done || isSocketClosed())) { 1184 // Close the connection and notify connection listeners of the 1185 // error. 1186 notifyConnectionError(e); 1187 } 1188 } 1189 } 1190 } 1191 1192 protected class PacketWriter { 1193 public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE; 1194 1195 private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<Element>( 1196 QUEUE_SIZE, true); 1197 1198 /** 1199 * Needs to be protected for unit testing purposes. 1200 */ 1201 protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>( 1202 XMPPTCPConnection.this); 1203 1204 /** 1205 * If set, the stanza(/packet) writer is shut down 1206 */ 1207 protected volatile Long shutdownTimestamp = null; 1208 1209 private volatile boolean instantShutdown; 1210 1211 /** 1212 * True if some preconditions are given to start the bundle and defer mechanism. 1213 * <p> 1214 * This will likely get set to true right after the start of the writer thread, because 1215 * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set 1216 * this field to true. 1217 * </p> 1218 */ 1219 private boolean shouldBundleAndDefer; 1220 1221 /** 1222 * Initializes the writer in order to be used. It is called at the first connection and also 1223 * is invoked if the connection is disconnected by an error. 1224 */ 1225 void init() { 1226 shutdownDone.init(); 1227 shutdownTimestamp = null; 1228 1229 if (unacknowledgedStanzas != null) { 1230 // It's possible that there are new stanzas in the writer queue that 1231 // came in while we were disconnected but resumable, drain those into 1232 // the unacknowledged queue so that they get resent now 1233 drainWriterQueueToUnacknowledgedStanzas(); 1234 } 1235 1236 queue.start(); 1237 Async.go(new Runnable() { 1238 @Override 1239 public void run() { 1240 writePackets(); 1241 } 1242 }, "Smack Packet Writer (" + getConnectionCounter() + ")"); 1243 } 1244 1245 private boolean done() { 1246 return shutdownTimestamp != null; 1247 } 1248 1249 protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { 1250 if (done() && !isSmResumptionPossible()) { 1251 // Don't throw a NotConnectedException is there is an resumable stream available 1252 throw new NotConnectedException(); 1253 } 1254 } 1255 1256 /** 1257 * Sends the specified element to the server. 1258 * 1259 * @param element the element to send. 1260 * @throws NotConnectedException 1261 */ 1262 protected void sendStreamElement(Element element) throws NotConnectedException { 1263 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1264 1265 boolean enqueued = false; 1266 while (!enqueued) { 1267 try { 1268 queue.put(element); 1269 enqueued = true; 1270 } 1271 catch (InterruptedException e) { 1272 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1273 // If the method above did not throw, then the sending thread was interrupted 1274 // TODO in a later version of Smack the InterruptedException should be thrown to 1275 // allow users to interrupt a sending thread that is currently blocking because 1276 // the queue is full. 1277 LOGGER.log(Level.WARNING, "Sending thread was interrupted", e); 1278 } 1279 } 1280 } 1281 1282 /** 1283 * Shuts down the stanza(/packet) writer. Once this method has been called, no further 1284 * packets will be written to the server. 1285 */ 1286 void shutdown(boolean instant) { 1287 instantShutdown = instant; 1288 shutdownTimestamp = System.currentTimeMillis(); 1289 queue.shutdown(); 1290 try { 1291 shutdownDone.checkIfSuccessOrWait(); 1292 } 1293 catch (NoResponseException e) { 1294 LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); 1295 } 1296 } 1297 1298 /** 1299 * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a 1300 * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in 1301 * that case. 1302 * 1303 * @return the next element for writing or null. 1304 */ 1305 private Element nextStreamElement() { 1306 // It is important the we check if the queue is empty before removing an element from it 1307 if (queue.isEmpty()) { 1308 shouldBundleAndDefer = true; 1309 } 1310 Element packet = null; 1311 try { 1312 packet = queue.take(); 1313 } 1314 catch (InterruptedException e) { 1315 if (!queue.isShutdown()) { 1316 // Users shouldn't try to interrupt the packet writer thread 1317 LOGGER.log(Level.WARNING, "Packet writer thread was interrupted. Don't do that. Use disconnect() instead.", e); 1318 } 1319 } 1320 return packet; 1321 } 1322 1323 private void writePackets() { 1324 try { 1325 openStream(); 1326 initalOpenStreamSend.reportSuccess(); 1327 // Write out packets from the queue. 1328 while (!done()) { 1329 Element element = nextStreamElement(); 1330 if (element == null) { 1331 continue; 1332 } 1333 1334 // Get a local version of the bundle and defer callback, in case it's unset 1335 // between the null check and the method invocation 1336 final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback; 1337 // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is 1338 // empty), then we could wait a bit for further stanzas attempting to decrease 1339 // our energy consumption 1340 if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) { 1341 // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the 1342 // queue is empty again. 1343 shouldBundleAndDefer = false; 1344 final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean(); 1345 final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer( 1346 bundlingAndDeferringStopped)); 1347 if (bundleAndDeferMillis > 0) { 1348 long remainingWait = bundleAndDeferMillis; 1349 final long waitStart = System.currentTimeMillis(); 1350 synchronized (bundlingAndDeferringStopped) { 1351 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) { 1352 bundlingAndDeferringStopped.wait(remainingWait); 1353 remainingWait = bundleAndDeferMillis 1354 - (System.currentTimeMillis() - waitStart); 1355 } 1356 } 1357 } 1358 } 1359 1360 Stanza packet = null; 1361 if (element instanceof Stanza) { 1362 packet = (Stanza) element; 1363 } 1364 else if (element instanceof Enable) { 1365 // The client needs to add messages to the unacknowledged stanzas queue 1366 // right after it sent 'enabled'. Stanza will be added once 1367 // unacknowledgedStanzas is not null. 1368 unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE); 1369 } 1370 // Check if the stream element should be put to the unacknowledgedStanza 1371 // queue. Note that we can not do the put() in sendStanzaInternal() and the 1372 // packet order is not stable at this point (sendStanzaInternal() can be 1373 // called concurrently). 1374 if (unacknowledgedStanzas != null && packet != null) { 1375 // If the unacknowledgedStanza queue is nearly full, request an new ack 1376 // from the server in order to drain it 1377 if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { 1378 writer.write(AckRequest.INSTANCE.toXML().toString()); 1379 writer.flush(); 1380 } 1381 try { 1382 // It is important the we put the stanza in the unacknowledged stanza 1383 // queue before we put it on the wire 1384 unacknowledgedStanzas.put(packet); 1385 } 1386 catch (InterruptedException e) { 1387 throw new IllegalStateException(e); 1388 } 1389 } 1390 writer.write(element.toXML().toString()); 1391 if (queue.isEmpty()) { 1392 writer.flush(); 1393 } 1394 if (packet != null) { 1395 firePacketSendingListeners(packet); 1396 } 1397 } 1398 if (!instantShutdown) { 1399 // Flush out the rest of the queue. 1400 try { 1401 while (!queue.isEmpty()) { 1402 Element packet = queue.remove(); 1403 writer.write(packet.toXML().toString()); 1404 } 1405 writer.flush(); 1406 } 1407 catch (Exception e) { 1408 LOGGER.log(Level.WARNING, 1409 "Exception flushing queue during shutdown, ignore and continue", 1410 e); 1411 } 1412 1413 // Close the stream. 1414 try { 1415 writer.write("</stream:stream>"); 1416 writer.flush(); 1417 } 1418 catch (Exception e) { 1419 LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); 1420 } 1421 // Delete the queue contents (hopefully nothing is left). 1422 queue.clear(); 1423 } else if (instantShutdown && isSmEnabled()) { 1424 // This was an instantShutdown and SM is enabled, drain all remaining stanzas 1425 // into the unacknowledgedStanzas queue 1426 drainWriterQueueToUnacknowledgedStanzas(); 1427 } 1428 1429 try { 1430 writer.close(); 1431 } 1432 catch (Exception e) { 1433 // Do nothing 1434 } 1435 1436 } 1437 catch (Exception e) { 1438 // The exception can be ignored if the the connection is 'done' 1439 // or if the it was caused because the socket got closed 1440 if (!(done() || isSocketClosed())) { 1441 notifyConnectionError(e); 1442 } else { 1443 LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); 1444 } 1445 } finally { 1446 LOGGER.fine("Reporting shutdownDone success in writer thread"); 1447 shutdownDone.reportSuccess(); 1448 } 1449 } 1450 1451 private void drainWriterQueueToUnacknowledgedStanzas() { 1452 List<Element> elements = new ArrayList<Element>(queue.size()); 1453 queue.drainTo(elements); 1454 for (Element element : elements) { 1455 if (element instanceof Stanza) { 1456 unacknowledgedStanzas.add((Stanza) element); 1457 } 1458 } 1459 } 1460 } 1461 1462 /** 1463 * Set if Stream Management should be used by default for new connections. 1464 * 1465 * @param useSmDefault true to use Stream Management for new connections. 1466 */ 1467 public static void setUseStreamManagementDefault(boolean useSmDefault) { 1468 XMPPTCPConnection.useSmDefault = useSmDefault; 1469 } 1470 1471 /** 1472 * Set if Stream Management resumption should be used by default for new connections. 1473 * 1474 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1475 * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead. 1476 */ 1477 @Deprecated 1478 public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { 1479 setUseStreamManagementResumptionDefault(useSmResumptionDefault); 1480 } 1481 1482 /** 1483 * Set if Stream Management resumption should be used by default for new connections. 1484 * 1485 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1486 */ 1487 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1488 if (useSmResumptionDefault) { 1489 // Also enable SM is resumption is enabled 1490 setUseStreamManagementDefault(useSmResumptionDefault); 1491 } 1492 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1493 } 1494 1495 /** 1496 * Set if Stream Management should be used if supported by the server. 1497 * 1498 * @param useSm true to use Stream Management. 1499 */ 1500 public void setUseStreamManagement(boolean useSm) { 1501 this.useSm = useSm; 1502 } 1503 1504 /** 1505 * Set if Stream Management resumption should be used if supported by the server. 1506 * 1507 * @param useSmResumption true to use Stream Management resumption. 1508 */ 1509 public void setUseStreamManagementResumption(boolean useSmResumption) { 1510 if (useSmResumption) { 1511 // Also enable SM is resumption is enabled 1512 setUseStreamManagement(useSmResumption); 1513 } 1514 this.useSmResumption = useSmResumption; 1515 } 1516 1517 /** 1518 * Set the preferred resumption time in seconds. 1519 * @param resumptionTime the preferred resumption time in seconds 1520 */ 1521 public void setPreferredResumptionTime(int resumptionTime) { 1522 smClientMaxResumptionTime = resumptionTime; 1523 } 1524 1525 /** 1526 * Add a predicate for Stream Management acknowledgment requests. 1527 * <p> 1528 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1529 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1530 * </p> 1531 * <p> 1532 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1533 * </p> 1534 * 1535 * @param predicate the predicate to add. 1536 * @return if the predicate was not already active. 1537 */ 1538 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1539 synchronized (requestAckPredicates) { 1540 return requestAckPredicates.add(predicate); 1541 } 1542 } 1543 1544 /** 1545 * Remove the given predicate for Stream Management acknowledgment request. 1546 * @param predicate the predicate to remove. 1547 * @return true if the predicate was removed. 1548 */ 1549 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1550 synchronized (requestAckPredicates) { 1551 return requestAckPredicates.remove(predicate); 1552 } 1553 } 1554 1555 /** 1556 * Remove all predicates for Stream Management acknowledgment requests. 1557 */ 1558 public void removeAllRequestAckPredicates() { 1559 synchronized (requestAckPredicates) { 1560 requestAckPredicates.clear(); 1561 } 1562 } 1563 1564 /** 1565 * Send an unconditional Stream Management acknowledgement request to the server. 1566 * 1567 * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled. 1568 * @throws NotConnectedException if the connection is not connected. 1569 */ 1570 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { 1571 if (!isSmEnabled()) { 1572 throw new StreamManagementException.StreamManagementNotEnabledException(); 1573 } 1574 requestSmAcknowledgementInternal(); 1575 } 1576 1577 private void requestSmAcknowledgementInternal() throws NotConnectedException { 1578 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1579 } 1580 1581 /** 1582 * Send a unconditional Stream Management acknowledgment to the server. 1583 * <p> 1584 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>: 1585 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1586 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1587 * </p> 1588 * 1589 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1590 * @throws NotConnectedException if the connection is not connected. 1591 */ 1592 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { 1593 if (!isSmEnabled()) { 1594 throw new StreamManagementException.StreamManagementNotEnabledException(); 1595 } 1596 sendSmAcknowledgementInternal(); 1597 } 1598 1599 private void sendSmAcknowledgementInternal() throws NotConnectedException { 1600 packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); 1601 } 1602 1603 /** 1604 * Add a Stanza acknowledged listener. 1605 * <p> 1606 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1607 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1608 * possible. 1609 * </p> 1610 * 1611 * @param listener the listener to add. 1612 */ 1613 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1614 stanzaAcknowledgedListeners.add(listener); 1615 } 1616 1617 /** 1618 * Remove the given Stanza acknowledged listener. 1619 * 1620 * @param listener the listener. 1621 * @return true if the listener was removed. 1622 */ 1623 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1624 return stanzaAcknowledgedListeners.remove(listener); 1625 } 1626 1627 /** 1628 * Remove all stanza acknowledged listeners. 1629 */ 1630 public void removeAllStanzaAcknowledgedListeners() { 1631 stanzaAcknowledgedListeners.clear(); 1632 } 1633 1634 /** 1635 * Add a new Stanza ID acknowledged listener for the given ID. 1636 * <p> 1637 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1638 * automatically be removed after the listener was run. 1639 * </p> 1640 * 1641 * @param id the stanza ID. 1642 * @param listener the listener to invoke. 1643 * @return the previous listener for this stanza ID or null. 1644 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1645 */ 1646 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1647 // Prevent users from adding callbacks that will never get removed 1648 if (!smWasEnabledAtLeastOnce) { 1649 throw new StreamManagementException.StreamManagementNotEnabledException(); 1650 } 1651 // Remove the listener after max. 12 hours 1652 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 12 * 60 * 60); 1653 schedule(new Runnable() { 1654 @Override 1655 public void run() { 1656 stanzaIdAcknowledgedListeners.remove(id); 1657 } 1658 }, removeAfterSeconds, TimeUnit.SECONDS); 1659 return stanzaIdAcknowledgedListeners.put(id, listener); 1660 } 1661 1662 /** 1663 * Remove the Stanza ID acknowledged listener for the given ID. 1664 * 1665 * @param id the stanza ID. 1666 * @return true if the listener was found and removed, false otherwise. 1667 */ 1668 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1669 return stanzaIdAcknowledgedListeners.remove(id); 1670 } 1671 1672 /** 1673 * Removes all Stanza ID acknowledged listeners. 1674 */ 1675 public void removeAllStanzaIdAcknowledgedListeners() { 1676 stanzaIdAcknowledgedListeners.clear(); 1677 } 1678 1679 /** 1680 * Returns true if Stream Management is supported by the server. 1681 * 1682 * @return true if Stream Management is supported by the server. 1683 */ 1684 public boolean isSmAvailable() { 1685 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1686 } 1687 1688 /** 1689 * Returns true if Stream Management was successfully negotiated with the server. 1690 * 1691 * @return true if Stream Management was negotiated. 1692 */ 1693 public boolean isSmEnabled() { 1694 return smEnabledSyncPoint.wasSuccessful(); 1695 } 1696 1697 /** 1698 * Returns true if the stream was successfully resumed with help of Stream Management. 1699 * 1700 * @return true if the stream was resumed. 1701 */ 1702 public boolean streamWasResumed() { 1703 return smResumedSyncPoint.wasSuccessful(); 1704 } 1705 1706 /** 1707 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1708 * 1709 * @return true if disconnected but resumption possible. 1710 */ 1711 public boolean isDisconnectedButSmResumptionPossible() { 1712 return disconnectedButResumeable && isSmResumptionPossible(); 1713 } 1714 1715 /** 1716 * Returns true if the stream is resumable. 1717 * 1718 * @return true if the stream is resumable. 1719 */ 1720 public boolean isSmResumptionPossible() { 1721 // There is no resumable stream available 1722 if (smSessionId == null) 1723 return false; 1724 1725 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1726 // Seems like we are already reconnected, report true 1727 if (shutdownTimestamp == null) { 1728 return true; 1729 } 1730 1731 // See if resumption time is over 1732 long current = System.currentTimeMillis(); 1733 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1734 if (current > shutdownTimestamp + maxResumptionMillies) { 1735 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1736 // resumption is possible 1737 return false; 1738 } else { 1739 return true; 1740 } 1741 } 1742 1743 /** 1744 * Drop the stream management state. Sets {@link #smSessionId} and 1745 * {@link #unacknowledgedStanzas} to <code>null</code>. 1746 */ 1747 private void dropSmState() { 1748 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1749 // respective. No need to reset them here. 1750 smSessionId = null; 1751 unacknowledgedStanzas = null; 1752 } 1753 1754 /** 1755 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 1756 * <p> 1757 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1758 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1759 * without checking for overflows before. 1760 * </p> 1761 * 1762 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1763 */ 1764 public int getMaxSmResumptionTime() { 1765 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1766 int serverResumptionTime = smServerMaxResumptimTime > 0 ? smServerMaxResumptimTime : Integer.MAX_VALUE; 1767 return Math.min(clientResumptionTime, serverResumptionTime); 1768 } 1769 1770 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1771 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1772 final List<Stanza> ackedStanzas = new ArrayList<Stanza>( 1773 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1774 : Integer.MAX_VALUE); 1775 for (long i = 0; i < ackedStanzasCount; i++) { 1776 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1777 // If the server ack'ed a stanza, then it must be in the 1778 // unacknowledged stanza queue. There can be no exception. 1779 if (ackedStanza == null) { 1780 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1781 ackedStanzasCount, ackedStanzas); 1782 } 1783 ackedStanzas.add(ackedStanza); 1784 } 1785 1786 boolean atLeastOneStanzaAcknowledgedListener = false; 1787 if (!stanzaAcknowledgedListeners.isEmpty()) { 1788 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1789 atLeastOneStanzaAcknowledgedListener = true; 1790 } 1791 else { 1792 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1793 for (Stanza ackedStanza : ackedStanzas) { 1794 String id = ackedStanza.getStanzaId(); 1795 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1796 atLeastOneStanzaAcknowledgedListener = true; 1797 break; 1798 } 1799 } 1800 } 1801 1802 // Only spawn a new thread if there is a chance that some listener is invoked 1803 if (atLeastOneStanzaAcknowledgedListener) { 1804 asyncGo(new Runnable() { 1805 @Override 1806 public void run() { 1807 for (Stanza ackedStanza : ackedStanzas) { 1808 for (StanzaListener listener : stanzaAcknowledgedListeners) { 1809 try { 1810 listener.processPacket(ackedStanza); 1811 } 1812 catch (NotConnectedException e) { 1813 LOGGER.log(Level.FINER, "Received not connected exception", e); 1814 } 1815 } 1816 String id = ackedStanza.getStanzaId(); 1817 if (StringUtils.isNullOrEmpty(id)) { 1818 continue; 1819 } 1820 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1821 if (listener != null) { 1822 try { 1823 listener.processPacket(ackedStanza); 1824 } 1825 catch (NotConnectedException e) { 1826 LOGGER.log(Level.FINER, "Received not connected exception", e); 1827 } 1828 } 1829 } 1830 } 1831 }); 1832 } 1833 1834 serverHandledStanzasCount = handledCount; 1835 } 1836 1837 /** 1838 * Set the default bundle and defer callback used for new connections. 1839 * 1840 * @param defaultBundleAndDeferCallback 1841 * @see BundleAndDeferCallback 1842 * @since 4.1 1843 */ 1844 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1845 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1846 } 1847 1848 /** 1849 * Set the bundle and defer callback used for this connection. 1850 * <p> 1851 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1852 * no longer get deferred. 1853 * </p> 1854 * 1855 * @param bundleAndDeferCallback the callback or <code>null</code>. 1856 * @see BundleAndDeferCallback 1857 * @since 4.1 1858 */ 1859 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1860 this.bundleAndDeferCallback = bundleAndDeferCallback; 1861 } 1862 1863}