001/** 002 * 003 * Copyright the original author or authors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smackx.bytestreams.ibb; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.net.SocketTimeoutException; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.logging.Level; 027import java.util.logging.Logger; 028 029import org.jivesoftware.smack.SmackException.NotConnectedException; 030import org.jivesoftware.smack.SmackException.NotLoggedInException; 031import org.jivesoftware.smack.StanzaListener; 032import org.jivesoftware.smack.XMPPConnection; 033import org.jivesoftware.smack.datatypes.UInt16; 034import org.jivesoftware.smack.filter.AndFilter; 035import org.jivesoftware.smack.filter.StanzaFilter; 036import org.jivesoftware.smack.filter.StanzaTypeFilter; 037import org.jivesoftware.smack.packet.IQ; 038import org.jivesoftware.smack.packet.Message; 039import org.jivesoftware.smack.packet.Stanza; 040import org.jivesoftware.smack.packet.StanzaBuilder; 041import org.jivesoftware.smack.packet.StanzaError; 042import org.jivesoftware.smack.util.stringencoder.Base64; 043 044import org.jivesoftware.smackx.bytestreams.BytestreamSession; 045import org.jivesoftware.smackx.bytestreams.ibb.packet.Close; 046import org.jivesoftware.smackx.bytestreams.ibb.packet.Data; 047import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension; 048import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; 049 050import org.jxmpp.jid.Jid; 051 052/** 053 * InBandBytestreamSession class represents an In-Band Bytestream session. 054 * <p> 055 * In-band bytestreams are bidirectional and this session encapsulates the streams for both 056 * directions. 057 * <p> 058 * Note that closing the In-Band Bytestream session will close both streams. If both streams are 059 * closed individually the session will be closed automatically once the second stream is closed. 060 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed 061 * automatically if one of them is closed. 062 * 063 * @author Henning Staib 064 */ 065public class InBandBytestreamSession implements BytestreamSession { 066 067 private static final Logger LOGGER = Logger.getLogger(InBandBytestreamSession.class.getName()); 068 069 static final String UNEXPECTED_IBB_SEQUENCE = "Unexpected IBB sequence"; 070 071 /* XMPP connection */ 072 private final XMPPConnection connection; 073 074 /* the In-Band Bytestream open request for this session */ 075 private final Open byteStreamRequest; 076 077 /* 078 * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream) 079 */ 080 private IBBInputStream inputStream; 081 082 /* 083 * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream) 084 */ 085 private IBBOutputStream outputStream; 086 087 /* JID of the remote peer */ 088 private Jid remoteJID; 089 090 /* flag to close both streams if one of them is closed */ 091 private boolean closeBothStreamsEnabled = false; 092 093 /* flag to indicate if session is closed */ 094 private boolean isClosed = false; 095 096 /** 097 * Constructor. 098 * 099 * @param connection the XMPP connection 100 * @param byteStreamRequest the In-Band Bytestream open request for this session 101 * @param remoteJID JID of the remote peer 102 */ 103 protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest, 104 Jid remoteJID) { 105 this.connection = connection; 106 this.byteStreamRequest = byteStreamRequest; 107 this.remoteJID = remoteJID; 108 109 // initialize streams dependent to the uses stanza type 110 switch (byteStreamRequest.getStanza()) { 111 case IQ: 112 this.inputStream = new IQIBBInputStream(); 113 this.outputStream = new IQIBBOutputStream(); 114 break; 115 case MESSAGE: 116 this.inputStream = new MessageIBBInputStream(); 117 this.outputStream = new MessageIBBOutputStream(); 118 break; 119 } 120 121 } 122 123 @Override 124 public InputStream getInputStream() { 125 return this.inputStream; 126 } 127 128 @Override 129 public OutputStream getOutputStream() { 130 return this.outputStream; 131 } 132 133 @Override 134 public int getReadTimeout() { 135 return this.inputStream.readTimeout; 136 } 137 138 @Override 139 public void setReadTimeout(int timeout) { 140 if (timeout < 0) { 141 throw new IllegalArgumentException("Timeout must be >= 0"); 142 } 143 this.inputStream.readTimeout = timeout; 144 } 145 146 /** 147 * Returns whether both streams should be closed automatically if one of the streams is closed. 148 * Default is <code>false</code>. 149 * 150 * @return <code>true</code> if both streams will be closed if one of the streams is closed, 151 * <code>false</code> if both streams can be closed independently. 152 */ 153 public boolean isCloseBothStreamsEnabled() { 154 return closeBothStreamsEnabled; 155 } 156 157 /** 158 * Sets whether both streams should be closed automatically if one of the streams is closed. 159 * Default is <code>false</code>. 160 * 161 * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of 162 * the streams is closed, <code>false</code> if both streams should be closed 163 * independently 164 */ 165 public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) { 166 this.closeBothStreamsEnabled = closeBothStreamsEnabled; 167 } 168 169 @Override 170 public void close() throws IOException { 171 closeByLocal(true); // close input stream 172 closeByLocal(false); // close output stream 173 } 174 175 /** 176 * This method is invoked if a request to close the In-Band Bytestream has been received. 177 * 178 * @param closeRequest the close request from the remote peer 179 * @throws NotConnectedException if the XMPP connection is not connected. 180 * @throws InterruptedException if the calling thread was interrupted. 181 */ 182 protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException { 183 184 /* 185 * close streams without flushing them, because stream is already considered closed on the 186 * remote peers side 187 */ 188 this.inputStream.closeInternal(); 189 this.inputStream.cleanup(); 190 this.outputStream.closeInternal(false); 191 192 // acknowledge close request 193 IQ confirmClose = IQ.createResultIQ(closeRequest); 194 this.connection.sendStanza(confirmClose); 195 196 } 197 198 /** 199 * This method is invoked if one of the streams has been closed locally, if an error occurred 200 * locally or if the whole session should be closed. 201 * 202 * @param in do we want to close the Input- or OutputStream? 203 * @throws IOException if an error occurs while sending the close request 204 */ 205 protected synchronized void closeByLocal(boolean in) throws IOException { 206 if (this.isClosed) { 207 return; 208 } 209 210 if (this.closeBothStreamsEnabled) { 211 this.inputStream.closeInternal(); 212 this.outputStream.closeInternal(true); 213 } 214 else { 215 if (in) { 216 this.inputStream.closeInternal(); 217 } 218 else { 219 // close stream but try to send any data left 220 this.outputStream.closeInternal(true); 221 } 222 } 223 224 if (this.inputStream.isClosed && this.outputStream.isClosed) { 225 this.isClosed = true; 226 227 // send close request 228 Close close = new Close(this.byteStreamRequest.getSessionID()); 229 close.setTo(this.remoteJID); 230 try { 231 connection.sendIqRequestAndWaitForResponse(close); 232 } 233 catch (Exception e) { 234 // Sadly we are unable to use the IOException(Throwable) constructor because this 235 // constructor is only supported from Android API 9 on. 236 IOException ioException = new IOException(); 237 ioException.initCause(e); 238 throw ioException; 239 } 240 241 this.inputStream.cleanup(); 242 243 // remove session from manager 244 // Thanks Google Error Prone for finding the bug where remove() was called with 'this' as argument. Changed 245 // now to remove(byteStreamRequest.getSessionID). 246 InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(byteStreamRequest.getSessionID()); 247 } 248 249 } 250 251 /** 252 * IBBInputStream class is the base implementation of an In-Band Bytestream input stream. 253 * Subclasses of this input stream must provide a stanza listener along with a stanza filter to 254 * collect the In-Band Bytestream data packets. 255 */ 256 private abstract class IBBInputStream extends InputStream { 257 258 /* the data packet listener to fill the data queue */ 259 private final StanzaListener dataPacketListener; 260 261 /* queue containing received In-Band Bytestream data packets */ 262 protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>(); 263 264 /* buffer containing the data from one data packet */ 265 private byte[] buffer; 266 267 /* pointer to the next byte to read from buffer */ 268 private int bufferPointer = -1; 269 270 /* data packet sequence (range from 0 to 65535) */ 271 private UInt16 expectedSeq = UInt16.MIN_VALUE; 272 273 /* flag to indicate if input stream is closed */ 274 private boolean isClosed = false; 275 276 /* flag to indicate if close method was invoked */ 277 private boolean closeInvoked = false; 278 279 /* timeout for read operations */ 280 private int readTimeout = 0; 281 282 /** 283 * Constructor. 284 */ 285 protected IBBInputStream() { 286 // add data packet listener to connection 287 this.dataPacketListener = getDataPacketListener(); 288 connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter()); 289 } 290 291 /** 292 * Returns the stanza listener that processes In-Band Bytestream data packets. 293 * 294 * @return the data stanza listener 295 */ 296 protected abstract StanzaListener getDataPacketListener(); 297 298 /** 299 * Returns the stanza filter that accepts In-Band Bytestream data packets. 300 * 301 * @return the data stanza filter 302 */ 303 protected abstract StanzaFilter getDataPacketFilter(); 304 305 @Override 306 public synchronized int read() throws IOException { 307 checkClosed(); 308 309 // if nothing read yet or whole buffer has been read fill buffer 310 if (bufferPointer == -1 || bufferPointer >= buffer.length) { 311 // if no data available and stream was closed return -1 312 if (!loadBuffer()) { 313 return -1; 314 } 315 } 316 317 // return byte and increment buffer pointer 318 return buffer[bufferPointer++] & 0xff; 319 } 320 321 @Override 322 public synchronized int read(byte[] b, int off, int len) throws IOException { 323 if (b == null) { 324 throw new NullPointerException(); 325 } 326 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) 327 || ((off + len) < 0)) { 328 throw new IndexOutOfBoundsException(); 329 } 330 else if (len == 0) { 331 return 0; 332 } 333 334 checkClosed(); 335 336 // if nothing read yet or whole buffer has been read fill buffer 337 if (bufferPointer == -1 || bufferPointer >= buffer.length) { 338 // if no data available and stream was closed return -1 339 if (!loadBuffer()) { 340 return -1; 341 } 342 } 343 344 // if more bytes wanted than available return all available 345 int bytesAvailable = buffer.length - bufferPointer; 346 if (len > bytesAvailable) { 347 len = bytesAvailable; 348 } 349 350 System.arraycopy(buffer, bufferPointer, b, off, len); 351 bufferPointer += len; 352 return len; 353 } 354 355 @Override 356 public synchronized int read(byte[] b) throws IOException { 357 return read(b, 0, b.length); 358 } 359 360 /** 361 * This method blocks until a data stanza is received, the stream is closed or the current 362 * thread is interrupted. 363 * 364 * @return <code>true</code> if data was received, otherwise <code>false</code> 365 * @throws IOException if data packets are out of sequence 366 */ 367 private synchronized boolean loadBuffer() throws IOException { 368 369 // wait until data is available or stream is closed 370 DataPacketExtension data = null; 371 try { 372 if (this.readTimeout == 0) { 373 while (data == null) { 374 if (isClosed && this.dataQueue.isEmpty()) { 375 return false; 376 } 377 data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS); 378 } 379 } 380 else { 381 data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS); 382 if (data == null) { 383 throw new SocketTimeoutException(); 384 } 385 } 386 } 387 catch (InterruptedException e) { 388 // Restore the interrupted status 389 Thread.currentThread().interrupt(); 390 return false; 391 } 392 393 final UInt16 dataSeq = data.getSeq(); 394 // check if data packets sequence is successor of last seen sequence 395 if (!expectedSeq.equals(dataSeq)) { 396 // packets out of order; close stream/session 397 InBandBytestreamSession.this.close(); 398 String message = UNEXPECTED_IBB_SEQUENCE + " " + dataSeq + " received, expected " 399 + expectedSeq; 400 throw new IOException(message); 401 } 402 expectedSeq = dataSeq.incrementedByOne(); 403 404 // set buffer to decoded data 405 buffer = data.getDecodedData(); 406 bufferPointer = 0; 407 return true; 408 } 409 410 /** 411 * Checks if this stream is closed and throws an IOException if necessary 412 * 413 * @throws IOException if stream is closed and no data should be read anymore 414 */ 415 private void checkClosed() throws IOException { 416 // Throw an exception if, and only if, this stream has been already 417 // closed by the user using the close() method 418 if (closeInvoked) { 419 // clear data queue in case additional data was received after stream was closed 420 this.dataQueue.clear(); 421 throw new IOException("Stream is closed"); 422 } 423 } 424 425 @Override 426 public boolean markSupported() { 427 return false; 428 } 429 430 @Override 431 public void close() throws IOException { 432 if (closeInvoked) { 433 return; 434 } 435 436 this.closeInvoked = true; 437 438 InBandBytestreamSession.this.closeByLocal(true); 439 } 440 441 /** 442 * This method sets the close flag and removes the data stanza listener. 443 */ 444 private void closeInternal() { 445 if (isClosed) { 446 return; 447 } 448 isClosed = true; 449 } 450 451 /** 452 * Invoked if the session is closed. 453 */ 454 private void cleanup() { 455 connection.removeSyncStanzaListener(this.dataPacketListener); 456 } 457 458 } 459 460 /** 461 * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the 462 * data packets. 463 */ 464 private final class IQIBBInputStream extends IBBInputStream { 465 466 @Override 467 protected StanzaListener getDataPacketListener() { 468 return new StanzaListener() { 469 470 private UInt16 expectedSequence = UInt16.MIN_VALUE;; 471 472 @Override 473 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException { 474 final Data dataIq = (Data) packet; 475 // get data packet extension 476 DataPacketExtension data = dataIq.getDataPacketExtension(); 477 478 final UInt16 seq = data.getSeq(); 479 /* 480 * check if sequence was not used already (see XEP-0047 Section 2.2) 481 */ 482 if (!expectedSequence.equals(seq)) { 483 String descriptiveEnTest = UNEXPECTED_IBB_SEQUENCE + " " + seq + " received, expected " 484 + expectedSequence; 485 StanzaError stanzaError = StanzaError.getBuilder() 486 .setCondition(StanzaError.Condition.unexpected_request) 487 .setDescriptiveEnText(descriptiveEnTest) 488 .build(); 489 IQ unexpectedRequest = IQ.createErrorResponse(dataIq, stanzaError); 490 connection.sendStanza(unexpectedRequest); 491 492 try { 493 // TODO: It would be great if close would take a "close error reason" argument. Also there 494 // is the question if this is really a reason to close the stream. We could have some more 495 // tolerance regarding out-of-sequence stanzas arriving: Even though XMPP has the in-order 496 // guarantee, I could imagine that there are cases where stanzas are, for example, 497 // duplicated because of stream resumption. 498 close(); 499 } catch (IOException e) { 500 LOGGER.log(Level.FINER, "Could not close session, because of IOException. Close reason: " 501 + descriptiveEnTest); 502 } 503 504 return; 505 } 506 507 // check if encoded data is valid (see XEP-0047 Section 2.2) 508 if (data.getDecodedData() == null) { 509 // data is invalid; respond with bad-request error 510 IQ badRequest = IQ.createErrorResponse((IQ) packet, 511 StanzaError.Condition.bad_request); 512 connection.sendStanza(badRequest); 513 return; 514 } 515 516 expectedSequence = seq.incrementedByOne(); 517 518 // data is valid; add to data queue 519 dataQueue.offer(data); 520 521 // confirm IQ 522 IQ confirmData = IQ.createResultIQ((IQ) packet); 523 connection.sendStanza(confirmData); 524 } 525 526 }; 527 } 528 529 @Override 530 protected StanzaFilter getDataPacketFilter() { 531 /* 532 * filter all IQ stanzas having type 'SET' (represented by Data class), containing a 533 * data stanza extension, matching session ID and recipient 534 */ 535 return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter()); 536 } 537 538 } 539 540 /** 541 * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas 542 * encapsulating the data packets. 543 */ 544 private final class MessageIBBInputStream extends IBBInputStream { 545 546 @Override 547 protected StanzaListener getDataPacketListener() { 548 return new StanzaListener() { 549 550 @Override 551 public void processStanza(Stanza packet) { 552 // get data packet extension 553 DataPacketExtension data = packet.getExtension( 554 DataPacketExtension.class); 555 556 // check if encoded data is valid 557 if (data.getDecodedData() == null) { 558 /* 559 * TODO once a majority of XMPP server implementation support XEP-0079 560 * Advanced Message Processing the invalid message could be answered with an 561 * appropriate error. For now we just ignore the packet. Subsequent packets 562 * with an increased sequence will cause the input stream to close the 563 * stream/session. 564 */ 565 return; 566 } 567 568 // data is valid; add to data queue 569 dataQueue.offer(data); 570 571 // TODO confirm packet once XMPP servers support XEP-0079 572 } 573 574 }; 575 } 576 577 @Override 578 protected StanzaFilter getDataPacketFilter() { 579 /* 580 * filter all message stanzas containing a data stanza extension, matching session ID 581 * and recipient 582 */ 583 return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter()); 584 } 585 586 } 587 588 /** 589 * IBBDataPacketFilter class filters all packets from the remote peer of this session, 590 * containing an In-Band Bytestream data stanza extension whose session ID matches this sessions 591 * ID. 592 */ 593 private final class IBBDataPacketFilter implements StanzaFilter { 594 595 @Override 596 public boolean accept(Stanza packet) { 597 // sender equals remote peer 598 if (!packet.getFrom().equals(remoteJID)) { 599 return false; 600 } 601 602 DataPacketExtension data; 603 if (packet instanceof Data) { 604 data = ((Data) packet).getDataPacketExtension(); 605 } else { 606 // stanza contains data packet extension 607 data = packet.getExtension( 608 DataPacketExtension.class); 609 if (data == null) { 610 return false; 611 } 612 } 613 614 // session ID equals this session ID 615 if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) { 616 return false; 617 } 618 619 return true; 620 } 621 622 } 623 624 /** 625 * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream. 626 * Subclasses of this output stream must provide a method to send data over XMPP stream. 627 */ 628 private abstract class IBBOutputStream extends OutputStream { 629 630 /* buffer with the size of this sessions block size */ 631 protected final byte[] buffer; 632 633 /* pointer to next byte to write to buffer */ 634 protected int bufferPointer = 0; 635 636 /* data packet sequence (range from 0 to 65535) */ 637 protected UInt16 seq = UInt16.from(0); 638 639 /* flag to indicate if output stream is closed */ 640 protected boolean isClosed = false; 641 642 /** 643 * Constructor. 644 */ 645 private IBBOutputStream() { 646 this.buffer = new byte[byteStreamRequest.getBlockSize()]; 647 } 648 649 /** 650 * Writes the given data stanza to the XMPP stream. 651 * 652 * @param data the data packet 653 * @throws IOException if an I/O error occurred while sending or if the stream is closed 654 * @throws NotConnectedException if the XMPP connection is not connected. 655 * @throws InterruptedException if the calling thread was interrupted. 656 */ 657 protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException; 658 659 @Override 660 public synchronized void write(int b) throws IOException { 661 if (this.isClosed) { 662 throw new IOException("Stream is closed"); 663 } 664 665 // if buffer is full flush buffer 666 if (bufferPointer >= buffer.length) { 667 flushBuffer(); 668 } 669 670 buffer[bufferPointer++] = (byte) b; 671 } 672 673 @Override 674 public synchronized void write(byte[] b, int off, int len) throws IOException { 675 if (b == null) { 676 throw new NullPointerException(); 677 } 678 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) 679 || ((off + len) < 0)) { 680 throw new IndexOutOfBoundsException(); 681 } 682 else if (len == 0) { 683 return; 684 } 685 686 if (this.isClosed) { 687 throw new IOException("Stream is closed"); 688 } 689 690 // is data to send greater than buffer size 691 if (len >= buffer.length) { 692 693 // "byte" off the first chunk to write out 694 writeOut(b, off, buffer.length); 695 696 // recursively call this method with the lesser amount 697 write(b, off + buffer.length, len - buffer.length); 698 } 699 else { 700 writeOut(b, off, len); 701 } 702 } 703 704 @Override 705 public synchronized void write(byte[] b) throws IOException { 706 write(b, 0, b.length); 707 } 708 709 /** 710 * Fills the buffer with the given data and sends it over the XMPP stream if the buffers 711 * capacity has been reached. This method is only called from this class so it is assured 712 * that the amount of data to send is <= buffer capacity 713 * 714 * @param b the data 715 * @param off the data 716 * @param len the number of bytes to write 717 * @throws IOException if an I/O error occurred while sending or if the stream is closed 718 */ 719 private synchronized void writeOut(byte[] b, int off, int len) throws IOException { 720 if (this.isClosed) { 721 throw new IOException("Stream is closed"); 722 } 723 724 // set to 0 in case the next 'if' block is not executed 725 int available = 0; 726 727 // is data to send greater that buffer space left 728 if (len > buffer.length - bufferPointer) { 729 // fill buffer to capacity and send it 730 available = buffer.length - bufferPointer; 731 System.arraycopy(b, off, buffer, bufferPointer, available); 732 bufferPointer += available; 733 flushBuffer(); 734 } 735 736 // copy the data left to buffer 737 System.arraycopy(b, off + available, buffer, bufferPointer, len - available); 738 bufferPointer += len - available; 739 } 740 741 @Override 742 public synchronized void flush() throws IOException { 743 if (this.isClosed) { 744 throw new IOException("Stream is closed"); 745 } 746 flushBuffer(); 747 } 748 749 private synchronized void flushBuffer() throws IOException { 750 751 // do nothing if no data to send available 752 if (bufferPointer == 0) { 753 return; 754 } 755 756 // create data packet 757 String enc = Base64.encodeToString(buffer, 0, bufferPointer); 758 DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(), 759 this.seq, enc); 760 761 // write to XMPP stream 762 try { 763 writeToXML(data); 764 } 765 catch (InterruptedException | NotConnectedException e) { 766 IOException ioException = new IOException(); 767 ioException.initCause(e); 768 throw ioException; 769 } 770 771 // reset buffer pointer 772 bufferPointer = 0; 773 774 // increment sequence, considering sequence overflow 775 seq = seq.incrementedByOne(); 776 777 } 778 779 @Override 780 public void close() throws IOException { 781 if (isClosed) { 782 return; 783 } 784 InBandBytestreamSession.this.closeByLocal(false); 785 } 786 787 /** 788 * Sets the close flag and optionally flushes the stream. 789 * 790 * @param flush if <code>true</code> flushes the stream 791 */ 792 protected void closeInternal(boolean flush) { 793 if (this.isClosed) { 794 return; 795 } 796 this.isClosed = true; 797 798 try { 799 if (flush) { 800 flushBuffer(); 801 } 802 } 803 catch (IOException e) { 804 /* 805 * ignore, because writeToXML() will not throw an exception if stream is already 806 * closed 807 */ 808 } 809 } 810 811 } 812 813 /** 814 * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating 815 * the data packets. 816 */ 817 private final class IQIBBOutputStream extends IBBOutputStream { 818 819 @Override 820 protected synchronized void writeToXML(DataPacketExtension data) throws IOException { 821 // create IQ stanza containing data packet 822 IQ iq = new Data(data); 823 iq.setTo(remoteJID); 824 825 try { 826 connection.sendIqRequestAndWaitForResponse(iq); 827 } 828 catch (Exception e) { 829 // close session unless it is already closed 830 if (!this.isClosed) { 831 InBandBytestreamSession.this.close(); 832 // Sadly we are unable to use the IOException(Throwable) constructor because this 833 // constructor is only supported from Android API 9 on. 834 IOException ioException = new IOException(); 835 ioException.initCause(e); 836 throw ioException; 837 } 838 } 839 840 } 841 842 } 843 844 /** 845 * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas 846 * encapsulating the data packets. 847 */ 848 private final class MessageIBBOutputStream extends IBBOutputStream { 849 850 @Override 851 protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException { 852 // create message stanza containing data packet 853 Message message = StanzaBuilder.buildMessage().to(remoteJID) 854 .addExtension(data) 855 .build(); 856 857 connection.sendStanza(message); 858 859 } 860 861 } 862 863 /** 864 * Process IQ stanza. 865 * @param data TODO javadoc me please 866 * @throws NotConnectedException if the XMPP connection is not connected. 867 * @throws InterruptedException if the calling thread was interrupted. 868 * @throws NotLoggedInException if the XMPP connection is not authenticated. 869 */ 870 public void processIQPacket(Data data) throws NotConnectedException, InterruptedException, NotLoggedInException { 871 inputStream.dataPacketListener.processStanza(data); 872 } 873 874}