InBandBytestreamSession.java

  1. /**
  2.  *
  3.  * Copyright the original author or authors
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smackx.bytestreams.ibb;

  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.io.OutputStream;
  21. import java.net.SocketTimeoutException;
  22. import java.util.concurrent.BlockingQueue;
  23. import java.util.concurrent.LinkedBlockingQueue;
  24. import java.util.concurrent.TimeUnit;
  25. import java.util.logging.Level;
  26. import java.util.logging.Logger;

  27. import org.jivesoftware.smack.SmackException.NotConnectedException;
  28. import org.jivesoftware.smack.SmackException.NotLoggedInException;
  29. import org.jivesoftware.smack.StanzaListener;
  30. import org.jivesoftware.smack.XMPPConnection;
  31. import org.jivesoftware.smack.datatypes.UInt16;
  32. import org.jivesoftware.smack.filter.AndFilter;
  33. import org.jivesoftware.smack.filter.StanzaFilter;
  34. import org.jivesoftware.smack.filter.StanzaTypeFilter;
  35. import org.jivesoftware.smack.packet.IQ;
  36. import org.jivesoftware.smack.packet.Message;
  37. import org.jivesoftware.smack.packet.Stanza;
  38. import org.jivesoftware.smack.packet.StanzaBuilder;
  39. import org.jivesoftware.smack.packet.StanzaError;
  40. import org.jivesoftware.smack.util.stringencoder.Base64;

  41. import org.jivesoftware.smackx.bytestreams.BytestreamSession;
  42. import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
  43. import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
  44. import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
  45. import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;

  46. import org.jxmpp.jid.Jid;

  47. /**
  48.  * InBandBytestreamSession class represents an In-Band Bytestream session.
  49.  * <p>
  50.  * In-band bytestreams are bidirectional and this session encapsulates the streams for both
  51.  * directions.
  52.  * <p>
  53.  * Note that closing the In-Band Bytestream session will close both streams. If both streams are
  54.  * closed individually the session will be closed automatically once the second stream is closed.
  55.  * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
  56.  * automatically if one of them is closed.
  57.  *
  58.  * @author Henning Staib
  59.  */
  60. public class InBandBytestreamSession implements BytestreamSession {

  61.     private static final Logger LOGGER = Logger.getLogger(InBandBytestreamSession.class.getName());

  62.     static final String UNEXPECTED_IBB_SEQUENCE = "Unexpected IBB sequence";

  63.     /* XMPP connection */
  64.     private final XMPPConnection connection;

  65.     /* the In-Band Bytestream open request for this session */
  66.     private final Open byteStreamRequest;

  67.     /*
  68.      * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
  69.      */
  70.     private IBBInputStream inputStream;

  71.     /*
  72.      * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
  73.      */
  74.     private IBBOutputStream outputStream;

  75.     /* JID of the remote peer */
  76.     private Jid remoteJID;

  77.     /* flag to close both streams if one of them is closed */
  78.     private boolean closeBothStreamsEnabled = false;

  79.     /* flag to indicate if session is closed */
  80.     private boolean isClosed = false;

  81.     /**
  82.      * Constructor.
  83.      *
  84.      * @param connection the XMPP connection
  85.      * @param byteStreamRequest the In-Band Bytestream open request for this session
  86.      * @param remoteJID JID of the remote peer
  87.      */
  88.     protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
  89.                     Jid remoteJID) {
  90.         this.connection = connection;
  91.         this.byteStreamRequest = byteStreamRequest;
  92.         this.remoteJID = remoteJID;

  93.         // initialize streams dependent to the uses stanza type
  94.         switch (byteStreamRequest.getStanza()) {
  95.         case IQ:
  96.             this.inputStream = new IQIBBInputStream();
  97.             this.outputStream = new IQIBBOutputStream();
  98.             break;
  99.         case MESSAGE:
  100.             this.inputStream = new MessageIBBInputStream();
  101.             this.outputStream = new MessageIBBOutputStream();
  102.             break;
  103.         }

  104.     }

  105.     @Override
  106.     public InputStream getInputStream() {
  107.         return this.inputStream;
  108.     }

  109.     @Override
  110.     public OutputStream getOutputStream() {
  111.         return this.outputStream;
  112.     }

  113.     @Override
  114.     public int getReadTimeout() {
  115.         return this.inputStream.readTimeout;
  116.     }

  117.     @Override
  118.     public void setReadTimeout(int timeout) {
  119.         if (timeout < 0) {
  120.             throw new IllegalArgumentException("Timeout must be >= 0");
  121.         }
  122.         this.inputStream.readTimeout = timeout;
  123.     }

  124.     /**
  125.      * Returns whether both streams should be closed automatically if one of the streams is closed.
  126.      * Default is <code>false</code>.
  127.      *
  128.      * @return <code>true</code> if both streams will be closed if one of the streams is closed,
  129.      *         <code>false</code> if both streams can be closed independently.
  130.      */
  131.     public boolean isCloseBothStreamsEnabled() {
  132.         return closeBothStreamsEnabled;
  133.     }

  134.     /**
  135.      * Sets whether both streams should be closed automatically if one of the streams is closed.
  136.      * Default is <code>false</code>.
  137.      *
  138.      * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
  139.      *        the streams is closed, <code>false</code> if both streams should be closed
  140.      *        independently
  141.      */
  142.     public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
  143.         this.closeBothStreamsEnabled = closeBothStreamsEnabled;
  144.     }

  145.     @Override
  146.     public void close() throws IOException {
  147.         closeByLocal(true); // close input stream
  148.         closeByLocal(false); // close output stream
  149.     }

  150.     /**
  151.      * This method is invoked if a request to close the In-Band Bytestream has been received.
  152.      *
  153.      * @param closeRequest the close request from the remote peer
  154.      * @throws NotConnectedException if the XMPP connection is not connected.
  155.      * @throws InterruptedException if the calling thread was interrupted.
  156.      */
  157.     protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException {

  158.         /*
  159.          * close streams without flushing them, because stream is already considered closed on the
  160.          * remote peers side
  161.          */
  162.         this.inputStream.closeInternal();
  163.         this.inputStream.cleanup();
  164.         this.outputStream.closeInternal(false);

  165.         // acknowledge close request
  166.         IQ confirmClose = IQ.createResultIQ(closeRequest);
  167.         this.connection.sendStanza(confirmClose);

  168.     }

  169.     /**
  170.      * This method is invoked if one of the streams has been closed locally, if an error occurred
  171.      * locally or if the whole session should be closed.
  172.      *
  173.      * @param in do we want to close the Input- or OutputStream?
  174.      * @throws IOException if an error occurs while sending the close request
  175.      */
  176.     protected synchronized void closeByLocal(boolean in) throws IOException {
  177.         if (this.isClosed) {
  178.             return;
  179.         }

  180.         if (this.closeBothStreamsEnabled) {
  181.             this.inputStream.closeInternal();
  182.             this.outputStream.closeInternal(true);
  183.         }
  184.         else {
  185.             if (in) {
  186.                 this.inputStream.closeInternal();
  187.             }
  188.             else {
  189.                 // close stream but try to send any data left
  190.                 this.outputStream.closeInternal(true);
  191.             }
  192.         }

  193.         if (this.inputStream.isClosed && this.outputStream.isClosed) {
  194.             this.isClosed = true;

  195.             // send close request
  196.             Close close = new Close(this.byteStreamRequest.getSessionID());
  197.             close.setTo(this.remoteJID);
  198.             try {
  199.                 connection.sendIqRequestAndWaitForResponse(close);
  200.             }
  201.             catch (Exception e) {
  202.                 // Sadly we are unable to use the IOException(Throwable) constructor because this
  203.                 // constructor is only supported from Android API 9 on.
  204.                 IOException ioException = new IOException();
  205.                 ioException.initCause(e);
  206.                 throw ioException;
  207.             }

  208.             this.inputStream.cleanup();

  209.             // remove session from manager
  210.             // Thanks Google Error Prone for finding the bug where remove() was called with 'this' as argument. Changed
  211.             // now to remove(byteStreamRequest.getSessionID).
  212.             InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(byteStreamRequest.getSessionID());
  213.         }

  214.     }

  215.     /**
  216.      * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
  217.      * Subclasses of this input stream must provide a stanza listener along with a stanza filter to
  218.      * collect the In-Band Bytestream data packets.
  219.      */
  220.     private abstract class IBBInputStream extends InputStream {

  221.         /* the data packet listener to fill the data queue */
  222.         private final StanzaListener dataPacketListener;

  223.         /* queue containing received In-Band Bytestream data packets */
  224.         protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();

  225.         /* buffer containing the data from one data packet */
  226.         private byte[] buffer;

  227.         /* pointer to the next byte to read from buffer */
  228.         private int bufferPointer = -1;

  229.         /* data packet sequence (range from 0 to 65535) */
  230.         private UInt16 expectedSeq = UInt16.MIN_VALUE;

  231.         /* flag to indicate if input stream is closed */
  232.         private boolean isClosed = false;

  233.         /* flag to indicate if close method was invoked */
  234.         private boolean closeInvoked = false;

  235.         /* timeout for read operations */
  236.         private int readTimeout = 0;

  237.         /**
  238.          * Constructor.
  239.          */
  240.         protected IBBInputStream() {
  241.             // add data packet listener to connection
  242.             this.dataPacketListener = getDataPacketListener();
  243.             connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
  244.         }

  245.         /**
  246.          * Returns the stanza listener that processes In-Band Bytestream data packets.
  247.          *
  248.          * @return the data stanza listener
  249.          */
  250.         protected abstract StanzaListener getDataPacketListener();

  251.         /**
  252.          * Returns the stanza filter that accepts In-Band Bytestream data packets.
  253.          *
  254.          * @return the data stanza filter
  255.          */
  256.         protected abstract StanzaFilter getDataPacketFilter();

  257.         @Override
  258.         public synchronized int read() throws IOException {
  259.             checkClosed();

  260.             // if nothing read yet or whole buffer has been read fill buffer
  261.             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
  262.                 // if no data available and stream was closed return -1
  263.                 if (!loadBuffer()) {
  264.                     return -1;
  265.                 }
  266.             }

  267.             // return byte and increment buffer pointer
  268.             return buffer[bufferPointer++] & 0xff;
  269.         }

  270.         @Override
  271.         public synchronized int read(byte[] b, int off, int len) throws IOException {
  272.             if (b == null) {
  273.                 throw new NullPointerException();
  274.             }
  275.             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
  276.                             || ((off + len) < 0)) {
  277.                 throw new IndexOutOfBoundsException();
  278.             }
  279.             else if (len == 0) {
  280.                 return 0;
  281.             }

  282.             checkClosed();

  283.             // if nothing read yet or whole buffer has been read fill buffer
  284.             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
  285.                 // if no data available and stream was closed return -1
  286.                 if (!loadBuffer()) {
  287.                     return -1;
  288.                 }
  289.             }

  290.             // if more bytes wanted than available return all available
  291.             int bytesAvailable = buffer.length - bufferPointer;
  292.             if (len > bytesAvailable) {
  293.                 len = bytesAvailable;
  294.             }

  295.             System.arraycopy(buffer, bufferPointer, b, off, len);
  296.             bufferPointer += len;
  297.             return len;
  298.         }

  299.         @Override
  300.         public synchronized int read(byte[] b) throws IOException {
  301.             return read(b, 0, b.length);
  302.         }

  303.         /**
  304.          * This method blocks until a data stanza is received, the stream is closed or the current
  305.          * thread is interrupted.
  306.          *
  307.          * @return <code>true</code> if data was received, otherwise <code>false</code>
  308.          * @throws IOException if data packets are out of sequence
  309.          */
  310.         private synchronized boolean loadBuffer() throws IOException {

  311.             // wait until data is available or stream is closed
  312.             DataPacketExtension data = null;
  313.             try {
  314.                 if (this.readTimeout == 0) {
  315.                     while (data == null) {
  316.                         if (isClosed && this.dataQueue.isEmpty()) {
  317.                             return false;
  318.                         }
  319.                         data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
  320.                     }
  321.                 }
  322.                 else {
  323.                     data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
  324.                     if (data == null) {
  325.                         throw new SocketTimeoutException();
  326.                     }
  327.                 }
  328.             }
  329.             catch (InterruptedException e) {
  330.                 // Restore the interrupted status
  331.                 Thread.currentThread().interrupt();
  332.                 return false;
  333.             }

  334.             final UInt16 dataSeq = data.getSeq();
  335.             // check if data packets sequence is successor of last seen sequence
  336.             if (!expectedSeq.equals(dataSeq)) {
  337.                 // packets out of order; close stream/session
  338.                 InBandBytestreamSession.this.close();
  339.                 String message = UNEXPECTED_IBB_SEQUENCE + " " + dataSeq + " received, expected "
  340.                                 + expectedSeq;
  341.                 throw new IOException(message);
  342.             }
  343.             expectedSeq = dataSeq.incrementedByOne();

  344.             // set buffer to decoded data
  345.             buffer = data.getDecodedData();
  346.             bufferPointer = 0;
  347.             return true;
  348.         }

  349.         /**
  350.          * Checks if this stream is closed and throws an IOException if necessary
  351.          *
  352.          * @throws IOException if stream is closed and no data should be read anymore
  353.          */
  354.         private void checkClosed() throws IOException {
  355.             // Throw an exception if, and only if, this stream has been already
  356.             // closed by the user using the close() method
  357.             if (closeInvoked) {
  358.                 // clear data queue in case additional data was received after stream was closed
  359.                 this.dataQueue.clear();
  360.                 throw new IOException("Stream is closed");
  361.             }
  362.         }

  363.         @Override
  364.         public boolean markSupported() {
  365.             return false;
  366.         }

  367.         @Override
  368.         public void close() throws IOException {
  369.             if (closeInvoked) {
  370.                 return;
  371.             }

  372.             this.closeInvoked = true;

  373.             InBandBytestreamSession.this.closeByLocal(true);
  374.         }

  375.         /**
  376.          * This method sets the close flag and removes the data stanza listener.
  377.          */
  378.         private void closeInternal() {
  379.             if (isClosed) {
  380.                 return;
  381.             }
  382.             isClosed = true;
  383.         }

  384.         /**
  385.          * Invoked if the session is closed.
  386.          */
  387.         private void cleanup() {
  388.             connection.removeSyncStanzaListener(this.dataPacketListener);
  389.         }

  390.     }

  391.     /**
  392.      * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
  393.      * data packets.
  394.      */
  395.     private class IQIBBInputStream extends IBBInputStream {

  396.         @Override
  397.         protected StanzaListener getDataPacketListener() {
  398.             return new StanzaListener() {

  399.                 private UInt16 expectedSequence = UInt16.MIN_VALUE;;

  400.                 @Override
  401.                 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException {
  402.                     final Data dataIq = (Data) packet;
  403.                     // get data packet extension
  404.                     DataPacketExtension data = dataIq.getDataPacketExtension();

  405.                     final UInt16 seq = data.getSeq();
  406.                     /*
  407.                      * check if sequence was not used already (see XEP-0047 Section 2.2)
  408.                      */
  409.                     if (!expectedSequence.equals(seq)) {
  410.                         String descriptiveEnTest = UNEXPECTED_IBB_SEQUENCE + " " + seq + " received, expected "
  411.                                         + expectedSequence;
  412.                         StanzaError stanzaError = StanzaError.getBuilder()
  413.                                         .setCondition(StanzaError.Condition.unexpected_request)
  414.                                         .setDescriptiveEnText(descriptiveEnTest)
  415.                                         .build();
  416.                         IQ unexpectedRequest = IQ.createErrorResponse(dataIq, stanzaError);
  417.                         connection.sendStanza(unexpectedRequest);

  418.                         try {
  419.                             // TODO: It would be great if close would take a "close error reason" argument. Also there
  420.                             // is the question if this is really a reason to close the stream. We could have some more
  421.                             // tolerance regarding out-of-sequence stanzas arriving: Even though XMPP has the in-order
  422.                             // guarantee, I could imagine that there are cases where stanzas are, for example,
  423.                             // duplicated because of stream resumption.
  424.                             close();
  425.                         } catch (IOException e) {
  426.                             LOGGER.log(Level.FINER, "Could not close session, because of IOException. Close reason: "
  427.                                             + descriptiveEnTest);
  428.                         }

  429.                         return;
  430.                     }

  431.                     // check if encoded data is valid (see XEP-0047 Section 2.2)
  432.                     if (data.getDecodedData() == null) {
  433.                         // data is invalid; respond with bad-request error
  434.                         IQ badRequest = IQ.createErrorResponse((IQ) packet,
  435.                                         StanzaError.Condition.bad_request);
  436.                         connection.sendStanza(badRequest);
  437.                         return;
  438.                     }

  439.                     expectedSequence = seq.incrementedByOne();

  440.                     // data is valid; add to data queue
  441.                     dataQueue.offer(data);

  442.                     // confirm IQ
  443.                     IQ confirmData = IQ.createResultIQ((IQ) packet);
  444.                     connection.sendStanza(confirmData);
  445.                 }

  446.             };
  447.         }

  448.         @Override
  449.         protected StanzaFilter getDataPacketFilter() {
  450.             /*
  451.              * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
  452.              * data stanza extension, matching session ID and recipient
  453.              */
  454.             return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
  455.         }

  456.     }

  457.     /**
  458.      * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
  459.      * encapsulating the data packets.
  460.      */
  461.     private class MessageIBBInputStream extends IBBInputStream {

  462.         @Override
  463.         protected StanzaListener getDataPacketListener() {
  464.             return new StanzaListener() {

  465.                 @Override
  466.                 public void processStanza(Stanza packet) {
  467.                     // get data packet extension
  468.                     DataPacketExtension data = packet.getExtension(
  469.                                     DataPacketExtension.class);

  470.                     // check if encoded data is valid
  471.                     if (data.getDecodedData() == null) {
  472.                         /*
  473.                          * TODO once a majority of XMPP server implementation support XEP-0079
  474.                          * Advanced Message Processing the invalid message could be answered with an
  475.                          * appropriate error. For now we just ignore the packet. Subsequent packets
  476.                          * with an increased sequence will cause the input stream to close the
  477.                          * stream/session.
  478.                          */
  479.                         return;
  480.                     }

  481.                     // data is valid; add to data queue
  482.                     dataQueue.offer(data);

  483.                     // TODO confirm packet once XMPP servers support XEP-0079
  484.                 }

  485.             };
  486.         }

  487.         @Override
  488.         protected StanzaFilter getDataPacketFilter() {
  489.             /*
  490.              * filter all message stanzas containing a data stanza extension, matching session ID
  491.              * and recipient
  492.              */
  493.             return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter());
  494.         }

  495.     }

  496.     /**
  497.      * IBBDataPacketFilter class filters all packets from the remote peer of this session,
  498.      * containing an In-Band Bytestream data stanza extension whose session ID matches this sessions
  499.      * ID.
  500.      */
  501.     private class IBBDataPacketFilter implements StanzaFilter {

  502.         @Override
  503.         public boolean accept(Stanza packet) {
  504.             // sender equals remote peer
  505.             if (!packet.getFrom().equals(remoteJID)) {
  506.                 return false;
  507.             }

  508.             DataPacketExtension data;
  509.             if (packet instanceof Data) {
  510.                 data = ((Data) packet).getDataPacketExtension();
  511.             } else {
  512.                 // stanza contains data packet extension
  513.                 data = packet.getExtension(
  514.                         DataPacketExtension.class);
  515.                 if (data == null) {
  516.                     return false;
  517.                 }
  518.             }

  519.             // session ID equals this session ID
  520.             if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
  521.                 return false;
  522.             }

  523.             return true;
  524.         }

  525.     }

  526.     /**
  527.      * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
  528.      * Subclasses of this output stream must provide a method to send data over XMPP stream.
  529.      */
  530.     private abstract class IBBOutputStream extends OutputStream {

  531.         /* buffer with the size of this sessions block size */
  532.         protected final byte[] buffer;

  533.         /* pointer to next byte to write to buffer */
  534.         protected int bufferPointer = 0;

  535.         /* data packet sequence (range from 0 to 65535) */
  536.         protected UInt16 seq = UInt16.from(0);

  537.         /* flag to indicate if output stream is closed */
  538.         protected boolean isClosed = false;

  539.         /**
  540.          * Constructor.
  541.          */
  542.         private IBBOutputStream() {
  543.             this.buffer = new byte[byteStreamRequest.getBlockSize()];
  544.         }

        /**
         * Writes the given data stanza to the XMPP stream.
         * <p>
         * Called by {@code flushBuffer()} with a data packet that holds the Base64 encoded buffer
         * content, the session ID and the current sequence number. How the packet is wrapped and
         * sent is left to the implementing subclass.
         *
         * @param data the data packet
         * @throws IOException if an I/O error occurred while sending or if the stream is closed
         * @throws NotConnectedException if the XMPP connection is not connected.
         * @throws InterruptedException if the calling thread was interrupted.
         */
        protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException;

  554.         @Override
  555.         public synchronized void write(int b) throws IOException {
  556.             if (this.isClosed) {
  557.                 throw new IOException("Stream is closed");
  558.             }

  559.             // if buffer is full flush buffer
  560.             if (bufferPointer >= buffer.length) {
  561.                 flushBuffer();
  562.             }

  563.             buffer[bufferPointer++] = (byte) b;
  564.         }

  565.         @Override
  566.         public synchronized void write(byte[] b, int off, int len) throws IOException {
  567.             if (b == null) {
  568.                 throw new NullPointerException();
  569.             }
  570.             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
  571.                             || ((off + len) < 0)) {
  572.                 throw new IndexOutOfBoundsException();
  573.             }
  574.             else if (len == 0) {
  575.                 return;
  576.             }

  577.             if (this.isClosed) {
  578.                 throw new IOException("Stream is closed");
  579.             }

  580.             // is data to send greater than buffer size
  581.             if (len >= buffer.length) {

  582.                 // "byte" off the first chunk to write out
  583.                 writeOut(b, off, buffer.length);

  584.                 // recursively call this method with the lesser amount
  585.                 write(b, off + buffer.length, len - buffer.length);
  586.             }
  587.             else {
  588.                 writeOut(b, off, len);
  589.             }
  590.         }

  591.         @Override
  592.         public synchronized void write(byte[] b) throws IOException {
  593.             write(b, 0, b.length);
  594.         }

  595.         /**
  596.          * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
  597.          * capacity has been reached. This method is only called from this class so it is assured
  598.          * that the amount of data to send is <= buffer capacity
  599.          *
  600.          * @param b the data
  601.          * @param off the data
  602.          * @param len the number of bytes to write
  603.          * @throws IOException if an I/O error occurred while sending or if the stream is closed
  604.          */
  605.         private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
  606.             if (this.isClosed) {
  607.                 throw new IOException("Stream is closed");
  608.             }

  609.             // set to 0 in case the next 'if' block is not executed
  610.             int available = 0;

  611.             // is data to send greater that buffer space left
  612.             if (len > buffer.length - bufferPointer) {
  613.                 // fill buffer to capacity and send it
  614.                 available = buffer.length - bufferPointer;
  615.                 System.arraycopy(b, off, buffer, bufferPointer, available);
  616.                 bufferPointer += available;
  617.                 flushBuffer();
  618.             }

  619.             // copy the data left to buffer
  620.             System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
  621.             bufferPointer += len - available;
  622.         }

  623.         @Override
  624.         public synchronized void flush() throws IOException {
  625.             if (this.isClosed) {
  626.                 throw new IOException("Stream is closed");
  627.             }
  628.             flushBuffer();
  629.         }

  630.         private synchronized void flushBuffer() throws IOException {

  631.             // do nothing if no data to send available
  632.             if (bufferPointer == 0) {
  633.                 return;
  634.             }

  635.             // create data packet
  636.             String enc = Base64.encodeToString(buffer, 0, bufferPointer);
  637.             DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
  638.                             this.seq, enc);

  639.             // write to XMPP stream
  640.             try {
  641.                 writeToXML(data);
  642.             }
  643.             catch (InterruptedException | NotConnectedException e) {
  644.                 IOException ioException = new IOException();
  645.                 ioException.initCause(e);
  646.                 throw ioException;
  647.             }

  648.             // reset buffer pointer
  649.             bufferPointer = 0;

  650.             // increment sequence, considering sequence overflow
  651.             seq = seq.incrementedByOne();

  652.         }

  653.         @Override
  654.         public void close() throws IOException {
  655.             if (isClosed) {
  656.                 return;
  657.             }
  658.             InBandBytestreamSession.this.closeByLocal(false);
  659.         }

  660.         /**
  661.          * Sets the close flag and optionally flushes the stream.
  662.          *
  663.          * @param flush if <code>true</code> flushes the stream
  664.          */
  665.         protected void closeInternal(boolean flush) {
  666.             if (this.isClosed) {
  667.                 return;
  668.             }
  669.             this.isClosed = true;

  670.             try {
  671.                 if (flush) {
  672.                     flushBuffer();
  673.                 }
  674.             }
  675.             catch (IOException e) {
  676.                 /*
  677.                  * ignore, because writeToXML() will not throw an exception if stream is already
  678.                  * closed
  679.                  */
  680.             }
  681.         }

  682.     }

  683.     /**
  684.      * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
  685.      * the data packets.
  686.      */
  687.     private class IQIBBOutputStream extends IBBOutputStream {

  688.         @Override
  689.         protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
  690.             // create IQ stanza containing data packet
  691.             IQ iq = new Data(data);
  692.             iq.setTo(remoteJID);

  693.             try {
  694.                 connection.sendIqRequestAndWaitForResponse(iq);
  695.             }
  696.             catch (Exception e) {
  697.                 // close session unless it is already closed
  698.                 if (!this.isClosed) {
  699.                     InBandBytestreamSession.this.close();
  700.                     // Sadly we are unable to use the IOException(Throwable) constructor because this
  701.                     // constructor is only supported from Android API 9 on.
  702.                     IOException ioException = new IOException();
  703.                     ioException.initCause(e);
  704.                     throw ioException;
  705.                 }
  706.             }

  707.         }

  708.     }

  709.     /**
  710.      * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
  711.      * encapsulating the data packets.
  712.      */
  713.     private class MessageIBBOutputStream extends IBBOutputStream {

  714.         @Override
  715.         protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException {
  716.             // create message stanza containing data packet
  717.             Message message = StanzaBuilder.buildMessage().to(remoteJID)
  718.                     .addExtension(data)
  719.                     .build();

  720.             connection.sendStanza(message);

  721.         }

  722.     }

  723.     /**
  724.      * Process IQ stanza.
  725.      * @param data TODO javadoc me please
  726.      * @throws NotConnectedException if the XMPP connection is not connected.
  727.      * @throws InterruptedException if the calling thread was interrupted.
  728.      * @throws NotLoggedInException if the XMPP connection is not authenticated.
  729.      */
  730.     public void processIQPacket(Data data) throws NotConnectedException, InterruptedException, NotLoggedInException {
  731.         inputStream.dataPacketListener.processStanza(data);
  732.     }

  733. }