001/**
002 *
003 * Copyright 2017-2020 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.io.IOException;
020import java.net.Socket;
021import java.net.SocketAddress;
022import java.util.Collection;
023import java.util.concurrent.CancellationException;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.Future;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import java.util.logging.Level;
030import java.util.logging.Logger;
031
032import javax.net.SocketFactory;
033
034import org.jivesoftware.smack.packet.Stanza;
035import org.jivesoftware.smack.util.CallbackRecipient;
036import org.jivesoftware.smack.util.Consumer;
037import org.jivesoftware.smack.util.ExceptionCallback;
038import org.jivesoftware.smack.util.SuccessCallback;
039
040public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> {
041
042    private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName());
043
044    private boolean cancelled;
045
046    protected V result;
047
048    protected E exception;
049
050    private SuccessCallback<V> successCallback;
051
052    private ExceptionCallback<E> exceptionCallback;
053
054    private Consumer<SmackFuture<V, E>> completionCallback;
055
056    @Override
057    public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
058        if (isDone()) {
059            return false;
060        }
061
062        cancelled = true;
063
064        if (mayInterruptIfRunning) {
065            notifyAll();
066        }
067
068        return true;
069    }
070
071    @Override
072    public final synchronized boolean isCancelled() {
073        return cancelled;
074    }
075
076    @Override
077    public final synchronized boolean isDone() {
078        return result != null;
079    }
080
081    @Override
082    public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) {
083        this.successCallback = successCallback;
084        maybeInvokeCallbacks();
085        return this;
086    }
087
088    @Override
089    public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) {
090        this.exceptionCallback = exceptionCallback;
091        maybeInvokeCallbacks();
092        return this;
093    }
094
095    public void onCompletion(Consumer<SmackFuture<V, E>> completionCallback) {
096        this.completionCallback = completionCallback;
097        maybeInvokeCallbacks();
098    }
099
100    private V getOrThrowExecutionException() throws ExecutionException {
101        assert result != null || exception != null || cancelled;
102        if (result != null) {
103            return result;
104        }
105        if (exception != null) {
106            throw new ExecutionException(exception);
107        }
108
109        assert cancelled;
110        throw new CancellationException();
111    }
112
113    @Override
114    public final synchronized V get() throws InterruptedException, ExecutionException {
115        while (result == null && exception == null && !cancelled) {
116            futureWait();
117        }
118
119        return getOrThrowExecutionException();
120    }
121
122    public final synchronized V getOrThrow() throws E, InterruptedException {
123        while (result == null && exception == null && !cancelled) {
124            futureWait();
125        }
126
127        if (exception != null) {
128            throw exception;
129        }
130
131        if (cancelled) {
132            throw new CancellationException();
133        }
134
135        assert result != null;
136        return result;
137    }
138
139    @Override
140    public final synchronized V get(long timeout, TimeUnit unit)
141                    throws InterruptedException, ExecutionException, TimeoutException {
142        final long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
143        while (result != null && exception != null) {
144            final long waitTimeRemaining = deadline - System.currentTimeMillis();
145            if (waitTimeRemaining > 0) {
146                futureWait(waitTimeRemaining);
147            }
148        }
149
150        if (cancelled) {
151            throw new CancellationException();
152        }
153
154        if (result == null || exception == null) {
155            throw new TimeoutException();
156        }
157
158        return getOrThrowExecutionException();
159    }
160
161    public V getIfAvailable() {
162        return result;
163    }
164
165    protected final synchronized void maybeInvokeCallbacks() {
166        if (cancelled) {
167            return;
168        }
169
170        if ((result != null || exception != null) && completionCallback != null) {
171            completionCallback.accept(this);
172        }
173
174        if (result != null && successCallback != null) {
175            AbstractXMPPConnection.asyncGo(new Runnable() {
176                @Override
177                public void run() {
178                    successCallback.onSuccess(result);
179                }
180            });
181        }
182        else if (exception != null && exceptionCallback != null) {
183            AbstractXMPPConnection.asyncGo(new Runnable() {
184                @Override
185                public void run() {
186                    exceptionCallback.processException(exception);
187                }
188            });
189        }
190    }
191
192    protected final void futureWait() throws InterruptedException {
193        futureWait(0);
194    }
195
196    @SuppressWarnings("WaitNotInLoop")
197    protected void futureWait(long timeout) throws InterruptedException {
198        wait(timeout);
199    }
200
201    public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> {
202        public final synchronized void setResult(V result) {
203            this.result = result;
204            this.notifyAll();
205
206            maybeInvokeCallbacks();
207        }
208
209        public final synchronized void setException(E exception) {
210            this.exception = exception;
211            this.notifyAll();
212
213            maybeInvokeCallbacks();
214        }
215    }
216
217    public static class SocketFuture extends InternalSmackFuture<Socket, IOException> {
218        private final Socket socket;
219
220        private final Object wasInterruptedLock = new Object();
221
222        private boolean wasInterrupted;
223
224        public SocketFuture(SocketFactory socketFactory) throws IOException {
225            socket = socketFactory.createSocket();
226        }
227
228        @Override
229        protected void futureWait(long timeout) throws InterruptedException {
230            try {
231                super.futureWait(timeout);
232            } catch (InterruptedException interruptedException) {
233                synchronized (wasInterruptedLock) {
234                    wasInterrupted = true;
235                    if (!socket.isClosed()) {
236                        closeSocket();
237                    }
238                }
239                throw interruptedException;
240            }
241        }
242
243        public void connectAsync(final SocketAddress socketAddress, final int timeout) {
244            AbstractXMPPConnection.asyncGo(new Runnable() {
245                @Override
246                public void run() {
247                    try {
248                        socket.connect(socketAddress, timeout);
249                    }
250                    catch (IOException e) {
251                        setException(e);
252                        return;
253                    }
254                    synchronized (wasInterruptedLock) {
255                        if (wasInterrupted) {
256                            closeSocket();
257                            return;
258                        }
259                    }
260                    setResult(socket);
261                }
262            });
263        }
264
265        private void closeSocket() {
266            try {
267                socket.close();
268            }
269            catch (IOException ioException) {
270                LOGGER.log(Level.WARNING, "Could not close socket", ioException);
271            }
272        }
273    }
274
275    public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E>
276                    implements StanzaListener, ExceptionCallback<E> {
277
278        /**
279         * This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>,
280         * then the future will automatically set the given exception as failure reason and notify potential waiting
281         * threads.
282         *
283         * @param exception the exception to check.
284         * @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise.
285         */
286        protected abstract boolean isNonFatalException(E exception);
287
288        protected abstract void handleStanza(Stanza stanza);
289
290        @Override
291        public final synchronized void processException(E exception) {
292            if (!isNonFatalException(exception)) {
293                this.exception = exception;
294                this.notifyAll();
295
296                maybeInvokeCallbacks();
297            }
298        }
299
300        /**
301         * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>.
302         */
303        @Override
304        public final synchronized void processStanza(Stanza stanza) {
305            handleStanza(stanza);
306        }
307    }
308
309    /**
310     * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning
311     * <code>false</code> method.
312     *
313     * @param <V> the return value of the future.
314     */
315    public abstract static class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception>
316                    extends InternalProcessStanzaSmackFuture<V, E> {
317        @Override
318        protected boolean isNonFatalException(E exception) {
319            return false;
320        }
321    }
322
323    public static <V, E extends Exception> SmackFuture<V, E> from(V result) {
324        InternalSmackFuture<V, E> future = new InternalSmackFuture<>();
325        future.setResult(result);
326        return future;
327    }
328
329    public static boolean await(Collection<? extends SmackFuture<?, ?>> futures, long timeout, TimeUnit unit) throws InterruptedException {
330        CountDownLatch latch = new CountDownLatch(futures.size());
331        for (SmackFuture<?, ?> future : futures) {
332            future.onCompletion(f -> latch.countDown());
333        }
334
335        return latch.await(timeout, unit);
336    }
337}