001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import org.jivesoftware.smack.AbstractConnectionListener;
020import org.jivesoftware.smack.AbstractXMPPConnection;
021import org.jivesoftware.smack.ConnectionConfiguration;
022import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
023import org.jivesoftware.smack.ConnectionCreationListener;
024import org.jivesoftware.smack.StanzaListener;
025import org.jivesoftware.smack.SmackConfiguration;
026import org.jivesoftware.smack.SmackException;
027import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
028import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
029import org.jivesoftware.smack.SmackException.NoResponseException;
030import org.jivesoftware.smack.SmackException.NotConnectedException;
031import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
032import org.jivesoftware.smack.SmackException.ConnectionException;
033import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
034import org.jivesoftware.smack.SynchronizationPoint;
035import org.jivesoftware.smack.XMPPException.StreamErrorException;
036import org.jivesoftware.smack.XMPPConnection;
037import org.jivesoftware.smack.XMPPException;
038import org.jivesoftware.smack.XMPPException.XMPPErrorException;
039import org.jivesoftware.smack.compress.packet.Compressed;
040import org.jivesoftware.smack.compression.XMPPInputOutputStream;
041import org.jivesoftware.smack.filter.StanzaFilter;
042import org.jivesoftware.smack.compress.packet.Compress;
043import org.jivesoftware.smack.packet.Element;
044import org.jivesoftware.smack.packet.IQ;
045import org.jivesoftware.smack.packet.Message;
046import org.jivesoftware.smack.packet.StreamOpen;
047import org.jivesoftware.smack.packet.Stanza;
048import org.jivesoftware.smack.packet.Presence;
049import org.jivesoftware.smack.packet.StartTls;
050import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
051import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
052import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
053import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
054import org.jivesoftware.smack.sm.SMUtils;
055import org.jivesoftware.smack.sm.StreamManagementException;
056import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
057import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
058import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
059import org.jivesoftware.smack.sm.packet.StreamManagement;
060import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
061import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
062import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
063import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
064import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
065import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
066import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
067import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
068import org.jivesoftware.smack.sm.predicates.Predicate;
069import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
070import org.jivesoftware.smack.packet.PlainStreamElement;
071import org.jivesoftware.smack.packet.XMPPError;
072import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
073import org.jivesoftware.smack.util.Async;
074import org.jivesoftware.smack.util.PacketParserUtils;
075import org.jivesoftware.smack.util.StringUtils;
076import org.jivesoftware.smack.util.TLSUtils;
077import org.jivesoftware.smack.util.dns.HostAddress;
078import org.jxmpp.util.XmppStringUtils;
079import org.xmlpull.v1.XmlPullParser;
080import org.xmlpull.v1.XmlPullParserException;
081
082import javax.net.SocketFactory;
083import javax.net.ssl.HostnameVerifier;
084import javax.net.ssl.KeyManager;
085import javax.net.ssl.KeyManagerFactory;
086import javax.net.ssl.SSLContext;
087import javax.net.ssl.SSLSocket;
088import javax.security.auth.callback.Callback;
089import javax.security.auth.callback.PasswordCallback;
090
091import java.io.BufferedReader;
092import java.io.ByteArrayInputStream;
093import java.io.FileInputStream;
094import java.io.IOException;
095import java.io.InputStream;
096import java.io.InputStreamReader;
097import java.io.OutputStream;
098import java.io.OutputStreamWriter;
099import java.io.Writer;
100import java.lang.reflect.Constructor;
101import java.net.InetAddress;
102import java.net.InetSocketAddress;
103import java.net.Socket;
104import java.net.UnknownHostException;
105import java.security.KeyManagementException;
106import java.security.KeyStore;
107import java.security.KeyStoreException;
108import java.security.NoSuchAlgorithmException;
109import java.security.NoSuchProviderException;
110import java.security.Provider;
111import java.security.Security;
112import java.security.UnrecoverableKeyException;
113import java.security.cert.CertificateException;
114import java.util.ArrayList;
115import java.util.Arrays;
116import java.util.Collection;
117import java.util.Iterator;
118import java.util.LinkedHashSet;
119import java.util.LinkedList;
120import java.util.List;
121import java.util.Map;
122import java.util.Set;
123import java.util.concurrent.ArrayBlockingQueue;
124import java.util.concurrent.BlockingQueue;
125import java.util.concurrent.ConcurrentHashMap;
126import java.util.concurrent.ConcurrentLinkedQueue;
127import java.util.concurrent.TimeUnit;
128import java.util.concurrent.atomic.AtomicBoolean;
129import java.util.logging.Level;
130import java.util.logging.Logger;
131
132/**
133 * Creates a socket connection to an XMPP server. This is the default connection
134 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
135 * 
136 * @see XMPPConnection
137 * @author Matt Tucker
138 */
139public class XMPPTCPConnection extends AbstractXMPPConnection {
140
    // NOTE(review): presumably the capacity of the outgoing element queue; its use is not visible
    // in this part of the file -- confirm.
    private static final int QUEUE_SIZE = 500;
    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());

    /**
     * The socket which is used for this connection.
     */
    private Socket socket;

    /**
     * Set to true by an instant shutdown while Stream Management resumption is still possible.
     * <p>
     * While this flag is set, the "already connected" and "already logged in" checks do not
     * throw and a further shutdown returns immediately. It is reset after a successful login.
     * </p>
     */
    private boolean disconnectedButResumeable = false;

    /**
     * Flag to indicate if the socket was closed intentionally by Smack.
     * <p>
     * This boolean flag is used concurrently, therefore it is marked volatile.
     * </p>
     */
    private volatile boolean socketClosed = false;

    /**
     * The value reported by {@link #isSecureConnection()}. Reset to false on shutdown.
     */
    private boolean usingTLS = false;

    /**
     * Protected access level because of unit test purposes
     */
    protected PacketWriter packetWriter;

    /**
     * Protected access level because of unit test purposes
     */
    protected PacketReader packetReader;

    // NOTE(review): presumably reported once the initial stream open element has been sent --
    // confirm against the writer. Re-initialized on every shutdown.
    private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<Exception>(this);

    /**
     * Synchronization point that is re-initialized on every shutdown.
     * NOTE(review): presumably reported once the stream features relevant for compression have
     * been received -- confirm against the reader.
     */
    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
                    this);

    /**
     * Synchronization point that is re-initialized on every shutdown.
     * NOTE(review): presumably used to wait for the server's answer to a compression request --
     * confirm against the compression code.
     */
    private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>(
                    this);

    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
    private static BundleAndDeferCallback defaultBundleAndDeferCallback;

    /**
     * The used bundle and defer callback.
     * <p>
     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
     * having a 'volatile' read within the writer threads loop.
     * </p>
     */
    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;

    // Default for 'useSm' of newly created connections.
    private static boolean useSmDefault = false;

    // Default for 'useSmResumption' of newly created connections.
    private static boolean useSmResumptionDefault = true;

    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}.
     */
    private String smSessionId;

    // Used to wait for the server's answer to a Stream Management 'resume' request.
    private final SynchronizationPoint<XMPPException> smResumedSyncPoint = new SynchronizationPoint<XMPPException>(
                    this);

    // Used to wait for the server's answer to a Stream Management 'enable' request.
    private final SynchronizationPoint<XMPPException> smEnabledSyncPoint = new SynchronizationPoint<XMPPException>(
                    this);

    /**
     * The client's preferred maximum resumption time in seconds.
     */
    private int smClientMaxResumptionTime = -1;

    /**
     * The server's preferred maximum resumption time in seconds.
     */
    private int smServerMaxResumptimTime = -1;

    /**
     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
     */
    private boolean useSm = useSmDefault;

    /**
     * The resumption flag that is passed to the Stream Management 'enable' element on login.
     */
    private boolean useSmResumption = useSmResumptionDefault;

    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
    private long serverHandledStanzasCount = 0;

    /**
     * The counter for stanzas handled ("received") by the client.
     * <p>
     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
     * </p>
     */
    private long clientHandledStanzasCount = 0;

    /**
     * The stanzas that have not been acknowledged yet. On a non-anonymous login that does not resume
     * the previous stream, a non-null queue is drained and its stanzas are sent again.
     */
    private BlockingQueue<Stanza> unacknowledgedStanzas;

    /**
     * Set to true if Stream Management was at least once enabled for this connection.
     */
    private boolean smWasEnabledAtLeastOnce = false;

    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<StanzaListener>();

    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and are automatically removed after that.
     */
    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, StanzaListener>();

    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
     * </p>
     */
    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<StanzaFilter>();

    /**
     * The configuration this connection was constructed with.
     */
    private final XMPPTCPConnectionConfiguration config;
