001/**
002 *
003 * Copyright 2009 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack.bosh;
019
020import java.io.IOException;
021import java.io.PipedReader;
022import java.io.PipedWriter;
023import java.io.Writer;
024import java.net.InetAddress;
025import java.util.Map;
026import java.util.logging.Level;
027import java.util.logging.Logger;
028
029import org.jivesoftware.smack.AbstractXMPPConnection;
030import org.jivesoftware.smack.SmackException;
031import org.jivesoftware.smack.SmackException.GenericConnectionException;
032import org.jivesoftware.smack.SmackException.NotConnectedException;
033import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
034import org.jivesoftware.smack.SmackException.SmackWrappedException;
035import org.jivesoftware.smack.XMPPConnection;
036import org.jivesoftware.smack.XMPPException;
037import org.jivesoftware.smack.XMPPException.StreamErrorException;
038import org.jivesoftware.smack.packet.IQ;
039import org.jivesoftware.smack.packet.Message;
040import org.jivesoftware.smack.packet.Presence;
041import org.jivesoftware.smack.packet.Stanza;
042import org.jivesoftware.smack.packet.StanzaError;
043import org.jivesoftware.smack.packet.TopLevelStreamElement;
044import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
045import org.jivesoftware.smack.util.Async;
046import org.jivesoftware.smack.util.CloseableUtil;
047import org.jivesoftware.smack.util.PacketParserUtils;
048import org.jivesoftware.smack.xml.XmlPullParser;
049import org.jivesoftware.smack.xml.XmlPullParserException;
050
051import org.igniterealtime.jbosh.AbstractBody;
052import org.igniterealtime.jbosh.BOSHClient;
053import org.igniterealtime.jbosh.BOSHClientConfig;
054import org.igniterealtime.jbosh.BOSHClientConnEvent;
055import org.igniterealtime.jbosh.BOSHClientConnListener;
056import org.igniterealtime.jbosh.BOSHClientRequestListener;
057import org.igniterealtime.jbosh.BOSHClientResponseListener;
058import org.igniterealtime.jbosh.BOSHException;
059import org.igniterealtime.jbosh.BOSHMessageEvent;
060import org.igniterealtime.jbosh.BodyQName;
061import org.igniterealtime.jbosh.ComposableBody;
062import org.jxmpp.jid.DomainBareJid;
063import org.jxmpp.jid.parts.Resourcepart;
064
065/**
066 * Creates a connection to an XMPP server via HTTP binding.
067 * This is specified in the XEP-0206: XMPP Over BOSH.
068 *
069 * @see XMPPConnection
070 * @author Guenther Niess
071 */
072public class XMPPBOSHConnection extends AbstractXMPPConnection {
073    private static final Logger LOGGER = Logger.getLogger(XMPPBOSHConnection.class.getName());
074
075    /**
076     * The XMPP Over Bosh namespace.
077     */
078    public static final String XMPP_BOSH_NS = "urn:xmpp:xbosh";
079
080    /**
081     * The BOSH namespace from XEP-0124.
082     */
083    public static final String BOSH_URI = "http://jabber.org/protocol/httpbind";
084
085    /**
086     * The used BOSH client from the jbosh library.
087     */
088    private BOSHClient client;
089
090    /**
091     * Holds the initial configuration used while creating the connection.
092     */
093    @SuppressWarnings("HidingField")
094    private final BOSHConfiguration config;
095
096    private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true);
097
098    private Thread writerThread;
099
100    // Some flags which provides some info about the current state.
101    private boolean isFirstInitialization = true;
102    private boolean done = false;
103
104    // The readerPipe and consumer thread are used for the debugger.
105    private PipedWriter readerPipe;
106    private Thread readerConsumer;
107
108    /**
109     * The session ID for the BOSH session with the connection manager.
110     */
111    protected String sessionID = null;
112
113    private boolean notified;
114
115    /**
116     * Create a HTTP Binding connection to an XMPP server.
117     *
118     * @param username the username to use.
119     * @param password the password to use.
120     * @param https true if you want to use SSL
121     *             (e.g. false for http://domain.lt:7070/http-bind).
122     * @param host the hostname or IP address of the connection manager
123     *             (e.g. domain.lt for http://domain.lt:7070/http-bind).
124     * @param port the port of the connection manager
125     *             (e.g. 7070 for http://domain.lt:7070/http-bind).
126     * @param filePath the file which is described by the URL
127     *             (e.g. /http-bind for http://domain.lt:7070/http-bind).
128     * @param xmppServiceDomain the XMPP service name
129     *             (e.g. domain.lt for the user alice@domain.lt)
130     */
131    public XMPPBOSHConnection(String username, String password, boolean https, String host, int port, String filePath, DomainBareJid xmppServiceDomain) {
132        this(BOSHConfiguration.builder().setUseHttps(https).setHost(host)
133                .setPort(port).setFile(filePath).setXmppDomain(xmppServiceDomain)
134                .setUsernameAndPassword(username, password).build());
135    }
136
137    /**
138     * Create a HTTP Binding connection to an XMPP server.
139     *
140     * @param config The configuration which is used for this connection.
141     */
142    public XMPPBOSHConnection(BOSHConfiguration config) {
143        super(config);
144        this.config = config;
145    }
146
147    @SuppressWarnings("deprecation")
148    @Override
149    protected void connectInternal() throws SmackException, InterruptedException {
150        done = false;
151        notified = false;
152        try {
153            // Ensure a clean starting state
154            if (client != null) {
155                client.close();
156                client = null;
157            }
158            sessionID = null;
159
160            // Initialize BOSH client
161            BOSHClientConfig.Builder cfgBuilder = BOSHClientConfig.Builder
162                    .create(config.getURI(), config.getXMPPServiceDomain().toString());
163            if (config.isProxyEnabled()) {
164                cfgBuilder.setProxy(config.getProxyAddress(), config.getProxyPort());
165            }
166
167            cfgBuilder.setCompressionEnabled(config.isCompressionEnabled());
168
169            for (Map.Entry<String, String> h : config.getHttpHeaders().entrySet()) {
170                cfgBuilder.addHttpHeader(h.getKey(), h.getValue());
171            }
172
173            client = BOSHClient.create(cfgBuilder.build());
174
175            client.addBOSHClientConnListener(new BOSHConnectionListener());
176            client.addBOSHClientResponseListener(new BOSHPacketReader());
177
178            // Initialize the debugger
179            if (debugger != null) {
180                initDebugger();
181            }
182
183            // Send the session creation request
184            client.send(ComposableBody.builder()
185                    .setNamespaceDefinition("xmpp", XMPP_BOSH_NS)
186                    .setAttribute(BodyQName.createWithPrefix(XMPP_BOSH_NS, "version", "xmpp"), "1.0")
187                    .build());
188        } catch (Exception e) {
189            throw new GenericConnectionException(e);
190        }
191
192        // Wait for the response from the server
193        synchronized (this) {
194            if (!connected) {
195                final long deadline = System.currentTimeMillis() + getReplyTimeout();
196                while (!notified) {
197                    final long now = System.currentTimeMillis();
198                    if (now >= deadline) break;
199                    wait(deadline - now);
200                }
201            }
202        }
203
204        assert writerThread == null || !writerThread.isAlive();
205        outgoingQueue.start();
206        writerThread = Async.go(this::writeElements, this + " Writer");
207
208        // If there is no feedback, throw an remote server timeout error
209        if (!connected && !done) {
210            done = true;
211            String errorMessage = "Timeout reached for the connection to "
212                    + getHost() + ":" + getPort() + ".";
213            instantShutdown();
214            throw new SmackException.SmackMessageException(errorMessage);
215        }
216
217        try {
218            XmlPullParser parser = PacketParserUtils.getParserFor(
219                            "<stream:stream xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'/>");
220            onStreamOpen(parser);
221        } catch (XmlPullParserException | IOException e) {
222            instantShutdown();
223            throw new AssertionError("Failed to setup stream environment", e);
224        }
225    }
226
227    @Override
228    public boolean isSecureConnection() {
229        return config.isUsingHTTPS();
230    }
231
232    @Override
233    public boolean isUsingCompression() {
234        // TODO: Implement compression
235        return false;
236    }
237
238    @Override
239    protected void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
240                    SmackException, IOException, InterruptedException {
241        // Authenticate using SASL
242        authenticate(username, password, config.getAuthzid(), null);
243
244        bindResourceAndEstablishSession(resource);
245
246        afterSuccessfulLogin(false);
247    }
248
249    private volatile boolean writerThreadRunning;
250
251    private void writeElements() {
252        writerThreadRunning = true;
253        try {
254            while (true) {
255                TopLevelStreamElement element;
256                try {
257                    element = outgoingQueue.take();
258                } catch (InterruptedException e) {
259                    LOGGER.log(Level.FINE,
260                                    "Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception",
261                                    e);
262                    return;
263                }
264
265                String xmlPayload = element.toXML(BOSH_URI).toString();
266                ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload);
267                if (sessionID != null) {
268                    BodyQName qName = BodyQName.create(BOSH_URI, "sid");
269                    composableBodyBuilder.setAttribute(qName, sessionID);
270                }
271
272                ComposableBody composableBody = composableBodyBuilder.build();
273
274                try {
275                    client.send(composableBody);
276                } catch (BOSHException e) {
277                    LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e);
278                    // TODO: Signal the user that there was an unexpected exception.
279                    return;
280                }
281
282                if (element instanceof Stanza) {
283                    Stanza stanza = (Stanza) element;
284                    firePacketSendingListeners(stanza);
285                }
286            }
287        } catch (Exception exception) {
288            LOGGER.log(Level.WARNING, "BOSH writer thread threw", exception);
289        } finally {
290            writerThreadRunning = false;
291            notifyWaitingThreads();
292        }
293    }
294
295    @Override
296    protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
297        throwNotConnectedExceptionIfAppropriate();
298        try {
299            outgoingQueue.put(element);
300        } catch (InterruptedException e) {
301            throwNotConnectedExceptionIfAppropriate();
302            // If the method above did not throw, then the sending thread was interrupted
303            throw e;
304        }
305    }
306
307    @Override
308    protected void sendNonBlockingInternal(TopLevelStreamElement element)
309                    throws NotConnectedException, OutgoingQueueFullException {
310        throwNotConnectedExceptionIfAppropriate();
311        boolean enqueued = outgoingQueue.offer(element);
312        if (!enqueued) {
313            throwNotConnectedExceptionIfAppropriate();
314            throw new OutgoingQueueFullException();
315        }
316    }
317
318    @Override
319    public InetAddress getLocalAddress() {
320        return null;
321    }
322
323    @Override
324    protected void shutdown() {
325        instantShutdown();
326    }
327
328    @Override
329    public void instantShutdown() {
330        outgoingQueue.shutdown();
331
332        try {
333            boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning);
334            if (!writerThreadTerminated) {
335                LOGGER.severe("Writer thread of " + this + " did not terminate timely");
336            }
337        } catch (InterruptedException e) {
338            LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e);
339        }
340
341        if (client != null) {
342            try {
343                client.disconnect();
344            } catch (Exception e) {
345                LOGGER.log(Level.WARNING, "shutdown", e);
346            }
347        }
348
349        setWasAuthenticated();
350        sessionID = null;
351        done = true;
352        authenticated = false;
353        connected = false;
354        isFirstInitialization = false;
355        client = null;
356
357        // Close down the readers and writers.
358        CloseableUtil.maybeClose(readerPipe, LOGGER);
359        CloseableUtil.maybeClose(reader, LOGGER);
360        CloseableUtil.maybeClose(writer, LOGGER);
361
362        readerPipe = null;
363        reader = null;
364        writer = null;
365        readerConsumer = null;
366    }
367
368    /**
369     * Send a HTTP request to the connection manager with the provided body element.
370     *
371     * @param body the body which will be sent.
372     * @throws BOSHException if an BOSH (Bidirectional-streams Over Synchronous HTTP, XEP-0124) related error occurs
373     */
374    protected void send(ComposableBody body) throws BOSHException {
375        if (!connected) {
376            throw new IllegalStateException("Not connected to a server!");
377        }
378        if (body == null) {
379            throw new NullPointerException("Body mustn't be null!");
380        }
381        if (sessionID != null) {
382            body = body.rebuild().setAttribute(
383                    BodyQName.create(BOSH_URI, "sid"), sessionID).build();
384        }
385        client.send(body);
386    }
387
388    /**
389     * Initialize the SmackDebugger which allows to log and debug XML traffic.
390     */
391    @Override
392    protected void initDebugger() {
393        // TODO: Maybe we want to extend the SmackDebugger for simplification
394        //       and a performance boost.
395
396        // Initialize a empty writer which discards all data.
397        writer = new Writer() {
398            @Override
399            public void write(char[] cbuf, int off, int len) {
400                /* ignore */ }
401
402            @Override
403            public void close() {
404                /* ignore */ }
405
406            @Override
407            public void flush() {
408                /* ignore */ }
409        };
410
411        // Initialize a pipe for received raw data.
412        try {
413            readerPipe = new PipedWriter();
414            reader = new PipedReader(readerPipe);
415        }
416        catch (IOException e) {
417            // Ignore
418        }
419
420        // Call the method from the parent class which initializes the debugger.
421        super.initDebugger();
422
423        // Add listeners for the received and sent raw data.
424        client.addBOSHClientResponseListener(new BOSHClientResponseListener() {
425            @Override
426            public void responseReceived(BOSHMessageEvent event) {
427                if (event.getBody() != null) {
428                    try {
429                        readerPipe.write(event.getBody().toXML());
430                        readerPipe.flush();
431                    } catch (Exception e) {
432                        // Ignore
433                    }
434                }
435            }
436        });
437        client.addBOSHClientRequestListener(new BOSHClientRequestListener() {
438            @Override
439            public void requestSent(BOSHMessageEvent event) {
440                if (event.getBody() != null) {
441                    try {
442                        writer.write(event.getBody().toXML());
443                    } catch (Exception e) {
444                        // Ignore
445                    }
446                }
447            }
448        });
449
450        // Create and start a thread which discards all read data.
451        readerConsumer = new Thread() {
452            private Thread thread = this;
453            private int bufferLength = 1024;
454
455            @Override
456            public void run() {
457                try {
458                    char[] cbuf = new char[bufferLength];
459                    while (readerConsumer == thread && !done) {
460                        reader.read(cbuf, 0, bufferLength);
461                    }
462                } catch (IOException e) {
463                    // Ignore
464                }
465            }
466        };
467        readerConsumer.setDaemon(true);
468        readerConsumer.start();
469    }
470
471    @Override
472    protected void afterSaslAuthenticationSuccess()
473                    throws NotConnectedException, InterruptedException, SmackWrappedException {
474        // XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it
475        // requires a special XML element ot be send after successful SASL authentication.
476        // See XEP-0206 ยง 5., especially the following is example 5 of XEP-0206.
477        ComposableBody composeableBody = ComposableBody.builder()
478                .setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS)
479                .setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true")
480                .setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString())
481                .setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID)
482                .build();
483
484        try {
485            client.send(composeableBody);
486        } catch (BOSHException e) {
487            // jbosh's exception API does not really match the one of Smack.
488            throw new SmackException.SmackWrappedException(e);
489        }
490    }
491
492    /**
493     * A listener class which listen for a successfully established connection
494     * and connection errors and notifies the BOSHConnection.
495     *
496     * @author Guenther Niess
497     */
498    private class BOSHConnectionListener implements BOSHClientConnListener {
499
500        /**
501         * Notify the BOSHConnection about connection state changes.
502         * Process the connection listeners and try to login if the
503         * connection was formerly authenticated and is now reconnected.
504         */
505        @Override
506        public void connectionEvent(BOSHClientConnEvent connEvent) {
507            try {
508                if (connEvent.isConnected()) {
509                    connected = true;
510                    if (isFirstInitialization) {
511                        isFirstInitialization = false;
512                    }
513                    else {
514                            if (wasAuthenticated) {
515                                try {
516                                    login();
517                                }
518                                catch (Exception e) {
519                                    throw new RuntimeException(e);
520                                }
521                            }
522                    }
523                }
524                else {
525                    if (connEvent.isError()) {
526                        // TODO Check why jbosh's getCause returns Throwable here. This is very
527                        // unusual and should be avoided if possible
528                        Throwable cause = connEvent.getCause();
529                        Exception e;
530                        if (cause instanceof Exception) {
531                            e = (Exception) cause;
532                        } else {
533                            e = new Exception(cause);
534                        }
535                        notifyConnectionError(e);
536                    }
537                    connected = false;
538                }
539            }
540            finally {
541                notified = true;
542                synchronized (XMPPBOSHConnection.this) {
543                    XMPPBOSHConnection.this.notifyAll();
544                }
545            }
546        }
547    }
548
549    /**
550     * Listens for XML traffic from the BOSH connection manager and parses it into
551     * stanza objects.
552     *
553     * @author Guenther Niess
554     */
555    private class BOSHPacketReader implements BOSHClientResponseListener {
556
557        /**
558         * Parse the received packets and notify the corresponding connection.
559         *
560         * @param event the BOSH client response which includes the received packet.
561         */
562        @Override
563        public void responseReceived(BOSHMessageEvent event) {
564            AbstractBody body = event.getBody();
565            if (body != null) {
566                try {
567                    if (sessionID == null) {
568                        sessionID = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "sid"));
569                    }
570                    if (streamId == null) {
571                        streamId = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "authid"));
572                    }
573                    final XmlPullParser parser = PacketParserUtils.getParserFor(body.toXML());
574
575                    XmlPullParser.Event eventType = parser.getEventType();
576                    do {
577                        eventType = parser.next();
578                        switch (eventType) {
579                        case START_ELEMENT:
580                            String name = parser.getName();
581                            switch (name) {
582                            case Message.ELEMENT:
583                            case IQ.IQ_ELEMENT:
584                            case Presence.ELEMENT:
585                                parseAndProcessStanza(parser);
586                                break;
587                            case "features":
588                                parseFeaturesAndNotify(parser);
589                                break;
590                            case "error":
591                                // Some BOSH error isn't stream error.
592                                if ("urn:ietf:params:xml:ns:xmpp-streams".equals(parser.getNamespace(null))) {
593                                    throw new StreamErrorException(PacketParserUtils.parseStreamError(parser));
594                                } else {
595                                    StanzaError stanzaError = PacketParserUtils.parseError(parser);
596                                    throw new XMPPException.XMPPErrorException(null, stanzaError);
597                                }
598                            default:
599                                parseAndProcessNonza(parser);
600                                break;
601                            }
602                            break;
603                        default:
604                            // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
605                            break;
606                        }
607                    }
608                    while (eventType != XmlPullParser.Event.END_DOCUMENT);
609                }
610                catch (Exception e) {
611                    if (isConnected()) {
612                        notifyConnectionError(e);
613                    }
614                }
615            }
616        }
617    }
618}