001/**
002 *
003 * Copyright 2017-2020 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.io.IOException;
020import java.net.Socket;
021import java.net.SocketAddress;
022import java.util.Collection;
023import java.util.concurrent.CancellationException;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.Future;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import java.util.logging.Level;
030import java.util.logging.Logger;
031
032import javax.net.SocketFactory;
033
034import org.jivesoftware.smack.packet.Stanza;
035import org.jivesoftware.smack.util.CallbackRecipient;
036import org.jivesoftware.smack.util.Consumer;
037import org.jivesoftware.smack.util.ExceptionCallback;
038import org.jivesoftware.smack.util.SuccessCallback;
039
040public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> {
041
042    private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName());
043
044    private boolean cancelled;
045
046    protected V result;
047
048    protected E exception;
049
050    private SuccessCallback<V> successCallback;
051
052    private ExceptionCallback<E> exceptionCallback;
053
054    private Consumer<SmackFuture<V, E>> completionCallback;
055
056    @Override
057    public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
058        if (isDone()) {
059            return false;
060        }
061
062        cancelled = true;
063
064        if (mayInterruptIfRunning) {
065            notifyAll();
066        }
067
068        return true;
069    }
070
071    @Override
072    public final synchronized boolean isCancelled() {
073        return cancelled;
074    }
075
076    @Override
077    public final synchronized boolean isDone() {
078        return result != null;
079    }
080
081    @Override
082    public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) {
083        this.successCallback = successCallback;
084        maybeInvokeCallbacks();
085        return this;
086    }
087
088    @Override
089    public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) {
090        this.exceptionCallback = exceptionCallback;
091        maybeInvokeCallbacks();
092        return this;
093    }
094
095    public void onCompletion(Consumer<SmackFuture<V, E>> completionCallback) {
096        this.completionCallback = completionCallback;
097        maybeInvokeCallbacks();
098    }
099
100    private V getOrThrowExecutionException() throws ExecutionException {
101        assert result != null || exception != null || cancelled;
102        if (result != null) {
103            return result;
104        }
105        if (exception != null) {
106            throw new ExecutionException(exception);
107        }
108
109        assert cancelled;
110        throw new CancellationException();
111    }
112
113    @Override
114    public final synchronized V get() throws InterruptedException, ExecutionException {
115        while (result == null && exception == null && !cancelled) {
116            futureWait();
117        }
118
119        return getOrThrowExecutionException();
120    }
121
122    public final synchronized V getOrThrow() throws E, InterruptedException {
123        while (result == null && exception == null && !cancelled) {
124            futureWait();
125        }
126
127        if (exception != null) {
128            throw exception;
129        }
130
131        if (cancelled) {
132            throw new CancellationException();
133        }
134
135        assert result != null;
136        return result;
137    }
138
139    @Override
140    public final synchronized V get(long timeout, TimeUnit unit)
141                    throws InterruptedException, ExecutionException, TimeoutException {
142        final long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
143        while (result != null && exception != null) {
144            final long waitTimeRemaining = deadline - System.currentTimeMillis();
145            if (waitTimeRemaining > 0) {
146                futureWait(waitTimeRemaining);
147            }
148        }
149
150        if (cancelled) {
151            throw new CancellationException();
152        }
153
154        if (result == null || exception == null) {
155            throw new TimeoutException();
156        }
157
158        return getOrThrowExecutionException();
159    }
160
161    public V getIfAvailable() {
162        return result;
163    }
164
165    private boolean callbacksInvoked;
166
167    protected final synchronized void maybeInvokeCallbacks() {
168        if (cancelled || callbacksInvoked) {
169            return;
170        }
171
172        if ((result != null || exception != null) && completionCallback != null) {
173            callbacksInvoked = true;
174            completionCallback.accept(this);
175        }
176
177        if (result != null && successCallback != null) {
178            callbacksInvoked = true;
179            AbstractXMPPConnection.asyncGo(new Runnable() {
180                @Override
181                public void run() {
182                    successCallback.onSuccess(result);
183                }
184            });
185        }
186        else if (exception != null && exceptionCallback != null) {
187            callbacksInvoked = true;
188            AbstractXMPPConnection.asyncGo(new Runnable() {
189                @Override
190                public void run() {
191                    exceptionCallback.processException(exception);
192                }
193            });
194        }
195    }
196
197    protected final void futureWait() throws InterruptedException {
198        futureWait(0);
199    }
200
201    @SuppressWarnings("WaitNotInLoop")
202    protected void futureWait(long timeout) throws InterruptedException {
203        wait(timeout);
204    }
205
206    public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> {
207        public final synchronized void setResult(V result) {
208            this.result = result;
209            this.notifyAll();
210
211            maybeInvokeCallbacks();
212        }
213
214        public final synchronized void setException(E exception) {
215            this.exception = exception;
216            this.notifyAll();
217
218            maybeInvokeCallbacks();
219        }
220    }
221
222    public static class SocketFuture extends InternalSmackFuture<Socket, IOException> {
223        private final Socket socket;
224
225        private final Object wasInterruptedLock = new Object();
226
227        private boolean wasInterrupted;
228
229        public SocketFuture(SocketFactory socketFactory) throws IOException {
230            socket = socketFactory.createSocket();
231        }
232
233        @Override
234        protected void futureWait(long timeout) throws InterruptedException {
235            try {
236                super.futureWait(timeout);
237            } catch (InterruptedException interruptedException) {
238                synchronized (wasInterruptedLock) {
239                    wasInterrupted = true;
240                    if (!socket.isClosed()) {
241                        closeSocket();
242                    }
243                }
244                throw interruptedException;
245            }
246        }
247
248        public void connectAsync(final SocketAddress socketAddress, final int timeout) {
249            AbstractXMPPConnection.asyncGo(new Runnable() {
250                @Override
251                public void run() {
252                    try {
253                        socket.connect(socketAddress, timeout);
254                    }
255                    catch (IOException e) {
256                        setException(e);
257                        return;
258                    }
259                    synchronized (wasInterruptedLock) {
260                        if (wasInterrupted) {
261                            closeSocket();
262                            return;
263                        }
264                    }
265                    setResult(socket);
266                }
267            });
268        }
269
270        private void closeSocket() {
271            try {
272                socket.close();
273            }
274            catch (IOException ioException) {
275                LOGGER.log(Level.WARNING, "Could not close socket", ioException);
276            }
277        }
278    }
279
280    public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E>
281                    implements StanzaListener, ExceptionCallback<E> {
282
283        /**
284         * This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>,
285         * then the future will automatically set the given exception as failure reason and notify potential waiting
286         * threads.
287         *
288         * @param exception the exception to check.
289         * @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise.
290         */
291        protected abstract boolean isNonFatalException(E exception);
292
293        protected abstract void handleStanza(Stanza stanza);
294
295        @Override
296        public final synchronized void processException(E exception) {
297            if (!isNonFatalException(exception)) {
298                this.exception = exception;
299                this.notifyAll();
300
301                maybeInvokeCallbacks();
302            }
303        }
304
305        /**
306         * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>.
307         */
308        @Override
309        public final synchronized void processStanza(Stanza stanza) {
310            handleStanza(stanza);
311        }
312    }
313
314    /**
315     * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning
316     * <code>false</code> method.
317     *
318     * @param <V> the return value of the future.
319     */
320    public abstract static class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception>
321                    extends InternalProcessStanzaSmackFuture<V, E> {
322        @Override
323        protected boolean isNonFatalException(E exception) {
324            return false;
325        }
326    }
327
328    public static <V, E extends Exception> SmackFuture<V, E> from(V result) {
329        InternalSmackFuture<V, E> future = new InternalSmackFuture<>();
330        future.setResult(result);
331        return future;
332    }
333
334    public static boolean await(Collection<? extends SmackFuture<?, ?>> futures, long timeout, TimeUnit unit) throws InterruptedException {
335        CountDownLatch latch = new CountDownLatch(futures.size());
336        for (SmackFuture<?, ?> future : futures) {
337            future.onCompletion(f -> latch.countDown());
338        }
339
340        return latch.await(timeout, unit);
341    }
342}