285
286    /**
287     * Creates a new XMPP connection over TCP (optionally using proxies).
288     * <p>
289     * Note that XMPPTCPConnection constructors do not establish a connection to the server
290     * and you must call {@link #connect()}.
291     * </p>
292     *
293     * @param config the connection configuration.
294     */
295    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
296        super(config);
297        this.config = config;
298        addConnectionListener(new AbstractConnectionListener() {
299            @Override
300            public void connectionClosedOnError(Exception e) {
301                if (e instanceof XMPPException.StreamErrorException) {
302                    dropSmState();
303                }
304            }
305        });
306    }
307
308    /**
309     * Creates a new XMPP connection over TCP.
310     * <p>
311     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
312     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
313     * constructor.
314     * </p>
315     * 
316     * @param jid the bare JID used by the client.
317     * @param password the password or authentication token.
318     */
319    public XMPPTCPConnection(CharSequence jid, String password) {
320        this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
321    }
322
323    /**
324     * Creates a new XMPP connection over TCP.
325     * <p>
326     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
327     * you can get fine-grained control over connection settings using the
328     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
329     * </p>
330     * @param username
331     * @param password
332     * @param serviceName
333     */
334    public XMPPTCPConnection(CharSequence username, String password, String serviceName) {
335        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setServiceName(
336                                        serviceName).build());
337    }
338
339    @Override
340    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
341        if (packetWriter == null) {
342            throw new NotConnectedException();
343        }
344        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
345    }
346
347    @Override
348    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
349        if (isConnected() && !disconnectedButResumeable) {
350            throw new AlreadyConnectedException();
351        }
352    }
353
354    @Override
355    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
356        if (isAuthenticated() && !disconnectedButResumeable) {
357            throw new AlreadyLoggedInException();
358        }
359    }
360
361    @Override
362    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException {
363        // Reset the flag in case it was set
364        disconnectedButResumeable = false;
365        super.afterSuccessfulLogin(resumed);
366    }
367
368    @Override
369    protected synchronized void loginNonAnonymously(String username, String password, String resource) throws XMPPException, SmackException, IOException {
370        if (saslAuthentication.hasNonAnonymousAuthentication()) {
371            // Authenticate using SASL
372            if (password != null) {
373                saslAuthentication.authenticate(username, password, resource);
374            }
375            else {
376                saslAuthentication.authenticate(resource, config.getCallbackHandler());
377            }
378        } else {
379            throw new SmackException("No non-anonymous SASL authentication mechanism available");
380        }
381
382        // If compression is enabled then request the server to use stream compression. XEP-170
383        // recommends to perform stream compression before resource binding.
384        if (config.isCompressionEnabled()) {
385            useCompression();
386        }
387
388        if (isSmResumptionPossible()) {
389            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
390            if (smResumedSyncPoint.wasSuccessful()) {
391                // We successfully resumed the stream, be done here
392                afterSuccessfulLogin(true);
393                return;
394            }
395            // SM resumption failed, what Smack does here is to report success of
396            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
397            // normal resource binding can be tried.
398            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
399        }
400
401        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
402        if (unacknowledgedStanzas != null) {
403            // There was a previous connection with SM enabled but that was either not resumable or
404            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
405            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
406            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
407            // XMPP session (There maybe was an enabled in a previous XMPP session of this
408            // connection instance though). This is used in writePackets to decide if stanzas should
409            // be added to the unacknowledged stanzas queue, because they have to be added right
410            // after the 'enable' stream element has been sent.
411            dropSmState();
412        }
413
414        // Now bind the resource. It is important to do this *after* we dropped an eventually
415        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
416        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
417        bindResourceAndEstablishSession(resource);
418
419        if (isSmAvailable() && useSm) {
420            // Remove what is maybe left from previously stream managed sessions
421            serverHandledStanzasCount = 0;
422            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
423            // then this is a non recoverable error and we therefore throw an exception.
424            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
425            synchronized (requestAckPredicates) {
426                if (requestAckPredicates.isEmpty()) {
427                    // Assure that we have at lest one predicate set up that so that we request acks
428                    // for the server and eventually flush some stanzas from the unacknowledged
429                    // stanza queue
430                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
431                }
432            }
433        }
434        // (Re-)send the stanzas *after* we tried to enable SM
435        for (Stanza stanza : previouslyUnackedStanzas) {
436            sendStanzaInternal(stanza);
437        }
438
439        afterSuccessfulLogin(false);
440    }
441
442    @Override
443    public synchronized void loginAnonymously() throws XMPPException, SmackException, IOException {
444        // Wait with SASL auth until the SASL mechanisms have been received
445        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
446
447        if (saslAuthentication.hasAnonymousAuthentication()) {
448            saslAuthentication.authenticateAnonymously();
449        }
450        else {
451            throw new SmackException("No anonymous SASL authentication mechanism available");
452        }
453
454        // If compression is enabled then request the server to use stream compression
455        if (config.isCompressionEnabled()) {
456            useCompression();
457        }
458
459        bindResourceAndEstablishSession(null);
460
461        afterSuccessfulLogin(false);
462    }
463
464    @Override
465    public boolean isSecureConnection() {
466        return usingTLS;
467    }
468
469    public boolean isSocketClosed() {
470        return socketClosed;
471    }
472
473    /**
474     * Shuts the current connection down. After this method returns, the connection must be ready
475     * for re-use by connect.
476     */
477    @Override
478    protected void shutdown() {
479        if (isSmEnabled()) {
480            try {
481                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
482                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
483                sendSmAcknowledgementInternal();
484            } catch (NotConnectedException e) {
485                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
486            }
487        }
488        shutdown(false);
489    }
490
491    /**
492     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
493     */
494    public synchronized void instantShutdown() {
495        shutdown(true);
496    }
497
498    private void shutdown(boolean instant) {
499        if (disconnectedButResumeable) {
500            return;
501        }
502        if (packetReader != null) {
503                packetReader.shutdown();
504        }
505        if (packetWriter != null) {
506                packetWriter.shutdown(instant);
507        }
508
509        // Set socketClosed to true. This will cause the PacketReader
510        // and PacketWriter to ignore any Exceptions that are thrown
511        // because of a read/write from/to a closed stream.
512        // It is *important* that this is done before socket.close()!
513        socketClosed = true;
514        try {
515                socket.close();
516        } catch (Exception e) {
517                LOGGER.log(Level.WARNING, "shutdown", e);
518        }
519
520        setWasAuthenticated();
521        // If we are able to resume the stream, then don't set
522        // connected/authenticated/usingTLS to false since we like behave like we are still
523        // connected (e.g. sendStanza should not throw a NotConnectedException).
524        if (isSmResumptionPossible() && instant) {
525            disconnectedButResumeable = true;
526        } else {
527            disconnectedButResumeable = false;
528            // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
529            // stream tag, there is no longer a stream to resume.
530            smSessionId = null;
531        }
532        authenticated = false;
533        connected = false;
534        usingTLS = false;
535        reader = null;
536        writer = null;
537
538        maybeCompressFeaturesReceived.init();
539        compressSyncPoint.init();
540        smResumedSyncPoint.init();
541        smEnabledSyncPoint.init();
542        initalOpenStreamSend.init();
543    }
544
545    @Override
546    public void send(PlainStreamElement element) throws NotConnectedException {
547        packetWriter.sendStreamElement(element);
548    }
549
550    @Override
551    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException {
552        packetWriter.sendStreamElement(packet);
553        if (isSmEnabled()) {
554            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
555                if (requestAckPredicate.accept(packet)) {
556                    requestSmAcknowledgementInternal();
557                    break;
558                }
559            }
560        }
561    }
562
563    private void connectUsingConfiguration() throws IOException, ConnectionException {
564        List<HostAddress> failedAddresses = populateHostAddresses();
565        SocketFactory socketFactory = config.getSocketFactory();
566        if (socketFactory == null) {
567            socketFactory = SocketFactory.getDefault();
568        }
569        for (HostAddress hostAddress : hostAddresses) {
570            String host = hostAddress.getFQDN();
571            int port = hostAddress.getPort();
572            socket = socketFactory.createSocket();
573            try {
574                Iterator<InetAddress> inetAddresses = Arrays.asList(InetAddress.getAllByName(host)).iterator();
575                if (!inetAddresses.hasNext()) {
576                    // This should not happen
577                    LOGGER.warning("InetAddress.getAllByName() returned empty result array.");
578                    throw new UnknownHostException(host);
579                }
580                innerloop: while (inetAddresses.hasNext()) {
581                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
582                    // re-usable after a failed connection attempt. See also SMACK-724.
583                    socket = socketFactory.createSocket();
584
585                    final InetAddress inetAddress = inetAddresses.next();
586                    final String inetAddressAndPort = inetAddress + " at port " + port;
587                    LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort);
588                    try {
589                        socket.connect(new InetSocketAddress(inetAddress, port), config.getConnectTimeout());
590                    } catch (Exception e) {
591                        if (inetAddresses.hasNext()) {
592                            continue innerloop;
593                        } else {
594                            throw e;
595                        }
596                    }
597                    LOGGER.finer("Established TCP connection to " + inetAddressAndPort);
598                    // We found a host to connect to, return here
599                    this.host = host;
600                    this.port = port;
601                    return;
602                }
603            }
604            catch (Exception e) {
605                hostAddress.setException(e);
606                failedAddresses.add(hostAddress);
607            }
608        }
609        // There are no more host addresses to try
610        // throw an exception and report all tried
611        // HostAddresses in the exception
612        throw ConnectionException.from(failedAddresses);
613    }
614
615    /**
616     * Initializes the connection by creating a stanza(/packet) reader and writer and opening a
617     * XMPP stream to the server.
618     *
619     * @throws XMPPException if establishing a connection to the server fails.
620     * @throws SmackException if the server failes to respond back or if there is anther error.
621     * @throws IOException 
622     */
623    private void initConnection() throws IOException {
624        boolean isFirstInitialization = packetReader == null || packetWriter == null;
625        compressionHandler = null;
626
627        // Set the reader and writer instance variables
628        initReaderAndWriter();
629
630        if (isFirstInitialization) {
631            packetWriter = new PacketWriter();
632            packetReader = new PacketReader();
633
634            // If debugging is enabled, we should start the thread that will listen for
635            // all packets and then log them.
636            if (config.isDebuggerEnabled()) {
637                addAsyncStanzaListener(debugger.getReaderListener(), null);
638                if (debugger.getWriterListener() != null) {
639                    addPacketSendingListener(debugger.getWriterListener(), null);
640                }
641            }
642        }
643        // Start the packet writer. This will open an XMPP stream to the server
644        packetWriter.init();
645        // Start the packet reader. The startup() method will block until we
646        // get an opening stream packet back from server
647        packetReader.init();
648
649        if (isFirstInitialization) {
650            // Notify listeners that a new connection has been established
651            for (ConnectionCreationListener listener : getConnectionCreationListeners()) {
652                listener.connectionCreated(this);
653            }
654        }
655    }
656
657    private void initReaderAndWriter() throws IOException {
658        InputStream is = socket.getInputStream();
659        OutputStream os = socket.getOutputStream();
660        if (compressionHandler != null) {
661            is = compressionHandler.getInputStream(is);
662            os = compressionHandler.getOutputStream(os);
663        }
664        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
665        writer = new OutputStreamWriter(os, "UTF-8");
666        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
667
668        // If debugging is enabled, we open a window and write out all network traffic.
669        initDebugger();
670    }
671
672    /**
673     * The server has indicated that TLS negotiation can start. We now need to secure the
674     * existing plain connection and perform a handshake. This method won't return until the
675     * connection has finished the handshake or an error occurred while securing the connection.
676     * @throws IOException 
677     * @throws CertificateException 
678     * @throws NoSuchAlgorithmException 
679     * @throws NoSuchProviderException 
680     * @throws KeyStoreException 
681     * @throws UnrecoverableKeyException 
682     * @throws KeyManagementException 
683     * @throws SmackException 
684     * @throws Exception if an exception occurs.
685     */
686    private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
687        SSLContext context = this.config.getCustomSSLContext();
688        KeyStore ks = null;
689        KeyManager[] kms = null;
690        PasswordCallback pcb = null;
691
692        if(config.getCallbackHandler() == null) {
693           ks = null;
694        } else if (context == null) {
695            if(config.getKeystoreType().equals("NONE")) {
696                ks = null;
697                pcb = null;
698            }
699            else if(config.getKeystoreType().equals("PKCS11")) {
700                try {
701                    Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
702                    String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library();
703                    ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes());
704                    Provider p = (Provider)c.newInstance(config);
705                    Security.addProvider(p);
706                    ks = KeyStore.getInstance("PKCS11",p);
707                    pcb = new PasswordCallback("PKCS11 Password: ",false);
708                    this.config.getCallbackHandler().handle(new Callback[]{pcb});
709                    ks.load(null,pcb.getPassword());
710                }
711                catch (Exception e) {
712                    ks = null;
713                    pcb = null;
714                }
715            }
716            else if(config.getKeystoreType().equals("Apple")) {
717                ks = KeyStore.getInstance("KeychainStore","Apple");
718                ks.load(null,null);
719                //pcb = new PasswordCallback("Apple Keychain",false);
720                //pcb.setPassword(null);
721            }
722            else {
723                ks = KeyStore.getInstance(config.getKeystoreType());
724                try {
725                    pcb = new PasswordCallback("Keystore Password: ",false);
726                    config.getCallbackHandler().handle(new Callback[]{pcb});
727                    ks.load(new FileInputStream(config.getKeystorePath()), pcb.getPassword());
728                }
729                catch(Exception e) {
730                    ks = null;
731                    pcb = null;
732                }
733            }
734            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
735            try {
736                if(pcb == null) {
737                    kmf.init(ks,null);
738                } else {
739                    kmf.init(ks,pcb.getPassword());
740                    pcb.clearPassword();
741                }
742                kms = kmf.getKeyManagers();
743            } catch (NullPointerException npe) {
744                kms = null;
745            }
746        }
747
748        // If the user didn't specify a SSLContext, use the default one
749        if (context == null) {
750            context = SSLContext.getInstance("TLS");
751            context.init(kms, null, new java.security.SecureRandom());
752        }
753        Socket plain = socket;
754        // Secure the plain connection
755        socket = context.getSocketFactory().createSocket(plain,
756                host, plain.getPort(), true);
757
758        final SSLSocket sslSocket = (SSLSocket) socket;
759        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
760        // important (at least on certain platforms) and it seems to be a good idea anyways to
761        // prevent an accidental implicit handshake.
762        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
763
764        // Initialize the reader and writer with the new secured version
765        initReaderAndWriter();
766
767        // Proceed to do the handshake
768        sslSocket.startHandshake();
769
770        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
771        if (verifier == null) {
772                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
773        } else if (!verifier.verify(getServiceName(), sslSocket.getSession())) {
774            throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getServiceName());
775        }
776
777        // Set that TLS was successful
778        usingTLS = true;
779    }
780
781    /**
782     * Returns the compression handler that can be used for one compression methods offered by the server.
783     * 
784     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
785     * 
786     */
787    private XMPPInputOutputStream maybeGetCompressionHandler() {
788        Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
789        if (compression == null) {
790            // Server does not support compression
791            return null;
792        }
793        for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) {
794                String method = handler.getCompressionMethod();
795                if (compression.getMethods().contains(method))
796                    return handler;
797        }
798        return null;
799    }
800
801    @Override
802    public boolean isUsingCompression() {
803        return compressionHandler != null && compressSyncPoint.wasSuccessful();
804    }
805
806    /**
807     * <p>
808     * Starts using stream compression that will compress network traffic. Traffic can be
809     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
810     * connection. However, the server and the client will need to use more CPU time in order to
811     * un/compress network data so under high load the server performance might be affected.
812     * </p>
813     * <p>
814     * Stream compression has to have been previously offered by the server. Currently only the
815     * zlib method is supported by the client. Stream compression negotiation has to be done
816     * before authentication took place.
817     * </p>
818     *
819     * @throws NotConnectedException 
820     * @throws XMPPException 
821     * @throws NoResponseException 
822     */
823    private void useCompression() throws NotConnectedException, NoResponseException, XMPPException {
824        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
825        // If stream compression was offered by the server and we want to use
826        // compression then send compression request to the server
827        if ((compressionHandler = maybeGetCompressionHandler()) != null) {
828            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
829        } else {
830            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
831        }
832    }
833
834    /**
835     * Establishes a connection to the XMPP server and performs an automatic login
836     * only if the previous connection state was logged (authenticated). It basically
837     * creates and maintains a socket connection to the server.<p>
838     * <p/>
839     * Listeners will be preserved from a previous connection if the reconnection
840     * occurs after an abrupt termination.
841     *
842     * @throws XMPPException if an error occurs while trying to establish the connection.
843     * @throws SmackException 
844     * @throws IOException 
845     */
846    @Override
847    protected void connectInternal() throws SmackException, IOException, XMPPException {
848        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
849        // there is an error establishing the connection
850        connectUsingConfiguration();
851
852        // We connected successfully to the servers TCP port
853        socketClosed = false;
854        initConnection();
855
856        // Wait with SASL auth until the SASL mechanisms have been received
857        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
858
859        // If TLS is required but the server doesn't offer it, disconnect
860        // from the server and throw an error. First check if we've already negotiated TLS
861        // and are secure, however (features get parsed a second time after TLS is established).
862        if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) {
863            shutdown();
864            throw new SecurityRequiredByClientException();
865        }
866
867        // Make note of the fact that we're now connected.
868        connected = true;
869        callConnectionConnectedListener();
870
871        // Automatically makes the login if the user was previously connected successfully
872        // to the server and the connection was terminated abruptly
873        if (wasAuthenticated) {
874            login();
875            notifyReconnection();
876        }
877    }
878
879    /**
880     * Sends out a notification that there was an error with the connection
881     * and closes the connection. Also prints the stack trace of the given exception
882     *
883     * @param e the exception that causes the connection close event.
884     */
885    private synchronized void notifyConnectionError(Exception e) {
886        // Listeners were already notified of the exception, return right here.
887        if ((packetReader == null || packetReader.done) &&
888                (packetWriter == null || packetWriter.done())) return;
889
890        // Closes the connection temporary. A reconnection is possible
891        instantShutdown();
892
893        // Notify connection listeners of the error.
894        callConnectionClosedOnErrorListener(e);
895    }
896
897    /**
898     * For unit testing purposes
899     *
900     * @param writer
901     */
902    protected void setWriter(Writer writer) {
903        this.writer = writer;
904    }
905
906    @Override
907    protected void afterFeaturesReceived() throws NotConnectedException {
908        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
909        if (startTlsFeature != null) {
910            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
911                notifyConnectionError(new SecurityRequiredByServerException());
912                return;
913            }
914
915            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
916                send(new StartTls());
917            }
918        }
919
920        if (getSASLAuthentication().authenticationSuccessful()) {
921            // If we have received features after the SASL has been successfully completed, then we
922            // have also *maybe* received, as it is an optional feature, the compression feature
923            // from the server.
924            maybeCompressFeaturesReceived.reportSuccess();
925        }
926    }
927
928    /**
929     * Resets the parser using the latest connection's reader. Reseting the parser is necessary
930     * when the plain connection has been secured or when a new opening stream element is going
931     * to be sent by the server.
932     *
933     * @throws SmackException if the parser could not be reset.
934     */
935    void openStream() throws SmackException {
936        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
937        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
938        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
939        // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.)
940        CharSequence to = getServiceName();
941        CharSequence from = null;
942        CharSequence localpart = config.getUsername();
943        if (localpart != null) {
944            from = XmppStringUtils.completeJidFrom(localpart, to);
945        }
946        String id = getStreamId();
947        send(new StreamOpen(to, from, id));
948        try {
949            packetReader.parser = PacketParserUtils.newXmppParser(reader);
950        }
951        catch (XmlPullParserException e) {
952            throw new SmackException(e);
953        }
954    }
955
956    protected class PacketReader {
957
958        XmlPullParser parser;
959
960        private volatile boolean done;
961
962        /**
963         * Initializes the reader in order to be used. The reader is initialized during the
964         * first connection and when reconnecting due to an abruptly disconnection.
965         */
966        void init() {
967            done = false;
968
969            Async.go(new Runnable() {
970                public void run() {
971                    parsePackets();
972                }
973            }, "Smack Packet Reader (" + getConnectionCounter() + ")");
974         }
975
976        /**
977         * Shuts the stanza(/packet) reader down. This method simply sets the 'done' flag to true.
978         */
979        void shutdown() {
980            done = true;
981        }
982
983        /**
984         * Parse top-level packets in order to process them further.
985         *
986         * @param thread the thread that is being used by the reader to parse incoming packets.
987         */
988        private void parsePackets() {
989            try {
990                initalOpenStreamSend.checkIfSuccessOrWait();
991                int eventType = parser.getEventType();
992                while (!done) {
993                    switch (eventType) {
994                    case XmlPullParser.START_TAG:
995                        final String name = parser.getName();
996                        switch (name) {
997                        case Message.ELEMENT:
998                        case IQ.IQ_ELEMENT:
999                        case Presence.ELEMENT:
1000                            try {
1001                                parseAndProcessStanza(parser);
1002                            } finally {
1003                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
1004                            }
1005                            break;
1006                        case "stream":
1007                            // We found an opening stream.
1008                            if ("jabber:client".equals(parser.getNamespace(null))) {
1009                                streamId = parser.getAttributeValue("", "id");
1010                                String reportedServiceName = parser.getAttributeValue("", "from");
1011                                assert(reportedServiceName.equals(config.getServiceName()));
1012                            }
1013                            break;
1014                        case "error":
1015                            throw new StreamErrorException(PacketParserUtils.parseStreamError(parser));
1016                        case "features":
1017                            parseFeatures(parser);
1018                            break;
1019                        case "proceed":
1020                            try {
1021                                // Secure the connection by negotiating TLS
1022                                proceedTLSReceived();
1023                                // Send a new opening stream to the server
1024                                openStream();
1025                            }
1026                            catch (Exception e) {
1027                                // We report any failure regarding TLS in the second stage of XMPP
1028                                // connection establishment, namely the SASL authentication
1029                                saslFeatureReceived.reportFailure(new SmackException(e));
1030                                throw e;
1031                            }
1032                            break;
1033                        case "failure":
1034                            String namespace = parser.getNamespace(null);
1035                            switch (namespace) {
1036                            case "urn:ietf:params:xml:ns:xmpp-tls":
1037                                // TLS negotiation has failed. The server will close the connection
1038                                // TODO Parse failure stanza
1039                                throw new XMPPErrorException("TLS negotiation has failed", null);
1040                            case "http://jabber.org/protocol/compress":
1041                                // Stream compression has been denied. This is a recoverable
1042                                // situation. It is still possible to authenticate and
1043                                // use the connection but using an uncompressed connection
1044                                // TODO Parse failure stanza
1045                                compressSyncPoint.reportFailure(new XMPPErrorException(
1046                                                "Could not establish compression", null));
1047                                break;
1048                            case SaslStreamElements.NAMESPACE:
1049                                // SASL authentication has failed. The server may close the connection
1050                                // depending on the number of retries
1051                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
1052                                getSASLAuthentication().authenticationFailed(failure);
1053                                break;
1054                            }
1055                            break;
1056                        case Challenge.ELEMENT:
1057                            // The server is challenging the SASL authentication made by the client
1058                            String challengeData = parser.nextText();
1059                            getSASLAuthentication().challengeReceived(challengeData);
1060                            break;
1061                        case Success.ELEMENT:
1062                            Success success = new Success(parser.nextText());
1063                            // We now need to bind a resource for the connection
1064                            // Open a new stream and wait for the response
1065                            openStream();
1066                            // The SASL authentication with the server was successful. The next step
1067                            // will be to bind the resource
1068                            getSASLAuthentication().authenticated(success);
1069                            break;
1070                        case Compressed.ELEMENT:
1071                            // Server confirmed that it's possible to use stream compression. Start
1072                            // stream compression
1073                            // Initialize the reader and writer with the new compressed version
1074                            initReaderAndWriter();
1075                            // Send a new opening stream to the server
1076                            openStream();
1077                            // Notify that compression is being used
1078                            compressSyncPoint.reportSuccess();
1079                            break;
1080                        case Enabled.ELEMENT:
1081                            Enabled enabled = ParseStreamManagement.enabled(parser);
1082                            if (enabled.isResumeSet()) {
1083                                smSessionId = enabled.getId();
1084                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1085                                    XMPPErrorException xmppException = new XMPPErrorException(
1086                                                    "Stream Management 'enabled' element with resume attribute but without session id received",
1087                                                    new XMPPError(
1088                                                                    XMPPError.Condition.bad_request));
1089                                    smEnabledSyncPoint.reportFailure(xmppException);
1090                                    throw xmppException;
1091                                }
1092                                smServerMaxResumptimTime = enabled.getMaxResumptionTime();
1093                            } else {
1094                                // Mark this a non-resumable stream by setting smSessionId to null
1095                                smSessionId = null;
1096                            }
1097                            clientHandledStanzasCount = 0;
1098                            smWasEnabledAtLeastOnce = true;
1099                            smEnabledSyncPoint.reportSuccess();
1100                            LOGGER.fine("Stream Management (XEP-198): succesfully enabled");
1101                            break;
1102                        case Failed.ELEMENT:
1103                            Failed failed = ParseStreamManagement.failed(parser);
1104                            XMPPError xmppError = new XMPPError(failed.getXMPPErrorCondition());
1105                            XMPPException xmppException = new XMPPErrorException("Stream Management failed", xmppError);
1106                            // If only XEP-198 would specify different failure elements for the SM
1107                            // enable and SM resume failure case. But this is not the case, so we
1108                            // need to determine if this is a 'Failed' response for either 'Enable'
1109                            // or 'Resume'.
1110                            if (smResumedSyncPoint.requestSent()) {
1111                                smResumedSyncPoint.reportFailure(xmppException);
1112                            }
1113                            else {
1114                                if (!smEnabledSyncPoint.requestSent()) {
1115                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
1116                                }
1117                                smEnabledSyncPoint.reportFailure(xmppException);
1118                                // Report success for last lastFeaturesReceived so that in case a
1119                                // failed resumption, we can continue with normal resource binding.
1120                                // See text of XEP-198 5. below Example 11.
1121                                lastFeaturesReceived.reportSuccess();
1122                            }
1123                            break;
1124                        case Resumed.ELEMENT:
1125                            Resumed resumed = ParseStreamManagement.resumed(parser);
1126                            if (!smSessionId.equals(resumed.getPrevId())) {
1127                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1128                            }
1129                            // Mark SM as enabled and resumption as successful.
1130                            smResumedSyncPoint.reportSuccess();
1131                            smEnabledSyncPoint.reportSuccess();
1132                            // First, drop the stanzas already handled by the server
1133                            processHandledCount(resumed.getHandledCount());
1134                            // Then re-send what is left in the unacknowledged queue
1135                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
1136                            unacknowledgedStanzas.drainTo(stanzasToResend);
1137                            for (Stanza stanza : stanzasToResend) {
1138                                sendStanzaInternal(stanza);
1139                            }
1140                            // If there where stanzas resent, then request a SM ack for them.
1141                            // Writer's sendStreamElement() won't do it automatically based on
1142                            // predicates.
1143                            if (!stanzasToResend.isEmpty()) {
1144                                requestSmAcknowledgementInternal();
1145                            }
1146                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
1147                            break;
1148                        case AckAnswer.ELEMENT:
1149                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1150                            processHandledCount(ackAnswer.getHandledCount());
1151                            break;
1152                        case AckRequest.ELEMENT:
1153                            ParseStreamManagement.ackRequest(parser);
1154                            if (smEnabledSyncPoint.wasSuccessful()) {
1155                                sendSmAcknowledgementInternal();
1156                            } else {
1157                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1158                            }
1159                            break;
1160                         default:
1161                             LOGGER.warning("Unknown top level stream element: " + name);
1162                             break;
1163                        }
1164                        break;
1165                    case XmlPullParser.END_TAG:
1166                        if (parser.getName().equals("stream")) {
1167                            // Disconnect the connection
1168                            disconnect();
1169                        }
1170                        break;
1171                    case XmlPullParser.END_DOCUMENT:
1172                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1173                        // closing stream element before.
1174                        throw new SmackException(
1175                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1176                    }
1177                    eventType = parser.next();
1178                }
1179            }
1180            catch (Exception e) {
1181                // The exception can be ignored if the the connection is 'done'
1182                // or if the it was caused because the socket got closed
1183                if (!(done || isSocketClosed())) {
1184                    // Close the connection and notify connection listeners of the
1185                    // error.
1186                    notifyConnectionError(e);
1187                }
1188            }
1189        }
1190    }
1191
1192    protected class PacketWriter {
1193        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1194
1195        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<Element>(
1196                        QUEUE_SIZE, true);
1197
1198        /**
1199         * Needs to be protected for unit testing purposes.
1200         */
1201        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>(
1202                        XMPPTCPConnection.this);
1203
1204        /**
1205         * If set, the stanza(/packet) writer is shut down
1206         */
1207        protected volatile Long shutdownTimestamp = null;
1208
1209        private volatile boolean instantShutdown;
1210
1211        /**
1212         * True if some preconditions are given to start the bundle and defer mechanism.
1213         * <p>
1214         * This will likely get set to true right after the start of the writer thread, because
1215         * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
1216         * this field to true.
1217         * </p>
1218         */
1219        private boolean shouldBundleAndDefer;
1220
1221        /** 
1222        * Initializes the writer in order to be used. It is called at the first connection and also 
1223        * is invoked if the connection is disconnected by an error.
1224        */ 
1225        void init() {
1226            shutdownDone.init();
1227            shutdownTimestamp = null;
1228
1229            if (unacknowledgedStanzas != null) {
1230                // It's possible that there are new stanzas in the writer queue that
1231                // came in while we were disconnected but resumable, drain those into
1232                // the unacknowledged queue so that they get resent now
1233                drainWriterQueueToUnacknowledgedStanzas();
1234            }
1235
1236            queue.start();
1237            Async.go(new Runnable() {
1238                @Override
1239                public void run() {
1240                    writePackets();
1241                }
1242            }, "Smack Packet Writer (" + getConnectionCounter() + ")");
1243        }
1244
1245        private boolean done() {
1246            return shutdownTimestamp != null;
1247        }
1248
1249        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1250            if (done() && !isSmResumptionPossible()) {
1251                // Don't throw a NotConnectedException is there is an resumable stream available
1252                throw new NotConnectedException();
1253            }
1254        }
1255
1256        /**
1257         * Sends the specified element to the server.
1258         *
1259         * @param element the element to send.
1260         * @throws NotConnectedException 
1261         */
1262        protected void sendStreamElement(Element element) throws NotConnectedException {
1263            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1264
1265            boolean enqueued = false;
1266            while (!enqueued) {
1267                try {
1268                    queue.put(element);
1269                    enqueued = true;
1270                }
1271                catch (InterruptedException e) {
1272                    throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1273                    // If the method above did not throw, then the sending thread was interrupted
1274                    // TODO in a later version of Smack the InterruptedException should be thrown to
1275                    // allow users to interrupt a sending thread that is currently blocking because
1276                    // the queue is full.
1277                    LOGGER.log(Level.WARNING, "Sending thread was interrupted", e);
1278                }
1279            }
1280        }
1281
1282        /**
1283         * Shuts down the stanza(/packet) writer. Once this method has been called, no further
1284         * packets will be written to the server.
1285         */
1286        void shutdown(boolean instant) {
1287            instantShutdown = instant;
1288            shutdownTimestamp = System.currentTimeMillis();
1289            queue.shutdown();
1290            try {
1291                shutdownDone.checkIfSuccessOrWait();
1292            }
1293            catch (NoResponseException e) {
1294                LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
1295            }
1296        }
1297
1298        /**
1299         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1300         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1301         * that case.
1302         *
1303         * @return the next element for writing or null.
1304         */
1305        private Element nextStreamElement() {
1306            // It is important the we check if the queue is empty before removing an element from it
1307            if (queue.isEmpty()) {
1308                shouldBundleAndDefer = true;
1309            }
1310            Element packet = null;
1311            try {
1312                packet = queue.take();
1313            }
1314            catch (InterruptedException e) {
1315                if (!queue.isShutdown()) {
1316                    // Users shouldn't try to interrupt the packet writer thread
1317                    LOGGER.log(Level.WARNING, "Packet writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1318                }
1319            }
1320            return packet;
1321        }
1322
1323        private void writePackets() {
1324            try {
1325                openStream();
1326                initalOpenStreamSend.reportSuccess();
1327                // Write out packets from the queue.
1328                while (!done()) {
1329                    Element element = nextStreamElement();
1330                    if (element == null) {
1331                        continue;
1332                    }
1333
1334                    // Get a local version of the bundle and defer callback, in case it's unset
1335                    // between the null check and the method invocation
1336                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
1337                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
1338                    // empty), then we could wait a bit for further stanzas attempting to decrease
1339                    // our energy consumption
1340                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
1341                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
1342                        // queue is empty again.
1343                        shouldBundleAndDefer = false;
1344                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
1345                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
1346                                        bundlingAndDeferringStopped));
1347                        if (bundleAndDeferMillis > 0) {
1348                            long remainingWait = bundleAndDeferMillis;
1349                            final long waitStart = System.currentTimeMillis();
1350                            synchronized (bundlingAndDeferringStopped) {
1351                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
1352                                    bundlingAndDeferringStopped.wait(remainingWait);
1353                                    remainingWait = bundleAndDeferMillis
1354                                                    - (System.currentTimeMillis() - waitStart);
1355                                }
1356                            }
1357                        }
1358                    }
1359
1360                    Stanza packet = null;
1361                    if (element instanceof Stanza) {
1362                        packet = (Stanza) element;
1363                    }
1364                    else if (element instanceof Enable) {
1365                        // The client needs to add messages to the unacknowledged stanzas queue
1366                        // right after it sent 'enabled'. Stanza will be added once
1367                        // unacknowledgedStanzas is not null.
1368                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
1369                    }
1370                    // Check if the stream element should be put to the unacknowledgedStanza
1371                    // queue. Note that we can not do the put() in sendStanzaInternal() and the
1372                    // packet order is not stable at this point (sendStanzaInternal() can be
1373                    // called concurrently).
1374                    if (unacknowledgedStanzas != null && packet != null) {
1375                        // If the unacknowledgedStanza queue is nearly full, request an new ack
1376                        // from the server in order to drain it
1377                        if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
1378                            writer.write(AckRequest.INSTANCE.toXML().toString());
1379                            writer.flush();
1380                        }
1381                        try {
1382                            // It is important the we put the stanza in the unacknowledged stanza
1383                            // queue before we put it on the wire
1384                            unacknowledgedStanzas.put(packet);
1385                        }
1386                        catch (InterruptedException e) {
1387                            throw new IllegalStateException(e);
1388                        }
1389                    }
1390                    writer.write(element.toXML().toString());
1391                    if (queue.isEmpty()) {
1392                        writer.flush();
1393                    }
1394                    if (packet != null) {
1395                        firePacketSendingListeners(packet);
1396                    }
1397                }
1398                if (!instantShutdown) {
1399                    // Flush out the rest of the queue.
1400                    try {
1401                        while (!queue.isEmpty()) {
1402                            Element packet = queue.remove();
1403                            writer.write(packet.toXML().toString());
1404                        }
1405                        writer.flush();
1406                    }
1407                    catch (Exception e) {
1408                        LOGGER.log(Level.WARNING,
1409                                        "Exception flushing queue during shutdown, ignore and continue",
1410                                        e);
1411                    }
1412
1413                    // Close the stream.
1414                    try {
1415                        writer.write("</stream:stream>");
1416                        writer.flush();
1417                    }
1418                    catch (Exception e) {
1419                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
1420                    }
1421                    // Delete the queue contents (hopefully nothing is left).
1422                    queue.clear();
1423                } else if (instantShutdown && isSmEnabled()) {
1424                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
1425                    // into the unacknowledgedStanzas queue
1426                    drainWriterQueueToUnacknowledgedStanzas();
1427                }
1428
1429                try {
1430                    writer.close();
1431                }
1432                catch (Exception e) {
1433                    // Do nothing
1434                }
1435
1436            }
1437            catch (Exception e) {
1438                // The exception can be ignored if the the connection is 'done'
1439                // or if the it was caused because the socket got closed
1440                if (!(done() || isSocketClosed())) {
1441                    notifyConnectionError(e);
1442                } else {
1443                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
1444                }
1445            } finally {
1446                LOGGER.fine("Reporting shutdownDone success in writer thread");
1447                shutdownDone.reportSuccess();
1448            }
1449        }
1450
1451        private void drainWriterQueueToUnacknowledgedStanzas() {
1452            List<Element> elements = new ArrayList<Element>(queue.size());
1453            queue.drainTo(elements);
1454            for (Element element : elements) {
1455                if (element instanceof Stanza) {
1456                    unacknowledgedStanzas.add((Stanza) element);
1457                }
1458            }
1459        }
1460    }
1461
1462    /**
1463     * Set if Stream Management should be used by default for new connections.
1464     * 
1465     * @param useSmDefault true to use Stream Management for new connections.
1466     */
1467    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1468        XMPPTCPConnection.useSmDefault = useSmDefault;
1469    }
1470
1471    /**
1472     * Set if Stream Management resumption should be used by default for new connections.
1473     * 
1474     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1475     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1476     */
1477    @Deprecated
1478    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1479        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1480    }
1481
1482    /**
1483     * Set if Stream Management resumption should be used by default for new connections.
1484     *
1485     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1486     */
1487    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1488        if (useSmResumptionDefault) {
1489            // Also enable SM is resumption is enabled
1490            setUseStreamManagementDefault(useSmResumptionDefault);
1491        }
1492        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1493    }
1494
1495    /**
1496     * Set if Stream Management should be used if supported by the server.
1497     * 
1498     * @param useSm true to use Stream Management.
1499     */
1500    public void setUseStreamManagement(boolean useSm) {
1501        this.useSm = useSm;
1502    }
1503
1504    /**
1505     * Set if Stream Management resumption should be used if supported by the server.
1506     *
1507     * @param useSmResumption true to use Stream Management resumption.
1508     */
1509    public void setUseStreamManagementResumption(boolean useSmResumption) {
1510        if (useSmResumption) {
1511            // Also enable SM is resumption is enabled
1512            setUseStreamManagement(useSmResumption);
1513        }
1514        this.useSmResumption = useSmResumption;
1515    }
1516
1517    /**
1518     * Set the preferred resumption time in seconds.
1519     * @param resumptionTime the preferred resumption time in seconds
1520     */
1521    public void setPreferredResumptionTime(int resumptionTime) {
1522        smClientMaxResumptionTime = resumptionTime;
1523    }
1524
1525    /**
1526     * Add a predicate for Stream Management acknowledgment requests.
1527     * <p>
1528     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1529     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1530     * </p>
1531     * <p>
1532     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1533     * </p>
1534     * 
1535     * @param predicate the predicate to add.
1536     * @return if the predicate was not already active.
1537     */
1538    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1539        synchronized (requestAckPredicates) {
1540            return requestAckPredicates.add(predicate);
1541        }
1542    }
1543
1544    /**
1545     * Remove the given predicate for Stream Management acknowledgment request.
1546     * @param predicate the predicate to remove.
1547     * @return true if the predicate was removed.
1548     */
1549    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1550        synchronized (requestAckPredicates) {
1551            return requestAckPredicates.remove(predicate);
1552        }
1553    }
1554
1555    /**
1556     * Remove all predicates for Stream Management acknowledgment requests.
1557     */
1558    public void removeAllRequestAckPredicates() {
1559        synchronized (requestAckPredicates) {
1560            requestAckPredicates.clear();
1561        }
1562    }
1563
1564    /**
1565     * Send an unconditional Stream Management acknowledgement request to the server.
1566     *
1567     * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled.
1568     * @throws NotConnectedException if the connection is not connected.
1569     */
1570    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
1571        if (!isSmEnabled()) {
1572            throw new StreamManagementException.StreamManagementNotEnabledException();
1573        }
1574        requestSmAcknowledgementInternal();
1575    }
1576
1577    private void requestSmAcknowledgementInternal() throws NotConnectedException {
1578        packetWriter.sendStreamElement(AckRequest.INSTANCE);
1579    }
1580
1581    /**
1582     * Send a unconditional Stream Management acknowledgment to the server.
1583     * <p>
1584     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1585     * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas,
1586     * or after a certain period of time), even if it has not received an <r/> element from the other party."
1587     * </p>
1588     * 
1589     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1590     * @throws NotConnectedException if the connection is not connected.
1591     */
1592    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
1593        if (!isSmEnabled()) {
1594            throw new StreamManagementException.StreamManagementNotEnabledException();
1595        }
1596        sendSmAcknowledgementInternal();
1597    }
1598
1599    private void sendSmAcknowledgementInternal() throws NotConnectedException {
1600        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
1601    }
1602
1603    /**
1604     * Add a Stanza acknowledged listener.
1605     * <p>
1606     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1607     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1608     * possible.
1609     * </p>
1610     * 
1611     * @param listener the listener to add.
1612     */
1613    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1614        stanzaAcknowledgedListeners.add(listener);
1615    }
1616
1617    /**
1618     * Remove the given Stanza acknowledged listener.
1619     *
1620     * @param listener the listener.
1621     * @return true if the listener was removed.
1622     */
1623    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1624        return stanzaAcknowledgedListeners.remove(listener);
1625    }
1626
1627    /**
1628     * Remove all stanza acknowledged listeners.
1629     */
1630    public void removeAllStanzaAcknowledgedListeners() {
1631        stanzaAcknowledgedListeners.clear();
1632    }
1633
1634    /**
1635     * Add a new Stanza ID acknowledged listener for the given ID.
1636     * <p>
1637     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1638     * automatically be removed after the listener was run.
1639     * </p>
1640     * 
1641     * @param id the stanza ID.
1642     * @param listener the listener to invoke.
1643     * @return the previous listener for this stanza ID or null.
1644     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1645     */
1646    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1647        // Prevent users from adding callbacks that will never get removed
1648        if (!smWasEnabledAtLeastOnce) {
1649            throw new StreamManagementException.StreamManagementNotEnabledException();
1650        }
1651        // Remove the listener after max. 12 hours
1652        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 12 * 60 * 60);
1653        schedule(new Runnable() {
1654            @Override
1655            public void run() {
1656                stanzaIdAcknowledgedListeners.remove(id);
1657            }
1658        }, removeAfterSeconds, TimeUnit.SECONDS);
1659        return stanzaIdAcknowledgedListeners.put(id, listener);
1660    }
1661
1662    /**
1663     * Remove the Stanza ID acknowledged listener for the given ID.
1664     * 
1665     * @param id the stanza ID.
1666     * @return true if the listener was found and removed, false otherwise.
1667     */
1668    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1669        return stanzaIdAcknowledgedListeners.remove(id);
1670    }
1671
1672    /**
1673     * Removes all Stanza ID acknowledged listeners.
1674     */
1675    public void removeAllStanzaIdAcknowledgedListeners() {
1676        stanzaIdAcknowledgedListeners.clear();
1677    }
1678
1679    /**
1680     * Returns true if Stream Management is supported by the server.
1681     *
1682     * @return true if Stream Management is supported by the server.
1683     */
1684    public boolean isSmAvailable() {
1685        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1686    }
1687
1688    /**
1689     * Returns true if Stream Management was successfully negotiated with the server.
1690     *
1691     * @return true if Stream Management was negotiated.
1692     */
1693    public boolean isSmEnabled() {
1694        return smEnabledSyncPoint.wasSuccessful();
1695    }
1696
1697    /**
1698     * Returns true if the stream was successfully resumed with help of Stream Management.
1699     * 
1700     * @return true if the stream was resumed.
1701     */
1702    public boolean streamWasResumed() {
1703        return smResumedSyncPoint.wasSuccessful();
1704    }
1705
1706    /**
1707     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1708     * 
1709     * @return true if disconnected but resumption possible.
1710     */
1711    public boolean isDisconnectedButSmResumptionPossible() {
1712        return disconnectedButResumeable && isSmResumptionPossible();
1713    }
1714
1715    /**
1716     * Returns true if the stream is resumable.
1717     *
1718     * @return true if the stream is resumable.
1719     */
1720    public boolean isSmResumptionPossible() {
1721        // There is no resumable stream available
1722        if (smSessionId == null)
1723            return false;
1724
1725        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1726        // Seems like we are already reconnected, report true
1727        if (shutdownTimestamp == null) {
1728            return true;
1729        }
1730
1731        // See if resumption time is over
1732        long current = System.currentTimeMillis();
1733        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1734        if (current > shutdownTimestamp + maxResumptionMillies) {
1735            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1736            // resumption is possible
1737            return false;
1738        } else {
1739            return true;
1740        }
1741    }
1742
1743    /**
1744     * Drop the stream management state. Sets {@link #smSessionId} and
1745     * {@link #unacknowledgedStanzas} to <code>null</code>.
1746     */
1747    private void dropSmState() {
1748        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1749        // respective. No need to reset them here.
1750        smSessionId = null;
1751        unacknowledgedStanzas = null;
1752    }
1753
1754    /**
1755     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1756     * <p>
1757     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1758     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1759     * without checking for overflows before.
1760     * </p>
1761     *
1762     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1763     */
1764    public int getMaxSmResumptionTime() {
1765        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1766        int serverResumptionTime = smServerMaxResumptimTime > 0 ? smServerMaxResumptimTime : Integer.MAX_VALUE;
1767        return Math.min(clientResumptionTime, serverResumptionTime);
1768    }
1769
1770    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1771        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1772        final List<Stanza> ackedStanzas = new ArrayList<Stanza>(
1773                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1774                                        : Integer.MAX_VALUE);
1775        for (long i = 0; i < ackedStanzasCount; i++) {
1776            Stanza ackedStanza = unacknowledgedStanzas.poll();
1777            // If the server ack'ed a stanza, then it must be in the
1778            // unacknowledged stanza queue. There can be no exception.
1779            if (ackedStanza == null) {
1780                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1781                                ackedStanzasCount, ackedStanzas);
1782            }
1783            ackedStanzas.add(ackedStanza);
1784        }
1785
1786        boolean atLeastOneStanzaAcknowledgedListener = false;
1787        if (!stanzaAcknowledgedListeners.isEmpty()) {
1788            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1789            atLeastOneStanzaAcknowledgedListener = true;
1790        }
1791        else {
1792            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1793            for (Stanza ackedStanza : ackedStanzas) {
1794                String id = ackedStanza.getStanzaId();
1795                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1796                    atLeastOneStanzaAcknowledgedListener = true;
1797                    break;
1798                }
1799            }
1800        }
1801
1802        // Only spawn a new thread if there is a chance that some listener is invoked
1803        if (atLeastOneStanzaAcknowledgedListener) {
1804            asyncGo(new Runnable() {
1805                @Override
1806                public void run() {
1807                    for (Stanza ackedStanza : ackedStanzas) {
1808                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1809                            try {
1810                                listener.processPacket(ackedStanza);
1811                            }
1812                            catch (NotConnectedException e) {
1813                                LOGGER.log(Level.FINER, "Received not connected exception", e);
1814                            }
1815                        }
1816                        String id = ackedStanza.getStanzaId();
1817                        if (StringUtils.isNullOrEmpty(id)) {
1818                            continue;
1819                        }
1820                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1821                        if (listener != null) {
1822                            try {
1823                                listener.processPacket(ackedStanza);
1824                            }
1825                            catch (NotConnectedException e) {
1826                                LOGGER.log(Level.FINER, "Received not connected exception", e);
1827                            }
1828                        }
1829                    }
1830                }
1831            });
1832        }
1833
1834        serverHandledStanzasCount = handledCount;
1835    }
1836
1837    /**
1838     * Set the default bundle and defer callback used for new connections.
1839     *
1840     * @param defaultBundleAndDeferCallback
1841     * @see BundleAndDeferCallback
1842     * @since 4.1
1843     */
1844    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1845        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
1846    }
1847
1848    /**
1849     * Set the bundle and defer callback used for this connection.
1850     * <p>
1851     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1852     * no longer get deferred.
1853     * </p>
1854     *
1855     * @param bundleAndDeferCallback the callback or <code>null</code>.
1856     * @see BundleAndDeferCallback
1857     * @since 4.1
1858     */
1859    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1860        this.bundleAndDeferCallback = bundleAndDeferCallback;
1861    }
1862
1863}