001/**
002 *
003 * Copyright the original author or authors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smackx.bytestreams.ibb;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.SocketTimeoutException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.logging.Level;
027import java.util.logging.Logger;
028
029import org.jivesoftware.smack.SmackException.NotConnectedException;
030import org.jivesoftware.smack.SmackException.NotLoggedInException;
031import org.jivesoftware.smack.StanzaListener;
032import org.jivesoftware.smack.XMPPConnection;
033import org.jivesoftware.smack.datatypes.UInt16;
034import org.jivesoftware.smack.filter.AndFilter;
035import org.jivesoftware.smack.filter.StanzaFilter;
036import org.jivesoftware.smack.filter.StanzaTypeFilter;
037import org.jivesoftware.smack.packet.IQ;
038import org.jivesoftware.smack.packet.Message;
039import org.jivesoftware.smack.packet.Stanza;
040import org.jivesoftware.smack.packet.StanzaBuilder;
041import org.jivesoftware.smack.packet.StanzaError;
042import org.jivesoftware.smack.util.stringencoder.Base64;
043
044import org.jivesoftware.smackx.bytestreams.BytestreamSession;
045import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
046import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
047import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
048import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
049
050import org.jxmpp.jid.Jid;
051
052/**
053 * InBandBytestreamSession class represents an In-Band Bytestream session.
054 * <p>
055 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
056 * directions.
057 * <p>
058 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
059 * closed individually the session will be closed automatically once the second stream is closed.
060 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
061 * automatically if one of them is closed.
062 *
063 * @author Henning Staib
064 */
065public class InBandBytestreamSession implements BytestreamSession {
066
067    private static final Logger LOGGER = Logger.getLogger(InBandBytestreamSession.class.getName());
068
069    static final String UNEXPECTED_IBB_SEQUENCE = "Unexpected IBB sequence";
070
071    /* XMPP connection */
072    private final XMPPConnection connection;
073
074    /* the In-Band Bytestream open request for this session */
075    private final Open byteStreamRequest;
076
077    /*
078     * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
079     */
080    private IBBInputStream inputStream;
081
082    /*
083     * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
084     */
085    private IBBOutputStream outputStream;
086
087    /* JID of the remote peer */
088    private Jid remoteJID;
089
090    /* flag to close both streams if one of them is closed */
091    private boolean closeBothStreamsEnabled = false;
092
093    /* flag to indicate if session is closed */
094    private boolean isClosed = false;
095
096    /**
097     * Constructor.
098     *
099     * @param connection the XMPP connection
100     * @param byteStreamRequest the In-Band Bytestream open request for this session
101     * @param remoteJID JID of the remote peer
102     */
103    protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
104                    Jid remoteJID) {
105        this.connection = connection;
106        this.byteStreamRequest = byteStreamRequest;
107        this.remoteJID = remoteJID;
108
109        // initialize streams dependent to the uses stanza type
110        switch (byteStreamRequest.getStanza()) {
111        case IQ:
112            this.inputStream = new IQIBBInputStream();
113            this.outputStream = new IQIBBOutputStream();
114            break;
115        case MESSAGE:
116            this.inputStream = new MessageIBBInputStream();
117            this.outputStream = new MessageIBBOutputStream();
118            break;
119        }
120
121    }
122
123    @Override
124    public InputStream getInputStream() {
125        return this.inputStream;
126    }
127
128    @Override
129    public OutputStream getOutputStream() {
130        return this.outputStream;
131    }
132
133    @Override
134    public int getReadTimeout() {
135        return this.inputStream.readTimeout;
136    }
137
138    @Override
139    public void setReadTimeout(int timeout) {
140        if (timeout < 0) {
141            throw new IllegalArgumentException("Timeout must be >= 0");
142        }
143        this.inputStream.readTimeout = timeout;
144    }
145
146    /**
147     * Returns whether both streams should be closed automatically if one of the streams is closed.
148     * Default is <code>false</code>.
149     *
150     * @return <code>true</code> if both streams will be closed if one of the streams is closed,
151     *         <code>false</code> if both streams can be closed independently.
152     */
153    public boolean isCloseBothStreamsEnabled() {
154        return closeBothStreamsEnabled;
155    }
156
157    /**
158     * Sets whether both streams should be closed automatically if one of the streams is closed.
159     * Default is <code>false</code>.
160     *
161     * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
162     *        the streams is closed, <code>false</code> if both streams should be closed
163     *        independently
164     */
165    public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
166        this.closeBothStreamsEnabled = closeBothStreamsEnabled;
167    }
168
169    @Override
170    public void close() throws IOException {
171        closeByLocal(true); // close input stream
172        closeByLocal(false); // close output stream
173    }
174
175    /**
176     * This method is invoked if a request to close the In-Band Bytestream has been received.
177     *
178     * @param closeRequest the close request from the remote peer
179     * @throws NotConnectedException if the XMPP connection is not connected.
180     * @throws InterruptedException if the calling thread was interrupted.
181     */
182    protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException {
183
184        /*
185         * close streams without flushing them, because stream is already considered closed on the
186         * remote peers side
187         */
188        this.inputStream.closeInternal();
189        this.inputStream.cleanup();
190        this.outputStream.closeInternal(false);
191
192        // acknowledge close request
193        IQ confirmClose = IQ.createResultIQ(closeRequest);
194        this.connection.sendStanza(confirmClose);
195
196    }
197
198    /**
199     * This method is invoked if one of the streams has been closed locally, if an error occurred
200     * locally or if the whole session should be closed.
201     *
202     * @param in do we want to close the Input- or OutputStream?
203     * @throws IOException if an error occurs while sending the close request
204     */
205    protected synchronized void closeByLocal(boolean in) throws IOException {
206        if (this.isClosed) {
207            return;
208        }
209
210        if (this.closeBothStreamsEnabled) {
211            this.inputStream.closeInternal();
212            this.outputStream.closeInternal(true);
213        }
214        else {
215            if (in) {
216                this.inputStream.closeInternal();
217            }
218            else {
219                // close stream but try to send any data left
220                this.outputStream.closeInternal(true);
221            }
222        }
223
224        if (this.inputStream.isClosed && this.outputStream.isClosed) {
225            this.isClosed = true;
226
227            // send close request
228            Close close = new Close(this.byteStreamRequest.getSessionID());
229            close.setTo(this.remoteJID);
230            try {
231                connection.sendIqRequestAndWaitForResponse(close);
232            }
233            catch (Exception e) {
234                // Sadly we are unable to use the IOException(Throwable) constructor because this
235                // constructor is only supported from Android API 9 on.
236                IOException ioException = new IOException();
237                ioException.initCause(e);
238                throw ioException;
239            }
240
241            this.inputStream.cleanup();
242
243            // remove session from manager
244            // Thanks Google Error Prone for finding the bug where remove() was called with 'this' as argument. Changed
245            // now to remove(byteStreamRequest.getSessionID).
246            InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(byteStreamRequest.getSessionID());
247        }
248
249    }
250
251    /**
252     * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
253     * Subclasses of this input stream must provide a stanza listener along with a stanza filter to
254     * collect the In-Band Bytestream data packets.
255     */
256    private abstract class IBBInputStream extends InputStream {
257
258        /* the data packet listener to fill the data queue */
259        private final StanzaListener dataPacketListener;
260
261        /* queue containing received In-Band Bytestream data packets */
262        protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
263
264        /* buffer containing the data from one data packet */
265        private byte[] buffer;
266
267        /* pointer to the next byte to read from buffer */
268        private int bufferPointer = -1;
269
270        /* data packet sequence (range from 0 to 65535) */
271        private UInt16 expectedSeq = UInt16.MIN_VALUE;
272
273        /* flag to indicate if input stream is closed */
274        private boolean isClosed = false;
275
276        /* flag to indicate if close method was invoked */
277        private boolean closeInvoked = false;
278
279        /* timeout for read operations */
280        private int readTimeout = 0;
281
282        /**
283         * Constructor.
284         */
285        protected IBBInputStream() {
286            // add data packet listener to connection
287            this.dataPacketListener = getDataPacketListener();
288            connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
289        }
290
291        /**
292         * Returns the stanza listener that processes In-Band Bytestream data packets.
293         *
294         * @return the data stanza listener
295         */
296        protected abstract StanzaListener getDataPacketListener();
297
298        /**
299         * Returns the stanza filter that accepts In-Band Bytestream data packets.
300         *
301         * @return the data stanza filter
302         */
303        protected abstract StanzaFilter getDataPacketFilter();
304
305        @Override
306        public synchronized int read() throws IOException {
307            checkClosed();
308
309            // if nothing read yet or whole buffer has been read fill buffer
310            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
311                // if no data available and stream was closed return -1
312                if (!loadBuffer()) {
313                    return -1;
314                }
315            }
316
317            // return byte and increment buffer pointer
318            return buffer[bufferPointer++] & 0xff;
319        }
320
321        @Override
322        public synchronized int read(byte[] b, int off, int len) throws IOException {
323            if (b == null) {
324                throw new NullPointerException();
325            }
326            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
327                            || ((off + len) < 0)) {
328                throw new IndexOutOfBoundsException();
329            }
330            else if (len == 0) {
331                return 0;
332            }
333
334            checkClosed();
335
336            // if nothing read yet or whole buffer has been read fill buffer
337            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
338                // if no data available and stream was closed return -1
339                if (!loadBuffer()) {
340                    return -1;
341                }
342            }
343
344            // if more bytes wanted than available return all available
345            int bytesAvailable = buffer.length - bufferPointer;
346            if (len > bytesAvailable) {
347                len = bytesAvailable;
348            }
349
350            System.arraycopy(buffer, bufferPointer, b, off, len);
351            bufferPointer += len;
352            return len;
353        }
354
355        @Override
356        public synchronized int read(byte[] b) throws IOException {
357            return read(b, 0, b.length);
358        }
359
360        /**
361         * This method blocks until a data stanza is received, the stream is closed or the current
362         * thread is interrupted.
363         *
364         * @return <code>true</code> if data was received, otherwise <code>false</code>
365         * @throws IOException if data packets are out of sequence
366         */
367        private synchronized boolean loadBuffer() throws IOException {
368
369            // wait until data is available or stream is closed
370            DataPacketExtension data = null;
371            try {
372                if (this.readTimeout == 0) {
373                    while (data == null) {
374                        if (isClosed && this.dataQueue.isEmpty()) {
375                            return false;
376                        }
377                        data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
378                    }
379                }
380                else {
381                    data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
382                    if (data == null) {
383                        throw new SocketTimeoutException();
384                    }
385                }
386            }
387            catch (InterruptedException e) {
388                // Restore the interrupted status
389                Thread.currentThread().interrupt();
390                return false;
391            }
392
393            final UInt16 dataSeq = data.getSeq();
394            // check if data packets sequence is successor of last seen sequence
395            if (!expectedSeq.equals(dataSeq)) {
396                // packets out of order; close stream/session
397                InBandBytestreamSession.this.close();
398                String message = UNEXPECTED_IBB_SEQUENCE + " " + dataSeq + " received, expected "
399                                + expectedSeq;
400                throw new IOException(message);
401            }
402            expectedSeq = dataSeq.incrementedByOne();
403
404            // set buffer to decoded data
405            buffer = data.getDecodedData();
406            bufferPointer = 0;
407            return true;
408        }
409
410        /**
411         * Checks if this stream is closed and throws an IOException if necessary
412         *
413         * @throws IOException if stream is closed and no data should be read anymore
414         */
415        private void checkClosed() throws IOException {
416            // Throw an exception if, and only if, this stream has been already
417            // closed by the user using the close() method
418            if (closeInvoked) {
419                // clear data queue in case additional data was received after stream was closed
420                this.dataQueue.clear();
421                throw new IOException("Stream is closed");
422            }
423        }
424
425        @Override
426        public boolean markSupported() {
427            return false;
428        }
429
430        @Override
431        public void close() throws IOException {
432            if (closeInvoked) {
433                return;
434            }
435
436            this.closeInvoked = true;
437
438            InBandBytestreamSession.this.closeByLocal(true);
439        }
440
441        /**
442         * This method sets the close flag and removes the data stanza listener.
443         */
444        private void closeInternal() {
445            if (isClosed) {
446                return;
447            }
448            isClosed = true;
449        }
450
451        /**
452         * Invoked if the session is closed.
453         */
454        private void cleanup() {
455            connection.removeSyncStanzaListener(this.dataPacketListener);
456        }
457
458    }
459
460    /**
461     * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
462     * data packets.
463     */
464    private final class IQIBBInputStream extends IBBInputStream {
465
466        @Override
467        protected StanzaListener getDataPacketListener() {
468            return new StanzaListener() {
469
470                private UInt16 expectedSequence = UInt16.MIN_VALUE;;
471
472                @Override
473                public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException {
474                    final Data dataIq = (Data) packet;
475                    // get data packet extension
476                    DataPacketExtension data = dataIq.getDataPacketExtension();
477
478                    final UInt16 seq = data.getSeq();
479                    /*
480                     * check if sequence was not used already (see XEP-0047 Section 2.2)
481                     */
482                    if (!expectedSequence.equals(seq)) {
483                        String descriptiveEnTest = UNEXPECTED_IBB_SEQUENCE + " " + seq + " received, expected "
484                                        + expectedSequence;
485                        StanzaError stanzaError = StanzaError.getBuilder()
486                                        .setCondition(StanzaError.Condition.unexpected_request)
487                                        .setDescriptiveEnText(descriptiveEnTest)
488                                        .build();
489                        IQ unexpectedRequest = IQ.createErrorResponse(dataIq, stanzaError);
490                        connection.sendStanza(unexpectedRequest);
491
492                        try {
493                            // TODO: It would be great if close would take a "close error reason" argument. Also there
494                            // is the question if this is really a reason to close the stream. We could have some more
495                            // tolerance regarding out-of-sequence stanzas arriving: Even though XMPP has the in-order
496                            // guarantee, I could imagine that there are cases where stanzas are, for example,
497                            // duplicated because of stream resumption.
498                            close();
499                        } catch (IOException e) {
500                            LOGGER.log(Level.FINER, "Could not close session, because of IOException. Close reason: "
501                                            + descriptiveEnTest);
502                        }
503
504                        return;
505                    }
506
507                    // check if encoded data is valid (see XEP-0047 Section 2.2)
508                    if (data.getDecodedData() == null) {
509                        // data is invalid; respond with bad-request error
510                        IQ badRequest = IQ.createErrorResponse((IQ) packet,
511                                        StanzaError.Condition.bad_request);
512                        connection.sendStanza(badRequest);
513                        return;
514                    }
515
516                    expectedSequence = seq.incrementedByOne();
517
518                    // data is valid; add to data queue
519                    dataQueue.offer(data);
520
521                    // confirm IQ
522                    IQ confirmData = IQ.createResultIQ((IQ) packet);
523                    connection.sendStanza(confirmData);
524                }
525
526            };
527        }
528
529        @Override
530        protected StanzaFilter getDataPacketFilter() {
531            /*
532             * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
533             * data stanza extension, matching session ID and recipient
534             */
535            return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
536        }
537
538    }
539
540    /**
541     * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
542     * encapsulating the data packets.
543     */
544    private final class MessageIBBInputStream extends IBBInputStream {
545
546        @Override
547        protected StanzaListener getDataPacketListener() {
548            return new StanzaListener() {
549
550                @Override
551                public void processStanza(Stanza packet) {
552                    // get data packet extension
553                    DataPacketExtension data = packet.getExtension(
554                                    DataPacketExtension.class);
555
556                    // check if encoded data is valid
557                    if (data.getDecodedData() == null) {
558                        /*
559                         * TODO once a majority of XMPP server implementation support XEP-0079
560                         * Advanced Message Processing the invalid message could be answered with an
561                         * appropriate error. For now we just ignore the packet. Subsequent packets
562                         * with an increased sequence will cause the input stream to close the
563                         * stream/session.
564                         */
565                        return;
566                    }
567
568                    // data is valid; add to data queue
569                    dataQueue.offer(data);
570
571                    // TODO confirm packet once XMPP servers support XEP-0079
572                }
573
574            };
575        }
576
577        @Override
578        protected StanzaFilter getDataPacketFilter() {
579            /*
580             * filter all message stanzas containing a data stanza extension, matching session ID
581             * and recipient
582             */
583            return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter());
584        }
585
586    }
587
588    /**
589     * IBBDataPacketFilter class filters all packets from the remote peer of this session,
590     * containing an In-Band Bytestream data stanza extension whose session ID matches this sessions
591     * ID.
592     */
593    private final class IBBDataPacketFilter implements StanzaFilter {
594
595        @Override
596        public boolean accept(Stanza packet) {
597            // sender equals remote peer
598            if (!packet.getFrom().equals(remoteJID)) {
599                return false;
600            }
601
602            DataPacketExtension data;
603            if (packet instanceof Data) {
604                data = ((Data) packet).getDataPacketExtension();
605            } else {
606                // stanza contains data packet extension
607                data = packet.getExtension(
608                        DataPacketExtension.class);
609                if (data == null) {
610                    return false;
611                }
612            }
613
614            // session ID equals this session ID
615            if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
616                return false;
617            }
618
619            return true;
620        }
621
622    }
623
624    /**
625     * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
626     * Subclasses of this output stream must provide a method to send data over XMPP stream.
627     */
628    private abstract class IBBOutputStream extends OutputStream {
629
630        /* buffer with the size of this sessions block size */
631        protected final byte[] buffer;
632
633        /* pointer to next byte to write to buffer */
634        protected int bufferPointer = 0;
635
636        /* data packet sequence (range from 0 to 65535) */
637        protected UInt16 seq = UInt16.from(0);
638
639        /* flag to indicate if output stream is closed */
640        protected boolean isClosed = false;
641
642        /**
643         * Constructor.
644         */
645        private IBBOutputStream() {
646            this.buffer = new byte[byteStreamRequest.getBlockSize()];
647        }
648
649        /**
650         * Writes the given data stanza to the XMPP stream.
651         *
652         * @param data the data packet
653         * @throws IOException if an I/O error occurred while sending or if the stream is closed
654         * @throws NotConnectedException if the XMPP connection is not connected.
655         * @throws InterruptedException if the calling thread was interrupted.
656         */
657        protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException;
658
659        @Override
660        public synchronized void write(int b) throws IOException {
661            if (this.isClosed) {
662                throw new IOException("Stream is closed");
663            }
664
665            // if buffer is full flush buffer
666            if (bufferPointer >= buffer.length) {
667                flushBuffer();
668            }
669
670            buffer[bufferPointer++] = (byte) b;
671        }
672
673        @Override
674        public synchronized void write(byte[] b, int off, int len) throws IOException {
675            if (b == null) {
676                throw new NullPointerException();
677            }
678            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
679                            || ((off + len) < 0)) {
680                throw new IndexOutOfBoundsException();
681            }
682            else if (len == 0) {
683                return;
684            }
685
686            if (this.isClosed) {
687                throw new IOException("Stream is closed");
688            }
689
690            // is data to send greater than buffer size
691            if (len >= buffer.length) {
692
693                // "byte" off the first chunk to write out
694                writeOut(b, off, buffer.length);
695
696                // recursively call this method with the lesser amount
697                write(b, off + buffer.length, len - buffer.length);
698            }
699            else {
700                writeOut(b, off, len);
701            }
702        }
703
704        @Override
705        public synchronized void write(byte[] b) throws IOException {
706            write(b, 0, b.length);
707        }
708
709        /**
710         * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
711         * capacity has been reached. This method is only called from this class so it is assured
712         * that the amount of data to send is <= buffer capacity
713         *
714         * @param b the data
715         * @param off the data
716         * @param len the number of bytes to write
717         * @throws IOException if an I/O error occurred while sending or if the stream is closed
718         */
719        private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
720            if (this.isClosed) {
721                throw new IOException("Stream is closed");
722            }
723
724            // set to 0 in case the next 'if' block is not executed
725            int available = 0;
726
727            // is data to send greater that buffer space left
728            if (len > buffer.length - bufferPointer) {
729                // fill buffer to capacity and send it
730                available = buffer.length - bufferPointer;
731                System.arraycopy(b, off, buffer, bufferPointer, available);
732                bufferPointer += available;
733                flushBuffer();
734            }
735
736            // copy the data left to buffer
737            System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
738            bufferPointer += len - available;
739        }
740
741        @Override
742        public synchronized void flush() throws IOException {
743            if (this.isClosed) {
744                throw new IOException("Stream is closed");
745            }
746            flushBuffer();
747        }
748
749        private synchronized void flushBuffer() throws IOException {
750
751            // do nothing if no data to send available
752            if (bufferPointer == 0) {
753                return;
754            }
755
756            // create data packet
757            String enc = Base64.encodeToString(buffer, 0, bufferPointer);
758            DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
759                            this.seq, enc);
760
761            // write to XMPP stream
762            try {
763                writeToXML(data);
764            }
765            catch (InterruptedException | NotConnectedException e) {
766                IOException ioException = new IOException();
767                ioException.initCause(e);
768                throw ioException;
769            }
770
771            // reset buffer pointer
772            bufferPointer = 0;
773
774            // increment sequence, considering sequence overflow
775            seq = seq.incrementedByOne();
776
777        }
778
779        @Override
780        public void close() throws IOException {
781            if (isClosed) {
782                return;
783            }
784            InBandBytestreamSession.this.closeByLocal(false);
785        }
786
787        /**
788         * Sets the close flag and optionally flushes the stream.
789         *
790         * @param flush if <code>true</code> flushes the stream
791         */
792        protected void closeInternal(boolean flush) {
793            if (this.isClosed) {
794                return;
795            }
796            this.isClosed = true;
797
798            try {
799                if (flush) {
800                    flushBuffer();
801                }
802            }
803            catch (IOException e) {
804                /*
805                 * ignore, because writeToXML() will not throw an exception if stream is already
806                 * closed
807                 */
808            }
809        }
810
811    }
812
813    /**
814     * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
815     * the data packets.
816     */
817    private final class IQIBBOutputStream extends IBBOutputStream {
818
819        @Override
820        protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
821            // create IQ stanza containing data packet
822            IQ iq = new Data(data);
823            iq.setTo(remoteJID);
824
825            try {
826                connection.sendIqRequestAndWaitForResponse(iq);
827            }
828            catch (Exception e) {
829                // close session unless it is already closed
830                if (!this.isClosed) {
831                    InBandBytestreamSession.this.close();
832                    // Sadly we are unable to use the IOException(Throwable) constructor because this
833                    // constructor is only supported from Android API 9 on.
834                    IOException ioException = new IOException();
835                    ioException.initCause(e);
836                    throw ioException;
837                }
838            }
839
840        }
841
842    }
843
844    /**
845     * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
846     * encapsulating the data packets.
847     */
848    private final class MessageIBBOutputStream extends IBBOutputStream {
849
850        @Override
851        protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException {
852            // create message stanza containing data packet
853            Message message = StanzaBuilder.buildMessage().to(remoteJID)
854                    .addExtension(data)
855                    .build();
856
857            connection.sendStanza(message);
858
859        }
860
861    }
862
863    /**
864     * Process IQ stanza.
865     * @param data TODO javadoc me please
866     * @throws NotConnectedException if the XMPP connection is not connected.
867     * @throws InterruptedException if the calling thread was interrupted.
868     * @throws NotLoggedInException if the XMPP connection is not authenticated.
869     */
870    public void processIQPacket(Data data) throws NotConnectedException, InterruptedException, NotLoggedInException {
871        inputStream.dataPacketListener.processStanza(data);
872    }
873
874}