001/** 002 * 003 * Copyright 2017-2020 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.net.Socket; 021import java.net.SocketAddress; 022import java.util.Collection; 023import java.util.concurrent.CancellationException; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.Future; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.TimeoutException; 029import java.util.logging.Level; 030import java.util.logging.Logger; 031 032import javax.net.SocketFactory; 033 034import org.jivesoftware.smack.packet.Stanza; 035import org.jivesoftware.smack.util.CallbackRecipient; 036import org.jivesoftware.smack.util.Consumer; 037import org.jivesoftware.smack.util.ExceptionCallback; 038import org.jivesoftware.smack.util.SuccessCallback; 039 040public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> { 041 042 private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName()); 043 044 private boolean cancelled; 045 046 protected V result; 047 048 protected E exception; 049 050 private SuccessCallback<V> successCallback; 051 052 private ExceptionCallback<E> exceptionCallback; 053 054 private Consumer<SmackFuture<V, E>> completionCallback; 055 056 @Override 057 public final synchronized boolean cancel(boolean mayInterruptIfRunning) { 058 if (isDone()) { 059 return false; 060 } 061 062 cancelled = true; 063 064 if (mayInterruptIfRunning) { 065 notifyAll(); 066 } 067 068 return true; 069 } 070 071 @Override 072 public final synchronized boolean isCancelled() { 073 return cancelled; 074 } 075 076 @Override 077 public final synchronized boolean isDone() { 078 return result != null; 079 } 080 081 @Override 082 public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) { 083 this.successCallback = successCallback; 084 maybeInvokeCallbacks(); 085 return this; 086 } 087 088 @Override 089 public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) { 090 this.exceptionCallback = exceptionCallback; 091 maybeInvokeCallbacks(); 092 return this; 093 } 094 095 public void onCompletion(Consumer<SmackFuture<V, E>> completionCallback) { 096 this.completionCallback = completionCallback; 097 maybeInvokeCallbacks(); 098 } 099 100 private V getOrThrowExecutionException() throws ExecutionException { 101 assert result != null || exception != null || cancelled; 102 if (result != null) { 103 return result; 104 } 105 if (exception != null) { 106 throw new ExecutionException(exception); 107 } 108 109 assert cancelled; 110 throw new CancellationException(); 111 } 112 113 @Override 114 public final synchronized V get() throws InterruptedException, ExecutionException { 115 while (result == null && exception == null && !cancelled) { 116 futureWait(); 117 } 118 119 return getOrThrowExecutionException(); 120 } 121 122 public final synchronized V getOrThrow() throws E, InterruptedException { 123 while (result == null && exception == null && !cancelled) { 124 futureWait(); 125 } 126 127 if (exception != null) { 128 throw exception; 129 } 130 131 if (cancelled) { 132 throw new CancellationException(); 133 } 134 135 assert result != null; 136 return result; 137 } 138 139 @Override 140 public final synchronized V get(long timeout, TimeUnit unit) 141 throws InterruptedException, ExecutionException, TimeoutException { 142 final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 143 while (result != null && exception != null) { 144 final long waitTimeRemaining = deadline - System.currentTimeMillis(); 145 if (waitTimeRemaining > 0) { 146 futureWait(waitTimeRemaining); 147 } 148 } 149 150 if (cancelled) { 151 throw new CancellationException(); 152 } 153 154 if (result == null || exception == null) { 155 throw new TimeoutException(); 156 } 157 158 return getOrThrowExecutionException(); 159 } 160 161 public V getIfAvailable() { 162 return result; 163 } 164 165 protected final synchronized void maybeInvokeCallbacks() { 166 if (cancelled) { 167 return; 168 } 169 170 if ((result != null || exception != null) && completionCallback != null) { 171 completionCallback.accept(this); 172 } 173 174 if (result != null && successCallback != null) { 175 AbstractXMPPConnection.asyncGo(new Runnable() { 176 @Override 177 public void run() { 178 successCallback.onSuccess(result); 179 } 180 }); 181 } 182 else if (exception != null && exceptionCallback != null) { 183 AbstractXMPPConnection.asyncGo(new Runnable() { 184 @Override 185 public void run() { 186 exceptionCallback.processException(exception); 187 } 188 }); 189 } 190 } 191 192 protected final void futureWait() throws InterruptedException { 193 futureWait(0); 194 } 195 196 @SuppressWarnings("WaitNotInLoop") 197 protected void futureWait(long timeout) throws InterruptedException { 198 wait(timeout); 199 } 200 201 public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> { 202 public final synchronized void setResult(V result) { 203 this.result = result; 204 this.notifyAll(); 205 206 maybeInvokeCallbacks(); 207 } 208 209 public final synchronized void setException(E exception) { 210 this.exception = exception; 211 this.notifyAll(); 212 213 maybeInvokeCallbacks(); 214 } 215 } 216 217 public static class SocketFuture extends InternalSmackFuture<Socket, IOException> { 218 private final Socket socket; 219 220 private final Object wasInterruptedLock = new Object(); 221 222 private boolean wasInterrupted; 223 224 public SocketFuture(SocketFactory socketFactory) throws IOException { 225 socket = socketFactory.createSocket(); 226 } 227 228 @Override 229 protected void futureWait(long timeout) throws InterruptedException { 230 try { 231 super.futureWait(timeout); 232 } catch (InterruptedException interruptedException) { 233 synchronized (wasInterruptedLock) { 234 wasInterrupted = true; 235 if (!socket.isClosed()) { 236 closeSocket(); 237 } 238 } 239 throw interruptedException; 240 } 241 } 242 243 public void connectAsync(final SocketAddress socketAddress, final int timeout) { 244 AbstractXMPPConnection.asyncGo(new Runnable() { 245 @Override 246 public void run() { 247 try { 248 socket.connect(socketAddress, timeout); 249 } 250 catch (IOException e) { 251 setException(e); 252 return; 253 } 254 synchronized (wasInterruptedLock) { 255 if (wasInterrupted) { 256 closeSocket(); 257 return; 258 } 259 } 260 setResult(socket); 261 } 262 }); 263 } 264 265 private void closeSocket() { 266 try { 267 socket.close(); 268 } 269 catch (IOException ioException) { 270 LOGGER.log(Level.WARNING, "Could not close socket", ioException); 271 } 272 } 273 } 274 275 public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E> 276 implements StanzaListener, ExceptionCallback<E> { 277 278 /** 279 * This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>, 280 * then the future will automatically set the given exception as failure reason and notify potential waiting 281 * threads. 282 * 283 * @param exception the exception to check. 284 * @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise. 285 */ 286 protected abstract boolean isNonFatalException(E exception); 287 288 protected abstract void handleStanza(Stanza stanza); 289 290 @Override 291 public final synchronized void processException(E exception) { 292 if (!isNonFatalException(exception)) { 293 this.exception = exception; 294 this.notifyAll(); 295 296 maybeInvokeCallbacks(); 297 } 298 } 299 300 /** 301 * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>. 302 */ 303 @Override 304 public final synchronized void processStanza(Stanza stanza) { 305 handleStanza(stanza); 306 } 307 } 308 309 /** 310 * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning 311 * <code>false</code> method. 312 * 313 * @param <V> the return value of the future. 314 */ 315 public abstract static class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception> 316 extends InternalProcessStanzaSmackFuture<V, E> { 317 @Override 318 protected boolean isNonFatalException(E exception) { 319 return false; 320 } 321 } 322 323 public static <V, E extends Exception> SmackFuture<V, E> from(V result) { 324 InternalSmackFuture<V, E> future = new InternalSmackFuture<>(); 325 future.setResult(result); 326 return future; 327 } 328 329 public static boolean await(Collection<? extends SmackFuture<?, ?>> futures, long timeout, TimeUnit unit) throws InterruptedException { 330 CountDownLatch latch = new CountDownLatch(futures.size()); 331 for (SmackFuture<?, ?> future : futures) { 332 future.onCompletion(f -> latch.countDown()); 333 } 334 335 return latch.await(timeout, unit); 336 } 337}