001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import org.jivesoftware.smack.AbstractXMPPConnection;
020import org.jivesoftware.smack.ConnectionConfiguration;
021import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
022import org.jivesoftware.smack.ConnectionCreationListener;
023import org.jivesoftware.smack.StanzaListener;
024import org.jivesoftware.smack.SmackConfiguration;
025import org.jivesoftware.smack.SmackException;
026import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
027import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
028import org.jivesoftware.smack.SmackException.NoResponseException;
029import org.jivesoftware.smack.SmackException.NotConnectedException;
030import org.jivesoftware.smack.SmackException.ConnectionException;
031import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
032import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
033import org.jivesoftware.smack.SmackException.SecurityRequiredException;
034import org.jivesoftware.smack.SynchronizationPoint;
035import org.jivesoftware.smack.XMPPException.StreamErrorException;
036import org.jivesoftware.smack.XMPPConnection;
037import org.jivesoftware.smack.XMPPException;
038import org.jivesoftware.smack.XMPPException.XMPPErrorException;
039import org.jivesoftware.smack.compress.packet.Compressed;
040import org.jivesoftware.smack.compression.XMPPInputOutputStream;
041import org.jivesoftware.smack.filter.StanzaFilter;
042import org.jivesoftware.smack.compress.packet.Compress;
043import org.jivesoftware.smack.packet.Element;
044import org.jivesoftware.smack.packet.IQ;
045import org.jivesoftware.smack.packet.Message;
046import org.jivesoftware.smack.packet.StreamOpen;
047import org.jivesoftware.smack.packet.Stanza;
048import org.jivesoftware.smack.packet.Presence;
049import org.jivesoftware.smack.packet.StartTls;
050import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
051import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
052import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
053import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
054import org.jivesoftware.smack.sm.SMUtils;
055import org.jivesoftware.smack.sm.StreamManagementException;
056import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
057import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
058import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
059import org.jivesoftware.smack.sm.packet.StreamManagement;
060import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
061import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
062import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
063import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
064import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
065import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
066import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
067import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
068import org.jivesoftware.smack.sm.predicates.Predicate;
069import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
070import org.jivesoftware.smack.packet.PlainStreamElement;
071import org.jivesoftware.smack.packet.XMPPError;
072import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
073import org.jivesoftware.smack.util.Async;
074import org.jivesoftware.smack.util.PacketParserUtils;
075import org.jivesoftware.smack.util.StringUtils;
076import org.jivesoftware.smack.util.TLSUtils;
077import org.jivesoftware.smack.util.dns.HostAddress;
078import org.jxmpp.util.XmppStringUtils;
079import org.xmlpull.v1.XmlPullParser;
080import org.xmlpull.v1.XmlPullParserException;
081
082import javax.net.SocketFactory;
083import javax.net.ssl.HostnameVerifier;
084import javax.net.ssl.KeyManager;
085import javax.net.ssl.KeyManagerFactory;
086import javax.net.ssl.SSLContext;
087import javax.net.ssl.SSLSocket;
088import javax.security.auth.callback.Callback;
089import javax.security.auth.callback.PasswordCallback;
090
091import java.io.BufferedReader;
092import java.io.ByteArrayInputStream;
093import java.io.FileInputStream;
094import java.io.IOException;
095import java.io.InputStream;
096import java.io.InputStreamReader;
097import java.io.OutputStream;
098import java.io.OutputStreamWriter;
099import java.io.Writer;
100import java.lang.reflect.Constructor;
101import java.net.InetAddress;
102import java.net.InetSocketAddress;
103import java.net.Socket;
104import java.net.UnknownHostException;
105import java.security.KeyManagementException;
106import java.security.KeyStore;
107import java.security.KeyStoreException;
108import java.security.NoSuchAlgorithmException;
109import java.security.NoSuchProviderException;
110import java.security.Provider;
111import java.security.Security;
112import java.security.UnrecoverableKeyException;
113import java.security.cert.CertificateException;
114import java.util.ArrayList;
115import java.util.Arrays;
116import java.util.Collection;
117import java.util.Iterator;
118import java.util.LinkedHashSet;
119import java.util.LinkedList;
120import java.util.List;
121import java.util.Map;
122import java.util.Set;
123import java.util.concurrent.ArrayBlockingQueue;
124import java.util.concurrent.BlockingQueue;
125import java.util.concurrent.ConcurrentHashMap;
126import java.util.concurrent.ConcurrentLinkedQueue;
127import java.util.concurrent.TimeUnit;
128import java.util.concurrent.atomic.AtomicBoolean;
129import java.util.logging.Level;
130import java.util.logging.Logger;
131
132/**
133 * Creates a socket connection to an XMPP server. This is the default connection
134 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
135 * 
136 * @see XMPPConnection
137 * @author Matt Tucker
138 */
139public class XMPPTCPConnection extends AbstractXMPPConnection {
140
141    private static final int QUEUE_SIZE = 500;
142    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
143
144    /**
145     * The socket which is used for this connection.
146     */
147    private Socket socket;
148
149    /**
150     * 
151     */
152    private boolean disconnectedButResumeable = false;
153
154    /**
155     * Flag to indicate if the socket was closed intentionally by Smack.
156     * <p>
157     * This boolean flag is used concurrently, therefore it is marked volatile.
158     * </p>
159     */
160    private volatile boolean socketClosed = false;
161
162    private boolean usingTLS = false;
163
164    /**
165     * Protected access level because of unit test purposes
166     */
167    protected PacketWriter packetWriter;
168
169    /**
170     * Protected access level because of unit test purposes
171     */
172    protected PacketReader packetReader;
173
174    private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<Exception>(this);
175
176    /**
177     * 
178     */
179    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
180                    this);
181
182    /**
183     * 
184     */
185    private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>(
186                    this);
187
188    /**
189     * The default bundle and defer callback, used for new connections.
190     * @see bundleAndDeferCallback
191     */
192    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
193
194    /**
195     * The used bundle and defer callback.
196     * <p>
197     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
198     * having a 'volatile' read within the writer threads loop.
199     * </p>
200     */
201    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
202
203    private static boolean useSmDefault = false;
204
205    private static boolean useSmResumptionDefault = true;
206
207    /**
208     * The stream ID of the stream that is currently resumable, ie. the stream we hold the state
209     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
210     * {@link #unacknowledgedStanzas}.
211     */
212    private String smSessionId;
213
214    private final SynchronizationPoint<XMPPException> smResumedSyncPoint = new SynchronizationPoint<XMPPException>(
215                    this);
216
217    private final SynchronizationPoint<XMPPException> smEnabledSyncPoint = new SynchronizationPoint<XMPPException>(
218                    this);
219
220    /**
221     * The client's preferred maximum resumption time in seconds.
222     */
223    private int smClientMaxResumptionTime = -1;
224
225    /**
226     * The server's preferred maximum resumption time in seconds.
227     */
228    private int smServerMaxResumptimTime = -1;
229
230    /**
231     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
232     */
233    private boolean useSm = useSmDefault;
234    private boolean useSmResumption = useSmResumptionDefault;
235
236    /**
237     * The counter that the server sends the client about it's current height. For example, if the server sends
238     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
239     */
240    private long serverHandledStanzasCount = 0;
241
242    /**
243     * The counter for stanzas handled ("received") by the client.
244     * <p>
245     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
246     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
247     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
248     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
249     * </p>
250     */
251    private long clientHandledStanzasCount = 0;
252
253    private BlockingQueue<Stanza> unacknowledgedStanzas;
254
255    /**
256     * Set to true if Stream Management was at least once enabled for this connection.
257     */
258    private boolean smWasEnabledAtLeastOnce = false;
259
260    /**
261     * This listeners are invoked for every stanza that got acknowledged.
262     * <p>
263     * We use a {@link ConccurrentLinkedQueue} here in order to allow the listeners to remove
264     * themselves after they have been invoked.
265     * </p>
266     */
267    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<StanzaListener>();
268
269    /**
270     * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will
271     * only be invoked once and automatically removed after that.
272     */
273    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, StanzaListener>();
274
275    /**
276     * Predicates that determine if an stream management ack should be requested from the server.
277     * <p>
278     * We use a linked hash set here, so that the order how the predicates are added matches the
279     * order in which they are invoked in order to determine if an ack request should be send or not.
280     * </p>
281     */
282    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<StanzaFilter>();
283
284    private final XMPPTCPConnectionConfiguration config;
285
286    /**
287     * Creates a new XMPP connection over TCP (optionally using proxies).
288     * <p>
289     * Note that XMPPTCPConnection constructors do not establish a connection to the server
290     * and you must call {@link #connect()}.
291     * </p>
292     *
293     * @param config the connection configuration.
294     */
295    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
296        super(config);
297        this.config = config;
298    }
299
300    /**
301     * Creates a new XMPP connection over TCP.
302     * <p>
303     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
304     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
305     * constructor.
306     * </p>
307     * 
308     * @param jid the bare JID used by the client.
309     * @param password the password or authentication token.
310     */
311    public XMPPTCPConnection(CharSequence jid, String password) {
312        this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
313    }
314
315    /**
316     * Creates a new XMPP connection over TCP.
317     * <p>
318     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
319     * you can get fine-grained control over connection settings using the
320     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
321     * </p>
322     * @param username
323     * @param password
324     * @param serviceName
325     */
326    public XMPPTCPConnection(CharSequence username, String password, String serviceName) {
327        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setServiceName(
328                                        serviceName).build());
329    }
330
331    @Override
332    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
333        if (packetWriter == null) {
334            throw new NotConnectedException();
335        }
336        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
337    }
338
339    @Override
340    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
341        if (isConnected() && !disconnectedButResumeable) {
342            throw new AlreadyConnectedException();
343        }
344    }
345
346    @Override
347    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
348        if (isAuthenticated() && !disconnectedButResumeable) {
349            throw new AlreadyLoggedInException();
350        }
351    }
352
353    @Override
354    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException {
355        // Reset the flag in case it was set
356        disconnectedButResumeable = false;
357        super.afterSuccessfulLogin(resumed);
358    }
359
360    @Override
361    protected synchronized void loginNonAnonymously(String username, String password, String resource) throws XMPPException, SmackException, IOException {
362        if (saslAuthentication.hasNonAnonymousAuthentication()) {
363            // Authenticate using SASL
364            if (password != null) {
365                saslAuthentication.authenticate(username, password, resource);
366            }
367            else {
368                saslAuthentication.authenticate(resource, config.getCallbackHandler());
369            }
370        } else {
371            throw new SmackException("No non-anonymous SASL authentication mechanism available");
372        }
373
374        // If compression is enabled then request the server to use stream compression. XEP-170
375        // recommends to perform stream compression before resource binding.
376        if (config.isCompressionEnabled()) {
377            useCompression();
378        }
379
380        if (isSmResumptionPossible()) {
381            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
382            if (smResumedSyncPoint.wasSuccessful()) {
383                // We successfully resumed the stream, be done here
384                afterSuccessfulLogin(true);
385                return;
386            }
387            // SM resumption failed, what Smack does here is to report success of
388            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
389            // normal resource binding can be tried.
390            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
391        }
392
393        bindResourceAndEstablishSession(resource);
394
395        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
396        if (unacknowledgedStanzas != null) {
397            // There was a previous connection with SM enabled but that was either not resumable or
398            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
399            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
400            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
401            // XMPP session (There maybe was an enabled in a previous XMPP session of this
402            // connection instance though). This is used in writePackets to decide if stanzas should
403            // be added to the unacknowledged stanzas queue, because they have to be added right
404            // after the 'enable' stream element has been sent.
405            unacknowledgedStanzas = null;
406        }
407        if (isSmAvailable() && useSm) {
408            // Remove what is maybe left from previously stream managed sessions
409            serverHandledStanzasCount = 0;
410            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
411            // then this is a non recoverable error and we therefore throw an exception.
412            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
413            synchronized (requestAckPredicates) {
414                if (requestAckPredicates.isEmpty()) {
415                    // Assure that we have at lest one predicate set up that so that we request acks
416                    // for the server and eventually flush some stanzas from the unacknowledged
417                    // stanza queue
418                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
419                }
420            }
421        }
422        // (Re-)send the stanzas *after* we tried to enable SM
423        for (Stanza stanza : previouslyUnackedStanzas) {
424            sendStanzaInternal(stanza);
425        }
426
427        afterSuccessfulLogin(false);
428    }
429
430    @Override
431    public synchronized void loginAnonymously() throws XMPPException, SmackException, IOException {
432        // Wait with SASL auth until the SASL mechanisms have been received
433        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
434
435        if (saslAuthentication.hasAnonymousAuthentication()) {
436            saslAuthentication.authenticateAnonymously();
437        }
438        else {
439            throw new SmackException("No anonymous SASL authentication mechanism available");
440        }
441
442        // If compression is enabled then request the server to use stream compression
443        if (config.isCompressionEnabled()) {
444            useCompression();
445        }
446
447        bindResourceAndEstablishSession(null);
448
449        afterSuccessfulLogin(false);
450    }
451
452    @Override
453    public boolean isSecureConnection() {
454        return usingTLS;
455    }
456
457    public boolean isSocketClosed() {
458        return socketClosed;
459    }
460
461    /**
462     * Shuts the current connection down. After this method returns, the connection must be ready
463     * for re-use by connect.
464     */
465    @Override
466    protected void shutdown() {
467        if (isSmEnabled()) {
468            try {
469                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
470                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
471                sendSmAcknowledgementInternal();
472            } catch (NotConnectedException e) {
473                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
474            }
475        }
476        shutdown(false);
477    }
478
479    /**
480     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
481     */
482    public synchronized void instantShutdown() {
483        shutdown(true);
484    }
485
486    private void shutdown(boolean instant) {
487        if (disconnectedButResumeable) {
488            return;
489        }
490        if (packetReader != null) {
491                packetReader.shutdown();
492        }
493        if (packetWriter != null) {
494                packetWriter.shutdown(instant);
495        }
496
497        // Set socketClosed to true. This will cause the PacketReader
498        // and PacketWriter to ignore any Exceptions that are thrown
499        // because of a read/write from/to a closed stream.
500        // It is *important* that this is done before socket.close()!
501        socketClosed = true;
502        try {
503                socket.close();
504        } catch (Exception e) {
505                LOGGER.log(Level.WARNING, "shutdown", e);
506        }
507
508        setWasAuthenticated();
509        // If we are able to resume the stream, then don't set
510        // connected/authenticated/usingTLS to false since we like behave like we are still
511        // connected (e.g. sendStanza should not throw a NotConnectedException).
512        if (isSmResumptionPossible() && instant) {
513            disconnectedButResumeable = true;
514        } else {
515            disconnectedButResumeable = false;
516            // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
517            // stream tag, there is no longer a stream to resume.
518            smSessionId = null;
519        }
520        authenticated = false;
521        connected = false;
522        usingTLS = false;
523        reader = null;
524        writer = null;
525
526        maybeCompressFeaturesReceived.init();
527        compressSyncPoint.init();
528        smResumedSyncPoint.init();
529        smEnabledSyncPoint.init();
530        initalOpenStreamSend.init();
531    }
532
533    @Override
534    public void send(PlainStreamElement element) throws NotConnectedException {
535        packetWriter.sendStreamElement(element);
536    }
537
538    @Override
539    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException {
540        packetWriter.sendStreamElement(packet);
541        if (isSmEnabled()) {
542            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
543                if (requestAckPredicate.accept(packet)) {
544                    requestSmAcknowledgementInternal();
545                    break;
546                }
547            }
548        }
549    }
550
551    private void connectUsingConfiguration() throws IOException, ConnectionException {
552        List<HostAddress> failedAddresses = populateHostAddresses();
553        SocketFactory socketFactory = config.getSocketFactory();
554        if (socketFactory == null) {
555            socketFactory = SocketFactory.getDefault();
556        }
557        for (HostAddress hostAddress : hostAddresses) {
558            String host = hostAddress.getFQDN();
559            int port = hostAddress.getPort();
560            socket = socketFactory.createSocket();
561            try {
562                Iterator<InetAddress> inetAddresses = Arrays.asList(InetAddress.getAllByName(host)).iterator();
563                if (!inetAddresses.hasNext()) {
564                    // This should not happen
565                    LOGGER.warning("InetAddress.getAllByName() returned empty result array.");
566                    throw new UnknownHostException(host);
567                }
568                innerloop: while (inetAddresses.hasNext()) {
569                    final InetAddress inetAddress = inetAddresses.next();
570                    final String inetAddressAndPort = inetAddress + " at port " + port;
571                    LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort);
572                    try {
573                        socket.connect(new InetSocketAddress(inetAddress, port), config.getConnectTimeout());
574                    } catch (Exception e) {
575                        if (inetAddresses.hasNext()) {
576                            continue innerloop;
577                        } else {
578                            throw e;
579                        }
580                    }
581                    LOGGER.finer("Established TCP connection to " + inetAddressAndPort);
582                    // We found a host to connect to, return here
583                    this.host = host;
584                    this.port = port;
585                    return;
586                }
587            }
588            catch (Exception e) {
589                hostAddress.setException(e);
590                failedAddresses.add(hostAddress);
591            }
592        }
593        // There are no more host addresses to try
594        // throw an exception and report all tried
595        // HostAddresses in the exception
596        throw ConnectionException.from(failedAddresses);
597    }
598
599    /**
600     * Initializes the connection by creating a stanza(/packet) reader and writer and opening a
601     * XMPP stream to the server.
602     *
603     * @throws XMPPException if establishing a connection to the server fails.
604     * @throws SmackException if the server failes to respond back or if there is anther error.
605     * @throws IOException 
606     */
607    private void initConnection() throws IOException {
608        boolean isFirstInitialization = packetReader == null || packetWriter == null;
609        compressionHandler = null;
610
611        // Set the reader and writer instance variables
612        initReaderAndWriter();
613
614        if (isFirstInitialization) {
615            packetWriter = new PacketWriter();
616            packetReader = new PacketReader();
617
618            // If debugging is enabled, we should start the thread that will listen for
619            // all packets and then log them.
620            if (config.isDebuggerEnabled()) {
621                addAsyncStanzaListener(debugger.getReaderListener(), null);
622                if (debugger.getWriterListener() != null) {
623                    addPacketSendingListener(debugger.getWriterListener(), null);
624                }
625            }
626        }
627        // Start the packet writer. This will open an XMPP stream to the server
628        packetWriter.init();
629        // Start the packet reader. The startup() method will block until we
630        // get an opening stream packet back from server
631        packetReader.init();
632
633        if (isFirstInitialization) {
634            // Notify listeners that a new connection has been established
635            for (ConnectionCreationListener listener : getConnectionCreationListeners()) {
636                listener.connectionCreated(this);
637            }
638        }
639    }
640
641    private void initReaderAndWriter() throws IOException {
642        InputStream is = socket.getInputStream();
643        OutputStream os = socket.getOutputStream();
644        if (compressionHandler != null) {
645            is = compressionHandler.getInputStream(is);
646            os = compressionHandler.getOutputStream(os);
647        }
648        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
649        writer = new OutputStreamWriter(os, "UTF-8");
650        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
651
652        // If debugging is enabled, we open a window and write out all network traffic.
653        initDebugger();
654    }
655
656    /**
657     * The server has indicated that TLS negotiation can start. We now need to secure the
658     * existing plain connection and perform a handshake. This method won't return until the
659     * connection has finished the handshake or an error occurred while securing the connection.
660     * @throws IOException 
661     * @throws CertificateException 
662     * @throws NoSuchAlgorithmException 
663     * @throws NoSuchProviderException 
664     * @throws KeyStoreException 
665     * @throws UnrecoverableKeyException 
666     * @throws KeyManagementException 
667     * @throws SmackException 
668     * @throws Exception if an exception occurs.
669     */
670    private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
671        SSLContext context = this.config.getCustomSSLContext();
672        KeyStore ks = null;
673        KeyManager[] kms = null;
674        PasswordCallback pcb = null;
675
676        if(config.getCallbackHandler() == null) {
677           ks = null;
678        } else if (context == null) {
679            if(config.getKeystoreType().equals("NONE")) {
680                ks = null;
681                pcb = null;
682            }
683            else if(config.getKeystoreType().equals("PKCS11")) {
684                try {
685                    Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
686                    String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library();
687                    ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes());
688                    Provider p = (Provider)c.newInstance(config);
689                    Security.addProvider(p);
690                    ks = KeyStore.getInstance("PKCS11",p);
691                    pcb = new PasswordCallback("PKCS11 Password: ",false);
692                    this.config.getCallbackHandler().handle(new Callback[]{pcb});
693                    ks.load(null,pcb.getPassword());
694                }
695                catch (Exception e) {
696                    ks = null;
697                    pcb = null;
698                }
699            }
700            else if(config.getKeystoreType().equals("Apple")) {
701                ks = KeyStore.getInstance("KeychainStore","Apple");
702                ks.load(null,null);
703                //pcb = new PasswordCallback("Apple Keychain",false);
704                //pcb.setPassword(null);
705            }
706            else {
707                ks = KeyStore.getInstance(config.getKeystoreType());
708                try {
709                    pcb = new PasswordCallback("Keystore Password: ",false);
710                    config.getCallbackHandler().handle(new Callback[]{pcb});
711                    ks.load(new FileInputStream(config.getKeystorePath()), pcb.getPassword());
712                }
713                catch(Exception e) {
714                    ks = null;
715                    pcb = null;
716                }
717            }
718            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
719            try {
720                if(pcb == null) {
721                    kmf.init(ks,null);
722                } else {
723                    kmf.init(ks,pcb.getPassword());
724                    pcb.clearPassword();
725                }
726                kms = kmf.getKeyManagers();
727            } catch (NullPointerException npe) {
728                kms = null;
729            }
730        }
731
732        // If the user didn't specify a SSLContext, use the default one
733        if (context == null) {
734            context = SSLContext.getInstance("TLS");
735            context.init(kms, null, new java.security.SecureRandom());
736        }
737        Socket plain = socket;
738        // Secure the plain connection
739        socket = context.getSocketFactory().createSocket(plain,
740                host, plain.getPort(), true);
741        // Initialize the reader and writer with the new secured version
742        initReaderAndWriter();
743
744        final SSLSocket sslSocket = (SSLSocket) socket;
745        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
746
747        // Proceed to do the handshake
748        sslSocket.startHandshake();
749
750        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
751        if (verifier == null) {
752                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
753        } else if (!verifier.verify(getServiceName(), sslSocket.getSession())) {
754            throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getServiceName());
755        }
756
757        // Set that TLS was successful
758        usingTLS = true;
759    }
760
761    /**
762     * Returns the compression handler that can be used for one compression methods offered by the server.
763     * 
764     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
765     * 
766     */
767    private XMPPInputOutputStream maybeGetCompressionHandler() {
768        Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
769        if (compression == null) {
770            // Server does not support compression
771            return null;
772        }
773        for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) {
774                String method = handler.getCompressionMethod();
775                if (compression.getMethods().contains(method))
776                    return handler;
777        }
778        return null;
779    }
780
781    @Override
782    public boolean isUsingCompression() {
783        return compressionHandler != null && compressSyncPoint.wasSuccessful();
784    }
785
786    /**
787     * <p>
788     * Starts using stream compression that will compress network traffic. Traffic can be
789     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
790     * connection. However, the server and the client will need to use more CPU time in order to
791     * un/compress network data so under high load the server performance might be affected.
792     * </p>
793     * <p>
794     * Stream compression has to have been previously offered by the server. Currently only the
795     * zlib method is supported by the client. Stream compression negotiation has to be done
796     * before authentication took place.
797     * </p>
798     *
799     * @throws NotConnectedException 
800     * @throws XMPPException 
801     * @throws NoResponseException 
802     */
803    private void useCompression() throws NotConnectedException, NoResponseException, XMPPException {
804        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
805        // If stream compression was offered by the server and we want to use
806        // compression then send compression request to the server
807        if ((compressionHandler = maybeGetCompressionHandler()) != null) {
808            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
809        } else {
810            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
811        }
812    }
813
814    /**
815     * Establishes a connection to the XMPP server and performs an automatic login
816     * only if the previous connection state was logged (authenticated). It basically
817     * creates and maintains a socket connection to the server.<p>
818     * <p/>
819     * Listeners will be preserved from a previous connection if the reconnection
820     * occurs after an abrupt termination.
821     *
822     * @throws XMPPException if an error occurs while trying to establish the connection.
823     * @throws SmackException 
824     * @throws IOException 
825     */
826    @Override
827    protected void connectInternal() throws SmackException, IOException, XMPPException {
828        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
829        // there is an error establishing the connection
830        connectUsingConfiguration();
831
832        // We connected successfully to the servers TCP port
833        socketClosed = false;
834        initConnection();
835
836        // Wait with SASL auth until the SASL mechanisms have been received
837        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
838
839        // Make note of the fact that we're now connected.
840        connected = true;
841        callConnectionConnectedListener();
842
843        // Automatically makes the login if the user was previously connected successfully
844        // to the server and the connection was terminated abruptly
845        if (wasAuthenticated) {
846            login();
847            notifyReconnection();
848        }
849    }
850
851    /**
852     * Sends out a notification that there was an error with the connection
853     * and closes the connection. Also prints the stack trace of the given exception
854     *
855     * @param e the exception that causes the connection close event.
856     */
857    private synchronized void notifyConnectionError(Exception e) {
858        // Listeners were already notified of the exception, return right here.
859        if ((packetReader == null || packetReader.done) &&
860                (packetWriter == null || packetWriter.done())) return;
861
862        // Closes the connection temporary. A reconnection is possible
863        instantShutdown();
864
865        // Notify connection listeners of the error.
866        callConnectionClosedOnErrorListener(e);
867    }
868
869    /**
870     * For unit testing purposes
871     *
872     * @param writer
873     */
874    protected void setWriter(Writer writer) {
875        this.writer = writer;
876    }
877
878    @Override
879    protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException {
880        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
881        if (startTlsFeature != null) {
882            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
883                notifyConnectionError(new SecurityRequiredByServerException());
884                return;
885            }
886
887            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
888                send(new StartTls());
889            }
890        }
891        // If TLS is required but the server doesn't offer it, disconnect
892        // from the server and throw an error. First check if we've already negotiated TLS
893        // and are secure, however (features get parsed a second time after TLS is established).
894        if (!isSecureConnection() && startTlsFeature == null
895                        && getConfiguration().getSecurityMode() == SecurityMode.required) {
896            throw new SecurityRequiredByClientException();
897        }
898
899        if (getSASLAuthentication().authenticationSuccessful()) {
900            // If we have received features after the SASL has been successfully completed, then we
901            // have also *maybe* received, as it is an optional feature, the compression feature
902            // from the server.
903            maybeCompressFeaturesReceived.reportSuccess();
904        }
905    }
906
907    /**
908     * Resets the parser using the latest connection's reader. Reseting the parser is necessary
909     * when the plain connection has been secured or when a new opening stream element is going
910     * to be sent by the server.
911     *
912     * @throws SmackException if the parser could not be reset.
913     */
914    void openStream() throws SmackException {
915        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
916        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
917        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
918        // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.)
919        CharSequence to = getServiceName();
920        CharSequence from = null;
921        CharSequence localpart = config.getUsername();
922        if (localpart != null) {
923            from = XmppStringUtils.completeJidFrom(localpart, to);
924        }
925        String id = getStreamId();
926        send(new StreamOpen(to, from, id));
927        try {
928            packetReader.parser = PacketParserUtils.newXmppParser(reader);
929        }
930        catch (XmlPullParserException e) {
931            throw new SmackException(e);
932        }
933    }
934
935    protected class PacketReader {
936
937        XmlPullParser parser;
938
939        private volatile boolean done;
940
941        /**
942         * Initializes the reader in order to be used. The reader is initialized during the
943         * first connection and when reconnecting due to an abruptly disconnection.
944         */
945        void init() {
946            done = false;
947
948            Async.go(new Runnable() {
949                public void run() {
950                    parsePackets();
951                }
952            }, "Smack Packet Reader (" + getConnectionCounter() + ")");
953         }
954
955        /**
956         * Shuts the stanza(/packet) reader down. This method simply sets the 'done' flag to true.
957         */
958        void shutdown() {
959            done = true;
960        }
961
962        /**
963         * Parse top-level packets in order to process them further.
964         *
965         * @param thread the thread that is being used by the reader to parse incoming packets.
966         */
967        private void parsePackets() {
968            try {
969                initalOpenStreamSend.checkIfSuccessOrWait();
970                int eventType = parser.getEventType();
971                while (!done) {
972                    switch (eventType) {
973                    case XmlPullParser.START_TAG:
974                        final String name = parser.getName();
975                        switch (name) {
976                        case Message.ELEMENT:
977                        case IQ.IQ_ELEMENT:
978                        case Presence.ELEMENT:
979                            try {
980                                parseAndProcessStanza(parser);
981                            } finally {
982                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
983                            }
984                            break;
985                        case "stream":
986                            // We found an opening stream.
987                            if ("jabber:client".equals(parser.getNamespace(null))) {
988                                streamId = parser.getAttributeValue("", "id");
989                                String reportedServiceName = parser.getAttributeValue("", "from");
990                                assert(reportedServiceName.equals(config.getServiceName()));
991                            }
992                            break;
993                        case "error":
994                            throw new StreamErrorException(PacketParserUtils.parseStreamError(parser));
995                        case "features":
996                            parseFeatures(parser);
997                            break;
998                        case "proceed":
999                            try {
1000                                // Secure the connection by negotiating TLS
1001                                proceedTLSReceived();
1002                                // Send a new opening stream to the server
1003                                openStream();
1004                            }
1005                            catch (Exception e) {
1006                                // We report any failure regarding TLS in the second stage of XMPP
1007                                // connection establishment, namely the SASL authentication
1008                                saslFeatureReceived.reportFailure(new SmackException(e));
1009                                throw e;
1010                            }
1011                            break;
1012                        case "failure":
1013                            String namespace = parser.getNamespace(null);
1014                            switch (namespace) {
1015                            case "urn:ietf:params:xml:ns:xmpp-tls":
1016                                // TLS negotiation has failed. The server will close the connection
1017                                // TODO Parse failure stanza
1018                                throw new XMPPErrorException("TLS negotiation has failed", null);
1019                            case "http://jabber.org/protocol/compress":
1020                                // Stream compression has been denied. This is a recoverable
1021                                // situation. It is still possible to authenticate and
1022                                // use the connection but using an uncompressed connection
1023                                // TODO Parse failure stanza
1024                                compressSyncPoint.reportFailure(new XMPPErrorException(
1025                                                "Could not establish compression", null));
1026                                break;
1027                            case SaslStreamElements.NAMESPACE:
1028                                // SASL authentication has failed. The server may close the connection
1029                                // depending on the number of retries
1030                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
1031                                getSASLAuthentication().authenticationFailed(failure);
1032                                break;
1033                            }
1034                            break;
1035                        case Challenge.ELEMENT:
1036                            // The server is challenging the SASL authentication made by the client
1037                            String challengeData = parser.nextText();
1038                            getSASLAuthentication().challengeReceived(challengeData);
1039                            break;
1040                        case Success.ELEMENT:
1041                            Success success = new Success(parser.nextText());
1042                            // We now need to bind a resource for the connection
1043                            // Open a new stream and wait for the response
1044                            openStream();
1045                            // The SASL authentication with the server was successful. The next step
1046                            // will be to bind the resource
1047                            getSASLAuthentication().authenticated(success);
1048                            break;
1049                        case Compressed.ELEMENT:
1050                            // Server confirmed that it's possible to use stream compression. Start
1051                            // stream compression
1052                            // Initialize the reader and writer with the new compressed version
1053                            initReaderAndWriter();
1054                            // Send a new opening stream to the server
1055                            openStream();
1056                            // Notify that compression is being used
1057                            compressSyncPoint.reportSuccess();
1058                            break;
1059                        case Enabled.ELEMENT:
1060                            Enabled enabled = ParseStreamManagement.enabled(parser);
1061                            if (enabled.isResumeSet()) {
1062                                smSessionId = enabled.getId();
1063                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1064                                    XMPPErrorException xmppException = new XMPPErrorException(
1065                                                    "Stream Management 'enabled' element with resume attribute but without session id received",
1066                                                    new XMPPError(
1067                                                                    XMPPError.Condition.bad_request));
1068                                    smEnabledSyncPoint.reportFailure(xmppException);
1069                                    throw xmppException;
1070                                }
1071                                smServerMaxResumptimTime = enabled.getMaxResumptionTime();
1072                            } else {
1073                                // Mark this a non-resumable stream by setting smSessionId to null
1074                                smSessionId = null;
1075                            }
1076                            clientHandledStanzasCount = 0;
1077                            smWasEnabledAtLeastOnce = true;
1078                            smEnabledSyncPoint.reportSuccess();
1079                            LOGGER.fine("Stream Management (XEP-198): succesfully enabled");
1080                            break;
1081                        case Failed.ELEMENT:
1082                            Failed failed = ParseStreamManagement.failed(parser);
1083                            XMPPError xmppError = new XMPPError(failed.getXMPPErrorCondition());
1084                            XMPPException xmppException = new XMPPErrorException("Stream Management failed", xmppError);
1085                            // If only XEP-198 would specify different failure elements for the SM
1086                            // enable and SM resume failure case. But this is not the case, so we
1087                            // need to determine if this is a 'Failed' response for either 'Enable'
1088                            // or 'Resume'.
1089                            if (smResumedSyncPoint.requestSent()) {
1090                                smResumedSyncPoint.reportFailure(xmppException);
1091                            }
1092                            else {
1093                                if (!smEnabledSyncPoint.requestSent()) {
1094                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
1095                                }
1096                                smEnabledSyncPoint.reportFailure(xmppException);
1097                                // Report success for last lastFeaturesReceived so that in case a
1098                                // failed resumption, we can continue with normal resource binding.
1099                                // See text of XEP-198 5. below Example 11.
1100                                lastFeaturesReceived.reportSuccess();
1101                            }
1102                            break;
1103                        case Resumed.ELEMENT:
1104                            Resumed resumed = ParseStreamManagement.resumed(parser);
1105                            if (!smSessionId.equals(resumed.getPrevId())) {
1106                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1107                            }
1108                            // First, drop the stanzas already handled by the server
1109                            processHandledCount(resumed.getHandledCount());
1110                            // Then re-send what is left in the unacknowledged queue
1111                            List<Stanza> stanzasToResend = new LinkedList<Stanza>();
1112                            stanzasToResend.addAll(unacknowledgedStanzas);
1113                            for (Stanza stanza : stanzasToResend) {
1114                                packetWriter.sendStreamElement(stanza);
1115                            }
1116                            smResumedSyncPoint.reportSuccess();
1117                            smEnabledSyncPoint.reportSuccess();
1118                            // If there where stanzas resent, then request a SM ack for them.
1119                            // Writer's sendStreamElement() won't do it automatically based on
1120                            // predicates.
1121                            if (!stanzasToResend.isEmpty()) {
1122                                requestSmAcknowledgementInternal();
1123                            }
1124                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
1125                            break;
1126                        case AckAnswer.ELEMENT:
1127                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1128                            processHandledCount(ackAnswer.getHandledCount());
1129                            break;
1130                        case AckRequest.ELEMENT:
1131                            ParseStreamManagement.ackRequest(parser);
1132                            if (smEnabledSyncPoint.wasSuccessful()) {
1133                                sendSmAcknowledgementInternal();
1134                            } else {
1135                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1136                            }
1137                            break;
1138                         default:
1139                             LOGGER.warning("Unknown top level stream element: " + name);
1140                             break;
1141                        }
1142                        break;
1143                    case XmlPullParser.END_TAG:
1144                        if (parser.getName().equals("stream")) {
1145                            // Disconnect the connection
1146                            disconnect();
1147                        }
1148                        break;
1149                    case XmlPullParser.END_DOCUMENT:
1150                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1151                        // closing stream element before.
1152                        throw new SmackException(
1153                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1154                    }
1155                    eventType = parser.next();
1156                }
1157            }
1158            catch (Exception e) {
1159                // The exception can be ignored if the the connection is 'done'
1160                // or if the it was caused because the socket got closed
1161                if (!(done || isSocketClosed())) {
1162                    // Close the connection and notify connection listeners of the
1163                    // error.
1164                    notifyConnectionError(e);
1165                }
1166            }
1167        }
1168    }
1169
1170    protected class PacketWriter {
1171        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1172
1173        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<Element>(
1174                        QUEUE_SIZE, true);
1175
1176        /**
1177         * Needs to be protected for unit testing purposes.
1178         */
1179        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>(
1180                        XMPPTCPConnection.this);
1181
1182        /**
1183         * If set, the stanza(/packet) writer is shut down
1184         */
1185        protected volatile Long shutdownTimestamp = null;
1186
1187        private volatile boolean instantShutdown;
1188
1189        /**
1190         * True if some preconditions are given to start the bundle and defer mechanism.
1191         * <p>
1192         * This will likely get set to true right after the start of the writer thread, because
1193         * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
1194         * this field to true.
1195         * </p>
1196         */
1197        private boolean shouldBundleAndDefer;
1198
1199        /** 
1200        * Initializes the writer in order to be used. It is called at the first connection and also 
1201        * is invoked if the connection is disconnected by an error.
1202        */ 
1203        void init() {
1204            shutdownDone.init();
1205            shutdownTimestamp = null;
1206
1207            if (unacknowledgedStanzas != null) {
1208                // It's possible that there are new stanzas in the writer queue that
1209                // came in while we were disconnected but resumable, drain those into
1210                // the unacknowledged queue so that they get resent now
1211                drainWriterQueueToUnacknowledgedStanzas();
1212            }
1213
1214            queue.start();
1215            Async.go(new Runnable() {
1216                @Override
1217                public void run() {
1218                    writePackets();
1219                }
1220            }, "Smack Packet Writer (" + getConnectionCounter() + ")");
1221        }
1222
1223        private boolean done() {
1224            return shutdownTimestamp != null;
1225        }
1226
1227        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1228            if (done() && !isSmResumptionPossible()) {
1229                // Don't throw a NotConnectedException is there is an resumable stream available
1230                throw new NotConnectedException();
1231            }
1232        }
1233
1234        /**
1235         * Sends the specified element to the server.
1236         *
1237         * @param element the element to send.
1238         * @throws NotConnectedException 
1239         */
1240        protected void sendStreamElement(Element element) throws NotConnectedException {
1241            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1242
1243            boolean enqueued = false;
1244            while (!enqueued) {
1245                try {
1246                    queue.put(element);
1247                    enqueued = true;
1248                }
1249                catch (InterruptedException e) {
1250                    throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1251                    // If the method above did not throw, then the sending thread was interrupted
1252                    // TODO in a later version of Smack the InterruptedException should be thrown to
1253                    // allow users to interrupt a sending thread that is currently blocking because
1254                    // the queue is full.
1255                    LOGGER.log(Level.WARNING, "Sending thread was interrupted", e);
1256                }
1257            }
1258        }
1259
1260        /**
1261         * Shuts down the stanza(/packet) writer. Once this method has been called, no further
1262         * packets will be written to the server.
1263         */
1264        void shutdown(boolean instant) {
1265            instantShutdown = instant;
1266            shutdownTimestamp = System.currentTimeMillis();
1267            queue.shutdown();
1268            try {
1269                shutdownDone.checkIfSuccessOrWait();
1270            }
1271            catch (NoResponseException e) {
1272                LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
1273            }
1274        }
1275
1276        /**
1277         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1278         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1279         * that case.
1280         *
1281         * @return the next element for writing or null.
1282         */
1283        private Element nextStreamElement() {
1284            // It is important the we check if the queue is empty before removing an element from it
1285            if (queue.isEmpty()) {
1286                shouldBundleAndDefer = true;
1287            }
1288            Element packet = null;
1289            try {
1290                packet = queue.take();
1291            }
1292            catch (InterruptedException e) {
1293                if (!queue.isShutdown()) {
1294                    // Users shouldn't try to interrupt the packet writer thread
1295                    LOGGER.log(Level.WARNING, "Packet writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1296                }
1297            }
1298            return packet;
1299        }
1300
1301        private void writePackets() {
1302            try {
1303                openStream();
1304                initalOpenStreamSend.reportSuccess();
1305                // Write out packets from the queue.
1306                while (!done()) {
1307                    Element element = nextStreamElement();
1308                    if (element == null) {
1309                        continue;
1310                    }
1311
1312                    // Get a local version of the bundle and defer callback, in case it's unset
1313                    // between the null check and the method invocation
1314                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
1315                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
1316                    // empty), then we could wait a bit for further stanzas attempting to decrease
1317                    // our energy consumption
1318                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
1319                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
1320                        // queue is empty again.
1321                        shouldBundleAndDefer = false;
1322                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
1323                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
1324                                        bundlingAndDeferringStopped));
1325                        if (bundleAndDeferMillis > 0) {
1326                            long remainingWait = bundleAndDeferMillis;
1327                            final long waitStart = System.currentTimeMillis();
1328                            synchronized (bundlingAndDeferringStopped) {
1329                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
1330                                    bundlingAndDeferringStopped.wait(remainingWait);
1331                                    remainingWait = bundleAndDeferMillis
1332                                                    - (System.currentTimeMillis() - waitStart);
1333                                }
1334                            }
1335                        }
1336                    }
1337
1338                    Stanza packet = null;
1339                    if (element instanceof Stanza) {
1340                        packet = (Stanza) element;
1341                    }
1342                    else if (element instanceof Enable) {
1343                        // The client needs to add messages to the unacknowledged stanzas queue
1344                        // right after it sent 'enabled'. Stanza will be added once
1345                        // unacknowledgedStanzas is not null.
1346                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
1347                    }
1348                    // Check if the stream element should be put to the unacknowledgedStanza
1349                    // queue. Note that we can not do the put() in sendStanzaInternal() and the
1350                    // packet order is not stable at this point (sendStanzaInternal() can be
1351                    // called concurrently).
1352                    if (unacknowledgedStanzas != null && packet != null) {
1353                        // If the unacknowledgedStanza queue is nearly full, request an new ack
1354                        // from the server in order to drain it
1355                        if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
1356                            writer.write(AckRequest.INSTANCE.toXML().toString());
1357                            writer.flush();
1358                        }
1359                        try {
1360                            // It is important the we put the stanza in the unacknowledged stanza
1361                            // queue before we put it on the wire
1362                            unacknowledgedStanzas.put(packet);
1363                        }
1364                        catch (InterruptedException e) {
1365                            throw new IllegalStateException(e);
1366                        }
1367                    }
1368                    writer.write(element.toXML().toString());
1369                    if (queue.isEmpty()) {
1370                        writer.flush();
1371                    }
1372                    if (packet != null) {
1373                        firePacketSendingListeners(packet);
1374                    }
1375                }
1376                if (!instantShutdown) {
1377                    // Flush out the rest of the queue.
1378                    try {
1379                        while (!queue.isEmpty()) {
1380                            Element packet = queue.remove();
1381                            writer.write(packet.toXML().toString());
1382                        }
1383                        writer.flush();
1384                    }
1385                    catch (Exception e) {
1386                        LOGGER.log(Level.WARNING,
1387                                        "Exception flushing queue during shutdown, ignore and continue",
1388                                        e);
1389                    }
1390
1391                    // Close the stream.
1392                    try {
1393                        writer.write("</stream:stream>");
1394                        writer.flush();
1395                    }
1396                    catch (Exception e) {
1397                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
1398                    }
1399                    // Delete the queue contents (hopefully nothing is left).
1400                    queue.clear();
1401                } else if (instantShutdown && isSmEnabled()) {
1402                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
1403                    // into the unacknowledgedStanzas queue
1404                    drainWriterQueueToUnacknowledgedStanzas();
1405                }
1406
1407                try {
1408                    writer.close();
1409                }
1410                catch (Exception e) {
1411                    // Do nothing
1412                }
1413
1414            }
1415            catch (Exception e) {
1416                // The exception can be ignored if the the connection is 'done'
1417                // or if the it was caused because the socket got closed
1418                if (!(done() || isSocketClosed())) {
1419                    notifyConnectionError(e);
1420                } else {
1421                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
1422                }
1423            } finally {
1424                LOGGER.fine("Reporting shutdownDone success in writer thread");
1425                shutdownDone.reportSuccess();
1426            }
1427        }
1428
1429        private void drainWriterQueueToUnacknowledgedStanzas() {
1430            List<Element> elements = new ArrayList<Element>(queue.size());
1431            queue.drainTo(elements);
1432            for (Element element : elements) {
1433                if (element instanceof Stanza) {
1434                    unacknowledgedStanzas.add((Stanza) element);
1435                }
1436            }
1437        }
1438    }
1439
1440    /**
1441     * Set if Stream Management should be used by default for new connections.
1442     * 
1443     * @param useSmDefault true to use Stream Management for new connections.
1444     */
1445    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1446        XMPPTCPConnection.useSmDefault = useSmDefault;
1447    }
1448
1449    /**
1450     * Set if Stream Management resumption should be used by default for new connections.
1451     * 
1452     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1453     */
1454    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1455        if (useSmResumptionDefault) {
1456            // Also enable SM is resumption is enabled
1457            setUseStreamManagementDefault(useSmResumptionDefault);
1458        }
1459        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1460    }
1461
1462    /**
1463     * Set if Stream Management should be used if supported by the server.
1464     * 
1465     * @param useSm true to use Stream Management.
1466     */
1467    public void setUseStreamManagement(boolean useSm) {
1468        this.useSm = useSm;
1469    }
1470
1471    /**
1472     * Set if Stream Management resumption should be used if supported by the server.
1473     *
1474     * @param useSmResumption true to use Stream Management resumption.
1475     */
1476    public void setUseStreamManagementResumption(boolean useSmResumption) {
1477        if (useSmResumption) {
1478            // Also enable SM is resumption is enabled
1479            setUseStreamManagement(useSmResumption);
1480        }
1481        this.useSmResumption = useSmResumption;
1482    }
1483
1484    /**
1485     * Set the preferred resumption time in seconds.
1486     * @param resumptionTime the preferred resumption time in seconds
1487     */
1488    public void setPreferredResumptionTime(int resumptionTime) {
1489        smClientMaxResumptionTime = resumptionTime;
1490    }
1491
1492    /**
1493     * Add a predicate for Stream Management acknowledgment requests.
1494     * <p>
1495     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1496     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1497     * </p>
1498     * <p>
1499     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1500     * </p>
1501     * 
1502     * @param predicate the predicate to add.
1503     * @return if the predicate was not already active.
1504     */
1505    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1506        synchronized (requestAckPredicates) {
1507            return requestAckPredicates.add(predicate);
1508        }
1509    }
1510
1511    /**
1512     * Remove the given predicate for Stream Management acknowledgment request.
1513     * @param predicate the predicate to remove.
1514     * @return true if the predicate was removed.
1515     */
1516    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1517        synchronized (requestAckPredicates) {
1518            return requestAckPredicates.remove(predicate);
1519        }
1520    }
1521
1522    /**
1523     * Remove all predicates for Stream Management acknowledgment requests.
1524     */
1525    public void removeAllRequestAckPredicates() {
1526        synchronized (requestAckPredicates) {
1527            requestAckPredicates.clear();
1528        }
1529    }
1530
1531    /**
1532     * Send an unconditional Stream Management acknowledgement request to the server.
1533     *
1534     * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled.
1535     * @throws NotConnectedException if the connection is not connected.
1536     */
1537    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
1538        if (!isSmEnabled()) {
1539            throw new StreamManagementException.StreamManagementNotEnabledException();
1540        }
1541        requestSmAcknowledgementInternal();
1542    }
1543
1544    private void requestSmAcknowledgementInternal() throws NotConnectedException {
1545        packetWriter.sendStreamElement(AckRequest.INSTANCE);
1546    }
1547
1548    /**
1549     * Send a unconditional Stream Management acknowledgment to the server.
1550     * <p>
1551     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1552     * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas,
1553     * or after a certain period of time), even if it has not received an <r/> element from the other party."
1554     * </p>
1555     * 
1556     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1557     * @throws NotConnectedException if the connection is not connected.
1558     */
1559    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
1560        if (!isSmEnabled()) {
1561            throw new StreamManagementException.StreamManagementNotEnabledException();
1562        }
1563        sendSmAcknowledgementInternal();
1564    }
1565
1566    private void sendSmAcknowledgementInternal() throws NotConnectedException {
1567        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
1568    }
1569
1570    /**
1571     * Add a Stanza acknowledged listener.
1572     * <p>
1573     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1574     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1575     * possible.
1576     * </p>
1577     * 
1578     * @param listener the listener to add.
1579     */
1580    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1581        stanzaAcknowledgedListeners.add(listener);
1582    }
1583
1584    /**
1585     * Remove the given Stanza acknowledged listener.
1586     *
1587     * @param listener the listener.
1588     * @return true if the listener was removed.
1589     */
1590    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1591        return stanzaAcknowledgedListeners.remove(listener);
1592    }
1593
1594    /**
1595     * Remove all stanza acknowledged listeners.
1596     */
1597    public void removeAllStanzaAcknowledgedListeners() {
1598        stanzaAcknowledgedListeners.clear();
1599    }
1600
1601    /**
1602     * Add a new Stanza ID acknowledged listener for the given ID.
1603     * <p>
1604     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1605     * automatically be removed after the listener was run.
1606     * </p>
1607     * 
1608     * @param id the stanza ID.
1609     * @param listener the listener to invoke.
1610     * @return the previous listener for this stanza ID or null.
1611     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1612     */
1613    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1614        // Prevent users from adding callbacks that will never get removed
1615        if (!smWasEnabledAtLeastOnce) {
1616            throw new StreamManagementException.StreamManagementNotEnabledException();
1617        }
1618        // Remove the listener after max. 12 hours
1619        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 12 * 60 * 60);
1620        schedule(new Runnable() {
1621            @Override
1622            public void run() {
1623                stanzaIdAcknowledgedListeners.remove(id);
1624            }
1625        }, removeAfterSeconds, TimeUnit.SECONDS);
1626        return stanzaIdAcknowledgedListeners.put(id, listener);
1627    }
1628
1629    /**
1630     * Remove the Stanza ID acknowledged listener for the given ID.
1631     * 
1632     * @param id the stanza ID.
1633     * @return true if the listener was found and removed, false otherwise.
1634     */
1635    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1636        return stanzaIdAcknowledgedListeners.remove(id);
1637    }
1638
1639    /**
1640     * Removes all Stanza ID acknowledged listeners.
1641     */
1642    public void removeAllStanzaIdAcknowledgedListeners() {
1643        stanzaIdAcknowledgedListeners.clear();
1644    }
1645
1646    /**
1647     * Returns true if Stream Management is supported by the server.
1648     *
1649     * @return true if Stream Management is supported by the server.
1650     */
1651    public boolean isSmAvailable() {
1652        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1653    }
1654
1655    /**
1656     * Returns true if Stream Management was successfully negotiated with the server.
1657     *
1658     * @return true if Stream Management was negotiated.
1659     */
1660    public boolean isSmEnabled() {
1661        return smEnabledSyncPoint.wasSuccessful();
1662    }
1663
1664    /**
1665     * Returns true if the stream was successfully resumed with help of Stream Management.
1666     * 
1667     * @return true if the stream was resumed.
1668     */
1669    public boolean streamWasResumed() {
1670        return smResumedSyncPoint.wasSuccessful();
1671    }
1672
1673    /**
1674     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1675     * 
1676     * @return true if disconnected but resumption possible.
1677     */
1678    public boolean isDisconnectedButSmResumptionPossible() {
1679        return disconnectedButResumeable && isSmResumptionPossible();
1680    }
1681
1682    /**
1683     * Returns true if the stream is resumable.
1684     *
1685     * @return true if the stream is resumable.
1686     */
1687    public boolean isSmResumptionPossible() {
1688        // There is no resumable stream available
1689        if (smSessionId == null)
1690            return false;
1691
1692        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1693        // Seems like we are already reconnected, report true
1694        if (shutdownTimestamp == null) {
1695            return true;
1696        }
1697
1698        // See if resumption time is over
1699        long current = System.currentTimeMillis();
1700        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1701        if (current > shutdownTimestamp + maxResumptionMillies) {
1702            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1703            // resumption is possible
1704            return false;
1705        } else {
1706            return true;
1707        }
1708    }
1709
1710    /**
1711     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1712     * <p>
1713     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1714     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1715     * without checking for overflows before.
1716     * </p>
1717     *
1718     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1719     */
1720    public int getMaxSmResumptionTime() {
1721        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1722        int serverResumptionTime = smServerMaxResumptimTime > 0 ? smServerMaxResumptimTime : Integer.MAX_VALUE;
1723        return Math.min(clientResumptionTime, serverResumptionTime);
1724    }
1725
1726    private void processHandledCount(long handledCount) throws NotConnectedException, StreamManagementCounterError {
1727        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1728        final List<Stanza> ackedStanzas = new ArrayList<Stanza>(
1729                        handledCount <= Integer.MAX_VALUE ? (int) handledCount
1730                                        : Integer.MAX_VALUE);
1731        for (long i = 0; i < ackedStanzasCount; i++) {
1732            Stanza ackedStanza = unacknowledgedStanzas.poll();
1733            // If the server ack'ed a stanza, then it must be in the
1734            // unacknowledged stanza queue. There can be no exception.
1735            if (ackedStanza == null) {
1736                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1737                                ackedStanzasCount, ackedStanzas);
1738            }
1739            ackedStanzas.add(ackedStanza);
1740        }
1741
1742        boolean atLeastOneStanzaAcknowledgedListener = false;
1743        if (!stanzaAcknowledgedListeners.isEmpty()) {
1744            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1745            atLeastOneStanzaAcknowledgedListener = true;
1746        }
1747        else {
1748            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1749            for (Stanza ackedStanza : ackedStanzas) {
1750                String id = ackedStanza.getStanzaId();
1751                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1752                    atLeastOneStanzaAcknowledgedListener = true;
1753                    break;
1754                }
1755            }
1756        }
1757
1758        // Only spawn a new thread if there is a chance that some listener is invoked
1759        if (atLeastOneStanzaAcknowledgedListener) {
1760            asyncGo(new Runnable() {
1761                @Override
1762                public void run() {
1763                    for (Stanza ackedStanza : ackedStanzas) {
1764                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1765                            try {
1766                                listener.processPacket(ackedStanza);
1767                            }
1768                            catch (NotConnectedException e) {
1769                                LOGGER.log(Level.FINER, "Received not connected exception", e);
1770                            }
1771                        }
1772                        String id = ackedStanza.getStanzaId();
1773                        if (StringUtils.isNullOrEmpty(id)) {
1774                            continue;
1775                        }
1776                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1777                        if (listener != null) {
1778                            try {
1779                                listener.processPacket(ackedStanza);
1780                            }
1781                            catch (NotConnectedException e) {
1782                                LOGGER.log(Level.FINER, "Received not connected exception", e);
1783                            }
1784                        }
1785                    }
1786                }
1787            });
1788        }
1789
1790        serverHandledStanzasCount = handledCount;
1791    }
1792
1793    /**
1794     * Set the default bundle and defer callback used for new connections.
1795     *
1796     * @param defaultBundleAndDeferCallback
1797     * @see BundleAndDeferCallback
1798     * @since 4.1
1799     */
1800    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1801        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
1802    }
1803
1804    /**
1805     * Set the bundle and defer callback used for this connection.
1806     * <p>
1807     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1808     * no longer get deferred.
1809     * </p>
1810     *
1811     * @param bundleAndDeferCallback the callback or <code>null</code>.
1812     * @see BundleAndDeferCallback
1813     * @since 4.1
1814     */
1815    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1816        this.bundleAndDeferCallback = bundleAndDeferCallback;
1817    }
1818
1819}