001/** 002 * 003 * Copyright 2017-2020 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.net.Socket; 021import java.net.SocketAddress; 022import java.util.Collection; 023import java.util.concurrent.CancellationException; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.Future; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.TimeoutException; 029import java.util.logging.Level; 030import java.util.logging.Logger; 031 032import javax.net.SocketFactory; 033 034import org.jivesoftware.smack.packet.Stanza; 035import org.jivesoftware.smack.util.CallbackRecipient; 036import org.jivesoftware.smack.util.Consumer; 037import org.jivesoftware.smack.util.ExceptionCallback; 038import org.jivesoftware.smack.util.SuccessCallback; 039 040public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> { 041 042 private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName()); 043 044 private boolean cancelled; 045 046 protected V result; 047 048 protected E exception; 049 050 private SuccessCallback<V> successCallback; 051 052 private ExceptionCallback<E> exceptionCallback; 053 054 private Consumer<SmackFuture<V, E>> completionCallback; 055 056 @Override 057 public final synchronized boolean cancel(boolean mayInterruptIfRunning) { 058 if (isDone()) { 059 return false; 060 } 061 062 cancelled = true; 063 064 if (mayInterruptIfRunning) { 065 notifyAll(); 066 } 067 068 return true; 069 } 070 071 @Override 072 public final synchronized boolean isCancelled() { 073 return cancelled; 074 } 075 076 @Override 077 public final synchronized boolean isDone() { 078 return result != null; 079 } 080 081 @Override 082 public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) { 083 this.successCallback = successCallback; 084 maybeInvokeCallbacks(); 085 return this; 086 } 087 088 @Override 089 public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) { 090 this.exceptionCallback = exceptionCallback; 091 maybeInvokeCallbacks(); 092 return this; 093 } 094 095 public void onCompletion(Consumer<SmackFuture<V, E>> completionCallback) { 096 this.completionCallback = completionCallback; 097 maybeInvokeCallbacks(); 098 } 099 100 private V getOrThrowExecutionException() throws ExecutionException { 101 assert result != null || exception != null || cancelled; 102 if (result != null) { 103 return result; 104 } 105 if (exception != null) { 106 throw new ExecutionException(exception); 107 } 108 109 assert cancelled; 110 throw new CancellationException(); 111 } 112 113 @Override 114 public final synchronized V get() throws InterruptedException, ExecutionException { 115 while (result == null && exception == null && !cancelled) { 116 futureWait(); 117 } 118 119 return getOrThrowExecutionException(); 120 } 121 122 public final synchronized V getOrThrow() throws E, InterruptedException { 123 while (result == null && exception == null && !cancelled) { 124 futureWait(); 125 } 126 127 if (exception != null) { 128 throw exception; 129 } 130 131 if (cancelled) { 132 throw new CancellationException(); 133 } 134 135 assert result != null; 136 return result; 137 } 138 139 @Override 140 public final synchronized V get(long timeout, TimeUnit unit) 141 throws InterruptedException, ExecutionException, TimeoutException { 142 final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 143 while (result != null && exception != null) { 144 final long waitTimeRemaining = deadline - System.currentTimeMillis(); 145 if (waitTimeRemaining > 0) { 146 futureWait(waitTimeRemaining); 147 } 148 } 149 150 if (cancelled) { 151 throw new CancellationException(); 152 } 153 154 if (result == null || exception == null) { 155 throw new TimeoutException(); 156 } 157 158 return getOrThrowExecutionException(); 159 } 160 161 public V getIfAvailable() { 162 return result; 163 } 164 165 private boolean callbacksInvoked; 166 167 protected final synchronized void maybeInvokeCallbacks() { 168 if (cancelled || callbacksInvoked) { 169 return; 170 } 171 172 if ((result != null || exception != null) && completionCallback != null) { 173 callbacksInvoked = true; 174 completionCallback.accept(this); 175 } 176 177 if (result != null && successCallback != null) { 178 callbacksInvoked = true; 179 AbstractXMPPConnection.asyncGo(new Runnable() { 180 @Override 181 public void run() { 182 successCallback.onSuccess(result); 183 } 184 }); 185 } 186 else if (exception != null && exceptionCallback != null) { 187 callbacksInvoked = true; 188 AbstractXMPPConnection.asyncGo(new Runnable() { 189 @Override 190 public void run() { 191 exceptionCallback.processException(exception); 192 } 193 }); 194 } 195 } 196 197 protected final void futureWait() throws InterruptedException { 198 futureWait(0); 199 } 200 201 @SuppressWarnings("WaitNotInLoop") 202 protected void futureWait(long timeout) throws InterruptedException { 203 wait(timeout); 204 } 205 206 public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> { 207 public final synchronized void setResult(V result) { 208 this.result = result; 209 this.notifyAll(); 210 211 maybeInvokeCallbacks(); 212 } 213 214 public final synchronized void setException(E exception) { 215 this.exception = exception; 216 this.notifyAll(); 217 218 maybeInvokeCallbacks(); 219 } 220 } 221 222 public static class SocketFuture extends InternalSmackFuture<Socket, IOException> { 223 private final Socket socket; 224 225 private final Object wasInterruptedLock = new Object(); 226 227 private boolean wasInterrupted; 228 229 public SocketFuture(SocketFactory socketFactory) throws IOException { 230 socket = socketFactory.createSocket(); 231 } 232 233 @Override 234 protected void futureWait(long timeout) throws InterruptedException { 235 try { 236 super.futureWait(timeout); 237 } catch (InterruptedException interruptedException) { 238 synchronized (wasInterruptedLock) { 239 wasInterrupted = true; 240 if (!socket.isClosed()) { 241 closeSocket(); 242 } 243 } 244 throw interruptedException; 245 } 246 } 247 248 public void connectAsync(final SocketAddress socketAddress, final int timeout) { 249 AbstractXMPPConnection.asyncGo(new Runnable() { 250 @Override 251 public void run() { 252 try { 253 socket.connect(socketAddress, timeout); 254 } 255 catch (IOException e) { 256 setException(e); 257 return; 258 } 259 synchronized (wasInterruptedLock) { 260 if (wasInterrupted) { 261 closeSocket(); 262 return; 263 } 264 } 265 setResult(socket); 266 } 267 }); 268 } 269 270 private void closeSocket() { 271 try { 272 socket.close(); 273 } 274 catch (IOException ioException) { 275 LOGGER.log(Level.WARNING, "Could not close socket", ioException); 276 } 277 } 278 } 279 280 public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E> 281 implements StanzaListener, ExceptionCallback<E> { 282 283 /** 284 * This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>, 285 * then the future will automatically set the given exception as failure reason and notify potential waiting 286 * threads. 287 * 288 * @param exception the exception to check. 289 * @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise. 290 */ 291 protected abstract boolean isNonFatalException(E exception); 292 293 protected abstract void handleStanza(Stanza stanza); 294 295 @Override 296 public final synchronized void processException(E exception) { 297 if (!isNonFatalException(exception)) { 298 this.exception = exception; 299 this.notifyAll(); 300 301 maybeInvokeCallbacks(); 302 } 303 } 304 305 /** 306 * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>. 307 */ 308 @Override 309 public final synchronized void processStanza(Stanza stanza) { 310 handleStanza(stanza); 311 } 312 } 313 314 /** 315 * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning 316 * <code>false</code> method. 317 * 318 * @param <V> the return value of the future. 319 */ 320 public abstract static class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception> 321 extends InternalProcessStanzaSmackFuture<V, E> { 322 @Override 323 protected boolean isNonFatalException(E exception) { 324 return false; 325 } 326 } 327 328 public static <V, E extends Exception> SmackFuture<V, E> from(V result) { 329 InternalSmackFuture<V, E> future = new InternalSmackFuture<>(); 330 future.setResult(result); 331 return future; 332 } 333 334 public static boolean await(Collection<? extends SmackFuture<?, ?>> futures, long timeout, TimeUnit unit) throws InterruptedException { 335 CountDownLatch latch = new CountDownLatch(futures.size()); 336 for (SmackFuture<?, ?> future : futures) { 337 future.onCompletion(f -> latch.countDown()); 338 } 339 340 return latch.await(timeout, unit); 341 } 342}