// InBandBytestreamSession.java

/**
 *
 * Copyright the original author or authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jivesoftware.smackx.bytestreams.ibb;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.StanzaListener;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.filter.StanzaTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smack.util.stringencoder.Base64;
import org.jivesoftware.smackx.bytestreams.BytestreamSession;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
import org.jxmpp.jid.Jid;

/**
 * InBandBytestreamSession class represents an In-Band Bytestream session.
 * <p>
 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
 * directions.
 * <p>
 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
 * closed individually the session will be closed automatically once the second stream is closed.
 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
 * automatically if one of them is closed.
 * 
 * @author Henning Staib
 */
public class InBandBytestreamSession implements BytestreamSession {

    /* XMPP connection used to send and receive all stanzas of this session */
    private final XMPPConnection connection;

    /*
     * the In-Band Bytestream open request for this session; it supplies the session ID, the
     * block size and the stanza type (IQ or message) used by the streams
     */
    private final Open byteStreamRequest;

    /*
     * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
     */
    private IBBInputStream inputStream;

    /*
     * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
     */
    private IBBOutputStream outputStream;

    /* JID of the remote peer; used as recipient of outgoing stanzas and to filter incoming ones */
    private Jid remoteJID;

    /* flag to close both streams if one of them is closed */
    private boolean closeBothStreamsEnabled = false;

    /* flag to indicate if session is closed; set by closeByLocal() once both streams are closed */
    private boolean isClosed = false;

