001/**
002 *
003 * Copyright the original author or authors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smackx.bytestreams.ibb;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.SocketTimeoutException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026
027import org.jivesoftware.smack.SmackException.NotConnectedException;
028import org.jivesoftware.smack.XMPPConnection;
029import org.jivesoftware.smack.StanzaListener;
030import org.jivesoftware.smack.filter.AndFilter;
031import org.jivesoftware.smack.filter.StanzaFilter;
032import org.jivesoftware.smack.filter.StanzaTypeFilter;
033import org.jivesoftware.smack.packet.IQ;
034import org.jivesoftware.smack.packet.Message;
035import org.jivesoftware.smack.packet.Stanza;
036import org.jivesoftware.smack.packet.XMPPError;
037import org.jivesoftware.smack.util.stringencoder.Base64;
038import org.jivesoftware.smackx.bytestreams.BytestreamSession;
039import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
040import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
041import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
042import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
043
044/**
045 * InBandBytestreamSession class represents an In-Band Bytestream session.
046 * <p>
047 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
048 * directions.
049 * <p>
050 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
051 * closed individually the session will be closed automatically once the second stream is closed.
052 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
053 * automatically if one of them is closed.
054 * 
055 * @author Henning Staib
056 */
057public class InBandBytestreamSession implements BytestreamSession {
058
059    /* XMPP connection */
060    private final XMPPConnection connection;
061
062    /* the In-Band Bytestream open request for this session */
063    private final Open byteStreamRequest;
064
065    /*
066     * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
067     */
068    private IBBInputStream inputStream;
069
070    /*
071     * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
072     */
073    private IBBOutputStream outputStream;
074
075    /* JID of the remote peer */
076    private String remoteJID;
077
078    /* flag to close both streams if one of them is closed */
079    private boolean closeBothStreamsEnabled = false;
080
081    /* flag to indicate if session is closed */
082    private boolean isClosed = false;
083
084    /**
085     * Constructor.
086     * 
087     * @param connection the XMPP connection
088     * @param byteStreamRequest the In-Band Bytestream open request for this session
089     * @param remoteJID JID of the remote peer
090     */
091    protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
092                    String remoteJID) {
093        this.connection = connection;
094        this.byteStreamRequest = byteStreamRequest;
095        this.remoteJID = remoteJID;
096
097        // initialize streams dependent to the uses stanza type
098        switch (byteStreamRequest.getStanza()) {
099        case IQ:
100            this.inputStream = new IQIBBInputStream();
101            this.outputStream = new IQIBBOutputStream();
102            break;
103        case MESSAGE:
104            this.inputStream = new MessageIBBInputStream();
105            this.outputStream = new MessageIBBOutputStream();
106            break;
107        }
108
109    }
110
111    public InputStream getInputStream() {
112        return this.inputStream;
113    }
114
115    public OutputStream getOutputStream() {
116        return this.outputStream;
117    }
118
119    public int getReadTimeout() {
120        return this.inputStream.readTimeout;
121    }
122
123    public void setReadTimeout(int timeout) {
124        if (timeout < 0) {
125            throw new IllegalArgumentException("Timeout must be >= 0");
126        }
127        this.inputStream.readTimeout = timeout;
128    }
129
130    /**
131     * Returns whether both streams should be closed automatically if one of the streams is closed.
132     * Default is <code>false</code>.
133     * 
134     * @return <code>true</code> if both streams will be closed if one of the streams is closed,
135     *         <code>false</code> if both streams can be closed independently.
136     */
137    public boolean isCloseBothStreamsEnabled() {
138        return closeBothStreamsEnabled;
139    }
140
141    /**
142     * Sets whether both streams should be closed automatically if one of the streams is closed.
143     * Default is <code>false</code>.
144     * 
145     * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
146     *        the streams is closed, <code>false</code> if both streams should be closed
147     *        independently
148     */
149    public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
150        this.closeBothStreamsEnabled = closeBothStreamsEnabled;
151    }
152
153    public void close() throws IOException {
154        closeByLocal(true); // close input stream
155        closeByLocal(false); // close output stream
156    }
157
158    /**
159     * This method is invoked if a request to close the In-Band Bytestream has been received.
160     * 
161     * @param closeRequest the close request from the remote peer
162     * @throws NotConnectedException 
163     */
164    protected void closeByPeer(Close closeRequest) throws NotConnectedException {
165
166        /*
167         * close streams without flushing them, because stream is already considered closed on the
168         * remote peers side
169         */
170        this.inputStream.closeInternal();
171        this.inputStream.cleanup();
172        this.outputStream.closeInternal(false);
173
174        // acknowledge close request
175        IQ confirmClose = IQ.createResultIQ(closeRequest);
176        this.connection.sendStanza(confirmClose);
177
178    }
179
180    /**
181     * This method is invoked if one of the streams has been closed locally, if an error occurred
182     * locally or if the whole session should be closed.
183     * 
184     * @throws IOException if an error occurs while sending the close request
185     */
186    protected synchronized void closeByLocal(boolean in) throws IOException {
187        if (this.isClosed) {
188            return;
189        }
190
191        if (this.closeBothStreamsEnabled) {
192            this.inputStream.closeInternal();
193            this.outputStream.closeInternal(true);
194        }
195        else {
196            if (in) {
197                this.inputStream.closeInternal();
198            }
199            else {
200                // close stream but try to send any data left
201                this.outputStream.closeInternal(true);
202            }
203        }
204
205        if (this.inputStream.isClosed && this.outputStream.isClosed) {
206            this.isClosed = true;
207
208            // send close request
209            Close close = new Close(this.byteStreamRequest.getSessionID());
210            close.setTo(this.remoteJID);
211            try {
212                connection.createPacketCollectorAndSend(close).nextResultOrThrow();
213            }
214            catch (Exception e) {
215                // Sadly we are unable to use the IOException(Throwable) constructor because this
216                // constructor is only supported from Android API 9 on.
217                IOException ioException = new IOException();
218                ioException.initCause(e);
219                throw ioException;
220            }
221
222            this.inputStream.cleanup();
223
224            // remove session from manager
225            InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
226        }
227
228    }
229
230    /**
231     * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
232     * Subclasses of this input stream must provide a stanza(/packet) listener along with a stanza(/packet) filter to
233     * collect the In-Band Bytestream data packets.
234     */
235    private abstract class IBBInputStream extends InputStream {
236
237        /* the data packet listener to fill the data queue */
238        private final StanzaListener dataPacketListener;
239
240        /* queue containing received In-Band Bytestream data packets */
241        protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
242
243        /* buffer containing the data from one data packet */
244        private byte[] buffer;
245
246        /* pointer to the next byte to read from buffer */
247        private int bufferPointer = -1;
248
249        /* data packet sequence (range from 0 to 65535) */
250        private long seq = -1;
251
252        /* flag to indicate if input stream is closed */
253        private boolean isClosed = false;
254
255        /* flag to indicate if close method was invoked */
256        private boolean closeInvoked = false;
257
258        /* timeout for read operations */
259        private int readTimeout = 0;
260
261        /**
262         * Constructor.
263         */
264        public IBBInputStream() {
265            // add data packet listener to connection
266            this.dataPacketListener = getDataPacketListener();
267            connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
268        }
269
270        /**
271         * Returns the stanza(/packet) listener that processes In-Band Bytestream data packets.
272         * 
273         * @return the data stanza(/packet) listener
274         */
275        protected abstract StanzaListener getDataPacketListener();
276
277        /**
278         * Returns the stanza(/packet) filter that accepts In-Band Bytestream data packets.
279         * 
280         * @return the data stanza(/packet) filter
281         */
282        protected abstract StanzaFilter getDataPacketFilter();
283
284        public synchronized int read() throws IOException {
285            checkClosed();
286
287            // if nothing read yet or whole buffer has been read fill buffer
288            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
289                // if no data available and stream was closed return -1
290                if (!loadBuffer()) {
291                    return -1;
292                }
293            }
294
295            // return byte and increment buffer pointer
296            return ((int) buffer[bufferPointer++]) & 0xff;
297        }
298
299        public synchronized int read(byte[] b, int off, int len) throws IOException {
300            if (b == null) {
301                throw new NullPointerException();
302            }
303            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
304                            || ((off + len) < 0)) {
305                throw new IndexOutOfBoundsException();
306            }
307            else if (len == 0) {
308                return 0;
309            }
310
311            checkClosed();
312
313            // if nothing read yet or whole buffer has been read fill buffer
314            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
315                // if no data available and stream was closed return -1
316                if (!loadBuffer()) {
317                    return -1;
318                }
319            }
320
321            // if more bytes wanted than available return all available
322            int bytesAvailable = buffer.length - bufferPointer;
323            if (len > bytesAvailable) {
324                len = bytesAvailable;
325            }
326
327            System.arraycopy(buffer, bufferPointer, b, off, len);
328            bufferPointer += len;
329            return len;
330        }
331
332        public synchronized int read(byte[] b) throws IOException {
333            return read(b, 0, b.length);
334        }
335
336        /**
337         * This method blocks until a data stanza(/packet) is received, the stream is closed or the current
338         * thread is interrupted.
339         * 
340         * @return <code>true</code> if data was received, otherwise <code>false</code>
341         * @throws IOException if data packets are out of sequence
342         */
343        private synchronized boolean loadBuffer() throws IOException {
344
345            // wait until data is available or stream is closed
346            DataPacketExtension data = null;
347            try {
348                if (this.readTimeout == 0) {
349                    while (data == null) {
350                        if (isClosed && this.dataQueue.isEmpty()) {
351                            return false;
352                        }
353                        data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
354                    }
355                }
356                else {
357                    data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
358                    if (data == null) {
359                        throw new SocketTimeoutException();
360                    }
361                }
362            }
363            catch (InterruptedException e) {
364                // Restore the interrupted status
365                Thread.currentThread().interrupt();
366                return false;
367            }
368
369            // handle sequence overflow
370            if (this.seq == 65535) {
371                this.seq = -1;
372            }
373
374            // check if data packets sequence is successor of last seen sequence
375            long seq = data.getSeq();
376            if (seq - 1 != this.seq) {
377                // packets out of order; close stream/session
378                InBandBytestreamSession.this.close();
379                throw new IOException("Packets out of sequence");
380            }
381            else {
382                this.seq = seq;
383            }
384
385            // set buffer to decoded data
386            buffer = data.getDecodedData();
387            bufferPointer = 0;
388            return true;
389        }
390
391        /**
392         * Checks if this stream is closed and throws an IOException if necessary
393         * 
394         * @throws IOException if stream is closed and no data should be read anymore
395         */
396        private void checkClosed() throws IOException {
397            // Throw an exception if, and only if, this stream has been already
398            // closed by the user using the close() method
399            if (closeInvoked) {
400                // clear data queue in case additional data was received after stream was closed
401                this.dataQueue.clear();
402                throw new IOException("Stream is closed");
403            }
404        }
405
406        public boolean markSupported() {
407            return false;
408        }
409
410        public void close() throws IOException {
411            if (closeInvoked) {
412                return;
413            }
414
415            this.closeInvoked = true;
416
417            InBandBytestreamSession.this.closeByLocal(true);
418        }
419
420        /**
421         * This method sets the close flag and removes the data stanza(/packet) listener.
422         */
423        private void closeInternal() {
424            if (isClosed) {
425                return;
426            }
427            isClosed = true;
428        }
429
430        /**
431         * Invoked if the session is closed.
432         */
433        private void cleanup() {
434            connection.removeSyncStanzaListener(this.dataPacketListener);
435        }
436
437    }
438
439    /**
440     * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
441     * data packets.
442     */
443    private class IQIBBInputStream extends IBBInputStream {
444
445        protected StanzaListener getDataPacketListener() {
446            return new StanzaListener() {
447
448                private long lastSequence = -1;
449
450                public void processPacket(Stanza packet) throws NotConnectedException {
451                    // get data packet extension
452                    DataPacketExtension data = ((Data) packet).getDataPacketExtension();
453
454                    /*
455                     * check if sequence was not used already (see XEP-0047 Section 2.2)
456                     */
457                    if (data.getSeq() <= this.lastSequence) {
458                        IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
459                                        XMPPError.Condition.unexpected_request));
460                        connection.sendStanza(unexpectedRequest);
461                        return;
462
463                    }
464
465                    // check if encoded data is valid (see XEP-0047 Section 2.2)
466                    if (data.getDecodedData() == null) {
467                        // data is invalid; respond with bad-request error
468                        IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
469                                        XMPPError.Condition.bad_request));
470                        connection.sendStanza(badRequest);
471                        return;
472                    }
473
474                    // data is valid; add to data queue
475                    dataQueue.offer(data);
476
477                    // confirm IQ
478                    IQ confirmData = IQ.createResultIQ((IQ) packet);
479                    connection.sendStanza(confirmData);
480
481                    // set last seen sequence
482                    this.lastSequence = data.getSeq();
483                    if (this.lastSequence == 65535) {
484                        this.lastSequence = -1;
485                    }
486
487                }
488
489            };
490        }
491
492        protected StanzaFilter getDataPacketFilter() {
493            /*
494             * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
495             * data stanza(/packet) extension, matching session ID and recipient
496             */
497            return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
498        }
499
500    }
501
502    /**
503     * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
504     * encapsulating the data packets.
505     */
506    private class MessageIBBInputStream extends IBBInputStream {
507
508        protected StanzaListener getDataPacketListener() {
509            return new StanzaListener() {
510
511                public void processPacket(Stanza packet) {
512                    // get data packet extension
513                    DataPacketExtension data = (DataPacketExtension) packet.getExtension(
514                                    DataPacketExtension.ELEMENT,
515                                    DataPacketExtension.NAMESPACE);
516
517                    // check if encoded data is valid
518                    if (data.getDecodedData() == null) {
519                        /*
520                         * TODO once a majority of XMPP server implementation support XEP-0079
521                         * Advanced Message Processing the invalid message could be answered with an
522                         * appropriate error. For now we just ignore the packet. Subsequent packets
523                         * with an increased sequence will cause the input stream to close the
524                         * stream/session.
525                         */
526                        return;
527                    }
528
529                    // data is valid; add to data queue
530                    dataQueue.offer(data);
531
532                    // TODO confirm packet once XMPP servers support XEP-0079
533                }
534
535            };
536        }
537
538        @Override
539        protected StanzaFilter getDataPacketFilter() {
540            /*
541             * filter all message stanzas containing a data stanza(/packet) extension, matching session ID
542             * and recipient
543             */
544            return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter());
545        }
546
547    }
548
549    /**
550     * IBBDataPacketFilter class filters all packets from the remote peer of this session,
551     * containing an In-Band Bytestream data stanza(/packet) extension whose session ID matches this sessions
552     * ID.
553     */
554    private class IBBDataPacketFilter implements StanzaFilter {
555
556        public boolean accept(Stanza packet) {
557            // sender equals remote peer
558            if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
559                return false;
560            }
561
562            DataPacketExtension data;
563            if (packet instanceof Data) {
564                data = ((Data) packet).getDataPacketExtension();
565            } else {
566                // stanza contains data packet extension
567                data = packet.getExtension(
568                        DataPacketExtension.ELEMENT,
569                        DataPacketExtension.NAMESPACE);
570                if (data == null) {
571                    return false;
572                }
573            }
574
575            // session ID equals this session ID
576            if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
577                return false;
578            }
579
580            return true;
581        }
582
583    }
584
585    /**
586     * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
587     * Subclasses of this output stream must provide a method to send data over XMPP stream.
588     */
589    private abstract class IBBOutputStream extends OutputStream {
590
591        /* buffer with the size of this sessions block size */
592        protected final byte[] buffer;
593
594        /* pointer to next byte to write to buffer */
595        protected int bufferPointer = 0;
596
597        /* data packet sequence (range from 0 to 65535) */
598        protected long seq = 0;
599
600        /* flag to indicate if output stream is closed */
601        protected boolean isClosed = false;
602
603        /**
604         * Constructor.
605         */
606        public IBBOutputStream() {
607            this.buffer = new byte[byteStreamRequest.getBlockSize()];
608        }
609
610        /**
611         * Writes the given data stanza(/packet) to the XMPP stream.
612         * 
613         * @param data the data packet
614         * @throws IOException if an I/O error occurred while sending or if the stream is closed
615         * @throws NotConnectedException 
616         */
617        protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException;
618
619        public synchronized void write(int b) throws IOException {
620            if (this.isClosed) {
621                throw new IOException("Stream is closed");
622            }
623
624            // if buffer is full flush buffer
625            if (bufferPointer >= buffer.length) {
626                flushBuffer();
627            }
628
629            buffer[bufferPointer++] = (byte) b;
630        }
631
632        public synchronized void write(byte[] b, int off, int len) throws IOException {
633            if (b == null) {
634                throw new NullPointerException();
635            }
636            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
637                            || ((off + len) < 0)) {
638                throw new IndexOutOfBoundsException();
639            }
640            else if (len == 0) {
641                return;
642            }
643
644            if (this.isClosed) {
645                throw new IOException("Stream is closed");
646            }
647
648            // is data to send greater than buffer size
649            if (len >= buffer.length) {
650
651                // "byte" off the first chunk to write out
652                writeOut(b, off, buffer.length);
653
654                // recursively call this method with the lesser amount
655                write(b, off + buffer.length, len - buffer.length);
656            }
657            else {
658                writeOut(b, off, len);
659            }
660        }
661
662        public synchronized void write(byte[] b) throws IOException {
663            write(b, 0, b.length);
664        }
665
666        /**
667         * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
668         * capacity has been reached. This method is only called from this class so it is assured
669         * that the amount of data to send is <= buffer capacity
670         * 
671         * @param b the data
672         * @param off the data
673         * @param len the number of bytes to write
674         * @throws IOException if an I/O error occurred while sending or if the stream is closed
675         */
676        private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
677            if (this.isClosed) {
678                throw new IOException("Stream is closed");
679            }
680
681            // set to 0 in case the next 'if' block is not executed
682            int available = 0;
683
684            // is data to send greater that buffer space left
685            if (len > buffer.length - bufferPointer) {
686                // fill buffer to capacity and send it
687                available = buffer.length - bufferPointer;
688                System.arraycopy(b, off, buffer, bufferPointer, available);
689                bufferPointer += available;
690                flushBuffer();
691            }
692
693            // copy the data left to buffer
694            System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
695            bufferPointer += len - available;
696        }
697
698        public synchronized void flush() throws IOException {
699            if (this.isClosed) {
700                throw new IOException("Stream is closed");
701            }
702            flushBuffer();
703        }
704
705        private synchronized void flushBuffer() throws IOException {
706
707            // do nothing if no data to send available
708            if (bufferPointer == 0) {
709                return;
710            }
711
712            // create data packet
713            String enc = Base64.encodeToString(buffer, 0, bufferPointer);
714            DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
715                            this.seq, enc);
716
717            // write to XMPP stream
718            try {
719                writeToXML(data);
720            }
721            catch (NotConnectedException e) {
722                IOException ioException = new IOException();
723                ioException.initCause(e);
724                throw ioException;
725            }
726
727            // reset buffer pointer
728            bufferPointer = 0;
729
730            // increment sequence, considering sequence overflow
731            this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
732
733        }
734
735        public void close() throws IOException {
736            if (isClosed) {
737                return;
738            }
739            InBandBytestreamSession.this.closeByLocal(false);
740        }
741
742        /**
743         * Sets the close flag and optionally flushes the stream.
744         * 
745         * @param flush if <code>true</code> flushes the stream
746         */
747        protected void closeInternal(boolean flush) {
748            if (this.isClosed) {
749                return;
750            }
751            this.isClosed = true;
752
753            try {
754                if (flush) {
755                    flushBuffer();
756                }
757            }
758            catch (IOException e) {
759                /*
760                 * ignore, because writeToXML() will not throw an exception if stream is already
761                 * closed
762                 */
763            }
764        }
765
766    }
767
768    /**
769     * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
770     * the data packets.
771     */
772    private class IQIBBOutputStream extends IBBOutputStream {
773
774        @Override
775        protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
776            // create IQ stanza containing data packet
777            IQ iq = new Data(data);
778            iq.setTo(remoteJID);
779
780            try {
781                connection.createPacketCollectorAndSend(iq).nextResultOrThrow();
782            }
783            catch (Exception e) {
784                // close session unless it is already closed
785                if (!this.isClosed) {
786                    InBandBytestreamSession.this.close();
787                    // Sadly we are unable to use the IOException(Throwable) constructor because this
788                    // constructor is only supported from Android API 9 on.
789                    IOException ioException = new IOException();
790                    ioException.initCause(e);
791                    throw ioException;
792                }
793            }
794
795        }
796
797    }
798
799    /**
800     * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
801     * encapsulating the data packets.
802     */
803    private class MessageIBBOutputStream extends IBBOutputStream {
804
805        @Override
806        protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException {
807            // create message stanza containing data packet
808            Message message = new Message(remoteJID);
809            message.addExtension(data);
810
811            connection.sendStanza(message);
812
813        }
814
815    }
816
817    /**
818     * @param data
819     * @throws NotConnectedException
820     */
821    public void processIQPacket(Data data) throws NotConnectedException {
822        inputStream.dataPacketListener.processPacket(data);
823    }
824
825}