001/**
002 *
003 * Copyright 2017-2018 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.io.IOException;
020import java.net.Socket;
021import java.net.SocketAddress;
022import java.util.concurrent.CancellationException;
023import java.util.concurrent.ExecutionException;
024import java.util.concurrent.Future;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.TimeoutException;
027import java.util.logging.Level;
028import java.util.logging.Logger;
029
030import javax.net.SocketFactory;
031
032import org.jivesoftware.smack.packet.Stanza;
033import org.jivesoftware.smack.util.CallbackRecipient;
034import org.jivesoftware.smack.util.ExceptionCallback;
035import org.jivesoftware.smack.util.SuccessCallback;
036
037public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> {
038
039    private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName());
040
041    private boolean cancelled;
042
043    protected V result;
044
045    protected E exception;
046
047    private SuccessCallback<V> successCallback;
048
049    private ExceptionCallback<E> exceptionCallback;
050
051    @Override
052    public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
053        if (isDone()) {
054            return false;
055        }
056
057        cancelled = true;
058
059        if (mayInterruptIfRunning) {
060            notifyAll();
061        }
062
063        return true;
064    }
065
066    @Override
067    public final synchronized boolean isCancelled() {
068        return cancelled;
069    }
070
071    @Override
072    public final synchronized boolean isDone() {
073        return result != null;
074    }
075
076    @Override
077    public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) {
078        this.successCallback = successCallback;
079        maybeInvokeCallbacks();
080        return this;
081    }
082
083    @Override
084    public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) {
085        this.exceptionCallback = exceptionCallback;
086        maybeInvokeCallbacks();
087        return this;
088    }
089
090    private V getOrThrowExecutionException() throws ExecutionException {
091        assert (result != null || exception != null || cancelled);
092        if (result != null) {
093            return result;
094        }
095        if (exception != null) {
096            throw new ExecutionException(exception);
097        }
098
099        assert (cancelled);
100        throw new CancellationException();
101    }
102
103    @Override
104    public final synchronized V get() throws InterruptedException, ExecutionException {
105        while (result == null && exception == null && !cancelled) {
106            futureWait();
107        }
108
109        return getOrThrowExecutionException();
110    }
111
112    public final synchronized V getOrThrow() throws E, InterruptedException {
113        while (result == null && exception == null && !cancelled) {
114            futureWait();
115        }
116
117        if (exception != null) {
118            throw exception;
119        }
120
121        if (cancelled) {
122            throw new CancellationException();
123        }
124
125        assert result != null;
126        return result;
127    }
128
129    @Override
130    public final synchronized V get(long timeout, TimeUnit unit)
131                    throws InterruptedException, ExecutionException, TimeoutException {
132        final long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
133        while (result != null && exception != null) {
134            final long waitTimeRemaining = deadline - System.currentTimeMillis();
135            if (waitTimeRemaining > 0) {
136                futureWait(waitTimeRemaining);
137            }
138        }
139
140        if (cancelled) {
141            throw new CancellationException();
142        }
143
144        if (result == null || exception == null) {
145            throw new TimeoutException();
146        }
147
148        return getOrThrowExecutionException();
149    }
150
151    protected final synchronized void maybeInvokeCallbacks() {
152        if (cancelled) {
153            return;
154        }
155
156        if (result != null && successCallback != null) {
157            AbstractXMPPConnection.asyncGo(new Runnable() {
158                @Override
159                public void run() {
160                    successCallback.onSuccess(result);
161                }
162            });
163        }
164        else if (exception != null && exceptionCallback != null) {
165            AbstractXMPPConnection.asyncGo(new Runnable() {
166                @Override
167                public void run() {
168                    exceptionCallback.processException(exception);
169                }
170            });
171        }
172    }
173
174    protected final void futureWait() throws InterruptedException {
175        futureWait(0);
176    }
177
178    @SuppressWarnings("WaitNotInLoop")
179    protected void futureWait(long timeout) throws InterruptedException {
180        wait(timeout);
181    }
182
183    public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> {
184        public final synchronized void setResult(V result) {
185            this.result = result;
186            this.notifyAll();
187
188            maybeInvokeCallbacks();
189        }
190
191        public final synchronized void setException(E exception) {
192            this.exception = exception;
193            this.notifyAll();
194
195            maybeInvokeCallbacks();
196        }
197    }
198
199    public static class SocketFuture extends InternalSmackFuture<Socket, IOException> {
200        private final Socket socket;
201
202        private final Object wasInterruptedLock = new Object();
203
204        private boolean wasInterrupted;
205
206        public SocketFuture(SocketFactory socketFactory) throws IOException {
207            socket = socketFactory.createSocket();
208        }
209
210        @Override
211        protected void futureWait(long timeout) throws InterruptedException {
212            try {
213                super.futureWait(timeout);
214            } catch (InterruptedException interruptedException) {
215                synchronized (wasInterruptedLock) {
216                    wasInterrupted = true;
217                    if (!socket.isClosed()) {
218                        closeSocket();
219                    }
220                }
221                throw interruptedException;
222            }
223        }
224
225        public void connectAsync(final SocketAddress socketAddress, final int timeout) {
226            AbstractXMPPConnection.asyncGo(new Runnable() {
227                @Override
228                public void run() {
229                    try {
230                        socket.connect(socketAddress, timeout);
231                    }
232                    catch (IOException e) {
233                        setException(e);
234                        return;
235                    }
236                    synchronized (wasInterruptedLock) {
237                        if (wasInterrupted) {
238                            closeSocket();
239                            return;
240                        }
241                    }
242                    setResult(socket);
243                }
244            });
245        }
246
247        private void closeSocket() {
248            try {
249                socket.close();
250            }
251            catch (IOException ioException) {
252                LOGGER.log(Level.WARNING, "Could not close socket", ioException);
253            }
254        }
255    }
256
257    public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E>
258                    implements StanzaListener, ExceptionCallback<E> {
259
260        /**
261         * This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>,
262         * then the future will automatically set the given exception as failure reason and notify potential waiting
263         * threads.
264         *
265         * @param exception the exception to check.
266         * @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise.
267         */
268        protected abstract boolean isNonFatalException(E exception);
269
270        protected abstract void handleStanza(Stanza stanza);
271
272        @Override
273        public final synchronized void processException(E exception) {
274            if (!isNonFatalException(exception)) {
275                this.exception = exception;
276                this.notifyAll();
277
278                maybeInvokeCallbacks();
279            }
280        }
281
282        /**
283         * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>.
284         */
285        @Override
286        public final synchronized void processStanza(Stanza stanza) {
287            handleStanza(stanza);
288        }
289    }
290
291    /**
292     * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning
293     * <code>false</code> method.
294     *
295     * @param <V>
296     */
297    public abstract static class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception>
298                    extends InternalProcessStanzaSmackFuture<V, E> {
299        @Override
300        protected boolean isNonFatalException(E exception) {
301            return false;
302        }
303    }
304
305    public static <V, E extends Exception> SmackFuture<V, E> from(V result) {
306        InternalSmackFuture<V, E> future = new InternalSmackFuture<>();
307        future.setResult(result);
308        return future;
309    }
310
311}