001/** 002 * 003 * Copyright © 2014-2015 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.util.concurrent.TimeUnit; 020import java.util.concurrent.locks.Condition; 021import java.util.concurrent.locks.Lock; 022 023import org.jivesoftware.smack.SmackException.NoResponseException; 024import org.jivesoftware.smack.SmackException.NotConnectedException; 025import org.jivesoftware.smack.packet.Nonza; 026import org.jivesoftware.smack.packet.Stanza; 027import org.jivesoftware.smack.packet.TopLevelStreamElement; 028 029public class SynchronizationPoint<E extends Exception> { 030 031 private final AbstractXMPPConnection connection; 032 private final Lock connectionLock; 033 private final Condition condition; 034 private final String waitFor; 035 036 // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the 037 // same memory synchronization effects as synchronization block enter and leave. 038 private State state; 039 private E failureException; 040 041 /** 042 * Construct a new synchronization point for the given connection. 043 * 044 * @param connection the connection of this synchronization point. 045 * @param waitFor a description of the event this synchronization point handles. 046 */ 047 public SynchronizationPoint(AbstractXMPPConnection connection, String waitFor) { 048 this.connection = connection; 049 this.connectionLock = connection.getConnectionLock(); 050 this.condition = connection.getConnectionLock().newCondition(); 051 this.waitFor = waitFor; 052 init(); 053 } 054 055 /** 056 * Initialize (or reset) this synchronization point. 057 */ 058 public void init() { 059 connectionLock.lock(); 060 state = State.Initial; 061 failureException = null; 062 connectionLock.unlock(); 063 } 064 065 /** 066 * Send the given top level stream element and wait for a response. 067 * 068 * @param request the plain stream element to send. 069 * @throws NoResponseException if no response was received. 070 * @throws NotConnectedException if the connection is not connected. 071 * @throws InterruptedException if the connection is interrupted. 072 * @return <code>null</code> if synchronization point was successful, or the failure Exception. 073 */ 074 public E sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, 075 NotConnectedException, InterruptedException { 076 assert (state == State.Initial); 077 connectionLock.lock(); 078 try { 079 if (request != null) { 080 if (request instanceof Stanza) { 081 connection.sendStanza((Stanza) request); 082 } 083 else if (request instanceof Nonza) { 084 connection.sendNonza((Nonza) request); 085 } else { 086 throw new IllegalStateException("Unsupported element type"); 087 } 088 state = State.RequestSent; 089 } 090 waitForConditionOrTimeout(); 091 } 092 finally { 093 connectionLock.unlock(); 094 } 095 return checkForResponse(); 096 } 097 098 /** 099 * Send the given plain stream element and wait for a response. 100 * 101 * @param request the plain stream element to send. 102 * @throws E if an failure was reported. 103 * @throws NoResponseException if no response was received. 104 * @throws NotConnectedException if the connection is not connected. 105 * @throws InterruptedException if the connection is interrupted. 106 */ 107 public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException, 108 NotConnectedException, InterruptedException { 109 sendAndWaitForResponse(request); 110 switch (state) { 111 case Failure: 112 if (failureException != null) { 113 throw failureException; 114 } 115 break; 116 default: 117 // Success, do nothing 118 } 119 } 120 121 /** 122 * Check if this synchronization point is successful or wait the connections reply timeout. 123 * @throws NoResponseException if there was no response marking the synchronization point as success or failed. 124 * @throws E if there was a failure 125 * @throws InterruptedException if the connection is interrupted. 126 */ 127 public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException { 128 checkIfSuccessOrWait(); 129 if (state == State.Failure) { 130 throw failureException; 131 } 132 } 133 134 /** 135 * Check if this synchronization point is successful or wait the connections reply timeout. 136 * @throws NoResponseException if there was no response marking the synchronization point as success or failed. 137 * @throws InterruptedException 138 * @return <code>null</code> if synchronization point was successful, or the failure Exception. 139 */ 140 public E checkIfSuccessOrWait() throws NoResponseException, InterruptedException { 141 connectionLock.lock(); 142 try { 143 switch (state) { 144 // Return immediately on success or failure 145 case Success: 146 return null; 147 case Failure: 148 return failureException; 149 default: 150 // Do nothing 151 break; 152 } 153 waitForConditionOrTimeout(); 154 } finally { 155 connectionLock.unlock(); 156 } 157 return checkForResponse(); 158 } 159 160 /** 161 * Report this synchronization point as successful. 162 */ 163 public void reportSuccess() { 164 connectionLock.lock(); 165 try { 166 state = State.Success; 167 condition.signalAll(); 168 } 169 finally { 170 connectionLock.unlock(); 171 } 172 } 173 174 /** 175 * Deprecated. 176 * @deprecated use {@link #reportFailure(Exception)} instead. 177 */ 178 @Deprecated 179 public void reportFailure() { 180 reportFailure(null); 181 } 182 183 /** 184 * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set. 185 * 186 * @param failureException the exception causing this synchronization point to fail. 187 */ 188 public void reportFailure(E failureException) { 189 assert failureException != null; 190 connectionLock.lock(); 191 try { 192 state = State.Failure; 193 this.failureException = failureException; 194 condition.signalAll(); 195 } 196 finally { 197 connectionLock.unlock(); 198 } 199 } 200 201 /** 202 * Check if this synchronization point was successful. 203 * 204 * @return true if the synchronization point was successful, false otherwise. 205 */ 206 public boolean wasSuccessful() { 207 connectionLock.lock(); 208 try { 209 return state == State.Success; 210 } 211 finally { 212 connectionLock.unlock(); 213 } 214 } 215 216 /** 217 * Check if this synchronization point has its request already sent. 218 * 219 * @return true if the request was already sent, false otherwise. 220 */ 221 public boolean requestSent() { 222 connectionLock.lock(); 223 try { 224 return state == State.RequestSent; 225 } 226 finally { 227 connectionLock.unlock(); 228 } 229 } 230 231 public E getFailureException() { 232 connectionLock.lock(); 233 try { 234 return failureException; 235 } 236 finally { 237 connectionLock.unlock(); 238 } 239 } 240 241 /** 242 * Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}. 243 * {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this 244 * synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the 245 * connections reply timeout, this method will set the state of {@link State#NoResponse}. 246 * @throws InterruptedException 247 */ 248 private void waitForConditionOrTimeout() throws InterruptedException { 249 long remainingWait = TimeUnit.MILLISECONDS.toNanos(connection.getReplyTimeout()); 250 while (state == State.RequestSent || state == State.Initial) { 251 if (remainingWait <= 0) { 252 state = State.NoResponse; 253 break; 254 } 255 remainingWait = condition.awaitNanos(remainingWait); 256 } 257 } 258 259 /** 260 * Check for a response and throw a {@link NoResponseException} if there was none. 261 * <p> 262 * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent' 263 * </p> 264 * @return <code>true</code> if synchronization point was successful, <code>false</code> on failure. 265 * @throws NoResponseException 266 */ 267 private E checkForResponse() throws NoResponseException { 268 switch (state) { 269 case Initial: 270 case NoResponse: 271 case RequestSent: 272 throw NoResponseException.newWith(connection, waitFor); 273 case Success: 274 return null; 275 case Failure: 276 return failureException; 277 default: 278 throw new AssertionError("Unknown state " + state); 279 } 280 } 281 282 private enum State { 283 Initial, 284 RequestSent, 285 NoResponse, 286 Success, 287 Failure, 288 } 289}