001/** 002 * 003 * Copyright 2009 Jive Software, 2018-2020 Florian Schmaus. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.io.Reader; 021import java.io.Writer; 022import java.util.Collection; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.LinkedHashMap; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Queue; 030import java.util.Set; 031import java.util.concurrent.ConcurrentLinkedQueue; 032import java.util.concurrent.CopyOnWriteArraySet; 033import java.util.concurrent.Executor; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039import java.util.concurrent.locks.Lock; 040import java.util.concurrent.locks.ReentrantLock; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044import javax.net.ssl.SSLSession; 045import javax.xml.namespace.QName; 046 047import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 048import org.jivesoftware.smack.SmackConfiguration.UnknownIqRequestReplyMode; 049import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 050import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 051import 
org.jivesoftware.smack.SmackException.NoResponseException; 052import org.jivesoftware.smack.SmackException.NotConnectedException; 053import org.jivesoftware.smack.SmackException.NotLoggedInException; 054import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; 055import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; 056import org.jivesoftware.smack.SmackException.SecurityRequiredException; 057import org.jivesoftware.smack.SmackException.SmackSaslException; 058import org.jivesoftware.smack.SmackException.SmackWrappedException; 059import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; 060import org.jivesoftware.smack.XMPPException.FailedNonzaException; 061import org.jivesoftware.smack.XMPPException.StreamErrorException; 062import org.jivesoftware.smack.XMPPException.XMPPErrorException; 063import org.jivesoftware.smack.compress.packet.Compress; 064import org.jivesoftware.smack.compression.XMPPInputOutputStream; 065import org.jivesoftware.smack.datatypes.UInt16; 066import org.jivesoftware.smack.debugger.SmackDebugger; 067import org.jivesoftware.smack.debugger.SmackDebuggerFactory; 068import org.jivesoftware.smack.filter.IQReplyFilter; 069import org.jivesoftware.smack.filter.StanzaFilter; 070import org.jivesoftware.smack.filter.StanzaIdFilter; 071import org.jivesoftware.smack.internal.SmackTlsContext; 072import org.jivesoftware.smack.iqrequest.IQRequestHandler; 073import org.jivesoftware.smack.packet.AbstractStreamOpen; 074import org.jivesoftware.smack.packet.Bind; 075import org.jivesoftware.smack.packet.ErrorIQ; 076import org.jivesoftware.smack.packet.ExtensionElement; 077import org.jivesoftware.smack.packet.FullyQualifiedElement; 078import org.jivesoftware.smack.packet.IQ; 079import org.jivesoftware.smack.packet.Mechanisms; 080import org.jivesoftware.smack.packet.Message; 081import org.jivesoftware.smack.packet.MessageBuilder; 082import org.jivesoftware.smack.packet.MessageOrPresence; 083import 
org.jivesoftware.smack.packet.MessageOrPresenceBuilder; 084import org.jivesoftware.smack.packet.Nonza; 085import org.jivesoftware.smack.packet.Presence; 086import org.jivesoftware.smack.packet.PresenceBuilder; 087import org.jivesoftware.smack.packet.Session; 088import org.jivesoftware.smack.packet.Stanza; 089import org.jivesoftware.smack.packet.StanzaError; 090import org.jivesoftware.smack.packet.StanzaFactory; 091import org.jivesoftware.smack.packet.StartTls; 092import org.jivesoftware.smack.packet.StreamError; 093import org.jivesoftware.smack.packet.StreamOpen; 094import org.jivesoftware.smack.packet.TopLevelStreamElement; 095import org.jivesoftware.smack.packet.XmlEnvironment; 096import org.jivesoftware.smack.packet.id.StanzaIdSource; 097import org.jivesoftware.smack.parsing.ParsingExceptionCallback; 098import org.jivesoftware.smack.parsing.SmackParsingException; 099import org.jivesoftware.smack.provider.ExtensionElementProvider; 100import org.jivesoftware.smack.provider.NonzaProvider; 101import org.jivesoftware.smack.provider.ProviderManager; 102import org.jivesoftware.smack.sasl.SASLErrorException; 103import org.jivesoftware.smack.sasl.SASLMechanism; 104import org.jivesoftware.smack.sasl.core.SASLAnonymous; 105import org.jivesoftware.smack.sasl.packet.SaslNonza; 106import org.jivesoftware.smack.util.Async; 107import org.jivesoftware.smack.util.CollectionUtil; 108import org.jivesoftware.smack.util.Consumer; 109import org.jivesoftware.smack.util.MultiMap; 110import org.jivesoftware.smack.util.Objects; 111import org.jivesoftware.smack.util.PacketParserUtils; 112import org.jivesoftware.smack.util.ParserUtils; 113import org.jivesoftware.smack.util.Predicate; 114import org.jivesoftware.smack.util.StringUtils; 115import org.jivesoftware.smack.util.Supplier; 116import org.jivesoftware.smack.xml.XmlPullParser; 117import org.jivesoftware.smack.xml.XmlPullParserException; 118 119import org.jxmpp.jid.DomainBareJid; 120import org.jxmpp.jid.EntityBareJid; 121import 
org.jxmpp.jid.EntityFullJid; 122import org.jxmpp.jid.Jid; 123import org.jxmpp.jid.impl.JidCreate; 124import org.jxmpp.jid.parts.Resourcepart; 125import org.jxmpp.stringprep.XmppStringprepException; 126import org.jxmpp.util.XmppStringUtils; 127 128/** 129 * This abstract class is commonly used as super class for XMPP connection mechanisms like TCP and BOSH. Hence it 130 * provides the methods for connection state management, like {@link #connect()}, {@link #login()} and 131 * {@link #disconnect()} (which are deliberately not provided by the {@link XMPPConnection} interface). 132 * <p> 133 * <b>Note:</b> The default entry point to Smack's documentation is {@link XMPPConnection}. If you are getting started 134 * with Smack, then head over to {@link XMPPConnection} and then come back here. 135 * </p> 136 * <h2>Parsing Exceptions</h2> 137 * <p> 138 * In case a Smack parser (Provider) throws, those exceptions are handed over to the {@link ParsingExceptionCallback}. A 139 * common cause for a provider throwing is illegal input, for example a non-numeric String where only Integers are 140 * allowed. Smack's <em>default behavior</em> follows the <b>"fail-hard per default"</b> principle leading to a 141 * termination of the connection on parsing exceptions. This default was chosen to make users eventually aware that they 142 * should configure their own callback and handle those exceptions to prevent the disconnect. Handling a parsing exception 143 * could be as simple as using a non-throwing no-op callback, which would cause the faulty stream element to be taken 144 * out of the stream, i.e., Smack behaves like that element was never received. 145 * </p> 146 * <p> 147 * If the parsing exception is because Smack received illegal input, then please consider informing the authors of the 148 * originating entity about that. If it was thrown because of a bug in a Smack parser, then please consider filing a 149 * bug with Smack. 
150 * </p> 151 * <h3>Managing the parsing exception callback</h3> 152 * <p> 153 * The "fail-hard per default" behavior is achieved by using the 154 * {@link org.jivesoftware.smack.parsing.ExceptionThrowingCallbackWithHint} as default parsing exception callback. You 155 * can change the behavior using {@link #setParsingExceptionCallback(ParsingExceptionCallback)} to set a new callback. 156 * Use {@link org.jivesoftware.smack.SmackConfiguration#setDefaultParsingExceptionCallback(ParsingExceptionCallback)} to 157 * set the default callback. 158 * </p> 159 */ 160public abstract class AbstractXMPPConnection implements XMPPConnection { 161 private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName()); 162 163 protected static final SmackReactor SMACK_REACTOR; 164 165 static { 166 SMACK_REACTOR = SmackReactor.getInstance(); 167 } 168 169 /** 170 * Counter to uniquely identify connections that are created. 171 */ 172 private static final AtomicInteger connectionCounter = new AtomicInteger(0); 173 174 static { 175 Smack.ensureInitialized(); 176 } 177 178 protected enum SyncPointState { 179 initial, 180 request_sent, 181 successful, 182 } 183 184 /** 185 * A collection of ConnectionListeners which listen for connection closing 186 * and reconnection events. 187 */ 188 protected final Set<ConnectionListener> connectionListeners = 189 new CopyOnWriteArraySet<>(); 190 191 /** 192 * A collection of StanzaCollectors which collects packets for a specified filter 193 * and perform blocking and polling operations on the result queue. 194 * <p> 195 * We use a ConcurrentLinkedQueue here, because its Iterator is weakly 196 * consistent and we want {@link #invokeStanzaCollectorsAndNotifyRecvListeners(Stanza)} for-each 197 * loop to be lock free. As drawback, removing a StanzaCollector is O(n). 198 * The alternative would be a synchronized HashSet, but this would mean a 199 * synchronized block around every usage of <code>collectors</code>. 
200 * </p> 201 */ 202 private final Collection<StanzaCollector> collectors = new ConcurrentLinkedQueue<>(); 203 204 private final Map<StanzaListener, ListenerWrapper> recvListeners = new LinkedHashMap<>(); 205 206 /** 207 * List of PacketListeners that will be notified synchronously when a new stanza was received. 208 */ 209 private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>(); 210 211 /** 212 * List of PacketListeners that will be notified asynchronously when a new stanza was received. 213 */ 214 private final Map<StanzaListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>(); 215 216 /** 217 * List of PacketListeners that will be notified when a new stanza was sent. 218 */ 219 private final Map<StanzaListener, ListenerWrapper> sendListeners = 220 new HashMap<>(); 221 222 /** 223 * List of PacketListeners that will be notified when a new stanza is about to be 224 * sent to the server. These interceptors may modify the stanza before it is being 225 * actually sent to the server. 226 */ 227 private final Map<StanzaListener, InterceptorWrapper> interceptors = 228 new HashMap<>(); 229 230 private final Map<Consumer<MessageBuilder>, GenericInterceptorWrapper<MessageBuilder, Message>> messageInterceptors = new HashMap<>(); 231 232 private final Map<Consumer<PresenceBuilder>, GenericInterceptorWrapper<PresenceBuilder, Presence>> presenceInterceptors = new HashMap<>(); 233 234 private XmlEnvironment incomingStreamXmlEnvironment; 235 236 protected XmlEnvironment outgoingStreamXmlEnvironment; 237 238 final MultiMap<QName, NonzaCallback> nonzaCallbacksMap = new MultiMap<>(); 239 240 protected final Lock connectionLock = new ReentrantLock(); 241 242 protected final Map<QName, FullyQualifiedElement> streamFeatures = new HashMap<>(); 243 244 /** 245 * The full JID of the authenticated user, as returned by the resource binding response of the server. 
246 * <p> 247 * It is important that we don't infer the user from the login() arguments and the configurations service name, as, 248 * for example, when SASL External is used, the username is not given to login but taken from the 'external' 249 * certificate. 250 * </p> 251 */ 252 protected EntityFullJid user; 253 254 protected boolean connected = false; 255 256 /** 257 * The stream ID, see RFC 6120 § 4.7.3 258 */ 259 protected String streamId; 260 261 /** 262 * The timeout to wait for a reply in milliseconds. 263 */ 264 private long replyTimeout = SmackConfiguration.getDefaultReplyTimeout(); 265 266 /** 267 * The SmackDebugger allows to log and debug XML traffic. 268 */ 269 protected final SmackDebugger debugger; 270 271 /** 272 * The Reader which is used for the debugger. 273 */ 274 protected Reader reader; 275 276 /** 277 * The Writer which is used for the debugger. 278 */ 279 protected Writer writer; 280 281 protected SmackException currentSmackException; 282 protected XMPPException currentXmppException; 283 284 protected boolean tlsHandled; 285 286 /** 287 * Set to <code>true</code> if the last features stanza from the server has been parsed. A XMPP connection 288 * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature 289 * stanza is send by the server. This is set to true once the last feature stanza has been 290 * parsed. 291 */ 292 protected boolean lastFeaturesReceived; 293 294 /** 295 * Set to <code>true</code> if the SASL feature has been received. 296 */ 297 protected boolean saslFeatureReceived; 298 299 /** 300 * A synchronization point which is successful if this connection has received the closing 301 * stream element from the remote end-point, i.e. the server. 302 */ 303 protected boolean closingStreamReceived; 304 305 /** 306 * The SASLAuthentication manager that is responsible for authenticating with the server. 
307 */ 308 private final SASLAuthentication saslAuthentication; 309 310 /** 311 * A number to uniquely identify connections that are created. This is distinct from the 312 * connection ID, which is a value sent by the server once a connection is made. 313 */ 314 protected final int connectionCounterValue = connectionCounter.getAndIncrement(); 315 316 /** 317 * Holds the initial configuration used while creating the connection. 318 */ 319 protected final ConnectionConfiguration config; 320 321 /** 322 * Defines how the from attribute of outgoing stanzas should be handled. 323 */ 324 private FromMode fromMode = FromMode.OMITTED; 325 326 protected XMPPInputOutputStream compressionHandler; 327 328 private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); 329 330 /** 331 * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set 332 * them 'daemon'. 333 */ 334 private static final ExecutorService CACHED_EXECUTOR_SERVICE = Executors.newCachedThreadPool(new ThreadFactory() { 335 @Override 336 public Thread newThread(Runnable runnable) { 337 Thread thread = new Thread(runnable); 338 thread.setName("Smack Cached Executor"); 339 thread.setDaemon(true); 340 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 341 @Override 342 public void uncaughtException(Thread t, Throwable e) { 343 LOGGER.log(Level.WARNING, t + " encountered uncaught exception", e); 344 } 345 }); 346 return thread; 347 } 348 }); 349 350 protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>(); 351 352 protected final AsyncButOrdered<StanzaListener> inOrderListeners = new AsyncButOrdered<>(); 353 354 /** 355 * The used host to establish the connection to 356 */ 357 protected String host; 358 359 /** 360 * The used port to establish the connection to 361 */ 362 protected UInt16 port; 363 364 /** 365 * Flag that indicates if 
the user is currently authenticated with the server. 366 */ 367 protected boolean authenticated = false; 368 369 // TODO: Migrate to ZonedDateTime once Smack's minimum required Android SDK level is 26 (8.0, Oreo) or higher. 370 protected long authenticatedConnectionInitiallyEstablishedTimestamp; 371 372 /** 373 * Flag that indicates if the user was authenticated with the server when the connection 374 * to the server was closed (abruptly or not). 375 */ 376 protected boolean wasAuthenticated = false; 377 378 private final Map<QName, IQRequestHandler> setIqRequestHandler = new HashMap<>(); 379 private final Map<QName, IQRequestHandler> getIqRequestHandler = new HashMap<>(); 380 381 private final StanzaFactory stanzaFactory; 382 383 /** 384 * Create a new XMPPConnection to an XMPP server. 385 * 386 * @param configuration The configuration which is used to establish the connection. 387 */ 388 protected AbstractXMPPConnection(ConnectionConfiguration configuration) { 389 saslAuthentication = new SASLAuthentication(this, configuration); 390 config = configuration; 391 392 // Install the SASL Nonza callbacks. 
393 buildNonzaCallback() 394 .listenFor(SaslNonza.Challenge.class, c -> { 395 try { 396 saslAuthentication.challengeReceived(c); 397 } catch (SmackException | InterruptedException e) { 398 saslAuthentication.authenticationFailed(e); 399 } 400 }) 401 .listenFor(SaslNonza.Success.class, s -> { 402 try { 403 saslAuthentication.authenticated(s); 404 } catch (SmackSaslException | NotConnectedException | InterruptedException e) { 405 saslAuthentication.authenticationFailed(e); 406 } 407 }) 408 .listenFor(SaslNonza.SASLFailure.class, f -> saslAuthentication.authenticationFailed(f)) 409 .install(); 410 411 SmackDebuggerFactory debuggerFactory = configuration.getDebuggerFactory(); 412 if (debuggerFactory != null) { 413 debugger = debuggerFactory.create(this); 414 } else { 415 debugger = null; 416 } 417 // Notify listeners that a new connection has been established 418 for (ConnectionCreationListener listener : XMPPConnectionRegistry.getConnectionCreationListeners()) { 419 listener.connectionCreated(this); 420 } 421 422 StanzaIdSource stanzaIdSource = configuration.constructStanzaIdSource(); 423 stanzaFactory = new StanzaFactory(stanzaIdSource); 424 } 425 426 /** 427 * Get the connection configuration used by this connection. 428 * 429 * @return the connection configuration. 
430 */ 431 public ConnectionConfiguration getConfiguration() { 432 return config; 433 } 434 435 @Override 436 public DomainBareJid getXMPPServiceDomain() { 437 if (xmppServiceDomain != null) { 438 return xmppServiceDomain; 439 } 440 return config.getXMPPServiceDomain(); 441 } 442 443 @Override 444 public String getHost() { 445 return host; 446 } 447 448 @Override 449 public int getPort() { 450 final UInt16 port = this.port; 451 if (port == null) { 452 return -1; 453 } 454 455 return port.intValue(); 456 } 457 458 @Override 459 public abstract boolean isSecureConnection(); 460 461 protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException; 462 463 @Override 464 public boolean trySendStanza(Stanza stanza) throws NotConnectedException { 465 // Default implementation which falls back to sendStanza() as mentioned in the methods javadoc. May be 466 // overwritten by subclasses. 467 try { 468 sendStanza(stanza); 469 } catch (InterruptedException e) { 470 LOGGER.log(Level.FINER, 471 "Thread blocked in fallback implementation of trySendStanza(Stanza) was interrupted", e); 472 return false; 473 } 474 return true; 475 } 476 477 @Override 478 public boolean trySendStanza(Stanza stanza, long timeout, TimeUnit unit) 479 throws NotConnectedException, InterruptedException { 480 // Default implementation which falls back to sendStanza() as mentioned in the methods javadoc. May be 481 // overwritten by subclasses. 
482 sendStanza(stanza); 483 return true; 484 } 485 486 @Override 487 public abstract void sendNonza(Nonza element) throws NotConnectedException, InterruptedException; 488 489 @Override 490 public abstract boolean isUsingCompression(); 491 492 protected void initState() { 493 currentSmackException = null; 494 currentXmppException = null; 495 saslFeatureReceived = lastFeaturesReceived = tlsHandled = false; 496 // TODO: We do not init closingStreamReceived here, as the integration tests use it to check if we waited for 497 // it. 498 } 499 500 /** 501 * Establishes a connection to the XMPP server. It basically 502 * creates and maintains a connection to the server. 503 * <p> 504 * Listeners will be preserved from a previous connection. 505 * </p> 506 * 507 * @throws XMPPException if an error occurs on the XMPP protocol level. 508 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 509 * @throws IOException if an I/O error occurred. 510 * @return a reference to this object, to chain <code>connect()</code> with <code>login()</code>. 511 * @throws InterruptedException if the calling thread was interrupted. 512 */ 513 public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException { 514 // Check if not already connected 515 throwAlreadyConnectedExceptionIfAppropriate(); 516 517 // Notify connection listeners that we are trying to connect 518 callConnectionConnectingListener(); 519 520 // Reset the connection state 521 initState(); 522 closingStreamReceived = false; 523 streamId = null; 524 525 // The connection should not be connected nor marked as such prior calling connectInternal(). 526 assert !connected; 527 528 try { 529 // Perform the actual connection to the XMPP service 530 connectInternal(); 531 532 // If TLS is required but the server doesn't offer it, disconnect 533 // from the server and throw an error. 
First check if we've already negotiated TLS 534 // and are secure, however (features get parsed a second time after TLS is established). 535 if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) { 536 throw new SecurityRequiredByClientException(); 537 } 538 } catch (SmackException | IOException | XMPPException | InterruptedException e) { 539 instantShutdown(); 540 throw e; 541 } 542 543 // If connectInternal() did not throw, then this connection must now be marked as connected. 544 assert connected; 545 546 callConnectionConnectedListener(); 547 548 return this; 549 } 550 551 /** 552 * Abstract method that concrete subclasses of XMPPConnection need to implement to perform their 553 * way of XMPP connection establishment. Implementations are required to perform an automatic 554 * login if the previous connection state was logged (authenticated). 555 * 556 * @throws SmackException if Smack detected an exceptional situation. 557 * @throws IOException if an I/O error occurred. 558 * @throws XMPPException if an XMPP protocol error was received. 559 * @throws InterruptedException if the calling thread was interrupted. 560 */ 561 protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException; 562 563 private String usedUsername, usedPassword; 564 565 /** 566 * The resourcepart used for this connection. May not be the resulting resourcepart if it's null or overridden by the XMPP service. 567 */ 568 private Resourcepart usedResource; 569 570 /** 571 * Logs in to the server using the strongest SASL mechanism supported by 572 * the server. If more than the connection's default stanza timeout elapses in each step of the 573 * authentication process without a response from the server, a 574 * {@link SmackException.NoResponseException} will be thrown. 575 * <p> 576 * Before logging in (i.e. authenticate) to the server the connection must be connected 577 * by calling {@link #connect}. 
578 * </p> 579 * <p> 580 * It is possible to log in without sending an initial available presence by using 581 * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}. 582 * Finally, if you want to not pass a password and instead use a more advanced mechanism 583 * while using SASL then you may be interested in using 584 * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}. 585 * For more advanced login settings see {@link ConnectionConfiguration}. 586 * </p> 587 * 588 * @throws XMPPException if an error occurs on the XMPP protocol level. 589 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 590 * @throws IOException if an I/O error occurs during login. 591 * @throws InterruptedException if the calling thread was interrupted. 592 */ 593 public synchronized void login() throws XMPPException, SmackException, IOException, InterruptedException { 594 // The previously used username, password and resource take over precedence over the 595 // ones from the connection configuration 596 CharSequence username = usedUsername != null ? usedUsername : config.getUsername(); 597 String password = usedPassword != null ? usedPassword : config.getPassword(); 598 Resourcepart resource = usedResource != null ? usedResource : config.getResource(); 599 login(username, password, resource); 600 } 601 602 /** 603 * Same as {@link #login(CharSequence, String, Resourcepart)}, but takes the resource from the connection 604 * configuration. 605 * 606 * @param username TODO javadoc me please 607 * @param password TODO javadoc me please 608 * @throws XMPPException if an XMPP protocol error was received. 609 * @throws SmackException if Smack detected an exceptional situation. 610 * @throws IOException if an I/O error occurred. 611 * @throws InterruptedException if the calling thread was interrupted. 
612 * @see #login 613 */ 614 public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException, 615 IOException, InterruptedException { 616 login(username, password, config.getResource()); 617 } 618 619 /** 620 * Login with the given username (authorization identity). You may omit the password if a callback handler is used. 621 * If resource is null, then the server will generate one. 622 * 623 * @param username TODO javadoc me please 624 * @param password TODO javadoc me please 625 * @param resource TODO javadoc me please 626 * @throws XMPPException if an XMPP protocol error was received. 627 * @throws SmackException if Smack detected an exceptional situation. 628 * @throws IOException if an I/O error occurred. 629 * @throws InterruptedException if the calling thread was interrupted. 630 * @see #login 631 */ 632 public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException, 633 SmackException, IOException, InterruptedException { 634 if (!config.allowNullOrEmptyUsername) { 635 StringUtils.requireNotNullNorEmpty(username, "Username must not be null nor empty"); 636 } 637 throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?"); 638 throwAlreadyLoggedInExceptionIfAppropriate(); 639 usedUsername = username != null ? 
username.toString() : null; 640 usedPassword = password; 641 usedResource = resource; 642 loginInternal(usedUsername, usedPassword, usedResource); 643 } 644 645 protected abstract void loginInternal(String username, String password, Resourcepart resource) 646 throws XMPPException, SmackException, IOException, InterruptedException; 647 648 @Override 649 public final boolean isConnected() { 650 return connected; 651 } 652 653 @Override 654 public final boolean isAuthenticated() { 655 return authenticated; 656 } 657 658 @Override 659 public final EntityFullJid getUser() { 660 return user; 661 } 662 663 @Override 664 public String getStreamId() { 665 if (!isConnected()) { 666 return null; 667 } 668 return streamId; 669 } 670 671 protected final void throwCurrentConnectionException() throws SmackException, XMPPException { 672 if (currentSmackException != null) { 673 throw currentSmackException; 674 } else if (currentXmppException != null) { 675 throw currentXmppException; 676 } 677 678 throw new AssertionError("No current connection exception set, although throwCurrentException() was called"); 679 } 680 681 protected final boolean hasCurrentConnectionException() { 682 return currentSmackException != null || currentXmppException != null; 683 } 684 685 protected final void setCurrentConnectionExceptionAndNotify(Exception exception) { 686 if (exception instanceof SmackException) { 687 currentSmackException = (SmackException) exception; 688 } else if (exception instanceof XMPPException) { 689 currentXmppException = (XMPPException) exception; 690 } else { 691 currentSmackException = new SmackException.SmackWrappedException(exception); 692 } 693 694 notifyWaitingThreads(); 695 } 696 697 /** 698 * We use an extra object for {@link #notifyWaitingThreads()} and {@link #waitForConditionOrConnectionException(Supplier)}, because all state 699 * changing methods of the connection are synchronized using the connection instance as monitor. 
If we now would 700 * also use the connection instance for the internal process to wait for a condition, the {@link Object#wait()} 701 * would leave the monitor when it waites, which would allow for another potential call to a state changing function 702 * to proceed. 703 */ 704 private final Object internalMonitor = new Object(); 705 706 protected final void notifyWaitingThreads() { 707 synchronized (internalMonitor) { 708 internalMonitor.notifyAll(); 709 } 710 } 711 712 protected final boolean waitFor(Supplier<Boolean> condition) throws InterruptedException { 713 final long deadline = System.currentTimeMillis() + getReplyTimeout(); 714 synchronized (internalMonitor) { 715 while (!condition.get().booleanValue()) { 716 final long now = System.currentTimeMillis(); 717 if (now >= deadline) { 718 return false; 719 } 720 internalMonitor.wait(deadline - now); 721 } 722 } 723 return true; 724 } 725 726 protected final boolean waitForConditionOrConnectionException(Supplier<Boolean> condition) throws InterruptedException { 727 return waitFor(() -> condition.get().booleanValue() || hasCurrentConnectionException()); 728 } 729 730 protected final void waitForConditionOrConnectionException(Supplier<Boolean> condition, String waitFor) throws InterruptedException, NoResponseException { 731 boolean success = waitForConditionOrConnectionException(condition); 732 if (!success) { 733 throw NoResponseException.newWith(this, waitFor); 734 } 735 } 736 737 protected final void waitForConditionOrThrowConnectionException(Supplier<Boolean> condition, String waitFor) throws InterruptedException, SmackException, XMPPException { 738 waitForConditionOrConnectionException(condition, waitFor); 739 if (hasCurrentConnectionException()) { 740 throwCurrentConnectionException(); 741 } 742 } 743 744 protected Resourcepart bindResourceAndEstablishSession(Resourcepart resource) 745 throws SmackException, InterruptedException, XMPPException { 746 // Wait until either: 747 // - the servers last features 
stanza has been parsed 748 // - the timeout occurs 749 LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); 750 waitForConditionOrThrowConnectionException(() -> lastFeaturesReceived, "last stream features received from server"); 751 752 if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 753 // Server never offered resource binding, which is REQUIRED in XMPP client and 754 // server implementations as per RFC6120 7.2 755 throw new ResourceBindingNotOfferedException(); 756 } 757 758 // Resource binding, see RFC6120 7. 759 // Note that we can not use IQReplyFilter here, since the users full JID is not yet 760 // available. It will become available right after the resource has been successfully bound. 761 Bind bindResource = Bind.newSet(resource); 762 StanzaCollector packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(bindResource), bindResource); 763 Bind response = packetCollector.nextResultOrThrow(); 764 // Set the connections user to the result of resource binding. It is important that we don't infer the user 765 // from the login() arguments and the configurations service name, as, for example, when SASL External is used, 766 // the username is not given to login but taken from the 'external' certificate. 
767 user = response.getJid(); 768 xmppServiceDomain = user.asDomainBareJid(); 769 770 Session.Feature sessionFeature = getFeature(Session.Feature.class); 771 // Only bind the session if it's announced as stream feature by the server, is not optional and not disabled 772 // For more information see http://tools.ietf.org/html/draft-cridland-xmpp-session-01 773 if (sessionFeature != null && !sessionFeature.isOptional()) { 774 Session session = new Session(); 775 packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(session), session); 776 packetCollector.nextResultOrThrow(); 777 } 778 779 return response.getJid().getResourcepart(); 780 } 781 782 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 783 if (!resumed) { 784 authenticatedConnectionInitiallyEstablishedTimestamp = System.currentTimeMillis(); 785 } 786 // Indicate that we're now authenticated. 787 this.authenticated = true; 788 789 // If debugging is enabled, change the the debug window title to include the 790 // name we are now logged-in as. 791 // If DEBUG was set to true AFTER the connection was created the debugger 792 // will be null 793 if (debugger != null) { 794 debugger.userHasLogged(user); 795 } 796 callConnectionAuthenticatedListener(resumed); 797 798 // Set presence to online. It is important that this is done after 799 // callConnectionAuthenticatedListener(), as this call will also 800 // eventually load the roster. And we should load the roster before we 801 // send the initial presence. 
802 if (config.isSendPresence() && !resumed) { 803 Presence availablePresence = getStanzaFactory() 804 .buildPresenceStanza() 805 .ofType(Presence.Type.available) 806 .build(); 807 sendStanza(availablePresence); 808 } 809 } 810 811 @Override 812 public final boolean isAnonymous() { 813 return isAuthenticated() && SASLAnonymous.NAME.equals(getUsedSaslMechansism()); 814 } 815 816 /** 817 * Get the name of the SASL mechanism that was used to authenticate this connection. This returns the name of 818 * mechanism which was used the last time this connection was authenticated, and will return <code>null</code> if 819 * this connection was not authenticated before. 820 * 821 * @return the name of the used SASL mechanism. 822 * @since 4.2 823 */ 824 public final String getUsedSaslMechansism() { 825 return saslAuthentication.getNameOfLastUsedSaslMechansism(); 826 } 827 828 private DomainBareJid xmppServiceDomain; 829 830 protected Lock getConnectionLock() { 831 return connectionLock; 832 } 833 834 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 835 throwNotConnectedExceptionIfAppropriate(null); 836 } 837 838 protected void throwNotConnectedExceptionIfAppropriate(String optionalHint) throws NotConnectedException { 839 if (!isConnected()) { 840 throw new NotConnectedException(optionalHint); 841 } 842 } 843 844 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 845 if (isConnected()) { 846 throw new AlreadyConnectedException(); 847 } 848 } 849 850 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 851 if (isAuthenticated()) { 852 throw new AlreadyLoggedInException(); 853 } 854 } 855 856 @Override 857 public final StanzaFactory getStanzaFactory() { 858 return stanzaFactory; 859 } 860 861 @Override 862 public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException { 863 Objects.requireNonNull(stanza, "Stanza must not be 
null"); 864 assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ; 865 866 throwNotConnectedExceptionIfAppropriate(); 867 switch (fromMode) { 868 case OMITTED: 869 stanza.setFrom((Jid) null); 870 break; 871 case USER: 872 stanza.setFrom(getUser()); 873 break; 874 case UNCHANGED: 875 default: 876 break; 877 } 878 // Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify 879 // the content of the stanza. 880 Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza); 881 sendStanzaInternal(stanzaAfterInterceptors); 882 } 883 884 /** 885 * Authenticate a connection. 886 * 887 * @param username the username that is authenticating with the server. 888 * @param password the password to send to the server. 889 * @param authzid the authorization identifier (typically null). 890 * @param sslSession the optional SSL/TLS session (if one was established) 891 * @return the used SASLMechanism. 892 * @throws XMPPErrorException if there was an XMPP error returned. 893 * @throws SASLErrorException if a SASL protocol error was returned. 894 * @throws IOException if an I/O error occurred. 895 * @throws InterruptedException if the calling thread was interrupted. 896 * @throws SmackSaslException if a SASL specific error occurred. 897 * @throws NotConnectedException if the XMPP connection is not connected. 898 * @throws NoResponseException if there was no response from the remote entity. 899 * @throws SmackWrappedException in case of an exception. 
900 * @see SASLAuthentication#authenticate(String, String, EntityBareJid, SSLSession) 901 */ 902 protected final SASLMechanism authenticate(String username, String password, EntityBareJid authzid, 903 SSLSession sslSession) throws XMPPErrorException, SASLErrorException, SmackSaslException, 904 NotConnectedException, NoResponseException, IOException, InterruptedException, SmackWrappedException { 905 SASLMechanism saslMechanism = saslAuthentication.authenticate(username, password, authzid, sslSession); 906 afterSaslAuthenticationSuccess(); 907 return saslMechanism; 908 } 909 910 /** 911 * Hook for subclasses right after successful SASL authentication. RFC 6120 § 6.4.6. specifies a that the initiating 912 * entity, needs to initiate a new stream in this case. But some transports, like BOSH, requires a special handling. 913 * <p> 914 * Note that we can not reset XMPPTCPConnection's parser here, because this method is invoked by the thread calling 915 * {@link #login()}, but the parser reset has to be done within the reader thread. 916 * </p> 917 * 918 * @throws NotConnectedException if the XMPP connection is not connected. 919 * @throws InterruptedException if the calling thread was interrupted. 920 * @throws SmackWrappedException in case of an exception. 921 */ 922 protected void afterSaslAuthenticationSuccess() 923 throws NotConnectedException, InterruptedException, SmackWrappedException { 924 sendStreamOpen(); 925 } 926 927 protected final boolean isSaslAuthenticated() { 928 return saslAuthentication.authenticationSuccessful(); 929 } 930 931 /** 932 * Closes the connection by setting presence to unavailable then closing the connection to 933 * the XMPP server. The XMPPConnection can still be used for connecting to the server 934 * again. 
935 * 936 */ 937 public void disconnect() { 938 Presence unavailablePresence = null; 939 if (isAuthenticated()) { 940 unavailablePresence = getStanzaFactory().buildPresenceStanza() 941 .ofType(Presence.Type.unavailable) 942 .build(); 943 } 944 try { 945 disconnect(unavailablePresence); 946 } 947 catch (NotConnectedException e) { 948 LOGGER.log(Level.FINEST, "Connection is already disconnected", e); 949 } 950 } 951 952 /** 953 * Closes the connection. A custom unavailable presence is sent to the server, followed 954 * by closing the stream. The XMPPConnection can still be used for connecting to the server 955 * again. A custom unavailable presence is useful for communicating offline presence 956 * information such as "On vacation". Typically, just the status text of the presence 957 * stanza is set with online information, but most XMPP servers will deliver the full 958 * presence stanza with whatever data is set. 959 * 960 * @param unavailablePresence the optional presence stanza to send during shutdown. 961 * @throws NotConnectedException if the XMPP connection is not connected. 962 */ 963 public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException { 964 if (unavailablePresence != null) { 965 try { 966 sendStanza(unavailablePresence); 967 } catch (InterruptedException e) { 968 LOGGER.log(Level.FINE, 969 "Was interrupted while sending unavailable presence. Continuing to disconnect the connection", 970 e); 971 } 972 } 973 shutdown(); 974 callConnectionClosedListener(); 975 } 976 977 private final Object notifyConnectionErrorMonitor = new Object(); 978 979 /** 980 * Sends out a notification that there was an error with the connection 981 * and closes the connection. 982 * 983 * @param exception the exception that causes the connection close event. 
*/
    protected final void notifyConnectionError(final Exception exception) {
        // Serialize error handling, so that only one connection error is processed at a time.
        synchronized (notifyConnectionErrorMonitor) {
            if (!isConnected()) {
                LOGGER.log(Level.INFO, "Connection was already disconnected when attempting to handle " + exception,
                                exception);
                return;
            }

            // Note that we first have to set the current connection exception and notify waiting threads, as one of them
            // could hold the instance lock, which we also need later when calling instantShutdown().
            setCurrentConnectionExceptionAndNotify(exception);

            // Closes the connection temporarily. If the connection supports stream management, then a reconnection is
            // possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in
            // case the Exception is a StreamErrorException.
            instantShutdown();

            // Hand the exception to every registered stanza collector.
            for (StanzaCollector collector : collectors) {
                collector.notifyConnectionError(exception);
            }

            // The listeners are invoked asynchronously, i.e. not while holding notifyConnectionErrorMonitor.
            Async.go(() -> {
                // Notify connection listeners of the error.
                callConnectionClosedOnErrorListener(exception);
            }, AbstractXMPPConnection.this + " callConnectionClosedOnErrorListener()");
        }
    }

    /**
     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
     */
    public abstract void instantShutdown();

    /**
     * Shuts the current connection down.
*/
    protected abstract void shutdown();

    /**
     * Wait until the closing stream tag of the server was received, a connection exception occurred or the
     * reply timeout elapsed. Exceptions, including interrupts, are logged and reported as failure.
     *
     * @return <code>true</code> if the wait finished without an exception, <code>false</code> otherwise.
     */
    protected final boolean waitForClosingStreamTagFromServer() {
        try {
            waitForConditionOrThrowConnectionException(() -> closingStreamReceived, "closing stream tag from the server");
            return true;
        } catch (InterruptedException | SmackException | XMPPException e) {
            LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
        }
        return false;
    }

    @Override
    public void addConnectionListener(ConnectionListener connectionListener) {
        // A null listener is silently ignored.
        if (connectionListener != null) {
            connectionListeners.add(connectionListener);
        }
    }

    @Override
    public void removeConnectionListener(ConnectionListener connectionListener) {
        connectionListeners.remove(connectionListener);
    }

    @Override
    public <I extends IQ> I sendIqRequestAndWaitForResponse(IQ request)
            throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException {
        final StanzaCollector responseCollector = createStanzaCollectorAndSend(request);
        final IQ response = responseCollector.nextResultOrThrow();
        // The caller determines the concrete IQ type; the cast can not be checked here.
        @SuppressWarnings("unchecked")
        final I typedResponse = (I) response;
        return typedResponse;
    }

    @Override
    public StanzaCollector createStanzaCollectorAndSend(IQ packet) throws NotConnectedException, InterruptedException {
        // Collect the reply to exactly this IQ request.
        final StanzaFilter replyFilter = new IQReplyFilter(packet, this);
        return createStanzaCollectorAndSend(replyFilter, packet);
    }

    @Override
    public StanzaCollector createStanzaCollectorAndSend(StanzaFilter packetFilter, Stanza packet)
                    throws NotConnectedException, InterruptedException {
        // The collector has to be in place before the stanza is sent.
        final StanzaCollector collector = createStanzaCollector(
                        StanzaCollector.newConfiguration()
                                        .setStanzaFilter(packetFilter)
                                        .setRequest(packet));
        try {
            sendStanza(packet);
        }
        catch (InterruptedException | NotConnectedException | RuntimeException e) {
            // Sending failed, hence the collector is of no use anymore.
            collector.cancel();
            throw e;
        }
        return collector;
    }

    @Override
    public StanzaCollector createStanzaCollector(StanzaFilter packetFilter) {
        return createStanzaCollector(StanzaCollector.newConfiguration().setStanzaFilter(packetFilter));
    }

    @Override
    public StanzaCollector createStanzaCollector(StanzaCollector.Configuration configuration) {
        final StanzaCollector newCollector = new StanzaCollector(this, configuration);
        // Register the collector as active collector of this connection.
        collectors.add(newCollector);
        return newCollector;
    }

    @Override
    public void removeStanzaCollector(StanzaCollector collector) {
        collectors.remove(collector);
    }

    @Override
    public final void addStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter) {
        if (stanzaListener == null) {
            throw new NullPointerException("Given stanza listener must not be null");
        }
        final ListenerWrapper listenerWrapper = new ListenerWrapper(stanzaListener, stanzaFilter);
        synchronized (recvListeners) {
            recvListeners.put(stanzaListener, listenerWrapper);
        }
    }

    @Override
    public final boolean removeStanzaListener(StanzaListener stanzaListener) {
        final ListenerWrapper removed;
        synchronized (recvListeners) {
            removed = recvListeners.remove(stanzaListener);
        }
        return removed != null;
    }

    @Override
    public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
        if (packetListener == null) {
            throw new NullPointerException("Packet listener is null.");
        }
        final ListenerWrapper listenerWrapper = new ListenerWrapper(packetListener, packetFilter);
        synchronized (syncRecvListeners) {
            syncRecvListeners.put(packetListener, listenerWrapper);
        }
    }

    @Override
    public boolean removeSyncStanzaListener(StanzaListener packetListener) {
        final ListenerWrapper removed;
        synchronized (syncRecvListeners) {
            removed = syncRecvListeners.remove(packetListener);
        }
        return removed != null;
    }

    @Override
    public void addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
        if (packetListener == null) {
            throw new NullPointerException("Packet listener is null.");
        }
        final ListenerWrapper listenerWrapper = new ListenerWrapper(packetListener, packetFilter);
        synchronized (asyncRecvListeners) {
            asyncRecvListeners.put(packetListener, listenerWrapper);
        }
    }

    @Override
    public boolean removeAsyncStanzaListener(StanzaListener packetListener) {
        final ListenerWrapper removed;
        synchronized (asyncRecvListeners) {
            removed = asyncRecvListeners.remove(packetListener);
        }
        return removed != null;
    }

    @Override
    public void addStanzaSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) {
        if (packetListener == null) {
            throw new NullPointerException("Packet listener is null.");
        }
        final ListenerWrapper listenerWrapper = new ListenerWrapper(packetListener, packetFilter);
        synchronized (sendListeners) {
            sendListeners.put(packetListener, listenerWrapper);
        }
    }

    @Override
    public void removeStanzaSendingListener(StanzaListener packetListener) {
        synchronized (sendListeners) {
            sendListeners.remove(packetListener);
        }
    }

    /**
     * Process all stanza listeners for sending stanzas.
     * <p>
     * Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread.
     * </p>
     *
     * @param sendTopLevelStreamElement the top level stream element which just got sent.
     */
    // TODO: Rename to fireElementSendingListeners().
