InBandBytestreamSession.java

  1. /**
  2.  *
  3.  * Copyright the original author or authors
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smackx.bytestreams.ibb;

  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.io.OutputStream;
  21. import java.net.SocketTimeoutException;
  22. import java.util.concurrent.BlockingQueue;
  23. import java.util.concurrent.LinkedBlockingQueue;
  24. import java.util.concurrent.TimeUnit;

  25. import org.jivesoftware.smack.SmackException.NotConnectedException;
  26. import org.jivesoftware.smack.XMPPConnection;
  27. import org.jivesoftware.smack.StanzaListener;
  28. import org.jivesoftware.smack.filter.AndFilter;
  29. import org.jivesoftware.smack.filter.StanzaFilter;
  30. import org.jivesoftware.smack.filter.StanzaTypeFilter;
  31. import org.jivesoftware.smack.packet.IQ;
  32. import org.jivesoftware.smack.packet.Message;
  33. import org.jivesoftware.smack.packet.Stanza;
  34. import org.jivesoftware.smack.packet.XMPPError;
  35. import org.jivesoftware.smack.util.stringencoder.Base64;
  36. import org.jivesoftware.smackx.bytestreams.BytestreamSession;
  37. import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
  38. import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
  39. import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
  40. import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
  41. import org.jxmpp.jid.Jid;

  42. /**
  43.  * InBandBytestreamSession class represents an In-Band Bytestream session.
  44.  * <p>
  45.  * In-band bytestreams are bidirectional and this session encapsulates the streams for both
  46.  * directions.
  47.  * <p>
  48.  * Note that closing the In-Band Bytestream session will close both streams. If both streams are
  49.  * closed individually the session will be closed automatically once the second stream is closed.
  50.  * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
  51.  * automatically if one of them is closed.
  52.  *
  53.  * @author Henning Staib
  54.  */
  55. public class InBandBytestreamSession implements BytestreamSession {

  56.     /* XMPP connection */
  57.     private final XMPPConnection connection;

  58.     /* the In-Band Bytestream open request for this session */
  59.     private final Open byteStreamRequest;

  60.     /*
  61.      * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
  62.      */
  63.     private IBBInputStream inputStream;

  64.     /*
  65.      * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
  66.      */
  67.     private IBBOutputStream outputStream;

  68.     /* JID of the remote peer */
  69.     private Jid remoteJID;

  70.     /* flag to close both streams if one of them is closed */
  71.     private boolean closeBothStreamsEnabled = false;

  72.     /* flag to indicate if session is closed */
  73.     private boolean isClosed = false;

  74.     /**
  75.      * Constructor.
  76.      *
  77.      * @param connection the XMPP connection
  78.      * @param byteStreamRequest the In-Band Bytestream open request for this session
  79.      * @param remoteJID JID of the remote peer
  80.      */
  81.     protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
  82.                     Jid remoteJID) {
  83.         this.connection = connection;
  84.         this.byteStreamRequest = byteStreamRequest;
  85.         this.remoteJID = remoteJID;

  86.         // initialize streams dependent to the uses stanza type
  87.         switch (byteStreamRequest.getStanza()) {
  88.         case IQ:
  89.             this.inputStream = new IQIBBInputStream();
  90.             this.outputStream = new IQIBBOutputStream();
  91.             break;
  92.         case MESSAGE:
  93.             this.inputStream = new MessageIBBInputStream();
  94.             this.outputStream = new MessageIBBOutputStream();
  95.             break;
  96.         }

  97.     }

  98.     public InputStream getInputStream() {
  99.         return this.inputStream;
  100.     }

  101.     public OutputStream getOutputStream() {
  102.         return this.outputStream;
  103.     }

  104.     public int getReadTimeout() {
  105.         return this.inputStream.readTimeout;
  106.     }

  107.     public void setReadTimeout(int timeout) {
  108.         if (timeout < 0) {
  109.             throw new IllegalArgumentException("Timeout must be >= 0");
  110.         }
  111.         this.inputStream.readTimeout = timeout;
  112.     }

  113.     /**
  114.      * Returns whether both streams should be closed automatically if one of the streams is closed.
  115.      * Default is <code>false</code>.
  116.      *
  117.      * @return <code>true</code> if both streams will be closed if one of the streams is closed,
  118.      *         <code>false</code> if both streams can be closed independently.
  119.      */
  120.     public boolean isCloseBothStreamsEnabled() {
  121.         return closeBothStreamsEnabled;
  122.     }

  123.     /**
  124.      * Sets whether both streams should be closed automatically if one of the streams is closed.
  125.      * Default is <code>false</code>.
  126.      *
  127.      * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
  128.      *        the streams is closed, <code>false</code> if both streams should be closed
  129.      *        independently
  130.      */
  131.     public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
  132.         this.closeBothStreamsEnabled = closeBothStreamsEnabled;
  133.     }

  134.     public void close() throws IOException {
  135.         closeByLocal(true); // close input stream
  136.         closeByLocal(false); // close output stream
  137.     }

  138.     /**
  139.      * This method is invoked if a request to close the In-Band Bytestream has been received.
  140.      *
  141.      * @param closeRequest the close request from the remote peer
  142.      * @throws NotConnectedException
  143.      * @throws InterruptedException
  144.      */
  145.     protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException {

  146.         /*
  147.          * close streams without flushing them, because stream is already considered closed on the
  148.          * remote peers side
  149.          */
  150.         this.inputStream.closeInternal();
  151.         this.inputStream.cleanup();
  152.         this.outputStream.closeInternal(false);

  153.         // acknowledge close request
  154.         IQ confirmClose = IQ.createResultIQ(closeRequest);
  155.         this.connection.sendStanza(confirmClose);

  156.     }

  157.     /**
  158.      * This method is invoked if one of the streams has been closed locally, if an error occurred
  159.      * locally or if the whole session should be closed.
  160.      *
  161.      * @throws IOException if an error occurs while sending the close request
  162.      */
  163.     protected synchronized void closeByLocal(boolean in) throws IOException {
  164.         if (this.isClosed) {
  165.             return;
  166.         }

  167.         if (this.closeBothStreamsEnabled) {
  168.             this.inputStream.closeInternal();
  169.             this.outputStream.closeInternal(true);
  170.         }
  171.         else {
  172.             if (in) {
  173.                 this.inputStream.closeInternal();
  174.             }
  175.             else {
  176.                 // close stream but try to send any data left
  177.                 this.outputStream.closeInternal(true);
  178.             }
  179.         }

  180.         if (this.inputStream.isClosed && this.outputStream.isClosed) {
  181.             this.isClosed = true;

  182.             // send close request
  183.             Close close = new Close(this.byteStreamRequest.getSessionID());
  184.             close.setTo(this.remoteJID);
  185.             try {
  186.                 connection.createPacketCollectorAndSend(close).nextResultOrThrow();
  187.             }
  188.             catch (Exception e) {
  189.                 // Sadly we are unable to use the IOException(Throwable) constructor because this
  190.                 // constructor is only supported from Android API 9 on.
  191.                 IOException ioException = new IOException();
  192.                 ioException.initCause(e);
  193.                 throw ioException;
  194.             }

  195.             this.inputStream.cleanup();

  196.             // remove session from manager
  197.             InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
  198.         }

  199.     }

  200.     /**
  201.      * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
  202.      * Subclasses of this input stream must provide a packet listener along with a packet filter to
  203.      * collect the In-Band Bytestream data packets.
  204.      */
  205.     private abstract class IBBInputStream extends InputStream {

  206.         /* the data packet listener to fill the data queue */
  207.         private final StanzaListener dataPacketListener;

  208.         /* queue containing received In-Band Bytestream data packets */
  209.         protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();

  210.         /* buffer containing the data from one data packet */
  211.         private byte[] buffer;

  212.         /* pointer to the next byte to read from buffer */
  213.         private int bufferPointer = -1;

  214.         /* data packet sequence (range from 0 to 65535) */
  215.         private long seq = -1;

  216.         /* flag to indicate if input stream is closed */
  217.         private boolean isClosed = false;

  218.         /* flag to indicate if close method was invoked */
  219.         private boolean closeInvoked = false;

  220.         /* timeout for read operations */
  221.         private int readTimeout = 0;

  222.         /**
  223.          * Constructor.
  224.          */
  225.         public IBBInputStream() {
  226.             // add data packet listener to connection
  227.             this.dataPacketListener = getDataPacketListener();
  228.             connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
  229.         }

  230.         /**
  231.          * Returns the packet listener that processes In-Band Bytestream data packets.
  232.          *
  233.          * @return the data packet listener
  234.          */
  235.         protected abstract StanzaListener getDataPacketListener();

  236.         /**
  237.          * Returns the packet filter that accepts In-Band Bytestream data packets.
  238.          *
  239.          * @return the data packet filter
  240.          */
  241.         protected abstract StanzaFilter getDataPacketFilter();

  242.         public synchronized int read() throws IOException {
  243.             checkClosed();

  244.             // if nothing read yet or whole buffer has been read fill buffer
  245.             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
  246.                 // if no data available and stream was closed return -1
  247.                 if (!loadBuffer()) {
  248.                     return -1;
  249.                 }
  250.             }

  251.             // return byte and increment buffer pointer
  252.             return ((int) buffer[bufferPointer++]) & 0xff;
  253.         }

  254.         public synchronized int read(byte[] b, int off, int len) throws IOException {
  255.             if (b == null) {
  256.                 throw new NullPointerException();
  257.             }
  258.             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
  259.                             || ((off + len) < 0)) {
  260.                 throw new IndexOutOfBoundsException();
  261.             }
  262.             else if (len == 0) {
  263.                 return 0;
  264.             }

  265.             checkClosed();

  266.             // if nothing read yet or whole buffer has been read fill buffer
  267.             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
  268.                 // if no data available and stream was closed return -1
  269.                 if (!loadBuffer()) {
  270.                     return -1;
  271.                 }
  272.             }

  273.             // if more bytes wanted than available return all available
  274.             int bytesAvailable = buffer.length - bufferPointer;
  275.             if (len > bytesAvailable) {
  276.                 len = bytesAvailable;
  277.             }

  278.             System.arraycopy(buffer, bufferPointer, b, off, len);
  279.             bufferPointer += len;
  280.             return len;
  281.         }

  282.         public synchronized int read(byte[] b) throws IOException {
  283.             return read(b, 0, b.length);
  284.         }

  285.         /**
  286.          * This method blocks until a data packet is received, the stream is closed or the current
  287.          * thread is interrupted.
  288.          *
  289.          * @return <code>true</code> if data was received, otherwise <code>false</code>
  290.          * @throws IOException if data packets are out of sequence
  291.          */
  292.         private synchronized boolean loadBuffer() throws IOException {

  293.             // wait until data is available or stream is closed
  294.             DataPacketExtension data = null;
  295.             try {
  296.                 if (this.readTimeout == 0) {
  297.                     while (data == null) {
  298.                         if (isClosed && this.dataQueue.isEmpty()) {
  299.                             return false;
  300.                         }
  301.                         data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
  302.                     }
  303.                 }
  304.                 else {
  305.                     data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
  306.                     if (data == null) {
  307.                         throw new SocketTimeoutException();
  308.                     }
  309.                 }
  310.             }
  311.             catch (InterruptedException e) {
  312.                 // Restore the interrupted status
  313.                 Thread.currentThread().interrupt();
  314.                 return false;
  315.             }

  316.             // handle sequence overflow
  317.             if (this.seq == 65535) {
  318.                 this.seq = -1;
  319.             }

  320.             // check if data packets sequence is successor of last seen sequence
  321.             long seq = data.getSeq();
  322.             if (seq - 1 != this.seq) {
  323.                 // packets out of order; close stream/session
  324.                 InBandBytestreamSession.this.close();
  325.                 throw new IOException("Packets out of sequence");
  326.             }
  327.             else {
  328.                 this.seq = seq;
  329.             }

  330.             // set buffer to decoded data
  331.             buffer = data.getDecodedData();
  332.             bufferPointer = 0;
  333.             return true;
  334.         }

  335.         /**
  336.          * Checks if this stream is closed and throws an IOException if necessary
  337.          *
  338.          * @throws IOException if stream is closed and no data should be read anymore
  339.          */
  340.         private void checkClosed() throws IOException {
  341.             // Throw an exception if, and only if, this stream has been already
  342.             // closed by the user using the close() method
  343.             if (closeInvoked) {
  344.                 // clear data queue in case additional data was received after stream was closed
  345.                 this.dataQueue.clear();
  346.                 throw new IOException("Stream is closed");
  347.             }
  348.         }

  349.         public boolean markSupported() {
  350.             return false;
  351.         }

  352.         public void close() throws IOException {
  353.             if (closeInvoked) {
  354.                 return;
  355.             }

  356.             this.closeInvoked = true;

  357.             InBandBytestreamSession.this.closeByLocal(true);
  358.         }

  359.         /**
  360.          * This method sets the close flag and removes the data packet listener.
  361.          */
  362.         private void closeInternal() {
  363.             if (isClosed) {
  364.                 return;
  365.             }
  366.             isClosed = true;
  367.         }

  368.         /**
  369.          * Invoked if the session is closed.
  370.          */
  371.         private void cleanup() {
  372.             connection.removeSyncStanzaListener(this.dataPacketListener);
  373.         }

  374.     }

  375.     /**
  376.      * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
  377.      * data packets.
  378.      */
  379.     private class IQIBBInputStream extends IBBInputStream {

  380.         protected StanzaListener getDataPacketListener() {
  381.             return new StanzaListener() {

  382.                 private long lastSequence = -1;

  383.                 public void processPacket(Stanza packet) throws NotConnectedException, InterruptedException {
  384.                     // get data packet extension
  385.                     DataPacketExtension data = ((Data) packet).getDataPacketExtension();

  386.                     /*
  387.                      * check if sequence was not used already (see XEP-0047 Section 2.2)
  388.                      */
  389.                     if (data.getSeq() <= this.lastSequence) {
  390.                         IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
  391.                                         XMPPError.Condition.unexpected_request));
  392.                         connection.sendStanza(unexpectedRequest);
  393.                         return;

  394.                     }

  395.                     // check if encoded data is valid (see XEP-0047 Section 2.2)
  396.                     if (data.getDecodedData() == null) {
  397.                         // data is invalid; respond with bad-request error
  398.                         IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
  399.                                         XMPPError.Condition.bad_request));
  400.                         connection.sendStanza(badRequest);
  401.                         return;
  402.                     }

  403.                     // data is valid; add to data queue
  404.                     dataQueue.offer(data);

  405.                     // confirm IQ
  406.                     IQ confirmData = IQ.createResultIQ((IQ) packet);
  407.                     connection.sendStanza(confirmData);

  408.                     // set last seen sequence
  409.                     this.lastSequence = data.getSeq();
  410.                     if (this.lastSequence == 65535) {
  411.                         this.lastSequence = -1;
  412.                     }

  413.                 }

  414.             };
  415.         }

  416.         protected StanzaFilter getDataPacketFilter() {
  417.             /*
  418.              * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
  419.              * data packet extension, matching session ID and recipient
  420.              */
  421.             return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
  422.         }

  423.     }

  424.     /**
  425.      * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
  426.      * encapsulating the data packets.
  427.      */
  428.     private class MessageIBBInputStream extends IBBInputStream {

  429.         protected StanzaListener getDataPacketListener() {
  430.             return new StanzaListener() {

  431.                 public void processPacket(Stanza packet) {
  432.                     // get data packet extension
  433.                     DataPacketExtension data = (DataPacketExtension) packet.getExtension(
  434.                                     DataPacketExtension.ELEMENT,
  435.                                     DataPacketExtension.NAMESPACE);

  436.                     // check if encoded data is valid
  437.                     if (data.getDecodedData() == null) {
  438.                         /*
  439.                          * TODO once a majority of XMPP server implementation support XEP-0079
  440.                          * Advanced Message Processing the invalid message could be answered with an
  441.                          * appropriate error. For now we just ignore the packet. Subsequent packets
  442.                          * with an increased sequence will cause the input stream to close the
  443.                          * stream/session.
  444.                          */
  445.                         return;
  446.                     }

  447.                     // data is valid; add to data queue
  448.                     dataQueue.offer(data);

  449.                     // TODO confirm packet once XMPP servers support XEP-0079
  450.                 }

  451.             };
  452.         }

  453.         @Override
  454.         protected StanzaFilter getDataPacketFilter() {
  455.             /*
  456.              * filter all message stanzas containing a data packet extension, matching session ID
  457.              * and recipient
  458.              */
  459.             return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter());
  460.         }

  461.     }

  462.     /**
  463.      * IBBDataPacketFilter class filters all packets from the remote peer of this session,
  464.      * containing an In-Band Bytestream data packet extension whose session ID matches this sessions
  465.      * ID.
  466.      */
  467.     private class IBBDataPacketFilter implements StanzaFilter {

  468.         public boolean accept(Stanza packet) {
  469.             // sender equals remote peer
  470.             if (!packet.getFrom().equals(remoteJID)) {
  471.                 return false;
  472.             }

  473.             DataPacketExtension data;
  474.             if (packet instanceof Data) {
  475.                 data = ((Data) packet).getDataPacketExtension();
  476.             } else {
  477.                 // stanza contains data packet extension
  478.                 data = packet.getExtension(
  479.                         DataPacketExtension.ELEMENT,
  480.                         DataPacketExtension.NAMESPACE);
  481.                 if (data == null) {
  482.                     return false;
  483.                 }
  484.             }

  485.             // session ID equals this session ID
  486.             if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
  487.                 return false;
  488.             }

  489.             return true;
  490.         }

  491.     }

  492.     /**
  493.      * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
  494.      * Subclasses of this output stream must provide a method to send data over XMPP stream.
  495.      */
  496.     private abstract class IBBOutputStream extends OutputStream {

  497.         /* buffer with the size of this sessions block size */
  498.         protected final byte[] buffer;

  499.         /* pointer to next byte to write to buffer */
  500.         protected int bufferPointer = 0;

  501.         /* data packet sequence (range from 0 to 65535) */
  502.         protected long seq = 0;

  503.         /* flag to indicate if output stream is closed */
  504.         protected boolean isClosed = false;

  505.         /**
  506.          * Constructor.
  507.          */
  508.         public IBBOutputStream() {
  509.             this.buffer = new byte[byteStreamRequest.getBlockSize()];
  510.         }

  511.         /**
  512.          * Writes the given data packet to the XMPP stream.
  513.          *
  514.          * @param data the data packet
  515.          * @throws IOException if an I/O error occurred while sending or if the stream is closed
  516.          * @throws NotConnectedException
  517.          * @throws InterruptedException
  518.          */
  519.         protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException;

  520.         public synchronized void write(int b) throws IOException {
  521.             if (this.isClosed) {
  522.                 throw new IOException("Stream is closed");
  523.             }

  524.             // if buffer is full flush buffer
  525.             if (bufferPointer >= buffer.length) {
  526.                 flushBuffer();
  527.             }

  528.             buffer[bufferPointer++] = (byte) b;
  529.         }

  530.         public synchronized void write(byte[] b, int off, int len) throws IOException {
  531.             if (b == null) {
  532.                 throw new NullPointerException();
  533.             }
  534.             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
  535.                             || ((off + len) < 0)) {
  536.                 throw new IndexOutOfBoundsException();
  537.             }
  538.             else if (len == 0) {
  539.                 return;
  540.             }

  541.             if (this.isClosed) {
  542.                 throw new IOException("Stream is closed");
  543.             }

  544.             // is data to send greater than buffer size
  545.             if (len >= buffer.length) {

  546.                 // "byte" off the first chunk to write out
  547.                 writeOut(b, off, buffer.length);

  548.                 // recursively call this method with the lesser amount
  549.                 write(b, off + buffer.length, len - buffer.length);
  550.             }
  551.             else {
  552.                 writeOut(b, off, len);
  553.             }
  554.         }

  555.         public synchronized void write(byte[] b) throws IOException {
  556.             write(b, 0, b.length);
  557.         }

  558.         /**
  559.          * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
  560.          * capacity has been reached. This method is only called from this class so it is assured
  561.          * that the amount of data to send is <= buffer capacity
  562.          *
  563.          * @param b the data
  564.          * @param off the data
  565.          * @param len the number of bytes to write
  566.          * @throws IOException if an I/O error occurred while sending or if the stream is closed
  567.          */
  568.         private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
  569.             if (this.isClosed) {
  570.                 throw new IOException("Stream is closed");
  571.             }

  572.             // set to 0 in case the next 'if' block is not executed
  573.             int available = 0;

  574.             // is data to send greater that buffer space left
  575.             if (len > buffer.length - bufferPointer) {
  576.                 // fill buffer to capacity and send it
  577.                 available = buffer.length - bufferPointer;
  578.                 System.arraycopy(b, off, buffer, bufferPointer, available);
  579.                 bufferPointer += available;
  580.                 flushBuffer();
  581.             }

  582.             // copy the data left to buffer
  583.             System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
  584.             bufferPointer += len - available;
  585.         }

  586.         public synchronized void flush() throws IOException {
  587.             if (this.isClosed) {
  588.                 throw new IOException("Stream is closed");
  589.             }
  590.             flushBuffer();
  591.         }

  592.         private synchronized void flushBuffer() throws IOException {

  593.             // do nothing if no data to send available
  594.             if (bufferPointer == 0) {
  595.                 return;
  596.             }

  597.             // create data packet
  598.             String enc = Base64.encodeToString(buffer, 0, bufferPointer);
  599.             DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
  600.                             this.seq, enc);

  601.             // write to XMPP stream
  602.             try {
  603.                 writeToXML(data);
  604.             }
  605.             catch (InterruptedException | NotConnectedException e) {
  606.                 IOException ioException = new IOException();
  607.                 ioException.initCause(e);
  608.                 throw ioException;
  609.             }

  610.             // reset buffer pointer
  611.             bufferPointer = 0;

  612.             // increment sequence, considering sequence overflow
  613.             this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);

  614.         }

  615.         public void close() throws IOException {
  616.             if (isClosed) {
  617.                 return;
  618.             }
  619.             InBandBytestreamSession.this.closeByLocal(false);
  620.         }

  621.         /**
  622.          * Sets the close flag and optionally flushes the stream.
  623.          *
  624.          * @param flush if <code>true</code> flushes the stream
  625.          */
  626.         protected void closeInternal(boolean flush) {
  627.             if (this.isClosed) {
  628.                 return;
  629.             }
  630.             this.isClosed = true;

  631.             try {
  632.                 if (flush) {
  633.                     flushBuffer();
  634.                 }
  635.             }
  636.             catch (IOException e) {
  637.                 /*
  638.                  * ignore, because writeToXML() will not throw an exception if stream is already
  639.                  * closed
  640.                  */
  641.             }
  642.         }

  643.     }

  644.     /**
  645.      * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
  646.      * the data packets.
  647.      */
  648.     private class IQIBBOutputStream extends IBBOutputStream {

  649.         @Override
  650.         protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
  651.             // create IQ stanza containing data packet
  652.             IQ iq = new Data(data);
  653.             iq.setTo(remoteJID);

  654.             try {
  655.                 connection.createPacketCollectorAndSend(iq).nextResultOrThrow();
  656.             }
  657.             catch (Exception e) {
  658.                 // close session unless it is already closed
  659.                 if (!this.isClosed) {
  660.                     InBandBytestreamSession.this.close();
  661.                     // Sadly we are unable to use the IOException(Throwable) constructor because this
  662.                     // constructor is only supported from Android API 9 on.
  663.                     IOException ioException = new IOException();
  664.                     ioException.initCause(e);
  665.                     throw ioException;
  666.                 }
  667.             }

  668.         }

  669.     }

  670.     /**
  671.      * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
  672.      * encapsulating the data packets.
  673.      */
  674.     private class MessageIBBOutputStream extends IBBOutputStream {

  675.         @Override
  676.         protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException {
  677.             // create message stanza containing data packet
  678.             Message message = new Message(remoteJID);
  679.             message.addExtension(data);

  680.             connection.sendStanza(message);

  681.         }

  682.     }

  683.     /**
  684.      * @param data
  685.      * @throws NotConnectedException
  686.      * @throws InterruptedException
  687.      */
  688.     public void processIQPacket(Data data) throws NotConnectedException, InterruptedException {
  689.         inputStream.dataPacketListener.processPacket(data);
  690.     }

  691. }