001/** 002 * 003 * Copyright 2017-2021 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.net.Socket; 021import java.net.SocketAddress; 022import java.util.Collection; 023import java.util.concurrent.CancellationException; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.Future; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.TimeoutException; 029import java.util.logging.Level; 030import java.util.logging.Logger; 031 032import javax.net.SocketFactory; 033 034import org.jivesoftware.smack.packet.Stanza; 035import org.jivesoftware.smack.util.CallbackRecipient; 036import org.jivesoftware.smack.util.Consumer; 037import org.jivesoftware.smack.util.ExceptionCallback; 038import org.jivesoftware.smack.util.SuccessCallback; 039 040public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> { 041 042 private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName()); 043 044 private boolean cancelled; 045 046 protected V result; 047 048 protected E exception; 049 050 private SuccessCallback<V> successCallback; 051 052 private ExceptionCallback<E> exceptionCallback; 053 054 private Consumer<SmackFuture<V, E>> completionCallback; 055 056 @Override 057 public final synchronized boolean cancel(boolean mayInterruptIfRunning) { 058 if (isDone()) { 059 return false; 060 } 061 062 cancelled = true; 063 064 if (mayInterruptIfRunning) { 065 notifyAll(); 066 } 067 068 return true; 069 } 070 071 @Override 072 public final synchronized boolean isCancelled() { 073 return cancelled; 074 } 075 076 @Override 077 public final synchronized boolean isDone() { 078 return result != null || exception != null || cancelled; 079 } 080 081 public final synchronized boolean wasSuccessful() { 082 return result != null; 083 } 084 085 @Override 086 public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) { 087 this.successCallback = successCallback; 088 maybeInvokeCallbacks(); 089 return this; 090 } 091 092 @Override 093 public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) { 094 this.exceptionCallback = exceptionCallback; 095 maybeInvokeCallbacks(); 096 return this; 097 } 098 099 public void onCompletion(Consumer<SmackFuture<V, E>> completionCallback) { 100 this.completionCallback = completionCallback; 101 maybeInvokeCallbacks(); 102 } 103 104 private V getOrThrowExecutionException() throws ExecutionException { 105 assert result != null || exception != null || cancelled; 106 if (result != null) { 107 return result; 108 } 109 if (exception != null) { 110 throw new ExecutionException(exception); 111 } 112 113 assert cancelled; 114 throw new CancellationException(); 115 } 116 117 @Override 118 public final synchronized V get() throws InterruptedException, ExecutionException { 119 while (result == null && exception == null && !cancelled) { 120 futureWait(); 121 } 122 123 return getOrThrowExecutionException(); 124 } 125 126 public final synchronized V getOrThrow() throws E, InterruptedException { 127 while (result == null && exception == null && !cancelled) { 128 futureWait(); 129 } 130 131 if (exception != null) { 132 throw exception; 133 } 134 135 if (cancelled) { 136 throw new CancellationException(); 137 } 138 139 assert result != null; 140 return result; 141 } 142 143 @Override 144 public final synchronized V get(long timeout, TimeUnit unit) 145 throws InterruptedException, ExecutionException, TimeoutException { 146 final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 147 while (result != null && exception != null) { 148 final long waitTimeRemaining = deadline - System.currentTimeMillis(); 149 if (waitTimeRemaining > 0) { 150 futureWait(waitTimeRemaining); 151 } 152 } 153 154 if (cancelled) { 155 throw new CancellationException(); 156 } 157 158 if (result == null || exception == null) { 159 throw new TimeoutException(); 160 } 161 162 return getOrThrowExecutionException(); 163 } 164 165 public V getIfAvailable() { 166 return result; 167 } 168 169 public E getExceptionIfAvailable() { 170 return exception; 171 } 172 173 private boolean callbacksInvoked; 174 175 protected final synchronized void maybeInvokeCallbacks() { 176 if (cancelled || callbacksInvoked) { 177 return; 178 } 179 180 if ((result != null || exception != null) && completionCallback != null) { 181 callbacksInvoked = true; 182 completionCallback.accept(this); 183 } 184 185 if (result != null && successCallback != null) { 186 callbacksInvoked = true; 187 AbstractXMPPConnection.asyncGo(new Runnable() { 188 @Override 189 public void run() { 190 successCallback.onSuccess(result); 191 } 192 }); 193 } 194 else if (exception != null && exceptionCallback != null) { 195 callbacksInvoked = true; 196 AbstractXMPPConnection.asyncGo(new Runnable() { 197 @Override 198 public void run() { 199 exceptionCallback.processException(exception); 200 } 201 }); 202 } 203 } 204 205 protected final void futureWait() throws InterruptedException { 206 futureWait(0); 207 } 208 209 @SuppressWarnings("WaitNotInLoop") 210 protected void futureWait(long timeout) throws InterruptedException { 211 wait(timeout); 212 } 213 214 public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> { 215 public final synchronized void setResult(V result) { 216 this.result = result; 217 this.notifyAll(); 218 219 maybeInvokeCallbacks(); 220 } 221 222 public final synchronized void setException(E exception) { 223 this.exception = exception; 224 this.notifyAll(); 225 226 maybeInvokeCallbacks(); 227 } 228 } 229 230 public static class SocketFuture extends InternalSmackFuture<Socket, IOException> { 231 private final Socket socket; 232 233 private final Object wasInterruptedLock = new Object(); 234 235 private boolean wasInterrupted; 236 237 public SocketFuture(SocketFactory socketFactory) throws IOException { 238 socket = socketFactory.createSocket(); 239 } 240 241 @Override 242 protected void futureWait(long timeout) throws InterruptedException { 243 try { 244 super.futureWait(timeout); 245 } catch (InterruptedException interruptedException) { 246 synchronized (wasInterruptedLock) { 247 wasInterrupted = true; 248 if (!socket.isClosed()) { 249 closeSocket(); 250 } 251 } 252 throw interruptedException; 253 } 254 } 255 256 public void connectAsync(final SocketAddress socketAddress, final int timeout) { 257 AbstractXMPPConnection.asyncGo(new Runnable() { 258 @Override 259 public void run() { 260 try { 261 socket.connect(socketAddress, timeout); 262 } 263 catch (IOException e) { 264 setException(e); 265 return; 266 } 267 synchronized (wasInterruptedLock) { 268 if (wasInterrupted) { 269 closeSocket(); 270 return; 271 } 272 } 273 setResult(socket); 274 } 275 }); 276 } 277 278 private void closeSocket() { 279 try { 280 socket.close(); 281 } 282 catch (IOException ioException) { 283 LOGGER.log(Level.WARNING, "Could not close socket", ioException); 284 } 285 } 286 } 287 288 public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E> 289 implements StanzaListener, ExceptionCallback<E> { 290 291 /** 292 * This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>, 293 * then the future will automatically set the given exception as failure reason and notify potential waiting 294 * threads. 295 * 296 * @param exception the exception to check. 297 * @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise. 298 */ 299 protected abstract boolean isNonFatalException(E exception); 300 301 protected abstract void handleStanza(Stanza stanza); 302 303 @Override 304 public final synchronized void processException(E exception) { 305 if (!isNonFatalException(exception)) { 306 this.exception = exception; 307 this.notifyAll(); 308 309 maybeInvokeCallbacks(); 310 } 311 } 312 313 /** 314 * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>. 315 */ 316 @Override 317 public final synchronized void processStanza(Stanza stanza) { 318 handleStanza(stanza); 319 } 320 } 321 322 /** 323 * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning 324 * <code>false</code> method. 325 * 326 * @param <V> the return value of the future. 327 */ 328 public abstract static class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception> 329 extends InternalProcessStanzaSmackFuture<V, E> { 330 @Override 331 protected boolean isNonFatalException(E exception) { 332 return false; 333 } 334 } 335 336 public static <V, E extends Exception> SmackFuture<V, E> from(V result) { 337 InternalSmackFuture<V, E> future = new InternalSmackFuture<>(); 338 future.setResult(result); 339 return future; 340 } 341 342 public static boolean await(Collection<? extends SmackFuture<?, ?>> futures, long timeout) 343 throws InterruptedException { 344 return await(futures, timeout, TimeUnit.MILLISECONDS); 345 } 346 347 public static boolean await(Collection<? extends SmackFuture<?, ?>> futures, long timeout, TimeUnit unit) throws InterruptedException { 348 CountDownLatch latch = new CountDownLatch(futures.size()); 349 for (SmackFuture<?, ?> future : futures) { 350 future.onCompletion(f -> latch.countDown()); 351 } 352 353 return latch.await(timeout, unit); 354 } 355}