001/** 002 * 003 * Copyright 2009 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.io.Reader; 021import java.io.Writer; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.LinkedHashMap; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Queue; 030import java.util.Set; 031import java.util.concurrent.ConcurrentLinkedQueue; 032import java.util.concurrent.CopyOnWriteArraySet; 033import java.util.concurrent.Executor; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ScheduledExecutorService; 037import java.util.concurrent.ScheduledFuture; 038import java.util.concurrent.ThreadFactory; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.locks.Lock; 042import java.util.concurrent.locks.ReentrantLock; 043import java.util.logging.Level; 044import java.util.logging.Logger; 045 046import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 047import org.jivesoftware.smack.SmackConfiguration.UnknownIqRequestReplyMode; 048import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 049import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 050import 
org.jivesoftware.smack.SmackException.NoResponseException; 051import org.jivesoftware.smack.SmackException.NotConnectedException; 052import org.jivesoftware.smack.SmackException.NotLoggedInException; 053import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; 054import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; 055import org.jivesoftware.smack.SmackException.SecurityRequiredException; 056import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; 057import org.jivesoftware.smack.XMPPException.StreamErrorException; 058import org.jivesoftware.smack.XMPPException.XMPPErrorException; 059import org.jivesoftware.smack.compress.packet.Compress; 060import org.jivesoftware.smack.compression.XMPPInputOutputStream; 061import org.jivesoftware.smack.debugger.SmackDebugger; 062import org.jivesoftware.smack.debugger.SmackDebuggerFactory; 063import org.jivesoftware.smack.filter.IQReplyFilter; 064import org.jivesoftware.smack.filter.StanzaFilter; 065import org.jivesoftware.smack.filter.StanzaIdFilter; 066import org.jivesoftware.smack.iqrequest.IQRequestHandler; 067import org.jivesoftware.smack.packet.Bind; 068import org.jivesoftware.smack.packet.ErrorIQ; 069import org.jivesoftware.smack.packet.ExtensionElement; 070import org.jivesoftware.smack.packet.IQ; 071import org.jivesoftware.smack.packet.Mechanisms; 072import org.jivesoftware.smack.packet.Message; 073import org.jivesoftware.smack.packet.Nonza; 074import org.jivesoftware.smack.packet.Presence; 075import org.jivesoftware.smack.packet.Session; 076import org.jivesoftware.smack.packet.Stanza; 077import org.jivesoftware.smack.packet.StanzaError; 078import org.jivesoftware.smack.packet.StartTls; 079import org.jivesoftware.smack.packet.StreamError; 080import org.jivesoftware.smack.parsing.ParsingExceptionCallback; 081import org.jivesoftware.smack.provider.ExtensionElementProvider; 082import org.jivesoftware.smack.provider.ProviderManager; 083import 
org.jivesoftware.smack.sasl.core.SASLAnonymous; 084import org.jivesoftware.smack.util.Async; 085import org.jivesoftware.smack.util.DNSUtil; 086import org.jivesoftware.smack.util.Objects; 087import org.jivesoftware.smack.util.PacketParserUtils; 088import org.jivesoftware.smack.util.ParserUtils; 089import org.jivesoftware.smack.util.StringUtils; 090import org.jivesoftware.smack.util.dns.HostAddress; 091 092import org.jxmpp.jid.DomainBareJid; 093import org.jxmpp.jid.EntityFullJid; 094import org.jxmpp.jid.Jid; 095import org.jxmpp.jid.parts.Resourcepart; 096import org.jxmpp.util.XmppStringUtils; 097import org.minidns.dnsname.DnsName; 098import org.xmlpull.v1.XmlPullParser; 099 100 101/** 102 * This abstract class is commonly used as super class for XMPP connection mechanisms like TCP and BOSH. Hence it 103 * provides the methods for connection state management, like {@link #connect()}, {@link #login()} and 104 * {@link #disconnect()} (which are deliberately not provided by the {@link XMPPConnection} interface). 105 * <p> 106 * <b>Note:</b> The default entry point to Smack's documentation is {@link XMPPConnection}. If you are getting started 107 * with Smack, then head over to {@link XMPPConnection} and then come back here. 108 * </p> 109 * <h2>Parsing Exceptions</h2> 110 * <p> 111 * In case a Smack parser (Provider) throws, those exceptions are handed over to the {@link ParsingExceptionCallback}. A 112 * common cause for a provider throwing is illegal input, for example a non-numeric String where only Integers are 113 * allowed. Smack's <em>default behavior</em> follows the <b>"fail-hard per default"</b> principle leading to a 114 * termination of the connection on parsing exceptions. This default was chosen to make users eventually aware that they 115 * should configure their own callback and handle those exceptions to prevent the disconnect. 
Handling a parsing exception 116 * could be as simple as using a non-throwing no-op callback, which would cause the faulty stream element to be taken 117 * out of the stream, i.e., Smack behaves like that element was never received. 118 * </p> 119 * <p> 120 * If the parsing exception is because Smack received illegal input, then please consider informing the authors of the 121 * originating entity about that. If it was thrown because of a bug in a Smack parser, then please consider filing a 122 * bug with Smack. 123 * </p> 124 * <h3>Managing the parsing exception callback</h3> 125 * <p> 126 * The "fail-hard per default" behavior is achieved by using the 127 * {@link org.jivesoftware.smack.parsing.ExceptionThrowingCallbackWithHint} as default parsing exception callback. You 128 * can change the behavior using {@link #setParsingExceptionCallback(ParsingExceptionCallback)} to set a new callback. 129 * Use {@link org.jivesoftware.smack.SmackConfiguration#setDefaultParsingExceptionCallback(ParsingExceptionCallback)} to 130 * set the default callback. 131 * </p> 132 */ 133public abstract class AbstractXMPPConnection implements XMPPConnection { 134 private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName()); 135 136 /** 137 * Counter to uniquely identify connections that are created. 138 */ 139 private static final AtomicInteger connectionCounter = new AtomicInteger(0); 140 141 static { 142 // Ensure the SmackConfiguration class is loaded by calling a method in it. 143 SmackConfiguration.getVersion(); 144 } 145 146 /** 147 * A collection of ConnectionListeners which listen for connection closing 148 * and reconnection events. 149 */ 150 protected final Set<ConnectionListener> connectionListeners = 151 new CopyOnWriteArraySet<>(); 152 153 /** 154 * A collection of StanzaCollectors which collect packets for a specified filter 155 * and perform blocking and polling operations on the result queue. 
156 * <p> 157 * We use a ConcurrentLinkedQueue here, because its Iterator is weakly 158 * consistent and we want {@link #invokeStanzaCollectorsAndNotifyRecvListeners(Stanza)} for-each 159 * loop to be lock free. As drawback, removing a StanzaCollector is O(n). 160 * The alternative would be a synchronized HashSet, but this would mean a 161 * synchronized block around every usage of <code>collectors</code>. 162 * </p> 163 */ 164 private final Collection<StanzaCollector> collectors = new ConcurrentLinkedQueue<>(); 165 166 /** 167 * List of PacketListeners that will be notified synchronously when a new stanza was received. 168 */ 169 private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>(); 170 171 /** 172 * List of PacketListeners that will be notified asynchronously when a new stanza was received. 173 */ 174 private final Map<StanzaListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>(); 175 176 /** 177 * List of PacketListeners that will be notified when a new stanza was sent. 178 */ 179 private final Map<StanzaListener, ListenerWrapper> sendListeners = 180 new HashMap<>(); 181 182 /** 183 * List of PacketListeners that will be notified when a new stanza is about to be 184 * sent to the server. These interceptors may modify the stanza before it is being 185 * actually sent to the server. 186 */ 187 private final Map<StanzaListener, InterceptorWrapper> interceptors = 188 new HashMap<>(); 189 190 protected final Lock connectionLock = new ReentrantLock(); 191 192 protected final Map<String, ExtensionElement> streamFeatures = new HashMap<>(); 193 194 /** 195 * The full JID of the authenticated user, as returned by the resource binding response of the server. 196 * <p> 197 * It is important that we don't infer the user from the login() arguments and the configurations service name, as, 198 * for example, when SASL External is used, the username is not given to login but taken from the 'external' 199 * certificate. 
200 * </p> 201 */ 202 protected EntityFullJid user; 203 204 protected boolean connected = false; 205 206 /** 207 * The stream ID, see RFC 6120 § 4.7.3 208 */ 209 protected String streamId; 210 211 /** 212 * The timeout to wait for a reply in milliseconds. 213 */ 214 private long replyTimeout = SmackConfiguration.getDefaultReplyTimeout(); 215 216 /** 217 * The SmackDebugger allows to log and debug XML traffic. 218 */ 219 protected final SmackDebugger debugger; 220 221 /** 222 * The Reader which is used for the debugger. 223 */ 224 protected Reader reader; 225 226 /** 227 * The Writer which is used for the debugger. 228 */ 229 protected Writer writer; 230 231 protected final SynchronizationPoint<SmackException> tlsHandled = new SynchronizationPoint<>(this, "establishing TLS"); 232 233 /** 234 * Set to success if the last features stanza from the server has been parsed. An XMPP connection 235 * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature 236 * stanza is sent by the server. This is set to true once the last feature stanza has been 237 * parsed. 238 */ 239 protected final SynchronizationPoint<SmackException> lastFeaturesReceived = new SynchronizationPoint<>( 240 AbstractXMPPConnection.this, "last stream features received from server"); 241 242 /** 243 * Set to success if the SASL feature has been received. 244 */ 245 protected final SynchronizationPoint<XMPPException> saslFeatureReceived = new SynchronizationPoint<>( 246 AbstractXMPPConnection.this, "SASL mechanisms stream feature from server"); 247 248 /** 249 * The SASLAuthentication manager that is responsible for authenticating with the server. 250 */ 251 protected final SASLAuthentication saslAuthentication; 252 253 /** 254 * A number to uniquely identify connections that are created. This is distinct from the 255 * connection ID, which is a value sent by the server once a connection is made. 
256 */ 257 protected final int connectionCounterValue = connectionCounter.getAndIncrement(); 258 259 /** 260 * Holds the initial configuration used while creating the connection. 261 */ 262 protected final ConnectionConfiguration config; 263 264 /** 265 * Defines how the from attribute of outgoing stanzas should be handled. 266 */ 267 private FromMode fromMode = FromMode.OMITTED; 268 269 protected XMPPInputOutputStream compressionHandler; 270 271 private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); 272 273 /** 274 * This scheduled thread pool executor is used to remove pending callbacks. 275 */ 276 protected static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor( 277 new ThreadFactory() { 278 @Override 279 public Thread newThread(Runnable runnable) { 280 Thread thread = new Thread(runnable); 281 thread.setName("Smack Scheduled Executor Service"); 282 thread.setDaemon(true); 283 return thread; 284 } 285 }); 286 287 /** 288 * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set 289 * them 'daemon'. 
290 */ 291 private static final ExecutorService CACHED_EXECUTOR_SERVICE = Executors.newCachedThreadPool(new ThreadFactory() { 292 @Override 293 public Thread newThread(Runnable runnable) { 294 Thread thread = new Thread(runnable); 295 thread.setName("Smack Cached Executor"); 296 thread.setDaemon(true); 297 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 298 @Override 299 public void uncaughtException(Thread t, Throwable e) { 300 LOGGER.log(Level.WARNING, t + " encountered uncaught exception", e); 301 } 302 }); 303 return thread; 304 } 305 }); 306 307 protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>(); 308 309 /** 310 * An executor which uses {@link #asyncGoLimited(Runnable)} to limit the number of asynchronously processed runnables 311 * per connection. 312 */ 313 private final Executor limitedExcutor = new Executor() { 314 @Override 315 public void execute(Runnable runnable) { 316 asyncGoLimited(runnable); 317 } 318 }; 319 320 /** 321 * The used host to establish the connection to 322 */ 323 protected String host; 324 325 /** 326 * The used port to establish the connection to 327 */ 328 protected int port; 329 330 /** 331 * Flag that indicates if the user is currently authenticated with the server. 332 */ 333 protected boolean authenticated = false; 334 335 // TODO: Migrate to ZonedDateTime once Smack's minimum required Android SDK level is 26 (8.0, Oreo) or higher. 336 protected long authenticatedConnectionInitiallyEstablishedTimestamp; 337 338 /** 339 * Flag that indicates if the user was authenticated with the server when the connection 340 * to the server was closed (abruptly or not). 341 */ 342 protected boolean wasAuthenticated = false; 343 344 private final Map<String, IQRequestHandler> setIqRequestHandler = new HashMap<>(); 345 private final Map<String, IQRequestHandler> getIqRequestHandler = new HashMap<>(); 346 347 /** 348 * Create a new XMPPConnection to an XMPP server. 
349 * 350 * @param configuration The configuration which is used to establish the connection. 351 */ 352 protected AbstractXMPPConnection(ConnectionConfiguration configuration) { 353 saslAuthentication = new SASLAuthentication(this, configuration); 354 config = configuration; 355 SmackDebuggerFactory debuggerFactory = configuration.getDebuggerFactory(); 356 if (debuggerFactory != null) { 357 debugger = debuggerFactory.create(this); 358 } else { 359 debugger = null; 360 } 361 // Notify listeners that a new connection has been established 362 for (ConnectionCreationListener listener : XMPPConnectionRegistry.getConnectionCreationListeners()) { 363 listener.connectionCreated(this); 364 } 365 } 366 367 /** 368 * Get the connection configuration used by this connection. 369 * 370 * @return the connection configuration. 371 */ 372 public ConnectionConfiguration getConfiguration() { 373 return config; 374 } 375 376 @Override 377 public DomainBareJid getXMPPServiceDomain() { 378 if (xmppServiceDomain != null) { 379 return xmppServiceDomain; 380 } 381 return config.getXMPPServiceDomain(); 382 } 383 384 @Override 385 public String getHost() { 386 return host; 387 } 388 389 @Override 390 public int getPort() { 391 return port; 392 } 393 394 @Override 395 public abstract boolean isSecureConnection(); 396 397 protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException; 398 399 @Override 400 public abstract void sendNonza(Nonza element) throws NotConnectedException, InterruptedException; 401 402 @Override 403 public abstract boolean isUsingCompression(); 404 405 protected void initState() { 406 saslFeatureReceived.init(); 407 lastFeaturesReceived.init(); 408 tlsHandled.init(); 409 } 410 411 /** 412 * Establishes a connection to the XMPP server. It basically 413 * creates and maintains a connection to the server. 414 * <p> 415 * Listeners will be preserved from a previous connection. 
416 * </p> 417 * 418 * @throws XMPPException if an error occurs on the XMPP protocol level. 419 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 420 * @throws IOException 421 * @return a reference to this object, to chain <code>connect()</code> with <code>login()</code>. 422 * @throws InterruptedException 423 */ 424 public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException { 425 // Check if not already connected 426 throwAlreadyConnectedExceptionIfAppropriate(); 427 428 // Reset the connection state 429 initState(); 430 saslAuthentication.init(); 431 streamId = null; 432 433 try { 434 // Perform the actual connection to the XMPP service 435 connectInternal(); 436 437 // If TLS is required but the server doesn't offer it, disconnect 438 // from the server and throw an error. First check if we've already negotiated TLS 439 // and are secure, however (features get parsed a second time after TLS is established). 440 if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) { 441 throw new SecurityRequiredByClientException(); 442 } 443 } catch (SmackException | IOException | XMPPException | InterruptedException e) { 444 instantShutdown(); 445 throw e; 446 } 447 448 // Make note of the fact that we're now connected. 449 connected = true; 450 callConnectionConnectedListener(); 451 452 return this; 453 } 454 455 /** 456 * Abstract method that concrete subclasses of XMPPConnection need to implement to perform their 457 * way of XMPP connection establishment. Implementations are required to perform an automatic 458 * login if the previous connection state was logged (authenticated). 
459 * 460 * @throws SmackException 461 * @throws IOException 462 * @throws XMPPException 463 * @throws InterruptedException 464 */ 465 protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException; 466 467 private String usedUsername, usedPassword; 468 469 /** 470 * The resourcepart used for this connection. May not be the resulting resourcepart if it's null or overridden by the XMPP service. 471 */ 472 private Resourcepart usedResource; 473 474 /** 475 * Logs in to the server using the strongest SASL mechanism supported by 476 * the server. If more than the connection's default stanza timeout elapses in each step of the 477 * authentication process without a response from the server, a 478 * {@link SmackException.NoResponseException} will be thrown. 479 * <p> 480 * Before logging in (i.e. authenticate) to the server the connection must be connected 481 * by calling {@link #connect}. 482 * </p> 483 * <p> 484 * It is possible to log in without sending an initial available presence by using 485 * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}. 486 * Finally, if you want to not pass a password and instead use a more advanced mechanism 487 * while using SASL then you may be interested in using 488 * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}. 489 * For more advanced login settings see {@link ConnectionConfiguration}. 490 * </p> 491 * 492 * @throws XMPPException if an error occurs on the XMPP protocol level. 493 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 494 * @throws IOException if an I/O error occurs during login. 
495 * @throws InterruptedException 496 */ 497 public synchronized void login() throws XMPPException, SmackException, IOException, InterruptedException { 498 // The previously used username, password and resource take over precedence over the 499 // ones from the connection configuration 500 CharSequence username = usedUsername != null ? usedUsername : config.getUsername(); 501 String password = usedPassword != null ? usedPassword : config.getPassword(); 502 Resourcepart resource = usedResource != null ? usedResource : config.getResource(); 503 login(username, password, resource); 504 } 505 506 /** 507 * Same as {@link #login(CharSequence, String, Resourcepart)}, but takes the resource from the connection 508 * configuration. 509 * 510 * @param username 511 * @param password 512 * @throws XMPPException 513 * @throws SmackException 514 * @throws IOException 515 * @throws InterruptedException 516 * @see #login 517 */ 518 public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException, 519 IOException, InterruptedException { 520 login(username, password, config.getResource()); 521 } 522 523 /** 524 * Login with the given username (authorization identity). You may omit the password if a callback handler is used. 525 * If resource is null, then the server will generate one. 
526 * 527 * @param username 528 * @param password 529 * @param resource 530 * @throws XMPPException 531 * @throws SmackException 532 * @throws IOException 533 * @throws InterruptedException 534 * @see #login 535 */ 536 public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException, 537 SmackException, IOException, InterruptedException { 538 if (!config.allowNullOrEmptyUsername) { 539 StringUtils.requireNotNullOrEmpty(username, "Username must not be null or empty"); 540 } 541 throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?"); 542 throwAlreadyLoggedInExceptionIfAppropriate(); 543 usedUsername = username != null ? username.toString() : null; 544 usedPassword = password; 545 usedResource = resource; 546 loginInternal(usedUsername, usedPassword, usedResource); 547 } 548 549 protected abstract void loginInternal(String username, String password, Resourcepart resource) 550 throws XMPPException, SmackException, IOException, InterruptedException; 551 552 @Override 553 public final boolean isConnected() { 554 return connected; 555 } 556 557 @Override 558 public final boolean isAuthenticated() { 559 return authenticated; 560 } 561 562 @Override 563 public final EntityFullJid getUser() { 564 return user; 565 } 566 567 @Override 568 public String getStreamId() { 569 if (!isConnected()) { 570 return null; 571 } 572 return streamId; 573 } 574 575 protected void bindResourceAndEstablishSession(Resourcepart resource) throws XMPPErrorException, 576 SmackException, InterruptedException { 577 578 // Wait until either: 579 // - the servers last features stanza has been parsed 580 // - the timeout occurs 581 LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); 582 lastFeaturesReceived.checkIfSuccessOrWaitOrThrow(); 583 584 585 if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 586 // Server never offered resource binding, which is REQUIRED in XMPP client and 587 
// server implementations as per RFC6120 7.2 588 throw new ResourceBindingNotOfferedException(); 589 } 590 591 // Resource binding, see RFC6120 7. 592 // Note that we can not use IQReplyFilter here, since the users full JID is not yet 593 // available. It will become available right after the resource has been successfully bound. 594 Bind bindResource = Bind.newSet(resource); 595 StanzaCollector packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(bindResource), bindResource); 596 Bind response = packetCollector.nextResultOrThrow(); 597 // Set the connections user to the result of resource binding. It is important that we don't infer the user 598 // from the login() arguments and the configurations service name, as, for example, when SASL External is used, 599 // the username is not given to login but taken from the 'external' certificate. 600 user = response.getJid(); 601 xmppServiceDomain = user.asDomainBareJid(); 602 603 Session.Feature sessionFeature = getFeature(Session.ELEMENT, Session.NAMESPACE); 604 // Only bind the session if it's announced as stream feature by the server, is not optional and not disabled 605 // For more information see http://tools.ietf.org/html/draft-cridland-xmpp-session-01 606 if (sessionFeature != null && !sessionFeature.isOptional()) { 607 Session session = new Session(); 608 packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(session), session); 609 packetCollector.nextResultOrThrow(); 610 } 611 } 612 613 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 614 if (!resumed) { 615 authenticatedConnectionInitiallyEstablishedTimestamp = System.currentTimeMillis(); 616 } 617 // Indicate that we're now authenticated. 618 this.authenticated = true; 619 620 // If debugging is enabled, change the the debug window title to include the 621 // name we are now logged-in as. 
622 // If DEBUG was set to true AFTER the connection was created the debugger 623 // will be null 624 if (debugger != null) { 625 debugger.userHasLogged(user); 626 } 627 callConnectionAuthenticatedListener(resumed); 628 629 // Set presence to online. It is important that this is done after 630 // callConnectionAuthenticatedListener(), as this call will also 631 // eventually load the roster. And we should load the roster before we 632 // send the initial presence. 633 if (config.isSendPresence() && !resumed) { 634 sendStanza(new Presence(Presence.Type.available)); 635 } 636 } 637 638 @Override 639 public final boolean isAnonymous() { 640 return isAuthenticated() && SASLAnonymous.NAME.equals(getUsedSaslMechansism()); 641 } 642 643 /** 644 * Get the name of the SASL mechanism that was used to authenticate this connection. This returns the name of 645 * mechanism which was used the last time this connection was authenticated, and will return <code>null</code> if 646 * this connection was not authenticated before. 647 * 648 * @return the name of the used SASL mechanism. 649 * @since 4.2 650 */ 651 public final String getUsedSaslMechansism() { 652 return saslAuthentication.getNameOfLastUsedSaslMechansism(); 653 } 654 655 private DomainBareJid xmppServiceDomain; 656 657 protected List<HostAddress> hostAddresses; 658 659 /** 660 * Populates {@link #hostAddresses} with the resolved addresses or with the configured host address. If no host 661 * address was configured and all lookups failed, for example with NX_DOMAIN, then {@link #hostAddresses} will be 662 * populated with the empty list. 663 * 664 * @return a list of host addresses where DNS (SRV) RR resolution failed. 
665 */ 666 protected List<HostAddress> populateHostAddresses() { 667 List<HostAddress> failedAddresses = new LinkedList<>(); 668 if (config.hostAddress != null) { 669 hostAddresses = new ArrayList<>(1); 670 HostAddress hostAddress = new HostAddress(config.port, config.hostAddress); 671 hostAddresses.add(hostAddress); 672 } 673 else if (config.host != null) { 674 hostAddresses = new ArrayList<>(1); 675 HostAddress hostAddress = DNSUtil.getDNSResolver().lookupHostAddress(config.host, config.port, failedAddresses, config.getDnssecMode()); 676 if (hostAddress != null) { 677 hostAddresses.add(hostAddress); 678 } 679 } else { 680 // N.B.: Important to use config.serviceName and not AbstractXMPPConnection.serviceName 681 DnsName dnsName = DnsName.from(config.getXMPPServiceDomain()); 682 hostAddresses = DNSUtil.resolveXMPPServiceDomain(dnsName, failedAddresses, config.getDnssecMode()); 683 } 684 // Either the populated host addresses are not empty *or* there must be at least one failed address. 
685 assert (!hostAddresses.isEmpty() || !failedAddresses.isEmpty()); 686 return failedAddresses; 687 } 688 689 protected Lock getConnectionLock() { 690 return connectionLock; 691 } 692 693 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 694 throwNotConnectedExceptionIfAppropriate(null); 695 } 696 697 protected void throwNotConnectedExceptionIfAppropriate(String optionalHint) throws NotConnectedException { 698 if (!isConnected()) { 699 throw new NotConnectedException(optionalHint); 700 } 701 } 702 703 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 704 if (isConnected()) { 705 throw new AlreadyConnectedException(); 706 } 707 } 708 709 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 710 if (isAuthenticated()) { 711 throw new AlreadyLoggedInException(); 712 } 713 } 714 715 @Override 716 public void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException { 717 Objects.requireNonNull(stanza, "Stanza must not be null"); 718 assert (stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ); 719 720 throwNotConnectedExceptionIfAppropriate(); 721 switch (fromMode) { 722 case OMITTED: 723 stanza.setFrom((Jid) null); 724 break; 725 case USER: 726 stanza.setFrom(getUser()); 727 break; 728 case UNCHANGED: 729 default: 730 break; 731 } 732 // Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify 733 // the content of the stanza. 734 firePacketInterceptors(stanza); 735 sendStanzaInternal(stanza); 736 } 737 738 /** 739 * Returns the SASLAuthentication manager that is responsible for authenticating with 740 * the server. 741 * 742 * @return the SASLAuthentication manager that is responsible for authenticating with 743 * the server. 
744 */ 745 protected SASLAuthentication getSASLAuthentication() { 746 return saslAuthentication; 747 } 748 749 /** 750 * Closes the connection by setting presence to unavailable then closing the connection to 751 * the XMPP server. The XMPPConnection can still be used for connecting to the server 752 * again. 753 * 754 */ 755 public void disconnect() { 756 Presence unavailablePresence = null; 757 if (isAuthenticated()) { 758 unavailablePresence = new Presence(Presence.Type.unavailable); 759 } 760 try { 761 disconnect(unavailablePresence); 762 } 763 catch (NotConnectedException e) { 764 LOGGER.log(Level.FINEST, "Connection is already disconnected", e); 765 } 766 } 767 768 /** 769 * Closes the connection. A custom unavailable presence is sent to the server, followed 770 * by closing the stream. The XMPPConnection can still be used for connecting to the server 771 * again. A custom unavailable presence is useful for communicating offline presence 772 * information such as "On vacation". Typically, just the status text of the presence 773 * stanza is set with online information, but most XMPP servers will deliver the full 774 * presence stanza with whatever data is set. 775 * 776 * @param unavailablePresence the optional presence stanza to send during shutdown. 777 * @throws NotConnectedException 778 */ 779 public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException { 780 if (unavailablePresence != null) { 781 try { 782 sendStanza(unavailablePresence); 783 } catch (InterruptedException e) { 784 LOGGER.log(Level.FINE, 785 "Was interrupted while sending unavailable presence. Continuing to disconnect the connection", 786 e); 787 } 788 } 789 shutdown(); 790 callConnectionClosedListener(); 791 } 792 793 /** 794 * Shuts the current connection down. 795 */ 796 protected abstract void shutdown(); 797 798 /** 799 * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza. 
800 */ 801 public abstract void instantShutdown(); 802 803 @Override 804 public void addConnectionListener(ConnectionListener connectionListener) { 805 if (connectionListener == null) { 806 return; 807 } 808 connectionListeners.add(connectionListener); 809 } 810 811 @Override 812 public void removeConnectionListener(ConnectionListener connectionListener) { 813 connectionListeners.remove(connectionListener); 814 } 815 816 @Override 817 public <I extends IQ> I sendIqRequestAndWaitForResponse(IQ request) 818 throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException { 819 StanzaCollector collector = createStanzaCollectorAndSend(request); 820 IQ resultResponse = collector.nextResultOrThrow(); 821 @SuppressWarnings("unchecked") 822 I concreteResultResponse = (I) resultResponse; 823 return concreteResultResponse; 824 } 825 826 @Override 827 public StanzaCollector createStanzaCollectorAndSend(IQ packet) throws NotConnectedException, InterruptedException { 828 StanzaFilter packetFilter = new IQReplyFilter(packet, this); 829 // Create the packet collector before sending the packet 830 StanzaCollector packetCollector = createStanzaCollectorAndSend(packetFilter, packet); 831 return packetCollector; 832 } 833 834 @Override 835 public StanzaCollector createStanzaCollectorAndSend(StanzaFilter packetFilter, Stanza packet) 836 throws NotConnectedException, InterruptedException { 837 StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration() 838 .setStanzaFilter(packetFilter) 839 .setRequest(packet); 840 // Create the packet collector before sending the packet 841 StanzaCollector packetCollector = createStanzaCollector(configuration); 842 try { 843 // Now we can send the packet as the collector has been created 844 sendStanza(packet); 845 } 846 catch (InterruptedException | NotConnectedException | RuntimeException e) { 847 packetCollector.cancel(); 848 throw e; 849 } 850 return packetCollector; 851 } 852 853 @Override 854 
public StanzaCollector createStanzaCollector(StanzaFilter packetFilter) { 855 StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration().setStanzaFilter(packetFilter); 856 return createStanzaCollector(configuration); 857 } 858 859 @Override 860 public StanzaCollector createStanzaCollector(StanzaCollector.Configuration configuration) { 861 StanzaCollector collector = new StanzaCollector(this, configuration); 862 // Add the collector to the list of active collectors. 863 collectors.add(collector); 864 return collector; 865 } 866 867 @Override 868 public void removeStanzaCollector(StanzaCollector collector) { 869 collectors.remove(collector); 870 } 871 872 @Override 873 public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 874 if (packetListener == null) { 875 throw new NullPointerException("Packet listener is null."); 876 } 877 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 878 synchronized (syncRecvListeners) { 879 syncRecvListeners.put(packetListener, wrapper); 880 } 881 } 882 883 @Override 884 public boolean removeSyncStanzaListener(StanzaListener packetListener) { 885 synchronized (syncRecvListeners) { 886 return syncRecvListeners.remove(packetListener) != null; 887 } 888 } 889 890 @Override 891 public void addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 892 if (packetListener == null) { 893 throw new NullPointerException("Packet listener is null."); 894 } 895 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 896 synchronized (asyncRecvListeners) { 897 asyncRecvListeners.put(packetListener, wrapper); 898 } 899 } 900 901 @Override 902 public boolean removeAsyncStanzaListener(StanzaListener packetListener) { 903 synchronized (asyncRecvListeners) { 904 return asyncRecvListeners.remove(packetListener) != null; 905 } 906 } 907 908 @Deprecated 909 @Override 910 public void addPacketSendingListener(StanzaListener 
packetListener, StanzaFilter packetFilter) { 911 addStanzaSendingListener(packetListener, packetFilter); 912 } 913 914 @Override 915 public void addStanzaSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) { 916 if (packetListener == null) { 917 throw new NullPointerException("Packet listener is null."); 918 } 919 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 920 synchronized (sendListeners) { 921 sendListeners.put(packetListener, wrapper); 922 } 923 } 924 925 @Deprecated 926 @Override 927 public void removePacketSendingListener(StanzaListener packetListener) { 928 removeStanzaSendingListener(packetListener); 929 } 930 931 @Override 932 public void removeStanzaSendingListener(StanzaListener packetListener) { 933 synchronized (sendListeners) { 934 sendListeners.remove(packetListener); 935 } 936 } 937 938 /** 939 * Process all stanza listeners for sending packets. 940 * <p> 941 * Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread. 942 * </p> 943 * 944 * @param packet the stanza to process. 
945 */ 946 @SuppressWarnings("javadoc") 947 protected void firePacketSendingListeners(final Stanza packet) { 948 final SmackDebugger debugger = this.debugger; 949 if (debugger != null) { 950 debugger.onOutgoingStreamElement(packet); 951 } 952 953 final List<StanzaListener> listenersToNotify = new LinkedList<>(); 954 synchronized (sendListeners) { 955 for (ListenerWrapper listenerWrapper : sendListeners.values()) { 956 if (listenerWrapper.filterMatches(packet)) { 957 listenersToNotify.add(listenerWrapper.getListener()); 958 } 959 } 960 } 961 if (listenersToNotify.isEmpty()) { 962 return; 963 } 964 // Notify in a new thread, because we can 965 asyncGo(new Runnable() { 966 @Override 967 public void run() { 968 for (StanzaListener listener : listenersToNotify) { 969 try { 970 listener.processStanza(packet); 971 } 972 catch (Exception e) { 973 LOGGER.log(Level.WARNING, "Sending listener threw exception", e); 974 continue; 975 } 976 } 977 } 978 }); 979 } 980 981 @Deprecated 982 @Override 983 public void addPacketInterceptor(StanzaListener packetInterceptor, 984 StanzaFilter packetFilter) { 985 addStanzaInterceptor(packetInterceptor, packetFilter); 986 } 987 988 @Override 989 public void addStanzaInterceptor(StanzaListener packetInterceptor, 990 StanzaFilter packetFilter) { 991 if (packetInterceptor == null) { 992 throw new NullPointerException("Packet interceptor is null."); 993 } 994 InterceptorWrapper interceptorWrapper = new InterceptorWrapper(packetInterceptor, packetFilter); 995 synchronized (interceptors) { 996 interceptors.put(packetInterceptor, interceptorWrapper); 997 } 998 } 999 1000 @Deprecated 1001 @Override 1002 public void removePacketInterceptor(StanzaListener packetInterceptor) { 1003 removeStanzaInterceptor(packetInterceptor); 1004 } 1005 1006 @Override 1007 public void removeStanzaInterceptor(StanzaListener packetInterceptor) { 1008 synchronized (interceptors) { 1009 interceptors.remove(packetInterceptor); 1010 } 1011 } 1012 1013 /** 1014 * Process 
interceptors. Interceptors may modify the stanza that is about to be sent. 1015 * Since the thread that requested to send the stanza will invoke all interceptors, it 1016 * is important that interceptors perform their work as soon as possible so that the 1017 * thread does not remain blocked for a long period. 1018 * 1019 * @param packet the stanza that is going to be sent to the server 1020 */ 1021 private void firePacketInterceptors(Stanza packet) { 1022 List<StanzaListener> interceptorsToInvoke = new LinkedList<>(); 1023 synchronized (interceptors) { 1024 for (InterceptorWrapper interceptorWrapper : interceptors.values()) { 1025 if (interceptorWrapper.filterMatches(packet)) { 1026 interceptorsToInvoke.add(interceptorWrapper.getInterceptor()); 1027 } 1028 } 1029 } 1030 for (StanzaListener interceptor : interceptorsToInvoke) { 1031 try { 1032 interceptor.processStanza(packet); 1033 } catch (Exception e) { 1034 LOGGER.log(Level.SEVERE, "Packet interceptor threw exception", e); 1035 } 1036 } 1037 } 1038 1039 /** 1040 * Initialize the {@link #debugger}. You can specify a customized {@link SmackDebugger} 1041 * by setup the system property <code>smack.debuggerClass</code> to the implementation. 1042 * 1043 * @throws IllegalStateException if the reader or writer isn't yet initialized. 1044 * @throws IllegalArgumentException if the SmackDebugger can't be loaded. 1045 */ 1046 protected void initDebugger() { 1047 if (reader == null || writer == null) { 1048 throw new NullPointerException("Reader or writer isn't initialized."); 1049 } 1050 // If debugging is enabled, we open a window and write out all network traffic. 
1051 if (debugger != null) { 1052 // Obtain new reader and writer from the existing debugger 1053 reader = debugger.newConnectionReader(reader); 1054 writer = debugger.newConnectionWriter(writer); 1055 } 1056 } 1057 1058 @Override 1059 public long getReplyTimeout() { 1060 return replyTimeout; 1061 } 1062 1063 @Override 1064 public void setReplyTimeout(long timeout) { 1065 replyTimeout = timeout; 1066 } 1067 1068 private SmackConfiguration.UnknownIqRequestReplyMode unknownIqRequestReplyMode = SmackConfiguration.getUnknownIqRequestReplyMode(); 1069 1070 /** 1071 * Set how Smack behaves when an unknown IQ request has been received. 1072 * 1073 * @param unknownIqRequestReplyMode reply mode. 1074 */ 1075 public void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) { 1076 this.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Mode must not be null"); 1077 } 1078 1079 protected void parseAndProcessStanza(XmlPullParser parser) throws Exception { 1080 ParserUtils.assertAtStartTag(parser); 1081 int parserDepth = parser.getDepth(); 1082 Stanza stanza = null; 1083 try { 1084 stanza = PacketParserUtils.parseStanza(parser); 1085 } 1086 catch (Exception e) { 1087 CharSequence content = PacketParserUtils.parseContentDepth(parser, 1088 parserDepth); 1089 UnparseableStanza message = new UnparseableStanza(content, e); 1090 ParsingExceptionCallback callback = getParsingExceptionCallback(); 1091 if (callback != null) { 1092 callback.handleUnparsableStanza(message); 1093 } 1094 } 1095 ParserUtils.assertAtEndTag(parser); 1096 if (stanza != null) { 1097 processStanza(stanza); 1098 } 1099 } 1100 1101 /** 1102 * Processes a stanza after it's been fully parsed by looping through the installed 1103 * stanza collectors and listeners and letting them examine the stanza to see if 1104 * they are a match with the filter. 1105 * 1106 * @param stanza the stanza to process. 
1107 * @throws InterruptedException 1108 */ 1109 protected void processStanza(final Stanza stanza) throws InterruptedException { 1110 assert (stanza != null); 1111 1112 final SmackDebugger debugger = this.debugger; 1113 if (debugger != null) { 1114 debugger.onIncomingStreamElement(stanza); 1115 } 1116 1117 lastStanzaReceived = System.currentTimeMillis(); 1118 // Deliver the incoming packet to listeners. 1119 invokeStanzaCollectorsAndNotifyRecvListeners(stanza); 1120 } 1121 1122 /** 1123 * Invoke {@link StanzaCollector#processStanza(Stanza)} for every 1124 * StanzaCollector with the given packet. Also notify the receive listeners with a matching stanza filter about the packet. 1125 * <p> 1126 * This method will be invoked by the connections incoming processing thread which may be shared across multiple connections and 1127 * thus it is important that no user code, e.g. in form of a callback, is invoked by this method. For the same reason, 1128 * this method must not block for an extended period of time. 1129 * </p> 1130 * 1131 * @param packet the stanza to notify the StanzaCollectors and receive listeners about. 
1132 */ 1133 protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) { 1134 if (packet instanceof IQ) { 1135 final IQ iq = (IQ) packet; 1136 if (iq.isRequestIQ()) { 1137 final IQ iqRequest = iq; 1138 final String key = XmppStringUtils.generateKey(iq.getChildElementName(), iq.getChildElementNamespace()); 1139 IQRequestHandler iqRequestHandler; 1140 final IQ.Type type = iq.getType(); 1141 switch (type) { 1142 case set: 1143 synchronized (setIqRequestHandler) { 1144 iqRequestHandler = setIqRequestHandler.get(key); 1145 } 1146 break; 1147 case get: 1148 synchronized (getIqRequestHandler) { 1149 iqRequestHandler = getIqRequestHandler.get(key); 1150 } 1151 break; 1152 default: 1153 throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'"); 1154 } 1155 if (iqRequestHandler == null) { 1156 StanzaError.Condition replyCondition; 1157 switch (unknownIqRequestReplyMode) { 1158 case doNotReply: 1159 return; 1160 case replyFeatureNotImplemented: 1161 replyCondition = StanzaError.Condition.feature_not_implemented; 1162 break; 1163 case replyServiceUnavailable: 1164 replyCondition = StanzaError.Condition.service_unavailable; 1165 break; 1166 default: 1167 throw new AssertionError(); 1168 } 1169 1170 // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an 1171 // IQ of type 'error' with condition 'service-unavailable'. 
1172 ErrorIQ errorIQ = IQ.createErrorResponse(iq, StanzaError.getBuilder(( 1173 replyCondition))); 1174 try { 1175 sendStanza(errorIQ); 1176 } 1177 catch (InterruptedException | NotConnectedException e) { 1178 LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e); 1179 } 1180 } else { 1181 Executor executorService = null; 1182 switch (iqRequestHandler.getMode()) { 1183 case sync: 1184 executorService = ASYNC_BUT_ORDERED.asExecutorFor(this); 1185 break; 1186 case async: 1187 executorService = limitedExcutor; 1188 break; 1189 } 1190 final IQRequestHandler finalIqRequestHandler = iqRequestHandler; 1191 executorService.execute(new Runnable() { 1192 @Override 1193 public void run() { 1194 IQ response = finalIqRequestHandler.handleIQRequest(iq); 1195 if (response == null) { 1196 // It is not ideal if the IQ request handler does not return an IQ response, because RFC 1197 // 6120 ยง 8.1.2 does specify that a response is mandatory. But some APIs, mostly the 1198 // file transfer one, does not always return a result, so we need to handle this case. 1199 // Also sometimes a request handler may decide that it's better to not send a response, 1200 // e.g. to avoid presence leaks. 1201 return; 1202 } 1203 1204 assert (response.getType() == IQ.Type.result || response.getType() == IQ.Type.error); 1205 1206 response.setTo(iqRequest.getFrom()); 1207 response.setStanzaId(iqRequest.getStanzaId()); 1208 try { 1209 sendStanza(response); 1210 } 1211 catch (InterruptedException | NotConnectedException e) { 1212 LOGGER.log(Level.WARNING, "Exception while sending response to IQ request", e); 1213 } 1214 } 1215 }); 1216 } 1217 // The following returns makes it impossible for packet listeners and collectors to 1218 // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the 1219 // desired behavior. 1220 return; 1221 } 1222 } 1223 1224 // First handle the async recv listeners. 
Note that this code is very similar to what follows a few lines below, 1225 // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in 1226 // their own thread. 1227 final Collection<StanzaListener> listenersToNotify = new LinkedList<>(); 1228 synchronized (asyncRecvListeners) { 1229 for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) { 1230 if (listenerWrapper.filterMatches(packet)) { 1231 listenersToNotify.add(listenerWrapper.getListener()); 1232 } 1233 } 1234 } 1235 1236 for (final StanzaListener listener : listenersToNotify) { 1237 asyncGoLimited(new Runnable() { 1238 @Override 1239 public void run() { 1240 try { 1241 listener.processStanza(packet); 1242 } catch (Exception e) { 1243 LOGGER.log(Level.SEVERE, "Exception in async packet listener", e); 1244 } 1245 } 1246 }); 1247 } 1248 1249 // Loop through all collectors and notify the appropriate ones. 1250 for (StanzaCollector collector : collectors) { 1251 collector.processStanza(packet); 1252 } 1253 1254 // Notify the receive listeners interested in the packet 1255 listenersToNotify.clear(); 1256 synchronized (syncRecvListeners) { 1257 for (ListenerWrapper listenerWrapper : syncRecvListeners.values()) { 1258 if (listenerWrapper.filterMatches(packet)) { 1259 listenersToNotify.add(listenerWrapper.getListener()); 1260 } 1261 } 1262 } 1263 1264 // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single 1265 // threaded executor service and therefore keeps the order. 
1266 ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() { 1267 @Override 1268 public void run() { 1269 for (StanzaListener listener : listenersToNotify) { 1270 try { 1271 listener.processStanza(packet); 1272 } catch (NotConnectedException e) { 1273 LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); 1274 break; 1275 } catch (Exception e) { 1276 LOGGER.log(Level.SEVERE, "Exception in packet listener", e); 1277 } 1278 } 1279 } 1280 }); 1281 } 1282 1283 /** 1284 * Sets whether the connection has already logged in the server. This method assures that the 1285 * {@link #wasAuthenticated} flag is never reset once it has ever been set. 1286 * 1287 */ 1288 protected void setWasAuthenticated() { 1289 // Never reset the flag if the connection has ever been authenticated 1290 if (!wasAuthenticated) { 1291 wasAuthenticated = authenticated; 1292 } 1293 } 1294 1295 protected void callConnectionConnectedListener() { 1296 for (ConnectionListener listener : connectionListeners) { 1297 listener.connected(this); 1298 } 1299 } 1300 1301 protected void callConnectionAuthenticatedListener(boolean resumed) { 1302 for (ConnectionListener listener : connectionListeners) { 1303 try { 1304 listener.authenticated(this, resumed); 1305 } catch (Exception e) { 1306 // Catch and print any exception so we can recover 1307 // from a faulty listener and finish the shutdown process 1308 LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e); 1309 } 1310 } 1311 } 1312 1313 void callConnectionClosedListener() { 1314 for (ConnectionListener listener : connectionListeners) { 1315 try { 1316 listener.connectionClosed(); 1317 } 1318 catch (Exception e) { 1319 // Catch and print any exception so we can recover 1320 // from a faulty listener and finish the shutdown process 1321 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e); 1322 } 1323 } 1324 } 1325 1326 protected void callConnectionClosedOnErrorListener(Exception e) { 1327 boolean 
logWarning = true; 1328 if (e instanceof StreamErrorException) { 1329 StreamErrorException see = (StreamErrorException) e; 1330 if (see.getStreamError().getCondition() == StreamError.Condition.not_authorized 1331 && wasAuthenticated) { 1332 logWarning = false; 1333 LOGGER.log(Level.FINE, 1334 "Connection closed with not-authorized stream error after it was already authenticated. The account was likely deleted/unregistered on the server"); 1335 } 1336 } 1337 if (logWarning) { 1338 LOGGER.log(Level.WARNING, "Connection " + this + " closed with error", e); 1339 } 1340 for (ConnectionListener listener : connectionListeners) { 1341 try { 1342 listener.connectionClosedOnError(e); 1343 } 1344 catch (Exception e2) { 1345 // Catch and print any exception so we can recover 1346 // from a faulty listener 1347 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e2); 1348 } 1349 } 1350 } 1351 1352 /** 1353 * A wrapper class to associate a stanza filter with a listener. 1354 */ 1355 protected static class ListenerWrapper { 1356 1357 private final StanzaListener packetListener; 1358 private final StanzaFilter packetFilter; 1359 1360 /** 1361 * Create a class which associates a stanza filter with a listener. 1362 * 1363 * @param packetListener the stanza listener. 1364 * @param packetFilter the associated filter or null if it listen for all packets. 1365 */ 1366 public ListenerWrapper(StanzaListener packetListener, StanzaFilter packetFilter) { 1367 this.packetListener = packetListener; 1368 this.packetFilter = packetFilter; 1369 } 1370 1371 public boolean filterMatches(Stanza packet) { 1372 return packetFilter == null || packetFilter.accept(packet); 1373 } 1374 1375 public StanzaListener getListener() { 1376 return packetListener; 1377 } 1378 } 1379 1380 /** 1381 * A wrapper class to associate a stanza filter with an interceptor. 
1382 */ 1383 protected static class InterceptorWrapper { 1384 1385 private final StanzaListener packetInterceptor; 1386 private final StanzaFilter packetFilter; 1387 1388 /** 1389 * Create a class which associates a stanza filter with an interceptor. 1390 * 1391 * @param packetInterceptor the interceptor. 1392 * @param packetFilter the associated filter or null if it intercepts all packets. 1393 */ 1394 public InterceptorWrapper(StanzaListener packetInterceptor, StanzaFilter packetFilter) { 1395 this.packetInterceptor = packetInterceptor; 1396 this.packetFilter = packetFilter; 1397 } 1398 1399 public boolean filterMatches(Stanza packet) { 1400 return packetFilter == null || packetFilter.accept(packet); 1401 } 1402 1403 public StanzaListener getInterceptor() { 1404 return packetInterceptor; 1405 } 1406 } 1407 1408 @Override 1409 public int getConnectionCounter() { 1410 return connectionCounterValue; 1411 } 1412 1413 @Override 1414 public void setFromMode(FromMode fromMode) { 1415 this.fromMode = fromMode; 1416 } 1417 1418 @Override 1419 public FromMode getFromMode() { 1420 return this.fromMode; 1421 } 1422 1423 protected final void parseFeatures(XmlPullParser parser) throws Exception { 1424 streamFeatures.clear(); 1425 final int initialDepth = parser.getDepth(); 1426 while (true) { 1427 int eventType = parser.next(); 1428 1429 if (eventType == XmlPullParser.START_TAG && parser.getDepth() == initialDepth + 1) { 1430 ExtensionElement streamFeature = null; 1431 String name = parser.getName(); 1432 String namespace = parser.getNamespace(); 1433 switch (name) { 1434 case StartTls.ELEMENT: 1435 streamFeature = PacketParserUtils.parseStartTlsFeature(parser); 1436 break; 1437 case Mechanisms.ELEMENT: 1438 streamFeature = new Mechanisms(PacketParserUtils.parseMechanisms(parser)); 1439 break; 1440 case Bind.ELEMENT: 1441 streamFeature = Bind.Feature.INSTANCE; 1442 break; 1443 case Session.ELEMENT: 1444 streamFeature = PacketParserUtils.parseSessionFeature(parser); 1445 break; 
1446 case Compress.Feature.ELEMENT: 1447 streamFeature = PacketParserUtils.parseCompressionFeature(parser); 1448 break; 1449 default: 1450 ExtensionElementProvider<ExtensionElement> provider = ProviderManager.getStreamFeatureProvider(name, namespace); 1451 if (provider != null) { 1452 streamFeature = provider.parse(parser); 1453 } 1454 break; 1455 } 1456 if (streamFeature != null) { 1457 addStreamFeature(streamFeature); 1458 } 1459 } 1460 else if (eventType == XmlPullParser.END_TAG && parser.getDepth() == initialDepth) { 1461 break; 1462 } 1463 } 1464 1465 if (hasFeature(Mechanisms.ELEMENT, Mechanisms.NAMESPACE)) { 1466 // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it 1467 if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE) 1468 || config.getSecurityMode() == SecurityMode.disabled) { 1469 tlsHandled.reportSuccess(); 1470 saslFeatureReceived.reportSuccess(); 1471 } 1472 } 1473 1474 // If the server reported the bind feature then we are that that we did SASL and maybe 1475 // STARTTLS. 
We can then report that the last 'stream:features' have been parsed 1476 if (hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 1477 if (!hasFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE) 1478 || !config.isCompressionEnabled()) { 1479 // This was was last features from the server is either it did not contain 1480 // compression or if we disabled it 1481 lastFeaturesReceived.reportSuccess(); 1482 } 1483 } 1484 afterFeaturesReceived(); 1485 } 1486 1487 @SuppressWarnings("unused") 1488 protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException { 1489 // Default implementation does nothing 1490 } 1491 1492 @SuppressWarnings("unchecked") 1493 @Override 1494 public <F extends ExtensionElement> F getFeature(String element, String namespace) { 1495 return (F) streamFeatures.get(XmppStringUtils.generateKey(element, namespace)); 1496 } 1497 1498 @Override 1499 public boolean hasFeature(String element, String namespace) { 1500 return getFeature(element, namespace) != null; 1501 } 1502 1503 protected void addStreamFeature(ExtensionElement feature) { 1504 String key = XmppStringUtils.generateKey(feature.getElementName(), feature.getNamespace()); 1505 streamFeatures.put(key, feature); 1506 } 1507 1508 @SuppressWarnings("deprecation") 1509 @Override 1510 public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, 1511 StanzaListener callback) throws NotConnectedException, InterruptedException { 1512 sendStanzaWithResponseCallback(stanza, replyFilter, callback, null); 1513 } 1514 1515 @SuppressWarnings("deprecation") 1516 @Override 1517 public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, 1518 StanzaListener callback, ExceptionCallback exceptionCallback) 1519 throws NotConnectedException, InterruptedException { 1520 sendStanzaWithResponseCallback(stanza, replyFilter, callback, exceptionCallback, 1521 getReplyTimeout()); 1522 } 1523 1524 @Override 1525 public 
SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request) { 1526 return sendIqRequestAsync(request, getReplyTimeout()); 1527 } 1528 1529 @Override 1530 public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request, long timeout) { 1531 StanzaFilter replyFilter = new IQReplyFilter(request, this); 1532 return sendAsync(request, replyFilter, timeout); 1533 } 1534 1535 @Override 1536 public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter) { 1537 return sendAsync(stanza, replyFilter, getReplyTimeout()); 1538 } 1539 1540 @SuppressWarnings("FutureReturnValueIgnored") 1541 @Override 1542 public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter, long timeout) { 1543 Objects.requireNonNull(stanza, "stanza must not be null"); 1544 // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we 1545 // disallow it here in the async API as it makes no sense 1546 Objects.requireNonNull(replyFilter, "replyFilter must not be null"); 1547 1548 final InternalSmackFuture<S, Exception> future = new InternalSmackFuture<>(); 1549 1550 final StanzaListener stanzaListener = new StanzaListener() { 1551 @Override 1552 public void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException { 1553 boolean removed = removeAsyncStanzaListener(this); 1554 if (!removed) { 1555 // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the 1556 // exception callback will be invoked (if any). 
1557 return; 1558 } 1559 try { 1560 XMPPErrorException.ifHasErrorThenThrow(stanza); 1561 @SuppressWarnings("unchecked") 1562 S s = (S) stanza; 1563 future.setResult(s); 1564 } 1565 catch (XMPPErrorException exception) { 1566 future.setException(exception); 1567 } 1568 } 1569 }; 1570 schedule(new Runnable() { 1571 @Override 1572 public void run() { 1573 boolean removed = removeAsyncStanzaListener(stanzaListener); 1574 if (!removed) { 1575 // We lost a race against the stanza listener, he already removed itself because he received a 1576 // reply. There is nothing more to do here. 1577 return; 1578 } 1579 1580 // If the packetListener got removed, then it was never run and 1581 // we never received a response, inform the exception callback 1582 Exception exception; 1583 if (!isConnected()) { 1584 // If the connection is no longer connected, throw a not connected exception. 1585 exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter); 1586 } 1587 else { 1588 exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter); 1589 } 1590 future.setException(exception); 1591 } 1592 }, timeout, TimeUnit.MILLISECONDS); 1593 1594 addAsyncStanzaListener(stanzaListener, replyFilter); 1595 try { 1596 sendStanza(stanza); 1597 } 1598 catch (NotConnectedException | InterruptedException exception) { 1599 future.setException(exception); 1600 } 1601 1602 return future; 1603 } 1604 1605 @SuppressWarnings({ "FutureReturnValueIgnored", "deprecation" }) 1606 @Override 1607 public void sendStanzaWithResponseCallback(Stanza stanza, final StanzaFilter replyFilter, 1608 final StanzaListener callback, final ExceptionCallback exceptionCallback, 1609 long timeout) throws NotConnectedException, InterruptedException { 1610 Objects.requireNonNull(stanza, "stanza must not be null"); 1611 // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we 1612 // disallow it here in the async API as it makes no sense 1613 
Objects.requireNonNull(replyFilter, "replyFilter must not be null"); 1614 Objects.requireNonNull(callback, "callback must not be null"); 1615 1616 final StanzaListener packetListener = new StanzaListener() { 1617 @Override 1618 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException, NotLoggedInException { 1619 boolean removed = removeAsyncStanzaListener(this); 1620 if (!removed) { 1621 // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the 1622 // exception callback will be invoked (if any). 1623 return; 1624 } 1625 try { 1626 XMPPErrorException.ifHasErrorThenThrow(packet); 1627 callback.processStanza(packet); 1628 } 1629 catch (XMPPErrorException e) { 1630 if (exceptionCallback != null) { 1631 exceptionCallback.processException(e); 1632 } 1633 } 1634 } 1635 }; 1636 schedule(new Runnable() { 1637 @Override 1638 public void run() { 1639 boolean removed = removeAsyncStanzaListener(packetListener); 1640 // If the packetListener got removed, then it was never run and 1641 // we never received a response, inform the exception callback 1642 if (removed && exceptionCallback != null) { 1643 Exception exception; 1644 if (!isConnected()) { 1645 // If the connection is no longer connected, throw a not connected exception. 
exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter);
                    } else {
                        exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter);
                    }
                    final Exception exceptionToProcess = exception;
                    Async.go(new Runnable() {
                        @Override
                        public void run() {
                            exceptionCallback.processException(exceptionToProcess);
                        }
                    });
                }
            }
        }, timeout, TimeUnit.MILLISECONDS);
        addAsyncStanzaListener(packetListener, replyFilter);
        sendStanza(stanza);
    }

    @SuppressWarnings("deprecation")
    @Override
    public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback)
                    throws NotConnectedException, InterruptedException {
        // Convenience overload: no exception callback is installed.
        sendIqWithResponseCallback(iqRequest, callback, null);
    }

    @SuppressWarnings("deprecation")
    @Override
    public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback,
                    ExceptionCallback exceptionCallback) throws NotConnectedException, InterruptedException {
        // Convenience overload: uses this connection's reply timeout.
        sendIqWithResponseCallback(iqRequest, callback, exceptionCallback, getReplyTimeout());
    }

    @SuppressWarnings("deprecation")
    @Override
    public void sendIqWithResponseCallback(IQ iqRequest, final StanzaListener callback,
                    final ExceptionCallback exceptionCallback, long timeout)
                    throws NotConnectedException, InterruptedException {
        // Build the reply filter from the request and this connection, then hand over to the
        // generic stanza variant.
        StanzaFilter replyFilter = new IQReplyFilter(iqRequest, this);
        sendStanzaWithResponseCallback(iqRequest, replyFilter, callback, exceptionCallback, timeout);
    }

    @SuppressWarnings("FutureReturnValueIgnored")
    @Override
    public void addOneTimeSyncCallback(final StanzaListener callback, final StanzaFilter packetFilter) {
        // Wrap the callback so that the listener unregisters itself after its first invocation,
        // even if the callback throws.
        final StanzaListener packetListener = new StanzaListener() {
            @Override
            public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException, NotLoggedInException {
                try {
                    callback.processStanza(packet);
                } finally {
                    removeSyncStanzaListener(this);
                }
            }
        };
        addSyncStanzaListener(packetListener, packetFilter);
        // Remove the listener once the reply timeout has elapsed, whether or not it was invoked.
        // The returned future is intentionally not kept.
        schedule(new Runnable() {
            @Override
            public void run() {
                removeSyncStanzaListener(packetListener);
            }
        }, getReplyTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override
    public IQRequestHandler registerIQRequestHandler(final IQRequestHandler iqRequestHandler) {
        // Handlers are keyed by element name and namespace; 'set' and 'get' handlers live in
        // separate collections, each guarded by its own monitor.
        final String key = XmppStringUtils.generateKey(iqRequestHandler.getElement(), iqRequestHandler.getNamespace());
        switch (iqRequestHandler.getType()) {
        case set:
            synchronized (setIqRequestHandler) {
                // Returns whatever put() reports for this key.
                return setIqRequestHandler.put(key, iqRequestHandler);
            }
        case get:
            synchronized (getIqRequestHandler) {
                return getIqRequestHandler.put(key, iqRequestHandler);
            }
        default:
            throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed");
        }
    }

    @Override
    public final IQRequestHandler unregisterIQRequestHandler(IQRequestHandler iqRequestHandler) {
        // Delegate using the handler's own element, namespace and type.
        return unregisterIQRequestHandler(iqRequestHandler.getElement(), iqRequestHandler.getNamespace(),
                        iqRequestHandler.getType());
    }

    @Override
    public IQRequestHandler unregisterIQRequestHandler(String element, String namespace, IQ.Type type) {
        // Same key scheme as registerIQRequestHandler().
        final String key = XmppStringUtils.generateKey(element, namespace);
        switch (type) {
        case set:
            synchronized (setIqRequestHandler) {
                // Returns whatever remove() reports for this key.
                return setIqRequestHandler.remove(key);
            }
        case get:
            synchronized (getIqRequestHandler) {
                return getIqRequestHandler.remove(key);
            }
        default:
            throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed");
        }
    }

    /**
     * Backing value for {@link #getLastStanzaReceived()}. It is not written anywhere in this part of the class.
     */
    private long lastStanzaReceived;

    @Override
    public long getLastStanzaReceived() {
        return lastStanzaReceived;
    }

    /**
     * Get the timestamp when the connection was the first time authenticated, i.e., when the first successful login was
     * performed. Note that this value is not reset on disconnect, so it represents the timestamp from the last
     * authenticated connection. The value is also not reset on stream resumption.
     * <p>
     * The return type is a primitive {@code long}, so this method never returns {@code null}.
     * NOTE(review): the value reported before the first successful login depends on the initial value of the backing
     * field, which is declared elsewhere - confirm.
     * </p>
     *
     * @return the timestamp.
     * @since 4.3.3
     */
    public final long getAuthenticatedConnectionInitiallyEstablishedTimestamp() {
        return authenticatedConnectionInitiallyEstablishedTimestamp;
    }

    /**
     * Install a parsing exception callback, which will be invoked once an exception is encountered while parsing a
     * stanza.
     *
     * @param callback the callback to install
     */
    public void setParsingExceptionCallback(ParsingExceptionCallback callback) {
        parsingExceptionCallback = callback;
    }

    /**
     * Get the current active parsing exception callback.
     *
     * @return the active exception callback or null if there is none
     */
    public ParsingExceptionCallback getParsingExceptionCallback() {
        return parsingExceptionCallback;
    }

    @Override
    public final String toString() {
        EntityFullJid localEndpoint = getUser();
        // getUser() yields null as long as no full JID is available.
        String localEndpointString = (localEndpoint == null ?  "not-authenticated" : localEndpoint.toString());
        return getClass().getSimpleName() + '[' + localEndpointString + "] (" + getConnectionCounter() + ')';
    }

    /**
     * A queue of deferred runnables that were not executed immediately because {@link #currentAsyncRunnables} reached
     * {@link #maxAsyncRunnables}. Note that we use a {@code LinkedList} in order to avoid space blowups in case the
     * list ever becomes very big and shrinks again.
     * <p>
     * This object also serves as the monitor guarding the async bookkeeping fields declared below.
     * </p>
     */
    private final Queue<Runnable> deferredAsyncRunnables = new LinkedList<>();

    /** Number of runnables currently waiting in {@link #deferredAsyncRunnables}. */
    private int deferredAsyncRunnablesCount;

    /** Value of {@link #deferredAsyncRunnablesCount} at the end of the previous {@link #asyncGoLimited(Runnable)} call. */
    private int deferredAsyncRunnablesCountPrevious;

    /** Upper bound for {@link #currentAsyncRunnables}; see {@link #setMaxAsyncOperations(int)}. */
    private int maxAsyncRunnables = SmackConfiguration.getDefaultConcurrencyLevelLimit();

    /** Number of concurrency slots currently occupied by runnables started via {@link #asyncGoLimited(Runnable)}. */
    private int currentAsyncRunnables;

    /**
     * Execute the given runnable asynchronously, but never with more than {@link #maxAsyncRunnables} runnables of this
     * connection in flight at once. If the limit is reached, the runnable is queued and started as soon as a
     * previously started one finishes.
     * <p>
     * The concurrency slot is released (or handed to the next deferred runnable) even if the runnable terminates with
     * an exception; the exception itself still propagates to the executing thread.
     * </p>
     *
     * @param runnable the runnable to execute.
     */
    protected void asyncGoLimited(final Runnable runnable) {
        Runnable wrappedRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } finally {
                    // Must also happen when runnable.run() throws, otherwise the slot would leak
                    // and deferred runnables would never be started.
                    synchronized (deferredAsyncRunnables) {
                        Runnable deferredRunnable = deferredAsyncRunnables.poll();
                        if (deferredRunnable == null) {
                            // Nothing is waiting: free the slot.
                            currentAsyncRunnables--;
                        } else {
                            // Hand the slot directly to the next waiting runnable.
                            deferredAsyncRunnablesCount--;
                            asyncGo(deferredRunnable);
                        }
                    }
                }
            }
        };

        synchronized (deferredAsyncRunnables) {
            if (currentAsyncRunnables < maxAsyncRunnables) {
                currentAsyncRunnables++;
                asyncGo(wrappedRunnable);
            } else {
                deferredAsyncRunnablesCount++;
                deferredAsyncRunnables.add(wrappedRunnable);
            }

            final int HIGH_WATERMARK = 100;
            final int INFORM_WATERMARK = 20;

            final int deferredCount = this.deferredAsyncRunnablesCount;

            // Log only when a watermark is crossed upwards, i.e. the previous call was still below it.
            if (deferredCount >= HIGH_WATERMARK
                    && deferredAsyncRunnablesCountPrevious < HIGH_WATERMARK) {
                LOGGER.log(Level.WARNING, "High watermark of " + HIGH_WATERMARK + " simultaneous executing runnables reached");
            } else if (deferredCount >= INFORM_WATERMARK
                    && deferredAsyncRunnablesCountPrevious < INFORM_WATERMARK) {
                LOGGER.log(Level.INFO, INFORM_WATERMARK + " simultaneous executing runnables reached");
            }

            deferredAsyncRunnablesCountPrevious = deferredCount;
        }
    }

    /**
     * Set the maximum number of runnables that {@link #asyncGoLimited(Runnable)} starts concurrently before it begins
     * to defer further runnables.
     *
     * @param maxAsyncOperations the new limit, must be at least 1.
     * @throws IllegalArgumentException if {@code maxAsyncOperations} is smaller than 1.
     */
    public void setMaxAsyncOperations(int maxAsyncOperations) {
        if (maxAsyncOperations < 1) {
            throw new IllegalArgumentException("Max async operations must be greater than 0");
        }

        synchronized (deferredAsyncRunnables) {
            maxAsyncRunnables = maxAsyncOperations;
        }
    }

    /**
     * Hand the given runnable to {@code CACHED_EXECUTOR_SERVICE} for execution.
     *
     * @param runnable the runnable to execute.
     */
    protected static void asyncGo(Runnable runnable) {
        CACHED_EXECUTOR_SERVICE.execute(runnable);
    }

    /**
     * Schedule the given runnable on {@code SCHEDULED_EXECUTOR_SERVICE}.
     *
     * @param runnable the runnable to schedule.
     * @param delay the delay before the runnable is executed.
     * @param unit the time unit of {@code delay}.
     * @return the future returned by the scheduled executor service.
     */
    protected static ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) {
        return SCHEDULED_EXECUTOR_SERVICE.schedule(runnable, delay, unit);
    }
}