1183 @SuppressWarnings("javadoc") 1184 protected void firePacketSendingListeners(final TopLevelStreamElement sendTopLevelStreamElement) { 1185 if (debugger != null) { 1186 debugger.onOutgoingStreamElement(sendTopLevelStreamElement); 1187 } 1188 1189 if (!(sendTopLevelStreamElement instanceof Stanza)) { 1190 return; 1191 } 1192 Stanza packet = (Stanza) sendTopLevelStreamElement; 1193 1194 final List<StanzaListener> listenersToNotify = new LinkedList<>(); 1195 synchronized (sendListeners) { 1196 for (ListenerWrapper listenerWrapper : sendListeners.values()) { 1197 if (listenerWrapper.filterMatches(packet)) { 1198 listenersToNotify.add(listenerWrapper.getListener()); 1199 } 1200 } 1201 } 1202 if (listenersToNotify.isEmpty()) { 1203 return; 1204 } 1205 // Notify in a new thread, because we can 1206 asyncGo(new Runnable() { 1207 @Override 1208 public void run() { 1209 for (StanzaListener listener : listenersToNotify) { 1210 try { 1211 listener.processStanza(packet); 1212 } 1213 catch (Exception e) { 1214 LOGGER.log(Level.WARNING, "Sending listener threw exception", e); 1215 continue; 1216 } 1217 } 1218 } 1219 }); 1220 } 1221 1222 @Deprecated 1223 @Override 1224 public void addStanzaInterceptor(StanzaListener packetInterceptor, 1225 StanzaFilter packetFilter) { 1226 if (packetInterceptor == null) { 1227 throw new NullPointerException("Packet interceptor is null."); 1228 } 1229 InterceptorWrapper interceptorWrapper = new InterceptorWrapper(packetInterceptor, packetFilter); 1230 synchronized (interceptors) { 1231 interceptors.put(packetInterceptor, interceptorWrapper); 1232 } 1233 } 1234 1235 @Deprecated 1236 @Override 1237 public void removeStanzaInterceptor(StanzaListener packetInterceptor) { 1238 synchronized (interceptors) { 1239 interceptors.remove(packetInterceptor); 1240 } 1241 } 1242 1243 private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> void addInterceptor( 1244 Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> 
interceptors, Consumer<MPB> interceptor, 1245 Predicate<MP> filter) { 1246 Objects.requireNonNull(interceptor, "Interceptor must not be null"); 1247 1248 GenericInterceptorWrapper<MPB, MP> interceptorWrapper = new GenericInterceptorWrapper<>(interceptor, filter); 1249 1250 synchronized (interceptors) { 1251 interceptors.put(interceptor, interceptorWrapper); 1252 } 1253 } 1254 1255 private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> void removeInterceptor( 1256 Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> interceptors, Consumer<MPB> interceptor) { 1257 synchronized (interceptors) { 1258 interceptors.remove(interceptor); 1259 } 1260 } 1261 1262 @Override 1263 public void addMessageInterceptor(Consumer<MessageBuilder> messageInterceptor, Predicate<Message> messageFilter) { 1264 addInterceptor(messageInterceptors, messageInterceptor, messageFilter); 1265 } 1266 1267 @Override 1268 public void removeMessageInterceptor(Consumer<MessageBuilder> messageInterceptor) { 1269 removeInterceptor(messageInterceptors, messageInterceptor); 1270 } 1271 1272 @Override 1273 public void addPresenceInterceptor(Consumer<PresenceBuilder> presenceInterceptor, 1274 Predicate<Presence> presenceFilter) { 1275 addInterceptor(presenceInterceptors, presenceInterceptor, presenceFilter); 1276 } 1277 1278 @Override 1279 public void removePresenceInterceptor(Consumer<PresenceBuilder> presenceInterceptor) { 1280 removeInterceptor(presenceInterceptors, presenceInterceptor); 1281 } 1282 1283 private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> MP fireMessageOrPresenceInterceptors( 1284 MP messageOrPresence, Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> interceptors) { 1285 List<Consumer<MPB>> interceptorsToInvoke = new LinkedList<>(); 1286 synchronized (interceptors) { 1287 for (GenericInterceptorWrapper<MPB, MP> interceptorWrapper : interceptors.values()) { 1288 if 
(interceptorWrapper.filterMatches(messageOrPresence)) { 1289 Consumer<MPB> interceptor = interceptorWrapper.getInterceptor(); 1290 interceptorsToInvoke.add(interceptor); 1291 } 1292 } 1293 } 1294 1295 // Avoid transforming the stanza to a builder if there is no interceptor. 1296 if (interceptorsToInvoke.isEmpty()) { 1297 return messageOrPresence; 1298 } 1299 1300 MPB builder = messageOrPresence.asBuilder(); 1301 for (Consumer<MPB> interceptor : interceptorsToInvoke) { 1302 interceptor.accept(builder); 1303 } 1304 1305 // Now that the interceptors have (probably) modified the stanza in its builder form, we need to re-assemble it. 1306 messageOrPresence = builder.build(); 1307 return messageOrPresence; 1308 } 1309 1310 /** 1311 * Process interceptors. Interceptors may modify the stanza that is about to be sent. 1312 * Since the thread that requested to send the stanza will invoke all interceptors, it 1313 * is important that interceptors perform their work as soon as possible so that the 1314 * thread does not remain blocked for a long period. 1315 * 1316 * @param packet the stanza that is going to be sent to the server. 1317 * @return the, potentially modified stanza, after the interceptors are run. 
1318 */ 1319 private Stanza firePacketInterceptors(Stanza packet) { 1320 List<StanzaListener> interceptorsToInvoke = new LinkedList<>(); 1321 synchronized (interceptors) { 1322 for (InterceptorWrapper interceptorWrapper : interceptors.values()) { 1323 if (interceptorWrapper.filterMatches(packet)) { 1324 interceptorsToInvoke.add(interceptorWrapper.getInterceptor()); 1325 } 1326 } 1327 } 1328 for (StanzaListener interceptor : interceptorsToInvoke) { 1329 try { 1330 interceptor.processStanza(packet); 1331 } catch (Exception e) { 1332 LOGGER.log(Level.SEVERE, "Packet interceptor threw exception", e); 1333 } 1334 } 1335 1336 final Stanza stanzaAfterInterceptors; 1337 if (packet instanceof Message) { 1338 Message message = (Message) packet; 1339 stanzaAfterInterceptors = fireMessageOrPresenceInterceptors(message, messageInterceptors); 1340 } 1341 else if (packet instanceof Presence) { 1342 Presence presence = (Presence) packet; 1343 stanzaAfterInterceptors = fireMessageOrPresenceInterceptors(presence, presenceInterceptors); 1344 } else { 1345 // We do not (yet) support interceptors for IQ stanzas. 1346 assert packet instanceof IQ; 1347 stanzaAfterInterceptors = packet; 1348 } 1349 1350 return stanzaAfterInterceptors; 1351 } 1352 1353 /** 1354 * Initialize the {@link #debugger}. You can specify a customized {@link SmackDebugger} 1355 * by setup the system property <code>smack.debuggerClass</code> to the implementation. 1356 * 1357 * @throws IllegalStateException if the reader or writer isn't yet initialized. 1358 * @throws IllegalArgumentException if the SmackDebugger can't be loaded. 1359 */ 1360 protected void initDebugger() { 1361 if (reader == null || writer == null) { 1362 throw new NullPointerException("Reader or writer isn't initialized."); 1363 } 1364 // If debugging is enabled, we open a window and write out all network traffic. 
1365 if (debugger != null) { 1366 // Obtain new reader and writer from the existing debugger 1367 reader = debugger.newConnectionReader(reader); 1368 writer = debugger.newConnectionWriter(writer); 1369 } 1370 } 1371 1372 @Override 1373 public long getReplyTimeout() { 1374 return replyTimeout; 1375 } 1376 1377 @Override 1378 public void setReplyTimeout(long timeout) { 1379 if (Long.MAX_VALUE - System.currentTimeMillis() < timeout) { 1380 throw new IllegalArgumentException("Extremely long reply timeout"); 1381 } 1382 else { 1383 replyTimeout = timeout; 1384 } 1385 } 1386 1387 private SmackConfiguration.UnknownIqRequestReplyMode unknownIqRequestReplyMode = SmackConfiguration.getUnknownIqRequestReplyMode(); 1388 1389 /** 1390 * Set how Smack behaves when an unknown IQ request has been received. 1391 * 1392 * @param unknownIqRequestReplyMode reply mode. 1393 */ 1394 public void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) { 1395 this.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Mode must not be null"); 1396 } 1397 1398 protected final NonzaCallback.Builder buildNonzaCallback() { 1399 return new NonzaCallback.Builder(this); 1400 } 1401 1402 protected <SN extends Nonza, FN extends Nonza> SN sendAndWaitForResponse(Nonza nonza, Class<SN> successNonzaClass, 1403 Class<FN> failedNonzaClass) 1404 throws NoResponseException, NotConnectedException, InterruptedException, FailedNonzaException { 1405 NonzaCallback.Builder builder = buildNonzaCallback(); 1406 SN successNonza = NonzaCallback.sendAndWaitForResponse(builder, nonza, successNonzaClass, failedNonzaClass); 1407 return successNonza; 1408 } 1409 1410 protected final void parseAndProcessNonza(XmlPullParser parser) throws IOException, XmlPullParserException, SmackParsingException { 1411 ParserUtils.assertAtStartTag(parser); 1412 1413 final int initialDepth = parser.getDepth(); 1414 final String element = parser.getName(); 1415 final String namespace = 
parser.getNamespace(); 1416 final QName key = new QName(namespace, element); 1417 1418 NonzaProvider<? extends Nonza> nonzaProvider = ProviderManager.getNonzaProvider(key); 1419 if (nonzaProvider == null) { 1420 LOGGER.severe("Unknown nonza: " + key); 1421 ParserUtils.forwardToEndTagOfDepth(parser, initialDepth); 1422 return; 1423 } 1424 1425 List<NonzaCallback> nonzaCallbacks; 1426 synchronized (nonzaCallbacksMap) { 1427 nonzaCallbacks = nonzaCallbacksMap.getAll(key); 1428 nonzaCallbacks = CollectionUtil.newListWith(nonzaCallbacks); 1429 } 1430 if (nonzaCallbacks == null) { 1431 LOGGER.info("No nonza callback for " + key); 1432 ParserUtils.forwardToEndTagOfDepth(parser, initialDepth); 1433 return; 1434 } 1435 1436 Nonza nonza = nonzaProvider.parse(parser, incomingStreamXmlEnvironment); 1437 1438 for (NonzaCallback nonzaCallback : nonzaCallbacks) { 1439 nonzaCallback.onNonzaReceived(nonza); 1440 } 1441 } 1442 1443 protected void parseAndProcessStanza(XmlPullParser parser) 1444 throws XmlPullParserException, IOException, InterruptedException { 1445 ParserUtils.assertAtStartTag(parser); 1446 int parserDepth = parser.getDepth(); 1447 Stanza stanza = null; 1448 try { 1449 stanza = PacketParserUtils.parseStanza(parser, incomingStreamXmlEnvironment); 1450 } 1451 catch (XmlPullParserException | SmackParsingException | IOException | IllegalArgumentException e) { 1452 CharSequence content = PacketParserUtils.parseContentDepth(parser, 1453 parserDepth); 1454 UnparseableStanza message = new UnparseableStanza(content, e); 1455 ParsingExceptionCallback callback = getParsingExceptionCallback(); 1456 if (callback != null) { 1457 callback.handleUnparsableStanza(message); 1458 } 1459 } 1460 ParserUtils.assertAtEndTag(parser); 1461 if (stanza != null) { 1462 processStanza(stanza); 1463 } 1464 } 1465 1466 /** 1467 * Processes a stanza after it's been fully parsed by looping through the installed 1468 * stanza collectors and listeners and letting them examine the stanza to see if 1469 
* they are a match with the filter. 1470 * 1471 * @param stanza the stanza to process. 1472 * @throws InterruptedException if the calling thread was interrupted. 1473 */ 1474 protected void processStanza(final Stanza stanza) throws InterruptedException { 1475 assert stanza != null; 1476 1477 final SmackDebugger debugger = this.debugger; 1478 if (debugger != null) { 1479 debugger.onIncomingStreamElement(stanza); 1480 } 1481 1482 lastStanzaReceived = System.currentTimeMillis(); 1483 // Deliver the incoming packet to listeners. 1484 invokeStanzaCollectorsAndNotifyRecvListeners(stanza); 1485 } 1486 1487 /** 1488 * Invoke {@link StanzaCollector#processStanza(Stanza)} for every 1489 * StanzaCollector with the given packet. Also notify the receive listeners with a matching stanza filter about the packet. 1490 * <p> 1491 * This method will be invoked by the connections incoming processing thread which may be shared across multiple connections and 1492 * thus it is important that no user code, e.g. in form of a callback, is invoked by this method. For the same reason, 1493 * this method must not block for an extended period of time. 1494 * </p> 1495 * 1496 * @param packet the stanza to notify the StanzaCollectors and receive listeners about. 
*/
    protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {
        // IQ requests (type 'get' or 'set') are handled exclusively by the IQ request handlers below and never
        // reach the collectors and listeners.
        if (packet instanceof IQ) {
            final IQ iq = (IQ) packet;
            if (iq.isRequestIQ()) {
                final IQ iqRequest = iq;
                // Request handlers are looked up by the qualified name of the IQ's child element.
                final QName key = iqRequest.getChildElementQName();
                IQRequestHandler iqRequestHandler;
                final IQ.Type type = iq.getType();
                switch (type) {
                case set:
                    synchronized (setIqRequestHandler) {
                        iqRequestHandler = setIqRequestHandler.get(key);
                    }
                    break;
                case get:
                    synchronized (getIqRequestHandler) {
                        iqRequestHandler = getIqRequestHandler.get(key);
                    }
                    break;
                default:
                    throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'");
                }
                if (iqRequestHandler == null) {
                    // No handler is registered: determine, based on the configured mode, whether and with which
                    // error condition to reply.
                    StanzaError.Condition replyCondition;
                    switch (unknownIqRequestReplyMode) {
                    case doNotReply:
                        return;
                    case replyFeatureNotImplemented:
                        replyCondition = StanzaError.Condition.feature_not_implemented;
                        break;
                    case replyServiceUnavailable:
                        replyCondition = StanzaError.Condition.service_unavailable;
                        break;
                    default:
                        throw new AssertionError();
                    }

                    // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an
                    // IQ of type 'error' with the condition selected by the unknown IQ request reply mode above.
                    final ErrorIQ errorIQ = IQ.createErrorResponse(iq, StanzaError.getBuilder(
                                    replyCondition).build());
                    // Use async sendStanza() here, since if sendStanza() would block, then some connections, e.g.
                    // XmppNioTcpConnection, would deadlock, as this operation is performed in the same thread that
                    // processes the incoming stream elements.
                    asyncGo(() -> {
                        try {
                            sendStanza(errorIQ);
                        }
                        catch (InterruptedException | NotConnectedException e) {
                            LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e);
                        }
                    });
                } else {
                    // Select the executor according to the handler's mode: 'sync' handlers are run in order per
                    // connection, 'async' handlers via asyncGoLimited().
                    Executor executorService = null;
                    switch (iqRequestHandler.getMode()) {
                    case sync:
                        executorService = ASYNC_BUT_ORDERED.asExecutorFor(this);
                        break;
                    case async:
                        executorService = this::asyncGoLimited;
                        break;
                    }
                    final IQRequestHandler finalIqRequestHandler = iqRequestHandler;
                    executorService.execute(new Runnable() {
                        @Override
                        public void run() {
                            IQ response = finalIqRequestHandler.handleIQRequest(iq);
                            if (response == null) {
                                // It is not ideal if the IQ request handler does not return an IQ response, because RFC
                                // 6120 § 8.1.2 does specify that a response is mandatory. But some APIs, mostly the
                                // file transfer one, does not always return a result, so we need to handle this case.
                                // Also sometimes a request handler may decide that it's better to not send a response,
                                // e.g. to avoid presence leaks.
                                return;
                            }

                            assert response.isResponseIQ();

                            // Address the response to the requester and correlate it via the request's stanza ID.
                            response.setTo(iqRequest.getFrom());
                            response.setStanzaId(iqRequest.getStanzaId());
                            try {
                                sendStanza(response);
                            }
                            catch (InterruptedException | NotConnectedException e) {
                                LOGGER.log(Level.WARNING, "Exception while sending response to IQ request", e);
                            }
                        }
                    });
                }
                // The following return makes it impossible for packet listeners and collectors to
                // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the
                // desired behavior.
                return;
            }
        }

        // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below,
        // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in
        // their own thread.
        final Collection<StanzaListener> listenersToNotify = new LinkedList<>();
        extractMatchingListeners(packet, asyncRecvListeners, listenersToNotify);
        for (final StanzaListener listener : listenersToNotify) {
            asyncGoLimited(new Runnable() {
                @Override
                public void run() {
                    try {
                        listener.processStanza(packet);
                    } catch (Exception e) {
                        LOGGER.log(Level.SEVERE, "Exception in async packet listener", e);
                    }
                }
            });
        }

        // Loop through all collectors and notify the appropriate ones.
        for (StanzaCollector collector : collectors) {
            collector.processStanza(packet);
        }

        // Next the listeners of recvListeners: the invocations are ordered per listener.
        listenersToNotify.clear();
        extractMatchingListeners(packet, recvListeners, listenersToNotify);
        for (StanzaListener stanzaListener : listenersToNotify) {
            inOrderListeners.performAsyncButOrdered(stanzaListener, () -> {
                try {
                    stanzaListener.processStanza(packet);
                }
                catch (NotConnectedException e) {
                    LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
                }
                catch (Exception e) {
                    LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
                }
            });
        }

        // Notify the receive listeners interested in the packet
        listenersToNotify.clear();
        extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);
        // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single
        // threaded executor service and therefore keeps the order.
        // Note that listenersToNotify is not modified by this method after this point; the task below owns it.
        ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
            @Override
            public void run() {
                // As listeners are able to remove themselves and because the point in time where it is decided to
                // invoke a listener is different from the point in time where the listener is actually invoked
                // (here), we have to check again if the listener is still active.
                Iterator<StanzaListener> it = listenersToNotify.iterator();
                synchronized (syncRecvListeners) {
                    while (it.hasNext()) {
                        StanzaListener stanzaListener = it.next();
                        if (!syncRecvListeners.containsKey(stanzaListener)) {
                            // The listener was removed from syncRecvListeners, also remove it from listenersToNotify.
                            it.remove();
                        }
                    }
                }
                for (StanzaListener listener : listenersToNotify) {
                    try {
                        listener.processStanza(packet);
                    } catch (NotConnectedException e) {
                        // Stop notifying the remaining listeners once the connection is gone.
                        LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
                        break;
                    } catch (Exception e) {
                        LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
                    }
                }
            }
        });
    }

    /**
     * Add the listener of every wrapper whose filter matches the given stanza to the given collection. The
     * listener map is used as lock while it is iterated.
     *
     * @param stanza the stanza to match against the filters.
     * @param listeners the map of registered listeners and their wrappers.
     * @param listenersToNotify the collection the matching listeners are added to.
     */
    private static void extractMatchingListeners(Stanza stanza, Map<StanzaListener, ListenerWrapper> listeners,
                    Collection<StanzaListener> listenersToNotify) {
        synchronized (listeners) {
            for (ListenerWrapper listenerWrapper : listeners.values()) {
                if (listenerWrapper.filterMatches(stanza)) {
                    listenersToNotify.add(listenerWrapper.getListener());
                }
            }
        }
    }

    /**
     * Sets whether the connection has already logged in the server. This method assures that the
     * {@link #wasAuthenticated} flag is never reset once it has ever been set.
1681 * 1682 */ 1683 protected void setWasAuthenticated() { 1684 // Never reset the flag if the connection has ever been authenticated 1685 if (!wasAuthenticated) { 1686 wasAuthenticated = authenticated; 1687 } 1688 } 1689 1690 protected void callConnectionConnectingListener() { 1691 for (ConnectionListener listener : connectionListeners) { 1692 listener.connecting(this); 1693 } 1694 } 1695 1696 protected void callConnectionConnectedListener() { 1697 for (ConnectionListener listener : connectionListeners) { 1698 listener.connected(this); 1699 } 1700 } 1701 1702 protected void callConnectionAuthenticatedListener(boolean resumed) { 1703 for (ConnectionListener listener : connectionListeners) { 1704 try { 1705 listener.authenticated(this, resumed); 1706 } catch (Exception e) { 1707 // Catch and print any exception so we can recover 1708 // from a faulty listener and finish the shutdown process 1709 LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e); 1710 } 1711 } 1712 } 1713 1714 void callConnectionClosedListener() { 1715 for (ConnectionListener listener : connectionListeners) { 1716 try { 1717 listener.connectionClosed(); 1718 } 1719 catch (Exception e) { 1720 // Catch and print any exception so we can recover 1721 // from a faulty listener and finish the shutdown process 1722 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e); 1723 } 1724 } 1725 } 1726 1727 private void callConnectionClosedOnErrorListener(Exception e) { 1728 boolean logWarning = true; 1729 if (e instanceof StreamErrorException) { 1730 StreamErrorException see = (StreamErrorException) e; 1731 if (see.getStreamError().getCondition() == StreamError.Condition.not_authorized 1732 && wasAuthenticated) { 1733 logWarning = false; 1734 LOGGER.log(Level.FINE, 1735 "Connection closed with not-authorized stream error after it was already authenticated. 
The account was likely deleted/unregistered on the server"); 1736 } 1737 } 1738 if (logWarning) { 1739 LOGGER.log(Level.WARNING, "Connection " + this + " closed with error", e); 1740 } 1741 for (ConnectionListener listener : connectionListeners) { 1742 try { 1743 listener.connectionClosedOnError(e); 1744 } 1745 catch (Exception e2) { 1746 // Catch and print any exception so we can recover 1747 // from a faulty listener 1748 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e2); 1749 } 1750 } 1751 } 1752 1753 /** 1754 * A wrapper class to associate a stanza filter with a listener. 1755 */ 1756 protected static class ListenerWrapper { 1757 1758 private final StanzaListener packetListener; 1759 private final StanzaFilter packetFilter; 1760 1761 /** 1762 * Create a class which associates a stanza filter with a listener. 1763 * 1764 * @param packetListener the stanza listener. 1765 * @param packetFilter the associated filter or null if it listen for all packets. 1766 */ 1767 public ListenerWrapper(StanzaListener packetListener, StanzaFilter packetFilter) { 1768 this.packetListener = packetListener; 1769 this.packetFilter = packetFilter; 1770 } 1771 1772 public boolean filterMatches(Stanza packet) { 1773 return packetFilter == null || packetFilter.accept(packet); 1774 } 1775 1776 public StanzaListener getListener() { 1777 return packetListener; 1778 } 1779 } 1780 1781 /** 1782 * A wrapper class to associate a stanza filter with an interceptor. 1783 */ 1784 @Deprecated 1785 // TODO: Remove once addStanzaInterceptor is gone. 1786 protected static class InterceptorWrapper { 1787 1788 private final StanzaListener packetInterceptor; 1789 private final StanzaFilter packetFilter; 1790 1791 /** 1792 * Create a class which associates a stanza filter with an interceptor. 1793 * 1794 * @param packetInterceptor the interceptor. 1795 * @param packetFilter the associated filter or null if it intercepts all packets. 
1796 */ 1797 public InterceptorWrapper(StanzaListener packetInterceptor, StanzaFilter packetFilter) { 1798 this.packetInterceptor = packetInterceptor; 1799 this.packetFilter = packetFilter; 1800 } 1801 1802 public boolean filterMatches(Stanza packet) { 1803 return packetFilter == null || packetFilter.accept(packet); 1804 } 1805 1806 public StanzaListener getInterceptor() { 1807 return packetInterceptor; 1808 } 1809 } 1810 1811 private static final class GenericInterceptorWrapper<MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> { 1812 private final Consumer<MPB> stanzaInterceptor; 1813 private final Predicate<MP> stanzaFilter; 1814 1815 private GenericInterceptorWrapper(Consumer<MPB> stanzaInterceptor, Predicate<MP> stanzaFilter) { 1816 this.stanzaInterceptor = stanzaInterceptor; 1817 this.stanzaFilter = stanzaFilter; 1818 } 1819 1820 private boolean filterMatches(MP stanza) { 1821 return stanzaFilter == null || stanzaFilter.test(stanza); 1822 } 1823 1824 public Consumer<MPB> getInterceptor() { 1825 return stanzaInterceptor; 1826 } 1827 } 1828 1829 @Override 1830 public int getConnectionCounter() { 1831 return connectionCounterValue; 1832 } 1833 1834 @Override 1835 public void setFromMode(FromMode fromMode) { 1836 this.fromMode = fromMode; 1837 } 1838 1839 @Override 1840 public FromMode getFromMode() { 1841 return this.fromMode; 1842 } 1843 1844 protected final void parseFeatures(XmlPullParser parser) throws XmlPullParserException, IOException, SmackParsingException { 1845 streamFeatures.clear(); 1846 final int initialDepth = parser.getDepth(); 1847 while (true) { 1848 XmlPullParser.Event eventType = parser.next(); 1849 1850 if (eventType == XmlPullParser.Event.START_ELEMENT && parser.getDepth() == initialDepth + 1) { 1851 FullyQualifiedElement streamFeature = null; 1852 String name = parser.getName(); 1853 String namespace = parser.getNamespace(); 1854 switch (name) { 1855 case StartTls.ELEMENT: 1856 streamFeature = 
PacketParserUtils.parseStartTlsFeature(parser); 1857 break; 1858 case Mechanisms.ELEMENT: 1859 streamFeature = new Mechanisms(PacketParserUtils.parseMechanisms(parser)); 1860 break; 1861 case Bind.ELEMENT: 1862 streamFeature = Bind.Feature.INSTANCE; 1863 break; 1864 case Session.ELEMENT: 1865 streamFeature = PacketParserUtils.parseSessionFeature(parser); 1866 break; 1867 case Compress.Feature.ELEMENT: 1868 streamFeature = PacketParserUtils.parseCompressionFeature(parser); 1869 break; 1870 default: 1871 ExtensionElementProvider<ExtensionElement> provider = ProviderManager.getStreamFeatureProvider(name, namespace); 1872 if (provider != null) { 1873 streamFeature = provider.parse(parser, incomingStreamXmlEnvironment); 1874 } 1875 break; 1876 } 1877 if (streamFeature != null) { 1878 addStreamFeature(streamFeature); 1879 } 1880 } 1881 else if (eventType == XmlPullParser.Event.END_ELEMENT && parser.getDepth() == initialDepth) { 1882 break; 1883 } 1884 } 1885 } 1886 1887 protected final void parseFeaturesAndNotify(XmlPullParser parser) throws Exception { 1888 parseFeatures(parser); 1889 1890 if (hasFeature(Mechanisms.ELEMENT, Mechanisms.NAMESPACE)) { 1891 // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it 1892 if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE) 1893 || config.getSecurityMode() == SecurityMode.disabled) { 1894 tlsHandled = saslFeatureReceived = true; 1895 notifyWaitingThreads(); 1896 } 1897 } 1898 1899 // If the server reported the bind feature then we are that that we did SASL and maybe 1900 // STARTTLS. We can then report that the last 'stream:features' have been parsed 1901 if (hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 1902 if (!hasFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE) 1903 || !config.isCompressionEnabled()) { 1904 // This where the last stream features from the server, either it did not contain 1905 // compression or we disabled it. 
1906 lastFeaturesReceived = true; 1907 notifyWaitingThreads(); 1908 } 1909 } 1910 afterFeaturesReceived(); 1911 } 1912 1913 @SuppressWarnings("unused") 1914 protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException { 1915 // Default implementation does nothing 1916 } 1917 1918 @SuppressWarnings("unchecked") 1919 @Override 1920 public <F extends FullyQualifiedElement> F getFeature(QName qname) { 1921 return (F) streamFeatures.get(qname); 1922 } 1923 1924 @Override 1925 public boolean hasFeature(QName qname) { 1926 return streamFeatures.containsKey(qname); 1927 } 1928 1929 protected void addStreamFeature(FullyQualifiedElement feature) { 1930 QName key = feature.getQName(); 1931 streamFeatures.put(key, feature); 1932 } 1933 1934 @Override 1935 public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request) { 1936 return sendIqRequestAsync(request, getReplyTimeout()); 1937 } 1938 1939 @Override 1940 public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request, long timeout) { 1941 StanzaFilter replyFilter = new IQReplyFilter(request, this); 1942 return sendAsync(request, replyFilter, timeout); 1943 } 1944 1945 @Override 1946 public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter) { 1947 return sendAsync(stanza, replyFilter, getReplyTimeout()); 1948 } 1949 1950 @SuppressWarnings("FutureReturnValueIgnored") 1951 @Override 1952 public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter, long timeout) { 1953 Objects.requireNonNull(stanza, "stanza must not be null"); 1954 // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we 1955 // disallow it here in the async API as it makes no sense 1956 Objects.requireNonNull(replyFilter, "replyFilter must not be null"); 1957 1958 final InternalSmackFuture<S, Exception> future = new InternalSmackFuture<>(); 1959 1960 final StanzaListener stanzaListener 
= new StanzaListener() { 1961 @Override 1962 public void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException { 1963 boolean removed = removeAsyncStanzaListener(this); 1964 if (!removed) { 1965 // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the 1966 // exception callback will be invoked (if any). 1967 return; 1968 } 1969 try { 1970 XMPPErrorException.ifHasErrorThenThrow(stanza); 1971 @SuppressWarnings("unchecked") 1972 S s = (S) stanza; 1973 future.setResult(s); 1974 } 1975 catch (XMPPErrorException exception) { 1976 future.setException(exception); 1977 } 1978 } 1979 }; 1980 schedule(new Runnable() { 1981 @Override 1982 public void run() { 1983 boolean removed = removeAsyncStanzaListener(stanzaListener); 1984 if (!removed) { 1985 // We lost a race against the stanza listener, he already removed itself because he received a 1986 // reply. There is nothing more to do here. 1987 return; 1988 } 1989 1990 // If the packetListener got removed, then it was never run and 1991 // we never received a response, inform the exception callback 1992 Exception exception; 1993 if (!isConnected()) { 1994 // If the connection is no longer connected, throw a not connected exception. 
1995 exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter); 1996 } 1997 else { 1998 exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter); 1999 } 2000 future.setException(exception); 2001 } 2002 }, timeout, TimeUnit.MILLISECONDS); 2003 2004 addAsyncStanzaListener(stanzaListener, replyFilter); 2005 try { 2006 sendStanza(stanza); 2007 } 2008 catch (NotConnectedException | InterruptedException exception) { 2009 future.setException(exception); 2010 } 2011 2012 return future; 2013 } 2014 2015 @SuppressWarnings("FutureReturnValueIgnored") 2016 @Override 2017 public void addOneTimeSyncCallback(final StanzaListener callback, final StanzaFilter packetFilter) { 2018 final StanzaListener packetListener = new StanzaListener() { 2019 @Override 2020 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException, NotLoggedInException { 2021 try { 2022 callback.processStanza(packet); 2023 } finally { 2024 removeSyncStanzaListener(this); 2025 } 2026 } 2027 }; 2028 addSyncStanzaListener(packetListener, packetFilter); 2029 schedule(new Runnable() { 2030 @Override 2031 public void run() { 2032 removeSyncStanzaListener(packetListener); 2033 } 2034 }, getReplyTimeout(), TimeUnit.MILLISECONDS); 2035 } 2036 2037 @Override 2038 public IQRequestHandler registerIQRequestHandler(final IQRequestHandler iqRequestHandler) { 2039 final QName key = iqRequestHandler.getQName(); 2040 switch (iqRequestHandler.getType()) { 2041 case set: 2042 synchronized (setIqRequestHandler) { 2043 return setIqRequestHandler.put(key, iqRequestHandler); 2044 } 2045 case get: 2046 synchronized (getIqRequestHandler) { 2047 return getIqRequestHandler.put(key, iqRequestHandler); 2048 } 2049 default: 2050 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 2051 } 2052 } 2053 2054 @Override 2055 public final IQRequestHandler unregisterIQRequestHandler(IQRequestHandler iqRequestHandler) { 2056 return 
unregisterIQRequestHandler(iqRequestHandler.getElement(), iqRequestHandler.getNamespace(), 2057 iqRequestHandler.getType()); 2058 } 2059 2060 @Override 2061 public IQRequestHandler unregisterIQRequestHandler(String element, String namespace, IQ.Type type) { 2062 final QName key = new QName(namespace, element); 2063 switch (type) { 2064 case set: 2065 synchronized (setIqRequestHandler) { 2066 return setIqRequestHandler.remove(key); 2067 } 2068 case get: 2069 synchronized (getIqRequestHandler) { 2070 return getIqRequestHandler.remove(key); 2071 } 2072 default: 2073 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 2074 } 2075 } 2076 2077 private long lastStanzaReceived; 2078 2079 @Override 2080 public long getLastStanzaReceived() { 2081 return lastStanzaReceived; 2082 } 2083 2084 /** 2085 * Get the timestamp when the connection was the first time authenticated, i.e., when the first successful login was 2086 * performed. Note that this value is not reset on disconnect, so it represents the timestamp from the last 2087 * authenticated connection. The value is also not reset on stream resumption. 2088 * 2089 * @return the timestamp or {@code null}. 2090 * @since 4.3.3 2091 */ 2092 public final long getAuthenticatedConnectionInitiallyEstablishedTimestamp() { 2093 return authenticatedConnectionInitiallyEstablishedTimestamp; 2094 } 2095 2096 /** 2097 * Install a parsing exception callback, which will be invoked once an exception is encountered while parsing a 2098 * stanza. 2099 * 2100 * @param callback the callback to install 2101 */ 2102 public void setParsingExceptionCallback(ParsingExceptionCallback callback) { 2103 parsingExceptionCallback = callback; 2104 } 2105 2106 /** 2107 * Get the current active parsing exception callback. 
2108 * 2109 * @return the active exception callback or null if there is none 2110 */ 2111 public ParsingExceptionCallback getParsingExceptionCallback() { 2112 return parsingExceptionCallback; 2113 } 2114 2115 @Override 2116 public final String toString() { 2117 EntityFullJid localEndpoint = getUser(); 2118 String localEndpointString = localEndpoint == null ? "not-authenticated" : localEndpoint.toString(); 2119 return getClass().getSimpleName() + '[' + localEndpointString + "] (" + getConnectionCounter() + ')'; 2120 } 2121 2122 /** 2123 * A queue of deferred runnables that where not executed immediately because {@link #currentAsyncRunnables} reached 2124 * {@link #maxAsyncRunnables}. Note that we use a {@code LinkedList} in order to avoid space blowups in case the 2125 * list ever becomes very big and shrinks again. 2126 */ 2127 private final Queue<Runnable> deferredAsyncRunnables = new LinkedList<>(); 2128 2129 private int deferredAsyncRunnablesCount; 2130 2131 private int deferredAsyncRunnablesCountPrevious; 2132 2133 private int maxAsyncRunnables = SmackConfiguration.getDefaultConcurrencyLevelLimit(); 2134 2135 private int currentAsyncRunnables; 2136 2137 protected void asyncGoLimited(final Runnable runnable) { 2138 Runnable wrappedRunnable = new Runnable() { 2139 @Override 2140 public void run() { 2141 runnable.run(); 2142 2143 synchronized (deferredAsyncRunnables) { 2144 Runnable defferredRunnable = deferredAsyncRunnables.poll(); 2145 if (defferredRunnable == null) { 2146 currentAsyncRunnables--; 2147 } else { 2148 deferredAsyncRunnablesCount--; 2149 asyncGo(defferredRunnable); 2150 } 2151 } 2152 } 2153 }; 2154 2155 synchronized (deferredAsyncRunnables) { 2156 if (currentAsyncRunnables < maxAsyncRunnables) { 2157 currentAsyncRunnables++; 2158 asyncGo(wrappedRunnable); 2159 } else { 2160 deferredAsyncRunnablesCount++; 2161 deferredAsyncRunnables.add(wrappedRunnable); 2162 } 2163 2164 final int HIGH_WATERMARK = 100; 2165 final int INFORM_WATERMARK = 20; 2166 2167 
final int deferredAsyncRunnablesCount = this.deferredAsyncRunnablesCount; 2168 2169 if (deferredAsyncRunnablesCount >= HIGH_WATERMARK 2170 && deferredAsyncRunnablesCountPrevious < HIGH_WATERMARK) { 2171 LOGGER.log(Level.WARNING, "High watermark of " + HIGH_WATERMARK + " simultaneous executing runnables reached"); 2172 } else if (deferredAsyncRunnablesCount >= INFORM_WATERMARK 2173 && deferredAsyncRunnablesCountPrevious < INFORM_WATERMARK) { 2174 LOGGER.log(Level.INFO, INFORM_WATERMARK + " simultaneous executing runnables reached"); 2175 } 2176 2177 deferredAsyncRunnablesCountPrevious = deferredAsyncRunnablesCount; 2178 } 2179 } 2180 2181 public void setMaxAsyncOperations(int maxAsyncOperations) { 2182 if (maxAsyncOperations < 1) { 2183 throw new IllegalArgumentException("Max async operations must be greater than 0"); 2184 } 2185 2186 synchronized (deferredAsyncRunnables) { 2187 maxAsyncRunnables = maxAsyncOperations; 2188 } 2189 } 2190 2191 protected static void asyncGo(Runnable runnable) { 2192 CACHED_EXECUTOR_SERVICE.execute(runnable); 2193 } 2194 2195 @SuppressWarnings("static-method") 2196 protected final SmackReactor getReactor() { 2197 return SMACK_REACTOR; 2198 } 2199 2200 protected static ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit) { 2201 return SMACK_REACTOR.schedule(runnable, delay, unit, ScheduledAction.Kind.NonBlocking); 2202 } 2203 2204 protected void onStreamOpen(XmlPullParser parser) { 2205 // We found an opening stream. 2206 if ("jabber:client".equals(parser.getNamespace(null))) { 2207 streamId = parser.getAttributeValue("", "id"); 2208 incomingStreamXmlEnvironment = XmlEnvironment.from(parser); 2209 2210 String reportedServerDomainString = parser.getAttributeValue("", "from"); 2211 if (reportedServerDomainString == null) { 2212 // RFC 6120 § 4.7.1. makes no explicit statement whether or not 'from' in the stream open from the server 2213 // in c2s connections is required or not. 
2214 return; 2215 } 2216 DomainBareJid reportedServerDomain; 2217 try { 2218 reportedServerDomain = JidCreate.domainBareFrom(reportedServerDomainString); 2219 DomainBareJid configuredXmppServiceDomain = config.getXMPPServiceDomain(); 2220 if (!configuredXmppServiceDomain.equals(reportedServerDomain)) { 2221 LOGGER.warning("Domain reported by server '" + reportedServerDomain 2222 + "' does not match configured domain '" + configuredXmppServiceDomain + "'"); 2223 } 2224 } catch (XmppStringprepException e) { 2225 LOGGER.log(Level.WARNING, "XMPP service domain '" + reportedServerDomainString 2226 + "' as reported by server could not be transformed to a valid JID", e); 2227 } 2228 } 2229 } 2230 2231 protected final void sendStreamOpen() throws NotConnectedException, InterruptedException { 2232 // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as 2233 // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external 2234 // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first 2235 // response from the server (see e.g. RFC 6120 § 9.1.1 Step 2.) 
2236 CharSequence to = getXMPPServiceDomain(); 2237 CharSequence from = null; 2238 CharSequence localpart = config.getUsername(); 2239 if (localpart != null) { 2240 from = XmppStringUtils.completeJidFrom(localpart, to); 2241 } 2242 String id = getStreamId(); 2243 String lang = config.getXmlLang(); 2244 2245 AbstractStreamOpen streamOpen = getStreamOpen(to, from, id, lang); 2246 sendNonza(streamOpen); 2247 updateOutgoingStreamXmlEnvironmentOnStreamOpen(streamOpen); 2248 } 2249 2250 protected AbstractStreamOpen getStreamOpen(CharSequence to, CharSequence from, String id, String lang) { 2251 return new StreamOpen(to, from, id, lang); 2252 } 2253 2254 protected void updateOutgoingStreamXmlEnvironmentOnStreamOpen(AbstractStreamOpen streamOpen) { 2255 XmlEnvironment.Builder xmlEnvironmentBuilder = XmlEnvironment.builder(); 2256 xmlEnvironmentBuilder.with(streamOpen); 2257 outgoingStreamXmlEnvironment = xmlEnvironmentBuilder.build(); 2258 } 2259 2260 protected final SmackTlsContext getSmackTlsContext() { 2261 return config.smackTlsContext; 2262 } 2263}