/**
 *
 * Copyright the original author or authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jivesoftware.smackx.bytestreams.ibb;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.NotLoggedInException;
import org.jivesoftware.smack.StanzaListener;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.filter.StanzaTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StanzaError;
import org.jivesoftware.smack.util.stringencoder.Base64;

import org.jivesoftware.smackx.bytestreams.BytestreamSession;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;

import org.jxmpp.jid.Jid;

/**
 * InBandBytestreamSession class represents an In-Band Bytestream session.
 * <p>
 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
 * directions.
 * <p>
 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
 * closed individually the session will be closed automatically once the second stream is closed.
 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
 * automatically if one of them is closed.
 *
 * @author Henning Staib
 */
public class InBandBytestreamSession implements BytestreamSession {

    /*
     * highest value of a data packet sequence number; the sequence ranges from 0 to 65535 and
     * wraps around to 0 afterwards
     */
    private static final long MAX_SEQUENCE = 65535;

    /* XMPP connection */
    private final XMPPConnection connection;

    /* the In-Band Bytestream open request for this session */
    private final Open byteStreamRequest;

    /*
     * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
     */
    private IBBInputStream inputStream;

    /*
     * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
     */
    private IBBOutputStream outputStream;

    /* JID of the remote peer */
    private Jid remoteJID;

    /* flag to close both streams if one of them is closed */
    private boolean closeBothStreamsEnabled = false;

    /* flag to indicate if session is closed */
    private boolean isClosed = false;

