001/** 002 * 003 * Copyright 2009 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.io.Reader; 021import java.io.Writer; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.LinkedHashMap; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.CopyOnWriteArraySet; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.ScheduledExecutorService; 035import java.util.concurrent.ScheduledFuture; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.concurrent.locks.Lock; 039import java.util.concurrent.locks.ReentrantLock; 040import java.util.logging.Level; 041import java.util.logging.Logger; 042 043import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 044import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 045import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 046import org.jivesoftware.smack.SmackException.NoResponseException; 047import org.jivesoftware.smack.SmackException.NotConnectedException; 048import org.jivesoftware.smack.SmackException.ConnectionException; 049import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; 050import org.jivesoftware.smack.SmackException.SecurityRequiredException; 051import org.jivesoftware.smack.XMPPException.XMPPErrorException; 052import org.jivesoftware.smack.compress.packet.Compress; 053import org.jivesoftware.smack.compression.XMPPInputOutputStream; 054import org.jivesoftware.smack.debugger.SmackDebugger; 055import org.jivesoftware.smack.filter.IQReplyFilter; 056import org.jivesoftware.smack.filter.StanzaFilter; 057import org.jivesoftware.smack.filter.StanzaIdFilter; 058import org.jivesoftware.smack.iqrequest.IQRequestHandler; 059import org.jivesoftware.smack.packet.Bind; 060import org.jivesoftware.smack.packet.ErrorIQ; 061import org.jivesoftware.smack.packet.IQ; 062import org.jivesoftware.smack.packet.Mechanisms; 063import org.jivesoftware.smack.packet.Stanza; 064import org.jivesoftware.smack.packet.ExtensionElement; 065import org.jivesoftware.smack.packet.Presence; 066import org.jivesoftware.smack.packet.Session; 067import org.jivesoftware.smack.packet.StartTls; 068import org.jivesoftware.smack.packet.PlainStreamElement; 069import org.jivesoftware.smack.packet.XMPPError; 070import org.jivesoftware.smack.parsing.ParsingExceptionCallback; 071import org.jivesoftware.smack.parsing.UnparsablePacket; 072import org.jivesoftware.smack.provider.ExtensionElementProvider; 073import org.jivesoftware.smack.provider.ProviderManager; 074import org.jivesoftware.smack.util.BoundedThreadPoolExecutor; 075import org.jivesoftware.smack.util.DNSUtil; 076import org.jivesoftware.smack.util.Objects; 077import org.jivesoftware.smack.util.PacketParserUtils; 078import org.jivesoftware.smack.util.ParserUtils; 079import org.jivesoftware.smack.util.SmackExecutorThreadFactory; 080import org.jivesoftware.smack.util.StringUtils; 081import org.jivesoftware.smack.util.dns.HostAddress; 082import org.jxmpp.util.XmppStringUtils; 083import org.xmlpull.v1.XmlPullParser; 084import org.xmlpull.v1.XmlPullParserException; 085 086 087public abstract class AbstractXMPPConnection implements XMPPConnection { 088 private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName()); 089 090 /** 091 * Counter to uniquely identify connections that are created. 092 */ 093 private final static AtomicInteger connectionCounter = new AtomicInteger(0); 094 095 static { 096 // Ensure the SmackConfiguration class is loaded by calling a method in it. 097 SmackConfiguration.getVersion(); 098 } 099 100 /** 101 * Get the collection of listeners that are interested in connection creation events. 102 * 103 * @return a collection of listeners interested on new connections. 104 */ 105 protected static Collection<ConnectionCreationListener> getConnectionCreationListeners() { 106 return XMPPConnectionRegistry.getConnectionCreationListeners(); 107 } 108 109 /** 110 * A collection of ConnectionListeners which listen for connection closing 111 * and reconnection events. 112 */ 113 protected final Set<ConnectionListener> connectionListeners = 114 new CopyOnWriteArraySet<ConnectionListener>(); 115 116 /** 117 * A collection of PacketCollectors which collects packets for a specified filter 118 * and perform blocking and polling operations on the result queue. 119 * <p> 120 * We use a ConcurrentLinkedQueue here, because its Iterator is weakly 121 * consistent and we want {@link #invokePacketCollectors(Stanza)} for-each 122 * loop to be lock free. As drawback, removing a PacketCollector is O(n). 123 * The alternative would be a synchronized HashSet, but this would mean a 124 * synchronized block around every usage of <code>collectors</code>. 125 * </p> 126 */ 127 private final Collection<PacketCollector> collectors = new ConcurrentLinkedQueue<PacketCollector>(); 128 129 /** 130 * List of PacketListeners that will be notified synchronously when a new stanza(/packet) was received. 131 */ 132 private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>(); 133 134 /** 135 * List of PacketListeners that will be notified asynchronously when a new stanza(/packet) was received. 136 */ 137 private final Map<StanzaListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>(); 138 139 /** 140 * List of PacketListeners that will be notified when a new stanza(/packet) was sent. 141 */ 142 private final Map<StanzaListener, ListenerWrapper> sendListeners = 143 new HashMap<StanzaListener, ListenerWrapper>(); 144 145 /** 146 * List of PacketListeners that will be notified when a new stanza(/packet) is about to be 147 * sent to the server. These interceptors may modify the stanza(/packet) before it is being 148 * actually sent to the server. 149 */ 150 private final Map<StanzaListener, InterceptorWrapper> interceptors = 151 new HashMap<StanzaListener, InterceptorWrapper>(); 152 153 protected final Lock connectionLock = new ReentrantLock(); 154 155 protected final Map<String, ExtensionElement> streamFeatures = new HashMap<String, ExtensionElement>(); 156 157 /** 158 * The full JID of the authenticated user, as returned by the resource binding response of the server. 159 * <p> 160 * It is important that we don't infer the user from the login() arguments and the configurations service name, as, 161 * for example, when SASL External is used, the username is not given to login but taken from the 'external' 162 * certificate. 163 * </p> 164 */ 165 protected String user; 166 167 protected boolean connected = false; 168 169 /** 170 * The stream ID, see RFC 6120 § 4.7.3 171 */ 172 protected String streamId; 173 174 /** 175 * 176 */ 177 private long packetReplyTimeout = SmackConfiguration.getDefaultPacketReplyTimeout(); 178 179 /** 180 * The SmackDebugger allows to log and debug XML traffic. 181 */ 182 protected SmackDebugger debugger = null; 183 184 /** 185 * The Reader which is used for the debugger. 186 */ 187 protected Reader reader; 188 189 /** 190 * The Writer which is used for the debugger. 191 */ 192 protected Writer writer; 193 194 /** 195 * Set to success if the last features stanza from the server has been parsed. A XMPP connection 196 * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature 197 * stanza is send by the server. This is set to true once the last feature stanza has been 198 * parsed. 199 */ 200 protected final SynchronizationPoint<Exception> lastFeaturesReceived = new SynchronizationPoint<Exception>( 201 AbstractXMPPConnection.this); 202 203 /** 204 * Set to success if the sasl feature has been received. 205 */ 206 protected final SynchronizationPoint<SmackException> saslFeatureReceived = new SynchronizationPoint<SmackException>( 207 AbstractXMPPConnection.this); 208 209 /** 210 * The SASLAuthentication manager that is responsible for authenticating with the server. 211 */ 212 protected SASLAuthentication saslAuthentication = new SASLAuthentication(this); 213 214 /** 215 * A number to uniquely identify connections that are created. This is distinct from the 216 * connection ID, which is a value sent by the server once a connection is made. 217 */ 218 protected final int connectionCounterValue = connectionCounter.getAndIncrement(); 219 220 /** 221 * Holds the initial configuration used while creating the connection. 222 */ 223 protected final ConnectionConfiguration config; 224 225 /** 226 * Defines how the from attribute of outgoing stanzas should be handled. 227 */ 228 private FromMode fromMode = FromMode.OMITTED; 229 230 protected XMPPInputOutputStream compressionHandler; 231 232 private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); 233 234 /** 235 * ExecutorService used to invoke the PacketListeners on newly arrived and parsed stanzas. It is 236 * important that we use a <b>single threaded ExecutorService</b> in order to guarantee that the 237 * PacketListeners are invoked in the same order the stanzas arrived. 238 */ 239 private final BoundedThreadPoolExecutor executorService = new BoundedThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, 240 100, new SmackExecutorThreadFactory(connectionCounterValue, "Incoming Processor")); 241 242 /** 243 * This scheduled thread pool executor is used to remove pending callbacks. 244 */ 245 private final ScheduledExecutorService removeCallbacksService = Executors.newSingleThreadScheduledExecutor( 246 new SmackExecutorThreadFactory(connectionCounterValue, "Remove Callbacks")); 247 248 /** 249 * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set 250 * them 'daemon'. 251 */ 252 private final ExecutorService cachedExecutorService = Executors.newCachedThreadPool( 253 // @formatter:off 254 new SmackExecutorThreadFactory( // threadFactory 255 connectionCounterValue, 256 "Cached Executor" 257 ) 258 // @formatter:on 259 ); 260 261 /** 262 * A executor service used to invoke the callbacks of synchronous stanza(/packet) listeners. We use a executor service to 263 * decouple incoming stanza processing from callback invocation. It is important that order of callback invocation 264 * is the same as the order of the incoming stanzas. Therefore we use a <i>single</i> threaded executor service. 265 */ 266 private final ExecutorService singleThreadedExecutorService = Executors.newSingleThreadExecutor(new SmackExecutorThreadFactory( 267 getConnectionCounter(), "Single Threaded Executor")); 268 269 /** 270 * The used host to establish the connection to 271 */ 272 protected String host; 273 274 /** 275 * The used port to establish the connection to 276 */ 277 protected int port; 278 279 /** 280 * Flag that indicates if the user is currently authenticated with the server. 281 */ 282 protected boolean authenticated = false; 283 284 /** 285 * Flag that indicates if the user was authenticated with the server when the connection 286 * to the server was closed (abruptly or not). 287 */ 288 protected boolean wasAuthenticated = false; 289 290 private final Map<String, IQRequestHandler> setIqRequestHandler = new HashMap<>(); 291 private final Map<String, IQRequestHandler> getIqRequestHandler = new HashMap<>(); 292 293 /** 294 * Create a new XMPPConnection to an XMPP server. 295 * 296 * @param configuration The configuration which is used to establish the connection. 297 */ 298 protected AbstractXMPPConnection(ConnectionConfiguration configuration) { 299 config = configuration; 300 } 301 302 /** 303 * Get the connection configuration used by this connection. 304 * 305 * @return the connection configuration. 306 */ 307 public ConnectionConfiguration getConfiguration() { 308 return config; 309 } 310 311 @Override 312 public String getServiceName() { 313 if (serviceName != null) { 314 return serviceName; 315 } 316 return config.getServiceName(); 317 } 318 319 @Override 320 public String getHost() { 321 return host; 322 } 323 324 @Override 325 public int getPort() { 326 return port; 327 } 328 329 @Override 330 public abstract boolean isSecureConnection(); 331 332 protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException; 333 334 @Override 335 public abstract void send(PlainStreamElement element) throws NotConnectedException; 336 337 @Override 338 public abstract boolean isUsingCompression(); 339 340 /** 341 * Establishes a connection to the XMPP server and performs an automatic login 342 * only if the previous connection state was logged (authenticated). It basically 343 * creates and maintains a connection to the server. 344 * <p> 345 * Listeners will be preserved from a previous connection. 346 * 347 * @throws XMPPException if an error occurs on the XMPP protocol level. 348 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 349 * @throws IOException 350 * @throws ConnectionException with detailed information about the failed connection. 351 * @return a reference to this object, to chain <code>connect()</code> with <code>login()</code>. 352 */ 353 public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException { 354 // Check if not already connected 355 throwAlreadyConnectedExceptionIfAppropriate(); 356 357 // Reset the connection state 358 saslAuthentication.init(); 359 saslFeatureReceived.init(); 360 lastFeaturesReceived.init(); 361 streamId = null; 362 363 // Perform the actual connection to the XMPP service 364 connectInternal(); 365 366 return this; 367 } 368 369 /** 370 * Abstract method that concrete subclasses of XMPPConnection need to implement to perform their 371 * way of XMPP connection establishment. Implementations are required to perform an automatic 372 * login if the previous connection state was logged (authenticated). 373 * 374 * @throws SmackException 375 * @throws IOException 376 * @throws XMPPException 377 */ 378 protected abstract void connectInternal() throws SmackException, IOException, XMPPException; 379 380 private String usedUsername, usedPassword, usedResource; 381 382 /** 383 * Logs in to the server using the strongest SASL mechanism supported by 384 * the server. If more than the connection's default stanza(/packet) timeout elapses in each step of the 385 * authentication process without a response from the server, a 386 * {@link SmackException.NoResponseException} will be thrown. 387 * <p> 388 * Before logging in (i.e. authenticate) to the server the connection must be connected 389 * by calling {@link #connect}. 390 * </p> 391 * <p> 392 * It is possible to log in without sending an initial available presence by using 393 * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}. 394 * Finally, if you want to not pass a password and instead use a more advanced mechanism 395 * while using SASL then you may be interested in using 396 * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}. 397 * For more advanced login settings see {@link ConnectionConfiguration}. 398 * </p> 399 * 400 * @throws XMPPException if an error occurs on the XMPP protocol level. 401 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 402 * @throws IOException if an I/O error occurs during login. 403 */ 404 public synchronized void login() throws XMPPException, SmackException, IOException { 405 if (isAnonymous()) { 406 throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?"); 407 throwAlreadyLoggedInExceptionIfAppropriate(); 408 loginAnonymously(); 409 } else { 410 // The previously used username, password and resource take over precedence over the 411 // ones from the connection configuration 412 CharSequence username = usedUsername != null ? usedUsername : config.getUsername(); 413 String password = usedPassword != null ? usedPassword : config.getPassword(); 414 String resource = usedResource != null ? usedResource : config.getResource(); 415 login(username, password, resource); 416 } 417 } 418 419 /** 420 * Same as {@link #login(CharSequence, String, String)}, but takes the resource from the connection 421 * configuration. 422 * 423 * @param username 424 * @param password 425 * @throws XMPPException 426 * @throws SmackException 427 * @throws IOException 428 * @see #login 429 */ 430 public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException, 431 IOException { 432 login(username, password, config.getResource()); 433 } 434 435 /** 436 * Login with the given username (authorization identity). You may omit the password if a callback handler is used. 437 * If resource is null, then the server will generate one. 438 * 439 * @param username 440 * @param password 441 * @param resource 442 * @throws XMPPException 443 * @throws SmackException 444 * @throws IOException 445 * @see #login 446 */ 447 public synchronized void login(CharSequence username, String password, String resource) throws XMPPException, 448 SmackException, IOException { 449 if (!config.allowNullOrEmptyUsername) { 450 StringUtils.requireNotNullOrEmpty(username, "Username must not be null or empty"); 451 } 452 throwNotConnectedExceptionIfAppropriate(); 453 throwAlreadyLoggedInExceptionIfAppropriate(); 454 usedUsername = username != null ? username.toString() : null; 455 usedPassword = password; 456 usedResource = resource; 457 loginNonAnonymously(usedUsername, usedPassword, usedResource); 458 } 459 460 protected abstract void loginNonAnonymously(String username, String password, String resource) 461 throws XMPPException, SmackException, IOException; 462 463 protected abstract void loginAnonymously() throws XMPPException, SmackException, IOException; 464 465 @Override 466 public final boolean isConnected() { 467 return connected; 468 } 469 470 @Override 471 public final boolean isAuthenticated() { 472 return authenticated; 473 } 474 475 @Override 476 public final String getUser() { 477 return user; 478 } 479 480 @Override 481 public String getStreamId() { 482 if (!isConnected()) { 483 return null; 484 } 485 return streamId; 486 } 487 488 // TODO remove this suppression once "disable legacy session" code has been removed from Smack 489 @SuppressWarnings("deprecation") 490 protected void bindResourceAndEstablishSession(String resource) throws XMPPErrorException, 491 IOException, SmackException { 492 493 // Wait until either: 494 // - the servers last features stanza has been parsed 495 // - the timeout occurs 496 LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); 497 lastFeaturesReceived.checkIfSuccessOrWait(); 498 499 500 if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 501 // Server never offered resource binding, which is REQURIED in XMPP client and 502 // server implementations as per RFC6120 7.2 503 throw new ResourceBindingNotOfferedException(); 504 } 505 506 // Resource binding, see RFC6120 7. 507 // Note that we can not use IQReplyFilter here, since the users full JID is not yet 508 // available. It will become available right after the resource has been successfully bound. 509 Bind bindResource = Bind.newSet(resource); 510 PacketCollector packetCollector = createPacketCollectorAndSend(new StanzaIdFilter(bindResource), bindResource); 511 Bind response = packetCollector.nextResultOrThrow(); 512 // Set the connections user to the result of resource binding. It is important that we don't infer the user 513 // from the login() arguments and the configurations service name, as, for example, when SASL External is used, 514 // the username is not given to login but taken from the 'external' certificate. 515 user = response.getJid(); 516 serviceName = XmppStringUtils.parseDomain(user); 517 518 Session.Feature sessionFeature = getFeature(Session.ELEMENT, Session.NAMESPACE); 519 // Only bind the session if it's announced as stream feature by the server, is not optional and not disabled 520 // For more information see http://tools.ietf.org/html/draft-cridland-xmpp-session-01 521 if (sessionFeature != null && !sessionFeature.isOptional() && !getConfiguration().isLegacySessionDisabled()) { 522 Session session = new Session(); 523 packetCollector = createPacketCollectorAndSend(new StanzaIdFilter(session), session); 524 packetCollector.nextResultOrThrow(); 525 } 526 } 527 528 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException { 529 // Indicate that we're now authenticated. 530 this.authenticated = true; 531 532 // If debugging is enabled, change the the debug window title to include the 533 // name we are now logged-in as. 534 // If DEBUG was set to true AFTER the connection was created the debugger 535 // will be null 536 if (config.isDebuggerEnabled() && debugger != null) { 537 debugger.userHasLogged(user); 538 } 539 callConnectionAuthenticatedListener(resumed); 540 541 // Set presence to online. It is important that this is done after 542 // callConnectionAuthenticatedListener(), as this call will also 543 // eventually load the roster. And we should load the roster before we 544 // send the initial presence. 545 if (config.isSendPresence() && !resumed) { 546 sendStanza(new Presence(Presence.Type.available)); 547 } 548 } 549 550 @Override 551 public final boolean isAnonymous() { 552 return config.getUsername() == null && usedUsername == null 553 && !config.allowNullOrEmptyUsername; 554 } 555 556 private String serviceName; 557 558 protected List<HostAddress> hostAddresses; 559 560 /** 561 * Populates {@link #hostAddresses} with at least one host address. 562 * 563 * @return a list of host addresses where DNS (SRV) RR resolution failed. 564 */ 565 protected List<HostAddress> populateHostAddresses() { 566 List<HostAddress> failedAddresses = new LinkedList<>(); 567 // N.B.: Important to use config.serviceName and not AbstractXMPPConnection.serviceName 568 if (config.host != null) { 569 hostAddresses = new ArrayList<HostAddress>(1); 570 HostAddress hostAddress; 571 hostAddress = new HostAddress(config.host, config.port); 572 hostAddresses.add(hostAddress); 573 } else { 574 hostAddresses = DNSUtil.resolveXMPPDomain(config.serviceName, failedAddresses); 575 } 576 // If we reach this, then hostAddresses *must not* be empty, i.e. there is at least one host added, either the 577 // config.host one or the host representing the service name by DNSUtil 578 assert(!hostAddresses.isEmpty()); 579 return failedAddresses; 580 } 581 582 protected Lock getConnectionLock() { 583 return connectionLock; 584 } 585 586 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 587 throwNotConnectedExceptionIfAppropriate(null); 588 } 589 590 protected void throwNotConnectedExceptionIfAppropriate(String optionalHint) throws NotConnectedException { 591 if (!isConnected()) { 592 throw new NotConnectedException(optionalHint); 593 } 594 } 595 596 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 597 if (isConnected()) { 598 throw new AlreadyConnectedException(); 599 } 600 } 601 602 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 603 if (isAuthenticated()) { 604 throw new AlreadyLoggedInException(); 605 } 606 } 607 608 @Deprecated 609 @Override 610 public void sendPacket(Stanza packet) throws NotConnectedException { 611 sendStanza(packet); 612 } 613 614 @Override 615 public void sendStanza(Stanza packet) throws NotConnectedException { 616 Objects.requireNonNull(packet, "Packet must not be null"); 617 618 throwNotConnectedExceptionIfAppropriate(); 619 switch (fromMode) { 620 case OMITTED: 621 packet.setFrom(null); 622 break; 623 case USER: 624 packet.setFrom(getUser()); 625 break; 626 case UNCHANGED: 627 default: 628 break; 629 } 630 // Invoke interceptors for the new packet that is about to be sent. Interceptors may modify 631 // the content of the packet. 632 firePacketInterceptors(packet); 633 sendStanzaInternal(packet); 634 } 635 636 /** 637 * Returns the SASLAuthentication manager that is responsible for authenticating with 638 * the server. 639 * 640 * @return the SASLAuthentication manager that is responsible for authenticating with 641 * the server. 642 */ 643 protected SASLAuthentication getSASLAuthentication() { 644 return saslAuthentication; 645 } 646 647 /** 648 * Closes the connection by setting presence to unavailable then closing the connection to 649 * the XMPP server. The XMPPConnection can still be used for connecting to the server 650 * again. 651 * 652 */ 653 public void disconnect() { 654 try { 655 disconnect(new Presence(Presence.Type.unavailable)); 656 } 657 catch (NotConnectedException e) { 658 LOGGER.log(Level.FINEST, "Connection is already disconnected", e); 659 } 660 } 661 662 /** 663 * Closes the connection. A custom unavailable presence is sent to the server, followed 664 * by closing the stream. The XMPPConnection can still be used for connecting to the server 665 * again. A custom unavailable presence is useful for communicating offline presence 666 * information such as "On vacation". Typically, just the status text of the presence 667 * stanza(/packet) is set with online information, but most XMPP servers will deliver the full 668 * presence stanza(/packet) with whatever data is set. 669 * 670 * @param unavailablePresence the presence stanza(/packet) to send during shutdown. 671 * @throws NotConnectedException 672 */ 673 public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException { 674 sendStanza(unavailablePresence); 675 shutdown(); 676 callConnectionClosedListener(); 677 } 678 679 /** 680 * Shuts the current connection down. 681 */ 682 protected abstract void shutdown(); 683 684 @Override 685 public void addConnectionListener(ConnectionListener connectionListener) { 686 if (connectionListener == null) { 687 return; 688 } 689 connectionListeners.add(connectionListener); 690 } 691 692 @Override 693 public void removeConnectionListener(ConnectionListener connectionListener) { 694 connectionListeners.remove(connectionListener); 695 } 696 697 @Override 698 public PacketCollector createPacketCollectorAndSend(IQ packet) throws NotConnectedException { 699 StanzaFilter packetFilter = new IQReplyFilter(packet, this); 700 // Create the packet collector before sending the packet 701 PacketCollector packetCollector = createPacketCollectorAndSend(packetFilter, packet); 702 return packetCollector; 703 } 704 705 @Override 706 public PacketCollector createPacketCollectorAndSend(StanzaFilter packetFilter, Stanza packet) 707 throws NotConnectedException { 708 // Create the packet collector before sending the packet 709 PacketCollector packetCollector = createPacketCollector(packetFilter); 710 try { 711 // Now we can send the packet as the collector has been created 712 sendStanza(packet); 713 } 714 catch (NotConnectedException | RuntimeException e) { 715 packetCollector.cancel(); 716 throw e; 717 } 718 return packetCollector; 719 } 720 721 @Override 722 public PacketCollector createPacketCollector(StanzaFilter packetFilter) { 723 PacketCollector.Configuration configuration = PacketCollector.newConfiguration().setStanzaFilter(packetFilter); 724 return createPacketCollector(configuration); 725 } 726 727 @Override 728 public PacketCollector createPacketCollector(PacketCollector.Configuration configuration) { 729 PacketCollector collector = new PacketCollector(this, configuration); 730 // Add the collector to the list of active collectors. 731 collectors.add(collector); 732 return collector; 733 } 734 735 @Override 736 public void removePacketCollector(PacketCollector collector) { 737 collectors.remove(collector); 738 } 739 740 @Override 741 @Deprecated 742 public void addPacketListener(StanzaListener packetListener, StanzaFilter packetFilter) { 743 addAsyncStanzaListener(packetListener, packetFilter); 744 } 745 746 @Override 747 @Deprecated 748 public boolean removePacketListener(StanzaListener packetListener) { 749 return removeAsyncStanzaListener(packetListener); 750 } 751 752 @Override 753 public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 754 if (packetListener == null) { 755 throw new NullPointerException("Packet listener is null."); 756 } 757 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 758 synchronized (syncRecvListeners) { 759 syncRecvListeners.put(packetListener, wrapper); 760 } 761 } 762 763 @Override 764 public boolean removeSyncStanzaListener(StanzaListener packetListener) { 765 synchronized (syncRecvListeners) { 766 return syncRecvListeners.remove(packetListener) != null; 767 } 768 } 769 770 @Override 771 public void addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 772 if (packetListener == null) { 773 throw new NullPointerException("Packet listener is null."); 774 } 775 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 776 synchronized (asyncRecvListeners) { 777 asyncRecvListeners.put(packetListener, wrapper); 778 } 779 } 780 781 @Override 782 public boolean removeAsyncStanzaListener(StanzaListener packetListener) { 783 synchronized (asyncRecvListeners) { 784 return asyncRecvListeners.remove(packetListener) != null; 785 } 786 } 787 788 @Override 789 public void addPacketSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) { 790 if (packetListener == null) { 791 throw new NullPointerException("Packet listener is null."); 792 } 793 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 794 synchronized (sendListeners) { 795 sendListeners.put(packetListener, wrapper); 796 } 797 } 798 799 @Override 800 public void removePacketSendingListener(StanzaListener packetListener) { 801 synchronized (sendListeners) { 802 sendListeners.remove(packetListener); 803 } 804 } 805 806 /** 807 * Process all stanza(/packet) listeners for sending packets. 808 * <p> 809 * Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread. 810 * </p> 811 * 812 * @param packet the stanza(/packet) to process. 813 */ 814 @SuppressWarnings("javadoc") 815 protected void firePacketSendingListeners(final Stanza packet) { 816 final List<StanzaListener> listenersToNotify = new LinkedList<StanzaListener>(); 817 synchronized (sendListeners) { 818 for (ListenerWrapper listenerWrapper : sendListeners.values()) { 819 if (listenerWrapper.filterMatches(packet)) { 820 listenersToNotify.add(listenerWrapper.getListener()); 821 } 822 } 823 } 824 if (listenersToNotify.isEmpty()) { 825 return; 826 } 827 // Notify in a new thread, because we can 828 asyncGo(new Runnable() { 829 @Override 830 public void run() { 831 for (StanzaListener listener : listenersToNotify) { 832 try { 833 listener.processPacket(packet); 834 } 835 catch (Exception e) { 836 LOGGER.log(Level.WARNING, "Sending listener threw exception", e); 837 continue; 838 } 839 } 840 }}); 841 } 842 843 @Override 844 public void addPacketInterceptor(StanzaListener packetInterceptor, 845 StanzaFilter packetFilter) { 846 if (packetInterceptor == null) { 847 throw new NullPointerException("Packet interceptor is null."); 848 } 849 InterceptorWrapper interceptorWrapper = new InterceptorWrapper(packetInterceptor, packetFilter); 850 synchronized (interceptors) { 851 interceptors.put(packetInterceptor, interceptorWrapper); 852 } 853 } 854 855 @Override 856 public void removePacketInterceptor(StanzaListener packetInterceptor) { 857 synchronized (interceptors) { 858 interceptors.remove(packetInterceptor); 859 } 860 } 861 862 /** 863 * Process interceptors. Interceptors may modify the stanza(/packet) that is about to be sent. 864 * Since the thread that requested to send the stanza(/packet) will invoke all interceptors, it 865 * is important that interceptors perform their work as soon as possible so that the 866 * thread does not remain blocked for a long period. 867 * 868 * @param packet the stanza(/packet) that is going to be sent to the server 869 */ 870 private void firePacketInterceptors(Stanza packet) { 871 List<StanzaListener> interceptorsToInvoke = new LinkedList<StanzaListener>(); 872 synchronized (interceptors) { 873 for (InterceptorWrapper interceptorWrapper : interceptors.values()) { 874 if (interceptorWrapper.filterMatches(packet)) { 875 interceptorsToInvoke.add(interceptorWrapper.getInterceptor()); 876 } 877 } 878 } 879 for (StanzaListener interceptor : interceptorsToInvoke) { 880 try { 881 interceptor.processPacket(packet); 882 } catch (Exception e) { 883 LOGGER.log(Level.SEVERE, "Packet interceptor threw exception", e); 884 } 885 } 886 } 887 888 /** 889 * Initialize the {@link #debugger}. You can specify a customized {@link SmackDebugger} 890 * by setup the system property <code>smack.debuggerClass</code> to the implementation. 891 * 892 * @throws IllegalStateException if the reader or writer isn't yet initialized. 893 * @throws IllegalArgumentException if the SmackDebugger can't be loaded. 894 */ 895 protected void initDebugger() { 896 if (reader == null || writer == null) { 897 throw new NullPointerException("Reader or writer isn't initialized."); 898 } 899 // If debugging is enabled, we open a window and write out all network traffic. 900 if (config.isDebuggerEnabled()) { 901 if (debugger == null) { 902 debugger = SmackConfiguration.createDebugger(this, writer, reader); 903 } 904 905 if (debugger == null) { 906 LOGGER.severe("Debugging enabled but could not find debugger class"); 907 } else { 908 // Obtain new reader and writer from the existing debugger 909 reader = debugger.newConnectionReader(reader); 910 writer = debugger.newConnectionWriter(writer); 911 } 912 } 913 } 914 915 @Override 916 public long getPacketReplyTimeout() { 917 return packetReplyTimeout; 918 } 919 920 @Override 921 public void setPacketReplyTimeout(long timeout) { 922 packetReplyTimeout = timeout; 923 } 924 925 private static boolean replyToUnknownIqDefault = true; 926 927 /** 928 * Set the default value used to determine if new connection will reply to unknown IQ requests. The pre-configured 929 * default is 'true'. 930 * 931 * @param replyToUnkownIqDefault 932 * @see #setReplyToUnknownIq(boolean) 933 */ 934 public static void setReplyToUnknownIqDefault(boolean replyToUnkownIqDefault) { 935 AbstractXMPPConnection.replyToUnknownIqDefault = replyToUnkownIqDefault; 936 } 937 938 private boolean replyToUnkownIq = replyToUnknownIqDefault; 939 940 /** 941 * Set if Smack will automatically send 942 * {@link org.jivesoftware.smack.packet.XMPPError.Condition#feature_not_implemented} when a request IQ without a 943 * registered {@link IQRequestHandler} is received. 944 * 945 * @param replyToUnknownIq 946 */ 947 public void setReplyToUnknownIq(boolean replyToUnknownIq) { 948 this.replyToUnkownIq = replyToUnknownIq; 949 } 950 951 protected void parseAndProcessStanza(XmlPullParser parser) throws Exception { 952 ParserUtils.assertAtStartTag(parser); 953 int parserDepth = parser.getDepth(); 954 Stanza stanza = null; 955 try { 956 stanza = PacketParserUtils.parseStanza(parser); 957 } 958 catch (Exception e) { 959 CharSequence content = PacketParserUtils.parseContentDepth(parser, 960 parserDepth); 961 UnparsablePacket message = new UnparsablePacket(content, e); 962 ParsingExceptionCallback callback = getParsingExceptionCallback(); 963 if (callback != null) { 964 callback.handleUnparsablePacket(message); 965 } 966 } 967 ParserUtils.assertAtEndTag(parser); 968 if (stanza != null) { 969 processPacket(stanza); 970 } 971 } 972 973 /** 974 * Processes a stanza(/packet) after it's been fully parsed by looping through the installed 975 * stanza(/packet) collectors and listeners and letting them examine the stanza(/packet) to see if 976 * they are a match with the filter. 977 * 978 * @param packet the stanza(/packet) to process. 979 * @throws InterruptedException 980 */ 981 protected void processPacket(Stanza packet) throws InterruptedException { 982 assert(packet != null); 983 lastStanzaReceived = System.currentTimeMillis(); 984 // Deliver the incoming packet to listeners. 985 executorService.executeBlocking(new ListenerNotification(packet)); 986 } 987 988 /** 989 * A runnable to notify all listeners and stanza(/packet) collectors of a packet. 990 */ 991 private class ListenerNotification implements Runnable { 992 993 private final Stanza packet; 994 995 public ListenerNotification(Stanza packet) { 996 this.packet = packet; 997 } 998 999 public void run() { 1000 invokePacketCollectorsAndNotifyRecvListeners(packet); 1001 } 1002 } 1003 1004 /** 1005 * Invoke {@link PacketCollector#processPacket(Stanza)} for every 1006 * PacketCollector with the given packet. Also notify the receive listeners with a matching stanza(/packet) filter about the packet. 1007 * 1008 * @param packet the stanza(/packet) to notify the PacketCollectors and receive listeners about. 1009 */ 1010 protected void invokePacketCollectorsAndNotifyRecvListeners(final Stanza packet) { 1011 if (packet instanceof IQ) { 1012 final IQ iq = (IQ) packet; 1013 final IQ.Type type = iq.getType(); 1014 switch (type) { 1015 case set: 1016 case get: 1017 final String key = XmppStringUtils.generateKey(iq.getChildElementName(), iq.getChildElementNamespace()); 1018 IQRequestHandler iqRequestHandler = null; 1019 switch (type) { 1020 case set: 1021 synchronized (setIqRequestHandler) { 1022 iqRequestHandler = setIqRequestHandler.get(key); 1023 } 1024 break; 1025 case get: 1026 synchronized (getIqRequestHandler) { 1027 iqRequestHandler = getIqRequestHandler.get(key); 1028 } 1029 break; 1030 default: 1031 throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'"); 1032 } 1033 if (iqRequestHandler == null) { 1034 if (!replyToUnkownIq) { 1035 return; 1036 } 1037 // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an 1038 // IQ of type "error" with code 501 ("feature-not-implemented") 1039 ErrorIQ errorIQ = IQ.createErrorResponse(iq, new XMPPError( 1040 XMPPError.Condition.feature_not_implemented)); 1041 try { 1042 sendStanza(errorIQ); 1043 } 1044 catch (NotConnectedException e) { 1045 LOGGER.log(Level.WARNING, "NotConnectedException while sending error IQ to unkown IQ request", e); 1046 } 1047 } else { 1048 ExecutorService executorService = null; 1049 switch (iqRequestHandler.getMode()) { 1050 case sync: 1051 executorService = singleThreadedExecutorService; 1052 break; 1053 case async: 1054 executorService = cachedExecutorService; 1055 break; 1056 } 1057 final IQRequestHandler finalIqRequestHandler = iqRequestHandler; 1058 executorService.execute(new Runnable() { 1059 @Override 1060 public void run() { 1061 IQ response = finalIqRequestHandler.handleIQRequest(iq); 1062 if (response == null) { 1063 // It is not ideal if the IQ request handler does not return an IQ response, because RFC 1064 // 6120 § 8.1.2 does specify that a response is mandatory. But some APIs, mostly the 1065 // file transfer one, does not always return a result, so we need to handle this case. 1066 // Also sometimes a request handler may decide that it's better to not send a response, 1067 // e.g. to avoid presence leaks. 1068 return; 1069 } 1070 try { 1071 sendStanza(response); 1072 } 1073 catch (NotConnectedException e) { 1074 LOGGER.log(Level.WARNING, "NotConnectedException while sending response to IQ request", e); 1075 } 1076 } 1077 }); 1078 // The following returns makes it impossible for packet listeners and collectors to 1079 // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the 1080 // desired behavior. 1081 return; 1082 } 1083 break; 1084 default: 1085 break; 1086 } 1087 } 1088 1089 // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below, 1090 // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in 1091 // their own thread. 1092 final Collection<StanzaListener> listenersToNotify = new LinkedList<StanzaListener>(); 1093 synchronized (asyncRecvListeners) { 1094 for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) { 1095 if (listenerWrapper.filterMatches(packet)) { 1096 listenersToNotify.add(listenerWrapper.getListener()); 1097 } 1098 } 1099 } 1100 1101 for (final StanzaListener listener : listenersToNotify) { 1102 asyncGo(new Runnable() { 1103 @Override 1104 public void run() { 1105 try { 1106 listener.processPacket(packet); 1107 } catch (Exception e) { 1108 LOGGER.log(Level.SEVERE, "Exception in async packet listener", e); 1109 } 1110 } 1111 }); 1112 } 1113 1114 // Loop through all collectors and notify the appropriate ones. 1115 for (PacketCollector collector: collectors) { 1116 collector.processPacket(packet); 1117 } 1118 1119 // Notify the receive listeners interested in the packet 1120 listenersToNotify.clear(); 1121 synchronized (syncRecvListeners) { 1122 for (ListenerWrapper listenerWrapper : syncRecvListeners.values()) { 1123 if (listenerWrapper.filterMatches(packet)) { 1124 listenersToNotify.add(listenerWrapper.getListener()); 1125 } 1126 } 1127 } 1128 1129 // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single 1130 // threaded executor service and therefore keeps the order. 1131 singleThreadedExecutorService.execute(new Runnable() { 1132 @Override 1133 public void run() { 1134 for (StanzaListener listener : listenersToNotify) { 1135 try { 1136 listener.processPacket(packet); 1137 } catch(NotConnectedException e) { 1138 LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); 1139 break; 1140 } catch (Exception e) { 1141 LOGGER.log(Level.SEVERE, "Exception in packet listener", e); 1142 } 1143 } 1144 } 1145 }); 1146 1147 } 1148 1149 /** 1150 * Sets whether the connection has already logged in the server. This method assures that the 1151 * {@link #wasAuthenticated} flag is never reset once it has ever been set. 1152 * 1153 */ 1154 protected void setWasAuthenticated() { 1155 // Never reset the flag if the connection has ever been authenticated 1156 if (!wasAuthenticated) { 1157 wasAuthenticated = authenticated; 1158 } 1159 } 1160 1161 protected void callConnectionConnectedListener() { 1162 for (ConnectionListener listener : connectionListeners) { 1163 listener.connected(this); 1164 } 1165 } 1166 1167 protected void callConnectionAuthenticatedListener(boolean resumed) { 1168 for (ConnectionListener listener : connectionListeners) { 1169 try { 1170 listener.authenticated(this, resumed); 1171 } catch (Exception e) { 1172 // Catch and print any exception so we can recover 1173 // from a faulty listener and finish the shutdown process 1174 LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e); 1175 } 1176 } 1177 } 1178 1179 void callConnectionClosedListener() { 1180 for (ConnectionListener listener : connectionListeners) { 1181 try { 1182 listener.connectionClosed(); 1183 } 1184 catch (Exception e) { 1185 // Catch and print any exception so we can recover 1186 // from a faulty listener and finish the shutdown process 1187 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e); 1188 } 1189 } 1190 } 1191 1192 protected void callConnectionClosedOnErrorListener(Exception e) { 1193 LOGGER.log(Level.WARNING, "Connection closed with error", e); 1194 for (ConnectionListener listener : connectionListeners) { 1195 try { 1196 listener.connectionClosedOnError(e); 1197 } 1198 catch (Exception e2) { 1199 // Catch and print any exception so we can recover 1200 // from a faulty listener 1201 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e2); 1202 } 1203 } 1204 } 1205 1206 /** 1207 * Sends a notification indicating that the connection was reconnected successfully. 1208 */ 1209 protected void notifyReconnection() { 1210 // Notify connection listeners of the reconnection. 1211 for (ConnectionListener listener : connectionListeners) { 1212 try { 1213 listener.reconnectionSuccessful(); 1214 } 1215 catch (Exception e) { 1216 // Catch and print any exception so we can recover 1217 // from a faulty listener 1218 LOGGER.log(Level.WARNING, "notifyReconnection()", e); 1219 } 1220 } 1221 } 1222 1223 /** 1224 * A wrapper class to associate a stanza(/packet) filter with a listener. 1225 */ 1226 protected static class ListenerWrapper { 1227 1228 private final StanzaListener packetListener; 1229 private final StanzaFilter packetFilter; 1230 1231 /** 1232 * Create a class which associates a stanza(/packet) filter with a listener. 1233 * 1234 * @param packetListener the stanza(/packet) listener. 1235 * @param packetFilter the associated filter or null if it listen for all packets. 1236 */ 1237 public ListenerWrapper(StanzaListener packetListener, StanzaFilter packetFilter) { 1238 this.packetListener = packetListener; 1239 this.packetFilter = packetFilter; 1240 } 1241 1242 public boolean filterMatches(Stanza packet) { 1243 return packetFilter == null || packetFilter.accept(packet); 1244 } 1245 1246 public StanzaListener getListener() { 1247 return packetListener; 1248 } 1249 } 1250 1251 /** 1252 * A wrapper class to associate a stanza(/packet) filter with an interceptor. 1253 */ 1254 protected static class InterceptorWrapper { 1255 1256 private final StanzaListener packetInterceptor; 1257 private final StanzaFilter packetFilter; 1258 1259 /** 1260 * Create a class which associates a stanza(/packet) filter with an interceptor. 1261 * 1262 * @param packetInterceptor the interceptor. 1263 * @param packetFilter the associated filter or null if it intercepts all packets. 1264 */ 1265 public InterceptorWrapper(StanzaListener packetInterceptor, StanzaFilter packetFilter) { 1266 this.packetInterceptor = packetInterceptor; 1267 this.packetFilter = packetFilter; 1268 } 1269 1270 public boolean filterMatches(Stanza packet) { 1271 return packetFilter == null || packetFilter.accept(packet); 1272 } 1273 1274 public StanzaListener getInterceptor() { 1275 return packetInterceptor; 1276 } 1277 } 1278 1279 @Override 1280 public int getConnectionCounter() { 1281 return connectionCounterValue; 1282 } 1283 1284 @Override 1285 public void setFromMode(FromMode fromMode) { 1286 this.fromMode = fromMode; 1287 } 1288 1289 @Override 1290 public FromMode getFromMode() { 1291 return this.fromMode; 1292 } 1293 1294 @Override 1295 protected void finalize() throws Throwable { 1296 LOGGER.fine("finalizing XMPPConnection ( " + getConnectionCounter() 1297 + "): Shutting down executor services"); 1298 try { 1299 // It's usually not a good idea to rely on finalize. But this is the easiest way to 1300 // avoid the "Smack Listener Processor" leaking. The thread(s) of the executor have a 1301 // reference to their ExecutorService which prevents the ExecutorService from being 1302 // gc'ed. It is possible that the XMPPConnection instance is gc'ed while the 1303 // listenerExecutor ExecutorService call not be gc'ed until it got shut down. 1304 executorService.shutdownNow(); 1305 cachedExecutorService.shutdown(); 1306 removeCallbacksService.shutdownNow(); 1307 singleThreadedExecutorService.shutdownNow(); 1308 } catch (Throwable t) { 1309 LOGGER.log(Level.WARNING, "finalize() threw trhowable", t); 1310 } 1311 finally { 1312 super.finalize(); 1313 } 1314 } 1315 1316 protected final void parseFeatures(XmlPullParser parser) throws XmlPullParserException, 1317 IOException, SmackException { 1318 streamFeatures.clear(); 1319 final int initialDepth = parser.getDepth(); 1320 while (true) { 1321 int eventType = parser.next(); 1322 1323 if (eventType == XmlPullParser.START_TAG && parser.getDepth() == initialDepth + 1) { 1324 ExtensionElement streamFeature = null; 1325 String name = parser.getName(); 1326 String namespace = parser.getNamespace(); 1327 switch (name) { 1328 case StartTls.ELEMENT: 1329 streamFeature = PacketParserUtils.parseStartTlsFeature(parser); 1330 break; 1331 case Mechanisms.ELEMENT: 1332 streamFeature = new Mechanisms(PacketParserUtils.parseMechanisms(parser)); 1333 break; 1334 case Bind.ELEMENT: 1335 streamFeature = Bind.Feature.INSTANCE; 1336 break; 1337 case Session.ELEMENT: 1338 streamFeature = PacketParserUtils.parseSessionFeature(parser); 1339 break; 1340 case Compress.Feature.ELEMENT: 1341 streamFeature = PacketParserUtils.parseCompressionFeature(parser); 1342 break; 1343 default: 1344 ExtensionElementProvider<ExtensionElement> provider = ProviderManager.getStreamFeatureProvider(name, namespace); 1345 if (provider != null) { 1346 streamFeature = provider.parse(parser); 1347 } 1348 break; 1349 } 1350 if (streamFeature != null) { 1351 addStreamFeature(streamFeature); 1352 } 1353 } 1354 else if (eventType == XmlPullParser.END_TAG && parser.getDepth() == initialDepth) { 1355 break; 1356 } 1357 } 1358 1359 if (hasFeature(Mechanisms.ELEMENT, Mechanisms.NAMESPACE)) { 1360 // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it 1361 if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE) 1362 || config.getSecurityMode() == SecurityMode.disabled) { 1363 saslFeatureReceived.reportSuccess(); 1364 } 1365 } 1366 1367 // If the server reported the bind feature then we are that that we did SASL and maybe 1368 // STARTTLS. We can then report that the last 'stream:features' have been parsed 1369 if (hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 1370 if (!hasFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE) 1371 || !config.isCompressionEnabled()) { 1372 // This was was last features from the server is either it did not contain 1373 // compression or if we disabled it 1374 lastFeaturesReceived.reportSuccess(); 1375 } 1376 } 1377 afterFeaturesReceived(); 1378 } 1379 1380 protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException { 1381 // Default implementation does nothing 1382 } 1383 1384 @SuppressWarnings("unchecked") 1385 @Override 1386 public <F extends ExtensionElement> F getFeature(String element, String namespace) { 1387 return (F) streamFeatures.get(XmppStringUtils.generateKey(element, namespace)); 1388 } 1389 1390 @Override 1391 public boolean hasFeature(String element, String namespace) { 1392 return getFeature(element, namespace) != null; 1393 } 1394 1395 private void addStreamFeature(ExtensionElement feature) { 1396 String key = XmppStringUtils.generateKey(feature.getElementName(), feature.getNamespace()); 1397 streamFeatures.put(key, feature); 1398 } 1399 1400 @Override 1401 public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, 1402 StanzaListener callback) throws NotConnectedException { 1403 sendStanzaWithResponseCallback(stanza, replyFilter, callback, null); 1404 } 1405 1406 @Override 1407 public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, 1408 StanzaListener callback, ExceptionCallback exceptionCallback) 1409 throws NotConnectedException { 1410 sendStanzaWithResponseCallback(stanza, replyFilter, callback, exceptionCallback, 1411 getPacketReplyTimeout()); 1412 } 1413 1414 @Override 1415 public void sendStanzaWithResponseCallback(Stanza stanza, final StanzaFilter replyFilter, 1416 final StanzaListener callback, final ExceptionCallback exceptionCallback, 1417 long timeout) throws NotConnectedException { 1418 Objects.requireNonNull(stanza, "stanza must not be null"); 1419 // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we 1420 // disallow it here in the async API as it makes no sense 1421 Objects.requireNonNull(replyFilter, "replyFilter must not be null"); 1422 Objects.requireNonNull(callback, "callback must not be null"); 1423 1424 final StanzaListener packetListener = new StanzaListener() { 1425 @Override 1426 public void processPacket(Stanza packet) throws NotConnectedException { 1427 try { 1428 XMPPErrorException.ifHasErrorThenThrow(packet); 1429 callback.processPacket(packet); 1430 } 1431 catch (XMPPErrorException e) { 1432 if (exceptionCallback != null) { 1433 exceptionCallback.processException(e); 1434 } 1435 } 1436 finally { 1437 removeAsyncStanzaListener(this); 1438 } 1439 } 1440 }; 1441 removeCallbacksService.schedule(new Runnable() { 1442 @Override 1443 public void run() { 1444 boolean removed = removeAsyncStanzaListener(packetListener); 1445 // If the packetListener got removed, then it was never run and 1446 // we never received a response, inform the exception callback 1447 if (removed && exceptionCallback != null) { 1448 exceptionCallback.processException(NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter)); 1449 } 1450 } 1451 }, timeout, TimeUnit.MILLISECONDS); 1452 addAsyncStanzaListener(packetListener, replyFilter); 1453 sendStanza(stanza); 1454 } 1455 1456 @Override 1457 public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback) 1458 throws NotConnectedException { 1459 sendIqWithResponseCallback(iqRequest, callback, null); 1460 } 1461 1462 @Override 1463 public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback, 1464 ExceptionCallback exceptionCallback) throws NotConnectedException { 1465 sendIqWithResponseCallback(iqRequest, callback, exceptionCallback, getPacketReplyTimeout()); 1466 } 1467 1468 @Override 1469 public void sendIqWithResponseCallback(IQ iqRequest, final StanzaListener callback, 1470 final ExceptionCallback exceptionCallback, long timeout) 1471 throws NotConnectedException { 1472 StanzaFilter replyFilter = new IQReplyFilter(iqRequest, this); 1473 sendStanzaWithResponseCallback(iqRequest, replyFilter, callback, exceptionCallback, timeout); 1474 } 1475 1476 @Override 1477 public void addOneTimeSyncCallback(final StanzaListener callback, final StanzaFilter packetFilter) { 1478 final StanzaListener packetListener = new StanzaListener() { 1479 @Override 1480 public void processPacket(Stanza packet) throws NotConnectedException { 1481 try { 1482 callback.processPacket(packet); 1483 } finally { 1484 removeSyncStanzaListener(this); 1485 } 1486 } 1487 }; 1488 addSyncStanzaListener(packetListener, packetFilter); 1489 removeCallbacksService.schedule(new Runnable() { 1490 @Override 1491 public void run() { 1492 removeSyncStanzaListener(packetListener); 1493 } 1494 }, getPacketReplyTimeout(), TimeUnit.MILLISECONDS); 1495 } 1496 1497 @Override 1498 public IQRequestHandler registerIQRequestHandler(final IQRequestHandler iqRequestHandler) { 1499 final String key = XmppStringUtils.generateKey(iqRequestHandler.getElement(), iqRequestHandler.getNamespace()); 1500 switch (iqRequestHandler.getType()) { 1501 case set: 1502 synchronized (setIqRequestHandler) { 1503 return setIqRequestHandler.put(key, iqRequestHandler); 1504 } 1505 case get: 1506 synchronized (getIqRequestHandler) { 1507 return getIqRequestHandler.put(key, iqRequestHandler); 1508 } 1509 default: 1510 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 1511 } 1512 } 1513 1514 @Override 1515 public final IQRequestHandler unregisterIQRequestHandler(IQRequestHandler iqRequestHandler) { 1516 return unregisterIQRequestHandler(iqRequestHandler.getElement(), iqRequestHandler.getNamespace(), 1517 iqRequestHandler.getType()); 1518 } 1519 1520 @Override 1521 public IQRequestHandler unregisterIQRequestHandler(String element, String namespace, IQ.Type type) { 1522 final String key = XmppStringUtils.generateKey(element, namespace); 1523 switch (type) { 1524 case set: 1525 synchronized (setIqRequestHandler) { 1526 return setIqRequestHandler.remove(key); 1527 } 1528 case get: 1529 synchronized (getIqRequestHandler) { 1530 return getIqRequestHandler.remove(key); 1531 } 1532 default: 1533 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 1534 } 1535 } 1536 1537 private long lastStanzaReceived; 1538 1539 public long getLastStanzaReceived() { 1540 return lastStanzaReceived; 1541 } 1542 1543 /** 1544 * Install a parsing exception callback, which will be invoked once an exception is encountered while parsing a 1545 * stanza 1546 * 1547 * @param callback the callback to install 1548 */ 1549 public void setParsingExceptionCallback(ParsingExceptionCallback callback) { 1550 parsingExceptionCallback = callback; 1551 } 1552 1553 /** 1554 * Get the current active parsing exception callback. 1555 * 1556 * @return the active exception callback or null if there is none 1557 */ 1558 public ParsingExceptionCallback getParsingExceptionCallback() { 1559 return parsingExceptionCallback; 1560 } 1561 1562 protected final void asyncGo(Runnable runnable) { 1563 cachedExecutorService.execute(runnable); 1564 } 1565 1566 protected final ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) { 1567 return removeCallbacksService.schedule(runnable, delay, unit); 1568 } 1569}