001/** 002 * 003 * Copyright the original author or authors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smackx.bytestreams.ibb; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.net.SocketTimeoutException; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026 027import org.jivesoftware.smack.SmackException.NotConnectedException; 028import org.jivesoftware.smack.XMPPConnection; 029import org.jivesoftware.smack.StanzaListener; 030import org.jivesoftware.smack.filter.AndFilter; 031import org.jivesoftware.smack.filter.StanzaFilter; 032import org.jivesoftware.smack.filter.StanzaTypeFilter; 033import org.jivesoftware.smack.packet.IQ; 034import org.jivesoftware.smack.packet.Message; 035import org.jivesoftware.smack.packet.Stanza; 036import org.jivesoftware.smack.packet.XMPPError; 037import org.jivesoftware.smack.util.stringencoder.Base64; 038import org.jivesoftware.smackx.bytestreams.BytestreamSession; 039import org.jivesoftware.smackx.bytestreams.ibb.packet.Close; 040import org.jivesoftware.smackx.bytestreams.ibb.packet.Data; 041import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension; 042import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; 043 044/** 045 * InBandBytestreamSession class represents an In-Band Bytestream session. 046 * <p> 047 * In-band bytestreams are bidirectional and this session encapsulates the streams for both 048 * directions. 049 * <p> 050 * Note that closing the In-Band Bytestream session will close both streams. If both streams are 051 * closed individually the session will be closed automatically once the second stream is closed. 052 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed 053 * automatically if one of them is closed. 054 * 055 * @author Henning Staib 056 */ 057public class InBandBytestreamSession implements BytestreamSession { 058 059 /* XMPP connection */ 060 private final XMPPConnection connection; 061 062 /* the In-Band Bytestream open request for this session */ 063 private final Open byteStreamRequest; 064 065 /* 066 * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream) 067 */ 068 private IBBInputStream inputStream; 069 070 /* 071 * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream) 072 */ 073 private IBBOutputStream outputStream; 074 075 /* JID of the remote peer */ 076 private String remoteJID; 077 078 /* flag to close both streams if one of them is closed */ 079 private boolean closeBothStreamsEnabled = false; 080 081 /* flag to indicate if session is closed */ 082 private boolean isClosed = false; 083 084 /** 085 * Constructor. 086 * 087 * @param connection the XMPP connection 088 * @param byteStreamRequest the In-Band Bytestream open request for this session 089 * @param remoteJID JID of the remote peer 090 */ 091 protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest, 092 String remoteJID) { 093 this.connection = connection; 094 this.byteStreamRequest = byteStreamRequest; 095 this.remoteJID = remoteJID; 096 097 // initialize streams dependent to the uses stanza type 098 switch (byteStreamRequest.getStanza()) { 099 case IQ: 100 this.inputStream = new IQIBBInputStream(); 101 this.outputStream = new IQIBBOutputStream(); 102 break; 103 case MESSAGE: 104 this.inputStream = new MessageIBBInputStream(); 105 this.outputStream = new MessageIBBOutputStream(); 106 break; 107 } 108 109 } 110 111 public InputStream getInputStream() { 112 return this.inputStream; 113 } 114 115 public OutputStream getOutputStream() { 116 return this.outputStream; 117 } 118 119 public int getReadTimeout() { 120 return this.inputStream.readTimeout; 121 } 122 123 public void setReadTimeout(int timeout) { 124 if (timeout < 0) { 125 throw new IllegalArgumentException("Timeout must be >= 0"); 126 } 127 this.inputStream.readTimeout = timeout; 128 } 129 130 /** 131 * Returns whether both streams should be closed automatically if one of the streams is closed. 132 * Default is <code>false</code>. 133 * 134 * @return <code>true</code> if both streams will be closed if one of the streams is closed, 135 * <code>false</code> if both streams can be closed independently. 136 */ 137 public boolean isCloseBothStreamsEnabled() { 138 return closeBothStreamsEnabled; 139 } 140 141 /** 142 * Sets whether both streams should be closed automatically if one of the streams is closed. 143 * Default is <code>false</code>. 144 * 145 * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of 146 * the streams is closed, <code>false</code> if both streams should be closed 147 * independently 148 */ 149 public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) { 150 this.closeBothStreamsEnabled = closeBothStreamsEnabled; 151 } 152 153 public void close() throws IOException { 154 closeByLocal(true); // close input stream 155 closeByLocal(false); // close output stream 156 } 157 158 /** 159 * This method is invoked if a request to close the In-Band Bytestream has been received. 160 * 161 * @param closeRequest the close request from the remote peer 162 * @throws NotConnectedException 163 */ 164 protected void closeByPeer(Close closeRequest) throws NotConnectedException { 165 166 /* 167 * close streams without flushing them, because stream is already considered closed on the 168 * remote peers side 169 */ 170 this.inputStream.closeInternal(); 171 this.inputStream.cleanup(); 172 this.outputStream.closeInternal(false); 173 174 // acknowledge close request 175 IQ confirmClose = IQ.createResultIQ(closeRequest); 176 this.connection.sendStanza(confirmClose); 177 178 } 179 180 /** 181 * This method is invoked if one of the streams has been closed locally, if an error occurred 182 * locally or if the whole session should be closed. 183 * 184 * @throws IOException if an error occurs while sending the close request 185 */ 186 protected synchronized void closeByLocal(boolean in) throws IOException { 187 if (this.isClosed) { 188 return; 189 } 190 191 if (this.closeBothStreamsEnabled) { 192 this.inputStream.closeInternal(); 193 this.outputStream.closeInternal(true); 194 } 195 else { 196 if (in) { 197 this.inputStream.closeInternal(); 198 } 199 else { 200 // close stream but try to send any data left 201 this.outputStream.closeInternal(true); 202 } 203 } 204 205 if (this.inputStream.isClosed && this.outputStream.isClosed) { 206 this.isClosed = true; 207 208 // send close request 209 Close close = new Close(this.byteStreamRequest.getSessionID()); 210 close.setTo(this.remoteJID); 211 try { 212 connection.createPacketCollectorAndSend(close).nextResultOrThrow(); 213 } 214 catch (Exception e) { 215 // Sadly we are unable to use the IOException(Throwable) constructor because this 216 // constructor is only supported from Android API 9 on. 217 IOException ioException = new IOException(); 218 ioException.initCause(e); 219 throw ioException; 220 } 221 222 this.inputStream.cleanup(); 223 224 // remove session from manager 225 InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this); 226 } 227 228 } 229 230 /** 231 * IBBInputStream class is the base implementation of an In-Band Bytestream input stream. 232 * Subclasses of this input stream must provide a stanza(/packet) listener along with a stanza(/packet) filter to 233 * collect the In-Band Bytestream data packets. 234 */ 235 private abstract class IBBInputStream extends InputStream { 236 237 /* the data packet listener to fill the data queue */ 238 private final StanzaListener dataPacketListener; 239 240 /* queue containing received In-Band Bytestream data packets */ 241 protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>(); 242 243 /* buffer containing the data from one data packet */ 244 private byte[] buffer; 245 246 /* pointer to the next byte to read from buffer */ 247 private int bufferPointer = -1; 248 249 /* data packet sequence (range from 0 to 65535) */ 250 private long seq = -1; 251 252 /* flag to indicate if input stream is closed */ 253 private boolean isClosed = false; 254 255 /* flag to indicate if close method was invoked */ 256 private boolean closeInvoked = false; 257 258 /* timeout for read operations */ 259 private int readTimeout = 0; 260 261 /** 262 * Constructor. 263 */ 264 public IBBInputStream() { 265 // add data packet listener to connection 266 this.dataPacketListener = getDataPacketListener(); 267 connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter()); 268 } 269 270 /** 271 * Returns the stanza(/packet) listener that processes In-Band Bytestream data packets. 272 * 273 * @return the data stanza(/packet) listener 274 */ 275 protected abstract StanzaListener getDataPacketListener(); 276 277 /** 278 * Returns the stanza(/packet) filter that accepts In-Band Bytestream data packets. 279 * 280 * @return the data stanza(/packet) filter 281 */ 282 protected abstract StanzaFilter getDataPacketFilter(); 283 284 public synchronized int read() throws IOException { 285 checkClosed(); 286 287 // if nothing read yet or whole buffer has been read fill buffer 288 if (bufferPointer == -1 || bufferPointer >= buffer.length) { 289 // if no data available and stream was closed return -1 290 if (!loadBuffer()) { 291 return -1; 292 } 293 } 294 295 // return byte and increment buffer pointer 296 return ((int) buffer[bufferPointer++]) & 0xff; 297 } 298 299 public synchronized int read(byte[] b, int off, int len) throws IOException { 300 if (b == null) { 301 throw new NullPointerException(); 302 } 303 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) 304 || ((off + len) < 0)) { 305 throw new IndexOutOfBoundsException(); 306 } 307 else if (len == 0) { 308 return 0; 309 } 310 311 checkClosed(); 312 313 // if nothing read yet or whole buffer has been read fill buffer 314 if (bufferPointer == -1 || bufferPointer >= buffer.length) { 315 // if no data available and stream was closed return -1 316 if (!loadBuffer()) { 317 return -1; 318 } 319 } 320 321 // if more bytes wanted than available return all available 322 int bytesAvailable = buffer.length - bufferPointer; 323 if (len > bytesAvailable) { 324 len = bytesAvailable; 325 } 326 327 System.arraycopy(buffer, bufferPointer, b, off, len); 328 bufferPointer += len; 329 return len; 330 } 331 332 public synchronized int read(byte[] b) throws IOException { 333 return read(b, 0, b.length); 334 } 335 336 /** 337 * This method blocks until a data stanza(/packet) is received, the stream is closed or the current 338 * thread is interrupted. 339 * 340 * @return <code>true</code> if data was received, otherwise <code>false</code> 341 * @throws IOException if data packets are out of sequence 342 */ 343 private synchronized boolean loadBuffer() throws IOException { 344 345 // wait until data is available or stream is closed 346 DataPacketExtension data = null; 347 try { 348 if (this.readTimeout == 0) { 349 while (data == null) { 350 if (isClosed && this.dataQueue.isEmpty()) { 351 return false; 352 } 353 data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS); 354 } 355 } 356 else { 357 data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS); 358 if (data == null) { 359 throw new SocketTimeoutException(); 360 } 361 } 362 } 363 catch (InterruptedException e) { 364 // Restore the interrupted status 365 Thread.currentThread().interrupt(); 366 return false; 367 } 368 369 // handle sequence overflow 370 if (this.seq == 65535) { 371 this.seq = -1; 372 } 373 374 // check if data packets sequence is successor of last seen sequence 375 long seq = data.getSeq(); 376 if (seq - 1 != this.seq) { 377 // packets out of order; close stream/session 378 InBandBytestreamSession.this.close(); 379 throw new IOException("Packets out of sequence"); 380 } 381 else { 382 this.seq = seq; 383 } 384 385 // set buffer to decoded data 386 buffer = data.getDecodedData(); 387 bufferPointer = 0; 388 return true; 389 } 390 391 /** 392 * Checks if this stream is closed and throws an IOException if necessary 393 * 394 * @throws IOException if stream is closed and no data should be read anymore 395 */ 396 private void checkClosed() throws IOException { 397 // Throw an exception if, and only if, this stream has been already 398 // closed by the user using the close() method 399 if (closeInvoked) { 400 // clear data queue in case additional data was received after stream was closed 401 this.dataQueue.clear(); 402 throw new IOException("Stream is closed"); 403 } 404 } 405 406 public boolean markSupported() { 407 return false; 408 } 409 410 public void close() throws IOException { 411 if (closeInvoked) { 412 return; 413 } 414 415 this.closeInvoked = true; 416 417 InBandBytestreamSession.this.closeByLocal(true); 418 } 419 420 /** 421 * This method sets the close flag and removes the data stanza(/packet) listener. 422 */ 423 private void closeInternal() { 424 if (isClosed) { 425 return; 426 } 427 isClosed = true; 428 } 429 430 /** 431 * Invoked if the session is closed. 432 */ 433 private void cleanup() { 434 connection.removeSyncStanzaListener(this.dataPacketListener); 435 } 436 437 } 438 439 /** 440 * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the 441 * data packets. 442 */ 443 private class IQIBBInputStream extends IBBInputStream { 444 445 protected StanzaListener getDataPacketListener() { 446 return new StanzaListener() { 447 448 private long lastSequence = -1; 449 450 public void processPacket(Stanza packet) throws NotConnectedException { 451 // get data packet extension 452 DataPacketExtension data = ((Data) packet).getDataPacketExtension(); 453 454 /* 455 * check if sequence was not used already (see XEP-0047 Section 2.2) 456 */ 457 if (data.getSeq() <= this.lastSequence) { 458 IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError( 459 XMPPError.Condition.unexpected_request)); 460 connection.sendStanza(unexpectedRequest); 461 return; 462 463 } 464 465 // check if encoded data is valid (see XEP-0047 Section 2.2) 466 if (data.getDecodedData() == null) { 467 // data is invalid; respond with bad-request error 468 IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError( 469 XMPPError.Condition.bad_request)); 470 connection.sendStanza(badRequest); 471 return; 472 } 473 474 // data is valid; add to data queue 475 dataQueue.offer(data); 476 477 // confirm IQ 478 IQ confirmData = IQ.createResultIQ((IQ) packet); 479 connection.sendStanza(confirmData); 480 481 // set last seen sequence 482 this.lastSequence = data.getSeq(); 483 if (this.lastSequence == 65535) { 484 this.lastSequence = -1; 485 } 486 487 } 488 489 }; 490 } 491 492 protected StanzaFilter getDataPacketFilter() { 493 /* 494 * filter all IQ stanzas having type 'SET' (represented by Data class), containing a 495 * data stanza(/packet) extension, matching session ID and recipient 496 */ 497 return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter()); 498 } 499 500 } 501 502 /** 503 * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas 504 * encapsulating the data packets. 505 */ 506 private class MessageIBBInputStream extends IBBInputStream { 507 508 protected StanzaListener getDataPacketListener() { 509 return new StanzaListener() { 510 511 public void processPacket(Stanza packet) { 512 // get data packet extension 513 DataPacketExtension data = (DataPacketExtension) packet.getExtension( 514 DataPacketExtension.ELEMENT, 515 DataPacketExtension.NAMESPACE); 516 517 // check if encoded data is valid 518 if (data.getDecodedData() == null) { 519 /* 520 * TODO once a majority of XMPP server implementation support XEP-0079 521 * Advanced Message Processing the invalid message could be answered with an 522 * appropriate error. For now we just ignore the packet. Subsequent packets 523 * with an increased sequence will cause the input stream to close the 524 * stream/session. 525 */ 526 return; 527 } 528 529 // data is valid; add to data queue 530 dataQueue.offer(data); 531 532 // TODO confirm packet once XMPP servers support XEP-0079 533 } 534 535 }; 536 } 537 538 @Override 539 protected StanzaFilter getDataPacketFilter() { 540 /* 541 * filter all message stanzas containing a data stanza(/packet) extension, matching session ID 542 * and recipient 543 */ 544 return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter()); 545 } 546 547 } 548 549 /** 550 * IBBDataPacketFilter class filters all packets from the remote peer of this session, 551 * containing an In-Band Bytestream data stanza(/packet) extension whose session ID matches this sessions 552 * ID. 553 */ 554 private class IBBDataPacketFilter implements StanzaFilter { 555 556 public boolean accept(Stanza packet) { 557 // sender equals remote peer 558 if (!packet.getFrom().equalsIgnoreCase(remoteJID)) { 559 return false; 560 } 561 562 DataPacketExtension data; 563 if (packet instanceof Data) { 564 data = ((Data) packet).getDataPacketExtension(); 565 } else { 566 // stanza contains data packet extension 567 data = packet.getExtension( 568 DataPacketExtension.ELEMENT, 569 DataPacketExtension.NAMESPACE); 570 if (data == null) { 571 return false; 572 } 573 } 574 575 // session ID equals this session ID 576 if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) { 577 return false; 578 } 579 580 return true; 581 } 582 583 } 584 585 /** 586 * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream. 587 * Subclasses of this output stream must provide a method to send data over XMPP stream. 588 */ 589 private abstract class IBBOutputStream extends OutputStream { 590 591 /* buffer with the size of this sessions block size */ 592 protected final byte[] buffer; 593 594 /* pointer to next byte to write to buffer */ 595 protected int bufferPointer = 0; 596 597 /* data packet sequence (range from 0 to 65535) */ 598 protected long seq = 0; 599 600 /* flag to indicate if output stream is closed */ 601 protected boolean isClosed = false; 602 603 /** 604 * Constructor. 605 */ 606 public IBBOutputStream() { 607 this.buffer = new byte[byteStreamRequest.getBlockSize()]; 608 } 609 610 /** 611 * Writes the given data stanza(/packet) to the XMPP stream. 612 * 613 * @param data the data packet 614 * @throws IOException if an I/O error occurred while sending or if the stream is closed 615 * @throws NotConnectedException 616 */ 617 protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException; 618 619 public synchronized void write(int b) throws IOException { 620 if (this.isClosed) { 621 throw new IOException("Stream is closed"); 622 } 623 624 // if buffer is full flush buffer 625 if (bufferPointer >= buffer.length) { 626 flushBuffer(); 627 } 628 629 buffer[bufferPointer++] = (byte) b; 630 } 631 632 public synchronized void write(byte[] b, int off, int len) throws IOException { 633 if (b == null) { 634 throw new NullPointerException(); 635 } 636 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) 637 || ((off + len) < 0)) { 638 throw new IndexOutOfBoundsException(); 639 } 640 else if (len == 0) { 641 return; 642 } 643 644 if (this.isClosed) { 645 throw new IOException("Stream is closed"); 646 } 647 648 // is data to send greater than buffer size 649 if (len >= buffer.length) { 650 651 // "byte" off the first chunk to write out 652 writeOut(b, off, buffer.length); 653 654 // recursively call this method with the lesser amount 655 write(b, off + buffer.length, len - buffer.length); 656 } 657 else { 658 writeOut(b, off, len); 659 } 660 } 661 662 public synchronized void write(byte[] b) throws IOException { 663 write(b, 0, b.length); 664 } 665 666 /** 667 * Fills the buffer with the given data and sends it over the XMPP stream if the buffers 668 * capacity has been reached. This method is only called from this class so it is assured 669 * that the amount of data to send is <= buffer capacity 670 * 671 * @param b the data 672 * @param off the data 673 * @param len the number of bytes to write 674 * @throws IOException if an I/O error occurred while sending or if the stream is closed 675 */ 676 private synchronized void writeOut(byte[] b, int off, int len) throws IOException { 677 if (this.isClosed) { 678 throw new IOException("Stream is closed"); 679 } 680 681 // set to 0 in case the next 'if' block is not executed 682 int available = 0; 683 684 // is data to send greater that buffer space left 685 if (len > buffer.length - bufferPointer) { 686 // fill buffer to capacity and send it 687 available = buffer.length - bufferPointer; 688 System.arraycopy(b, off, buffer, bufferPointer, available); 689 bufferPointer += available; 690 flushBuffer(); 691 } 692 693 // copy the data left to buffer 694 System.arraycopy(b, off + available, buffer, bufferPointer, len - available); 695 bufferPointer += len - available; 696 } 697 698 public synchronized void flush() throws IOException { 699 if (this.isClosed) { 700 throw new IOException("Stream is closed"); 701 } 702 flushBuffer(); 703 } 704 705 private synchronized void flushBuffer() throws IOException { 706 707 // do nothing if no data to send available 708 if (bufferPointer == 0) { 709 return; 710 } 711 712 // create data packet 713 String enc = Base64.encodeToString(buffer, 0, bufferPointer); 714 DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(), 715 this.seq, enc); 716 717 // write to XMPP stream 718 try { 719 writeToXML(data); 720 } 721 catch (NotConnectedException e) { 722 IOException ioException = new IOException(); 723 ioException.initCause(e); 724 throw ioException; 725 } 726 727 // reset buffer pointer 728 bufferPointer = 0; 729 730 // increment sequence, considering sequence overflow 731 this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1); 732 733 } 734 735 public void close() throws IOException { 736 if (isClosed) { 737 return; 738 } 739 InBandBytestreamSession.this.closeByLocal(false); 740 } 741 742 /** 743 * Sets the close flag and optionally flushes the stream. 744 * 745 * @param flush if <code>true</code> flushes the stream 746 */ 747 protected void closeInternal(boolean flush) { 748 if (this.isClosed) { 749 return; 750 } 751 this.isClosed = true; 752 753 try { 754 if (flush) { 755 flushBuffer(); 756 } 757 } 758 catch (IOException e) { 759 /* 760 * ignore, because writeToXML() will not throw an exception if stream is already 761 * closed 762 */ 763 } 764 } 765 766 } 767 768 /** 769 * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating 770 * the data packets. 771 */ 772 private class IQIBBOutputStream extends IBBOutputStream { 773 774 @Override 775 protected synchronized void writeToXML(DataPacketExtension data) throws IOException { 776 // create IQ stanza containing data packet 777 IQ iq = new Data(data); 778 iq.setTo(remoteJID); 779 780 try { 781 connection.createPacketCollectorAndSend(iq).nextResultOrThrow(); 782 } 783 catch (Exception e) { 784 // close session unless it is already closed 785 if (!this.isClosed) { 786 InBandBytestreamSession.this.close(); 787 // Sadly we are unable to use the IOException(Throwable) constructor because this 788 // constructor is only supported from Android API 9 on. 789 IOException ioException = new IOException(); 790 ioException.initCause(e); 791 throw ioException; 792 } 793 } 794 795 } 796 797 } 798 799 /** 800 * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas 801 * encapsulating the data packets. 802 */ 803 private class MessageIBBOutputStream extends IBBOutputStream { 804 805 @Override 806 protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException { 807 // create message stanza containing data packet 808 Message message = new Message(remoteJID); 809 message.addExtension(data); 810 811 connection.sendStanza(message); 812 813 } 814 815 } 816 817 /** 818 * @param data 819 * @throws NotConnectedException 820 */ 821 public void processIQPacket(Data data) throws NotConnectedException { 822 inputStream.dataPacketListener.processPacket(data); 823 } 824 825}