001/**
002 *
003 * Copyright 2009 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.io.IOException;
020import java.io.Reader;
021import java.io.Writer;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.LinkedHashMap;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Queue;
030import java.util.Set;
031import java.util.concurrent.ConcurrentLinkedQueue;
032import java.util.concurrent.CopyOnWriteArraySet;
033import java.util.concurrent.Executor;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ScheduledExecutorService;
037import java.util.concurrent.ScheduledFuture;
038import java.util.concurrent.ThreadFactory;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.locks.Lock;
042import java.util.concurrent.locks.ReentrantLock;
043import java.util.logging.Level;
044import java.util.logging.Logger;
045
046import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
047import org.jivesoftware.smack.SmackConfiguration.UnknownIqRequestReplyMode;
048import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
049import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
050import org.jivesoftware.smack.SmackException.NoResponseException;
051import org.jivesoftware.smack.SmackException.NotConnectedException;
052import org.jivesoftware.smack.SmackException.NotLoggedInException;
053import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException;
054import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
055import org.jivesoftware.smack.SmackException.SecurityRequiredException;
056import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
057import org.jivesoftware.smack.XMPPException.StreamErrorException;
058import org.jivesoftware.smack.XMPPException.XMPPErrorException;
059import org.jivesoftware.smack.compress.packet.Compress;
060import org.jivesoftware.smack.compression.XMPPInputOutputStream;
061import org.jivesoftware.smack.debugger.SmackDebugger;
062import org.jivesoftware.smack.debugger.SmackDebuggerFactory;
063import org.jivesoftware.smack.filter.IQReplyFilter;
064import org.jivesoftware.smack.filter.StanzaFilter;
065import org.jivesoftware.smack.filter.StanzaIdFilter;
066import org.jivesoftware.smack.iqrequest.IQRequestHandler;
067import org.jivesoftware.smack.packet.Bind;
068import org.jivesoftware.smack.packet.ErrorIQ;
069import org.jivesoftware.smack.packet.ExtensionElement;
070import org.jivesoftware.smack.packet.IQ;
071import org.jivesoftware.smack.packet.Mechanisms;
072import org.jivesoftware.smack.packet.Message;
073import org.jivesoftware.smack.packet.Nonza;
074import org.jivesoftware.smack.packet.Presence;
075import org.jivesoftware.smack.packet.Session;
076import org.jivesoftware.smack.packet.Stanza;
077import org.jivesoftware.smack.packet.StanzaError;
078import org.jivesoftware.smack.packet.StartTls;
079import org.jivesoftware.smack.packet.StreamError;
080import org.jivesoftware.smack.parsing.ParsingExceptionCallback;
081import org.jivesoftware.smack.provider.ExtensionElementProvider;
082import org.jivesoftware.smack.provider.ProviderManager;
083import org.jivesoftware.smack.sasl.core.SASLAnonymous;
084import org.jivesoftware.smack.util.Async;
085import org.jivesoftware.smack.util.DNSUtil;
086import org.jivesoftware.smack.util.Objects;
087import org.jivesoftware.smack.util.PacketParserUtils;
088import org.jivesoftware.smack.util.ParserUtils;
089import org.jivesoftware.smack.util.StringUtils;
090import org.jivesoftware.smack.util.dns.HostAddress;
091
092import org.jxmpp.jid.DomainBareJid;
093import org.jxmpp.jid.EntityFullJid;
094import org.jxmpp.jid.Jid;
095import org.jxmpp.jid.parts.Resourcepart;
096import org.jxmpp.util.XmppStringUtils;
097import org.minidns.dnsname.DnsName;
098import org.xmlpull.v1.XmlPullParser;
099
100
101/**
102 * This abstract class is commonly used as super class for XMPP connection mechanisms like TCP and BOSH. Hence it
103 * provides the methods for connection state management, like {@link #connect()}, {@link #login()} and
104 * {@link #disconnect()} (which are deliberately not provided by the {@link XMPPConnection} interface).
105 * <p>
106 * <b>Note:</b> The default entry point to Smack's documentation is {@link XMPPConnection}. If you are getting started
107 * with Smack, then head over to {@link XMPPConnection} and then come back here.
108 * </p>
109 * <h2>Parsing Exceptions</h2>
110 * <p>
111 * In case a Smack parser (Provider) throws, those exceptions are handed over to the {@link ParsingExceptionCallback}. A
112 * common cause for a provider throwing is illegal input, for example a non-numeric String where only Integers are
113 * allowed. Smack's <em>default behavior</em> follows the <b>"fail-hard per default"</b> principle leading to a
114 * termination of the connection on parsing exceptions. This default was chosen to make users eventually aware that they
115 * should configure their own callback and handle those exceptions to prevent the disconnect. Handling a parsing exception
116 * could be as simple as using a non-throwing no-op callback, which would cause the faulty stream element to be taken
117 * out of the stream, i.e., Smack behaves like that element was never received.
118 * </p>
119 * <p>
120 * If the parsing exception is because Smack received illegal input, then please consider informing the authors of the
121 * originating entity about that. If it was thrown because of a bug in a Smack parser, then please consider filing a
122 * bug with Smack.
123 * </p>
124 * <h3>Managing the parsing exception callback</h3>
125 * <p>
126 * The "fail-hard per default" behavior is achieved by using the
127 * {@link org.jivesoftware.smack.parsing.ExceptionThrowingCallbackWithHint} as default parsing exception callback. You
128 * can change the behavior using {@link #setParsingExceptionCallback(ParsingExceptionCallback)} to set a new callback.
129 * Use {@link org.jivesoftware.smack.SmackConfiguration#setDefaultParsingExceptionCallback(ParsingExceptionCallback)} to
130 * set the default callback.
131 * </p>
132 */
133public abstract class AbstractXMPPConnection implements XMPPConnection {
134    private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName());
135
136    /**
137     * Counter to uniquely identify connections that are created.
138     */
139    private static final AtomicInteger connectionCounter = new AtomicInteger(0);
140
141    static {
142        // Ensure the SmackConfiguration class is loaded by calling a method in it.
143        SmackConfiguration.getVersion();
144    }
145
146    /**
147     * A collection of ConnectionListeners which listen for connection closing
148     * and reconnection events.
149     */
150    protected final Set<ConnectionListener> connectionListeners =
151            new CopyOnWriteArraySet<>();
152
153    /**
154     * A collection of StanzaCollectors which collect stanzas for a specified filter
155     * and perform blocking and polling operations on the result queue.
156     * <p>
157     * We use a ConcurrentLinkedQueue here, because its Iterator is weakly
158     * consistent and we want {@link #invokeStanzaCollectorsAndNotifyRecvListeners(Stanza)} for-each
159     * loop to be lock free. As drawback, removing a StanzaCollector is O(n).
160     * The alternative would be a synchronized HashSet, but this would mean a
161     * synchronized block around every usage of <code>collectors</code>.
162     * </p>
163     */
164    private final Collection<StanzaCollector> collectors = new ConcurrentLinkedQueue<>();
165
166    /**
167     * List of PacketListeners that will be notified synchronously when a new stanza was received.
168     */
169    private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>();
170
171    /**
172     * List of PacketListeners that will be notified asynchronously when a new stanza was received.
173     */
174    private final Map<StanzaListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>();
175
176    /**
177     * List of PacketListeners that will be notified when a new stanza was sent.
178     */
179    private final Map<StanzaListener, ListenerWrapper> sendListeners =
180            new HashMap<>();
181
182    /**
183     * List of PacketListeners that will be notified when a new stanza is about to be
184     * sent to the server. These interceptors may modify the stanza before it is being
185     * actually sent to the server.
186     */
187    private final Map<StanzaListener, InterceptorWrapper> interceptors =
188            new HashMap<>();
189
190    protected final Lock connectionLock = new ReentrantLock();
191
192    protected final Map<String, ExtensionElement> streamFeatures = new HashMap<>();
193
194    /**
195     * The full JID of the authenticated user, as returned by the resource binding response of the server.
196     * <p>
197     * It is important that we don't infer the user from the login() arguments and the configurations service name, as,
198     * for example, when SASL External is used, the username is not given to login but taken from the 'external'
199     * certificate.
200     * </p>
201     */
202    protected EntityFullJid user;
203
204    protected boolean connected = false;
205
206    /**
207     * The stream ID, see RFC 6120 § 4.7.3
208     */
209    protected String streamId;
210
211    /**
212     * The timeout to wait for a reply in milliseconds.
213     */
214    private long replyTimeout = SmackConfiguration.getDefaultReplyTimeout();
215
216    /**
217     * The SmackDebugger allows to log and debug XML traffic.
218     */
219    protected final SmackDebugger debugger;
220
221    /**
222     * The Reader which is used for the debugger.
223     */
224    protected Reader reader;
225
226    /**
227     * The Writer which is used for the debugger.
228     */
229    protected Writer writer;
230
231    protected final SynchronizationPoint<SmackException> tlsHandled = new SynchronizationPoint<>(this, "establishing TLS");
232
233    /**
234     * Set to success if the last features stanza from the server has been parsed. An XMPP connection
235     * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature
236     * stanza is sent by the server. This is set to true once the last feature stanza has been
237     * parsed.
238     */
239    protected final SynchronizationPoint<SmackException> lastFeaturesReceived = new SynchronizationPoint<>(
240                    AbstractXMPPConnection.this, "last stream features received from server");
241
242    /**
243     * Set to success if the SASL feature has been received.
244     */
245    protected final SynchronizationPoint<XMPPException> saslFeatureReceived = new SynchronizationPoint<>(
246                    AbstractXMPPConnection.this, "SASL mechanisms stream feature from server");
247
248    /**
249     * The SASLAuthentication manager that is responsible for authenticating with the server.
250     */
251    protected final SASLAuthentication saslAuthentication;
252
253    /**
254     * A number to uniquely identify connections that are created. This is distinct from the
255     * connection ID, which is a value sent by the server once a connection is made.
256     */
257    protected final int connectionCounterValue = connectionCounter.getAndIncrement();
258
259    /**
260     * Holds the initial configuration used while creating the connection.
261     */
262    protected final ConnectionConfiguration config;
263
264    /**
265     * Defines how the from attribute of outgoing stanzas should be handled.
266     */
267    private FromMode fromMode = FromMode.OMITTED;
268
269    protected XMPPInputOutputStream compressionHandler;
270
271    private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback();
272
273    /**
274     * This scheduled thread pool executor is used to remove pending callbacks.
275     */
276    protected static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(
277            new ThreadFactory() {
278                @Override
279                public Thread newThread(Runnable runnable) {
280                    Thread thread = new Thread(runnable);
281                    thread.setName("Smack Scheduled Executor Service");
282                    thread.setDaemon(true);
283                    return thread;
284                }
285            });
286
287    /**
288     * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set
289     * them 'daemon'.
290     */
291    private static final ExecutorService CACHED_EXECUTOR_SERVICE = Executors.newCachedThreadPool(new ThreadFactory() {
292        @Override
293        public Thread newThread(Runnable runnable) {
294            Thread thread = new Thread(runnable);
295            thread.setName("Smack Cached Executor");
296            thread.setDaemon(true);
297            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
298                @Override
299                public void uncaughtException(Thread t, Throwable e) {
300                    LOGGER.log(Level.WARNING, t + " encountered uncaught exception", e);
301                }
302            });
303            return thread;
304        }
305    });
306
307    protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>();
308
309    /**
310     * An executor which uses {@link #asyncGoLimited(Runnable)} to limit the number of asynchronously processed runnables
311     * per connection.
312     */
313    private final Executor limitedExcutor = new Executor() {
314        @Override
315        public void execute(Runnable runnable) {
316            asyncGoLimited(runnable);
317        }
318    };
319
320    /**
321     * The used host to establish the connection to
322     */
323    protected String host;
324
325    /**
326     * The used port to establish the connection to
327     */
328    protected int port;
329
330    /**
331     * Flag that indicates if the user is currently authenticated with the server.
332     */
333    protected boolean authenticated = false;
334
335    // TODO: Migrate to ZonedDateTime once Smack's minimum required Android SDK level is 26 (8.0, Oreo) or higher.
336    protected long authenticatedConnectionInitiallyEstablishedTimestamp;
337
338    /**
339     * Flag that indicates if the user was authenticated with the server when the connection
340     * to the server was closed (abruptly or not).
341     */
342    protected boolean wasAuthenticated = false;
343
344    private final Map<String, IQRequestHandler> setIqRequestHandler = new HashMap<>();
345    private final Map<String, IQRequestHandler> getIqRequestHandler = new HashMap<>();
346
347    /**
348     * Create a new XMPPConnection to an XMPP server.
349     *
350     * @param configuration The configuration which is used to establish the connection.
351     */
352    protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
353        saslAuthentication = new SASLAuthentication(this, configuration);
354        config = configuration;
355        SmackDebuggerFactory debuggerFactory = configuration.getDebuggerFactory();
356        if (debuggerFactory != null) {
357            debugger = debuggerFactory.create(this);
358        } else {
359            debugger = null;
360        }
361        // Notify listeners that a new connection has been established
362        for (ConnectionCreationListener listener : XMPPConnectionRegistry.getConnectionCreationListeners()) {
363            listener.connectionCreated(this);
364        }
365    }
366
367    /**
368     * Get the connection configuration used by this connection.
369     *
370     * @return the connection configuration.
371     */
372    public ConnectionConfiguration getConfiguration() {
373        return config;
374    }
375
376    @Override
377    public DomainBareJid getXMPPServiceDomain() {
378        if (xmppServiceDomain != null) {
379            return xmppServiceDomain;
380        }
381        return config.getXMPPServiceDomain();
382    }
383
384    @Override
385    public String getHost() {
386        return host;
387    }
388
389    @Override
390    public int getPort() {
391        return port;
392    }
393
394    @Override
395    public abstract boolean isSecureConnection();
396
397    protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException;
398
399    @Override
400    public abstract void sendNonza(Nonza element) throws NotConnectedException, InterruptedException;
401
402    @Override
403    public abstract boolean isUsingCompression();
404
405    protected void initState() {
406        saslFeatureReceived.init();
407        lastFeaturesReceived.init();
408        tlsHandled.init();
409    }
410
411    /**
412     * Establishes a connection to the XMPP server. It basically
413     * creates and maintains a connection to the server.
414     * <p>
415     * Listeners will be preserved from a previous connection.
416     * </p>
417     *
418     * @throws XMPPException if an error occurs on the XMPP protocol level.
419     * @throws SmackException if an error occurs somewhere else besides XMPP protocol level.
420     * @throws IOException
421     * @return a reference to this object, to chain <code>connect()</code> with <code>login()</code>.
422     * @throws InterruptedException
423     */
424    public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException {
425        // Check if not already connected
426        throwAlreadyConnectedExceptionIfAppropriate();
427
428        // Reset the connection state
429        initState();
430        saslAuthentication.init();
431        streamId = null;
432
433        try {
434            // Perform the actual connection to the XMPP service
435            connectInternal();
436
437            // If TLS is required but the server doesn't offer it, disconnect
438            // from the server and throw an error. First check if we've already negotiated TLS
439            // and are secure, however (features get parsed a second time after TLS is established).
440            if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) {
441                throw new SecurityRequiredByClientException();
442            }
443        } catch (SmackException | IOException | XMPPException | InterruptedException e) {
444            instantShutdown();
445            throw e;
446        }
447
448        // Make note of the fact that we're now connected.
449        connected = true;
450        callConnectionConnectedListener();
451
452        return this;
453    }
454
455    /**
456     * Abstract method that concrete subclasses of XMPPConnection need to implement to perform their
457     * way of XMPP connection establishment. Implementations are required to perform an automatic
458     * login if the previous connection state was logged (authenticated).
459     *
460     * @throws SmackException
461     * @throws IOException
462     * @throws XMPPException
463     * @throws InterruptedException
464     */
465    protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException;
466
467    private String usedUsername, usedPassword;
468
469    /**
470     * The resourcepart used for this connection. May not be the resulting resourcepart if it's null or overridden by the XMPP service.
471     */
472    private Resourcepart usedResource;
473
474    /**
475     * Logs in to the server using the strongest SASL mechanism supported by
476     * the server. If more than the connection's default stanza timeout elapses in each step of the
477     * authentication process without a response from the server, a
478     * {@link SmackException.NoResponseException} will be thrown.
479     * <p>
480     * Before logging in (i.e. authenticate) to the server the connection must be connected
481     * by calling {@link #connect}.
482     * </p>
483     * <p>
484     * It is possible to log in without sending an initial available presence by using
485     * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}.
486     * Finally, if you want to not pass a password and instead use a more advanced mechanism
487     * while using SASL then you may be interested in using
488     * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}.
489     * For more advanced login settings see {@link ConnectionConfiguration}.
490     * </p>
491     *
492     * @throws XMPPException if an error occurs on the XMPP protocol level.
493     * @throws SmackException if an error occurs somewhere else besides XMPP protocol level.
494     * @throws IOException if an I/O error occurs during login.
495     * @throws InterruptedException
496     */
497    public synchronized void login() throws XMPPException, SmackException, IOException, InterruptedException {
498        // The previously used username, password and resource take over precedence over the
499        // ones from the connection configuration
500        CharSequence username = usedUsername != null ? usedUsername : config.getUsername();
501        String password = usedPassword != null ? usedPassword : config.getPassword();
502        Resourcepart resource = usedResource != null ? usedResource : config.getResource();
503        login(username, password, resource);
504    }
505
506    /**
507     * Same as {@link #login(CharSequence, String, Resourcepart)}, but takes the resource from the connection
508     * configuration.
509     *
510     * @param username
511     * @param password
512     * @throws XMPPException
513     * @throws SmackException
514     * @throws IOException
515     * @throws InterruptedException
516     * @see #login
517     */
518    public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException,
519                    IOException, InterruptedException {
520        login(username, password, config.getResource());
521    }
522
523    /**
524     * Login with the given username (authorization identity). You may omit the password if a callback handler is used.
525     * If resource is null, then the server will generate one.
526     *
527     * @param username
528     * @param password
529     * @param resource
530     * @throws XMPPException
531     * @throws SmackException
532     * @throws IOException
533     * @throws InterruptedException
534     * @see #login
535     */
536    public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException,
537                    SmackException, IOException, InterruptedException {
538        if (!config.allowNullOrEmptyUsername) {
539            StringUtils.requireNotNullOrEmpty(username, "Username must not be null or empty");
540        }
541        throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?");
542        throwAlreadyLoggedInExceptionIfAppropriate();
543        usedUsername = username != null ? username.toString() : null;
544        usedPassword = password;
545        usedResource = resource;
546        loginInternal(usedUsername, usedPassword, usedResource);
547    }
548
549    protected abstract void loginInternal(String username, String password, Resourcepart resource)
550                    throws XMPPException, SmackException, IOException, InterruptedException;
551
552    @Override
553    public final boolean isConnected() {
554        return connected;
555    }
556
557    @Override
558    public final boolean isAuthenticated() {
559        return authenticated;
560    }
561
562    @Override
563    public final EntityFullJid getUser() {
564        return user;
565    }
566
567    @Override
568    public String getStreamId() {
569        if (!isConnected()) {
570            return null;
571        }
572        return streamId;
573    }
574
575    protected void bindResourceAndEstablishSession(Resourcepart resource) throws XMPPErrorException,
576                    SmackException, InterruptedException {
577
578        // Wait until either:
579        // - the servers last features stanza has been parsed
580        // - the timeout occurs
581        LOGGER.finer("Waiting for last features to be received before continuing with resource binding");
582        lastFeaturesReceived.checkIfSuccessOrWaitOrThrow();
583
584
585        if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) {
586            // Server never offered resource binding, which is REQUIRED in XMPP client and
587            // server implementations as per RFC6120 7.2
588            throw new ResourceBindingNotOfferedException();
589        }
590
591        // Resource binding, see RFC6120 7.
592        // Note that we can not use IQReplyFilter here, since the users full JID is not yet
593        // available. It will become available right after the resource has been successfully bound.
594        Bind bindResource = Bind.newSet(resource);
595        StanzaCollector packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(bindResource), bindResource);
596        Bind response = packetCollector.nextResultOrThrow();
597        // Set the connections user to the result of resource binding. It is important that we don't infer the user
598        // from the login() arguments and the configurations service name, as, for example, when SASL External is used,
599        // the username is not given to login but taken from the 'external' certificate.
600        user = response.getJid();
601        xmppServiceDomain = user.asDomainBareJid();
602
603        Session.Feature sessionFeature = getFeature(Session.ELEMENT, Session.NAMESPACE);
604        // Only bind the session if it's announced as stream feature by the server, is not optional and not disabled
605        // For more information see http://tools.ietf.org/html/draft-cridland-xmpp-session-01
606        if (sessionFeature != null && !sessionFeature.isOptional()) {
607            Session session = new Session();
608            packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(session), session);
609            packetCollector.nextResultOrThrow();
610        }
611    }
612
613    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
614        if (!resumed) {
615            authenticatedConnectionInitiallyEstablishedTimestamp = System.currentTimeMillis();
616        }
617        // Indicate that we're now authenticated.
618        this.authenticated = true;
619
620        // If debugging is enabled, change the the debug window title to include the
621        // name we are now logged-in as.
622        // If DEBUG was set to true AFTER the connection was created the debugger
623        // will be null
624        if (debugger != null) {
625            debugger.userHasLogged(user);
626        }
627        callConnectionAuthenticatedListener(resumed);
628
629        // Set presence to online. It is important that this is done after
630        // callConnectionAuthenticatedListener(), as this call will also
631        // eventually load the roster. And we should load the roster before we
632        // send the initial presence.
633        if (config.isSendPresence() && !resumed) {
634            sendStanza(new Presence(Presence.Type.available));
635        }
636    }
637
638    @Override
639    public final boolean isAnonymous() {
640        return isAuthenticated() && SASLAnonymous.NAME.equals(getUsedSaslMechansism());
641    }
642
643    /**
644     * Get the name of the SASL mechanism that was used to authenticate this connection. This returns the name of
645     * mechanism which was used the last time this connection was authenticated, and will return <code>null</code> if
646     * this connection was not authenticated before.
647     *
648     * @return the name of the used SASL mechanism.
649     * @since 4.2
650     */
651    public final String getUsedSaslMechansism() {
652        return saslAuthentication.getNameOfLastUsedSaslMechansism();
653    }
654
655    private DomainBareJid xmppServiceDomain;
656
657    protected List<HostAddress> hostAddresses;
658
659    /**
660     * Populates {@link #hostAddresses} with the resolved addresses or with the configured host address. If no host
661     * address was configured and all lookups failed, for example with NX_DOMAIN, then {@link #hostAddresses} will be
662     * populated with the empty list.
663     *
664     * @return a list of host addresses where DNS (SRV) RR resolution failed.
665     */
666    protected List<HostAddress> populateHostAddresses() {
667        List<HostAddress> failedAddresses = new LinkedList<>();
668        if (config.hostAddress != null) {
669            hostAddresses = new ArrayList<>(1);
670            HostAddress hostAddress = new HostAddress(config.port, config.hostAddress);
671            hostAddresses.add(hostAddress);
672        }
673        else if (config.host != null) {
674            hostAddresses = new ArrayList<>(1);
675            HostAddress hostAddress = DNSUtil.getDNSResolver().lookupHostAddress(config.host, config.port, failedAddresses, config.getDnssecMode());
676            if (hostAddress != null) {
677                hostAddresses.add(hostAddress);
678            }
679        } else {
680            // N.B.: Important to use config.serviceName and not AbstractXMPPConnection.serviceName
681            DnsName dnsName = DnsName.from(config.getXMPPServiceDomain());
682            hostAddresses = DNSUtil.resolveXMPPServiceDomain(dnsName, failedAddresses, config.getDnssecMode());
683        }
684        // Either the populated host addresses are not empty *or* there must be at least one failed address.
685        assert (!hostAddresses.isEmpty() || !failedAddresses.isEmpty());
686        return failedAddresses;
687    }
688
689    protected Lock getConnectionLock() {
690        return connectionLock;
691    }
692
693    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
694        throwNotConnectedExceptionIfAppropriate(null);
695    }
696
697    protected void throwNotConnectedExceptionIfAppropriate(String optionalHint) throws NotConnectedException {
698        if (!isConnected()) {
699            throw new NotConnectedException(optionalHint);
700        }
701    }
702
703    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
704        if (isConnected()) {
705            throw new AlreadyConnectedException();
706        }
707    }
708
709    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
710        if (isAuthenticated()) {
711            throw new AlreadyLoggedInException();
712        }
713    }
714
715    @Override
716    public void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
717        Objects.requireNonNull(stanza, "Stanza must not be null");
718        assert (stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ);
719
720        throwNotConnectedExceptionIfAppropriate();
721        switch (fromMode) {
722        case OMITTED:
723            stanza.setFrom((Jid) null);
724            break;
725        case USER:
726            stanza.setFrom(getUser());
727            break;
728        case UNCHANGED:
729        default:
730            break;
731        }
732        // Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify
733        // the content of the stanza.
734        firePacketInterceptors(stanza);
735        sendStanzaInternal(stanza);
736    }
737
738    /**
739     * Returns the SASLAuthentication manager that is responsible for authenticating with
740     * the server.
741     *
742     * @return the SASLAuthentication manager that is responsible for authenticating with
743     *         the server.
744     */
745    protected SASLAuthentication getSASLAuthentication() {
746        return saslAuthentication;
747    }
748
749    /**
750     * Closes the connection by setting presence to unavailable then closing the connection to
751     * the XMPP server. The XMPPConnection can still be used for connecting to the server
752     * again.
753     *
754     */
755    public void disconnect() {
756        Presence unavailablePresence = null;
757        if (isAuthenticated()) {
758            unavailablePresence = new Presence(Presence.Type.unavailable);
759        }
760        try {
761            disconnect(unavailablePresence);
762        }
763        catch (NotConnectedException e) {
764            LOGGER.log(Level.FINEST, "Connection is already disconnected", e);
765        }
766    }
767
768    /**
769     * Closes the connection. A custom unavailable presence is sent to the server, followed
770     * by closing the stream. The XMPPConnection can still be used for connecting to the server
771     * again. A custom unavailable presence is useful for communicating offline presence
772     * information such as "On vacation". Typically, just the status text of the presence
773     * stanza is set with online information, but most XMPP servers will deliver the full
774     * presence stanza with whatever data is set.
775     *
776     * @param unavailablePresence the optional presence stanza to send during shutdown.
777     * @throws NotConnectedException
778     */
779    public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException {
780        if (unavailablePresence != null) {
781            try {
782                sendStanza(unavailablePresence);
783            } catch (InterruptedException e) {
784                LOGGER.log(Level.FINE,
785                        "Was interrupted while sending unavailable presence. Continuing to disconnect the connection",
786                        e);
787            }
788        }
789        shutdown();
790        callConnectionClosedListener();
791    }
792
    /**
     * Shuts the current connection down.
     * <p>
     * Invoked by {@link #disconnect(Presence)} after the optional unavailable presence has been
     * sent and before the connection-closed listeners are notified. The concrete shutdown steps
     * are defined by the implementing connection class.
     * </p>
     */
    protected abstract void shutdown();