    /**
     * Constructor.
     *
     * @param connection the XMPP connection
     * @param byteStreamRequest the In-Band Bytestream open request for this session
     * @param remoteJID JID of the remote peer
     */
    protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
                    Jid remoteJID) {
        this.connection = connection;
        this.byteStreamRequest = byteStreamRequest;
        this.remoteJID = remoteJID;

        // initialize streams depending on the stanza type used by the open request
        switch (byteStreamRequest.getStanza()) {
        case IQ:
            this.inputStream = new IQIBBInputStream();
            this.outputStream = new IQIBBOutputStream();
            break;
        case MESSAGE:
            this.inputStream = new MessageIBBInputStream();
            this.outputStream = new MessageIBBOutputStream();
            break;
        }

    }

    @Override
    public InputStream getInputStream() {
        return this.inputStream;
    }

    @Override
    public OutputStream getOutputStream() {
        return this.outputStream;
    }

    @Override
    public int getReadTimeout() {
        return this.inputStream.readTimeout;
    }

    @Override
    public void setReadTimeout(int timeout) {
        if (timeout < 0) {
            throw new IllegalArgumentException("Timeout must be >= 0");
        }
        this.inputStream.readTimeout = timeout;
    }

    /**
     * Returns whether both streams should be closed automatically if one of the streams is closed.
     * Default is <code>false</code>.
     *
     * @return <code>true</code> if both streams will be closed if one of the streams is closed,
     *         <code>false</code> if both streams can be closed independently.
     */
    public boolean isCloseBothStreamsEnabled() {
        return closeBothStreamsEnabled;
    }

    /**
     * Sets whether both streams should be closed automatically if one of the streams is closed.
     * Default is <code>false</code>.
     *
     * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
     *        the streams is closed, <code>false</code> if both streams should be closed
     *        independently
     */
    public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
        this.closeBothStreamsEnabled = closeBothStreamsEnabled;
    }

    @Override
    public void close() throws IOException {
        closeByLocal(true); // close input stream
        closeByLocal(false); // close output stream
    }

    /**
     * This method is invoked if a request to close the In-Band Bytestream has been received.
     *
     * @param closeRequest the close request from the remote peer
     * @throws NotConnectedException propagated from sending the acknowledgement of the close
     *         request
     * @throws InterruptedException propagated from sending the acknowledgement of the close
     *         request
     */
    protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException {

        /*
         * close streams without flushing them, because stream is already considered closed on the
         * remote peers side
         */
        this.inputStream.closeInternal();
        this.inputStream.cleanup();
        this.outputStream.closeInternal(false);

        // acknowledge close request
        IQ confirmClose = IQ.createResultIQ(closeRequest);
        this.connection.sendStanza(confirmClose);

    }

    /**
     * This method is invoked if one of the streams has been closed locally, if an error occurred
     * locally or if the whole session should be closed.
     * <p>
     * Once both streams are closed a close request is sent to the remote peer, the data stanza
     * listener is removed and the session is removed from the manager. The listener and the
     * session entry are released even if sending the close request fails.
     *
     * @param in do we want to close the Input- or OutputStream?
     * @throws IOException if an error occurs while sending the close request
     */
    protected synchronized void closeByLocal(boolean in) throws IOException {
        if (this.isClosed) {
            return;
        }

        if (this.closeBothStreamsEnabled) {
            this.inputStream.closeInternal();
            this.outputStream.closeInternal(true);
        }
        else {
            if (in) {
                this.inputStream.closeInternal();
            }
            else {
                // close stream but try to send any data left
                this.outputStream.closeInternal(true);
            }
        }

        if (this.inputStream.isClosed && this.outputStream.isClosed) {
            this.isClosed = true;

            // send close request
            Close close = new Close(this.byteStreamRequest.getSessionID());
            close.setTo(this.remoteJID);
            try {
                connection.createStanzaCollectorAndSend(close).nextResultOrThrow();
            }
            catch (Exception e) {
                throw toIOException(e);
            }
            finally {
                /*
                 * The session is already flagged as closed, so this method will return early on
                 * every later invocation. Release the listener and the manager entry here, no
                 * matter whether the close request succeeded, otherwise they would leak.
                 */
                this.inputStream.cleanup();

                // remove session from manager
                // Thanks Google Error Prone for finding the bug where remove() was called with 'this' as argument. Changed
                // now to remove(byteStreamRequest.getSessionID).
                InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(byteStreamRequest.getSessionID());
            }
        }

    }

    /**
     * Wraps the given exception into an IOException having the given exception as its cause.
     *
     * @param cause the exception to wrap
     * @return an IOException whose cause is the given exception
     */
    private static IOException toIOException(Exception cause) {
        // Sadly we are unable to use the IOException(Throwable) constructor because this
        // constructor is only supported from Android API 9 on.
        IOException ioException = new IOException();
        ioException.initCause(cause);
        return ioException;
    }

    /**
     * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
     * Subclasses of this input stream must provide a stanza listener along with a stanza filter to
     * collect the In-Band Bytestream data packets.
     */
    private abstract class IBBInputStream extends InputStream {

        /* the data packet listener to fill the data queue */
        private final StanzaListener dataPacketListener;

        /* queue containing received In-Band Bytestream data packets */
        protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();

        /* buffer containing the data from one data packet */
        private byte[] buffer;

        /* pointer to the next byte to read from buffer */
        private int bufferPointer = -1;

        /* data packet sequence (range from 0 to 65535) */
        private long seq = -1;

        /* flag to indicate if input stream is closed */
        private boolean isClosed = false;

        /* flag to indicate if close method was invoked */
        private boolean closeInvoked = false;

        /* timeout for read operations */
        private int readTimeout = 0;

        /**
         * Constructor.
         */
        protected IBBInputStream() {
            // add data packet listener to connection
            this.dataPacketListener = getDataPacketListener();
            connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
        }

        /**
         * Returns the stanza listener that processes In-Band Bytestream data packets.
         *
         * @return the data stanza listener
         */
        protected abstract StanzaListener getDataPacketListener();

        /**
         * Returns the stanza filter that accepts In-Band Bytestream data packets.
         *
         * @return the data stanza filter
         */
        protected abstract StanzaFilter getDataPacketFilter();

        @Override
        public synchronized int read() throws IOException {
            checkClosed();

            // if nothing read yet or whole buffer has been read fill buffer
            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
                // if no data available and stream was closed return -1
                if (!loadBuffer()) {
                    return -1;
                }
            }

            // return byte (as unsigned value) and increment buffer pointer
            return buffer[bufferPointer++] & 0xff;
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException();
            }
            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
                            || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            }
            else if (len == 0) {
                return 0;
            }

            checkClosed();

            // if nothing read yet or whole buffer has been read fill buffer
            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
                // if no data available and stream was closed return -1
                if (!loadBuffer()) {
                    return -1;
                }
            }

            // if more bytes wanted than available return all available
            int bytesAvailable = buffer.length - bufferPointer;
            if (len > bytesAvailable) {
                len = bytesAvailable;
            }

            System.arraycopy(buffer, bufferPointer, b, off, len);
            bufferPointer += len;
            return len;
        }

        @Override
        public synchronized int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        /**
         * This method blocks until a data stanza is received, the stream is closed or the current
         * thread is interrupted. If a read timeout is set it waits at most that long for a data
         * stanza instead.
         *
         * @return <code>true</code> if data was received, otherwise <code>false</code>
         * @throws SocketTimeoutException if a read timeout is set and no data stanza was received
         *         in time
         * @throws IOException if data packets are out of sequence
         */
        private synchronized boolean loadBuffer() throws IOException {

            // wait until data is available or stream is closed
            DataPacketExtension data = null;
            try {
                if (this.readTimeout == 0) {
                    // no timeout: poll in intervals so a closed stream is noticed
                    while (data == null) {
                        if (isClosed && this.dataQueue.isEmpty()) {
                            return false;
                        }
                        data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
                    }
                }
                else {
                    data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
                    if (data == null) {
                        throw new SocketTimeoutException();
                    }
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
                return false;
            }

            // handle sequence overflow
            if (this.seq == MAX_SEQUENCE) {
                this.seq = -1;
            }

            // check if data packets sequence is successor of last seen sequence
            long seq = data.getSeq();
            if (seq - 1 != this.seq) {
                // packets out of order; close stream/session
                InBandBytestreamSession.this.close();
                throw new IOException("Packets out of sequence");
            }
            else {
                this.seq = seq;
            }

            // set buffer to decoded data
            buffer = data.getDecodedData();
            bufferPointer = 0;
            return true;
        }

        /**
         * Checks if this stream is closed and throws an IOException if necessary.
         *
         * @throws IOException if stream is closed and no data should be read anymore
         */
        private void checkClosed() throws IOException {
            // Throw an exception if, and only if, this stream has been already
            // closed by the user using the close() method
            if (closeInvoked) {
                // clear data queue in case additional data was received after stream was closed
                this.dataQueue.clear();
                throw new IOException("Stream is closed");
            }
        }

        @Override
        public boolean markSupported() {
            return false;
        }

        @Override
        public void close() throws IOException {
            if (closeInvoked) {
                return;
            }

            this.closeInvoked = true;

            InBandBytestreamSession.this.closeByLocal(true);
        }

        /**
         * This method sets the close flag. The data stanza listener is removed separately by
         * {@link #cleanup()}.
         */
        private void closeInternal() {
            if (isClosed) {
                return;
            }
            isClosed = true;
        }

        /**
         * Invoked if the session is closed. Removes the data stanza listener from the connection.
         */
        private void cleanup() {
            connection.removeSyncStanzaListener(this.dataPacketListener);
        }

    }

    /**
     * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
     * data packets.
     */
    private class IQIBBInputStream extends IBBInputStream {

        @Override
        protected StanzaListener getDataPacketListener() {
            return new StanzaListener() {

                /* sequence of the last accepted data packet, -1 if the next expected one is 0 */
                private long lastSequence = -1;

                @Override
                public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException {
                    // get data packet extension
                    DataPacketExtension data = ((Data) packet).getDataPacketExtension();

                    /*
                     * check if sequence was not used already (see XEP-0047 Section 2.2)
                     */
                    if (data.getSeq() <= this.lastSequence) {
                        IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet,
                                        StanzaError.Condition.unexpected_request);
                        connection.sendStanza(unexpectedRequest);
                        return;

                    }

                    // check if encoded data is valid (see XEP-0047 Section 2.2)
                    if (data.getDecodedData() == null) {
                        // data is invalid; respond with bad-request error
                        IQ badRequest = IQ.createErrorResponse((IQ) packet,
                                        StanzaError.Condition.bad_request);
                        connection.sendStanza(badRequest);
                        return;
                    }

                    // data is valid; add to data queue
                    dataQueue.offer(data);

                    // confirm IQ
                    IQ confirmData = IQ.createResultIQ((IQ) packet);
                    connection.sendStanza(confirmData);

                    // set last seen sequence, considering sequence overflow
                    this.lastSequence = data.getSeq();
                    if (this.lastSequence == MAX_SEQUENCE) {
                        this.lastSequence = -1;
                    }

                }

            };
        }

        @Override
        protected StanzaFilter getDataPacketFilter() {
            /*
             * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
             * data stanza extension, matching session ID and recipient
             */
            return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
        }

    }

    /**
     * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
     * encapsulating the data packets.
     */
    private class MessageIBBInputStream extends IBBInputStream {

        @Override
        protected StanzaListener getDataPacketListener() {
            return new StanzaListener() {

                @Override
                public void processStanza(Stanza packet) {
                    // get data packet extension
                    DataPacketExtension data = packet.getExtension(
                                    DataPacketExtension.ELEMENT,
                                    DataPacketExtension.NAMESPACE);

                    // check if encoded data is valid
                    if (data.getDecodedData() == null) {
                        /*
                         * TODO once a majority of XMPP server implementation support XEP-0079
                         * Advanced Message Processing the invalid message could be answered with an
                         * appropriate error. For now we just ignore the packet. Subsequent packets
                         * with an increased sequence will cause the input stream to close the
                         * stream/session.
                         */
                        return;
                    }

                    // data is valid; add to data queue
                    dataQueue.offer(data);

                    // TODO confirm packet once XMPP servers support XEP-0079
                }

            };
        }

        @Override
        protected StanzaFilter getDataPacketFilter() {
            /*
             * filter all message stanzas containing a data stanza extension, matching session ID
             * and recipient
             */
            return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter());
        }

    }

    /**
     * IBBDataPacketFilter class filters all packets from the remote peer of this session,
     * containing an In-Band Bytestream data stanza extension whose session ID matches this sessions
     * ID.
     */
    private class IBBDataPacketFilter implements StanzaFilter {

        @Override
        public boolean accept(Stanza packet) {
            // sender equals remote peer; a stanza without a sender can not belong to this session
            Jid from = packet.getFrom();
            if (from == null || !from.equals(remoteJID)) {
                return false;
            }

            DataPacketExtension data;
            if (packet instanceof Data) {
                data = ((Data) packet).getDataPacketExtension();
            } else {
                // stanza contains data packet extension
                data = packet.getExtension(
                        DataPacketExtension.ELEMENT,
                        DataPacketExtension.NAMESPACE);
                if (data == null) {
                    return false;
                }
            }

            // session ID equals this session ID
            if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
                return false;
            }

            return true;
        }

    }

    /**
     * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
     * Subclasses of this output stream must provide a method to send data over XMPP stream.
     */
    private abstract class IBBOutputStream extends OutputStream {

        /* buffer with the size of this sessions block size */
        protected final byte[] buffer;

        /* pointer to next byte to write to buffer */
        protected int bufferPointer = 0;

        /* data packet sequence (range from 0 to 65535) */
        protected long seq = 0;

        /* flag to indicate if output stream is closed */
        protected boolean isClosed = false;

        /**
         * Constructor.
         */
        private IBBOutputStream() {
            this.buffer = new byte[byteStreamRequest.getBlockSize()];
        }

        /**
         * Writes the given data stanza to the XMPP stream.
         *
         * @param data the data packet
         * @throws IOException if an I/O error occurred while sending or if the stream is closed
         * @throws NotConnectedException propagated from sending the stanza
         * @throws InterruptedException propagated from sending the stanza
         */
        protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException;

        @Override
        public synchronized void write(int b) throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }

            // if buffer is full flush buffer
            if (bufferPointer >= buffer.length) {
                flushBuffer();
            }

            buffer[bufferPointer++] = (byte) b;
        }

        @Override
        public synchronized void write(byte[] b, int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException();
            }
            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
                            || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            }
            else if (len == 0) {
                return;
            }

            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }

            /*
             * Hand the data over in chunks of at most one block. This is done iteratively (not
             * recursively) so that large arrays combined with a small block size can not exhaust
             * the stack. The blockSize > 0 condition prevents spinning forever on a degenerate
             * zero-sized buffer.
             */
            final int blockSize = buffer.length;
            while (blockSize > 0 && len >= blockSize) {
                // "byte" off the next chunk to write out
                writeOut(b, off, blockSize);
                off += blockSize;
                len -= blockSize;
            }

            // buffer whatever is left (less than one block)
            if (len > 0) {
                writeOut(b, off, len);
            }
        }

        @Override
        public synchronized void write(byte[] b) throws IOException {
            write(b, 0, b.length);
        }

        /**
         * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
         * capacity has been reached. This method is only called from this class so it is assured
         * that the amount of data to send is {@code <=} buffer capacity.
         *
         * @param b the data
         * @param off the start offset in the data
         * @param len the number of bytes to write
         * @throws IOException if an I/O error occurred while sending or if the stream is closed
         */
        private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }

            // set to 0 in case the next 'if' block is not executed
            int available = 0;

            // is data to send greater that buffer space left
            if (len > buffer.length - bufferPointer) {
                // fill buffer to capacity and send it
                available = buffer.length - bufferPointer;
                System.arraycopy(b, off, buffer, bufferPointer, available);
                bufferPointer += available;
                flushBuffer();
            }

            // copy the data left to buffer
            System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
            bufferPointer += len - available;
        }

        @Override
        public synchronized void flush() throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }
            flushBuffer();
        }

        /**
         * Sends the buffered data as one data packet over the XMPP stream, resets the buffer and
         * advances the sequence number. Does nothing if the buffer is empty.
         *
         * @throws IOException if the data packet could not be sent
         */
        private synchronized void flushBuffer() throws IOException {

            // do nothing if no data to send available
            if (bufferPointer == 0) {
                return;
            }

            // create data packet
            String enc = Base64.encodeToString(buffer, 0, bufferPointer);
            DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
                            this.seq, enc);

            // write to XMPP stream
            try {
                writeToXML(data);
            }
            catch (InterruptedException | NotConnectedException e) {
                throw toIOException(e);
            }

            // reset buffer pointer
            bufferPointer = 0;

            /*
             * increment sequence, considering sequence overflow: 65535 is a valid sequence number
             * and must be sent before wrapping around to 0, because the receiving side expects
             * every sequence number to be the direct successor of the previous one
             */
            this.seq = (this.seq == MAX_SEQUENCE ? 0 : this.seq + 1);

        }

        @Override
        public void close() throws IOException {
            if (isClosed) {
                return;
            }
            InBandBytestreamSession.this.closeByLocal(false);
        }

        /**
         * Sets the close flag and optionally flushes the stream.
         *
         * @param flush if <code>true</code> flushes the stream
         */
        protected void closeInternal(boolean flush) {
            if (this.isClosed) {
                return;
            }
            this.isClosed = true;

            try {
                if (flush) {
                    flushBuffer();
                }
            }
            catch (IOException e) {
                /*
                 * ignore, because writeToXML() will not throw an exception if stream is already
                 * closed
                 */
            }
        }

    }

    /**
     * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
     * the data packets.
     */
    private class IQIBBOutputStream extends IBBOutputStream {

        @Override
        protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
            // create IQ stanza containing data packet
            IQ iq = new Data(data);
            iq.setTo(remoteJID);

            try {
                connection.createStanzaCollectorAndSend(iq).nextResultOrThrow();
            }
            catch (Exception e) {
                // close session unless it is already closed
                if (!this.isClosed) {
                    InBandBytestreamSession.this.close();
                    throw toIOException(e);
                }
            }

        }

    }

    /**
     * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
     * encapsulating the data packets.
     */
    private class MessageIBBOutputStream extends IBBOutputStream {

        @Override
        protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException {
            // create message stanza containing data packet
            Message message = new Message(remoteJID);
            message.addExtension(data);

            connection.sendStanza(message);

        }

    }

    /**
     * Process IQ stanza. Hands the given data IQ directly to the data packet listener of this
     * session's input stream.
     *
     * @param data the data IQ to process
     * @throws NotConnectedException propagated from the data packet listener
     * @throws InterruptedException propagated from the data packet listener
     * @throws NotLoggedInException propagated from the data packet listener
     */
    public void processIQPacket(Data data) throws NotConnectedException, InterruptedException, NotLoggedInException {
        inputStream.dataPacketListener.processStanza(data);
    }

}