001/** 002 * 003 * Copyright the original author or authors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smackx.bytestreams.ibb; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.net.SocketTimeoutException; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026 027import org.jivesoftware.smack.SmackException.NotConnectedException; 028import org.jivesoftware.smack.XMPPConnection; 029import org.jivesoftware.smack.PacketListener; 030import org.jivesoftware.smack.filter.AndFilter; 031import org.jivesoftware.smack.filter.PacketFilter; 032import org.jivesoftware.smack.filter.PacketTypeFilter; 033import org.jivesoftware.smack.packet.IQ; 034import org.jivesoftware.smack.packet.Message; 035import org.jivesoftware.smack.packet.Packet; 036import org.jivesoftware.smack.packet.PacketExtension; 037import org.jivesoftware.smack.packet.XMPPError; 038import org.jivesoftware.smack.util.StringUtils; 039import org.jivesoftware.smackx.bytestreams.BytestreamSession; 040import org.jivesoftware.smackx.bytestreams.ibb.packet.Close; 041import org.jivesoftware.smackx.bytestreams.ibb.packet.Data; 042import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension; 043import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; 044 045/** 046 * InBandBytestreamSession class represents an In-Band Bytestream session. 047 * <p> 048 * In-band bytestreams are bidirectional and this session encapsulates the streams for both 049 * directions. 050 * <p> 051 * Note that closing the In-Band Bytestream session will close both streams. If both streams are 052 * closed individually the session will be closed automatically once the second stream is closed. 053 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed 054 * automatically if one of them is closed. 055 * 056 * @author Henning Staib 057 */ 058public class InBandBytestreamSession implements BytestreamSession { 059 060 /* XMPP connection */ 061 private final XMPPConnection connection; 062 063 /* the In-Band Bytestream open request for this session */ 064 private final Open byteStreamRequest; 065 066 /* 067 * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream) 068 */ 069 private IBBInputStream inputStream; 070 071 /* 072 * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream) 073 */ 074 private IBBOutputStream outputStream; 075 076 /* JID of the remote peer */ 077 private String remoteJID; 078 079 /* flag to close both streams if one of them is closed */ 080 private boolean closeBothStreamsEnabled = false; 081 082 /* flag to indicate if session is closed */ 083 private boolean isClosed = false; 084 085 /** 086 * Constructor. 087 * 088 * @param connection the XMPP connection 089 * @param byteStreamRequest the In-Band Bytestream open request for this session 090 * @param remoteJID JID of the remote peer 091 */ 092 protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest, 093 String remoteJID) { 094 this.connection = connection; 095 this.byteStreamRequest = byteStreamRequest; 096 this.remoteJID = remoteJID; 097 098 // initialize streams dependent to the uses stanza type 099 switch (byteStreamRequest.getStanza()) { 100 case IQ: 101 this.inputStream = new IQIBBInputStream(); 102 this.outputStream = new IQIBBOutputStream(); 103 break; 104 case MESSAGE: 105 this.inputStream = new MessageIBBInputStream(); 106 this.outputStream = new MessageIBBOutputStream(); 107 break; 108 } 109 110 } 111 112 public InputStream getInputStream() { 113 return this.inputStream; 114 } 115 116 public OutputStream getOutputStream() { 117 return this.outputStream; 118 } 119 120 public int getReadTimeout() { 121 return this.inputStream.readTimeout; 122 } 123 124 public void setReadTimeout(int timeout) { 125 if (timeout < 0) { 126 throw new IllegalArgumentException("Timeout must be >= 0"); 127 } 128 this.inputStream.readTimeout = timeout; 129 } 130 131 /** 132 * Returns whether both streams should be closed automatically if one of the streams is closed. 133 * Default is <code>false</code>. 134 * 135 * @return <code>true</code> if both streams will be closed if one of the streams is closed, 136 * <code>false</code> if both streams can be closed independently. 137 */ 138 public boolean isCloseBothStreamsEnabled() { 139 return closeBothStreamsEnabled; 140 } 141 142 /** 143 * Sets whether both streams should be closed automatically if one of the streams is closed. 144 * Default is <code>false</code>. 145 * 146 * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of 147 * the streams is closed, <code>false</code> if both streams should be closed 148 * independently 149 */ 150 public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) { 151 this.closeBothStreamsEnabled = closeBothStreamsEnabled; 152 } 153 154 public void close() throws IOException { 155 closeByLocal(true); // close input stream 156 closeByLocal(false); // close output stream 157 } 158 159 /** 160 * This method is invoked if a request to close the In-Band Bytestream has been received. 161 * 162 * @param closeRequest the close request from the remote peer 163 * @throws NotConnectedException 164 */ 165 protected void closeByPeer(Close closeRequest) throws NotConnectedException { 166 167 /* 168 * close streams without flushing them, because stream is already considered closed on the 169 * remote peers side 170 */ 171 this.inputStream.closeInternal(); 172 this.inputStream.cleanup(); 173 this.outputStream.closeInternal(false); 174 175 // acknowledge close request 176 IQ confirmClose = IQ.createResultIQ(closeRequest); 177 this.connection.sendPacket(confirmClose); 178 179 } 180 181 /** 182 * This method is invoked if one of the streams has been closed locally, if an error occurred 183 * locally or if the whole session should be closed. 184 * 185 * @throws IOException if an error occurs while sending the close request 186 */ 187 protected synchronized void closeByLocal(boolean in) throws IOException { 188 if (this.isClosed) { 189 return; 190 } 191 192 if (this.closeBothStreamsEnabled) { 193 this.inputStream.closeInternal(); 194 this.outputStream.closeInternal(true); 195 } 196 else { 197 if (in) { 198 this.inputStream.closeInternal(); 199 } 200 else { 201 // close stream but try to send any data left 202 this.outputStream.closeInternal(true); 203 } 204 } 205 206 if (this.inputStream.isClosed && this.outputStream.isClosed) { 207 this.isClosed = true; 208 209 // send close request 210 Close close = new Close(this.byteStreamRequest.getSessionID()); 211 close.setTo(this.remoteJID); 212 try { 213 connection.createPacketCollectorAndSend(close).nextResultOrThrow(); 214 } 215 catch (Exception e) { 216 // Sadly we are unable to use the IOException(Throwable) constructor because this 217 // constructor is only supported from Android API 9 on. 218 IOException ioException = new IOException(); 219 ioException.initCause(e); 220 throw ioException; 221 } 222 223 this.inputStream.cleanup(); 224 225 // remove session from manager 226 InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this); 227 } 228 229 } 230 231 /** 232 * IBBInputStream class is the base implementation of an In-Band Bytestream input stream. 233 * Subclasses of this input stream must provide a packet listener along with a packet filter to 234 * collect the In-Band Bytestream data packets. 235 */ 236 private abstract class IBBInputStream extends InputStream { 237 238 /* the data packet listener to fill the data queue */ 239 private final PacketListener dataPacketListener; 240 241 /* queue containing received In-Band Bytestream data packets */ 242 protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>(); 243 244 /* buffer containing the data from one data packet */ 245 private byte[] buffer; 246 247 /* pointer to the next byte to read from buffer */ 248 private int bufferPointer = -1; 249 250 /* data packet sequence (range from 0 to 65535) */ 251 private long seq = -1; 252 253 /* flag to indicate if input stream is closed */ 254 private boolean isClosed = false; 255 256 /* flag to indicate if close method was invoked */ 257 private boolean closeInvoked = false; 258 259 /* timeout for read operations */ 260 private int readTimeout = 0; 261 262 /** 263 * Constructor. 264 */ 265 public IBBInputStream() { 266 // add data packet listener to connection 267 this.dataPacketListener = getDataPacketListener(); 268 connection.addPacketListener(this.dataPacketListener, getDataPacketFilter()); 269 } 270 271 /** 272 * Returns the packet listener that processes In-Band Bytestream data packets. 273 * 274 * @return the data packet listener 275 */ 276 protected abstract PacketListener getDataPacketListener(); 277 278 /** 279 * Returns the packet filter that accepts In-Band Bytestream data packets. 280 * 281 * @return the data packet filter 282 */ 283 protected abstract PacketFilter getDataPacketFilter(); 284 285 public synchronized int read() throws IOException { 286 checkClosed(); 287 288 // if nothing read yet or whole buffer has been read fill buffer 289 if (bufferPointer == -1 || bufferPointer >= buffer.length) { 290 // if no data available and stream was closed return -1 291 if (!loadBuffer()) { 292 return -1; 293 } 294 } 295 296 // return byte and increment buffer pointer 297 return ((int) buffer[bufferPointer++]) & 0xff; 298 } 299 300 public synchronized int read(byte[] b, int off, int len) throws IOException { 301 if (b == null) { 302 throw new NullPointerException(); 303 } 304 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) 305 || ((off + len) < 0)) { 306 throw new IndexOutOfBoundsException(); 307 } 308 else if (len == 0) { 309 return 0; 310 } 311 312 checkClosed(); 313 314 // if nothing read yet or whole buffer has been read fill buffer 315 if (bufferPointer == -1 || bufferPointer >= buffer.length) { 316 // if no data available and stream was closed return -1 317 if (!loadBuffer()) { 318 return -1; 319 } 320 } 321 322 // if more bytes wanted than available return all available 323 int bytesAvailable = buffer.length - bufferPointer; 324 if (len > bytesAvailable) { 325 len = bytesAvailable; 326 } 327 328 System.arraycopy(buffer, bufferPointer, b, off, len); 329 bufferPointer += len; 330 return len; 331 } 332 333 public synchronized int read(byte[] b) throws IOException { 334 return read(b, 0, b.length); 335 } 336 337 /** 338 * This method blocks until a data packet is received, the stream is closed or the current 339 * thread is interrupted. 340 * 341 * @return <code>true</code> if data was received, otherwise <code>false</code> 342 * @throws IOException if data packets are out of sequence 343 */ 344 private synchronized boolean loadBuffer() throws IOException { 345 346 // wait until data is available or stream is closed 347 DataPacketExtension data = null; 348 try { 349 if (this.readTimeout == 0) { 350 while (data == null) { 351 if (isClosed && this.dataQueue.isEmpty()) { 352 return false; 353 } 354 data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS); 355 } 356 } 357 else { 358 data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS); 359 if (data == null) { 360 throw new SocketTimeoutException(); 361 } 362 } 363 } 364 catch (InterruptedException e) { 365 // Restore the interrupted status 366 Thread.currentThread().interrupt(); 367 return false; 368 } 369 370 // handle sequence overflow 371 if (this.seq == 65535) { 372 this.seq = -1; 373 } 374 375 // check if data packets sequence is successor of last seen sequence 376 long seq = data.getSeq(); 377 if (seq - 1 != this.seq) { 378 // packets out of order; close stream/session 379 InBandBytestreamSession.this.close(); 380 throw new IOException("Packets out of sequence"); 381 } 382 else { 383 this.seq = seq; 384 } 385 386 // set buffer to decoded data 387 buffer = data.getDecodedData(); 388 bufferPointer = 0; 389 return true; 390 } 391 392 /** 393 * Checks if this stream is closed and throws an IOException if necessary 394 * 395 * @throws IOException if stream is closed and no data should be read anymore 396 */ 397 private void checkClosed() throws IOException { 398 // Throw an exception if, and only if, this stream has been already 399 // closed by the user using the close() method 400 if (closeInvoked) { 401 // clear data queue in case additional data was received after stream was closed 402 this.dataQueue.clear(); 403 throw new IOException("Stream is closed"); 404 } 405 } 406 407 public boolean markSupported() { 408 return false; 409 } 410 411 public void close() throws IOException { 412 if (closeInvoked) { 413 return; 414 } 415 416 this.closeInvoked = true; 417 418 InBandBytestreamSession.this.closeByLocal(true); 419 } 420 421 /** 422 * This method sets the close flag and removes the data packet listener. 423 */ 424 private void closeInternal() { 425 if (isClosed) { 426 return; 427 } 428 isClosed = true; 429 } 430 431 /** 432 * Invoked if the session is closed. 433 */ 434 private void cleanup() { 435 connection.removePacketListener(this.dataPacketListener); 436 } 437 438 } 439 440 /** 441 * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the 442 * data packets. 443 */ 444 private class IQIBBInputStream extends IBBInputStream { 445 446 protected PacketListener getDataPacketListener() { 447 return new PacketListener() { 448 449 private long lastSequence = -1; 450 451 public void processPacket(Packet packet) throws NotConnectedException { 452 // get data packet extension 453 DataPacketExtension data = (DataPacketExtension) packet.getExtension( 454 DataPacketExtension.ELEMENT_NAME, 455 InBandBytestreamManager.NAMESPACE); 456 457 /* 458 * check if sequence was not used already (see XEP-0047 Section 2.2) 459 */ 460 if (data.getSeq() <= this.lastSequence) { 461 IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError( 462 XMPPError.Condition.unexpected_request)); 463 connection.sendPacket(unexpectedRequest); 464 return; 465 466 } 467 468 // check if encoded data is valid (see XEP-0047 Section 2.2) 469 if (data.getDecodedData() == null) { 470 // data is invalid; respond with bad-request error 471 IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError( 472 XMPPError.Condition.bad_request)); 473 connection.sendPacket(badRequest); 474 return; 475 } 476 477 // data is valid; add to data queue 478 dataQueue.offer(data); 479 480 // confirm IQ 481 IQ confirmData = IQ.createResultIQ((IQ) packet); 482 connection.sendPacket(confirmData); 483 484 // set last seen sequence 485 this.lastSequence = data.getSeq(); 486 if (this.lastSequence == 65535) { 487 this.lastSequence = -1; 488 } 489 490 } 491 492 }; 493 } 494 495 protected PacketFilter getDataPacketFilter() { 496 /* 497 * filter all IQ stanzas having type 'SET' (represented by Data class), containing a 498 * data packet extension, matching session ID and recipient 499 */ 500 return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter()); 501 } 502 503 } 504 505 /** 506 * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas 507 * encapsulating the data packets. 508 */ 509 private class MessageIBBInputStream extends IBBInputStream { 510 511 protected PacketListener getDataPacketListener() { 512 return new PacketListener() { 513 514 public void processPacket(Packet packet) { 515 // get data packet extension 516 DataPacketExtension data = (DataPacketExtension) packet.getExtension( 517 DataPacketExtension.ELEMENT_NAME, 518 InBandBytestreamManager.NAMESPACE); 519 520 // check if encoded data is valid 521 if (data.getDecodedData() == null) { 522 /* 523 * TODO once a majority of XMPP server implementation support XEP-0079 524 * Advanced Message Processing the invalid message could be answered with an 525 * appropriate error. For now we just ignore the packet. Subsequent packets 526 * with an increased sequence will cause the input stream to close the 527 * stream/session. 528 */ 529 return; 530 } 531 532 // data is valid; add to data queue 533 dataQueue.offer(data); 534 535 // TODO confirm packet once XMPP servers support XEP-0079 536 } 537 538 }; 539 } 540 541 @Override 542 protected PacketFilter getDataPacketFilter() { 543 /* 544 * filter all message stanzas containing a data packet extension, matching session ID 545 * and recipient 546 */ 547 return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter()); 548 } 549 550 } 551 552 /** 553 * IBBDataPacketFilter class filters all packets from the remote peer of this session, 554 * containing an In-Band Bytestream data packet extension whose session ID matches this sessions 555 * ID. 556 */ 557 private class IBBDataPacketFilter implements PacketFilter { 558 559 public boolean accept(Packet packet) { 560 // sender equals remote peer 561 if (!packet.getFrom().equalsIgnoreCase(remoteJID)) { 562 return false; 563 } 564 565 // stanza contains data packet extension 566 PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME, 567 InBandBytestreamManager.NAMESPACE); 568 if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) { 569 return false; 570 } 571 572 // session ID equals this session ID 573 DataPacketExtension data = (DataPacketExtension) packetExtension; 574 if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) { 575 return false; 576 } 577 578 return true; 579 } 580 581 } 582 583 /** 584 * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream. 585 * Subclasses of this output stream must provide a method to send data over XMPP stream. 586 */ 587 private abstract class IBBOutputStream extends OutputStream { 588 589 /* buffer with the size of this sessions block size */ 590 protected final byte[] buffer; 591 592 /* pointer to next byte to write to buffer */ 593 protected int bufferPointer = 0; 594 595 /* data packet sequence (range from 0 to 65535) */ 596 protected long seq = 0; 597 598 /* flag to indicate if output stream is closed */ 599 protected boolean isClosed = false; 600 601 /** 602 * Constructor. 603 */ 604 public IBBOutputStream() { 605 this.buffer = new byte[byteStreamRequest.getBlockSize()]; 606 } 607 608 /** 609 * Writes the given data packet to the XMPP stream. 610 * 611 * @param data the data packet 612 * @throws IOException if an I/O error occurred while sending or if the stream is closed 613 * @throws NotConnectedException 614 */ 615 protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException; 616 617 public synchronized void write(int b) throws IOException { 618 if (this.isClosed) { 619 throw new IOException("Stream is closed"); 620 } 621 622 // if buffer is full flush buffer 623 if (bufferPointer >= buffer.length) { 624 flushBuffer(); 625 } 626 627 buffer[bufferPointer++] = (byte) b; 628 } 629 630 public synchronized void write(byte b[], int off, int len) throws IOException { 631 if (b == null) { 632 throw new NullPointerException(); 633 } 634 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) 635 || ((off + len) < 0)) { 636 throw new IndexOutOfBoundsException(); 637 } 638 else if (len == 0) { 639 return; 640 } 641 642 if (this.isClosed) { 643 throw new IOException("Stream is closed"); 644 } 645 646 // is data to send greater than buffer size 647 if (len >= buffer.length) { 648 649 // "byte" off the first chunk to write out 650 writeOut(b, off, buffer.length); 651 652 // recursively call this method with the lesser amount 653 write(b, off + buffer.length, len - buffer.length); 654 } 655 else { 656 writeOut(b, off, len); 657 } 658 } 659 660 public synchronized void write(byte[] b) throws IOException { 661 write(b, 0, b.length); 662 } 663 664 /** 665 * Fills the buffer with the given data and sends it over the XMPP stream if the buffers 666 * capacity has been reached. This method is only called from this class so it is assured 667 * that the amount of data to send is <= buffer capacity 668 * 669 * @param b the data 670 * @param off the data 671 * @param len the number of bytes to write 672 * @throws IOException if an I/O error occurred while sending or if the stream is closed 673 */ 674 private synchronized void writeOut(byte b[], int off, int len) throws IOException { 675 if (this.isClosed) { 676 throw new IOException("Stream is closed"); 677 } 678 679 // set to 0 in case the next 'if' block is not executed 680 int available = 0; 681 682 // is data to send greater that buffer space left 683 if (len > buffer.length - bufferPointer) { 684 // fill buffer to capacity and send it 685 available = buffer.length - bufferPointer; 686 System.arraycopy(b, off, buffer, bufferPointer, available); 687 bufferPointer += available; 688 flushBuffer(); 689 } 690 691 // copy the data left to buffer 692 System.arraycopy(b, off + available, buffer, bufferPointer, len - available); 693 bufferPointer += len - available; 694 } 695 696 public synchronized void flush() throws IOException { 697 if (this.isClosed) { 698 throw new IOException("Stream is closed"); 699 } 700 flushBuffer(); 701 } 702 703 private synchronized void flushBuffer() throws IOException { 704 705 // do nothing if no data to send available 706 if (bufferPointer == 0) { 707 return; 708 } 709 710 // create data packet 711 String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false); 712 DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(), 713 this.seq, enc); 714 715 // write to XMPP stream 716 try { 717 writeToXML(data); 718 } 719 catch (NotConnectedException e) { 720 IOException ioException = new IOException(); 721 ioException.initCause(e); 722 throw ioException; 723 } 724 725 // reset buffer pointer 726 bufferPointer = 0; 727 728 // increment sequence, considering sequence overflow 729 this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1); 730 731 } 732 733 public void close() throws IOException { 734 if (isClosed) { 735 return; 736 } 737 InBandBytestreamSession.this.closeByLocal(false); 738 } 739 740 /** 741 * Sets the close flag and optionally flushes the stream. 742 * 743 * @param flush if <code>true</code> flushes the stream 744 */ 745 protected void closeInternal(boolean flush) { 746 if (this.isClosed) { 747 return; 748 } 749 this.isClosed = true; 750 751 try { 752 if (flush) { 753 flushBuffer(); 754 } 755 } 756 catch (IOException e) { 757 /* 758 * ignore, because writeToXML() will not throw an exception if stream is already 759 * closed 760 */ 761 } 762 } 763 764 } 765 766 /** 767 * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating 768 * the data packets. 769 */ 770 private class IQIBBOutputStream extends IBBOutputStream { 771 772 @Override 773 protected synchronized void writeToXML(DataPacketExtension data) throws IOException { 774 // create IQ stanza containing data packet 775 IQ iq = new Data(data); 776 iq.setTo(remoteJID); 777 778 try { 779 connection.createPacketCollectorAndSend(iq).nextResultOrThrow(); 780 } 781 catch (Exception e) { 782 // close session unless it is already closed 783 if (!this.isClosed) { 784 InBandBytestreamSession.this.close(); 785 // Sadly we are unable to use the IOException(Throwable) constructor because this 786 // constructor is only supported from Android API 9 on. 787 IOException ioException = new IOException(); 788 ioException.initCause(e); 789 throw ioException; 790 } 791 } 792 793 } 794 795 } 796 797 /** 798 * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas 799 * encapsulating the data packets. 800 */ 801 private class MessageIBBOutputStream extends IBBOutputStream { 802 803 @Override 804 protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException { 805 // create message stanza containing data packet 806 Message message = new Message(remoteJID); 807 message.addExtension(data); 808 809 connection.sendPacket(message); 810 811 } 812 813 } 814 815}