797
    /**
     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing
     * stream stanza. The concrete behavior is defined by the implementing connection class.
     */
    public abstract void instantShutdown();
802
803    @Override
804    public void addConnectionListener(ConnectionListener connectionListener) {
805        if (connectionListener == null) {
806            return;
807        }
808        connectionListeners.add(connectionListener);
809    }
810
811    @Override
812    public void removeConnectionListener(ConnectionListener connectionListener) {
813        connectionListeners.remove(connectionListener);
814    }
815
816    @Override
817    public <I extends IQ> I sendIqRequestAndWaitForResponse(IQ request)
818            throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException {
819        StanzaCollector collector = createStanzaCollectorAndSend(request);
820        IQ resultResponse = collector.nextResultOrThrow();
821        @SuppressWarnings("unchecked")
822        I concreteResultResponse = (I) resultResponse;
823        return concreteResultResponse;
824    }
825
826    @Override
827    public StanzaCollector createStanzaCollectorAndSend(IQ packet) throws NotConnectedException, InterruptedException {
828        StanzaFilter packetFilter = new IQReplyFilter(packet, this);
829        // Create the packet collector before sending the packet
830        StanzaCollector packetCollector = createStanzaCollectorAndSend(packetFilter, packet);
831        return packetCollector;
832    }
833
834    @Override
835    public StanzaCollector createStanzaCollectorAndSend(StanzaFilter packetFilter, Stanza packet)
836                    throws NotConnectedException, InterruptedException {
837        StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration()
838                        .setStanzaFilter(packetFilter)
839                        .setRequest(packet);
840        // Create the packet collector before sending the packet
841        StanzaCollector packetCollector = createStanzaCollector(configuration);
842        try {
843            // Now we can send the packet as the collector has been created
844            sendStanza(packet);
845        }
846        catch (InterruptedException | NotConnectedException | RuntimeException e) {
847            packetCollector.cancel();
848            throw e;
849        }
850        return packetCollector;
851    }
852
853    @Override
854    public StanzaCollector createStanzaCollector(StanzaFilter packetFilter) {
855        StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration().setStanzaFilter(packetFilter);
856        return createStanzaCollector(configuration);
857    }
858
859    @Override
860    public StanzaCollector createStanzaCollector(StanzaCollector.Configuration configuration) {
861        StanzaCollector collector = new StanzaCollector(this, configuration);
862        // Add the collector to the list of active collectors.
863        collectors.add(collector);
864        return collector;
865    }
866
867    @Override
868    public void removeStanzaCollector(StanzaCollector collector) {
869        collectors.remove(collector);
870    }
871
872    @Override
873    public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
874        if (packetListener == null) {
875            throw new NullPointerException("Packet listener is null.");
876        }
877        ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
878        synchronized (syncRecvListeners) {
879            syncRecvListeners.put(packetListener, wrapper);
880        }
881    }
882
883    @Override
884    public boolean removeSyncStanzaListener(StanzaListener packetListener) {
885        synchronized (syncRecvListeners) {
886            return syncRecvListeners.remove(packetListener) != null;
887        }
888    }
889
890    @Override
891    public void addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
892        if (packetListener == null) {
893            throw new NullPointerException("Packet listener is null.");
894        }
895        ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
896        synchronized (asyncRecvListeners) {
897            asyncRecvListeners.put(packetListener, wrapper);
898        }
899    }
900
901    @Override
902    public boolean removeAsyncStanzaListener(StanzaListener packetListener) {
903        synchronized (asyncRecvListeners) {
904            return asyncRecvListeners.remove(packetListener) != null;
905        }
906    }
907
908    @Deprecated
909    @Override
910    public void addPacketSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) {
911        addStanzaSendingListener(packetListener, packetFilter);
912    }
913
914    @Override
915    public void addStanzaSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) {
916        if (packetListener == null) {
917            throw new NullPointerException("Packet listener is null.");
918        }
919        ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
920        synchronized (sendListeners) {
921            sendListeners.put(packetListener, wrapper);
922        }
923    }
924
925    @Deprecated
926    @Override
927    public void removePacketSendingListener(StanzaListener packetListener) {
928        removeStanzaSendingListener(packetListener);
929    }
930
931    @Override
932    public void removeStanzaSendingListener(StanzaListener packetListener) {
933        synchronized (sendListeners) {
934            sendListeners.remove(packetListener);
935        }
936    }
937
938    /**
939     * Process all stanza listeners for sending packets.
940     * <p>
941     * Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread.
942     * </p>
943     *
944     * @param packet the stanza to process.
945     */
946    @SuppressWarnings("javadoc")
947    protected void firePacketSendingListeners(final Stanza packet) {
948        final SmackDebugger debugger = this.debugger;
949        if (debugger != null) {
950            debugger.onOutgoingStreamElement(packet);
951        }
952
953        final List<StanzaListener> listenersToNotify = new LinkedList<>();
954        synchronized (sendListeners) {
955            for (ListenerWrapper listenerWrapper : sendListeners.values()) {
956                if (listenerWrapper.filterMatches(packet)) {
957                    listenersToNotify.add(listenerWrapper.getListener());
958                }
959            }
960        }
961        if (listenersToNotify.isEmpty()) {
962            return;
963        }
964        // Notify in a new thread, because we can
965        asyncGo(new Runnable() {
966            @Override
967            public void run() {
968                for (StanzaListener listener : listenersToNotify) {
969                    try {
970                        listener.processStanza(packet);
971                    }
972                    catch (Exception e) {
973                        LOGGER.log(Level.WARNING, "Sending listener threw exception", e);
974                        continue;
975                    }
976                }
977            }
978        });
979    }
980
981    @Deprecated
982    @Override
983    public void addPacketInterceptor(StanzaListener packetInterceptor,
984                                     StanzaFilter packetFilter) {
985        addStanzaInterceptor(packetInterceptor, packetFilter);
986    }
987
988    @Override
989    public void addStanzaInterceptor(StanzaListener packetInterceptor,
990            StanzaFilter packetFilter) {
991        if (packetInterceptor == null) {
992            throw new NullPointerException("Packet interceptor is null.");
993        }
994        InterceptorWrapper interceptorWrapper = new InterceptorWrapper(packetInterceptor, packetFilter);
995        synchronized (interceptors) {
996            interceptors.put(packetInterceptor, interceptorWrapper);
997        }
998    }
999
1000    @Deprecated
1001    @Override
1002    public void removePacketInterceptor(StanzaListener packetInterceptor) {
1003        removeStanzaInterceptor(packetInterceptor);
1004    }
1005
1006    @Override
1007    public void removeStanzaInterceptor(StanzaListener packetInterceptor) {
1008        synchronized (interceptors) {
1009            interceptors.remove(packetInterceptor);
1010        }
1011    }
1012
1013    /**
1014     * Process interceptors. Interceptors may modify the stanza that is about to be sent.
1015     * Since the thread that requested to send the stanza will invoke all interceptors, it
1016     * is important that interceptors perform their work as soon as possible so that the
1017     * thread does not remain blocked for a long period.
1018     *
1019     * @param packet the stanza that is going to be sent to the server
1020     */
1021    private void firePacketInterceptors(Stanza packet) {
1022        List<StanzaListener> interceptorsToInvoke = new LinkedList<>();
1023        synchronized (interceptors) {
1024            for (InterceptorWrapper interceptorWrapper : interceptors.values()) {
1025                if (interceptorWrapper.filterMatches(packet)) {
1026                    interceptorsToInvoke.add(interceptorWrapper.getInterceptor());
1027                }
1028            }
1029        }
1030        for (StanzaListener interceptor : interceptorsToInvoke) {
1031            try {
1032                interceptor.processStanza(packet);
1033            } catch (Exception e) {
1034                LOGGER.log(Level.SEVERE, "Packet interceptor threw exception", e);
1035            }
1036        }
1037    }
1038
1039    /**
1040     * Initialize the {@link #debugger}. You can specify a customized {@link SmackDebugger}
1041     * by setup the system property <code>smack.debuggerClass</code> to the implementation.
1042     *
1043     * @throws IllegalStateException if the reader or writer isn't yet initialized.
1044     * @throws IllegalArgumentException if the SmackDebugger can't be loaded.
1045     */
1046    protected void initDebugger() {
1047        if (reader == null || writer == null) {
1048            throw new NullPointerException("Reader or writer isn't initialized.");
1049        }
1050        // If debugging is enabled, we open a window and write out all network traffic.
1051        if (debugger != null) {
1052            // Obtain new reader and writer from the existing debugger
1053            reader = debugger.newConnectionReader(reader);
1054            writer = debugger.newConnectionWriter(writer);
1055        }
1056    }
1057
1058    @Override
1059    public long getReplyTimeout() {
1060        return replyTimeout;
1061    }
1062
1063    @Override
1064    public void setReplyTimeout(long timeout) {
1065        replyTimeout = timeout;
1066    }
1067
1068    private SmackConfiguration.UnknownIqRequestReplyMode unknownIqRequestReplyMode = SmackConfiguration.getUnknownIqRequestReplyMode();
1069
1070    /**
1071     * Set how Smack behaves when an unknown IQ request has been received.
1072     *
1073     * @param unknownIqRequestReplyMode reply mode.
1074     */
1075    public void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) {
1076        this.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Mode must not be null");
1077    }
1078
1079    protected void parseAndProcessStanza(XmlPullParser parser) throws Exception {
1080        ParserUtils.assertAtStartTag(parser);
1081        int parserDepth = parser.getDepth();
1082        Stanza stanza = null;
1083        try {
1084            stanza = PacketParserUtils.parseStanza(parser);
1085        }
1086        catch (Exception e) {
1087            CharSequence content = PacketParserUtils.parseContentDepth(parser,
1088                            parserDepth);
1089            UnparseableStanza message = new UnparseableStanza(content, e);
1090            ParsingExceptionCallback callback = getParsingExceptionCallback();
1091            if (callback != null) {
1092                callback.handleUnparsableStanza(message);
1093            }
1094        }
1095        ParserUtils.assertAtEndTag(parser);
1096        if (stanza != null) {
1097            processStanza(stanza);
1098        }
1099    }
1100
1101    /**
1102     * Processes a stanza after it's been fully parsed by looping through the installed
1103     * stanza collectors and listeners and letting them examine the stanza to see if
1104     * they are a match with the filter.
1105     *
1106     * @param stanza the stanza to process.
1107     * @throws InterruptedException
1108     */
1109    protected void processStanza(final Stanza stanza) throws InterruptedException {
1110        assert (stanza != null);
1111
1112        final SmackDebugger debugger = this.debugger;
1113        if (debugger != null) {
1114            debugger.onIncomingStreamElement(stanza);
1115        }
1116
1117        lastStanzaReceived = System.currentTimeMillis();
1118        // Deliver the incoming packet to listeners.
1119        invokeStanzaCollectorsAndNotifyRecvListeners(stanza);
1120    }
1121
1122    /**
1123     * Invoke {@link StanzaCollector#processStanza(Stanza)} for every
1124     * StanzaCollector with the given packet. Also notify the receive listeners with a matching stanza filter about the packet.
1125     * <p>
1126     * This method will be invoked by the connections incoming processing thread which may be shared across multiple connections and
1127     * thus it is important that no user code, e.g. in form of a callback, is invoked by this method. For the same reason,
1128     * this method must not block for an extended period of time.
1129     * </p>
1130     *
1131     * @param packet the stanza to notify the StanzaCollectors and receive listeners about.
1132     */
1133    protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {
1134        if (packet instanceof IQ) {
1135            final IQ iq = (IQ) packet;
1136            if (iq.isRequestIQ()) {
1137                final IQ iqRequest = iq;
1138                final String key = XmppStringUtils.generateKey(iq.getChildElementName(), iq.getChildElementNamespace());
1139                IQRequestHandler iqRequestHandler;
1140                final IQ.Type type = iq.getType();
1141                switch (type) {
1142                case set:
1143                    synchronized (setIqRequestHandler) {
1144                        iqRequestHandler = setIqRequestHandler.get(key);
1145                    }
1146                    break;
1147                case get:
1148                    synchronized (getIqRequestHandler) {
1149                        iqRequestHandler = getIqRequestHandler.get(key);
1150                    }
1151                    break;
1152                default:
1153                    throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'");
1154                }
1155                if (iqRequestHandler == null) {
1156                    StanzaError.Condition replyCondition;
1157                    switch (unknownIqRequestReplyMode) {
1158                    case doNotReply:
1159                        return;
1160                    case replyFeatureNotImplemented:
1161                        replyCondition = StanzaError.Condition.feature_not_implemented;
1162                        break;
1163                    case replyServiceUnavailable:
1164                        replyCondition = StanzaError.Condition.service_unavailable;
1165                        break;
1166                    default:
1167                        throw new AssertionError();
1168                    }
1169
1170                    // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an
1171                    // IQ of type 'error' with condition 'service-unavailable'.
1172                    ErrorIQ errorIQ = IQ.createErrorResponse(iq, StanzaError.getBuilder((
1173                                    replyCondition)));
1174                    try {
1175                        sendStanza(errorIQ);
1176                    }
1177                    catch (InterruptedException | NotConnectedException e) {
1178                        LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e);
1179                    }
1180                } else {
1181                    Executor executorService = null;
1182                    switch (iqRequestHandler.getMode()) {
1183                    case sync:
1184                        executorService = ASYNC_BUT_ORDERED.asExecutorFor(this);
1185                        break;
1186                    case async:
1187                        executorService = limitedExcutor;
1188                        break;
1189                    }
1190                    final IQRequestHandler finalIqRequestHandler = iqRequestHandler;
1191                    executorService.execute(new Runnable() {
1192                        @Override
1193                        public void run() {
1194                            IQ response = finalIqRequestHandler.handleIQRequest(iq);
1195                            if (response == null) {
1196                                // It is not ideal if the IQ request handler does not return an IQ response, because RFC
1197                                // 6120 ยง 8.1.2 does specify that a response is mandatory. But some APIs, mostly the
1198                                // file transfer one, does not always return a result, so we need to handle this case.
1199                                // Also sometimes a request handler may decide that it's better to not send a response,
1200                                // e.g. to avoid presence leaks.
1201                                return;
1202                            }
1203
1204                            assert (response.getType() == IQ.Type.result || response.getType() == IQ.Type.error);
1205
1206                            response.setTo(iqRequest.getFrom());
1207                            response.setStanzaId(iqRequest.getStanzaId());
1208                            try {
1209                                sendStanza(response);
1210                            }
1211                            catch (InterruptedException | NotConnectedException e) {
1212                                LOGGER.log(Level.WARNING, "Exception while sending response to IQ request", e);
1213                            }
1214                        }
1215                    });
1216                }
1217                // The following returns makes it impossible for packet listeners and collectors to
1218                // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the
1219                // desired behavior.
1220                return;
1221            }
1222        }
1223
1224        // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below,
1225        // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in
1226        // their own thread.
1227        final Collection<StanzaListener> listenersToNotify = new LinkedList<>();
1228        synchronized (asyncRecvListeners) {
1229            for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) {
1230                if (listenerWrapper.filterMatches(packet)) {
1231                    listenersToNotify.add(listenerWrapper.getListener());
1232                }
1233            }
1234        }
1235
1236        for (final StanzaListener listener : listenersToNotify) {
1237            asyncGoLimited(new Runnable() {
1238                @Override
1239                public void run() {
1240                    try {
1241                        listener.processStanza(packet);
1242                    } catch (Exception e) {
1243                        LOGGER.log(Level.SEVERE, "Exception in async packet listener", e);
1244                    }
1245                }
1246            });
1247        }
1248
1249        // Loop through all collectors and notify the appropriate ones.
1250        for (StanzaCollector collector : collectors) {
1251            collector.processStanza(packet);
1252        }
1253
1254        // Notify the receive listeners interested in the packet
1255        listenersToNotify.clear();
1256        synchronized (syncRecvListeners) {
1257            for (ListenerWrapper listenerWrapper : syncRecvListeners.values()) {
1258                if (listenerWrapper.filterMatches(packet)) {
1259                    listenersToNotify.add(listenerWrapper.getListener());
1260                }
1261            }
1262        }
1263
1264        // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single
1265        // threaded executor service and therefore keeps the order.
1266        ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
1267            @Override
1268            public void run() {
1269                for (StanzaListener listener : listenersToNotify) {
1270                    try {
1271                        listener.processStanza(packet);
1272                    } catch (NotConnectedException e) {
1273                        LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
1274                        break;
1275                    } catch (Exception e) {
1276                        LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
1277                    }
1278                }
1279            }
1280        });
1281    }
1282
1283    /**
1284     * Sets whether the connection has already logged in the server. This method assures that the
1285     * {@link #wasAuthenticated} flag is never reset once it has ever been set.
1286     *
1287     */
1288    protected void setWasAuthenticated() {
1289        // Never reset the flag if the connection has ever been authenticated
1290        if (!wasAuthenticated) {
1291            wasAuthenticated = authenticated;
1292        }
1293    }
1294
1295    protected void callConnectionConnectedListener() {
1296        for (ConnectionListener listener : connectionListeners) {
1297            listener.connected(this);
1298        }
1299    }
1300
1301    protected void callConnectionAuthenticatedListener(boolean resumed) {
1302        for (ConnectionListener listener : connectionListeners) {
1303            try {
1304                listener.authenticated(this, resumed);
1305            } catch (Exception e) {
1306                // Catch and print any exception so we can recover
1307                // from a faulty listener and finish the shutdown process
1308                LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e);
1309            }
1310        }
1311    }
1312
1313    void callConnectionClosedListener() {
1314        for (ConnectionListener listener : connectionListeners) {
1315            try {
1316                listener.connectionClosed();
1317            }
1318            catch (Exception e) {
1319                // Catch and print any exception so we can recover
1320                // from a faulty listener and finish the shutdown process
1321                LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e);
1322            }
1323        }
1324    }
1325
1326    protected void callConnectionClosedOnErrorListener(Exception e) {
1327        boolean logWarning = true;
1328        if (e instanceof StreamErrorException) {
1329            StreamErrorException see = (StreamErrorException) e;
1330            if (see.getStreamError().getCondition() == StreamError.Condition.not_authorized
1331                            && wasAuthenticated) {
1332                logWarning = false;
1333                LOGGER.log(Level.FINE,
1334                                "Connection closed with not-authorized stream error after it was already authenticated. The account was likely deleted/unregistered on the server");
1335            }
1336        }
1337        if (logWarning) {
1338            LOGGER.log(Level.WARNING, "Connection " + this + " closed with error", e);
1339        }
1340        for (ConnectionListener listener : connectionListeners) {
1341            try {
1342                listener.connectionClosedOnError(e);
1343            }
1344            catch (Exception e2) {
1345                // Catch and print any exception so we can recover
1346                // from a faulty listener
1347                LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e2);
1348            }
1349        }
1350    }
1351
1352    /**
1353     * A wrapper class to associate a stanza filter with a listener.
1354     */
1355    protected static class ListenerWrapper {
1356
1357        private final StanzaListener packetListener;
1358        private final StanzaFilter packetFilter;
1359
1360        /**
1361         * Create a class which associates a stanza filter with a listener.
1362         *
1363         * @param packetListener the stanza listener.
1364         * @param packetFilter the associated filter or null if it listen for all packets.
1365         */
1366        public ListenerWrapper(StanzaListener packetListener, StanzaFilter packetFilter) {
1367            this.packetListener = packetListener;
1368            this.packetFilter = packetFilter;
1369        }
1370
1371        public boolean filterMatches(Stanza packet) {
1372            return packetFilter == null || packetFilter.accept(packet);
1373        }
1374
1375        public StanzaListener getListener() {
1376            return packetListener;
1377        }
1378    }
1379
1380    /**
1381     * A wrapper class to associate a stanza filter with an interceptor.
1382     */
1383    protected static class InterceptorWrapper {
1384
1385        private final StanzaListener packetInterceptor;
1386        private final StanzaFilter packetFilter;
1387
1388        /**
1389         * Create a class which associates a stanza filter with an interceptor.
1390         *
1391         * @param packetInterceptor the interceptor.
1392         * @param packetFilter the associated filter or null if it intercepts all packets.
1393         */
1394        public InterceptorWrapper(StanzaListener packetInterceptor, StanzaFilter packetFilter) {
1395            this.packetInterceptor = packetInterceptor;
1396            this.packetFilter = packetFilter;
1397        }
1398
1399        public boolean filterMatches(Stanza packet) {
1400            return packetFilter == null || packetFilter.accept(packet);
1401        }
1402
1403        public StanzaListener getInterceptor() {
1404            return packetInterceptor;
1405        }
1406    }
1407
1408    @Override
1409    public int getConnectionCounter() {
1410        return connectionCounterValue;
1411    }
1412
1413    @Override
1414    public void setFromMode(FromMode fromMode) {
1415        this.fromMode = fromMode;
1416    }
1417
1418    @Override
1419    public FromMode getFromMode() {
1420        return this.fromMode;
1421    }
1422
    /**
     * Parse the stream features announced by the server and record them on this connection.
     * <p>
     * All previously recorded stream features are discarded first. Every element found one level below the
     * parser's depth at invocation time is then turned into a stream feature: the well-known features (STARTTLS,
     * SASL mechanisms, bind, session and compression) are parsed directly, everything else is delegated to the
     * stream feature provider registered at the {@code ProviderManager}, if there is one. Parsing stops at the
     * end tag that brings the parser back to its initial depth.
     * </p>
     * <p>
     * Afterwards {@code tlsHandled}, {@code saslFeatureReceived} and {@code lastFeaturesReceived} are reported
     * as successful where the received features allow it, and {@link #afterFeaturesReceived()} is invoked.
     * </p>
     *
     * @param parser the parser to read from. NOTE(review): presumably positioned at the start tag of the
     *        features element, confirm against the callers.
     * @throws Exception if one of the feature parsers or {@link #afterFeaturesReceived()} throws.
     */
    protected final void parseFeatures(XmlPullParser parser) throws Exception {
        // Features are announced anew after every stream (re)start, hence drop the stale ones.
        streamFeatures.clear();
        final int initialDepth = parser.getDepth();
        while (true) {
            int eventType = parser.next();

            // Only direct children of the element we started at are considered to be stream features.
            if (eventType == XmlPullParser.START_TAG && parser.getDepth() == initialDepth + 1) {
                ExtensionElement streamFeature = null;
                String name = parser.getName();
                String namespace = parser.getNamespace();
                // NOTE(review): the built-in cases match on the element name only, the namespace is solely used
                // for the provider lookup in the default case.
                switch (name) {
                case StartTls.ELEMENT:
                    streamFeature = PacketParserUtils.parseStartTlsFeature(parser);
                    break;
                case Mechanisms.ELEMENT:
                    streamFeature = new Mechanisms(PacketParserUtils.parseMechanisms(parser));
                    break;
                case Bind.ELEMENT:
                    streamFeature = Bind.Feature.INSTANCE;
                    break;
                case Session.ELEMENT:
                    streamFeature = PacketParserUtils.parseSessionFeature(parser);
                    break;
                case Compress.Feature.ELEMENT:
                    streamFeature = PacketParserUtils.parseCompressionFeature(parser);
                    break;
                default:
                    ExtensionElementProvider<ExtensionElement> provider = ProviderManager.getStreamFeatureProvider(name, namespace);
                    if (provider != null) {
                        streamFeature = provider.parse(parser);
                    }
                    break;
                }
                // Unknown features without a registered provider are silently skipped.
                if (streamFeature != null) {
                    addStreamFeature(streamFeature);
                }
            }
            else if (eventType == XmlPullParser.END_TAG && parser.getDepth() == initialDepth) {
                // Back at the depth we started at: all features have been consumed.
                break;
            }
        }

        if (hasFeature(Mechanisms.ELEMENT, Mechanisms.NAMESPACE)) {
            // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it
            if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE)
                            || config.getSecurityMode() == SecurityMode.disabled) {
                tlsHandled.reportSuccess();
                saslFeatureReceived.reportSuccess();
            }
        }

        // If the server reported the bind feature then we know that we did SASL and maybe
        // STARTTLS. We can then report that the last 'stream:features' have been parsed
        if (hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) {
            if (!hasFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE)
                            || !config.isCompressionEnabled()) {
                // These were the last features from the server if they either did not contain
                // compression or if we disabled it
                lastFeaturesReceived.reportSuccess();
            }
        }
        afterFeaturesReceived();
    }
