001/** 002 * 003 * Copyright 2009 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.jivesoftware.smack.bosh; 019 020import java.io.IOException; 021import java.io.PipedReader; 022import java.io.PipedWriter; 023import java.io.Writer; 024import java.net.InetAddress; 025import java.util.Map; 026import java.util.logging.Level; 027import java.util.logging.Logger; 028 029import org.jivesoftware.smack.AbstractXMPPConnection; 030import org.jivesoftware.smack.SmackException; 031import org.jivesoftware.smack.SmackException.GenericConnectionException; 032import org.jivesoftware.smack.SmackException.NotConnectedException; 033import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; 034import org.jivesoftware.smack.SmackException.SmackWrappedException; 035import org.jivesoftware.smack.XMPPConnection; 036import org.jivesoftware.smack.XMPPException; 037import org.jivesoftware.smack.XMPPException.StreamErrorException; 038import org.jivesoftware.smack.packet.IQ; 039import org.jivesoftware.smack.packet.Message; 040import org.jivesoftware.smack.packet.Presence; 041import org.jivesoftware.smack.packet.Stanza; 042import org.jivesoftware.smack.packet.StanzaError; 043import org.jivesoftware.smack.packet.TopLevelStreamElement; 044import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 045import org.jivesoftware.smack.util.Async; 046import org.jivesoftware.smack.util.CloseableUtil; 047import org.jivesoftware.smack.util.PacketParserUtils; 048import org.jivesoftware.smack.xml.XmlPullParser; 049import org.jivesoftware.smack.xml.XmlPullParserException; 050 051import org.igniterealtime.jbosh.AbstractBody; 052import org.igniterealtime.jbosh.BOSHClient; 053import org.igniterealtime.jbosh.BOSHClientConfig; 054import org.igniterealtime.jbosh.BOSHClientConnEvent; 055import org.igniterealtime.jbosh.BOSHClientConnListener; 056import org.igniterealtime.jbosh.BOSHClientRequestListener; 057import org.igniterealtime.jbosh.BOSHClientResponseListener; 058import org.igniterealtime.jbosh.BOSHException; 059import org.igniterealtime.jbosh.BOSHMessageEvent; 060import org.igniterealtime.jbosh.BodyQName; 061import org.igniterealtime.jbosh.ComposableBody; 062import org.jxmpp.jid.DomainBareJid; 063import org.jxmpp.jid.parts.Resourcepart; 064 065/** 066 * Creates a connection to an XMPP server via HTTP binding. 067 * This is specified in the XEP-0206: XMPP Over BOSH. 068 * 069 * @see XMPPConnection 070 * @author Guenther Niess 071 */ 072public class XMPPBOSHConnection extends AbstractXMPPConnection { 073 private static final Logger LOGGER = Logger.getLogger(XMPPBOSHConnection.class.getName()); 074 075 /** 076 * The XMPP Over Bosh namespace. 077 */ 078 public static final String XMPP_BOSH_NS = "urn:xmpp:xbosh"; 079 080 /** 081 * The BOSH namespace from XEP-0124. 082 */ 083 public static final String BOSH_URI = "http://jabber.org/protocol/httpbind"; 084 085 /** 086 * The used BOSH client from the jbosh library. 087 */ 088 private BOSHClient client; 089 090 /** 091 * Holds the initial configuration used while creating the connection. 092 */ 093 @SuppressWarnings("HidingField") 094 private final BOSHConfiguration config; 095 096 private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true); 097 098 private Thread writerThread; 099 100 // Some flags which provides some info about the current state. 101 private boolean isFirstInitialization = true; 102 private boolean done = false; 103 104 // The readerPipe and consumer thread are used for the debugger. 105 private PipedWriter readerPipe; 106 private Thread readerConsumer; 107 108 /** 109 * The session ID for the BOSH session with the connection manager. 110 */ 111 protected String sessionID = null; 112 113 private boolean notified; 114 115 /** 116 * Create a HTTP Binding connection to an XMPP server. 117 * 118 * @param username the username to use. 119 * @param password the password to use. 120 * @param https true if you want to use SSL 121 * (e.g. false for http://domain.lt:7070/http-bind). 122 * @param host the hostname or IP address of the connection manager 123 * (e.g. domain.lt for http://domain.lt:7070/http-bind). 124 * @param port the port of the connection manager 125 * (e.g. 7070 for http://domain.lt:7070/http-bind). 126 * @param filePath the file which is described by the URL 127 * (e.g. /http-bind for http://domain.lt:7070/http-bind). 128 * @param xmppServiceDomain the XMPP service name 129 * (e.g. domain.lt for the user alice@domain.lt) 130 */ 131 public XMPPBOSHConnection(String username, String password, boolean https, String host, int port, String filePath, DomainBareJid xmppServiceDomain) { 132 this(BOSHConfiguration.builder().setUseHttps(https).setHost(host) 133 .setPort(port).setFile(filePath).setXmppDomain(xmppServiceDomain) 134 .setUsernameAndPassword(username, password).build()); 135 } 136 137 /** 138 * Create a HTTP Binding connection to an XMPP server. 139 * 140 * @param config The configuration which is used for this connection. 141 */ 142 public XMPPBOSHConnection(BOSHConfiguration config) { 143 super(config); 144 this.config = config; 145 } 146 147 @SuppressWarnings("deprecation") 148 @Override 149 protected void connectInternal() throws SmackException, InterruptedException { 150 done = false; 151 notified = false; 152 try { 153 // Ensure a clean starting state 154 if (client != null) { 155 client.close(); 156 client = null; 157 } 158 sessionID = null; 159 160 // Initialize BOSH client 161 BOSHClientConfig.Builder cfgBuilder = BOSHClientConfig.Builder 162 .create(config.getURI(), config.getXMPPServiceDomain().toString()); 163 if (config.isProxyEnabled()) { 164 cfgBuilder.setProxy(config.getProxyAddress(), config.getProxyPort()); 165 } 166 167 cfgBuilder.setCompressionEnabled(config.isCompressionEnabled()); 168 169 for (Map.Entry<String, String> h : config.getHttpHeaders().entrySet()) { 170 cfgBuilder.addHttpHeader(h.getKey(), h.getValue()); 171 } 172 173 client = BOSHClient.create(cfgBuilder.build()); 174 175 // Initialize the debugger before addBOSHClientResponseListener(new BOSHPacketReader()); 176 // BOSHPacketReader may hold and send response prior to display of the request i.e. <response/> before <challenge/> 177 if (debugger != null) { 178 initDebugger(); 179 } 180 181 client.addBOSHClientConnListener(new BOSHConnectionListener()); 182 client.addBOSHClientResponseListener(new BOSHPacketReader()); 183 184 // Send the session creation request 185 client.send(ComposableBody.builder() 186 .setNamespaceDefinition("xmpp", XMPP_BOSH_NS) 187 .setAttribute(BodyQName.createWithPrefix(XMPP_BOSH_NS, "version", "xmpp"), "1.0") 188 .build()); 189 } catch (Exception e) { 190 throw new GenericConnectionException(e); 191 } 192 193 // Wait for the response from the server 194 synchronized (this) { 195 if (!connected) { 196 final long deadline = System.currentTimeMillis() + getReplyTimeout(); 197 while (!notified) { 198 final long now = System.currentTimeMillis(); 199 if (now >= deadline) break; 200 wait(deadline - now); 201 } 202 } 203 } 204 205 assert writerThread == null || !writerThread.isAlive(); 206 outgoingQueue.start(); 207 writerThread = Async.go(this::writeElements, this + " Writer"); 208 209 // If there is no feedback, throw an remote server timeout error 210 if (!connected && !done) { 211 done = true; 212 String errorMessage = "Timeout reached for the connection to " 213 + getHost() + ":" + getPort() + "."; 214 instantShutdown(); 215 throw new SmackException.SmackMessageException(errorMessage); 216 } 217 218 try { 219 XmlPullParser parser = PacketParserUtils.getParserFor( 220 "<stream:stream xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'/>"); 221 onStreamOpen(parser); 222 } catch (XmlPullParserException | IOException e) { 223 instantShutdown(); 224 throw new AssertionError("Failed to setup stream environment", e); 225 } 226 } 227 228 @Override 229 public boolean isSecureConnection() { 230 return config.isUsingHTTPS(); 231 } 232 233 @Override 234 public boolean isUsingCompression() { 235 // TODO: Implement compression 236 return false; 237 } 238 239 @Override 240 protected void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 241 SmackException, IOException, InterruptedException { 242 // Authenticate using SASL 243 authenticate(username, password, config.getAuthzid(), null); 244 245 bindResourceAndEstablishSession(resource); 246 247 afterSuccessfulLogin(false); 248 } 249 250 private volatile boolean writerThreadRunning; 251 252 private void writeElements() { 253 writerThreadRunning = true; 254 try { 255 while (true) { 256 TopLevelStreamElement element; 257 try { 258 element = outgoingQueue.take(); 259 } catch (InterruptedException e) { 260 LOGGER.log(Level.FINE, 261 "Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception", 262 e); 263 return; 264 } 265 266 String xmlPayload = element.toXML(BOSH_URI).toString(); 267 ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload); 268 if (sessionID != null) { 269 BodyQName qName = BodyQName.create(BOSH_URI, "sid"); 270 composableBodyBuilder.setAttribute(qName, sessionID); 271 } 272 273 ComposableBody composableBody = composableBodyBuilder.build(); 274 275 try { 276 client.send(composableBody); 277 } catch (BOSHException e) { 278 LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e); 279 // TODO: Signal the user that there was an unexpected exception. 280 return; 281 } 282 283 if (element instanceof Stanza) { 284 Stanza stanza = (Stanza) element; 285 firePacketSendingListeners(stanza); 286 } 287 } 288 } catch (Exception exception) { 289 LOGGER.log(Level.WARNING, "BOSH writer thread threw", exception); 290 } finally { 291 writerThreadRunning = false; 292 notifyWaitingThreads(); 293 } 294 } 295 296 @Override 297 protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException { 298 throwNotConnectedExceptionIfAppropriate(); 299 try { 300 outgoingQueue.put(element); 301 } catch (InterruptedException e) { 302 throwNotConnectedExceptionIfAppropriate(); 303 // If the method above did not throw, then the sending thread was interrupted 304 throw e; 305 } 306 } 307 308 @Override 309 protected void sendNonBlockingInternal(TopLevelStreamElement element) 310 throws NotConnectedException, OutgoingQueueFullException { 311 throwNotConnectedExceptionIfAppropriate(); 312 boolean enqueued = outgoingQueue.offer(element); 313 if (!enqueued) { 314 throwNotConnectedExceptionIfAppropriate(); 315 throw new OutgoingQueueFullException(); 316 } 317 } 318 319 @Override 320 public InetAddress getLocalAddress() { 321 return null; 322 } 323 324 @Override 325 protected void shutdown() { 326 instantShutdown(); 327 } 328 329 @Override 330 public void instantShutdown() { 331 outgoingQueue.shutdown(); 332 333 try { 334 boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning); 335 if (!writerThreadTerminated) { 336 LOGGER.severe("Writer thread of " + this + " did not terminate timely"); 337 } 338 } catch (InterruptedException e) { 339 LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e); 340 } 341 342 if (client != null) { 343 try { 344 client.disconnect(); 345 } catch (Exception e) { 346 LOGGER.log(Level.WARNING, "shutdown", e); 347 } 348 } 349 350 setWasAuthenticated(); 351 sessionID = null; 352 done = true; 353 authenticated = false; 354 connected = false; 355 isFirstInitialization = false; 356 client = null; 357 358 // Close down the readers and writers. 359 CloseableUtil.maybeClose(readerPipe, LOGGER); 360 CloseableUtil.maybeClose(reader, LOGGER); 361 CloseableUtil.maybeClose(writer, LOGGER); 362 363 // set readerConsumer = null before reader to avoid NPE reference 364 readerConsumer = null; 365 readerPipe = null; 366 reader = null; 367 writer = null; 368 } 369 370 /** 371 * Send a HTTP request to the connection manager with the provided body element. 372 * 373 * @param body the body which will be sent. 374 * @throws BOSHException if an BOSH (Bidirectional-streams Over Synchronous HTTP, XEP-0124) related error occurs 375 */ 376 protected void send(ComposableBody body) throws BOSHException { 377 if (!connected) { 378 throw new IllegalStateException("Not connected to a server!"); 379 } 380 if (body == null) { 381 throw new NullPointerException("Body mustn't be null!"); 382 } 383 if (sessionID != null) { 384 body = body.rebuild().setAttribute( 385 BodyQName.create(BOSH_URI, "sid"), sessionID).build(); 386 } 387 client.send(body); 388 } 389 390 /** 391 * Initialize the SmackDebugger which allows to log and debug XML traffic. 392 */ 393 @Override 394 protected void initDebugger() { 395 // TODO: Maybe we want to extend the SmackDebugger for simplification 396 // and a performance boost. 397 398 // Initialize a empty writer which discards all data. 399 writer = new Writer() { 400 @Override 401 public void write(char[] cbuf, int off, int len) { 402 /* ignore */ } 403 404 @Override 405 public void close() { 406 /* ignore */ } 407 408 @Override 409 public void flush() { 410 /* ignore */ } 411 }; 412 413 // Initialize a pipe for received raw data. 414 try { 415 readerPipe = new PipedWriter(); 416 reader = new PipedReader(readerPipe); 417 } 418 catch (IOException e) { 419 // Ignore 420 } 421 422 // Call the method from the parent class which initializes the debugger. 423 super.initDebugger(); 424 425 // Add listeners for the received and sent raw data. 426 client.addBOSHClientResponseListener(new BOSHClientResponseListener() { 427 @Override 428 public void responseReceived(BOSHMessageEvent event) { 429 if (event.getBody() != null) { 430 try { 431 readerPipe.write(event.getBody().toXML()); 432 readerPipe.flush(); 433 } catch (Exception e) { 434 // Ignore 435 } 436 } 437 } 438 }); 439 client.addBOSHClientRequestListener(new BOSHClientRequestListener() { 440 @Override 441 public void requestSent(BOSHMessageEvent event) { 442 if (event.getBody() != null) { 443 try { 444 writer.write(event.getBody().toXML()); 445 // Fix all BOSH sent debug messages not shown 446 writer.flush(); 447 } catch (Exception e) { 448 // Ignore 449 } 450 } 451 } 452 }); 453 454 // Create and start a thread which discards all read data. 455 readerConsumer = new Thread() { 456 private Thread thread = this; 457 private int bufferLength = 1024; 458 459 @Override 460 public void run() { 461 try { 462 char[] cbuf = new char[bufferLength]; 463 while (readerConsumer == thread && !done) { 464 reader.read(cbuf, 0, bufferLength); 465 } 466 } catch (IOException e) { 467 // Ignore 468 } 469 } 470 }; 471 readerConsumer.setDaemon(true); 472 readerConsumer.start(); 473 } 474 475 @Override 476 protected void afterSaslAuthenticationSuccess() 477 throws NotConnectedException, InterruptedException, SmackWrappedException { 478 // XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it 479 // requires a special XML element ot be send after successful SASL authentication. 480 // See XEP-0206 ยง 5., especially the following is example 5 of XEP-0206. 481 ComposableBody composeableBody = ComposableBody.builder() 482 .setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS) 483 .setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true") 484 .setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString()) 485 .setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID) 486 .build(); 487 488 try { 489 client.send(composeableBody); 490 } catch (BOSHException e) { 491 // jbosh's exception API does not really match the one of Smack. 492 throw new SmackException.SmackWrappedException(e); 493 } 494 } 495 496 /** 497 * A listener class which listen for a successfully established connection 498 * and connection errors and notifies the BOSHConnection. 499 * 500 * @author Guenther Niess 501 */ 502 private class BOSHConnectionListener implements BOSHClientConnListener { 503 504 /** 505 * Notify the BOSHConnection about connection state changes. 506 * Process the connection listeners and try to login if the 507 * connection was formerly authenticated and is now reconnected. 508 */ 509 @Override 510 public void connectionEvent(BOSHClientConnEvent connEvent) { 511 try { 512 if (connEvent.isConnected()) { 513 connected = true; 514 if (isFirstInitialization) { 515 isFirstInitialization = false; 516 } 517 else { 518 if (wasAuthenticated) { 519 try { 520 login(); 521 } 522 catch (Exception e) { 523 throw new RuntimeException(e); 524 } 525 } 526 } 527 } 528 else { 529 if (connEvent.isError()) { 530 // TODO Check why jbosh's getCause returns Throwable here. This is very 531 // unusual and should be avoided if possible 532 Throwable cause = connEvent.getCause(); 533 Exception e; 534 if (cause instanceof Exception) { 535 e = (Exception) cause; 536 } else { 537 e = new Exception(cause); 538 } 539 notifyConnectionError(e); 540 } 541 connected = false; 542 } 543 } 544 finally { 545 notified = true; 546 synchronized (XMPPBOSHConnection.this) { 547 XMPPBOSHConnection.this.notifyAll(); 548 } 549 } 550 } 551 } 552 553 /** 554 * Listens for XML traffic from the BOSH connection manager and parses it into 555 * stanza objects. 556 * 557 * @author Guenther Niess 558 */ 559 private class BOSHPacketReader implements BOSHClientResponseListener { 560 561 /** 562 * Parse the received packets and notify the corresponding connection. 563 * 564 * @param event the BOSH client response which includes the received packet. 565 */ 566 @Override 567 public void responseReceived(BOSHMessageEvent event) { 568 AbstractBody body = event.getBody(); 569 if (body != null) { 570 try { 571 if (sessionID == null) { 572 sessionID = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "sid")); 573 } 574 if (streamId == null) { 575 streamId = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "authid")); 576 } 577 final XmlPullParser parser = PacketParserUtils.getParserFor(body.toXML()); 578 579 XmlPullParser.Event eventType = parser.getEventType(); 580 do { 581 eventType = parser.next(); 582 switch (eventType) { 583 case START_ELEMENT: 584 String name = parser.getName(); 585 switch (name) { 586 case Message.ELEMENT: 587 case IQ.IQ_ELEMENT: 588 case Presence.ELEMENT: 589 parseAndProcessStanza(parser); 590 break; 591 case "features": 592 parseFeaturesAndNotify(parser); 593 break; 594 case "error": 595 // Some BOSH error isn't stream error. 596 if ("urn:ietf:params:xml:ns:xmpp-streams".equals(parser.getNamespace(null))) { 597 throw new StreamErrorException(PacketParserUtils.parseStreamError(parser)); 598 } else { 599 StanzaError stanzaError = PacketParserUtils.parseError(parser); 600 throw new XMPPException.XMPPErrorException(null, stanzaError); 601 } 602 default: 603 parseAndProcessNonza(parser); 604 break; 605 } 606 break; 607 default: 608 // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement. 609 break; 610 } 611 } 612 while (eventType != XmlPullParser.Event.END_DOCUMENT); 613 } 614 catch (Exception e) { 615 if (isConnected()) { 616 notifyConnectionError(e); 617 } 618 } 619 } 620 } 621 } 622}