001/** 002 * 003 * Copyright 2017-2021 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.net.Socket; 021import java.net.SocketAddress; 022import java.util.Collection; 023import java.util.concurrent.CancellationException; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.Future; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.TimeoutException; 029import java.util.logging.Level; 030import java.util.logging.Logger; 031 032import javax.net.SocketFactory; 033 034import org.jivesoftware.smack.packet.Stanza; 035import org.jivesoftware.smack.util.CallbackRecipient; 036import org.jivesoftware.smack.util.Consumer; 037import org.jivesoftware.smack.util.ExceptionCallback; 038import org.jivesoftware.smack.util.SuccessCallback; 039 040public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> { 041 042 private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName()); 043 044 private boolean cancelled; 045 046 protected V result; 047 048 protected E exception; 049 050 private SuccessCallback<V> successCallback; 051 052 private ExceptionCallback<E> exceptionCallback; 053 054 private Consumer<SmackFuture<V, E>> completionCallback; 055 056 @Override 057 public final synchronized boolean cancel(boolean mayInterruptIfRunning) { 058 if (isDone()) { 059 return false; 060 } 061 062 cancelled = true; 063 064 if (mayInterruptIfRunning) { 065 notifyAll(); 066 } 067 068 return true; 069 } 070 071 @Override 072 public final synchronized boolean isCancelled() { 073 return cancelled; 074 } 075 076 @Override 077 public final synchronized boolean isDone() { 078 return result != null || exception != null || cancelled; 079 } 080 081 public final synchronized boolean wasSuccessful() { 082 return result != null; 083 } 084 085 @Override 086 public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) { 087 this.successCallback = successCallback; 088 maybeInvokeCallbacks(); 089 return this; 090 } 091 092 @Override 093 public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) { 094 this.exceptionCallback = exceptionCallback; 095 maybeInvokeCallbacks(); 096 return this; 097 } 098 099 public void onCompletion(Consumer<SmackFuture<V, E>> completionCallback) { 100 this.completionCallback = completionCallback; 101 maybeInvokeCallbacks(); 102 } 103 104 private V getOrThrowExecutionException() throws ExecutionException { 105 assert result != null || exception != null || cancelled; 106 if (result != null) { 107 return result; 108 } 109 if (exception != null) { 110 throw new ExecutionException(exception); 111 } 112 113 assert cancelled; 114 throw new CancellationException(); 115 } 116 117 @Override 118 public final synchronized V get() throws InterruptedException, ExecutionException { 119 while (result == null && exception == null && !cancelled) { 120 futureWait(); 121 } 122 123 return getOrThrowExecutionException(); 124 } 125 126 public final synchronized V getOrThrow() throws E, InterruptedException { 127 while (result == null && exception == null && !cancelled) { 128 futureWait(); 129 } 130 131 if (exception != null) { 132 throw exception; 133 } 134 135 if (cancelled) { 136 throw new CancellationException(); 137 } 138 139 assert result != null; 140 return result; 141 } 142 143 @Override 144 public final synchronized V get(long timeout, TimeUnit unit) 145 throws InterruptedException, ExecutionException, TimeoutException { 146 final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 147 while (result != null && exception != null) { 148 final long waitTimeRemaining = deadline - System.currentTimeMillis(); 149 if (waitTimeRemaining > 0) { 150 futureWait(waitTimeRemaining); 151 } 152 } 153 154 if (cancelled) { 155 throw new CancellationException(); 156 } 157 158 if (result == null || exception == null) { 159 throw new TimeoutException(); 160 } 161 162 return getOrThrowExecutionException(); 163 } 164 165 public V getIfAvailable() { 166 return result; 167 } 168 169 public E getExceptionIfAvailable() { 170 return exception; 171 } 172 173 protected final synchronized void maybeInvokeCallbacks() { 174 if (cancelled) { 175 return; 176 } 177 178 if ((result != null || exception != null) && completionCallback != null) { 179 completionCallback.accept(this); 180 } 181 182 if (result != null && successCallback != null) { 183 AbstractXMPPConnection.asyncGo(new Runnable() { 184 @Override 185 public void run() { 186 successCallback.onSuccess(result); 187 } 188 }); 189 } 190 else if (exception != null && exceptionCallback != null) { 191 AbstractXMPPConnection.asyncGo(new Runnable() { 192 @Override 193 public void run() { 194 exceptionCallback.processException(exception); 195 } 196 }); 197 } 198 } 199 200 protected final void futureWait() throws InterruptedException { 201 futureWait(0); 202 } 203 204 @SuppressWarnings("WaitNotInLoop") 205 protected void futureWait(long timeout) throws InterruptedException { 206 wait(timeout); 207 } 208 209 public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> { 210 public final synchronized void setResult(V result) { 211 this.result = result; 212 this.notifyAll(); 213 214 maybeInvokeCallbacks(); 215 } 216 217 public final synchronized void setException(E exception) { 218 this.exception = exception; 219 this.notifyAll(); 220 221 maybeInvokeCallbacks(); 222 } 223 } 224 225 public static class SocketFuture extends InternalSmackFuture<Socket, IOException> { 226 private final Socket socket; 227 228 private final Object wasInterruptedLock = new Object(); 229 230 private boolean wasInterrupted; 231 232 public SocketFuture(SocketFactory socketFactory) throws IOException { 233 socket = socketFactory.createSocket(); 234 } 235 236 @Override 237 protected void futureWait(long timeout) throws InterruptedException { 238 try { 239 super.futureWait(timeout); 240 } catch (InterruptedException interruptedException) { 241 synchronized (wasInterruptedLock) { 242 wasInterrupted = true; 243 if (!socket.isClosed()) { 244 closeSocket(); 245 } 246 } 247 throw interruptedException; 248 } 249 } 250 251 public void connectAsync(final SocketAddress socketAddress, final int timeout) { 252 AbstractXMPPConnection.asyncGo(new Runnable() { 253 @Override 254 public void run() { 255 try { 256 socket.connect(socketAddress, timeout); 257 } 258 catch (IOException e) { 259 setException(e); 260 return; 261 } 262 synchronized (wasInterruptedLock) { 263 if (wasInterrupted) { 264 closeSocket(); 265 return; 266 } 267 } 268 setResult(socket); 269 } 270 }); 271 } 272 273 private void closeSocket() { 274 try { 275 socket.close(); 276 } 277 catch (IOException ioException) { 278 LOGGER.log(Level.WARNING, "Could not close socket", ioException); 279 } 280 } 281 } 282 283 public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E> 284 implements StanzaListener, ExceptionCallback<E> { 285 286 /** 287 * This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>, 288 * then the future will automatically set the given exception as failure reason and notify potential waiting 289 * threads. 290 * 291 * @param exception the exception to check. 292 * @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise. 293 */ 294 protected abstract boolean isNonFatalException(E exception); 295 296 protected abstract void handleStanza(Stanza stanza); 297 298 @Override 299 public final synchronized void processException(E exception) { 300 if (!isNonFatalException(exception)) { 301 this.exception = exception; 302 this.notifyAll(); 303 304 maybeInvokeCallbacks(); 305 } 306 } 307 308 /** 309 * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>. 310 */ 311 @Override 312 public final synchronized void processStanza(Stanza stanza) { 313 handleStanza(stanza); 314 } 315 } 316 317 /** 318 * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning 319 * <code>false</code> method. 320 * 321 * @param <V> the return value of the future. 322 */ 323 public abstract static class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception> 324 extends InternalProcessStanzaSmackFuture<V, E> { 325 @Override 326 protected boolean isNonFatalException(E exception) { 327 return false; 328 } 329 } 330 331 public static <V, E extends Exception> SmackFuture<V, E> from(V result) { 332 InternalSmackFuture<V, E> future = new InternalSmackFuture<>(); 333 future.setResult(result); 334 return future; 335 } 336 337 public static boolean await(Collection<? extends SmackFuture<?, ?>> futures, long timeout) 338 throws InterruptedException { 339 return await(futures, timeout, TimeUnit.MILLISECONDS); 340 } 341 342 public static boolean await(Collection<? extends SmackFuture<?, ?>> futures, long timeout, TimeUnit unit) throws InterruptedException { 343 CountDownLatch latch = new CountDownLatch(futures.size()); 344 for (SmackFuture<?, ?> future : futures) { 345 future.onCompletion(f -> latch.countDown()); 346 } 347 348 return latch.await(timeout, unit); 349 } 350}