1486
    /**
     * Hook which is invoked at the end of {@link #parseFeatures(XmlPullParser)}, after the received stream
     * features have been recorded. The default implementation does nothing.
     *
     * @throws SecurityRequiredException declared for overriding implementations, never thrown by the default.
     * @throws NotConnectedException declared for overriding implementations, never thrown by the default.
     * @throws InterruptedException declared for overriding implementations, never thrown by the default.
     */
    @SuppressWarnings("unused")
    protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException {
        // Default implementation does nothing
    }
1491
1492    @SuppressWarnings("unchecked")
1493    @Override
1494    public <F extends ExtensionElement> F getFeature(String element, String namespace) {
1495        return (F) streamFeatures.get(XmppStringUtils.generateKey(element, namespace));
1496    }
1497
1498    @Override
1499    public boolean hasFeature(String element, String namespace) {
1500        return getFeature(element, namespace) != null;
1501    }
1502
1503    protected void addStreamFeature(ExtensionElement feature) {
1504        String key = XmppStringUtils.generateKey(feature.getElementName(), feature.getNamespace());
1505        streamFeatures.put(key, feature);
1506    }
1507
1508    @SuppressWarnings("deprecation")
1509    @Override
1510    public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter,
1511                    StanzaListener callback) throws NotConnectedException, InterruptedException {
1512        sendStanzaWithResponseCallback(stanza, replyFilter, callback, null);
1513    }
1514
1515    @SuppressWarnings("deprecation")
1516    @Override
1517    public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter,
1518                    StanzaListener callback, ExceptionCallback exceptionCallback)
1519                    throws NotConnectedException, InterruptedException {
1520        sendStanzaWithResponseCallback(stanza, replyFilter, callback, exceptionCallback,
1521                        getReplyTimeout());
1522    }
1523
1524    @Override
1525    public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request) {
1526        return sendIqRequestAsync(request, getReplyTimeout());
1527    }
1528
1529    @Override
1530    public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request, long timeout) {
1531        StanzaFilter replyFilter = new IQReplyFilter(request, this);
1532        return sendAsync(request, replyFilter, timeout);
1533    }
1534
1535    @Override
1536    public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter) {
1537        return sendAsync(stanza, replyFilter, getReplyTimeout());
1538    }
1539
1540    @SuppressWarnings("FutureReturnValueIgnored")
1541    @Override
1542    public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter, long timeout) {
1543        Objects.requireNonNull(stanza, "stanza must not be null");
1544        // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we
1545        // disallow it here in the async API as it makes no sense
1546        Objects.requireNonNull(replyFilter, "replyFilter must not be null");
1547
1548        final InternalSmackFuture<S, Exception> future = new InternalSmackFuture<>();
1549
1550        final StanzaListener stanzaListener = new StanzaListener() {
1551            @Override
1552            public void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
1553                boolean removed = removeAsyncStanzaListener(this);
1554                if (!removed) {
1555                    // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the
1556                    // exception callback will be invoked (if any).
1557                    return;
1558                }
1559                try {
1560                    XMPPErrorException.ifHasErrorThenThrow(stanza);
1561                    @SuppressWarnings("unchecked")
1562                    S s = (S) stanza;
1563                    future.setResult(s);
1564                }
1565                catch (XMPPErrorException exception) {
1566                    future.setException(exception);
1567                }
1568            }
1569        };
1570        schedule(new Runnable() {
1571            @Override
1572            public void run() {
1573                boolean removed = removeAsyncStanzaListener(stanzaListener);
1574                if (!removed) {
1575                    // We lost a race against the stanza listener, he already removed itself because he received a
1576                    // reply. There is nothing more to do here.
1577                    return;
1578                }
1579
1580                // If the packetListener got removed, then it was never run and
1581                // we never received a response, inform the exception callback
1582                Exception exception;
1583                if (!isConnected()) {
1584                    // If the connection is no longer connected, throw a not connected exception.
1585                    exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter);
1586                }
1587                else {
1588                    exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter);
1589                }
1590                future.setException(exception);
1591            }
1592        }, timeout, TimeUnit.MILLISECONDS);
1593
1594        addAsyncStanzaListener(stanzaListener, replyFilter);
1595        try {
1596            sendStanza(stanza);
1597        }
1598        catch (NotConnectedException | InterruptedException exception) {
1599            future.setException(exception);
1600        }
1601
1602        return future;
1603    }
1604
1605    @SuppressWarnings({ "FutureReturnValueIgnored", "deprecation" })
1606    @Override
1607    public void sendStanzaWithResponseCallback(Stanza stanza, final StanzaFilter replyFilter,
1608                    final StanzaListener callback, final ExceptionCallback exceptionCallback,
1609                    long timeout) throws NotConnectedException, InterruptedException {
1610        Objects.requireNonNull(stanza, "stanza must not be null");
1611        // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we
1612        // disallow it here in the async API as it makes no sense
1613        Objects.requireNonNull(replyFilter, "replyFilter must not be null");
1614        Objects.requireNonNull(callback, "callback must not be null");
1615
1616        final StanzaListener packetListener = new StanzaListener() {
1617            @Override
1618            public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException, NotLoggedInException {
1619                boolean removed = removeAsyncStanzaListener(this);
1620                if (!removed) {
1621                    // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the
1622                    // exception callback will be invoked (if any).
1623                    return;
1624                }
1625                try {
1626                    XMPPErrorException.ifHasErrorThenThrow(packet);
1627                    callback.processStanza(packet);
1628                }
1629                catch (XMPPErrorException e) {
1630                    if (exceptionCallback != null) {
1631                        exceptionCallback.processException(e);
1632                    }
1633                }
1634            }
1635        };
1636        schedule(new Runnable() {
1637            @Override
1638            public void run() {
1639                boolean removed = removeAsyncStanzaListener(packetListener);
1640                // If the packetListener got removed, then it was never run and
1641                // we never received a response, inform the exception callback
1642                if (removed && exceptionCallback != null) {
1643                    Exception exception;
1644                    if (!isConnected()) {
1645                        // If the connection is no longer connected, throw a not connected exception.
1646                        exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter);
1647                    } else {
1648                        exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter);
1649                    }
1650                    final Exception exceptionToProcess = exception;
1651                    Async.go(new Runnable() {
1652                        @Override
1653                        public void run() {
1654                            exceptionCallback.processException(exceptionToProcess);
1655                        }
1656                    });
1657                }
1658            }
1659        }, timeout, TimeUnit.MILLISECONDS);
1660        addAsyncStanzaListener(packetListener, replyFilter);
1661        sendStanza(stanza);
1662    }
1663
1664    @SuppressWarnings("deprecation")
1665    @Override
1666    public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback)
1667                    throws NotConnectedException, InterruptedException {
1668        sendIqWithResponseCallback(iqRequest, callback, null);
1669    }
1670
1671    @SuppressWarnings("deprecation")
1672    @Override
1673    public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback,
1674                    ExceptionCallback exceptionCallback) throws NotConnectedException, InterruptedException {
1675        sendIqWithResponseCallback(iqRequest, callback, exceptionCallback, getReplyTimeout());
1676    }
1677
1678    @SuppressWarnings("deprecation")
1679    @Override
1680    public void sendIqWithResponseCallback(IQ iqRequest, final StanzaListener callback,
1681                    final ExceptionCallback exceptionCallback, long timeout)
1682                    throws NotConnectedException, InterruptedException {
1683        StanzaFilter replyFilter = new IQReplyFilter(iqRequest, this);
1684        sendStanzaWithResponseCallback(iqRequest, replyFilter, callback, exceptionCallback, timeout);
1685    }
1686
1687    @SuppressWarnings("FutureReturnValueIgnored")
1688    @Override
1689    public void addOneTimeSyncCallback(final StanzaListener callback, final StanzaFilter packetFilter) {
1690        final StanzaListener packetListener = new StanzaListener() {
1691            @Override
1692            public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException, NotLoggedInException {
1693                try {
1694                    callback.processStanza(packet);
1695                } finally {
1696                    removeSyncStanzaListener(this);
1697                }
1698            }
1699        };
1700        addSyncStanzaListener(packetListener, packetFilter);
1701        schedule(new Runnable() {
1702            @Override
1703            public void run() {
1704                removeSyncStanzaListener(packetListener);
1705            }
1706        }, getReplyTimeout(), TimeUnit.MILLISECONDS);
1707    }
1708
1709    @Override
1710    public IQRequestHandler registerIQRequestHandler(final IQRequestHandler iqRequestHandler) {
1711        final String key = XmppStringUtils.generateKey(iqRequestHandler.getElement(), iqRequestHandler.getNamespace());
1712        switch (iqRequestHandler.getType()) {
1713        case set:
1714            synchronized (setIqRequestHandler) {
1715                return setIqRequestHandler.put(key, iqRequestHandler);
1716            }
1717        case get:
1718            synchronized (getIqRequestHandler) {
1719                return getIqRequestHandler.put(key, iqRequestHandler);
1720            }
1721        default:
1722            throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed");
1723        }
1724    }
1725
1726    @Override
1727    public final IQRequestHandler unregisterIQRequestHandler(IQRequestHandler iqRequestHandler) {
1728        return unregisterIQRequestHandler(iqRequestHandler.getElement(), iqRequestHandler.getNamespace(),
1729                        iqRequestHandler.getType());
1730    }
1731
1732    @Override
1733    public IQRequestHandler unregisterIQRequestHandler(String element, String namespace, IQ.Type type) {
1734        final String key = XmppStringUtils.generateKey(element, namespace);
1735        switch (type) {
1736        case set:
1737            synchronized (setIqRequestHandler) {
1738                return setIqRequestHandler.remove(key);
1739            }
1740        case get:
1741            synchronized (getIqRequestHandler) {
1742                return getIqRequestHandler.remove(key);
1743            }
1744        default:
1745            throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed");
1746        }
1747    }
1748
    // Value reported by getLastStanzaReceived(). NOTE(review): it is not written anywhere in this part of the
    // file; presumably a timestamp updated when a stanza is received, confirm where it is assigned.
    private long lastStanzaReceived;
