001/**
002 *
003 * Copyright the original author or authors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smackx.bytestreams.ibb;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.SocketTimeoutException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026
027import org.jivesoftware.smack.SmackException.NotConnectedException;
028import org.jivesoftware.smack.SmackException.NotLoggedInException;
029import org.jivesoftware.smack.StanzaListener;
030import org.jivesoftware.smack.XMPPConnection;
031import org.jivesoftware.smack.filter.AndFilter;
032import org.jivesoftware.smack.filter.StanzaFilter;
033import org.jivesoftware.smack.filter.StanzaTypeFilter;
034import org.jivesoftware.smack.packet.IQ;
035import org.jivesoftware.smack.packet.Message;
036import org.jivesoftware.smack.packet.Stanza;
037import org.jivesoftware.smack.packet.StanzaError;
038import org.jivesoftware.smack.util.stringencoder.Base64;
039
040import org.jivesoftware.smackx.bytestreams.BytestreamSession;
041import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
042import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
043import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
044import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
045
046import org.jxmpp.jid.Jid;
047
048/**
049 * InBandBytestreamSession class represents an In-Band Bytestream session.
050 * <p>
051 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
052 * directions.
053 * <p>
054 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
055 * closed individually the session will be closed automatically once the second stream is closed.
056 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
057 * automatically if one of them is closed.
058 *
059 * @author Henning Staib
060 */
061public class InBandBytestreamSession implements BytestreamSession {
062
063    /* XMPP connection */
064    private final XMPPConnection connection;
065
066    /* the In-Band Bytestream open request for this session */
067    private final Open byteStreamRequest;
068
069    /*
070     * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
071     */
072    private IBBInputStream inputStream;
073
074    /*
075     * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
076     */
077    private IBBOutputStream outputStream;
078
079    /* JID of the remote peer */
080    private Jid remoteJID;
081
082    /* flag to close both streams if one of them is closed */
083    private boolean closeBothStreamsEnabled = false;
084
085    /* flag to indicate if session is closed */
086    private boolean isClosed = false;
087
088    /**
089     * Constructor.
090     *
091     * @param connection the XMPP connection
092     * @param byteStreamRequest the In-Band Bytestream open request for this session
093     * @param remoteJID JID of the remote peer
094     */
095    protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
096                    Jid remoteJID) {
097        this.connection = connection;
098        this.byteStreamRequest = byteStreamRequest;
099        this.remoteJID = remoteJID;
100
101        // initialize streams dependent to the uses stanza type
102        switch (byteStreamRequest.getStanza()) {
103        case IQ:
104            this.inputStream = new IQIBBInputStream();
105            this.outputStream = new IQIBBOutputStream();
106            break;
107        case MESSAGE:
108            this.inputStream = new MessageIBBInputStream();
109            this.outputStream = new MessageIBBOutputStream();
110            break;
111        }
112
113    }
114
115    @Override
116    public InputStream getInputStream() {
117        return this.inputStream;
118    }
119
120    @Override
121    public OutputStream getOutputStream() {
122        return this.outputStream;
123    }
124
125    @Override
126    public int getReadTimeout() {
127        return this.inputStream.readTimeout;
128    }
129
130    @Override
131    public void setReadTimeout(int timeout) {
132        if (timeout < 0) {
133            throw new IllegalArgumentException("Timeout must be >= 0");
134        }
135        this.inputStream.readTimeout = timeout;
136    }
137
138    /**
139     * Returns whether both streams should be closed automatically if one of the streams is closed.
140     * Default is <code>false</code>.
141     *
142     * @return <code>true</code> if both streams will be closed if one of the streams is closed,
143     *         <code>false</code> if both streams can be closed independently.
144     */
145    public boolean isCloseBothStreamsEnabled() {
146        return closeBothStreamsEnabled;
147    }
148
    /**
     * Sets whether both streams should be closed automatically as soon as one of the streams is
     * closed. Default is <code>false</code>.
     * <p>
     * The flag is evaluated each time a stream is closed locally, so it affects only close
     * operations performed after this call.
     *
     * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
     *        the streams is closed, <code>false</code> if both streams should be closed
     *        independently
     */
    public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
        this.closeBothStreamsEnabled = closeBothStreamsEnabled;
    }
