InBandBytestreamSession.java

  1. /**
  2.  *
  3.  * Copyright the original author or authors
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smackx.bytestreams.ibb;

  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.io.OutputStream;
  21. import java.net.SocketTimeoutException;
  22. import java.util.concurrent.BlockingQueue;
  23. import java.util.concurrent.LinkedBlockingQueue;
  24. import java.util.concurrent.TimeUnit;

  25. import org.jivesoftware.smack.SmackException.NotConnectedException;
  26. import org.jivesoftware.smack.SmackException.NotLoggedInException;
  27. import org.jivesoftware.smack.StanzaListener;
  28. import org.jivesoftware.smack.XMPPConnection;
  29. import org.jivesoftware.smack.filter.AndFilter;
  30. import org.jivesoftware.smack.filter.StanzaFilter;
  31. import org.jivesoftware.smack.filter.StanzaTypeFilter;
  32. import org.jivesoftware.smack.packet.IQ;
  33. import org.jivesoftware.smack.packet.Message;
  34. import org.jivesoftware.smack.packet.Stanza;
  35. import org.jivesoftware.smack.packet.StanzaError;
  36. import org.jivesoftware.smack.util.stringencoder.Base64;

  37. import org.jivesoftware.smackx.bytestreams.BytestreamSession;
  38. import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
  39. import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
  40. import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
  41. import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;

  42. import org.jxmpp.jid.Jid;

  43. /**
  44.  * InBandBytestreamSession class represents an In-Band Bytestream session.
  45.  * <p>
  46.  * In-band bytestreams are bidirectional and this session encapsulates the streams for both
  47.  * directions.
  48.  * <p>
  49.  * Note that closing the In-Band Bytestream session will close both streams. If both streams are
  50.  * closed individually the session will be closed automatically once the second stream is closed.
  51.  * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
  52.  * automatically if one of them is closed.
  53.  *
  54.  * @author Henning Staib
  55.  */
  56. public class InBandBytestreamSession implements BytestreamSession {

  57.     /* XMPP connection */
  58.     private final XMPPConnection connection;

  59.     /* the In-Band Bytestream open request for this session */
  60.     private final Open byteStreamRequest;

  61.     /*
  62.      * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
  63.      */
  64.     private IBBInputStream inputStream;

  65.     /*
  66.      * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
  67.      */
  68.     private IBBOutputStream outputStream;

  69.     /* JID of the remote peer */
  70.     private Jid remoteJID;

  71.     /* flag to close both streams if one of them is closed */
  72.     private boolean closeBothStreamsEnabled = false;

  73.     /* flag to indicate if session is closed */
  74.     private boolean isClosed = false;

  75.     /**
  76.      * Constructor.
  77.      *
  78.      * @param connection the XMPP connection
  79.      * @param byteStreamRequest the In-Band Bytestream open request for this session
  80.      * @param remoteJID JID of the remote peer
  81.      */
  82.     protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
  83.                     Jid remoteJID) {
  84.         this.connection = connection;
  85.         this.byteStreamRequest = byteStreamRequest;
  86.         this.remoteJID = remoteJID;

  87.         // initialize streams dependent to the uses stanza type
  88.         switch (byteStreamRequest.getStanza()) {
  89.         case IQ:
  90.             this.inputStream = new IQIBBInputStream();
  91.             this.outputStream = new IQIBBOutputStream();
  92.             break;
  93.         case MESSAGE:
  94.             this.inputStream = new MessageIBBInputStream();
  95.             this.outputStream = new MessageIBBOutputStream();
  96.             break;
  97.         }

  98.     }

  99.     @Override
  100.     public InputStream getInputStream() {
  101.         return this.inputStream;
  102.     }

  103.     @Override
  104.     public OutputStream getOutputStream() {
  105.         return this.outputStream;
  106.     }

  107.     @Override
  108.     public int getReadTimeout() {
  109.         return this.inputStream.readTimeout;
  110.     }

  111.     @Override
  112.     public void setReadTimeout(int timeout) {
  113.         if (timeout < 0) {
  114.             throw new IllegalArgumentException("Timeout must be >= 0");
  115.         }
  116.         this.inputStream.readTimeout = timeout;
  117.     }

  118.     /**
  119.      * Returns whether both streams should be closed automatically if one of the streams is closed.
  120.      * Default is <code>false</code>.
  121.      *
  122.      * @return <code>true</code> if both streams will be closed if one of the streams is closed,
  123.      *         <code>false</code> if both streams can be closed independently.
  124.      */
  125.     public boolean isCloseBothStreamsEnabled() {
  126.         return closeBothStreamsEnabled;
  127.     }

  128.     /**
  129.      * Sets whether both streams should be closed automatically if one of the streams is closed.
  130.      * Default is <code>false</code>.
  131.      *
  132.      * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
  133.      *        the streams is closed, <code>false</code> if both streams should be closed
  134.      *        independently
  135.      */
  136.     public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
  137.         this.closeBothStreamsEnabled = closeBothStreamsEnabled;
  138.     }

  139.     @Override
  140.     public void close() throws IOException {
  141.         closeByLocal(true); // close input stream
  142.         closeByLocal(false); // close output stream
  143.     }

  144.     /**
  145.      * This method is invoked if a request to close the In-Band Bytestream has been received.
  146.      *
  147.      * @param closeRequest the close request from the remote peer
  148.      * @throws NotConnectedException
  149.      * @throws InterruptedException
  150.      */
  151.     protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException {

  152.         /*
  153.          * close streams without flushing them, because stream is already considered closed on the
  154.          * remote peers side
  155.          */
  156.         this.inputStream.closeInternal();
  157.         this.inputStream.cleanup();
  158.         this.outputStream.closeInternal(false);

  159.         // acknowledge close request
  160.         IQ confirmClose = IQ.createResultIQ(closeRequest);
  161.         this.connection.sendStanza(confirmClose);

  162.     }

  163.     /**
  164.      * This method is invoked if one of the streams has been closed locally, if an error occurred
  165.      * locally or if the whole session should be closed.
  166.      *
  167.      * @param in do we want to close the Input- or OutputStream?
  168.      * @throws IOException if an error occurs while sending the close request
  169.      */
  170.     protected synchronized void closeByLocal(boolean in) throws IOException {
  171.         if (this.isClosed) {
  172.             return;
  173.         }

  174.         if (this.closeBothStreamsEnabled) {
  175.             this.inputStream.closeInternal();
  176.             this.outputStream.closeInternal(true);
  177.         }
  178.         else {
  179.             if (in) {
  180.                 this.inputStream.closeInternal();
  181.             }
  182.             else {
  183.                 // close stream but try to send any data left
  184.                 this.outputStream.closeInternal(true);
  185.             }
  186.         }

  187.         if (this.inputStream.isClosed && this.outputStream.isClosed) {
  188.             this.isClosed = true;

  189.             // send close request
  190.             Close close = new Close(this.byteStreamRequest.getSessionID());
  191.             close.setTo(this.remoteJID);
  192.             try {
  193.                 connection.createStanzaCollectorAndSend(close).nextResultOrThrow();
  194.             }
  195.             catch (Exception e) {
  196.                 // Sadly we are unable to use the IOException(Throwable) constructor because this
  197.                 // constructor is only supported from Android API 9 on.
  198.                 IOException ioException = new IOException();
  199.                 ioException.initCause(e);
  200.                 throw ioException;
  201.             }

  202.             this.inputStream.cleanup();

  203.             // remove session from manager
  204.             // Thanks Google Error Prone for finding the bug where remove() was called with 'this' as argument. Changed
  205.             // now to remove(byteStreamRequest.getSessionID).
  206.             InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(byteStreamRequest.getSessionID());
  207.         }

  208.     }

  209.     /**
  210.      * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
  211.      * Subclasses of this input stream must provide a stanza listener along with a stanza filter to
  212.      * collect the In-Band Bytestream data packets.
  213.      */
  214.     private abstract class IBBInputStream extends InputStream {

  215.         /* the data packet listener to fill the data queue */
  216.         private final StanzaListener dataPacketListener;

  217.         /* queue containing received In-Band Bytestream data packets */
  218.         protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();

  219.         /* buffer containing the data from one data packet */
  220.         private byte[] buffer;

  221.         /* pointer to the next byte to read from buffer */
  222.         private int bufferPointer = -1;

  223.         /* data packet sequence (range from 0 to 65535) */
  224.         private long seq = -1;

  225.         /* flag to indicate if input stream is closed */
  226.         private boolean isClosed = false;

  227.         /* flag to indicate if close method was invoked */
  228.         private boolean closeInvoked = false;

  229.         /* timeout for read operations */
  230.         private int readTimeout = 0;

  231.         /**
  232.          * Constructor.
  233.          */
  234.         protected IBBInputStream() {
  235.             // add data packet listener to connection
  236.             this.dataPacketListener = getDataPacketListener();
  237.             connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
  238.         }

  239.         /**
  240.          * Returns the stanza listener that processes In-Band Bytestream data packets.
  241.          *
  242.          * @return the data stanza listener
  243.          */
  244.         protected abstract StanzaListener getDataPacketListener();

  245.         /**
  246.          * Returns the stanza filter that accepts In-Band Bytestream data packets.
  247.          *
  248.          * @return the data stanza filter
  249.          */
  250.         protected abstract StanzaFilter getDataPacketFilter();

  251.         @Override
  252.         public synchronized int read() throws IOException {
  253.             checkClosed();

  254.             // if nothing read yet or whole buffer has been read fill buffer
  255.             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
  256.                 // if no data available and stream was closed return -1
  257.                 if (!loadBuffer()) {
  258.                     return -1;
  259.                 }
  260.             }

  261.             // return byte and increment buffer pointer
  262.             return buffer[bufferPointer++] & 0xff;
  263.         }

  264.         @Override
  265.         public synchronized int read(byte[] b, int off, int len) throws IOException {
  266.             if (b == null) {
  267.                 throw new NullPointerException();
  268.             }
  269.             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
  270.                             || ((off + len) < 0)) {
  271.                 throw new IndexOutOfBoundsException();
  272.             }
  273.             else if (len == 0) {
  274.                 return 0;
  275.             }

  276.             checkClosed();

  277.             // if nothing read yet or whole buffer has been read fill buffer
  278.             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
  279.                 // if no data available and stream was closed return -1
  280.                 if (!loadBuffer()) {
  281.                     return -1;
  282.                 }
  283.             }

  284.             // if more bytes wanted than available return all available
  285.             int bytesAvailable = buffer.length - bufferPointer;
  286.             if (len > bytesAvailable) {
  287.                 len = bytesAvailable;
  288.             }

  289.             System.arraycopy(buffer, bufferPointer, b, off, len);
  290.             bufferPointer += len;
  291.             return len;
  292.         }

  293.         @Override
  294.         public synchronized int read(byte[] b) throws IOException {
  295.             return read(b, 0, b.length);
  296.         }

  297.         /**
  298.          * This method blocks until a data stanza is received, the stream is closed or the current
  299.          * thread is interrupted.
  300.          *
  301.          * @return <code>true</code> if data was received, otherwise <code>false</code>
  302.          * @throws IOException if data packets are out of sequence
  303.          */
  304.         private synchronized boolean loadBuffer() throws IOException {

  305.             // wait until data is available or stream is closed
  306.             DataPacketExtension data = null;
  307.             try {
  308.                 if (this.readTimeout == 0) {
  309.                     while (data == null) {
  310.                         if (isClosed && this.dataQueue.isEmpty()) {
  311.                             return false;
  312.                         }
  313.                         data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
  314.                     }
  315.                 }
  316.                 else {
  317.                     data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
  318.                     if (data == null) {
  319.                         throw new SocketTimeoutException();
  320.                     }
  321.                 }
  322.             }
  323.             catch (InterruptedException e) {
  324.                 // Restore the interrupted status
  325.                 Thread.currentThread().interrupt();
  326.                 return false;
  327.             }

  328.             // handle sequence overflow
  329.             if (this.seq == 65535) {
  330.                 this.seq = -1;
  331.             }

  332.             // check if data packets sequence is successor of last seen sequence
  333.             long seq = data.getSeq();
  334.             if (seq - 1 != this.seq) {
  335.                 // packets out of order; close stream/session
  336.                 InBandBytestreamSession.this.close();
  337.                 throw new IOException("Packets out of sequence");
  338.             }
  339.             else {
  340.                 this.seq = seq;
  341.             }

  342.             // set buffer to decoded data
  343.             buffer = data.getDecodedData();
  344.             bufferPointer = 0;
  345.             return true;
  346.         }

  347.         /**
  348.          * Checks if this stream is closed and throws an IOException if necessary
  349.          *
  350.          * @throws IOException if stream is closed and no data should be read anymore
  351.          */
  352.         private void checkClosed() throws IOException {
  353.             // Throw an exception if, and only if, this stream has been already
  354.             // closed by the user using the close() method
  355.             if (closeInvoked) {
  356.                 // clear data queue in case additional data was received after stream was closed
  357.                 this.dataQueue.clear();
  358.                 throw new IOException("Stream is closed");
  359.             }
  360.         }

  361.         @Override
  362.         public boolean markSupported() {
  363.             return false;
  364.         }

  365.         @Override
  366.         public void close() throws IOException {
  367.             if (closeInvoked) {
  368.                 return;
  369.             }

  370.             this.closeInvoked = true;

  371.             InBandBytestreamSession.this.closeByLocal(true);
  372.         }

  373.         /**
  374.          * This method sets the close flag and removes the data stanza listener.
  375.          */
  376.         private void closeInternal() {
  377.             if (isClosed) {
  378.                 return;
  379.             }
  380.             isClosed = true;
  381.         }

  382.         /**
  383.          * Invoked if the session is closed.
  384.          */
  385.         private void cleanup() {
  386.             connection.removeSyncStanzaListener(this.dataPacketListener);
  387.         }

  388.     }

  389.     /**
  390.      * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
  391.      * data packets.
  392.      */
  393.     private class IQIBBInputStream extends IBBInputStream {

  394.         @Override
  395.         protected StanzaListener getDataPacketListener() {
  396.             return new StanzaListener() {

  397.                 private long lastSequence = -1;

  398.                 @Override
  399.                 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException {
  400.                     // get data packet extension
  401.                     DataPacketExtension data = ((Data) packet).getDataPacketExtension();

  402.                     /*
  403.                      * check if sequence was not used already (see XEP-0047 Section 2.2)
  404.                      */
  405.                     if (data.getSeq() <= this.lastSequence) {
  406.                         IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet,
  407.                                         StanzaError.Condition.unexpected_request);
  408.                         connection.sendStanza(unexpectedRequest);
  409.                         return;

  410.                     }

  411.                     // check if encoded data is valid (see XEP-0047 Section 2.2)
  412.                     if (data.getDecodedData() == null) {
  413.                         // data is invalid; respond with bad-request error
  414.                         IQ badRequest = IQ.createErrorResponse((IQ) packet,
  415.                                         StanzaError.Condition.bad_request);
  416.                         connection.sendStanza(badRequest);
  417.                         return;
  418.                     }

  419.                     // data is valid; add to data queue
  420.                     dataQueue.offer(data);

  421.                     // confirm IQ
  422.                     IQ confirmData = IQ.createResultIQ((IQ) packet);
  423.                     connection.sendStanza(confirmData);

  424.                     // set last seen sequence
  425.                     this.lastSequence = data.getSeq();
  426.                     if (this.lastSequence == 65535) {
  427.                         this.lastSequence = -1;
  428.                     }

  429.                 }

  430.             };
  431.         }

  432.         @Override
  433.         protected StanzaFilter getDataPacketFilter() {
  434.             /*
  435.              * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
  436.              * data stanza extension, matching session ID and recipient
  437.              */
  438.             return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
  439.         }

  440.     }

  441.     /**
  442.      * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
  443.      * encapsulating the data packets.
  444.      */
  445.     private class MessageIBBInputStream extends IBBInputStream {

  446.         @Override
  447.         protected StanzaListener getDataPacketListener() {
  448.             return new StanzaListener() {

  449.                 @Override
  450.                 public void processStanza(Stanza packet) {
  451.                     // get data packet extension
  452.                     DataPacketExtension data = packet.getExtension(
  453.                                     DataPacketExtension.ELEMENT,
  454.                                     DataPacketExtension.NAMESPACE);

  455.                     // check if encoded data is valid
  456.                     if (data.getDecodedData() == null) {
  457.                         /*
  458.                          * TODO once a majority of XMPP server implementation support XEP-0079
  459.                          * Advanced Message Processing the invalid message could be answered with an
  460.                          * appropriate error. For now we just ignore the packet. Subsequent packets
  461.                          * with an increased sequence will cause the input stream to close the
  462.                          * stream/session.
  463.                          */
  464.                         return;
  465.                     }

  466.                     // data is valid; add to data queue
  467.                     dataQueue.offer(data);

  468.                     // TODO confirm packet once XMPP servers support XEP-0079
  469.                 }

  470.             };
  471.         }

  472.         @Override
  473.         protected StanzaFilter getDataPacketFilter() {
  474.             /*
  475.              * filter all message stanzas containing a data stanza extension, matching session ID
  476.              * and recipient
  477.              */
  478.             return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter());
  479.         }

  480.     }

  481.     /**
  482.      * IBBDataPacketFilter class filters all packets from the remote peer of this session,
  483.      * containing an In-Band Bytestream data stanza extension whose session ID matches this sessions
  484.      * ID.
  485.      */
  486.     private class IBBDataPacketFilter implements StanzaFilter {

  487.         @Override
  488.         public boolean accept(Stanza packet) {
  489.             // sender equals remote peer
  490.             if (!packet.getFrom().equals(remoteJID)) {
  491.                 return false;
  492.             }

  493.             DataPacketExtension data;
  494.             if (packet instanceof Data) {
  495.                 data = ((Data) packet).getDataPacketExtension();
  496.             } else {
  497.                 // stanza contains data packet extension
  498.                 data = packet.getExtension(
  499.                         DataPacketExtension.ELEMENT,
  500.                         DataPacketExtension.NAMESPACE);
  501.                 if (data == null) {
  502.                     return false;
  503.                 }
  504.             }

  505.             // session ID equals this session ID
  506.             if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
  507.                 return false;
  508.             }

  509.             return true;
  510.         }

  511.     }

  512.     /**
  513.      * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
  514.      * Subclasses of this output stream must provide a method to send data over XMPP stream.
  515.      */
  516.     private abstract class IBBOutputStream extends OutputStream {

  517.         /* buffer with the size of this sessions block size */
  518.         protected final byte[] buffer;

  519.         /* pointer to next byte to write to buffer */
  520.         protected int bufferPointer = 0;

  521.         /* data packet sequence (range from 0 to 65535) */
  522.         protected long seq = 0;

  523.         /* flag to indicate if output stream is closed */
  524.         protected boolean isClosed = false;

  525.         /**
  526.          * Constructor.
  527.          */
  528.         private IBBOutputStream() {
  529.             this.buffer = new byte[byteStreamRequest.getBlockSize()];
  530.         }

  531.         /**
  532.          * Writes the given data stanza to the XMPP stream.
  533.          *
  534.          * @param data the data packet
  535.          * @throws IOException if an I/O error occurred while sending or if the stream is closed
  536.          * @throws NotConnectedException
  537.          * @throws InterruptedException
  538.          */
  539.         protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException;

  540.         @Override
  541.         public synchronized void write(int b) throws IOException {
  542.             if (this.isClosed) {
  543.                 throw new IOException("Stream is closed");
  544.             }

  545.             // if buffer is full flush buffer
  546.             if (bufferPointer >= buffer.length) {
  547.                 flushBuffer();
  548.             }

  549.             buffer[bufferPointer++] = (byte) b;
  550.         }

  551.         @Override
  552.         public synchronized void write(byte[] b, int off, int len) throws IOException {
  553.             if (b == null) {
  554.                 throw new NullPointerException();
  555.             }
  556.             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
  557.                             || ((off + len) < 0)) {
  558.                 throw new IndexOutOfBoundsException();
  559.             }
  560.             else if (len == 0) {
  561.                 return;
  562.             }

  563.             if (this.isClosed) {
  564.                 throw new IOException("Stream is closed");
  565.             }

  566.             // is data to send greater than buffer size
  567.             if (len >= buffer.length) {

  568.                 // "byte" off the first chunk to write out
  569.                 writeOut(b, off, buffer.length);

  570.                 // recursively call this method with the lesser amount
  571.                 write(b, off + buffer.length, len - buffer.length);
  572.             }
  573.             else {
  574.                 writeOut(b, off, len);
  575.             }
  576.         }

  577.         @Override
  578.         public synchronized void write(byte[] b) throws IOException {
  579.             write(b, 0, b.length);
  580.         }

  581.         /**
  582.          * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
  583.          * capacity has been reached. This method is only called from this class so it is assured
  584.          * that the amount of data to send is <= buffer capacity
  585.          *
  586.          * @param b the data
  587.          * @param off the data
  588.          * @param len the number of bytes to write
  589.          * @throws IOException if an I/O error occurred while sending or if the stream is closed
  590.          */
  591.         private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
  592.             if (this.isClosed) {
  593.                 throw new IOException("Stream is closed");
  594.             }

  595.             // set to 0 in case the next 'if' block is not executed
  596.             int available = 0;

  597.             // is data to send greater that buffer space left
  598.             if (len > buffer.length - bufferPointer) {
  599.                 // fill buffer to capacity and send it
  600.                 available = buffer.length - bufferPointer;
  601.                 System.arraycopy(b, off, buffer, bufferPointer, available);
  602.                 bufferPointer += available;
  603.                 flushBuffer();
  604.             }

  605.             // copy the data left to buffer
  606.             System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
  607.             bufferPointer += len - available;
  608.         }

  609.         @Override
  610.         public synchronized void flush() throws IOException {
  611.             if (this.isClosed) {
  612.                 throw new IOException("Stream is closed");
  613.             }
  614.             flushBuffer();
  615.         }

  616.         private synchronized void flushBuffer() throws IOException {

  617.             // do nothing if no data to send available
  618.             if (bufferPointer == 0) {
  619.                 return;
  620.             }

  621.             // create data packet
  622.             String enc = Base64.encodeToString(buffer, 0, bufferPointer);
  623.             DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
  624.                             this.seq, enc);

  625.             // write to XMPP stream
  626.             try {
  627.                 writeToXML(data);
  628.             }
  629.             catch (InterruptedException | NotConnectedException e) {
  630.                 IOException ioException = new IOException();
  631.                 ioException.initCause(e);
  632.                 throw ioException;
  633.             }

  634.             // reset buffer pointer
  635.             bufferPointer = 0;

  636.             // increment sequence, considering sequence overflow
  637.             this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);

  638.         }

  639.         @Override
  640.         public void close() throws IOException {
  641.             if (isClosed) {
  642.                 return;
  643.             }
  644.             InBandBytestreamSession.this.closeByLocal(false);
  645.         }

  646.         /**
  647.          * Sets the close flag and optionally flushes the stream.
  648.          *
  649.          * @param flush if <code>true</code> flushes the stream
  650.          */
  651.         protected void closeInternal(boolean flush) {
  652.             if (this.isClosed) {
  653.                 return;
  654.             }
  655.             this.isClosed = true;

  656.             try {
  657.                 if (flush) {
  658.                     flushBuffer();
  659.                 }
  660.             }
  661.             catch (IOException e) {
  662.                 /*
  663.                  * ignore, because writeToXML() will not throw an exception if stream is already
  664.                  * closed
  665.                  */
  666.             }
  667.         }

  668.     }

  669.     /**
  670.      * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
  671.      * the data packets.
  672.      */
  673.     private class IQIBBOutputStream extends IBBOutputStream {

  674.         @Override
  675.         protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
  676.             // create IQ stanza containing data packet
  677.             IQ iq = new Data(data);
  678.             iq.setTo(remoteJID);

  679.             try {
  680.                 connection.createStanzaCollectorAndSend(iq).nextResultOrThrow();
  681.             }
  682.             catch (Exception e) {
  683.                 // close session unless it is already closed
  684.                 if (!this.isClosed) {
  685.                     InBandBytestreamSession.this.close();
  686.                     // Sadly we are unable to use the IOException(Throwable) constructor because this
  687.                     // constructor is only supported from Android API 9 on.
  688.                     IOException ioException = new IOException();
  689.                     ioException.initCause(e);
  690.                     throw ioException;
  691.                 }
  692.             }

  693.         }

  694.     }

  695.     /**
  696.      * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
  697.      * encapsulating the data packets.
  698.      */
  699.     private class MessageIBBOutputStream extends IBBOutputStream {

  700.         @Override
  701.         protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException {
  702.             // create message stanza containing data packet
  703.             Message message = new Message(remoteJID);
  704.             message.addExtension(data);

  705.             connection.sendStanza(message);

  706.         }

  707.     }

  708.     /**
  709.      * Process IQ stanza.
  710.      * @param data
  711.      * @throws NotConnectedException
  712.      * @throws InterruptedException
  713.      * @throws NotLoggedInException
  714.      */
  715.     public void processIQPacket(Data data) throws NotConnectedException, InterruptedException, NotLoggedInException {
  716.         inputStream.dataPacketListener.processStanza(data);
  717.     }

  718. }