1750
    /**
     * Get the current value of the {@code lastStanzaReceived} field.
     *
     * @return the value of {@code lastStanzaReceived}.
     */
    @Override
    public long getLastStanzaReceived() {
        return lastStanzaReceived;
    }
1755
1756    /**
1757     * Get the timestamp when the connection was the first time authenticated, i.e., when the first successful login was
1758     * performed. Note that this value is not reset on disconnect, so it represents the timestamp from the last
1759     * authenticated connection. The value is also not reset on stream resumption.
1760     *
1761     * @return the timestamp or {@code null}.
1762     * @since 4.3.3
1763     */
1764    public final long getAuthenticatedConnectionInitiallyEstablishedTimestamp() {
1765        return authenticatedConnectionInitiallyEstablishedTimestamp;
1766    }
1767
1768    /**
1769     * Install a parsing exception callback, which will be invoked once an exception is encountered while parsing a
1770     * stanza.
1771     *
1772     * @param callback the callback to install
1773     */
1774    public void setParsingExceptionCallback(ParsingExceptionCallback callback) {
1775        parsingExceptionCallback = callback;
1776    }
1777
1778    /**
1779     * Get the current active parsing exception callback.
1780     *
1781     * @return the active exception callback or null if there is none
1782     */
1783    public ParsingExceptionCallback getParsingExceptionCallback() {
1784        return parsingExceptionCallback;
1785    }
1786
1787    @Override
1788    public final String toString() {
1789        EntityFullJid localEndpoint = getUser();
1790        String localEndpointString = (localEndpoint == null ?  "not-authenticated" : localEndpoint.toString());
1791        return getClass().getSimpleName() + '[' + localEndpointString + "] (" + getConnectionCounter() + ')';
1792    }
1793
1794    /**
1795     * A queue of deferred runnables that where not executed immediately because {@link #currentAsyncRunnables} reached
1796     * {@link #maxAsyncRunnables}. Note that we use a {@code LinkedList} in order to avoid space blowups in case the
1797     * list ever becomes very big and shrinks again.
1798     */
1799    private final Queue<Runnable> deferredAsyncRunnables = new LinkedList<>();
1800
1801    private int deferredAsyncRunnablesCount;
1802
1803    private int deferredAsyncRunnablesCountPrevious;
1804
1805    private int maxAsyncRunnables = SmackConfiguration.getDefaultConcurrencyLevelLimit();
1806
1807    private int currentAsyncRunnables;
1808
1809    protected void asyncGoLimited(final Runnable runnable) {
1810        Runnable wrappedRunnable = new Runnable() {
1811            @Override
1812            public void run() {
1813                runnable.run();
1814
1815                synchronized (deferredAsyncRunnables) {
1816                    Runnable defferredRunnable = deferredAsyncRunnables.poll();
1817                    if (defferredRunnable == null) {
1818                        currentAsyncRunnables--;
1819                    } else {
1820                        deferredAsyncRunnablesCount--;
1821                        asyncGo(defferredRunnable);
1822                    }
1823                }
1824            }
1825        };
1826
1827        synchronized (deferredAsyncRunnables) {
1828            if (currentAsyncRunnables < maxAsyncRunnables) {
1829                currentAsyncRunnables++;
1830                asyncGo(wrappedRunnable);
1831            } else {
1832                deferredAsyncRunnablesCount++;
1833                deferredAsyncRunnables.add(wrappedRunnable);
1834            }
1835
1836            final int HIGH_WATERMARK = 100;
1837            final int INFORM_WATERMARK = 20;
1838
1839            final int deferredAsyncRunnablesCount = this.deferredAsyncRunnablesCount;
1840
1841            if (deferredAsyncRunnablesCount >= HIGH_WATERMARK
1842                    && deferredAsyncRunnablesCountPrevious < HIGH_WATERMARK) {
1843                LOGGER.log(Level.WARNING, "High watermark of " + HIGH_WATERMARK + " simultaneous executing runnables reached");
1844            } else if (deferredAsyncRunnablesCount >= INFORM_WATERMARK
1845                    && deferredAsyncRunnablesCountPrevious < INFORM_WATERMARK) {
1846                LOGGER.log(Level.INFO, INFORM_WATERMARK + " simultaneous executing runnables reached");
1847            }
1848
1849            deferredAsyncRunnablesCountPrevious = deferredAsyncRunnablesCount;
1850        }
1851    }
1852
1853    public void setMaxAsyncOperations(int maxAsyncOperations) {
1854        if (maxAsyncOperations < 1) {
1855            throw new IllegalArgumentException("Max async operations must be greater than 0");
1856        }
1857
1858        synchronized (deferredAsyncRunnables) {
1859            maxAsyncRunnables = maxAsyncOperations;
1860        }
1861    }
1862
    /**
     * Hand the runnable to {@code CACHED_EXECUTOR_SERVICE} for execution. No limit is applied here, see
     * {@code asyncGoLimited} for the limited variant.
     *
     * @param runnable the runnable to execute.
     */
    protected static void asyncGo(Runnable runnable) {
        CACHED_EXECUTOR_SERVICE.execute(runnable);
    }
1866
    /**
     * Schedule the runnable on {@code SCHEDULED_EXECUTOR_SERVICE} for a one-shot execution after the given delay.
     *
     * @param runnable the runnable to execute.
     * @param delay the delay after which the runnable is executed.
     * @param unit the time unit of the delay.
     * @return the future returned by the scheduled executor service, which can be used to cancel the execution.
     */
    protected static ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) {
        return SCHEDULED_EXECUTOR_SERVICE.schedule(runnable, delay, unit);
    }
1870}