001/**
002 *
003 * Copyright the original author or authors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smackx.bytestreams.ibb;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.SocketTimeoutException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026
027import org.jivesoftware.smack.SmackException.NotConnectedException;
028import org.jivesoftware.smack.XMPPConnection;
029import org.jivesoftware.smack.PacketListener;
030import org.jivesoftware.smack.filter.AndFilter;
031import org.jivesoftware.smack.filter.PacketFilter;
032import org.jivesoftware.smack.filter.PacketTypeFilter;
033import org.jivesoftware.smack.packet.IQ;
034import org.jivesoftware.smack.packet.Message;
035import org.jivesoftware.smack.packet.Packet;
036import org.jivesoftware.smack.packet.PacketExtension;
037import org.jivesoftware.smack.packet.XMPPError;
038import org.jivesoftware.smack.util.StringUtils;
039import org.jivesoftware.smackx.bytestreams.BytestreamSession;
040import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
041import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
042import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
043import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
044
045/**
046 * InBandBytestreamSession class represents an In-Band Bytestream session.
047 * <p>
048 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
049 * directions.
050 * <p>
051 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
052 * closed individually the session will be closed automatically once the second stream is closed.
053 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
054 * automatically if one of them is closed.
055 * 
056 * @author Henning Staib
057 */
058public class InBandBytestreamSession implements BytestreamSession {
059
060    /* XMPP connection */
061    private final XMPPConnection connection;
062
063    /* the In-Band Bytestream open request for this session */
064    private final Open byteStreamRequest;
065
066    /*
067     * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
068     */
069    private IBBInputStream inputStream;
070
071    /*
072     * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
073     */
074    private IBBOutputStream outputStream;
075
076    /* JID of the remote peer */
077    private String remoteJID;
078
079    /* flag to close both streams if one of them is closed */
080    private boolean closeBothStreamsEnabled = false;
081
082    /* flag to indicate if session is closed */
083    private boolean isClosed = false;
084
085    /**
086     * Constructor.
087     * 
088     * @param connection the XMPP connection
089     * @param byteStreamRequest the In-Band Bytestream open request for this session
090     * @param remoteJID JID of the remote peer
091     */
092    protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
093                    String remoteJID) {
094        this.connection = connection;
095        this.byteStreamRequest = byteStreamRequest;
096        this.remoteJID = remoteJID;
097
098        // initialize streams dependent to the uses stanza type
099        switch (byteStreamRequest.getStanza()) {
100        case IQ:
101            this.inputStream = new IQIBBInputStream();
102            this.outputStream = new IQIBBOutputStream();
103            break;
104        case MESSAGE:
105            this.inputStream = new MessageIBBInputStream();
106            this.outputStream = new MessageIBBOutputStream();
107            break;
108        }
109
110    }
111
112    public InputStream getInputStream() {
113        return this.inputStream;
114    }
115
116    public OutputStream getOutputStream() {
117        return this.outputStream;
118    }
119
120    public int getReadTimeout() {
121        return this.inputStream.readTimeout;
122    }
123
124    public void setReadTimeout(int timeout) {
125        if (timeout < 0) {
126            throw new IllegalArgumentException("Timeout must be >= 0");
127        }
128        this.inputStream.readTimeout = timeout;
129    }
130
131    /**
132     * Returns whether both streams should be closed automatically if one of the streams is closed.
133     * Default is <code>false</code>.
134     * 
135     * @return <code>true</code> if both streams will be closed if one of the streams is closed,
136     *         <code>false</code> if both streams can be closed independently.
137     */
138    public boolean isCloseBothStreamsEnabled() {
139        return closeBothStreamsEnabled;
140    }
141
142    /**
143     * Sets whether both streams should be closed automatically if one of the streams is closed.
144     * Default is <code>false</code>.
145     * 
146     * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
147     *        the streams is closed, <code>false</code> if both streams should be closed
148     *        independently
149     */
150    public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
151        this.closeBothStreamsEnabled = closeBothStreamsEnabled;
152    }
153
154    public void close() throws IOException {
155        closeByLocal(true); // close input stream
156        closeByLocal(false); // close output stream
157    }
158
159    /**
160     * This method is invoked if a request to close the In-Band Bytestream has been received.
161     * 
162     * @param closeRequest the close request from the remote peer
163     * @throws NotConnectedException 
164     */
165    protected void closeByPeer(Close closeRequest) throws NotConnectedException {
166
167        /*
168         * close streams without flushing them, because stream is already considered closed on the
169         * remote peers side
170         */
171        this.inputStream.closeInternal();
172        this.inputStream.cleanup();
173        this.outputStream.closeInternal(false);
174
175        // acknowledge close request
176        IQ confirmClose = IQ.createResultIQ(closeRequest);
177        this.connection.sendPacket(confirmClose);
178
179    }
180
181    /**
182     * This method is invoked if one of the streams has been closed locally, if an error occurred
183     * locally or if the whole session should be closed.
184     * 
185     * @throws IOException if an error occurs while sending the close request
186     */
187    protected synchronized void closeByLocal(boolean in) throws IOException {
188        if (this.isClosed) {
189            return;
190        }
191
192        if (this.closeBothStreamsEnabled) {
193            this.inputStream.closeInternal();
194            this.outputStream.closeInternal(true);
195        }
196        else {
197            if (in) {
198                this.inputStream.closeInternal();
199            }
200            else {
201                // close stream but try to send any data left
202                this.outputStream.closeInternal(true);
203            }
204        }
205
206        if (this.inputStream.isClosed && this.outputStream.isClosed) {
207            this.isClosed = true;
208
209            // send close request
210            Close close = new Close(this.byteStreamRequest.getSessionID());
211            close.setTo(this.remoteJID);
212            try {
213                connection.createPacketCollectorAndSend(close).nextResultOrThrow();
214            }
215            catch (Exception e) {
216                // Sadly we are unable to use the IOException(Throwable) constructor because this
217                // constructor is only supported from Android API 9 on.
218                IOException ioException = new IOException();
219                ioException.initCause(e);
220                throw ioException;
221            }
222
223            this.inputStream.cleanup();
224
225            // remove session from manager
226            InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
227        }
228
229    }
230
231    /**
232     * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
233     * Subclasses of this input stream must provide a packet listener along with a packet filter to
234     * collect the In-Band Bytestream data packets.
235     */
236    private abstract class IBBInputStream extends InputStream {
237
238        /* the data packet listener to fill the data queue */
239        private final PacketListener dataPacketListener;
240
241        /* queue containing received In-Band Bytestream data packets */
242        protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
243
244        /* buffer containing the data from one data packet */
245        private byte[] buffer;
246
247        /* pointer to the next byte to read from buffer */
248        private int bufferPointer = -1;
249
250        /* data packet sequence (range from 0 to 65535) */
251        private long seq = -1;
252
253        /* flag to indicate if input stream is closed */
254        private boolean isClosed = false;
255
256        /* flag to indicate if close method was invoked */
257        private boolean closeInvoked = false;
258
259        /* timeout for read operations */
260        private int readTimeout = 0;
261
262        /**
263         * Constructor.
264         */
265        public IBBInputStream() {
266            // add data packet listener to connection
267            this.dataPacketListener = getDataPacketListener();
268            connection.addPacketListener(this.dataPacketListener, getDataPacketFilter());
269        }
270
271        /**
272         * Returns the packet listener that processes In-Band Bytestream data packets.
273         * 
274         * @return the data packet listener
275         */
276        protected abstract PacketListener getDataPacketListener();
277
278        /**
279         * Returns the packet filter that accepts In-Band Bytestream data packets.
280         * 
281         * @return the data packet filter
282         */
283        protected abstract PacketFilter getDataPacketFilter();
284
285        public synchronized int read() throws IOException {
286            checkClosed();
287
288            // if nothing read yet or whole buffer has been read fill buffer
289            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
290                // if no data available and stream was closed return -1
291                if (!loadBuffer()) {
292                    return -1;
293                }
294            }
295
296            // return byte and increment buffer pointer
297            return ((int) buffer[bufferPointer++]) & 0xff;
298        }
299
300        public synchronized int read(byte[] b, int off, int len) throws IOException {
301            if (b == null) {
302                throw new NullPointerException();
303            }
304            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
305                            || ((off + len) < 0)) {
306                throw new IndexOutOfBoundsException();
307            }
308            else if (len == 0) {
309                return 0;
310            }
311
312            checkClosed();
313
314            // if nothing read yet or whole buffer has been read fill buffer
315            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
316                // if no data available and stream was closed return -1
317                if (!loadBuffer()) {
318                    return -1;
319                }
320            }
321
322            // if more bytes wanted than available return all available
323            int bytesAvailable = buffer.length - bufferPointer;
324            if (len > bytesAvailable) {
325                len = bytesAvailable;
326            }
327
328            System.arraycopy(buffer, bufferPointer, b, off, len);
329            bufferPointer += len;
330            return len;
331        }
332
333        public synchronized int read(byte[] b) throws IOException {
334            return read(b, 0, b.length);
335        }
336
337        /**
338         * This method blocks until a data packet is received, the stream is closed or the current
339         * thread is interrupted.
340         * 
341         * @return <code>true</code> if data was received, otherwise <code>false</code>
342         * @throws IOException if data packets are out of sequence
343         */
344        private synchronized boolean loadBuffer() throws IOException {
345
346            // wait until data is available or stream is closed
347            DataPacketExtension data = null;
348            try {
349                if (this.readTimeout == 0) {
350                    while (data == null) {
351                        if (isClosed && this.dataQueue.isEmpty()) {
352                            return false;
353                        }
354                        data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
355                    }
356                }
357                else {
358                    data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
359                    if (data == null) {
360                        throw new SocketTimeoutException();
361                    }
362                }
363            }
364            catch (InterruptedException e) {
365                // Restore the interrupted status
366                Thread.currentThread().interrupt();
367                return false;
368            }
369
370            // handle sequence overflow
371            if (this.seq == 65535) {
372                this.seq = -1;
373            }
374
375            // check if data packets sequence is successor of last seen sequence
376            long seq = data.getSeq();
377            if (seq - 1 != this.seq) {
378                // packets out of order; close stream/session
379                InBandBytestreamSession.this.close();
380                throw new IOException("Packets out of sequence");
381            }
382            else {
383                this.seq = seq;
384            }
385
386            // set buffer to decoded data
387            buffer = data.getDecodedData();
388            bufferPointer = 0;
389            return true;
390        }
391
392        /**
393         * Checks if this stream is closed and throws an IOException if necessary
394         * 
395         * @throws IOException if stream is closed and no data should be read anymore
396         */
397        private void checkClosed() throws IOException {
398            // Throw an exception if, and only if, this stream has been already
399            // closed by the user using the close() method
400            if (closeInvoked) {
401                // clear data queue in case additional data was received after stream was closed
402                this.dataQueue.clear();
403                throw new IOException("Stream is closed");
404            }
405        }
406
407        public boolean markSupported() {
408            return false;
409        }
410
411        public void close() throws IOException {
412            if (closeInvoked) {
413                return;
414            }
415
416            this.closeInvoked = true;
417
418            InBandBytestreamSession.this.closeByLocal(true);
419        }
420
421        /**
422         * This method sets the close flag and removes the data packet listener.
423         */
424        private void closeInternal() {
425            if (isClosed) {
426                return;
427            }
428            isClosed = true;
429        }
430
431        /**
432         * Invoked if the session is closed.
433         */
434        private void cleanup() {
435            connection.removePacketListener(this.dataPacketListener);
436        }
437
438    }
439
440    /**
441     * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
442     * data packets.
443     */
444    private class IQIBBInputStream extends IBBInputStream {
445
446        protected PacketListener getDataPacketListener() {
447            return new PacketListener() {
448
449                private long lastSequence = -1;
450
451                public void processPacket(Packet packet) throws NotConnectedException {
452                    // get data packet extension
453                    DataPacketExtension data = (DataPacketExtension) packet.getExtension(
454                                    DataPacketExtension.ELEMENT_NAME,
455                                    InBandBytestreamManager.NAMESPACE);
456
457                    /*
458                     * check if sequence was not used already (see XEP-0047 Section 2.2)
459                     */
460                    if (data.getSeq() <= this.lastSequence) {
461                        IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
462                                        XMPPError.Condition.unexpected_request));
463                        connection.sendPacket(unexpectedRequest);
464                        return;
465
466                    }
467
468                    // check if encoded data is valid (see XEP-0047 Section 2.2)
469                    if (data.getDecodedData() == null) {
470                        // data is invalid; respond with bad-request error
471                        IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
472                                        XMPPError.Condition.bad_request));
473                        connection.sendPacket(badRequest);
474                        return;
475                    }
476
477                    // data is valid; add to data queue
478                    dataQueue.offer(data);
479
480                    // confirm IQ
481                    IQ confirmData = IQ.createResultIQ((IQ) packet);
482                    connection.sendPacket(confirmData);
483
484                    // set last seen sequence
485                    this.lastSequence = data.getSeq();
486                    if (this.lastSequence == 65535) {
487                        this.lastSequence = -1;
488                    }
489
490                }
491
492            };
493        }
494
495        protected PacketFilter getDataPacketFilter() {
496            /*
497             * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
498             * data packet extension, matching session ID and recipient
499             */
500            return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter());
501        }
502
503    }
504
505    /**
506     * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
507     * encapsulating the data packets.
508     */
509    private class MessageIBBInputStream extends IBBInputStream {
510
511        protected PacketListener getDataPacketListener() {
512            return new PacketListener() {
513
514                public void processPacket(Packet packet) {
515                    // get data packet extension
516                    DataPacketExtension data = (DataPacketExtension) packet.getExtension(
517                                    DataPacketExtension.ELEMENT_NAME,
518                                    InBandBytestreamManager.NAMESPACE);
519
520                    // check if encoded data is valid
521                    if (data.getDecodedData() == null) {
522                        /*
523                         * TODO once a majority of XMPP server implementation support XEP-0079
524                         * Advanced Message Processing the invalid message could be answered with an
525                         * appropriate error. For now we just ignore the packet. Subsequent packets
526                         * with an increased sequence will cause the input stream to close the
527                         * stream/session.
528                         */
529                        return;
530                    }
531
532                    // data is valid; add to data queue
533                    dataQueue.offer(data);
534
535                    // TODO confirm packet once XMPP servers support XEP-0079
536                }
537
538            };
539        }
540
541        @Override
542        protected PacketFilter getDataPacketFilter() {
543            /*
544             * filter all message stanzas containing a data packet extension, matching session ID
545             * and recipient
546             */
547            return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter());
548        }
549
550    }
551
552    /**
553     * IBBDataPacketFilter class filters all packets from the remote peer of this session,
554     * containing an In-Band Bytestream data packet extension whose session ID matches this sessions
555     * ID.
556     */
557    private class IBBDataPacketFilter implements PacketFilter {
558
559        public boolean accept(Packet packet) {
560            // sender equals remote peer
561            if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
562                return false;
563            }
564
565            // stanza contains data packet extension
566            PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME,
567                            InBandBytestreamManager.NAMESPACE);
568            if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) {
569                return false;
570            }
571
572            // session ID equals this session ID
573            DataPacketExtension data = (DataPacketExtension) packetExtension;
574            if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
575                return false;
576            }
577
578            return true;
579        }
580
581    }
582
583    /**
584     * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
585     * Subclasses of this output stream must provide a method to send data over XMPP stream.
586     */
587    private abstract class IBBOutputStream extends OutputStream {
588
589        /* buffer with the size of this sessions block size */
590        protected final byte[] buffer;
591
592        /* pointer to next byte to write to buffer */
593        protected int bufferPointer = 0;
594
595        /* data packet sequence (range from 0 to 65535) */
596        protected long seq = 0;
597
598        /* flag to indicate if output stream is closed */
599        protected boolean isClosed = false;
600
601        /**
602         * Constructor.
603         */
604        public IBBOutputStream() {
605            this.buffer = new byte[byteStreamRequest.getBlockSize()];
606        }
607
608        /**
609         * Writes the given data packet to the XMPP stream.
610         * 
611         * @param data the data packet
612         * @throws IOException if an I/O error occurred while sending or if the stream is closed
613         * @throws NotConnectedException 
614         */
615        protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException;
616
617        public synchronized void write(int b) throws IOException {
618            if (this.isClosed) {
619                throw new IOException("Stream is closed");
620            }
621
622            // if buffer is full flush buffer
623            if (bufferPointer >= buffer.length) {
624                flushBuffer();
625            }
626
627            buffer[bufferPointer++] = (byte) b;
628        }
629
630        public synchronized void write(byte b[], int off, int len) throws IOException {
631            if (b == null) {
632                throw new NullPointerException();
633            }
634            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
635                            || ((off + len) < 0)) {
636                throw new IndexOutOfBoundsException();
637            }
638            else if (len == 0) {
639                return;
640            }
641
642            if (this.isClosed) {
643                throw new IOException("Stream is closed");
644            }
645
646            // is data to send greater than buffer size
647            if (len >= buffer.length) {
648
649                // "byte" off the first chunk to write out
650                writeOut(b, off, buffer.length);
651
652                // recursively call this method with the lesser amount
653                write(b, off + buffer.length, len - buffer.length);
654            }
655            else {
656                writeOut(b, off, len);
657            }
658        }
659
660        public synchronized void write(byte[] b) throws IOException {
661            write(b, 0, b.length);
662        }
663
664        /**
665         * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
666         * capacity has been reached. This method is only called from this class so it is assured
667         * that the amount of data to send is <= buffer capacity
668         * 
669         * @param b the data
670         * @param off the data
671         * @param len the number of bytes to write
672         * @throws IOException if an I/O error occurred while sending or if the stream is closed
673         */
674        private synchronized void writeOut(byte b[], int off, int len) throws IOException {
675            if (this.isClosed) {
676                throw new IOException("Stream is closed");
677            }
678
679            // set to 0 in case the next 'if' block is not executed
680            int available = 0;
681
682            // is data to send greater that buffer space left
683            if (len > buffer.length - bufferPointer) {
684                // fill buffer to capacity and send it
685                available = buffer.length - bufferPointer;
686                System.arraycopy(b, off, buffer, bufferPointer, available);
687                bufferPointer += available;
688                flushBuffer();
689            }
690
691            // copy the data left to buffer
692            System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
693            bufferPointer += len - available;
694        }
695
696        public synchronized void flush() throws IOException {
697            if (this.isClosed) {
698                throw new IOException("Stream is closed");
699            }
700            flushBuffer();
701        }
702
703        private synchronized void flushBuffer() throws IOException {
704
705            // do nothing if no data to send available
706            if (bufferPointer == 0) {
707                return;
708            }
709
710            // create data packet
711            String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false);
712            DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
713                            this.seq, enc);
714
715            // write to XMPP stream
716            try {
717                writeToXML(data);
718            }
719            catch (NotConnectedException e) {
720                IOException ioException = new IOException();
721                ioException.initCause(e);
722                throw ioException;
723            }
724
725            // reset buffer pointer
726            bufferPointer = 0;
727
728            // increment sequence, considering sequence overflow
729            this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
730
731        }
732
733        public void close() throws IOException {
734            if (isClosed) {
735                return;
736            }
737            InBandBytestreamSession.this.closeByLocal(false);
738        }
739
740        /**
741         * Sets the close flag and optionally flushes the stream.
742         * 
743         * @param flush if <code>true</code> flushes the stream
744         */
745        protected void closeInternal(boolean flush) {
746            if (this.isClosed) {
747                return;
748            }
749            this.isClosed = true;
750
751            try {
752                if (flush) {
753                    flushBuffer();
754                }
755            }
756            catch (IOException e) {
757                /*
758                 * ignore, because writeToXML() will not throw an exception if stream is already
759                 * closed
760                 */
761            }
762        }
763
764    }
765
766    /**
767     * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
768     * the data packets.
769     */
770    private class IQIBBOutputStream extends IBBOutputStream {
771
772        @Override
773        protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
774            // create IQ stanza containing data packet
775            IQ iq = new Data(data);
776            iq.setTo(remoteJID);
777
778            try {
779                connection.createPacketCollectorAndSend(iq).nextResultOrThrow();
780            }
781            catch (Exception e) {
782                // close session unless it is already closed
783                if (!this.isClosed) {
784                    InBandBytestreamSession.this.close();
785                    // Sadly we are unable to use the IOException(Throwable) constructor because this
786                    // constructor is only supported from Android API 9 on.
787                    IOException ioException = new IOException();
788                    ioException.initCause(e);
789                    throw ioException;
790                }
791            }
792
793        }
794
795    }
796
797    /**
798     * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
799     * encapsulating the data packets.
800     */
801    private class MessageIBBOutputStream extends IBBOutputStream {
802
803        @Override
804        protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException {
805            // create message stanza containing data packet
806            Message message = new Message(remoteJID);
807            message.addExtension(data);
808
809            connection.sendPacket(message);
810
811        }
812
813    }
814
815}