160
161    @Override
162    public void close() throws IOException {
163        closeByLocal(true); // close input stream
164        closeByLocal(false); // close output stream
165    }
166
167    /**
168     * This method is invoked if a request to close the In-Band Bytestream has been received.
169     *
170     * @param closeRequest the close request from the remote peer
171     * @throws NotConnectedException
172     * @throws InterruptedException
173     */
174    protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException {
175
176        /*
177         * close streams without flushing them, because stream is already considered closed on the
178         * remote peers side
179         */
180        this.inputStream.closeInternal();
181        this.inputStream.cleanup();
182        this.outputStream.closeInternal(false);
183
184        // acknowledge close request
185        IQ confirmClose = IQ.createResultIQ(closeRequest);
186        this.connection.sendStanza(confirmClose);
187
188    }
189
190    /**
191     * This method is invoked if one of the streams has been closed locally, if an error occurred
192     * locally or if the whole session should be closed.
193     *
194     * @param in do we want to close the Input- or OutputStream?
195     * @throws IOException if an error occurs while sending the close request
196     */
197    protected synchronized void closeByLocal(boolean in) throws IOException {
198        if (this.isClosed) {
199            return;
200        }
201
202        if (this.closeBothStreamsEnabled) {
203            this.inputStream.closeInternal();
204            this.outputStream.closeInternal(true);
205        }
206        else {
207            if (in) {
208                this.inputStream.closeInternal();
209            }
210            else {
211                // close stream but try to send any data left
212                this.outputStream.closeInternal(true);
213            }
214        }
215
216        if (this.inputStream.isClosed && this.outputStream.isClosed) {
217            this.isClosed = true;
218
219            // send close request
220            Close close = new Close(this.byteStreamRequest.getSessionID());
221            close.setTo(this.remoteJID);
222            try {
223                connection.createStanzaCollectorAndSend(close).nextResultOrThrow();
224            }
225            catch (Exception e) {
226                // Sadly we are unable to use the IOException(Throwable) constructor because this
227                // constructor is only supported from Android API 9 on.
228                IOException ioException = new IOException();
229                ioException.initCause(e);
230                throw ioException;
231            }
232
233            this.inputStream.cleanup();
234
235            // remove session from manager
236            // Thanks Google Error Prone for finding the bug where remove() was called with 'this' as argument. Changed
237            // now to remove(byteStreamRequest.getSessionID).
238            InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(byteStreamRequest.getSessionID());
239        }
240
241    }
242
243    /**
244     * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
245     * Subclasses of this input stream must provide a stanza listener along with a stanza filter to
246     * collect the In-Band Bytestream data packets.
247     */
248    private abstract class IBBInputStream extends InputStream {
249
250        /* the data packet listener to fill the data queue */
251        private final StanzaListener dataPacketListener;
252
253        /* queue containing received In-Band Bytestream data packets */
254        protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
255
256        /* buffer containing the data from one data packet */
257        private byte[] buffer;
258
259        /* pointer to the next byte to read from buffer */
260        private int bufferPointer = -1;
261
262        /* data packet sequence (range from 0 to 65535) */
263        private long seq = -1;
264
265        /* flag to indicate if input stream is closed */
266        private boolean isClosed = false;
267
268        /* flag to indicate if close method was invoked */
269        private boolean closeInvoked = false;
270
271        /* timeout for read operations */
272        private int readTimeout = 0;
273
274        /**
275         * Constructor.
276         */
277        protected IBBInputStream() {
278            // add data packet listener to connection
279            this.dataPacketListener = getDataPacketListener();
280            connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
281        }
282
283        /**
284         * Returns the stanza listener that processes In-Band Bytestream data packets.
285         *
286         * @return the data stanza listener
287         */
288        protected abstract StanzaListener getDataPacketListener();
289
290        /**
291         * Returns the stanza filter that accepts In-Band Bytestream data packets.
292         *
293         * @return the data stanza filter
294         */
295        protected abstract StanzaFilter getDataPacketFilter();
296
297        @Override
298        public synchronized int read() throws IOException {
299            checkClosed();
300
301            // if nothing read yet or whole buffer has been read fill buffer
302            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
303                // if no data available and stream was closed return -1
304                if (!loadBuffer()) {
305                    return -1;
306                }
307            }
308
309            // return byte and increment buffer pointer
310            return buffer[bufferPointer++] & 0xff;
311        }
312
313        @Override
314        public synchronized int read(byte[] b, int off, int len) throws IOException {
315            if (b == null) {
316                throw new NullPointerException();
317            }
318            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
319                            || ((off + len) < 0)) {
320                throw new IndexOutOfBoundsException();
321            }
322            else if (len == 0) {
323                return 0;
324            }
325
326            checkClosed();
327
328            // if nothing read yet or whole buffer has been read fill buffer
329            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
330                // if no data available and stream was closed return -1
331                if (!loadBuffer()) {
332                    return -1;
333                }
334            }
335
336            // if more bytes wanted than available return all available
337            int bytesAvailable = buffer.length - bufferPointer;
338            if (len > bytesAvailable) {
339                len = bytesAvailable;
340            }
341
342            System.arraycopy(buffer, bufferPointer, b, off, len);
343            bufferPointer += len;
344            return len;
345        }
346
347        @Override
348        public synchronized int read(byte[] b) throws IOException {
349            return read(b, 0, b.length);
350        }
351
352        /**
353         * This method blocks until a data stanza is received, the stream is closed or the current
354         * thread is interrupted.
355         *
356         * @return <code>true</code> if data was received, otherwise <code>false</code>
357         * @throws IOException if data packets are out of sequence
358         */
359        private synchronized boolean loadBuffer() throws IOException {
360
361            // wait until data is available or stream is closed
362            DataPacketExtension data = null;
363            try {
364                if (this.readTimeout == 0) {
365                    while (data == null) {
366                        if (isClosed && this.dataQueue.isEmpty()) {
367                            return false;
368                        }
369                        data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
370                    }
371                }
372                else {
373                    data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
374                    if (data == null) {
375                        throw new SocketTimeoutException();
376                    }
377                }
378            }
379            catch (InterruptedException e) {
380                // Restore the interrupted status
381                Thread.currentThread().interrupt();
382                return false;
383            }
384
385            // handle sequence overflow
386            if (this.seq == 65535) {
387                this.seq = -1;
388            }
389
390            // check if data packets sequence is successor of last seen sequence
391            long seq = data.getSeq();
392            if (seq - 1 != this.seq) {
393                // packets out of order; close stream/session
394                InBandBytestreamSession.this.close();
395                throw new IOException("Packets out of sequence");
396            }
397            else {
398                this.seq = seq;
399            }
400
401            // set buffer to decoded data
402            buffer = data.getDecodedData();
403            bufferPointer = 0;
404            return true;
405        }
406
407        /**
408         * Checks if this stream is closed and throws an IOException if necessary
409         *
410         * @throws IOException if stream is closed and no data should be read anymore
411         */
412        private void checkClosed() throws IOException {
413            // Throw an exception if, and only if, this stream has been already
414            // closed by the user using the close() method
415            if (closeInvoked) {
416                // clear data queue in case additional data was received after stream was closed
417                this.dataQueue.clear();
418                throw new IOException("Stream is closed");
419            }
420        }
421
422        @Override
423        public boolean markSupported() {
424            return false;
425        }
426
427        @Override
428        public void close() throws IOException {
429            if (closeInvoked) {
430                return;
431            }
432
433            this.closeInvoked = true;
434
435            InBandBytestreamSession.this.closeByLocal(true);
436        }
437
438        /**
439         * This method sets the close flag and removes the data stanza listener.
440         */
441        private void closeInternal() {
442            if (isClosed) {
443                return;
444            }
445            isClosed = true;
446        }
447
448        /**
449         * Invoked if the session is closed.
450         */
451        private void cleanup() {
452            connection.removeSyncStanzaListener(this.dataPacketListener);
453        }
454
455    }
456
457    /**
458     * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
459     * data packets.
460     */
461    private class IQIBBInputStream extends IBBInputStream {
462
463        @Override
464        protected StanzaListener getDataPacketListener() {
465            return new StanzaListener() {
466
467                private long lastSequence = -1;
468
469                @Override
470                public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException {
471                    // get data packet extension
472                    DataPacketExtension data = ((Data) packet).getDataPacketExtension();
473
474                    /*
475                     * check if sequence was not used already (see XEP-0047 Section 2.2)
476                     */
477                    if (data.getSeq() <= this.lastSequence) {
478                        IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet,
479                                        StanzaError.Condition.unexpected_request);
480                        connection.sendStanza(unexpectedRequest);
481                        return;
482
483                    }
484
485                    // check if encoded data is valid (see XEP-0047 Section 2.2)
486                    if (data.getDecodedData() == null) {
487                        // data is invalid; respond with bad-request error
488                        IQ badRequest = IQ.createErrorResponse((IQ) packet,
489                                        StanzaError.Condition.bad_request);
490                        connection.sendStanza(badRequest);
491                        return;
492                    }
493
494                    // data is valid; add to data queue
495                    dataQueue.offer(data);
496
497                    // confirm IQ
498                    IQ confirmData = IQ.createResultIQ((IQ) packet);
499                    connection.sendStanza(confirmData);
500
501                    // set last seen sequence
502                    this.lastSequence = data.getSeq();
503                    if (this.lastSequence == 65535) {
504                        this.lastSequence = -1;
505                    }
506
507                }
508
509            };
510        }
511
512        @Override
513        protected StanzaFilter getDataPacketFilter() {
514            /*
515             * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
516             * data stanza extension, matching session ID and recipient
517             */
518            return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
519        }
520
521    }
522
523    /**
524     * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
525     * encapsulating the data packets.
526     */
527    private class MessageIBBInputStream extends IBBInputStream {
528
529        @Override
530        protected StanzaListener getDataPacketListener() {
531            return new StanzaListener() {
532
533                @Override
534                public void processStanza(Stanza packet) {
535                    // get data packet extension
536                    DataPacketExtension data = packet.getExtension(
537                                    DataPacketExtension.ELEMENT,
538                                    DataPacketExtension.NAMESPACE);
539
540                    // check if encoded data is valid
541                    if (data.getDecodedData() == null) {
542                        /*
543                         * TODO once a majority of XMPP server implementation support XEP-0079
544                         * Advanced Message Processing the invalid message could be answered with an
545                         * appropriate error. For now we just ignore the packet. Subsequent packets
546                         * with an increased sequence will cause the input stream to close the
547                         * stream/session.
548                         */
549                        return;
550                    }
551
552                    // data is valid; add to data queue
553                    dataQueue.offer(data);
554
555                    // TODO confirm packet once XMPP servers support XEP-0079
556                }
557
558            };
559        }
560
561        @Override
562        protected StanzaFilter getDataPacketFilter() {
563            /*
564             * filter all message stanzas containing a data stanza extension, matching session ID
565             * and recipient
566             */
567            return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter());
568        }
569
570    }
571
572    /**
573     * IBBDataPacketFilter class filters all packets from the remote peer of this session,
574     * containing an In-Band Bytestream data stanza extension whose session ID matches this sessions
575     * ID.
576     */
577    private class IBBDataPacketFilter implements StanzaFilter {
578
579        @Override
580        public boolean accept(Stanza packet) {
581            // sender equals remote peer
582            if (!packet.getFrom().equals(remoteJID)) {
583                return false;
584            }
585
586            DataPacketExtension data;
587            if (packet instanceof Data) {
588                data = ((Data) packet).getDataPacketExtension();
589            } else {
590                // stanza contains data packet extension
591                data = packet.getExtension(
592                        DataPacketExtension.ELEMENT,
593                        DataPacketExtension.NAMESPACE);
594                if (data == null) {
595                    return false;
596                }
597            }
598
599            // session ID equals this session ID
600            if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
601                return false;
602            }
603
604            return true;
605        }
606
607    }
608
609    /**
610     * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
611     * Subclasses of this output stream must provide a method to send data over XMPP stream.
612     */
613    private abstract class IBBOutputStream extends OutputStream {
614
615        /* buffer with the size of this sessions block size */
616        protected final byte[] buffer;
617
618        /* pointer to next byte to write to buffer */
619        protected int bufferPointer = 0;
620
621        /* data packet sequence (range from 0 to 65535) */
622        protected long seq = 0;
623
624        /* flag to indicate if output stream is closed */
625        protected boolean isClosed = false;
626
627        /**
628         * Constructor.
629         */
630        private IBBOutputStream() {
631            this.buffer = new byte[byteStreamRequest.getBlockSize()];
632        }
633
634        /**
635         * Writes the given data stanza to the XMPP stream.
636         *
637         * @param data the data packet
638         * @throws IOException if an I/O error occurred while sending or if the stream is closed
639         * @throws NotConnectedException
640         * @throws InterruptedException
641         */
642        protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException;
643
644        @Override
645        public synchronized void write(int b) throws IOException {
646            if (this.isClosed) {
647                throw new IOException("Stream is closed");
648            }
649
650            // if buffer is full flush buffer
651            if (bufferPointer >= buffer.length) {
652                flushBuffer();
653            }
654
655            buffer[bufferPointer++] = (byte) b;
656        }
657
658        @Override
659        public synchronized void write(byte[] b, int off, int len) throws IOException {
660            if (b == null) {
661                throw new NullPointerException();
662            }
663            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
664                            || ((off + len) < 0)) {
665                throw new IndexOutOfBoundsException();
666            }
667            else if (len == 0) {
668                return;
669            }
670
671            if (this.isClosed) {
672                throw new IOException("Stream is closed");
673            }
674
675            // is data to send greater than buffer size
676            if (len >= buffer.length) {
677
678                // "byte" off the first chunk to write out
679                writeOut(b, off, buffer.length);
680
681                // recursively call this method with the lesser amount
682                write(b, off + buffer.length, len - buffer.length);
683            }
684            else {
685                writeOut(b, off, len);
686            }
687        }
688
689        @Override
690        public synchronized void write(byte[] b) throws IOException {
691            write(b, 0, b.length);
692        }
693
694        /**
695         * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
696         * capacity has been reached. This method is only called from this class so it is assured
697         * that the amount of data to send is <= buffer capacity
698         *
699         * @param b the data
700         * @param off the data
701         * @param len the number of bytes to write
702         * @throws IOException if an I/O error occurred while sending or if the stream is closed
703         */
704        private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
705            if (this.isClosed) {
706                throw new IOException("Stream is closed");
707            }
708
709            // set to 0 in case the next 'if' block is not executed
710            int available = 0;
711
712            // is data to send greater that buffer space left
713            if (len > buffer.length - bufferPointer) {
714                // fill buffer to capacity and send it
715                available = buffer.length - bufferPointer;
716                System.arraycopy(b, off, buffer, bufferPointer, available);
717                bufferPointer += available;
718                flushBuffer();
719            }
720
721            // copy the data left to buffer
722            System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
723            bufferPointer += len - available;
724        }
725
726        @Override
727        public synchronized void flush() throws IOException {
728            if (this.isClosed) {
729                throw new IOException("Stream is closed");
730            }
731            flushBuffer();
732        }
733
734        private synchronized void flushBuffer() throws IOException {
735
736            // do nothing if no data to send available
737            if (bufferPointer == 0) {
738                return;
739            }
740
741            // create data packet
742            String enc = Base64.encodeToString(buffer, 0, bufferPointer);
743            DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
744                            this.seq, enc);
745
746            // write to XMPP stream
747            try {
748                writeToXML(data);
749            }
750            catch (InterruptedException | NotConnectedException e) {
751                IOException ioException = new IOException();
752                ioException.initCause(e);
753                throw ioException;
754            }
755
756            // reset buffer pointer
757            bufferPointer = 0;
758
759            // increment sequence, considering sequence overflow
760            this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
761
762        }
763
764        @Override
765        public void close() throws IOException {
766            if (isClosed) {
767                return;
768            }
769            InBandBytestreamSession.this.closeByLocal(false);
770        }
771
772        /**
773         * Sets the close flag and optionally flushes the stream.
774         *
775         * @param flush if <code>true</code> flushes the stream
776         */
777        protected void closeInternal(boolean flush) {
778            if (this.isClosed) {
779                return;
780            }
781            this.isClosed = true;
782
783            try {
784                if (flush) {
785                    flushBuffer();
786                }
787            }
788            catch (IOException e) {
789                /*
790                 * ignore, because writeToXML() will not throw an exception if stream is already
791                 * closed
792                 */
793            }
794        }
795
796    }
797
798    /**
799     * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
800     * the data packets.
801     */
802    private class IQIBBOutputStream extends IBBOutputStream {
803
804        @Override
805        protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
806            // create IQ stanza containing data packet
807            IQ iq = new Data(data);
808            iq.setTo(remoteJID);
809
810            try {
811                connection.createStanzaCollectorAndSend(iq).nextResultOrThrow();
812            }
813            catch (Exception e) {
814                // close session unless it is already closed
815                if (!this.isClosed) {
816                    InBandBytestreamSession.this.close();
817                    // Sadly we are unable to use the IOException(Throwable) constructor because this
818                    // constructor is only supported from Android API 9 on.
819                    IOException ioException = new IOException();
820                    ioException.initCause(e);
821                    throw ioException;
822                }
823            }
824
825        }
826
827    }
828
829    /**
830     * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
831     * encapsulating the data packets.
832     */
833    private class MessageIBBOutputStream extends IBBOutputStream {
834
835        @Override
836        protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException {
837            // create message stanza containing data packet
838            Message message = new Message(remoteJID);
839            message.addExtension(data);
840
841            connection.sendStanza(message);
842
843        }
844
845    }
846
847    /**
848     * Process IQ stanza.
849     * @param data
850     * @throws NotConnectedException
851     * @throws InterruptedException
852     * @throws NotLoggedInException
853     */
854    public void processIQPacket(Data data) throws NotConnectedException, InterruptedException, NotLoggedInException {
855        inputStream.dataPacketListener.processStanza(data);
856    }
857
858}