    /**
     * Creates a session for the given In-Band Bytestream open request and creates the input and
     * output stream implementations matching the stanza type (IQ or message) of that request.
     * 
     * @param connection the XMPP connection
     * @param byteStreamRequest the In-Band Bytestream open request for this session
     * @param remoteJID JID of the remote peer
     */
    protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
                    Jid remoteJID) {
        this.connection = connection;
        this.byteStreamRequest = byteStreamRequest;
        this.remoteJID = remoteJID;

        // initialize the streams depending on the stanza type used by the open request
        // NOTE(review): there is no default branch; for any stanza type other than IQ or MESSAGE
        // both streams would stay null - confirm that no further types exist
        switch (byteStreamRequest.getStanza()) {
        case IQ:
            this.inputStream = new IQIBBInputStream();
            this.outputStream = new IQIBBOutputStream();
            break;
        case MESSAGE:
            this.inputStream = new MessageIBBInputStream();
            this.outputStream = new MessageIBBOutputStream();
            break;
        }

    }

    /**
     * Returns the stream from which the data sent by the remote peer can be read.
     * 
     * @return the input stream of this session
     */
    public InputStream getInputStream() {
        return inputStream;
    }

    /**
     * Returns the stream used to send data to the remote peer.
     * 
     * @return the output stream of this session
     */
    public OutputStream getOutputStream() {
        return outputStream;
    }

    /**
     * Returns the timeout applied to read operations on the input stream of this session.
     * 
     * @return the read timeout in milliseconds, <code>0</code> if reads wait without a timeout
     */
    public int getReadTimeout() {
        return inputStream.readTimeout;
    }

    /**
     * Sets the timeout applied to read operations on the input stream of this session.
     * 
     * @param timeout the read timeout in milliseconds, <code>0</code> to wait without a timeout
     * @throws IllegalArgumentException if the given timeout is negative
     */
    public void setReadTimeout(int timeout) {
        if (timeout >= 0) {
            inputStream.readTimeout = timeout;
        }
        else {
            throw new IllegalArgumentException("Timeout must be >= 0");
        }
    }

    /**
     * Tells whether closing one of the two streams automatically closes the other one as well.
     * The default is <code>false</code>.
     * 
     * @return <code>true</code> if closing one stream also closes the other stream,
     *         <code>false</code> if the streams are closed independently of each other.
     */
    public boolean isCloseBothStreamsEnabled() {
        return this.closeBothStreamsEnabled;
    }

    /**
     * Configures whether closing one of the two streams automatically closes the other one as
     * well. The default is <code>false</code>.
     * 
     * @param closeBothStreamsEnabled <code>true</code> if closing one stream should also close
     *        the other stream, <code>false</code> if the streams should be closed independently
     *        of each other
     */
    public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
        this.closeBothStreamsEnabled = closeBothStreamsEnabled;
    }

    /**
     * Closes the whole session by closing the input stream first and the output stream
     * afterwards.
     * 
     * @throws IOException if an error occurs while sending the close request
     */
    public void close() throws IOException {
        // input side first ...
        this.closeByLocal(true);
        // ... then the output side
        this.closeByLocal(false);
    }

    /**
     * Handles a close request received from the remote peer: both streams are marked as closed,
     * the data packet listener is removed and the request is acknowledged with a result IQ.
     * 
     * @param closeRequest the close request from the remote peer
     * @throws NotConnectedException if the acknowledgement cannot be sent because the connection
     *         is not connected
     * @throws InterruptedException if the calling thread is interrupted while sending
     */
    protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException {
        // The remote side already regards the bytestream as closed, so pending output is
        // dropped instead of being flushed.
        inputStream.closeInternal();
        inputStream.cleanup();
        outputStream.closeInternal(false);

        // acknowledge the close request
        connection.sendStanza(IQ.createResultIQ(closeRequest));
    }

    /**
     * This method is invoked if one of the streams has been closed locally, if an error occurred
     * locally or if the whole session should be closed.
     * <p>
     * Once both streams are closed a close request is sent to the remote peer. The data packet
     * listener is removed and the session is deregistered from the manager afterwards, even if
     * sending the close request fails, so that a failed close does not leak the listener.
     * 
     * @param in <code>true</code> if the input stream is being closed, <code>false</code> if the
     *        output stream is being closed
     * @throws IOException if an error occurs while sending the close request
     */
    protected synchronized void closeByLocal(boolean in) throws IOException {
        if (this.isClosed) {
            return;
        }

        if (this.closeBothStreamsEnabled) {
            this.inputStream.closeInternal();
            this.outputStream.closeInternal(true);
        }
        else if (in) {
            this.inputStream.closeInternal();
        }
        else {
            // close stream but try to send any data left
            this.outputStream.closeInternal(true);
        }

        if (!this.inputStream.isClosed || !this.outputStream.isClosed) {
            // the other direction is still open, so the session stays alive
            return;
        }

        this.isClosed = true;

        // send close request
        Close close = new Close(this.byteStreamRequest.getSessionID());
        close.setTo(this.remoteJID);
        try {
            connection.createPacketCollectorAndSend(close).nextResultOrThrow();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // keep the interrupt visible to the caller
                Thread.currentThread().interrupt();
            }
            IOException ioException = new IOException();
            ioException.initCause(e);
            throw ioException;
        }
        finally {
            // isClosed is already set, so this is the only chance to release the resources
            this.inputStream.cleanup();

            // remove session from manager
            // NOTE(review): verify that getSessions() is keyed by the session object; if it is
            // keyed by session ID this call removes nothing
            InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
        }

    }

    /**
     * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
     * Subclasses of this input stream must provide a packet listener along with a packet filter to
     * collect the In-Band Bytestream data packets.
     * <p>
     * Received data packets are put into a queue by the listener and are consumed one packet at a
     * time by the read methods.
     */
    private abstract class IBBInputStream extends InputStream {

        /* the data packet listener to fill the data queue */
        private final StanzaListener dataPacketListener;

        /* queue containing received In-Band Bytestream data packets */
        protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();

        /* buffer containing the data from one data packet */
        private byte[] buffer;

        /* pointer to the next byte to read from buffer (-1 as long as nothing has been loaded) */
        private int bufferPointer = -1;

        /* data packet sequence (range from 0 to 65535) */
        private long seq = -1;

        /*
         * flag to indicate if input stream is closed; volatile because it is set by the closing
         * thread while a reading thread polls it in loadBuffer()
         */
        private volatile boolean isClosed = false;

        /* flag to indicate if close method was invoked; volatile for the same reason as isClosed */
        private volatile boolean closeInvoked = false;

        /* timeout for read operations in milliseconds (0 means no timeout) */
        private int readTimeout = 0;

        /**
         * Constructor. Registers the data packet listener of the subclass at the connection.
         */
        public IBBInputStream() {
            // add data packet listener to connection
            this.dataPacketListener = getDataPacketListener();
            connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
        }

        /**
         * Returns the packet listener that processes In-Band Bytestream data packets.
         * 
         * @return the data packet listener
         */
        protected abstract StanzaListener getDataPacketListener();

        /**
         * Returns the packet filter that accepts In-Band Bytestream data packets.
         * 
         * @return the data packet filter
         */
        protected abstract StanzaFilter getDataPacketFilter();

        /**
         * Reads the next byte, waiting for a data packet if the current buffer is exhausted.
         * 
         * @return the next byte as value from 0 to 255, or -1 if no more data is available
         * @throws IOException if the stream was closed by the user, a read timeout occurred or
         *         the data packets are out of sequence
         */
        @Override
        public synchronized int read() throws IOException {
            checkClosed();

            // if no data available and stream was closed return -1
            if (!ensureBufferedData()) {
                return -1;
            }

            // return byte and increment buffer pointer
            return ((int) buffer[bufferPointer++]) & 0xff;
        }

        /**
         * Reads up to <code>len</code> bytes from the current data packet into the given array.
         * At most the remaining bytes of one data packet are returned per invocation.
         * 
         * @param b the destination array
         * @param off the offset in <code>b</code> at which the first byte is stored
         * @param len the maximum number of bytes to read
         * @return the number of bytes read, or -1 if no more data is available
         * @throws IOException if the stream was closed by the user, a read timeout occurred or
         *         the data packets are out of sequence
         */
        @Override
        public synchronized int read(byte[] b, int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException();
            }
            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
                            || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            }
            else if (len == 0) {
                return 0;
            }

            checkClosed();

            // if no data available and stream was closed return -1
            if (!ensureBufferedData()) {
                return -1;
            }

            // if more bytes wanted than available return all available
            int count = Math.min(len, buffer.length - bufferPointer);

            System.arraycopy(buffer, bufferPointer, b, off, count);
            bufferPointer += count;
            return count;
        }

        @Override
        public synchronized int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        /**
         * Makes sure that the buffer holds at least one unread byte. Data packets with an empty
         * payload are skipped, so the read methods never index into an empty buffer and never
         * report zero bytes for a non-empty request.
         * 
         * @return <code>true</code> if unread data is available, <code>false</code> if no more
         *         data can be loaded
         * @throws IOException if loading the next data packet fails
         */
        private boolean ensureBufferedData() throws IOException {
            // if nothing read yet or whole buffer has been read fill buffer
            while (bufferPointer == -1 || bufferPointer >= buffer.length) {
                if (!loadBuffer()) {
                    return false;
                }
            }
            return true;
        }

        /**
         * This method blocks until a data packet is received, the stream is closed, the read
         * timeout elapsed or the current thread is interrupted.
         * 
         * @return <code>true</code> if data was received, otherwise <code>false</code>
         * @throws IOException if data packets are out of sequence or the read timeout elapsed
         */
        private synchronized boolean loadBuffer() throws IOException {

            // wait until data is available or stream is closed
            DataPacketExtension data = null;
            try {
                if (this.readTimeout == 0) {
                    while (data == null) {
                        if (isClosed && this.dataQueue.isEmpty()) {
                            return false;
                        }
                        // poll in intervals to notice that the stream has been closed
                        data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
                    }
                }
                else {
                    // a closed and drained stream cannot deliver data anymore; signal the end of
                    // the stream instead of waiting for the timeout
                    if (isClosed && this.dataQueue.isEmpty()) {
                        return false;
                    }
                    data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
                    if (data == null) {
                        throw new SocketTimeoutException();
                    }
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
                return false;
            }

            // handle sequence overflow
            if (this.seq == 65535) {
                this.seq = -1;
            }

            // check if data packets sequence is successor of last seen sequence
            long receivedSeq = data.getSeq();
            if (receivedSeq - 1 != this.seq) {
                // packets out of order; close stream/session
                InBandBytestreamSession.this.close();
                throw new IOException("Packets out of sequence");
            }
            this.seq = receivedSeq;

            // set buffer to decoded data
            buffer = data.getDecodedData();
            bufferPointer = 0;
            return true;
        }

        /**
         * Checks if this stream is closed and throws an IOException if necessary.
         * 
         * @throws IOException if stream is closed and no data should be read anymore
         */
        private void checkClosed() throws IOException {
            // Throw an exception if, and only if, this stream has been already
            // closed by the user using the close() method
            if (closeInvoked) {
                // clear data queue in case additional data was received after stream was closed
                this.dataQueue.clear();
                throw new IOException("Stream is closed");
            }
        }

        @Override
        public boolean markSupported() {
            return false;
        }

        @Override
        public void close() throws IOException {
            if (closeInvoked) {
                return;
            }

            this.closeInvoked = true;

            InBandBytestreamSession.this.closeByLocal(true);
        }

        /**
         * This method sets the close flag. The data packet listener stays registered until
         * {@link #cleanup()} is invoked.
         */
        private void closeInternal() {
            if (isClosed) {
                return;
            }
            isClosed = true;
        }

        /**
         * Invoked if the session is closed. Removes the data packet listener from the
         * connection.
         */
        private void cleanup() {
            connection.removeSyncStanzaListener(this.dataPacketListener);
        }

    }

    /**
     * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
     * data packets.
     */
    private class IQIBBInputStream extends IBBInputStream {

        protected StanzaListener getDataPacketListener() {
            return new StanzaListener() {

                private long lastSequence = -1;

                public void processPacket(Stanza packet) throws NotConnectedException, InterruptedException {
                    // get data packet extension
                    DataPacketExtension data = ((Data) packet).getDataPacketExtension();

                    /*
                     * check if sequence was not used already (see XEP-0047 Section 2.2)
                     */
                    if (data.getSeq() <= this.lastSequence) {
                        IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
                                        XMPPError.Condition.unexpected_request));
                        connection.sendStanza(unexpectedRequest);
                        return;

                    }

                    // check if encoded data is valid (see XEP-0047 Section 2.2)
                    if (data.getDecodedData() == null) {
                        // data is invalid; respond with bad-request error
                        IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
                                        XMPPError.Condition.bad_request));
                        connection.sendStanza(badRequest);
                        return;
                    }

                    // data is valid; add to data queue
                    dataQueue.offer(data);

                    // confirm IQ
                    IQ confirmData = IQ.createResultIQ((IQ) packet);
                    connection.sendStanza(confirmData);

                    // set last seen sequence
                    this.lastSequence = data.getSeq();
                    if (this.lastSequence == 65535) {
                        this.lastSequence = -1;
                    }

                }

            };
        }

        protected StanzaFilter getDataPacketFilter() {
            /*
             * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
             * data packet extension, matching session ID and recipient
             */
            return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
        }

    }

    /**
     * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
     * encapsulating the data packets. Received messages are not acknowledged.
     */
    private class MessageIBBInputStream extends IBBInputStream {

        @Override
        protected StanzaListener getDataPacketListener() {
            return new StanzaListener() {

                @Override
                public void processPacket(Stanza packet) {
                    // get data packet extension
                    DataPacketExtension data = (DataPacketExtension) packet.getExtension(
                                    DataPacketExtension.ELEMENT,
                                    DataPacketExtension.NAMESPACE);

                    /*
                     * Only packets with valid encoded data are queued.
                     *
                     * TODO once a majority of XMPP server implementation support XEP-0079
                     * Advanced Message Processing the invalid message could be answered with an
                     * appropriate error. For now we just ignore the packet. Subsequent packets
                     * with an increased sequence will cause the input stream to close the
                     * stream/session.
                     */
                    if (data.getDecodedData() != null) {
                        dataQueue.offer(data);

                        // TODO confirm packet once XMPP servers support XEP-0079
                    }
                }

            };
        }

        @Override
        protected StanzaFilter getDataPacketFilter() {
            /*
             * accept all message stanzas that carry a data packet extension of this session and
             * were sent by the remote peer
             */
            StanzaFilter messages = new StanzaTypeFilter(Message.class);
            return new AndFilter(messages, new IBBDataPacketFilter());
        }

    }

    /**
     * IBBDataPacketFilter class filters all packets from the remote peer of this session,
     * containing an In-Band Bytestream data packet extension whose session ID matches this sessions
     * ID.
     */
    private class IBBDataPacketFilter implements StanzaFilter {

        public boolean accept(Stanza packet) {
            // sender equals remote peer
            if (!packet.getFrom().equals(remoteJID)) {
                return false;
            }

            DataPacketExtension data;
            if (packet instanceof Data) {
                data = ((Data) packet).getDataPacketExtension();
            } else {
                // stanza contains data packet extension
                data = packet.getExtension(
                        DataPacketExtension.ELEMENT,
                        DataPacketExtension.NAMESPACE);
                if (data == null) {
                    return false;
                }
            }

            // session ID equals this session ID
            if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
                return false;
            }

            return true;
        }

    }

    /**
     * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
     * Subclasses of this output stream must provide a method to send data over XMPP stream.
     * <p>
     * Written bytes are collected in a buffer of the sessions block size. The buffer is sent as
     * one data packet when it is full, when the stream is flushed or when it is closed with
     * flushing enabled.
     */
    private abstract class IBBOutputStream extends OutputStream {

        /* buffer with the size of this sessions block size */
        protected final byte[] buffer;

        /* pointer to next byte to write to buffer */
        protected int bufferPointer = 0;

        /* data packet sequence (range from 0 to 65535) */
        protected long seq = 0;

        /* flag to indicate if output stream is closed */
        protected boolean isClosed = false;

        /**
         * Constructor. Allocates the buffer according to the block size of the open request.
         */
        public IBBOutputStream() {
            this.buffer = new byte[byteStreamRequest.getBlockSize()];
        }

        /**
         * Writes the given data packet to the XMPP stream.
         * 
         * @param data the data packet
         * @throws IOException if an I/O error occurred while sending or if the stream is closed
         * @throws NotConnectedException 
         * @throws InterruptedException 
         */
        protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException;

        @Override
        public synchronized void write(int b) throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }

            // if buffer is full flush buffer
            if (bufferPointer >= buffer.length) {
                flushBuffer();
            }

            buffer[bufferPointer++] = (byte) b;
        }

        @Override
        public synchronized void write(byte[] b, int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException();
            }
            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
                            || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            }
            else if (len == 0) {
                return;
            }

            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }

            if (buffer.length == 0) {
                // without buffer capacity the chunking below could never make progress
                throw new IOException("Block size must be greater than 0");
            }

            // Hand over the data in chunks of at most one block. This is done iteratively
            // because one level of recursion per block can exhaust the stack for large arrays.
            int position = off;
            int remaining = len;
            while (remaining >= buffer.length) {
                writeOut(b, position, buffer.length);
                position += buffer.length;
                remaining -= buffer.length;
            }
            if (remaining > 0) {
                writeOut(b, position, remaining);
            }
        }

        @Override
        public synchronized void write(byte[] b) throws IOException {
            write(b, 0, b.length);
        }

        /**
         * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
         * capacity has been reached. This method is only called from this class so it is assured
         * that the amount of data to send is <= buffer capacity
         * 
         * @param b the data
         * @param off the offset of the first byte to write
         * @param len the number of bytes to write
         * @throws IOException if an I/O error occurred while sending or if the stream is closed
         */
        private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }

            // set to 0 in case the next 'if' block is not executed
            int available = 0;

            // is data to send greater that buffer space left
            if (len > buffer.length - bufferPointer) {
                // fill buffer to capacity and send it
                available = buffer.length - bufferPointer;
                System.arraycopy(b, off, buffer, bufferPointer, available);
                bufferPointer += available;
                flushBuffer();
            }

            // copy the data left to buffer
            System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
            bufferPointer += len - available;
        }

        @Override
        public synchronized void flush() throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }
            flushBuffer();
        }

        /**
         * Sends the buffered bytes as one data packet and advances the sequence. Does nothing if
         * the buffer is empty.
         * 
         * @throws IOException if an error occurred while sending the data packet
         */
        private synchronized void flushBuffer() throws IOException {

            // do nothing if no data to send available
            if (bufferPointer == 0) {
                return;
            }

            // create data packet
            String enc = Base64.encodeToString(buffer, 0, bufferPointer);
            DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
                            this.seq, enc);

            // write to XMPP stream
            try {
                writeToXML(data);
            }
            catch (InterruptedException | NotConnectedException e) {
                IOException ioException = new IOException();
                ioException.initCause(e);
                throw ioException;
            }

            // reset buffer pointer
            bufferPointer = 0;

            // increment sequence, considering sequence overflow: 65535 is the last valid value
            // and has to be used before starting over at 0, which is what the receiving side
            // expects
            this.seq = (this.seq == 65535 ? 0 : this.seq + 1);

        }

        @Override
        public void close() throws IOException {
            if (isClosed) {
                return;
            }
            InBandBytestreamSession.this.closeByLocal(false);
        }

        /**
         * Sets the close flag and optionally flushes the stream.
         * 
         * @param flush if <code>true</code> flushes the stream
         */
        protected void closeInternal(boolean flush) {
            if (this.isClosed) {
                return;
            }
            this.isClosed = true;

            try {
                if (flush) {
                    flushBuffer();
                }
            }
            catch (IOException e) {
                /*
                 * ignore, because writeToXML() will not throw an exception if stream is already
                 * closed
                 */
            }
        }

    }

    /**
     * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
     * the data packets. Every data packet is sent as IQ and the response of the remote peer is
     * awaited before the method returns.
     */
    private class IQIBBOutputStream extends IBBOutputStream {

        @Override
        protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
            // wrap the data packet into an IQ stanza addressed to the remote peer
            IQ dataStanza = new Data(data);
            dataStanza.setTo(remoteJID);

            try {
                connection.createPacketCollectorAndSend(dataStanza).nextResultOrThrow();
            }
            catch (Exception e) {
                if (this.isClosed) {
                    // the stream is closed already; the failure is not reported
                    return;
                }

                // close the session and report the failure
                InBandBytestreamSession.this.close();
                // Sadly we are unable to use the IOException(Throwable) constructor because this
                // constructor is only supported from Android API 9 on.
                IOException wrapped = new IOException();
                wrapped.initCause(e);
                throw wrapped;
            }

        }

    }

    /**
     * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
     * encapsulating the data packets. The messages are sent without waiting for a response.
     */
    private class MessageIBBOutputStream extends IBBOutputStream {

        @Override
        protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException {
            // wrap the data packet into a message stanza addressed to the remote peer
            Message carrier = new Message(remoteJID);
            carrier.addExtension(data);

            connection.sendStanza(carrier);

        }

    }

    /**
     * Passes the given IQ data stanza directly to the data packet listener of this sessions
     * input stream.
     * 
     * @param data the IQ stanza containing the data packet
     * @throws NotConnectedException if the listener cannot send its response because the
     *         connection is not connected
     * @throws InterruptedException if the calling thread is interrupted while the listener sends
     *         its response
     */
    public void processIQPacket(Data data) throws NotConnectedException, InterruptedException {
        final StanzaListener listener = inputStream.dataPacketListener;
        listener.processPacket(data);
    }

}