001/** 002 * 003 * Copyright 2009 Jive Software, 2018-2024 Florian Schmaus. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.io.Reader; 021import java.io.Writer; 022import java.time.ZonedDateTime; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.LinkedHashMap; 028import java.util.LinkedList; 029import java.util.List; 030import java.util.Map; 031import java.util.Queue; 032import java.util.Set; 033import java.util.concurrent.ConcurrentLinkedQueue; 034import java.util.concurrent.CopyOnWriteArraySet; 035import java.util.concurrent.Executor; 036import java.util.concurrent.ExecutorService; 037import java.util.concurrent.Executors; 038import java.util.concurrent.Semaphore; 039import java.util.concurrent.ThreadFactory; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.locks.Lock; 043import java.util.concurrent.locks.ReentrantLock; 044import java.util.function.Predicate; 045import java.util.function.Supplier; 046import java.util.logging.Level; 047import java.util.logging.Logger; 048 049import javax.net.ssl.SSLSession; 050import javax.xml.namespace.QName; 051 052import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 053import org.jivesoftware.smack.SmackConfiguration.UnknownIqRequestReplyMode; 054import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 055import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 056import org.jivesoftware.smack.SmackException.NoResponseException; 057import org.jivesoftware.smack.SmackException.NotConnectedException; 058import org.jivesoftware.smack.SmackException.NotLoggedInException; 059import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; 060import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; 061import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; 062import org.jivesoftware.smack.SmackException.SecurityRequiredException; 063import org.jivesoftware.smack.SmackException.SmackSaslException; 064import org.jivesoftware.smack.SmackException.SmackWrappedException; 065import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; 066import org.jivesoftware.smack.XMPPException.FailedNonzaException; 067import org.jivesoftware.smack.XMPPException.StreamErrorException; 068import org.jivesoftware.smack.XMPPException.XMPPErrorException; 069import org.jivesoftware.smack.compress.packet.Compress; 070import org.jivesoftware.smack.compression.XMPPInputOutputStream; 071import org.jivesoftware.smack.datatypes.UInt16; 072import org.jivesoftware.smack.debugger.SmackDebugger; 073import org.jivesoftware.smack.debugger.SmackDebuggerFactory; 074import org.jivesoftware.smack.filter.IQReplyFilter; 075import org.jivesoftware.smack.filter.StanzaFilter; 076import org.jivesoftware.smack.filter.StanzaIdFilter; 077import org.jivesoftware.smack.internal.SmackTlsContext; 078import org.jivesoftware.smack.iqrequest.IQRequestHandler; 079import org.jivesoftware.smack.packet.AbstractStreamOpen; 080import org.jivesoftware.smack.packet.Bind; 081import org.jivesoftware.smack.packet.ErrorIQ; 082import org.jivesoftware.smack.packet.ExtensionElement; 083import org.jivesoftware.smack.packet.IQ; 084import org.jivesoftware.smack.packet.Mechanisms; 085import org.jivesoftware.smack.packet.Message; 086import org.jivesoftware.smack.packet.MessageBuilder; 087import org.jivesoftware.smack.packet.MessageOrPresence; 088import org.jivesoftware.smack.packet.MessageOrPresenceBuilder; 089import org.jivesoftware.smack.packet.Nonza; 090import org.jivesoftware.smack.packet.Presence; 091import org.jivesoftware.smack.packet.PresenceBuilder; 092import org.jivesoftware.smack.packet.Session; 093import org.jivesoftware.smack.packet.Stanza; 094import org.jivesoftware.smack.packet.StanzaError; 095import org.jivesoftware.smack.packet.StanzaFactory; 096import org.jivesoftware.smack.packet.StartTls; 097import org.jivesoftware.smack.packet.StreamError; 098import org.jivesoftware.smack.packet.StreamOpen; 099import org.jivesoftware.smack.packet.TopLevelStreamElement; 100import org.jivesoftware.smack.packet.XmlElement; 101import org.jivesoftware.smack.packet.XmlEnvironment; 102import org.jivesoftware.smack.packet.id.StanzaIdSource; 103import org.jivesoftware.smack.parsing.ParsingExceptionCallback; 104import org.jivesoftware.smack.parsing.SmackParsingException; 105import org.jivesoftware.smack.provider.ExtensionElementProvider; 106import org.jivesoftware.smack.provider.NonzaProvider; 107import org.jivesoftware.smack.provider.ProviderManager; 108import org.jivesoftware.smack.sasl.SASLErrorException; 109import org.jivesoftware.smack.sasl.SASLMechanism; 110import org.jivesoftware.smack.sasl.core.SASLAnonymous; 111import org.jivesoftware.smack.sasl.packet.SaslNonza; 112import org.jivesoftware.smack.util.Async; 113import org.jivesoftware.smack.util.CollectionUtil; 114import org.jivesoftware.smack.util.Consumer; 115import org.jivesoftware.smack.util.MultiMap; 116import org.jivesoftware.smack.util.Objects; 117import org.jivesoftware.smack.util.PacketParserUtils; 118import org.jivesoftware.smack.util.ParserUtils; 119import org.jivesoftware.smack.util.StringUtils; 120import org.jivesoftware.smack.xml.XmlPullParser; 121import org.jivesoftware.smack.xml.XmlPullParserException; 122 123import org.jxmpp.jid.DomainBareJid; 124import org.jxmpp.jid.EntityBareJid; 125import org.jxmpp.jid.EntityFullJid; 126import org.jxmpp.jid.Jid; 127import org.jxmpp.jid.impl.JidCreate; 128import org.jxmpp.jid.parts.Resourcepart; 129import org.jxmpp.stringprep.XmppStringprepException; 130import org.jxmpp.util.XmppStringUtils; 131 132/** 133 * This abstract class is commonly used as super class for XMPP connection mechanisms like TCP and BOSH. Hence, it 134 * provides the methods for connection state management, like {@link #connect()}, {@link #login()} and 135 * {@link #disconnect()} (which are deliberately not provided by the {@link XMPPConnection} interface). 136 * <p> 137 * <b>Note:</b> The default entry point to Smack's documentation is {@link XMPPConnection}. If you are getting started 138 * with Smack, then head over to {@link XMPPConnection} and the come back here. 139 * </p> 140 * <h2>Parsing Exceptions</h2> 141 * <p> 142 * In case a Smack parser (Provider) throws those exceptions are handled over to the {@link ParsingExceptionCallback}. A 143 * common cause for a provider throwing is illegal input, for example a non-numeric String where only Integers are 144 * allowed. Smack's <em>default behavior</em> follows the <b>"fail-hard per default"</b> principle leading to a 145 * termination of the connection on parsing exceptions. This default was chosen to make users eventually aware that they 146 * should configure their own callback and handle those exceptions to prevent the disconnect. Handle a parsing exception 147 * could be as simple as using a non-throwing no-op callback, which would cause the faulty stream element to be taken 148 * out of the stream, i.e., Smack behaves like that element was never received. 149 * </p> 150 * <p> 151 * If the parsing exception is because Smack received illegal input, then please consider informing the authors of the 152 * originating entity about that. If it was thrown because of an bug in a Smack parser, then please consider filling a 153 * bug with Smack. 154 * </p> 155 * <h3>Managing the parsing exception callback</h3> 156 * <p> 157 * The "fail-hard per default" behavior is achieved by using the 158 * {@link org.jivesoftware.smack.parsing.ExceptionThrowingCallbackWithHint} as default parsing exception callback. You 159 * can change the behavior using {@link #setParsingExceptionCallback(ParsingExceptionCallback)} to set a new callback. 160 * Use {@link org.jivesoftware.smack.SmackConfiguration#setDefaultParsingExceptionCallback(ParsingExceptionCallback)} to 161 * set the default callback. 162 * </p> 163 */ 164public abstract class AbstractXMPPConnection implements XMPPConnection { 165 private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName()); 166 167 protected static final SmackReactor SMACK_REACTOR; 168 169 static { 170 SMACK_REACTOR = SmackReactor.getInstance(); 171 } 172 173 /** 174 * Counter to uniquely identify connections that are created. 175 */ 176 private static final AtomicInteger connectionCounter = new AtomicInteger(0); 177 178 static { 179 Smack.ensureInitialized(); 180 } 181 182 protected enum SyncPointState { 183 initial, 184 request_sent, 185 successful, 186 } 187 188 /** 189 * A collection of ConnectionListeners which listen for connection closing 190 * and reconnection events. 191 */ 192 protected final Set<ConnectionListener> connectionListeners = 193 new CopyOnWriteArraySet<>(); 194 195 /** 196 * A collection of StanzaCollectors which collects packets for a specified filter 197 * and perform blocking and polling operations on the result queue. 198 * <p> 199 * We use a ConcurrentLinkedQueue here, because its Iterator is weakly 200 * consistent and we want {@link #invokeStanzaCollectorsAndNotifyRecvListeners(Stanza)} for-each 201 * loop to be lock free. As drawback, removing a StanzaCollector is O(n). 202 * The alternative would be a synchronized HashSet, but this would mean a 203 * synchronized block around every usage of <code>collectors</code>. 204 * </p> 205 */ 206 private final Collection<StanzaCollector> collectors = new ConcurrentLinkedQueue<>(); 207 208 private final Map<StanzaListener, ListenerWrapper> recvListeners = new LinkedHashMap<>(); 209 210 /** 211 * List of PacketListeners that will be notified synchronously when a new stanza was received. 212 */ 213 private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>(); 214 215 /** 216 * List of PacketListeners that will be notified asynchronously when a new stanza was received. 217 */ 218 private final Map<StanzaListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>(); 219 220 /** 221 * List of PacketListeners that will be notified when a new stanza was sent. 222 */ 223 private final Map<StanzaListener, ListenerWrapper> sendListeners = 224 new HashMap<>(); 225 226 /** 227 * List of PacketListeners that will be notified when a new stanza is about to be 228 * sent to the server. These interceptors may modify the stanza before it is being 229 * actually sent to the server. 230 */ 231 private final Map<StanzaListener, InterceptorWrapper> interceptors = 232 new HashMap<>(); 233 234 private final Map<Consumer<MessageBuilder>, GenericInterceptorWrapper<MessageBuilder, Message>> messageInterceptors = new HashMap<>(); 235 236 private final Map<Consumer<PresenceBuilder>, GenericInterceptorWrapper<PresenceBuilder, Presence>> presenceInterceptors = new HashMap<>(); 237 238 private XmlEnvironment incomingStreamXmlEnvironment; 239 240 protected XmlEnvironment outgoingStreamXmlEnvironment; 241 242 final MultiMap<QName, NonzaCallback> nonzaCallbacksMap = new MultiMap<>(); 243 244 protected final Lock connectionLock = new ReentrantLock(); 245 246 protected final Map<QName, XmlElement> streamFeatures = new HashMap<>(); 247 248 /** 249 * The full JID of the authenticated user, as returned by the resource binding response of the server. 250 * <p> 251 * It is important that we don't infer the user from the login() arguments and the configurations service name, as, 252 * for example, when SASL External is used, the username is not given to login but taken from the 'external' 253 * certificate. 254 * </p> 255 */ 256 protected EntityFullJid user; 257 258 protected boolean connected = false; 259 260 /** 261 * The stream ID, see RFC 6120 § 4.7.3 262 */ 263 protected String streamId; 264 265 /** 266 * The timeout to wait for a reply in milliseconds. 267 */ 268 private long replyTimeout = SmackConfiguration.getDefaultReplyTimeout(); 269 270 /** 271 * The SmackDebugger allows to log and debug XML traffic. 272 */ 273 protected final SmackDebugger debugger; 274 275 /** 276 * The Reader which is used for the debugger. 277 */ 278 protected Reader reader; 279 280 /** 281 * The Writer which is used for the debugger. 282 */ 283 protected Writer writer; 284 285 private Exception currentConnectionException; 286 287 protected boolean tlsHandled; 288 289 /** 290 * Set to <code>true</code> if the last features stanza from the server has been parsed. A XMPP connection 291 * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature 292 * stanza is send by the server. This is set to true once the last feature stanza has been 293 * parsed. 294 */ 295 protected boolean lastFeaturesReceived; 296 297 /** 298 * Set to <code>true</code> if the SASL feature has been received. 299 */ 300 protected boolean saslFeatureReceived; 301 302 /** 303 * A synchronization point which is successful if this connection has received the closing 304 * stream element from the remote end-point, i.e. the server. 305 */ 306 protected boolean closingStreamReceived; 307 308 /** 309 * The SASLAuthentication manager that is responsible for authenticating with the server. 310 */ 311 private final SASLAuthentication saslAuthentication; 312 313 /** 314 * A number to uniquely identify connections that are created. This is distinct from the 315 * connection ID, which is a value sent by the server once a connection is made. 316 */ 317 protected final int connectionCounterValue = connectionCounter.getAndIncrement(); 318 319 /** 320 * Holds the initial configuration used while creating the connection. 321 */ 322 protected final ConnectionConfiguration config; 323 324 /** 325 * Defines how the from attribute of outgoing stanzas should be handled. 326 */ 327 private FromMode fromMode = FromMode.OMITTED; 328 329 protected XMPPInputOutputStream compressionHandler; 330 331 private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); 332 333 /** 334 * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set 335 * them 'daemon'. 336 */ 337 private static final ExecutorService CACHED_EXECUTOR_SERVICE = Executors.newCachedThreadPool(new ThreadFactory() { 338 @Override 339 public Thread newThread(Runnable runnable) { 340 Thread thread = new Thread(runnable); 341 thread.setName("Smack Cached Executor"); 342 thread.setDaemon(true); 343 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 344 @Override 345 public void uncaughtException(Thread t, Throwable e) { 346 LOGGER.log(Level.WARNING, t + " encountered uncaught exception", e); 347 } 348 }); 349 return thread; 350 } 351 }); 352 353 protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>(); 354 355 /** 356 * The used host to establish the connection to 357 */ 358 protected String host; 359 360 /** 361 * The used port to establish the connection to 362 */ 363 protected UInt16 port; 364 365 /** 366 * Flag that indicates if the user is currently authenticated with the server. 367 */ 368 protected boolean authenticated = false; 369 370 protected ZonedDateTime authenticatedConnectionInitiallyEstablishedTimestamp; 371 372 /** 373 * Flag that indicates if the user was authenticated with the server when the connection 374 * to the server was closed (abruptly or not). 375 */ 376 protected boolean wasAuthenticated = false; 377 378 private final Map<QName, IQRequestHandler> setIqRequestHandler = new HashMap<>(); 379 private final Map<QName, IQRequestHandler> getIqRequestHandler = new HashMap<>(); 380 private final Set<String> iqRequestHandlerNamespaces = new CopyOnWriteArraySet<>(); 381 private final Map<String, Integer> iqRequestHandlerNamespacesReferenceCounters = new HashMap<>(); 382 383 private final StanzaFactory stanzaFactory; 384 385 /** 386 * Create a new XMPPConnection to an XMPP server. 387 * 388 * @param configuration The configuration which is used to establish the connection. 389 */ 390 @SuppressWarnings("this-escape") 391 protected AbstractXMPPConnection(ConnectionConfiguration configuration) { 392 saslAuthentication = new SASLAuthentication(this, configuration); 393 config = configuration; 394 395 // Install the SASL Nonza callbacks. 396 buildNonzaCallback() 397 .listenFor(SaslNonza.Challenge.class, c -> { 398 try { 399 saslAuthentication.challengeReceived(c); 400 } catch (SmackException | InterruptedException e) { 401 saslAuthentication.authenticationFailed(e); 402 } 403 }) 404 .listenFor(SaslNonza.Success.class, s -> { 405 try { 406 saslAuthentication.authenticated(s); 407 } catch (SmackSaslException | NotConnectedException | InterruptedException e) { 408 saslAuthentication.authenticationFailed(e); 409 } 410 }) 411 .listenFor(SaslNonza.SASLFailure.class, f -> saslAuthentication.authenticationFailed(f)) 412 .install(); 413 414 SmackDebuggerFactory debuggerFactory = configuration.getDebuggerFactory(); 415 if (debuggerFactory != null) { 416 debugger = debuggerFactory.create(this); 417 } else { 418 debugger = null; 419 } 420 // Notify listeners that a new connection has been established 421 for (ConnectionCreationListener listener : XMPPConnectionRegistry.getConnectionCreationListeners()) { 422 listener.connectionCreated(this); 423 } 424 425 StanzaIdSource stanzaIdSource = configuration.constructStanzaIdSource(); 426 stanzaFactory = new StanzaFactory(stanzaIdSource); 427 } 428 429 /** 430 * Get the connection configuration used by this connection. 431 * 432 * @return the connection configuration. 433 */ 434 public ConnectionConfiguration getConfiguration() { 435 return config; 436 } 437 438 @Override 439 public DomainBareJid getXMPPServiceDomain() { 440 if (xmppServiceDomain != null) { 441 return xmppServiceDomain; 442 } 443 return config.getXMPPServiceDomain(); 444 } 445 446 @Override 447 public String getHost() { 448 return host; 449 } 450 451 @Override 452 public int getPort() { 453 final UInt16 port = this.port; 454 if (port == null) { 455 return -1; 456 } 457 458 return port.intValue(); 459 } 460 461 @Override 462 public abstract boolean isSecureConnection(); 463 464 // Usually batching is a good idea. So the two 465 // send(Internal|NonBlockingInternal) methods below could be using 466 // Collection<? extends TopLevelStreamElement> as parameter type instead. 467 // TODO: Add "batched send" support. Note that for the non-blocking variant, this probably requires a change in 468 // return type, so that it is possible to signal which messages could be "send" and which not. 469 470 protected abstract void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException; 471 472 protected abstract void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException; 473 474 @SuppressWarnings("deprecation") 475 @Override 476 public boolean trySendStanza(Stanza stanza) throws NotConnectedException { 477 // Default implementation which falls back to sendStanza() as mentioned in the methods javadoc. May be 478 // overwritten by subclasses. 479 try { 480 sendStanza(stanza); 481 } catch (InterruptedException e) { 482 LOGGER.log(Level.FINER, 483 "Thread blocked in fallback implementation of trySendStanza(Stanza) was interrupted", e); 484 return false; 485 } 486 return true; 487 } 488 489 @SuppressWarnings("deprecation") 490 @Override 491 public boolean trySendStanza(Stanza stanza, long timeout, TimeUnit unit) 492 throws NotConnectedException, InterruptedException { 493 // Default implementation which falls back to sendStanza() as mentioned in the methods javadoc. May be 494 // overwritten by subclasses. 495 sendStanza(stanza); 496 return true; 497 } 498 499 @Override 500 public final void sendNonza(Nonza nonza) throws NotConnectedException, InterruptedException { 501 sendInternal(nonza); 502 } 503 504 @Override 505 public final void sendNonzaNonBlocking(Nonza nonza) throws NotConnectedException, OutgoingQueueFullException { 506 sendNonBlockingInternal(nonza); 507 } 508 509 @Override 510 public abstract boolean isUsingCompression(); 511 512 protected void initState() { 513 currentConnectionException = null; 514 saslFeatureReceived = lastFeaturesReceived = tlsHandled = false; 515 // TODO: We do not init closingStreamReceived here, as the integration tests use it to check if we waited for 516 // it. 517 } 518 519 /** 520 * Establishes a connection to the XMPP server. It basically 521 * creates and maintains a connection to the server. 522 * <p> 523 * Listeners will be preserved from a previous connection. 524 * </p> 525 * 526 * @throws XMPPException if an error occurs on the XMPP protocol level. 527 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 528 * @throws IOException if an I/O error occurred. 529 * @return a reference to this object, to chain <code>connect()</code> with <code>login()</code>. 530 * @throws InterruptedException if the calling thread was interrupted. 531 */ 532 public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException { 533 // Check if not already connected 534 throwAlreadyConnectedExceptionIfAppropriate(); 535 536 // Notify connection listeners that we are trying to connect 537 callConnectionConnectingListener(); 538 539 // Reset the connection state 540 initState(); 541 closingStreamReceived = false; 542 streamId = null; 543 544 try { 545 // Perform the actual connection to the XMPP service 546 connectInternal(); 547 548 // If TLS is required but the server doesn't offer it, disconnect 549 // from the server and throw an error. First check if we've already negotiated TLS 550 // and are secure, however (features get parsed a second time after TLS is established). 551 if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) { 552 throw new SecurityRequiredByClientException(); 553 } 554 } catch (SmackException | IOException | XMPPException | InterruptedException e) { 555 instantShutdown(); 556 throw e; 557 } 558 559 // If connectInternal() did not throw, then this connection must now be marked as connected. 560 assert connected; 561 562 callConnectionConnectedListener(); 563 564 return this; 565 } 566 567 /** 568 * Abstract method that concrete subclasses of XMPPConnection need to implement to perform their 569 * way of XMPP connection establishment. Implementations are required to perform an automatic 570 * login if the previous connection state was logged (authenticated). 571 * 572 * @throws SmackException if Smack detected an exceptional situation. 573 * @throws IOException if an I/O error occurred. 574 * @throws XMPPException if an XMPP protocol error was received. 575 * @throws InterruptedException if the calling thread was interrupted. 576 */ 577 protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException; 578 579 private String usedUsername, usedPassword; 580 581 /** 582 * The resourcepart used for this connection. May not be the resulting resourcepart if it's null or overridden by the XMPP service. 583 */ 584 private Resourcepart usedResource; 585 586 /** 587 * Logs in to the server using the strongest SASL mechanism supported by 588 * the server. If more than the connection's default stanza timeout elapses in each step of the 589 * authentication process without a response from the server, a 590 * {@link SmackException.NoResponseException} will be thrown. 591 * <p> 592 * Before logging in (i.e. authenticate) to the server the connection must be connected 593 * by calling {@link #connect}. 594 * </p> 595 * <p> 596 * It is possible to log in without sending an initial available presence by using 597 * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}. 598 * Finally, if you want to not pass a password and instead use a more advanced mechanism 599 * while using SASL then you may be interested in using 600 * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}. 601 * For more advanced login settings see {@link ConnectionConfiguration}. 602 * </p> 603 * 604 * @throws XMPPException if an error occurs on the XMPP protocol level. 605 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 606 * @throws IOException if an I/O error occurs during login. 607 * @throws InterruptedException if the calling thread was interrupted. 608 */ 609 public synchronized void login() throws XMPPException, SmackException, IOException, InterruptedException { 610 // The previously used username, password and resource take over precedence over the 611 // ones from the connection configuration 612 CharSequence username = usedUsername != null ? usedUsername : config.getUsername(); 613 String password = usedPassword != null ? usedPassword : config.getPassword(); 614 Resourcepart resource = usedResource != null ? usedResource : config.getResource(); 615 login(username, password, resource); 616 } 617 618 /** 619 * Same as {@link #login(CharSequence, String, Resourcepart)}, but takes the resource from the connection 620 * configuration. 621 * 622 * @param username TODO javadoc me please 623 * @param password TODO javadoc me please 624 * @throws XMPPException if an XMPP protocol error was received. 625 * @throws SmackException if Smack detected an exceptional situation. 626 * @throws IOException if an I/O error occurred. 627 * @throws InterruptedException if the calling thread was interrupted. 628 * @see #login 629 */ 630 public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException, 631 IOException, InterruptedException { 632 login(username, password, config.getResource()); 633 } 634 635 /** 636 * Login with the given username (authorization identity). You may omit the password if a callback handler is used. 637 * If resource is null, then the server will generate one. 638 * 639 * @param username TODO javadoc me please 640 * @param password TODO javadoc me please 641 * @param resource TODO javadoc me please 642 * @throws XMPPException if an XMPP protocol error was received. 643 * @throws SmackException if Smack detected an exceptional situation. 644 * @throws IOException if an I/O error occurred. 645 * @throws InterruptedException if the calling thread was interrupted. 646 * @see #login 647 */ 648 public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException, 649 SmackException, IOException, InterruptedException { 650 if (!config.allowNullOrEmptyUsername) { 651 StringUtils.requireNotNullNorEmpty(username, "Username must not be null nor empty"); 652 } 653 throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?"); 654 throwAlreadyLoggedInExceptionIfAppropriate(); 655 usedUsername = username != null ? username.toString() : null; 656 usedPassword = password; 657 usedResource = resource; 658 loginInternal(usedUsername, usedPassword, usedResource); 659 } 660 661 protected abstract void loginInternal(String username, String password, Resourcepart resource) 662 throws XMPPException, SmackException, IOException, InterruptedException; 663 664 @Override 665 public final boolean isConnected() { 666 return connected; 667 } 668 669 @Override 670 public final boolean isAuthenticated() { 671 return authenticated; 672 } 673 674 @Override 675 public final EntityFullJid getUser() { 676 return user; 677 } 678 679 @Override 680 public String getStreamId() { 681 if (!isConnected()) { 682 return null; 683 } 684 return streamId; 685 } 686 687 protected final boolean hasCurrentConnectionException() { 688 return currentConnectionException != null; 689 } 690 691 protected final void setCurrentConnectionExceptionAndNotify(Exception exception) { 692 currentConnectionException = exception; 693 694 notifyWaitingThreads(); 695 } 696 697 /** 698 * We use an extra object for {@link #notifyWaitingThreads()} and {@link #waitFor(Supplier)}, because all state 699 * changing methods of the connection are synchronized using the connection instance as monitor. If we now would 700 * also use the connection instance for the internal process to wait for a condition, the {@link Object#wait()} 701 * would leave the monitor when it waits, which would allow for another potential call to a state changing function 702 * to proceed. 703 */ 704 private final Object internalMonitor = new Object(); 705 706 protected final void notifyWaitingThreads() { 707 synchronized (internalMonitor) { 708 internalMonitor.notifyAll(); 709 } 710 } 711 712 protected final boolean waitFor(Supplier<Boolean> condition) throws InterruptedException { 713 final long deadline = System.currentTimeMillis() + getReplyTimeout(); 714 synchronized (internalMonitor) { 715 while (!condition.get().booleanValue()) { 716 final long now = System.currentTimeMillis(); 717 if (now >= deadline) { 718 return false; 719 } 720 internalMonitor.wait(deadline - now); 721 } 722 } 723 return true; 724 } 725 726 protected final void waitForConditionOrThrowConnectionException(Supplier<Boolean> condition, String waitFor) 727 throws InterruptedException, SmackException.SmackWrappedException, NoResponseException { 728 boolean success = waitFor(() -> condition.get().booleanValue() || hasCurrentConnectionException()); 729 final Exception currentConnectionException = this.currentConnectionException; 730 if (currentConnectionException != null) { 731 throw new SmackException.SmackWrappedException(currentConnectionException); 732 } 733 734 // If there was no connection exception and we still did not successfully wait for the condition to hold, then 735 // we ran into a no-response timeout. 736 if (!success) { 737 throw NoResponseException.newWith(this, waitFor); 738 } 739 // Otherwise we successfully awaited the condition. 740 } 741 742 protected Resourcepart bindResourceAndEstablishSession(Resourcepart resource) 743 throws SmackException, InterruptedException, XMPPException { 744 // Wait until either: 745 // - the servers last features stanza has been parsed 746 // - the timeout occurs 747 LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); 748 waitForConditionOrThrowConnectionException(() -> lastFeaturesReceived, "last stream features received from server"); 749 750 if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 751 // Server never offered resource binding, which is REQUIRED in XMPP client and 752 // server implementations as per RFC6120 7.2 753 throw new ResourceBindingNotOfferedException(); 754 } 755 756 // Resource binding, see RFC6120 7. 757 // Note that we can not use IQReplyFilter here, since the users full JID is not yet 758 // available. It will become available right after the resource has been successfully bound. 759 Bind bindResource = Bind.newSet(resource); 760 StanzaCollector packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(bindResource), bindResource); 761 Bind response = packetCollector.nextResultOrThrow(); 762 // Set the connections user to the result of resource binding. It is important that we don't infer the user 763 // from the login() arguments and the configurations service name, as, for example, when SASL External is used, 764 // the username is not given to login but taken from the 'external' certificate. 765 user = response.getJid(); 766 xmppServiceDomain = user.asDomainBareJid(); 767 768 Session.Feature sessionFeature = getFeature(Session.Feature.class); 769 // Only bind the session if it's announced as stream feature by the server, is not optional and not disabled 770 // For more information see http://tools.ietf.org/html/draft-cridland-xmpp-session-01 771 if (sessionFeature != null && !sessionFeature.isOptional()) { 772 Session session = new Session(); 773 packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(session), session); 774 packetCollector.nextResultOrThrow(); 775 } 776 777 return response.getJid().getResourcepart(); 778 } 779 780 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 781 if (!resumed) { 782 authenticatedConnectionInitiallyEstablishedTimestamp = ZonedDateTime.now(); 783 } 784 // Indicate that we're now authenticated. 785 this.authenticated = true; 786 787 // If debugging is enabled, change the debug window title to include the 788 // name we are now logged-in as. 789 // If DEBUG was set to true AFTER the connection was created the debugger 790 // will be null 791 if (debugger != null) { 792 debugger.userHasLogged(user); 793 } 794 callConnectionAuthenticatedListener(resumed); 795 796 // Set presence to online. It is important that this is done after 797 // callConnectionAuthenticatedListener(), as this call will also 798 // eventually load the roster. And we should load the roster before we 799 // send the initial presence. 800 if (config.isSendPresence() && !resumed) { 801 Presence availablePresence = getStanzaFactory() 802 .buildPresenceStanza() 803 .ofType(Presence.Type.available) 804 .build(); 805 sendStanza(availablePresence); 806 } 807 } 808 809 @Override 810 public final boolean isAnonymous() { 811 return isAuthenticated() && SASLAnonymous.NAME.equals(getUsedSaslMechansism()); 812 } 813 814 /** 815 * Get the name of the SASL mechanism that was used to authenticate this connection. This returns the name of 816 * mechanism which was used the last time this connection was authenticated, and will return <code>null</code> if 817 * this connection was not authenticated before. 818 * 819 * @return the name of the used SASL mechanism. 820 * @since 4.2 821 */ 822 public final String getUsedSaslMechansism() { 823 return saslAuthentication.getNameOfLastUsedSaslMechansism(); 824 } 825 826 private DomainBareJid xmppServiceDomain; 827 828 protected Lock getConnectionLock() { 829 return connectionLock; 830 } 831 832 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 833 throwNotConnectedExceptionIfAppropriate(null); 834 } 835 836 protected void throwNotConnectedExceptionIfAppropriate(String optionalHint) throws NotConnectedException { 837 if (!isConnected()) { 838 throw new NotConnectedException(optionalHint); 839 } 840 } 841 842 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 843 if (isConnected()) { 844 throw new AlreadyConnectedException(); 845 } 846 } 847 848 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 849 if (isAuthenticated()) { 850 throw new AlreadyLoggedInException(); 851 } 852 } 853 854 @Override 855 public final StanzaFactory getStanzaFactory() { 856 return stanzaFactory; 857 } 858 859 private Stanza preSendStanza(Stanza stanza) throws NotConnectedException { 860 Objects.requireNonNull(stanza, "Stanza must not be null"); 861 assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ; 862 863 throwNotConnectedExceptionIfAppropriate(); 864 switch (fromMode) { 865 case OMITTED: 866 stanza.setFrom((Jid) null); 867 break; 868 case USER: 869 stanza.setFrom(getUser()); 870 break; 871 case UNCHANGED: 872 default: 873 break; 874 } 875 // Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify 876 // the content of the stanza. 877 Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza); 878 return stanzaAfterInterceptors; 879 } 880 881 @Override 882 public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException { 883 stanza = preSendStanza(stanza); 884 sendInternal(stanza); 885 } 886 887 @Override 888 public final void sendStanzaNonBlocking(Stanza stanza) throws NotConnectedException, OutgoingQueueFullException { 889 stanza = preSendStanza(stanza); 890 sendNonBlockingInternal(stanza); 891 } 892 893 /** 894 * Authenticate a connection. 895 * 896 * @param username the username that is authenticating with the server. 897 * @param password the password to send to the server. 898 * @param authzid the authorization identifier (typically null). 899 * @param sslSession the optional SSL/TLS session (if one was established) 900 * @return the used SASLMechanism. 901 * @throws XMPPErrorException if there was an XMPP error returned. 902 * @throws SASLErrorException if a SASL protocol error was returned. 903 * @throws IOException if an I/O error occurred. 904 * @throws InterruptedException if the calling thread was interrupted. 905 * @throws SmackSaslException if a SASL specific error occurred. 906 * @throws NotConnectedException if the XMPP connection is not connected. 907 * @throws NoResponseException if there was no response from the remote entity. 908 * @throws SmackWrappedException in case of an exception. 909 * @see SASLAuthentication#authenticate(String, String, EntityBareJid, SSLSession) 910 */ 911 protected final SASLMechanism authenticate(String username, String password, EntityBareJid authzid, 912 SSLSession sslSession) throws XMPPErrorException, SASLErrorException, SmackSaslException, 913 NotConnectedException, NoResponseException, IOException, InterruptedException, SmackWrappedException { 914 SASLMechanism saslMechanism = saslAuthentication.authenticate(username, password, authzid, sslSession); 915 afterSaslAuthenticationSuccess(); 916 return saslMechanism; 917 } 918 919 /** 920 * Hook for subclasses right after successful SASL authentication. RFC 6120 § 6.4.6. specifies a that the initiating 921 * entity, needs to initiate a new stream in this case. But some transports, like BOSH, requires a special handling. 922 * <p> 923 * Note that we can not reset XMPPTCPConnection's parser here, because this method is invoked by the thread calling 924 * {@link #login()}, but the parser reset has to be done within the reader thread. 925 * </p> 926 * 927 * @throws NotConnectedException if the XMPP connection is not connected. 928 * @throws InterruptedException if the calling thread was interrupted. 929 * @throws SmackWrappedException in case of an exception. 930 */ 931 protected void afterSaslAuthenticationSuccess() 932 throws NotConnectedException, InterruptedException, SmackWrappedException { 933 sendStreamOpen(); 934 } 935 936 protected final boolean isSaslAuthenticated() { 937 return saslAuthentication.authenticationSuccessful(); 938 } 939 940 /** 941 * Closes the connection by setting presence to unavailable then closing the connection to 942 * the XMPP server. The XMPPConnection can still be used for connecting to the server 943 * again. 944 * 945 */ 946 public void disconnect() { 947 Presence unavailablePresence = null; 948 if (isAuthenticated()) { 949 unavailablePresence = getStanzaFactory().buildPresenceStanza() 950 .ofType(Presence.Type.unavailable) 951 .build(); 952 } 953 try { 954 disconnect(unavailablePresence); 955 } 956 catch (NotConnectedException e) { 957 LOGGER.log(Level.FINEST, "Connection is already disconnected", e); 958 } 959 } 960 961 /** 962 * Closes the connection. A custom unavailable presence is sent to the server, followed 963 * by closing the stream. The XMPPConnection can still be used for connecting to the server 964 * again. A custom unavailable presence is useful for communicating offline presence 965 * information such as "On vacation". Typically, just the status text of the presence 966 * stanza is set with online information, but most XMPP servers will deliver the full 967 * presence stanza with whatever data is set. 968 * 969 * @param unavailablePresence the optional presence stanza to send during shutdown. 970 * @throws NotConnectedException if the XMPP connection is not connected. 971 */ 972 public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException { 973 if (unavailablePresence != null) { 974 try { 975 sendStanza(unavailablePresence); 976 } catch (InterruptedException e) { 977 LOGGER.log(Level.FINE, 978 "Was interrupted while sending unavailable presence. Continuing to disconnect the connection", 979 e); 980 } 981 } 982 shutdown(); 983 callConnectionClosedListener(); 984 } 985 986 private final Object notifyConnectionErrorMonitor = new Object(); 987 988 /** 989 * Sends out a notification that there was an error with the connection 990 * and closes the connection. 991 * 992 * @param exception the exception that causes the connection close event. 993 */ 994 protected final void notifyConnectionError(final Exception exception) { 995 synchronized (notifyConnectionErrorMonitor) { 996 if (!isConnected()) { 997 LOGGER.log(Level.INFO, "Connection was already disconnected when attempting to handle " + exception, 998 exception); 999 return; 1000 } 1001 1002 // Note that we first have to set the current connection exception and notify waiting threads, as one of them 1003 // could hold the instance lock, which we also need later when calling instantShutdown(). 1004 setCurrentConnectionExceptionAndNotify(exception); 1005 1006 // Closes the connection temporary. A if the connection supports stream management, then a reconnection is 1007 // possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in 1008 // case the Exception is a StreamErrorException. 1009 instantShutdown(); 1010 1011 for (StanzaCollector collector : collectors) { 1012 collector.notifyConnectionError(exception); 1013 } 1014 1015 Async.go(() -> { 1016 // Notify connection listeners of the error. 1017 callConnectionClosedOnErrorListener(exception); 1018 }, AbstractXMPPConnection.this + " callConnectionClosedOnErrorListener()"); 1019 } 1020 } 1021 1022 /** 1023 * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza. 1024 */ 1025 public abstract void instantShutdown(); 1026 1027 /** 1028 * Shuts the current connection down. 1029 */ 1030 protected abstract void shutdown(); 1031 1032 protected final boolean waitForClosingStreamTagFromServer() { 1033 try { 1034 waitForConditionOrThrowConnectionException(() -> closingStreamReceived, "closing stream tag from the server"); 1035 } catch (InterruptedException | SmackException.SmackWrappedException | NoResponseException e) { 1036 LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); 1037 return false; 1038 } 1039 return true; 1040 } 1041 1042 @Override 1043 public void addConnectionListener(ConnectionListener connectionListener) { 1044 if (connectionListener == null) { 1045 return; 1046 } 1047 connectionListeners.add(connectionListener); 1048 } 1049 1050 @Override 1051 public void removeConnectionListener(ConnectionListener connectionListener) { 1052 connectionListeners.remove(connectionListener); 1053 } 1054 1055 @Override 1056 @SuppressWarnings("TypeParameterUnusedInFormals") 1057 public <I extends IQ> I sendIqRequestAndWaitForResponse(IQ request) 1058 throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException { 1059 StanzaCollector collector = createStanzaCollectorAndSend(request); 1060 IQ resultResponse = collector.nextResultOrThrow(); 1061 @SuppressWarnings("unchecked") 1062 I concreteResultResponse = (I) resultResponse; 1063 return concreteResultResponse; 1064 } 1065 1066 @Override 1067 public StanzaCollector createStanzaCollectorAndSend(IQ packet) throws NotConnectedException, InterruptedException { 1068 StanzaFilter packetFilter = new IQReplyFilter(packet, this); 1069 // Create the packet collector before sending the packet 1070 StanzaCollector packetCollector = createStanzaCollectorAndSend(packetFilter, packet); 1071 return packetCollector; 1072 } 1073 1074 @Override 1075 public StanzaCollector createStanzaCollectorAndSend(StanzaFilter packetFilter, Stanza packet) 1076 throws NotConnectedException, InterruptedException { 1077 StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration() 1078 .setStanzaFilter(packetFilter) 1079 .setRequest(packet); 1080 // Create the packet collector before sending the packet 1081 StanzaCollector packetCollector = createStanzaCollector(configuration); 1082 try { 1083 // Now we can send the packet as the collector has been created 1084 sendStanza(packet); 1085 } 1086 catch (InterruptedException | NotConnectedException | RuntimeException e) { 1087 packetCollector.cancel(); 1088 throw e; 1089 } 1090 return packetCollector; 1091 } 1092 1093 @Override 1094 public StanzaCollector createStanzaCollector(StanzaFilter packetFilter) { 1095 StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration().setStanzaFilter(packetFilter); 1096 return createStanzaCollector(configuration); 1097 } 1098 1099 @Override 1100 public StanzaCollector createStanzaCollector(StanzaCollector.Configuration configuration) { 1101 StanzaCollector collector = new StanzaCollector(this, configuration); 1102 // Add the collector to the list of active collectors. 1103 collectors.add(collector); 1104 return collector; 1105 } 1106 1107 @Override 1108 public void removeStanzaCollector(StanzaCollector collector) { 1109 collectors.remove(collector); 1110 } 1111 1112 @Override 1113 public final ListenerHandle addStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter) { 1114 if (stanzaListener == null) { 1115 throw new NullPointerException("Given stanza listener must not be null"); 1116 } 1117 ListenerWrapper wrapper = new ListenerWrapper(stanzaListener, stanzaFilter); 1118 synchronized (recvListeners) { 1119 recvListeners.put(stanzaListener, wrapper); 1120 } 1121 return new ListenerHandle.StanzaListenerHandle(this, stanzaListener); 1122 } 1123 1124 @Override 1125 public final boolean removeStanzaListener(StanzaListener stanzaListener) { 1126 synchronized (recvListeners) { 1127 return recvListeners.remove(stanzaListener) != null; 1128 } 1129 } 1130 1131 @Override 1132 public ListenerHandle addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 1133 if (packetListener == null) { 1134 throw new NullPointerException("Packet listener is null."); 1135 } 1136 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 1137 synchronized (syncRecvListeners) { 1138 syncRecvListeners.put(packetListener, wrapper); 1139 } 1140 return new ListenerHandle.SyncStanzaListenerHandle(this, packetListener); 1141 } 1142 1143 @Override 1144 public boolean removeSyncStanzaListener(StanzaListener packetListener) { 1145 synchronized (syncRecvListeners) { 1146 return syncRecvListeners.remove(packetListener) != null; 1147 } 1148 } 1149 1150 @Override 1151 public ListenerHandle addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 1152 if (packetListener == null) { 1153 throw new NullPointerException("Packet listener is null."); 1154 } 1155 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 1156 synchronized (asyncRecvListeners) { 1157 asyncRecvListeners.put(packetListener, wrapper); 1158 } 1159 return new ListenerHandle.AsyncStanzaListenerHandle(this, packetListener); 1160 } 1161 1162 @Override 1163 public boolean removeAsyncStanzaListener(StanzaListener packetListener) { 1164 synchronized (asyncRecvListeners) { 1165 return asyncRecvListeners.remove(packetListener) != null; 1166 } 1167 } 1168 1169 @Override 1170 public void addStanzaSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) { 1171 if (packetListener == null) { 1172 throw new NullPointerException("Packet listener is null."); 1173 } 1174 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 1175 synchronized (sendListeners) { 1176 sendListeners.put(packetListener, wrapper); 1177 } 1178 } 1179 1180 @Override 1181 public void removeStanzaSendingListener(StanzaListener packetListener) { 1182 synchronized (sendListeners) { 1183 sendListeners.remove(packetListener); 1184 } 1185 } 1186 1187 /** 1188 * Process all stanza listeners for sending stanzas. 1189 * <p> 1190 * Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread. 1191 * </p> 1192 * 1193 * @param sendTopLevelStreamElement the top level stream element which just got send. 1194 */ 1195 // TODO: Rename to fireElementSendingListeners(). 1196 @SuppressWarnings("javadoc") 1197 protected void firePacketSendingListeners(final TopLevelStreamElement sendTopLevelStreamElement) { 1198 if (debugger != null) { 1199 debugger.onOutgoingStreamElement(sendTopLevelStreamElement); 1200 } 1201 1202 if (!(sendTopLevelStreamElement instanceof Stanza)) { 1203 return; 1204 } 1205 Stanza packet = (Stanza) sendTopLevelStreamElement; 1206 1207 final List<StanzaListener> listenersToNotify = new ArrayList<>(); 1208 synchronized (sendListeners) { 1209 for (ListenerWrapper listenerWrapper : sendListeners.values()) { 1210 if (listenerWrapper.filterMatches(packet)) { 1211 listenersToNotify.add(listenerWrapper.getListener()); 1212 } 1213 } 1214 } 1215 if (listenersToNotify.isEmpty()) { 1216 return; 1217 } 1218 // Notify in a new thread, because we can 1219 asyncGo(new Runnable() { 1220 @Override 1221 public void run() { 1222 for (StanzaListener listener : listenersToNotify) { 1223 try { 1224 listener.processStanza(packet); 1225 } 1226 catch (Exception e) { 1227 LOGGER.log(Level.WARNING, "Sending listener threw exception", e); 1228 continue; 1229 } 1230 } 1231 } 1232 }); 1233 } 1234 1235 private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> void addInterceptor( 1236 Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> interceptors, Consumer<MPB> interceptor, 1237 Predicate<MP> filter) { 1238 Objects.requireNonNull(interceptor, "Interceptor must not be null"); 1239 1240 GenericInterceptorWrapper<MPB, MP> interceptorWrapper = new GenericInterceptorWrapper<>(interceptor, filter); 1241 1242 synchronized (interceptors) { 1243 interceptors.put(interceptor, interceptorWrapper); 1244 } 1245 } 1246 1247 private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> void removeInterceptor( 1248 Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> interceptors, Consumer<MPB> interceptor) { 1249 synchronized (interceptors) { 1250 interceptors.remove(interceptor); 1251 } 1252 } 1253 1254 @Override 1255 public void addMessageInterceptor(Consumer<MessageBuilder> messageInterceptor, Predicate<Message> messageFilter) { 1256 addInterceptor(messageInterceptors, messageInterceptor, messageFilter); 1257 } 1258 1259 @Override 1260 public void removeMessageInterceptor(Consumer<MessageBuilder> messageInterceptor) { 1261 removeInterceptor(messageInterceptors, messageInterceptor); 1262 } 1263 1264 @Override 1265 public void addPresenceInterceptor(Consumer<PresenceBuilder> presenceInterceptor, 1266 Predicate<Presence> presenceFilter) { 1267 addInterceptor(presenceInterceptors, presenceInterceptor, presenceFilter); 1268 } 1269 1270 @Override 1271 public void removePresenceInterceptor(Consumer<PresenceBuilder> presenceInterceptor) { 1272 removeInterceptor(presenceInterceptors, presenceInterceptor); 1273 } 1274 1275 private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> MP fireMessageOrPresenceInterceptors( 1276 MP messageOrPresence, Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> interceptors) { 1277 List<Consumer<MPB>> interceptorsToInvoke = new ArrayList<>(); 1278 synchronized (interceptors) { 1279 for (GenericInterceptorWrapper<MPB, MP> interceptorWrapper : interceptors.values()) { 1280 if (interceptorWrapper.filterMatches(messageOrPresence)) { 1281 Consumer<MPB> interceptor = interceptorWrapper.getInterceptor(); 1282 interceptorsToInvoke.add(interceptor); 1283 } 1284 } 1285 } 1286 1287 // Avoid transforming the stanza to a builder if there is no interceptor. 1288 if (interceptorsToInvoke.isEmpty()) { 1289 return messageOrPresence; 1290 } 1291 1292 MPB builder = messageOrPresence.asBuilder(); 1293 for (Consumer<MPB> interceptor : interceptorsToInvoke) { 1294 interceptor.accept(builder); 1295 } 1296 1297 // Now that the interceptors have (probably) modified the stanza in its builder form, we need to re-assemble it. 1298 messageOrPresence = builder.build(); 1299 return messageOrPresence; 1300 } 1301 1302 /** 1303 * Process interceptors. Interceptors may modify the stanza that is about to be sent. 1304 * Since the thread that requested to send the stanza will invoke all interceptors, it 1305 * is important that interceptors perform their work as soon as possible so that the 1306 * thread does not remain blocked for a long period. 1307 * 1308 * @param packet the stanza that is going to be sent to the server. 1309 * @return the, potentially modified stanza, after the interceptors are run. 1310 */ 1311 private Stanza firePacketInterceptors(Stanza packet) { 1312 List<StanzaListener> interceptorsToInvoke = new ArrayList<>(); 1313 synchronized (interceptors) { 1314 for (InterceptorWrapper interceptorWrapper : interceptors.values()) { 1315 if (interceptorWrapper.filterMatches(packet)) { 1316 interceptorsToInvoke.add(interceptorWrapper.getInterceptor()); 1317 } 1318 } 1319 } 1320 for (StanzaListener interceptor : interceptorsToInvoke) { 1321 try { 1322 interceptor.processStanza(packet); 1323 } catch (Exception e) { 1324 LOGGER.log(Level.SEVERE, "Packet interceptor threw exception", e); 1325 } 1326 } 1327 1328 final Stanza stanzaAfterInterceptors; 1329 if (packet instanceof Message) { 1330 Message message = (Message) packet; 1331 stanzaAfterInterceptors = fireMessageOrPresenceInterceptors(message, messageInterceptors); 1332 } 1333 else if (packet instanceof Presence) { 1334 Presence presence = (Presence) packet; 1335 stanzaAfterInterceptors = fireMessageOrPresenceInterceptors(presence, presenceInterceptors); 1336 } else { 1337 // We do not (yet) support interceptors for IQ stanzas. 1338 assert packet instanceof IQ; 1339 stanzaAfterInterceptors = packet; 1340 } 1341 1342 return stanzaAfterInterceptors; 1343 } 1344 1345 /** 1346 * Initialize the {@link #debugger}. You can specify a customized {@link SmackDebugger} 1347 * by setup the system property <code>smack.debuggerClass</code> to the implementation. 1348 * 1349 * @throws IllegalStateException if the reader or writer isn't yet initialized. 1350 * @throws IllegalArgumentException if the SmackDebugger can't be loaded. 1351 */ 1352 protected void initDebugger() { 1353 if (reader == null || writer == null) { 1354 throw new NullPointerException("Reader or writer isn't initialized."); 1355 } 1356 // If debugging is enabled, we open a window and write out all network traffic. 1357 if (debugger != null) { 1358 // Obtain new reader and writer from the existing debugger 1359 reader = debugger.newConnectionReader(reader); 1360 writer = debugger.newConnectionWriter(writer); 1361 } 1362 } 1363 1364 @Override 1365 public long getReplyTimeout() { 1366 return replyTimeout; 1367 } 1368 1369 @Override 1370 public void setReplyTimeout(long timeout) { 1371 if (Long.MAX_VALUE - System.currentTimeMillis() < timeout) { 1372 throw new IllegalArgumentException("Extremely long reply timeout"); 1373 } 1374 else { 1375 replyTimeout = timeout; 1376 } 1377 } 1378 1379 private SmackConfiguration.UnknownIqRequestReplyMode unknownIqRequestReplyMode = SmackConfiguration.getUnknownIqRequestReplyMode(); 1380 1381 /** 1382 * Set how Smack behaves when an unknown IQ request has been received. 1383 * 1384 * @param unknownIqRequestReplyMode reply mode. 1385 */ 1386 public void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) { 1387 this.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Mode must not be null"); 1388 } 1389 1390 protected final NonzaCallback.Builder buildNonzaCallback() { 1391 return new NonzaCallback.Builder(this); 1392 } 1393 1394 protected <SN extends Nonza, FN extends Nonza> SN sendAndWaitForResponse(Nonza nonza, Class<SN> successNonzaClass, 1395 Class<FN> failedNonzaClass) 1396 throws NoResponseException, NotConnectedException, InterruptedException, FailedNonzaException { 1397 NonzaCallback.Builder builder = buildNonzaCallback(); 1398 SN successNonza = NonzaCallback.sendAndWaitForResponse(builder, nonza, successNonzaClass, failedNonzaClass); 1399 return successNonza; 1400 } 1401 1402 private void maybeNotifyDebuggerAboutIncoming(TopLevelStreamElement incomingTopLevelStreamElement) { 1403 final SmackDebugger debugger = this.debugger; 1404 if (debugger != null) { 1405 debugger.onIncomingStreamElement(incomingTopLevelStreamElement); 1406 } 1407 } 1408 1409 protected final void parseAndProcessNonza(XmlPullParser parser) throws IOException, XmlPullParserException, SmackParsingException { 1410 ParserUtils.assertAtStartTag(parser); 1411 1412 final int initialDepth = parser.getDepth(); 1413 final String element = parser.getName(); 1414 final String namespace = parser.getNamespace(); 1415 final QName key = new QName(namespace, element); 1416 1417 NonzaProvider<? extends Nonza> nonzaProvider = ProviderManager.getNonzaProvider(key); 1418 if (nonzaProvider == null) { 1419 LOGGER.severe("Unknown nonza: " + key); 1420 ParserUtils.forwardToEndTagOfDepth(parser, initialDepth); 1421 return; 1422 } 1423 1424 List<NonzaCallback> nonzaCallbacks; 1425 synchronized (nonzaCallbacksMap) { 1426 nonzaCallbacks = nonzaCallbacksMap.getAll(key); 1427 nonzaCallbacks = CollectionUtil.newListWith(nonzaCallbacks); 1428 } 1429 if (nonzaCallbacks == null) { 1430 LOGGER.info("No nonza callback for " + key); 1431 ParserUtils.forwardToEndTagOfDepth(parser, initialDepth); 1432 return; 1433 } 1434 1435 Nonza nonza = nonzaProvider.parse(parser, incomingStreamXmlEnvironment); 1436 1437 maybeNotifyDebuggerAboutIncoming(nonza); 1438 1439 for (NonzaCallback nonzaCallback : nonzaCallbacks) { 1440 nonzaCallback.onNonzaReceived(nonza); 1441 } 1442 } 1443 1444 protected void parseAndProcessStanza(XmlPullParser parser) 1445 throws XmlPullParserException, IOException, InterruptedException { 1446 ParserUtils.assertAtStartTag(parser); 1447 int parserDepth = parser.getDepth(); 1448 Stanza stanza = null; 1449 try { 1450 try { 1451 stanza = PacketParserUtils.parseStanza(parser, incomingStreamXmlEnvironment); 1452 } catch (NullPointerException e) { 1453 // Those exceptions should probably be wrapped into a SmackParsingException and therefore likely constitute a missing verification in the throwing parser. 1454 String message = "Smack parser throw unexpected exception '" + e.getMessage() + "', please report this at " + Smack.BUG_REPORT_URL; 1455 LOGGER.log(Level.SEVERE, message, e); 1456 throw new IOException(message, e); 1457 } 1458 } 1459 catch (XmlPullParserException | SmackParsingException | IOException | IllegalArgumentException e) { 1460 CharSequence content = PacketParserUtils.parseContentDepth(parser, 1461 parserDepth); 1462 UnparseableStanza message = new UnparseableStanza(content, e); 1463 ParsingExceptionCallback callback = getParsingExceptionCallback(); 1464 if (callback != null) { 1465 callback.handleUnparsableStanza(message); 1466 } 1467 } 1468 ParserUtils.assertAtEndTag(parser); 1469 if (stanza != null) { 1470 processStanza(stanza); 1471 } 1472 } 1473 1474 /** 1475 * Processes a stanza after it's been fully parsed by looping through the installed 1476 * stanza collectors and listeners and letting them examine the stanza to see if 1477 * they are a match with the filter. 1478 * 1479 * @param stanza the stanza to process. 1480 * @throws InterruptedException if the calling thread was interrupted. 1481 */ 1482 protected void processStanza(final Stanza stanza) throws InterruptedException { 1483 assert stanza != null; 1484 1485 maybeNotifyDebuggerAboutIncoming(stanza); 1486 1487 lastStanzaReceived = System.currentTimeMillis(); 1488 // Deliver the incoming packet to listeners. 1489 invokeStanzaCollectorsAndNotifyRecvListeners(stanza); 1490 } 1491 1492 /** 1493 * Invoke {@link StanzaCollector#processStanza(Stanza)} for every 1494 * StanzaCollector with the given packet. Also notify the receive listeners with a matching stanza filter about the packet. 1495 * <p> 1496 * This method will be invoked by the connections incoming processing thread which may be shared across multiple connections and 1497 * thus it is important that no user code, e.g. in form of a callback, is invoked by this method. For the same reason, 1498 * this method must not block for an extended period of time. 1499 * </p> 1500 * 1501 * @param packet the stanza to notify the StanzaCollectors and receive listeners about. 1502 */ 1503 protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) { 1504 if (packet instanceof IQ) { 1505 final IQ iq = (IQ) packet; 1506 if (iq.isRequestIQ()) { 1507 final IQ iqRequest = iq; 1508 final QName key = iqRequest.getChildElementQName(); 1509 IQRequestHandler iqRequestHandler; 1510 final IQ.Type type = iq.getType(); 1511 switch (type) { 1512 case set: 1513 synchronized (setIqRequestHandler) { 1514 iqRequestHandler = setIqRequestHandler.get(key); 1515 } 1516 break; 1517 case get: 1518 synchronized (getIqRequestHandler) { 1519 iqRequestHandler = getIqRequestHandler.get(key); 1520 } 1521 break; 1522 default: 1523 throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'"); 1524 } 1525 if (iqRequestHandler == null) { 1526 final String iqNamespace = key.getNamespaceURI(); 1527 StanzaError.Condition replyCondition; 1528 switch (unknownIqRequestReplyMode) { 1529 case doNotReply: 1530 return; 1531 case reply: 1532 boolean isKnownNamespace = iqRequestHandlerNamespaces.contains(iqNamespace); 1533 if (isKnownNamespace) { 1534 replyCondition = StanzaError.Condition.feature_not_implemented; 1535 } else { 1536 replyCondition = StanzaError.Condition.service_unavailable; 1537 } 1538 break; 1539 default: 1540 throw new AssertionError(); 1541 } 1542 1543 // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an 1544 // IQ of type 'error' with condition 'service-unavailable'. 1545 final ErrorIQ errorIQ = IQ.createErrorResponse(iq, StanzaError.getBuilder( 1546 replyCondition).build()); 1547 // Use async sendStanza() here, since if sendStanza() would block, then some connections, e.g. 1548 // XmppNioTcpConnection, would deadlock, as this operation is performed in the same thread that is 1549 asyncGo(() -> { 1550 try { 1551 sendStanza(errorIQ); 1552 } 1553 catch (InterruptedException | NotConnectedException e) { 1554 LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e); 1555 } 1556 }); 1557 } else { 1558 Executor executorService = null; 1559 switch (iqRequestHandler.getMode()) { 1560 case sync: 1561 executorService = ASYNC_BUT_ORDERED.asExecutorFor(this); 1562 break; 1563 case async: 1564 executorService = this::asyncGoLimited; 1565 break; 1566 } 1567 final IQRequestHandler finalIqRequestHandler = iqRequestHandler; 1568 executorService.execute(new Runnable() { 1569 @Override 1570 public void run() { 1571 IQ response = finalIqRequestHandler.handleIQRequest(iq); 1572 if (response == null) { 1573 // It is not ideal if the IQ request handler does not return an IQ response, because RFC 1574 // 6120 § 8.1.2 does specify that a response is mandatory. But some APIs, mostly the 1575 // file transfer one, does not always return a result, so we need to handle this case. 1576 // Also sometimes a request handler may decide that it's better to not send a response, 1577 // e.g. to avoid presence leaks. 1578 return; 1579 } 1580 1581 assert response.isResponseIQ(); 1582 1583 response.setTo(iqRequest.getFrom()); 1584 response.setStanzaId(iqRequest.getStanzaId()); 1585 try { 1586 sendStanza(response); 1587 } 1588 catch (InterruptedException | NotConnectedException e) { 1589 LOGGER.log(Level.WARNING, "Exception while sending response to IQ request", e); 1590 } 1591 } 1592 }); 1593 } 1594 // The following returns makes it impossible for packet listeners and collectors to 1595 // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the 1596 // desired behavior. 1597 return; 1598 } 1599 } 1600 1601 // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below, 1602 // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in 1603 // their own thread. 1604 final Collection<StanzaListener> listenersToNotify = new ArrayList<>(); 1605 extractMatchingListeners(packet, asyncRecvListeners, listenersToNotify); 1606 for (final StanzaListener listener : listenersToNotify) { 1607 asyncGoLimited(new Runnable() { 1608 @Override 1609 public void run() { 1610 try { 1611 listener.processStanza(packet); 1612 } catch (Exception e) { 1613 LOGGER.log(Level.SEVERE, "Exception in async packet listener", e); 1614 } 1615 } 1616 }); 1617 } 1618 1619 // Loop through all collectors and notify the appropriate ones. 1620 for (StanzaCollector collector : collectors) { 1621 collector.processStanza(packet); 1622 } 1623 1624 listenersToNotify.clear(); 1625 extractMatchingListeners(packet, recvListeners, listenersToNotify); 1626 final Semaphore listenerSemaphore = new Semaphore(1 - listenersToNotify.size()); 1627 for (StanzaListener stanzaListener : listenersToNotify) { 1628 asyncGoLimited(() -> { 1629 try { 1630 stanzaListener.processStanza(packet); 1631 } 1632 catch (NotConnectedException e) { 1633 LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); 1634 } 1635 catch (Exception e) { 1636 LOGGER.log(Level.SEVERE, "Exception in packet listener", e); 1637 } finally { 1638 listenerSemaphore.release(); 1639 } 1640 }); 1641 } 1642 listenerSemaphore.acquireUninterruptibly(); 1643 1644 // Notify the receive listeners interested in the packet 1645 listenersToNotify.clear(); 1646 extractMatchingListeners(packet, syncRecvListeners, listenersToNotify); 1647 // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single 1648 // threaded executor service and therefore keeps the order. 1649 ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() { 1650 @Override 1651 public void run() { 1652 // As listeners are able to remove themselves and because the timepoint where it is decided to invoke a 1653 // listener is a different timepoint where the listener is actually invoked (here), we have to check 1654 // again if the listener is still active. 1655 Iterator<StanzaListener> it = listenersToNotify.iterator(); 1656 synchronized (syncRecvListeners) { 1657 while (it.hasNext()) { 1658 StanzaListener stanzaListener = it.next(); 1659 if (!syncRecvListeners.containsKey(stanzaListener)) { 1660 // The listener was removed from syncRecvListener, also remove him from listenersToNotify. 1661 it.remove(); 1662 } 1663 } 1664 } 1665 for (StanzaListener listener : listenersToNotify) { 1666 try { 1667 listener.processStanza(packet); 1668 } catch (NotConnectedException e) { 1669 LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); 1670 break; 1671 } catch (Exception e) { 1672 LOGGER.log(Level.SEVERE, "Exception in packet listener", e); 1673 } 1674 } 1675 } 1676 }); 1677 } 1678 1679 private static void extractMatchingListeners(Stanza stanza, Map<StanzaListener, ListenerWrapper> listeners, 1680 Collection<StanzaListener> listenersToNotify) { 1681 synchronized (listeners) { 1682 for (ListenerWrapper listenerWrapper : listeners.values()) { 1683 if (listenerWrapper.filterMatches(stanza)) { 1684 listenersToNotify.add(listenerWrapper.getListener()); 1685 } 1686 } 1687 } 1688 } 1689 1690 /** 1691 * Sets whether the connection has already logged in the server. This method assures that the 1692 * {@link #wasAuthenticated} flag is never reset once it has ever been set. 1693 * 1694 */ 1695 protected void setWasAuthenticated() { 1696 // Never reset the flag if the connection has ever been authenticated 1697 if (!wasAuthenticated) { 1698 wasAuthenticated = authenticated; 1699 } 1700 } 1701 1702 protected void callConnectionConnectingListener() { 1703 for (ConnectionListener listener : connectionListeners) { 1704 listener.connecting(this); 1705 } 1706 } 1707 1708 protected void callConnectionConnectedListener() { 1709 for (ConnectionListener listener : connectionListeners) { 1710 listener.connected(this); 1711 } 1712 } 1713 1714 protected void callConnectionAuthenticatedListener(boolean resumed) { 1715 for (ConnectionListener listener : connectionListeners) { 1716 try { 1717 listener.authenticated(this, resumed); 1718 } catch (Exception e) { 1719 // Catch and print any exception so we can recover 1720 // from a faulty listener and finish the shutdown process 1721 LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e); 1722 } 1723 } 1724 } 1725 1726 void callConnectionClosedListener() { 1727 for (ConnectionListener listener : connectionListeners) { 1728 try { 1729 listener.connectionClosed(); 1730 } 1731 catch (Exception e) { 1732 // Catch and print any exception so we can recover 1733 // from a faulty listener and finish the shutdown process 1734 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e); 1735 } 1736 } 1737 } 1738 1739 private void callConnectionClosedOnErrorListener(Exception e) { 1740 boolean logWarning = true; 1741 if (e instanceof StreamErrorException) { 1742 StreamErrorException see = (StreamErrorException) e; 1743 if (see.getStreamError().getCondition() == StreamError.Condition.not_authorized 1744 && wasAuthenticated) { 1745 logWarning = false; 1746 LOGGER.log(Level.FINE, 1747 "Connection closed with not-authorized stream error after it was already authenticated. The account was likely deleted/unregistered on the server"); 1748 } 1749 } 1750 if (logWarning) { 1751 LOGGER.log(Level.WARNING, "Connection " + this + " closed with error", e); 1752 } 1753 for (ConnectionListener listener : connectionListeners) { 1754 try { 1755 listener.connectionClosedOnError(e); 1756 } 1757 catch (Exception e2) { 1758 // Catch and print any exception so we can recover 1759 // from a faulty listener 1760 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e2); 1761 } 1762 } 1763 } 1764 1765 /** 1766 * A wrapper class to associate a stanza filter with a listener. 1767 */ 1768 protected static class ListenerWrapper { 1769 1770 private final StanzaListener packetListener; 1771 private final StanzaFilter packetFilter; 1772 1773 /** 1774 * Create a class which associates a stanza filter with a listener. 1775 * 1776 * @param packetListener the stanza listener. 1777 * @param packetFilter the associated filter or null if it listen for all packets. 1778 */ 1779 public ListenerWrapper(StanzaListener packetListener, StanzaFilter packetFilter) { 1780 this.packetListener = packetListener; 1781 this.packetFilter = packetFilter; 1782 } 1783 1784 public boolean filterMatches(Stanza packet) { 1785 return packetFilter == null || packetFilter.accept(packet); 1786 } 1787 1788 public StanzaListener getListener() { 1789 return packetListener; 1790 } 1791 } 1792 1793 /** 1794 * A wrapper class to associate a stanza filter with an interceptor. 1795 */ 1796 @Deprecated 1797 // TODO: Remove once addStanzaInterceptor is gone. 1798 protected static class InterceptorWrapper { 1799 1800 private final StanzaListener packetInterceptor; 1801 private final StanzaFilter packetFilter; 1802 1803 /** 1804 * Create a class which associates a stanza filter with an interceptor. 1805 * 1806 * @param packetInterceptor the interceptor. 1807 * @param packetFilter the associated filter or null if it intercepts all packets. 1808 */ 1809 public InterceptorWrapper(StanzaListener packetInterceptor, StanzaFilter packetFilter) { 1810 this.packetInterceptor = packetInterceptor; 1811 this.packetFilter = packetFilter; 1812 } 1813 1814 public boolean filterMatches(Stanza packet) { 1815 return packetFilter == null || packetFilter.accept(packet); 1816 } 1817 1818 public StanzaListener getInterceptor() { 1819 return packetInterceptor; 1820 } 1821 } 1822 1823 private static final class GenericInterceptorWrapper<MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> { 1824 private final Consumer<MPB> stanzaInterceptor; 1825 private final Predicate<MP> stanzaFilter; 1826 1827 private GenericInterceptorWrapper(Consumer<MPB> stanzaInterceptor, Predicate<MP> stanzaFilter) { 1828 this.stanzaInterceptor = stanzaInterceptor; 1829 this.stanzaFilter = stanzaFilter; 1830 } 1831 1832 private boolean filterMatches(MP stanza) { 1833 return stanzaFilter == null || stanzaFilter.test(stanza); 1834 } 1835 1836 public Consumer<MPB> getInterceptor() { 1837 return stanzaInterceptor; 1838 } 1839 } 1840 1841 @Override 1842 public int getConnectionCounter() { 1843 return connectionCounterValue; 1844 } 1845 1846 @Override 1847 public void setFromMode(FromMode fromMode) { 1848 this.fromMode = fromMode; 1849 } 1850 1851 @Override 1852 public FromMode getFromMode() { 1853 return this.fromMode; 1854 } 1855 1856 protected final void parseFeatures(XmlPullParser parser) throws XmlPullParserException, IOException, SmackParsingException { 1857 streamFeatures.clear(); 1858 final int initialDepth = parser.getDepth(); 1859 while (true) { 1860 XmlPullParser.Event eventType = parser.next(); 1861 1862 if (eventType == XmlPullParser.Event.START_ELEMENT && parser.getDepth() == initialDepth + 1) { 1863 XmlElement streamFeature = null; 1864 String name = parser.getName(); 1865 String namespace = parser.getNamespace(); 1866 switch (name) { 1867 case StartTls.ELEMENT: 1868 streamFeature = PacketParserUtils.parseStartTlsFeature(parser); 1869 break; 1870 case Mechanisms.ELEMENT: 1871 streamFeature = new Mechanisms(PacketParserUtils.parseMechanisms(parser)); 1872 break; 1873 case Bind.ELEMENT: 1874 streamFeature = Bind.Feature.INSTANCE; 1875 break; 1876 case Session.ELEMENT: 1877 streamFeature = PacketParserUtils.parseSessionFeature(parser); 1878 break; 1879 case Compress.Feature.ELEMENT: 1880 streamFeature = PacketParserUtils.parseCompressionFeature(parser); 1881 break; 1882 default: 1883 ExtensionElementProvider<ExtensionElement> provider = ProviderManager.getStreamFeatureProvider(name, namespace); 1884 if (provider != null) { 1885 streamFeature = provider.parse(parser, incomingStreamXmlEnvironment); 1886 } 1887 break; 1888 } 1889 if (streamFeature != null) { 1890 addStreamFeature(streamFeature); 1891 } 1892 } 1893 else if (eventType == XmlPullParser.Event.END_ELEMENT && parser.getDepth() == initialDepth) { 1894 break; 1895 } 1896 } 1897 } 1898 1899 protected final void parseFeaturesAndNotify(XmlPullParser parser) throws Exception { 1900 parseFeatures(parser); 1901 1902 if (hasFeature(Mechanisms.ELEMENT, Mechanisms.NAMESPACE)) { 1903 // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it 1904 if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE) 1905 || config.getSecurityMode() == SecurityMode.disabled) { 1906 tlsHandled = saslFeatureReceived = true; 1907 notifyWaitingThreads(); 1908 } 1909 } 1910 1911 // If the server reported the bind feature then we are that that we did SASL and maybe 1912 // STARTTLS. We can then report that the last 'stream:features' have been parsed 1913 if (hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 1914 if (!hasFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE) 1915 || !config.isCompressionEnabled()) { 1916 // This where the last stream features from the server, either it did not contain 1917 // compression or we disabled it. 1918 lastFeaturesReceived = true; 1919 notifyWaitingThreads(); 1920 } 1921 } 1922 afterFeaturesReceived(); 1923 } 1924 1925 @SuppressWarnings("unused") 1926 protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException { 1927 // Default implementation does nothing 1928 } 1929 1930 @SuppressWarnings({"unchecked", "TypeParameterUnusedInFormals"}) 1931 @Override 1932 public <F extends XmlElement> F getFeature(QName qname) { 1933 return (F) streamFeatures.get(qname); 1934 } 1935 1936 @Override 1937 public boolean hasFeature(QName qname) { 1938 return streamFeatures.containsKey(qname); 1939 } 1940 1941 protected void addStreamFeature(XmlElement feature) { 1942 QName key = feature.getQName(); 1943 streamFeatures.put(key, feature); 1944 } 1945 1946 @Override 1947 public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request) { 1948 return sendIqRequestAsync(request, getReplyTimeout()); 1949 } 1950 1951 @Override 1952 public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request, long timeout) { 1953 StanzaFilter replyFilter = new IQReplyFilter(request, this); 1954 return sendAsync(request, replyFilter, timeout); 1955 } 1956 1957 @Override 1958 public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter) { 1959 return sendAsync(stanza, replyFilter, getReplyTimeout()); 1960 } 1961 1962 @SuppressWarnings("FutureReturnValueIgnored") 1963 @Override 1964 public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter, long timeout) { 1965 Objects.requireNonNull(stanza, "stanza must not be null"); 1966 // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we 1967 // disallow it here in the async API as it makes no sense 1968 Objects.requireNonNull(replyFilter, "replyFilter must not be null"); 1969 1970 final InternalSmackFuture<S, Exception> future = new InternalSmackFuture<>(); 1971 1972 final StanzaListener stanzaListener = new StanzaListener() { 1973 @Override 1974 public void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException { 1975 boolean removed = removeAsyncStanzaListener(this); 1976 if (!removed) { 1977 // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the 1978 // exception callback will be invoked (if any). 1979 return; 1980 } 1981 try { 1982 XMPPErrorException.ifHasErrorThenThrow(stanza); 1983 @SuppressWarnings("unchecked") 1984 S s = (S) stanza; 1985 future.setResult(s); 1986 } 1987 catch (XMPPErrorException exception) { 1988 future.setException(exception); 1989 } 1990 } 1991 }; 1992 schedule(new Runnable() { 1993 @Override 1994 public void run() { 1995 boolean removed = removeAsyncStanzaListener(stanzaListener); 1996 if (!removed) { 1997 // We lost a race against the stanza listener, he already removed itself because he received a 1998 // reply. There is nothing more to do here. 1999 return; 2000 } 2001 2002 // If the packetListener got removed, then it was never run and 2003 // we never received a response, inform the exception callback 2004 Exception exception; 2005 if (!isConnected()) { 2006 // If the connection is no longer connected, throw a not connected exception. 2007 exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter); 2008 } 2009 else { 2010 exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter); 2011 } 2012 future.setException(exception); 2013 } 2014 }, timeout, TimeUnit.MILLISECONDS); 2015 2016 addAsyncStanzaListener(stanzaListener, replyFilter); 2017 try { 2018 sendStanzaNonBlocking(stanza); 2019 } 2020 catch (NotConnectedException | OutgoingQueueFullException exception) { 2021 future.setException(exception); 2022 } 2023 2024 return future; 2025 } 2026 2027 @SuppressWarnings("FutureReturnValueIgnored") 2028 @Override 2029 public void addOneTimeSyncCallback(final StanzaListener callback, final StanzaFilter packetFilter) { 2030 final StanzaListener packetListener = new StanzaListener() { 2031 @Override 2032 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException, NotLoggedInException { 2033 try { 2034 callback.processStanza(packet); 2035 } finally { 2036 removeSyncStanzaListener(this); 2037 } 2038 } 2039 }; 2040 addSyncStanzaListener(packetListener, packetFilter); 2041 schedule(new Runnable() { 2042 @Override 2043 public void run() { 2044 removeSyncStanzaListener(packetListener); 2045 } 2046 }, getReplyTimeout(), TimeUnit.MILLISECONDS); 2047 } 2048 2049 @Override 2050 public IQRequestHandler registerIQRequestHandler(final IQRequestHandler iqRequestHandler) { 2051 final QName key = iqRequestHandler.getQName(); 2052 IQRequestHandler previous; 2053 switch (iqRequestHandler.getType()) { 2054 case set: 2055 synchronized (setIqRequestHandler) { 2056 previous = setIqRequestHandler.put(key, iqRequestHandler); 2057 } 2058 break; 2059 case get: 2060 synchronized (getIqRequestHandler) { 2061 previous = getIqRequestHandler.put(key, iqRequestHandler); 2062 } 2063 break; 2064 default: 2065 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 2066 } 2067 2068 final String iqNamespace = key.getNamespaceURI(); 2069 synchronized (iqRequestHandlerNamespacesReferenceCounters) { 2070 Integer newValue; 2071 Integer counter = iqRequestHandlerNamespacesReferenceCounters.get(iqNamespace); 2072 if (counter == null) { 2073 iqRequestHandlerNamespaces.add(iqNamespace); 2074 newValue = 0; 2075 } else { 2076 newValue = counter.intValue() + 1; 2077 } 2078 iqRequestHandlerNamespacesReferenceCounters.put(iqNamespace, newValue); 2079 } 2080 return previous; 2081 } 2082 2083 @Override 2084 public final IQRequestHandler unregisterIQRequestHandler(IQRequestHandler iqRequestHandler) { 2085 return unregisterIQRequestHandler(iqRequestHandler.getElement(), iqRequestHandler.getNamespace(), 2086 iqRequestHandler.getType()); 2087 } 2088 2089 @Override 2090 public IQRequestHandler unregisterIQRequestHandler(String element, String namespace, IQ.Type type) { 2091 IQRequestHandler unregisteredHandler; 2092 final QName key = new QName(namespace, element); 2093 switch (type) { 2094 case set: 2095 synchronized (setIqRequestHandler) { 2096 unregisteredHandler = setIqRequestHandler.remove(key); 2097 } 2098 break; 2099 case get: 2100 synchronized (getIqRequestHandler) { 2101 unregisteredHandler = getIqRequestHandler.remove(key); 2102 } 2103 break; 2104 default: 2105 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 2106 } 2107 2108 if (unregisteredHandler == null) { 2109 return null; 2110 } 2111 2112 synchronized (iqRequestHandlerNamespacesReferenceCounters) { 2113 int newValue = iqRequestHandlerNamespacesReferenceCounters.get(namespace).intValue() - 1; 2114 if (newValue == 0) { 2115 iqRequestHandlerNamespacesReferenceCounters.remove(namespace); 2116 iqRequestHandlerNamespaces.remove(namespace); 2117 } else { 2118 iqRequestHandlerNamespacesReferenceCounters.put(namespace, newValue); 2119 } 2120 } 2121 2122 return unregisteredHandler; 2123 } 2124 2125 private long lastStanzaReceived; 2126 2127 @Override 2128 public long getLastStanzaReceived() { 2129 return lastStanzaReceived; 2130 } 2131 2132 /** 2133 * Get the timestamp when the connection was the first time authenticated, i.e., when the first successful login was 2134 * performed. Note that this value is not reset on disconnect, so it represents the timestamp from the last 2135 * authenticated connection. The value is also not reset on stream resumption. 2136 * 2137 * @return the timestamp or {@code null}. 2138 * @since 4.3.3 2139 */ 2140 public final ZonedDateTime getAuthenticatedConnectionInitiallyEstablishedTimestamp() { 2141 return authenticatedConnectionInitiallyEstablishedTimestamp; 2142 } 2143 2144 /** 2145 * Install a parsing exception callback, which will be invoked once an exception is encountered while parsing a 2146 * stanza. 2147 * 2148 * @param callback the callback to install 2149 */ 2150 public void setParsingExceptionCallback(ParsingExceptionCallback callback) { 2151 parsingExceptionCallback = callback; 2152 } 2153 2154 /** 2155 * Get the current active parsing exception callback. 2156 * 2157 * @return the active exception callback or null if there is none 2158 */ 2159 public ParsingExceptionCallback getParsingExceptionCallback() { 2160 return parsingExceptionCallback; 2161 } 2162 2163 @Override 2164 public final String toString() { 2165 EntityFullJid localEndpoint = getUser(); 2166 String localEndpointString = localEndpoint == null ? "not-authenticated" : localEndpoint.toString(); 2167 return getClass().getSimpleName() + '[' + localEndpointString + "] (" + getConnectionCounter() + ')'; 2168 } 2169 2170 /** 2171 * A queue of deferred runnables that where not executed immediately because {@link #currentAsyncRunnables} reached 2172 * {@link #maxAsyncRunnables}. Note that we use a {@code LinkedList} in order to avoid space blowups in case the 2173 * list ever becomes very big and shrinks again. 2174 */ 2175 @SuppressWarnings("JdkObsolete") 2176 private final Queue<Runnable> deferredAsyncRunnables = new LinkedList<>(); 2177 2178 private int deferredAsyncRunnablesCount; 2179 2180 private int deferredAsyncRunnablesCountPrevious; 2181 2182 private int maxAsyncRunnables = SmackConfiguration.getDefaultConcurrencyLevelLimit(); 2183 2184 private int currentAsyncRunnables; 2185 2186 protected void asyncGoLimited(final Runnable runnable) { 2187 Runnable wrappedRunnable = new Runnable() { 2188 @Override 2189 public void run() { 2190 runnable.run(); 2191 2192 synchronized (deferredAsyncRunnables) { 2193 Runnable defferredRunnable = deferredAsyncRunnables.poll(); 2194 if (defferredRunnable == null) { 2195 currentAsyncRunnables--; 2196 } else { 2197 deferredAsyncRunnablesCount--; 2198 asyncGo(defferredRunnable); 2199 } 2200 } 2201 } 2202 }; 2203 2204 synchronized (deferredAsyncRunnables) { 2205 if (currentAsyncRunnables < maxAsyncRunnables) { 2206 currentAsyncRunnables++; 2207 asyncGo(wrappedRunnable); 2208 } else { 2209 deferredAsyncRunnablesCount++; 2210 deferredAsyncRunnables.add(wrappedRunnable); 2211 } 2212 2213 final int HIGH_WATERMARK = 100; 2214 final int INFORM_WATERMARK = 20; 2215 2216 final int deferredAsyncRunnablesCount = this.deferredAsyncRunnablesCount; 2217 2218 if (deferredAsyncRunnablesCount >= HIGH_WATERMARK 2219 && deferredAsyncRunnablesCountPrevious < HIGH_WATERMARK) { 2220 LOGGER.log(Level.WARNING, "High watermark of " + HIGH_WATERMARK + " simultaneous executing runnables reached"); 2221 } else if (deferredAsyncRunnablesCount >= INFORM_WATERMARK 2222 && deferredAsyncRunnablesCountPrevious < INFORM_WATERMARK) { 2223 LOGGER.log(Level.INFO, INFORM_WATERMARK + " simultaneous executing runnables reached"); 2224 } 2225 2226 deferredAsyncRunnablesCountPrevious = deferredAsyncRunnablesCount; 2227 } 2228 } 2229 2230 public void setMaxAsyncOperations(int maxAsyncOperations) { 2231 if (maxAsyncOperations < 1) { 2232 throw new IllegalArgumentException("Max async operations must be greater than 0"); 2233 } 2234 2235 synchronized (deferredAsyncRunnables) { 2236 maxAsyncRunnables = maxAsyncOperations; 2237 } 2238 } 2239 2240 protected static void asyncGo(Runnable runnable) { 2241 CACHED_EXECUTOR_SERVICE.execute(runnable); 2242 } 2243 2244 @SuppressWarnings("static-method") 2245 protected final SmackReactor getReactor() { 2246 return SMACK_REACTOR; 2247 } 2248 2249 protected static ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit) { 2250 return SMACK_REACTOR.schedule(runnable, delay, unit, ScheduledAction.Kind.NonBlocking); 2251 } 2252 2253 /** 2254 * Must be called when a XMPP stream open tag is encountered. Sets values like the stream ID and the incoming stream 2255 * XML environment. 2256 * <p> 2257 * This method also returns a matching stream close tag. For example if the stream open is {@code <stream …>}, then 2258 * {@code </stream>} is returned. But if it is {@code <stream:stream>}, then {@code </stream:stream>} is returned. 2259 * Or if it is {@code <foo:stream>}, then {@code </foo:stream>} is returned. 2260 * </p> 2261 * 2262 * @param parser an XML parser that is positioned at the start of the stream open. 2263 * @return a String representing the corresponding stream end tag. 2264 */ 2265 protected String onStreamOpen(XmlPullParser parser) { 2266 assert StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(parser.getNamespace()) : parser.getNamespace() 2267 + " is not " + StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE; 2268 assert StreamOpen.UNPREFIXED_ELEMENT.equals(parser.getName()); 2269 2270 streamId = parser.getAttributeValue("id"); 2271 incomingStreamXmlEnvironment = XmlEnvironment.from(parser); 2272 2273 String reportedServerDomainString = parser.getAttributeValue("from"); 2274 // RFC 6120 § 4.7.1. makes no explicit statement whether or not 'from' in the stream open from the server 2275 // in c2s connections is required or not. 2276 if (reportedServerDomainString != null) { 2277 DomainBareJid reportedServerDomain; 2278 try { 2279 reportedServerDomain = JidCreate.domainBareFrom(reportedServerDomainString); 2280 DomainBareJid configuredXmppServiceDomain = config.getXMPPServiceDomain(); 2281 if (!configuredXmppServiceDomain.equals(reportedServerDomain)) { 2282 LOGGER.warning("Domain reported by server '" + reportedServerDomain 2283 + "' does not match configured domain '" + configuredXmppServiceDomain + "'"); 2284 } 2285 } catch (XmppStringprepException e) { 2286 LOGGER.log(Level.WARNING, "XMPP service domain '" + reportedServerDomainString 2287 + "' as reported by server could not be transformed to a valid JID", e); 2288 } 2289 } 2290 2291 String prefix = parser.getPrefix(); 2292 if (StringUtils.isNotEmpty(prefix)) { 2293 return "</" + prefix + ":stream>"; 2294 } 2295 return "</stream>"; 2296 } 2297 2298 protected final void sendStreamOpen() throws NotConnectedException, InterruptedException { 2299 // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as 2300 // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external 2301 // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first 2302 // response from the server (see e.g. RFC 6120 § 9.1.1 Step 2.) 2303 DomainBareJid to = getXMPPServiceDomain(); 2304 CharSequence from = null; 2305 CharSequence localpart = config.getUsername(); 2306 if (localpart != null) { 2307 from = XmppStringUtils.completeJidFrom(localpart, to); 2308 } 2309 String id = getStreamId(); 2310 String lang = config.getXmlLang(); 2311 2312 AbstractStreamOpen streamOpen = getStreamOpen(to, from, id, lang); 2313 sendNonza(streamOpen); 2314 updateOutgoingStreamXmlEnvironmentOnStreamOpen(streamOpen); 2315 } 2316 2317 protected AbstractStreamOpen getStreamOpen(DomainBareJid to, CharSequence from, String id, String lang) { 2318 return new StreamOpen(to, from, id, lang); 2319 } 2320 2321 protected void updateOutgoingStreamXmlEnvironmentOnStreamOpen(AbstractStreamOpen streamOpen) { 2322 XmlEnvironment.Builder xmlEnvironmentBuilder = XmlEnvironment.builder(); 2323 xmlEnvironmentBuilder.with(streamOpen); 2324 outgoingStreamXmlEnvironment = xmlEnvironmentBuilder.build(); 2325 } 2326 2327 protected final SmackTlsContext getSmackTlsContext() { 2328 return config.smackTlsContext; 2329 } 2330}