001/**
002 *
003 * Copyright 2009 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack.bosh;
019
020import java.io.IOException;
021import java.io.PipedReader;
022import java.io.PipedWriter;
023import java.io.Writer;
024import java.net.InetAddress;
025import java.util.Map;
026import java.util.logging.Level;
027import java.util.logging.Logger;
028
029import org.jivesoftware.smack.AbstractXMPPConnection;
030import org.jivesoftware.smack.SmackException;
031import org.jivesoftware.smack.SmackException.GenericConnectionException;
032import org.jivesoftware.smack.SmackException.NotConnectedException;
033import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
034import org.jivesoftware.smack.SmackException.SmackWrappedException;
035import org.jivesoftware.smack.XMPPConnection;
036import org.jivesoftware.smack.XMPPException;
037import org.jivesoftware.smack.XMPPException.StreamErrorException;
038import org.jivesoftware.smack.packet.IQ;
039import org.jivesoftware.smack.packet.Message;
040import org.jivesoftware.smack.packet.Presence;
041import org.jivesoftware.smack.packet.Stanza;
042import org.jivesoftware.smack.packet.StanzaError;
043import org.jivesoftware.smack.packet.TopLevelStreamElement;
044import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
045import org.jivesoftware.smack.util.Async;
046import org.jivesoftware.smack.util.CloseableUtil;
047import org.jivesoftware.smack.util.PacketParserUtils;
048import org.jivesoftware.smack.xml.XmlPullParser;
049import org.jivesoftware.smack.xml.XmlPullParserException;
050
051import org.igniterealtime.jbosh.AbstractBody;
052import org.igniterealtime.jbosh.BOSHClient;
053import org.igniterealtime.jbosh.BOSHClientConfig;
054import org.igniterealtime.jbosh.BOSHClientConnEvent;
055import org.igniterealtime.jbosh.BOSHClientConnListener;
056import org.igniterealtime.jbosh.BOSHClientRequestListener;
057import org.igniterealtime.jbosh.BOSHClientResponseListener;
058import org.igniterealtime.jbosh.BOSHException;
059import org.igniterealtime.jbosh.BOSHMessageEvent;
060import org.igniterealtime.jbosh.BodyQName;
061import org.igniterealtime.jbosh.ComposableBody;
062import org.jxmpp.jid.DomainBareJid;
063import org.jxmpp.jid.parts.Resourcepart;
064
065/**
066 * Creates a connection to an XMPP server via HTTP binding.
067 * This is specified in the XEP-0206: XMPP Over BOSH.
068 *
069 * @see XMPPConnection
070 * @author Guenther Niess
071 */
072public class XMPPBOSHConnection extends AbstractXMPPConnection {
073    private static final Logger LOGGER = Logger.getLogger(XMPPBOSHConnection.class.getName());
074
075    /**
076     * The XMPP Over Bosh namespace.
077     */
078    public static final String XMPP_BOSH_NS = "urn:xmpp:xbosh";
079
080    /**
081     * The BOSH namespace from XEP-0124.
082     */
083    public static final String BOSH_URI = "http://jabber.org/protocol/httpbind";
084
085    /**
086     * The used BOSH client from the jbosh library.
087     */
088    private BOSHClient client;
089
090    /**
091     * Holds the initial configuration used while creating the connection.
092     */
093    @SuppressWarnings("HidingField")
094    private final BOSHConfiguration config;
095
096    private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true);
097
098    private Thread writerThread;
099
100    // Some flags which provides some info about the current state.
101    private boolean isFirstInitialization = true;
102    private boolean done = false;
103
104    // The readerPipe and consumer thread are used for the debugger.
105    private PipedWriter readerPipe;
106    private Thread readerConsumer;
107
108    /**
109     * The session ID for the BOSH session with the connection manager.
110     */
111    protected String sessionID = null;
112
113    private boolean notified;
114
115    /**
116     * Create a HTTP Binding connection to an XMPP server.
117     *
118     * @param username the username to use.
119     * @param password the password to use.
120     * @param https true if you want to use SSL
121     *             (e.g. false for http://domain.lt:7070/http-bind).
122     * @param host the hostname or IP address of the connection manager
123     *             (e.g. domain.lt for http://domain.lt:7070/http-bind).
124     * @param port the port of the connection manager
125     *             (e.g. 7070 for http://domain.lt:7070/http-bind).
126     * @param filePath the file which is described by the URL
127     *             (e.g. /http-bind for http://domain.lt:7070/http-bind).
128     * @param xmppServiceDomain the XMPP service name
129     *             (e.g. domain.lt for the user alice@domain.lt)
130     */
131    public XMPPBOSHConnection(String username, String password, boolean https, String host, int port, String filePath, DomainBareJid xmppServiceDomain) {
132        this(BOSHConfiguration.builder().setUseHttps(https).setHost(host)
133                .setPort(port).setFile(filePath).setXmppDomain(xmppServiceDomain)
134                .setUsernameAndPassword(username, password).build());
135    }
136
137    /**
138     * Create a HTTP Binding connection to an XMPP server.
139     *
140     * @param config The configuration which is used for this connection.
141     */
142    public XMPPBOSHConnection(BOSHConfiguration config) {
143        super(config);
144        this.config = config;
145    }
146
147    @SuppressWarnings("deprecation")
148    @Override
149    protected void connectInternal() throws SmackException, InterruptedException {
150        done = false;
151        notified = false;
152        try {
153            // Ensure a clean starting state
154            if (client != null) {
155                client.close();
156                client = null;
157            }
158            sessionID = null;
159
160            // Initialize BOSH client
161            BOSHClientConfig.Builder cfgBuilder = BOSHClientConfig.Builder
162                    .create(config.getURI(), config.getXMPPServiceDomain().toString());
163            if (config.isProxyEnabled()) {
164                cfgBuilder.setProxy(config.getProxyAddress(), config.getProxyPort());
165            }
166
167            cfgBuilder.setCompressionEnabled(config.isCompressionEnabled());
168
169            for (Map.Entry<String, String> h : config.getHttpHeaders().entrySet()) {
170                cfgBuilder.addHttpHeader(h.getKey(), h.getValue());
171            }
172
173            client = BOSHClient.create(cfgBuilder.build());
174
175            // Initialize the debugger before addBOSHClientResponseListener(new BOSHPacketReader());
176            // BOSHPacketReader may hold and send response prior to display of the request i.e. <response/> before <challenge/>
177            if (debugger != null) {
178                initDebugger();
179            }
180
181            client.addBOSHClientConnListener(new BOSHConnectionListener());
182            client.addBOSHClientResponseListener(new BOSHPacketReader());
183
184            // Send the session creation request
185            client.send(ComposableBody.builder()
186                    .setNamespaceDefinition("xmpp", XMPP_BOSH_NS)
187                    .setAttribute(BodyQName.createWithPrefix(XMPP_BOSH_NS, "version", "xmpp"), "1.0")
188                    .build());
189        } catch (Exception e) {
190            throw new GenericConnectionException(e);
191        }
192
193        // Wait for the response from the server
194        synchronized (this) {
195            if (!connected) {
196                final long deadline = System.currentTimeMillis() + getReplyTimeout();
197                while (!notified) {
198                    final long now = System.currentTimeMillis();
199                    if (now >= deadline) break;
200                    wait(deadline - now);
201                }
202            }
203        }
204
205        assert writerThread == null || !writerThread.isAlive();
206        outgoingQueue.start();
207        writerThread = Async.go(this::writeElements, this + " Writer");
208
209        // If there is no feedback, throw an remote server timeout error
210        if (!connected && !done) {
211            done = true;
212            String errorMessage = "Timeout reached for the connection to "
213                    + getHost() + ":" + getPort() + ".";
214            instantShutdown();
215            throw new SmackException.SmackMessageException(errorMessage);
216        }
217
218        try {
219            XmlPullParser parser = PacketParserUtils.getParserFor(
220                            "<stream:stream xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'/>");
221            onStreamOpen(parser);
222        } catch (XmlPullParserException | IOException e) {
223            instantShutdown();
224            throw new AssertionError("Failed to setup stream environment", e);
225        }
226    }
227
228    @Override
229    public boolean isSecureConnection() {
230        return config.isUsingHTTPS();
231    }
232
233    @Override
234    public boolean isUsingCompression() {
235        // TODO: Implement compression
236        return false;
237    }
238
239    @Override
240    protected void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
241                    SmackException, IOException, InterruptedException {
242        // Authenticate using SASL
243        authenticate(username, password, config.getAuthzid(), null);
244
245        bindResourceAndEstablishSession(resource);
246
247        afterSuccessfulLogin(false);
248    }
249
250    private volatile boolean writerThreadRunning;
251
252    private void writeElements() {
253        writerThreadRunning = true;
254        try {
255            while (true) {
256                TopLevelStreamElement element;
257                try {
258                    element = outgoingQueue.take();
259                } catch (InterruptedException e) {
260                    LOGGER.log(Level.FINE,
261                                    "Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception",
262                                    e);
263                    return;
264                }
265
266                String xmlPayload = element.toXML(BOSH_URI).toString();
267                ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload);
268                if (sessionID != null) {
269                    BodyQName qName = BodyQName.create(BOSH_URI, "sid");
270                    composableBodyBuilder.setAttribute(qName, sessionID);
271                }
272
273                ComposableBody composableBody = composableBodyBuilder.build();
274
275                try {
276                    client.send(composableBody);
277                } catch (BOSHException e) {
278                    LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e);
279                    // TODO: Signal the user that there was an unexpected exception.
280                    return;
281                }
282
283                if (element instanceof Stanza) {
284                    Stanza stanza = (Stanza) element;
285                    firePacketSendingListeners(stanza);
286                }
287            }
288        } catch (Exception exception) {
289            LOGGER.log(Level.WARNING, "BOSH writer thread threw", exception);
290        } finally {
291            writerThreadRunning = false;
292            notifyWaitingThreads();
293        }
294    }
295
296    @Override
297    protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
298        throwNotConnectedExceptionIfAppropriate();
299        try {
300            outgoingQueue.put(element);
301        } catch (InterruptedException e) {
302            throwNotConnectedExceptionIfAppropriate();
303            // If the method above did not throw, then the sending thread was interrupted
304            throw e;
305        }
306    }
307
308    @Override
309    protected void sendNonBlockingInternal(TopLevelStreamElement element)
310                    throws NotConnectedException, OutgoingQueueFullException {
311        throwNotConnectedExceptionIfAppropriate();
312        boolean enqueued = outgoingQueue.offer(element);
313        if (!enqueued) {
314            throwNotConnectedExceptionIfAppropriate();
315            throw new OutgoingQueueFullException();
316        }
317    }
318
319    @Override
320    public InetAddress getLocalAddress() {
321        return null;
322    }
323
324    @Override
325    protected void shutdown() {
326        instantShutdown();
327    }
328
329    @Override
330    public void instantShutdown() {
331        outgoingQueue.shutdown();
332
333        try {
334            boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning);
335            if (!writerThreadTerminated) {
336                LOGGER.severe("Writer thread of " + this + " did not terminate timely");
337            }
338        } catch (InterruptedException e) {
339            LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e);
340        }
341
342        if (client != null) {
343            try {
344                client.disconnect();
345            } catch (Exception e) {
346                LOGGER.log(Level.WARNING, "shutdown", e);
347            }
348        }
349
350        setWasAuthenticated();
351        sessionID = null;
352        done = true;
353        authenticated = false;
354        connected = false;
355        isFirstInitialization = false;
356        client = null;
357
358        // Close down the readers and writers.
359        CloseableUtil.maybeClose(readerPipe, LOGGER);
360        CloseableUtil.maybeClose(reader, LOGGER);
361        CloseableUtil.maybeClose(writer, LOGGER);
362
363        // set readerConsumer = null before reader to avoid NPE reference
364        readerConsumer = null;
365        readerPipe = null;
366        reader = null;
367        writer = null;
368    }
369
370    /**
371     * Send a HTTP request to the connection manager with the provided body element.
372     *
373     * @param body the body which will be sent.
374     * @throws BOSHException if an BOSH (Bidirectional-streams Over Synchronous HTTP, XEP-0124) related error occurs
375     */
376    protected void send(ComposableBody body) throws BOSHException {
377        if (!connected) {
378            throw new IllegalStateException("Not connected to a server!");
379        }
380        if (body == null) {
381            throw new NullPointerException("Body mustn't be null!");
382        }
383        if (sessionID != null) {
384            body = body.rebuild().setAttribute(
385                    BodyQName.create(BOSH_URI, "sid"), sessionID).build();
386        }
387        client.send(body);
388    }
389
390    /**
391     * Initialize the SmackDebugger which allows to log and debug XML traffic.
392     */
393    @Override
394    protected void initDebugger() {
395        // TODO: Maybe we want to extend the SmackDebugger for simplification
396        //       and a performance boost.
397
398        // Initialize a empty writer which discards all data.
399        writer = new Writer() {
400            @Override
401            public void write(char[] cbuf, int off, int len) {
402                /* ignore */ }
403
404            @Override
405            public void close() {
406                /* ignore */ }
407
408            @Override
409            public void flush() {
410                /* ignore */ }
411        };
412
413        // Initialize a pipe for received raw data.
414        try {
415            readerPipe = new PipedWriter();
416            reader = new PipedReader(readerPipe);
417        }
418        catch (IOException e) {
419            // Ignore
420        }
421
422        // Call the method from the parent class which initializes the debugger.
423        super.initDebugger();
424
425        // Add listeners for the received and sent raw data.
426        client.addBOSHClientResponseListener(new BOSHClientResponseListener() {
427            @Override
428            public void responseReceived(BOSHMessageEvent event) {
429                if (event.getBody() != null) {
430                    try {
431                        readerPipe.write(event.getBody().toXML());
432                        readerPipe.flush();
433                    } catch (Exception e) {
434                        // Ignore
435                    }
436                }
437            }
438        });
439        client.addBOSHClientRequestListener(new BOSHClientRequestListener() {
440            @Override
441            public void requestSent(BOSHMessageEvent event) {
442                if (event.getBody() != null) {
443                    try {
444                        writer.write(event.getBody().toXML());
445                        // Fix all BOSH sent debug messages not shown
446                        writer.flush();
447                    } catch (Exception e) {
448                        // Ignore
449                    }
450                }
451            }
452        });
453
454        // Create and start a thread which discards all read data.
455        readerConsumer = new Thread() {
456            private Thread thread = this;
457            private int bufferLength = 1024;
458
459            @Override
460            public void run() {
461                try {
462                    char[] cbuf = new char[bufferLength];
463                    while (readerConsumer == thread && !done) {
464                        reader.read(cbuf, 0, bufferLength);
465                    }
466                } catch (IOException e) {
467                    // Ignore
468                }
469            }
470        };
471        readerConsumer.setDaemon(true);
472        readerConsumer.start();
473    }
474
475    @Override
476    protected void afterSaslAuthenticationSuccess()
477                    throws NotConnectedException, InterruptedException, SmackWrappedException {
478        // XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it
479        // requires a special XML element ot be send after successful SASL authentication.
480        // See XEP-0206 ยง 5., especially the following is example 5 of XEP-0206.
481        ComposableBody composeableBody = ComposableBody.builder()
482                .setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS)
483                .setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true")
484                .setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString())
485                .setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID)
486                .build();
487
488        try {
489            client.send(composeableBody);
490        } catch (BOSHException e) {
491            // jbosh's exception API does not really match the one of Smack.
492            throw new SmackException.SmackWrappedException(e);
493        }
494    }
495
496    /**
497     * A listener class which listen for a successfully established connection
498     * and connection errors and notifies the BOSHConnection.
499     *
500     * @author Guenther Niess
501     */
502    private class BOSHConnectionListener implements BOSHClientConnListener {
503
504        /**
505         * Notify the BOSHConnection about connection state changes.
506         * Process the connection listeners and try to login if the
507         * connection was formerly authenticated and is now reconnected.
508         */
509        @Override
510        public void connectionEvent(BOSHClientConnEvent connEvent) {
511            try {
512                if (connEvent.isConnected()) {
513                    connected = true;
514                    if (isFirstInitialization) {
515                        isFirstInitialization = false;
516                    }
517                    else {
518                            if (wasAuthenticated) {
519                                try {
520                                    login();
521                                }
522                                catch (Exception e) {
523                                    throw new RuntimeException(e);
524                                }
525                            }
526                    }
527                }
528                else {
529                    if (connEvent.isError()) {
530                        // TODO Check why jbosh's getCause returns Throwable here. This is very
531                        // unusual and should be avoided if possible
532                        Throwable cause = connEvent.getCause();
533                        Exception e;
534                        if (cause instanceof Exception) {
535                            e = (Exception) cause;
536                        } else {
537                            e = new Exception(cause);
538                        }
539                        notifyConnectionError(e);
540                    }
541                    connected = false;
542                }
543            }
544            finally {
545                notified = true;
546                synchronized (XMPPBOSHConnection.this) {
547                    XMPPBOSHConnection.this.notifyAll();
548                }
549            }
550        }
551    }
552
553    /**
554     * Listens for XML traffic from the BOSH connection manager and parses it into
555     * stanza objects.
556     *
557     * @author Guenther Niess
558     */
559    private class BOSHPacketReader implements BOSHClientResponseListener {
560
561        /**
562         * Parse the received packets and notify the corresponding connection.
563         *
564         * @param event the BOSH client response which includes the received packet.
565         */
566        @Override
567        public void responseReceived(BOSHMessageEvent event) {
568            AbstractBody body = event.getBody();
569            if (body != null) {
570                try {
571                    if (sessionID == null) {
572                        sessionID = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "sid"));
573                    }
574                    if (streamId == null) {
575                        streamId = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "authid"));
576                    }
577                    final XmlPullParser parser = PacketParserUtils.getParserFor(body.toXML());
578
579                    XmlPullParser.Event eventType = parser.getEventType();
580                    do {
581                        eventType = parser.next();
582                        switch (eventType) {
583                        case START_ELEMENT:
584                            String name = parser.getName();
585                            switch (name) {
586                            case Message.ELEMENT:
587                            case IQ.IQ_ELEMENT:
588                            case Presence.ELEMENT:
589                                parseAndProcessStanza(parser);
590                                break;
591                            case "features":
592                                parseFeaturesAndNotify(parser);
593                                break;
594                            case "error":
595                                // Some BOSH error isn't stream error.
596                                if ("urn:ietf:params:xml:ns:xmpp-streams".equals(parser.getNamespace(null))) {
597                                    throw new StreamErrorException(PacketParserUtils.parseStreamError(parser));
598                                } else {
599                                    StanzaError stanzaError = PacketParserUtils.parseError(parser);
600                                    throw new XMPPException.XMPPErrorException(null, stanzaError);
601                                }
602                            default:
603                                parseAndProcessNonza(parser);
604                                break;
605                            }
606                            break;
607                        default:
608                            // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
609                            break;
610                        }
611                    }
612                    while (eventType != XmlPullParser.Event.END_DOCUMENT);
613                }
614                catch (Exception e) {
615                    if (isConnected()) {
616                        notifyConnectionError(e);
617                    }
618                }
619            }
620        }
621    }
622}