001/** 002 * 003 * Copyright 2009 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.jivesoftware.smack.bosh; 019 020import java.io.IOException; 021import java.io.PipedReader; 022import java.io.PipedWriter; 023import java.io.Writer; 024import java.net.InetAddress; 025import java.util.Map; 026import java.util.logging.Level; 027import java.util.logging.Logger; 028 029import org.jivesoftware.smack.AbstractXMPPConnection; 030import org.jivesoftware.smack.SmackException; 031import org.jivesoftware.smack.SmackException.GenericConnectionException; 032import org.jivesoftware.smack.SmackException.NotConnectedException; 033import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; 034import org.jivesoftware.smack.SmackException.SmackWrappedException; 035import org.jivesoftware.smack.XMPPConnection; 036import org.jivesoftware.smack.XMPPException; 037import org.jivesoftware.smack.XMPPException.StreamErrorException; 038import org.jivesoftware.smack.packet.IQ; 039import org.jivesoftware.smack.packet.Message; 040import org.jivesoftware.smack.packet.Presence; 041import org.jivesoftware.smack.packet.Stanza; 042import org.jivesoftware.smack.packet.StanzaError; 043import org.jivesoftware.smack.packet.TopLevelStreamElement; 044import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 045import org.jivesoftware.smack.util.Async; 046import org.jivesoftware.smack.util.CloseableUtil; 047import org.jivesoftware.smack.util.PacketParserUtils; 048import org.jivesoftware.smack.xml.XmlPullParser; 049import org.jivesoftware.smack.xml.XmlPullParserException; 050 051import org.igniterealtime.jbosh.AbstractBody; 052import org.igniterealtime.jbosh.BOSHClient; 053import org.igniterealtime.jbosh.BOSHClientConfig; 054import org.igniterealtime.jbosh.BOSHClientConnEvent; 055import org.igniterealtime.jbosh.BOSHClientConnListener; 056import org.igniterealtime.jbosh.BOSHClientRequestListener; 057import org.igniterealtime.jbosh.BOSHClientResponseListener; 058import org.igniterealtime.jbosh.BOSHException; 059import org.igniterealtime.jbosh.BOSHMessageEvent; 060import org.igniterealtime.jbosh.BodyQName; 061import org.igniterealtime.jbosh.ComposableBody; 062import org.jxmpp.jid.DomainBareJid; 063import org.jxmpp.jid.parts.Resourcepart; 064 065/** 066 * Creates a connection to an XMPP server via HTTP binding. 067 * This is specified in the XEP-0206: XMPP Over BOSH. 068 * 069 * @see XMPPConnection 070 * @author Guenther Niess 071 */ 072public class XMPPBOSHConnection extends AbstractXMPPConnection { 073 private static final Logger LOGGER = Logger.getLogger(XMPPBOSHConnection.class.getName()); 074 075 /** 076 * The XMPP Over Bosh namespace. 077 */ 078 public static final String XMPP_BOSH_NS = "urn:xmpp:xbosh"; 079 080 /** 081 * The BOSH namespace from XEP-0124. 082 */ 083 public static final String BOSH_URI = "http://jabber.org/protocol/httpbind"; 084 085 /** 086 * The used BOSH client from the jbosh library. 087 */ 088 private BOSHClient client; 089 090 /** 091 * Holds the initial configuration used while creating the connection. 092 */ 093 @SuppressWarnings("HidingField") 094 private final BOSHConfiguration config; 095 096 private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true); 097 098 private Thread writerThread; 099 100 // Some flags which provides some info about the current state. 101 private boolean isFirstInitialization = true; 102 private boolean done = false; 103 104 // The readerPipe and consumer thread are used for the debugger. 105 private PipedWriter readerPipe; 106 private Thread readerConsumer; 107 108 /** 109 * The session ID for the BOSH session with the connection manager. 110 */ 111 protected String sessionID = null; 112 113 private boolean notified; 114 115 /** 116 * Create a HTTP Binding connection to an XMPP server. 117 * 118 * @param username the username to use. 119 * @param password the password to use. 120 * @param https true if you want to use SSL 121 * (e.g. false for http://domain.lt:7070/http-bind). 122 * @param host the hostname or IP address of the connection manager 123 * (e.g. domain.lt for http://domain.lt:7070/http-bind). 124 * @param port the port of the connection manager 125 * (e.g. 7070 for http://domain.lt:7070/http-bind). 126 * @param filePath the file which is described by the URL 127 * (e.g. /http-bind for http://domain.lt:7070/http-bind). 128 * @param xmppServiceDomain the XMPP service name 129 * (e.g. domain.lt for the user alice@domain.lt) 130 */ 131 public XMPPBOSHConnection(String username, String password, boolean https, String host, int port, String filePath, DomainBareJid xmppServiceDomain) { 132 this(BOSHConfiguration.builder().setUseHttps(https).setHost(host) 133 .setPort(port).setFile(filePath).setXmppDomain(xmppServiceDomain) 134 .setUsernameAndPassword(username, password).build()); 135 } 136 137 /** 138 * Create a HTTP Binding connection to an XMPP server. 139 * 140 * @param config The configuration which is used for this connection. 141 */ 142 public XMPPBOSHConnection(BOSHConfiguration config) { 143 super(config); 144 this.config = config; 145 } 146 147 @SuppressWarnings("deprecation") 148 @Override 149 protected void connectInternal() throws SmackException, InterruptedException { 150 done = false; 151 notified = false; 152 try { 153 // Ensure a clean starting state 154 if (client != null) { 155 client.close(); 156 client = null; 157 } 158 sessionID = null; 159 160 // Initialize BOSH client 161 BOSHClientConfig.Builder cfgBuilder = BOSHClientConfig.Builder 162 .create(config.getURI(), config.getXMPPServiceDomain().toString()); 163 if (config.isProxyEnabled()) { 164 cfgBuilder.setProxy(config.getProxyAddress(), config.getProxyPort()); 165 } 166 167 cfgBuilder.setCompressionEnabled(config.isCompressionEnabled()); 168 169 for (Map.Entry<String, String> h : config.getHttpHeaders().entrySet()) { 170 cfgBuilder.addHttpHeader(h.getKey(), h.getValue()); 171 } 172 173 client = BOSHClient.create(cfgBuilder.build()); 174 175 client.addBOSHClientConnListener(new BOSHConnectionListener()); 176 client.addBOSHClientResponseListener(new BOSHPacketReader()); 177 178 // Initialize the debugger 179 if (debugger != null) { 180 initDebugger(); 181 } 182 183 // Send the session creation request 184 client.send(ComposableBody.builder() 185 .setNamespaceDefinition("xmpp", XMPP_BOSH_NS) 186 .setAttribute(BodyQName.createWithPrefix(XMPP_BOSH_NS, "version", "xmpp"), "1.0") 187 .build()); 188 } catch (Exception e) { 189 throw new GenericConnectionException(e); 190 } 191 192 // Wait for the response from the server 193 synchronized (this) { 194 if (!connected) { 195 final long deadline = System.currentTimeMillis() + getReplyTimeout(); 196 while (!notified) { 197 final long now = System.currentTimeMillis(); 198 if (now >= deadline) break; 199 wait(deadline - now); 200 } 201 } 202 } 203 204 assert writerThread == null || !writerThread.isAlive(); 205 outgoingQueue.start(); 206 writerThread = Async.go(this::writeElements, this + " Writer"); 207 208 // If there is no feedback, throw an remote server timeout error 209 if (!connected && !done) { 210 done = true; 211 String errorMessage = "Timeout reached for the connection to " 212 + getHost() + ":" + getPort() + "."; 213 instantShutdown(); 214 throw new SmackException.SmackMessageException(errorMessage); 215 } 216 217 try { 218 XmlPullParser parser = PacketParserUtils.getParserFor( 219 "<stream:stream xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'/>"); 220 onStreamOpen(parser); 221 } catch (XmlPullParserException | IOException e) { 222 instantShutdown(); 223 throw new AssertionError("Failed to setup stream environment", e); 224 } 225 } 226 227 @Override 228 public boolean isSecureConnection() { 229 return config.isUsingHTTPS(); 230 } 231 232 @Override 233 public boolean isUsingCompression() { 234 // TODO: Implement compression 235 return false; 236 } 237 238 @Override 239 protected void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 240 SmackException, IOException, InterruptedException { 241 // Authenticate using SASL 242 authenticate(username, password, config.getAuthzid(), null); 243 244 bindResourceAndEstablishSession(resource); 245 246 afterSuccessfulLogin(false); 247 } 248 249 private volatile boolean writerThreadRunning; 250 251 private void writeElements() { 252 writerThreadRunning = true; 253 try { 254 while (true) { 255 TopLevelStreamElement element; 256 try { 257 element = outgoingQueue.take(); 258 } catch (InterruptedException e) { 259 LOGGER.log(Level.FINE, 260 "Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception", 261 e); 262 return; 263 } 264 265 String xmlPayload = element.toXML(BOSH_URI).toString(); 266 ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload); 267 if (sessionID != null) { 268 BodyQName qName = BodyQName.create(BOSH_URI, "sid"); 269 composableBodyBuilder.setAttribute(qName, sessionID); 270 } 271 272 ComposableBody composableBody = composableBodyBuilder.build(); 273 274 try { 275 client.send(composableBody); 276 } catch (BOSHException e) { 277 LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e); 278 // TODO: Signal the user that there was an unexpected exception. 279 return; 280 } 281 282 if (element instanceof Stanza) { 283 Stanza stanza = (Stanza) element; 284 firePacketSendingListeners(stanza); 285 } 286 } 287 } catch (Exception exception) { 288 LOGGER.log(Level.WARNING, "BOSH writer thread threw", exception); 289 } finally { 290 writerThreadRunning = false; 291 notifyWaitingThreads(); 292 } 293 } 294 295 @Override 296 protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException { 297 throwNotConnectedExceptionIfAppropriate(); 298 try { 299 outgoingQueue.put(element); 300 } catch (InterruptedException e) { 301 throwNotConnectedExceptionIfAppropriate(); 302 // If the method above did not throw, then the sending thread was interrupted 303 throw e; 304 } 305 } 306 307 @Override 308 protected void sendNonBlockingInternal(TopLevelStreamElement element) 309 throws NotConnectedException, OutgoingQueueFullException { 310 throwNotConnectedExceptionIfAppropriate(); 311 boolean enqueued = outgoingQueue.offer(element); 312 if (!enqueued) { 313 throwNotConnectedExceptionIfAppropriate(); 314 throw new OutgoingQueueFullException(); 315 } 316 } 317 318 @Override 319 public InetAddress getLocalAddress() { 320 return null; 321 } 322 323 @Override 324 protected void shutdown() { 325 instantShutdown(); 326 } 327 328 @Override 329 public void instantShutdown() { 330 outgoingQueue.shutdown(); 331 332 try { 333 boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning); 334 if (!writerThreadTerminated) { 335 LOGGER.severe("Writer thread of " + this + " did not terminate timely"); 336 } 337 } catch (InterruptedException e) { 338 LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e); 339 } 340 341 if (client != null) { 342 try { 343 client.disconnect(); 344 } catch (Exception e) { 345 LOGGER.log(Level.WARNING, "shutdown", e); 346 } 347 } 348 349 setWasAuthenticated(); 350 sessionID = null; 351 done = true; 352 authenticated = false; 353 connected = false; 354 isFirstInitialization = false; 355 client = null; 356 357 // Close down the readers and writers. 358 CloseableUtil.maybeClose(readerPipe, LOGGER); 359 CloseableUtil.maybeClose(reader, LOGGER); 360 CloseableUtil.maybeClose(writer, LOGGER); 361 362 readerPipe = null; 363 reader = null; 364 writer = null; 365 readerConsumer = null; 366 } 367 368 /** 369 * Send a HTTP request to the connection manager with the provided body element. 370 * 371 * @param body the body which will be sent. 372 * @throws BOSHException if an BOSH (Bidirectional-streams Over Synchronous HTTP, XEP-0124) related error occurs 373 */ 374 protected void send(ComposableBody body) throws BOSHException { 375 if (!connected) { 376 throw new IllegalStateException("Not connected to a server!"); 377 } 378 if (body == null) { 379 throw new NullPointerException("Body mustn't be null!"); 380 } 381 if (sessionID != null) { 382 body = body.rebuild().setAttribute( 383 BodyQName.create(BOSH_URI, "sid"), sessionID).build(); 384 } 385 client.send(body); 386 } 387 388 /** 389 * Initialize the SmackDebugger which allows to log and debug XML traffic. 390 */ 391 @Override 392 protected void initDebugger() { 393 // TODO: Maybe we want to extend the SmackDebugger for simplification 394 // and a performance boost. 395 396 // Initialize a empty writer which discards all data. 397 writer = new Writer() { 398 @Override 399 public void write(char[] cbuf, int off, int len) { 400 /* ignore */ } 401 402 @Override 403 public void close() { 404 /* ignore */ } 405 406 @Override 407 public void flush() { 408 /* ignore */ } 409 }; 410 411 // Initialize a pipe for received raw data. 412 try { 413 readerPipe = new PipedWriter(); 414 reader = new PipedReader(readerPipe); 415 } 416 catch (IOException e) { 417 // Ignore 418 } 419 420 // Call the method from the parent class which initializes the debugger. 421 super.initDebugger(); 422 423 // Add listeners for the received and sent raw data. 424 client.addBOSHClientResponseListener(new BOSHClientResponseListener() { 425 @Override 426 public void responseReceived(BOSHMessageEvent event) { 427 if (event.getBody() != null) { 428 try { 429 readerPipe.write(event.getBody().toXML()); 430 readerPipe.flush(); 431 } catch (Exception e) { 432 // Ignore 433 } 434 } 435 } 436 }); 437 client.addBOSHClientRequestListener(new BOSHClientRequestListener() { 438 @Override 439 public void requestSent(BOSHMessageEvent event) { 440 if (event.getBody() != null) { 441 try { 442 writer.write(event.getBody().toXML()); 443 } catch (Exception e) { 444 // Ignore 445 } 446 } 447 } 448 }); 449 450 // Create and start a thread which discards all read data. 451 readerConsumer = new Thread() { 452 private Thread thread = this; 453 private int bufferLength = 1024; 454 455 @Override 456 public void run() { 457 try { 458 char[] cbuf = new char[bufferLength]; 459 while (readerConsumer == thread && !done) { 460 reader.read(cbuf, 0, bufferLength); 461 } 462 } catch (IOException e) { 463 // Ignore 464 } 465 } 466 }; 467 readerConsumer.setDaemon(true); 468 readerConsumer.start(); 469 } 470 471 @Override 472 protected void afterSaslAuthenticationSuccess() 473 throws NotConnectedException, InterruptedException, SmackWrappedException { 474 // XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it 475 // requires a special XML element ot be send after successful SASL authentication. 476 // See XEP-0206 ยง 5., especially the following is example 5 of XEP-0206. 477 ComposableBody composeableBody = ComposableBody.builder() 478 .setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS) 479 .setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true") 480 .setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString()) 481 .setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID) 482 .build(); 483 484 try { 485 client.send(composeableBody); 486 } catch (BOSHException e) { 487 // jbosh's exception API does not really match the one of Smack. 488 throw new SmackException.SmackWrappedException(e); 489 } 490 } 491 492 /** 493 * A listener class which listen for a successfully established connection 494 * and connection errors and notifies the BOSHConnection. 495 * 496 * @author Guenther Niess 497 */ 498 private class BOSHConnectionListener implements BOSHClientConnListener { 499 500 /** 501 * Notify the BOSHConnection about connection state changes. 502 * Process the connection listeners and try to login if the 503 * connection was formerly authenticated and is now reconnected. 504 */ 505 @Override 506 public void connectionEvent(BOSHClientConnEvent connEvent) { 507 try { 508 if (connEvent.isConnected()) { 509 connected = true; 510 if (isFirstInitialization) { 511 isFirstInitialization = false; 512 } 513 else { 514 if (wasAuthenticated) { 515 try { 516 login(); 517 } 518 catch (Exception e) { 519 throw new RuntimeException(e); 520 } 521 } 522 } 523 } 524 else { 525 if (connEvent.isError()) { 526 // TODO Check why jbosh's getCause returns Throwable here. This is very 527 // unusual and should be avoided if possible 528 Throwable cause = connEvent.getCause(); 529 Exception e; 530 if (cause instanceof Exception) { 531 e = (Exception) cause; 532 } else { 533 e = new Exception(cause); 534 } 535 notifyConnectionError(e); 536 } 537 connected = false; 538 } 539 } 540 finally { 541 notified = true; 542 synchronized (XMPPBOSHConnection.this) { 543 XMPPBOSHConnection.this.notifyAll(); 544 } 545 } 546 } 547 } 548 549 /** 550 * Listens for XML traffic from the BOSH connection manager and parses it into 551 * stanza objects. 552 * 553 * @author Guenther Niess 554 */ 555 private class BOSHPacketReader implements BOSHClientResponseListener { 556 557 /** 558 * Parse the received packets and notify the corresponding connection. 559 * 560 * @param event the BOSH client response which includes the received packet. 561 */ 562 @Override 563 public void responseReceived(BOSHMessageEvent event) { 564 AbstractBody body = event.getBody(); 565 if (body != null) { 566 try { 567 if (sessionID == null) { 568 sessionID = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "sid")); 569 } 570 if (streamId == null) { 571 streamId = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "authid")); 572 } 573 final XmlPullParser parser = PacketParserUtils.getParserFor(body.toXML()); 574 575 XmlPullParser.Event eventType = parser.getEventType(); 576 do { 577 eventType = parser.next(); 578 switch (eventType) { 579 case START_ELEMENT: 580 String name = parser.getName(); 581 switch (name) { 582 case Message.ELEMENT: 583 case IQ.IQ_ELEMENT: 584 case Presence.ELEMENT: 585 parseAndProcessStanza(parser); 586 break; 587 case "features": 588 parseFeaturesAndNotify(parser); 589 break; 590 case "error": 591 // Some BOSH error isn't stream error. 592 if ("urn:ietf:params:xml:ns:xmpp-streams".equals(parser.getNamespace(null))) { 593 throw new StreamErrorException(PacketParserUtils.parseStreamError(parser)); 594 } else { 595 StanzaError stanzaError = PacketParserUtils.parseError(parser); 596 throw new XMPPException.XMPPErrorException(null, stanzaError); 597 } 598 default: 599 parseAndProcessNonza(parser); 600 break; 601 } 602 break; 603 default: 604 // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement. 605 break; 606 } 607 } 608 while (eventType != XmlPullParser.Event.END_DOCUMENT); 609 } 610 catch (Exception e) { 611 if (isConnected()) { 612 notifyConnectionError(e); 613 } 614 } 615 } 616 } 617 } 618}