001/** 002 * 003 * Copyright 2017-2018 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.net.Socket; 021import java.net.SocketAddress; 022import java.util.concurrent.CancellationException; 023import java.util.concurrent.ExecutionException; 024import java.util.concurrent.Future; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.TimeoutException; 027import java.util.logging.Level; 028import java.util.logging.Logger; 029 030import javax.net.SocketFactory; 031 032import org.jivesoftware.smack.packet.Stanza; 033import org.jivesoftware.smack.util.CallbackRecipient; 034import org.jivesoftware.smack.util.ExceptionCallback; 035import org.jivesoftware.smack.util.SuccessCallback; 036 037public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> { 038 039 private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName()); 040 041 private boolean cancelled; 042 043 protected V result; 044 045 protected E exception; 046 047 private SuccessCallback<V> successCallback; 048 049 private ExceptionCallback<E> exceptionCallback; 050 051 @Override 052 public final synchronized boolean cancel(boolean mayInterruptIfRunning) { 053 if (isDone()) { 054 return false; 055 } 056 057 cancelled = true; 058 059 if (mayInterruptIfRunning) { 060 notifyAll(); 061 } 062 063 return true; 064 } 065 066 @Override 067 public final synchronized boolean isCancelled() { 068 return cancelled; 069 } 070 071 @Override 072 public final synchronized boolean isDone() { 073 return result != null; 074 } 075 076 @Override 077 public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) { 078 this.successCallback = successCallback; 079 maybeInvokeCallbacks(); 080 return this; 081 } 082 083 @Override 084 public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) { 085 this.exceptionCallback = exceptionCallback; 086 maybeInvokeCallbacks(); 087 return this; 088 } 089 090 private V getOrThrowExecutionException() throws ExecutionException { 091 assert (result != null || exception != null || cancelled); 092 if (result != null) { 093 return result; 094 } 095 if (exception != null) { 096 throw new ExecutionException(exception); 097 } 098 099 assert (cancelled); 100 throw new CancellationException(); 101 } 102 103 @Override 104 public final synchronized V get() throws InterruptedException, ExecutionException { 105 while (result == null && exception == null && !cancelled) { 106 futureWait(); 107 } 108 109 return getOrThrowExecutionException(); 110 } 111 112 public final synchronized V getOrThrow() throws E, InterruptedException { 113 while (result == null && exception == null && !cancelled) { 114 futureWait(); 115 } 116 117 if (exception != null) { 118 throw exception; 119 } 120 121 if (cancelled) { 122 throw new CancellationException(); 123 } 124 125 assert result != null; 126 return result; 127 } 128 129 @Override 130 public final synchronized V get(long timeout, TimeUnit unit) 131 throws InterruptedException, ExecutionException, TimeoutException { 132 final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 133 while (result != null && exception != null) { 134 final long waitTimeRemaining = deadline - System.currentTimeMillis(); 135 if (waitTimeRemaining > 0) { 136 futureWait(waitTimeRemaining); 137 } 138 } 139 140 if (cancelled) { 141 throw new CancellationException(); 142 } 143 144 if (result == null || exception == null) { 145 throw new TimeoutException(); 146 } 147 148 return getOrThrowExecutionException(); 149 } 150 151 protected final synchronized void maybeInvokeCallbacks() { 152 if (cancelled) { 153 return; 154 } 155 156 if (result != null && successCallback != null) { 157 AbstractXMPPConnection.asyncGo(new Runnable() { 158 @Override 159 public void run() { 160 successCallback.onSuccess(result); 161 } 162 }); 163 } 164 else if (exception != null && exceptionCallback != null) { 165 AbstractXMPPConnection.asyncGo(new Runnable() { 166 @Override 167 public void run() { 168 exceptionCallback.processException(exception); 169 } 170 }); 171 } 172 } 173 174 protected final void futureWait() throws InterruptedException { 175 futureWait(0); 176 } 177 178 @SuppressWarnings("WaitNotInLoop") 179 protected void futureWait(long timeout) throws InterruptedException { 180 wait(timeout); 181 } 182 183 public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> { 184 public final synchronized void setResult(V result) { 185 this.result = result; 186 this.notifyAll(); 187 188 maybeInvokeCallbacks(); 189 } 190 191 public final synchronized void setException(E exception) { 192 this.exception = exception; 193 this.notifyAll(); 194 195 maybeInvokeCallbacks(); 196 } 197 } 198 199 public static class SocketFuture extends InternalSmackFuture<Socket, IOException> { 200 private final Socket socket; 201 202 private final Object wasInterruptedLock = new Object(); 203 204 private boolean wasInterrupted; 205 206 public SocketFuture(SocketFactory socketFactory) throws IOException { 207 socket = socketFactory.createSocket(); 208 } 209 210 @Override 211 protected void futureWait(long timeout) throws InterruptedException { 212 try { 213 super.futureWait(timeout); 214 } catch (InterruptedException interruptedException) { 215 synchronized (wasInterruptedLock) { 216 wasInterrupted = true; 217 if (!socket.isClosed()) { 218 closeSocket(); 219 } 220 } 221 throw interruptedException; 222 } 223 } 224 225 public void connectAsync(final SocketAddress socketAddress, final int timeout) { 226 AbstractXMPPConnection.asyncGo(new Runnable() { 227 @Override 228 public void run() { 229 try { 230 socket.connect(socketAddress, timeout); 231 } 232 catch (IOException e) { 233 setException(e); 234 return; 235 } 236 synchronized (wasInterruptedLock) { 237 if (wasInterrupted) { 238 closeSocket(); 239 return; 240 } 241 } 242 setResult(socket); 243 } 244 }); 245 } 246 247 private void closeSocket() { 248 try { 249 socket.close(); 250 } 251 catch (IOException ioException) { 252 LOGGER.log(Level.WARNING, "Could not close socket", ioException); 253 } 254 } 255 } 256 257 public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E> 258 implements StanzaListener, ExceptionCallback<E> { 259 260 /** 261 * This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>, 262 * then the future will automatically set the given exception as failure reason and notify potential waiting 263 * threads. 264 * 265 * @param exception the exception to check. 266 * @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise. 267 */ 268 protected abstract boolean isNonFatalException(E exception); 269 270 protected abstract void handleStanza(Stanza stanza); 271 272 @Override 273 public final synchronized void processException(E exception) { 274 if (!isNonFatalException(exception)) { 275 this.exception = exception; 276 this.notifyAll(); 277 278 maybeInvokeCallbacks(); 279 } 280 } 281 282 /** 283 * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>. 284 */ 285 @Override 286 public final synchronized void processStanza(Stanza stanza) { 287 handleStanza(stanza); 288 } 289 } 290 291 /** 292 * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning 293 * <code>false</code> method. 294 * 295 * @param <V> 296 */ 297 public abstract static class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception> 298 extends InternalProcessStanzaSmackFuture<V, E> { 299 @Override 300 protected boolean isNonFatalException(E exception) { 301 return false; 302 } 303 } 304 305 public static <V, E extends Exception> SmackFuture<V, E> from(V result) { 306 InternalSmackFuture<V, E> future = new InternalSmackFuture<>(); 307 future.setResult(result); 308 return future; 309 } 310 311}