/**
 *
 * Copyright © 2014-2015 Florian Schmaus
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jivesoftware.smack;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.packet.TopLevelStreamElement;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.PlainStreamElement;

/**
 * A point at which a thread can wait until another thread reports success or failure, or until the reply timeout
 * of the associated connection has elapsed.
 * <p>
 * All state is guarded by the connection lock obtained from {@link AbstractXMPPConnection#getConnectionLock()};
 * waiting and signalling use a {@link Condition} created from that lock.
 * </p>
 *
 * @param <E> the type of the exception which may be reported as failure cause.
 */
public class SynchronizationPoint<E extends Exception> {

    private static final Logger LOGGER = Logger.getLogger(SynchronizationPoint.class.getName());

    private final AbstractXMPPConnection connection;
    private final Lock connectionLock;
    private final Condition condition;

    // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the
    // same memory synchronization effects as synchronization block enter and leave.
    private State state;
    private E failureException;

    /**
     * Construct a new synchronization point for the given connection.
     *
     * @param connection the connection of this synchronization point.
     */
    public SynchronizationPoint(AbstractXMPPConnection connection) {
        this.connection = connection;
        this.connectionLock = connection.getConnectionLock();
        this.condition = connection.getConnectionLock().newCondition();
        init();
    }

    /**
     * Initialize (or reset) this synchronization point. The state becomes {@link State#Initial} and any previously
     * reported failure exception is cleared.
     */
    public void init() {
        connectionLock.lock();
        try {
            state = State.Initial;
            failureException = null;
        }
        finally {
            connectionLock.unlock();
        }
    }

    /**
     * Send the given top level stream element and wait for a response.
     * <p>
     * If {@code request} is <code>null</code> nothing is sent and this method only waits for a report or the
     * connection's reply timeout.
     * </p>
     *
     * @param request the top level stream element to send, may be <code>null</code>.
     * @throws NoResponseException if no response was received.
     * @throws NotConnectedException if the connection is not connected.
     * @throws IllegalStateException if the request is neither a {@link Stanza} nor a {@link PlainStreamElement}.
     */
    public void sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException,
                    NotConnectedException {
        connectionLock.lock();
        try {
            // Evaluate the precondition while holding the lock which guards 'state'.
            assert (state == State.Initial);
            if (request != null) {
                if (request instanceof Stanza) {
                    connection.sendStanza((Stanza) request);
                }
                else if (request instanceof PlainStreamElement){
                    connection.send((PlainStreamElement) request);
                } else {
                    throw new IllegalStateException("Unsupported element type");
                }
                state = State.RequestSent;
            }
            waitForConditionOrTimeout();
        }
        finally {
            connectionLock.unlock();
        }
        checkForResponse();
    }

    /**
     * Send the given plain stream element and wait for a response.
     *
     * @param request the plain stream element to send.
     * @throws E if a failure was reported together with a failure exception.
     * @throws NoResponseException if no response was received.
     * @throws NotConnectedException if the connection is not connected.
     */
    public void sendAndWaitForResponseOrThrow(PlainStreamElement request) throws E, NoResponseException,
                    NotConnectedException {
        sendAndWaitForResponse(request);
        throwReportedFailure();
    }

    /**
     * Check if this synchronization point is successful or wait the connections reply timeout.
     *
     * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
     * @throws E if a failure was reported together with a failure exception.
     */
    public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E {
        checkIfSuccessOrWait();
        throwReportedFailure();
    }

    /**
     * Check if this synchronization point is successful or wait the connections reply timeout.
     *
     * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
     */
    public void checkIfSuccessOrWait() throws NoResponseException {
        connectionLock.lock();
        try {
            if (state == State.Success) {
                // Return immediately
                return;
            }
            waitForConditionOrTimeout();
        } finally {
            connectionLock.unlock();
        }
        checkForResponse();
    }

    /**
     * Report this synchronization point as successful and wake up all waiting threads.
     */
    public void reportSuccess() {
        connectionLock.lock();
        try {
            state = State.Success;
            condition.signalAll();
        }
        finally {
            connectionLock.unlock();
        }
    }

    /**
     * Report this synchronization point as failed without providing a failure exception.
     *
     * @deprecated use {@link #reportFailure(Exception)} instead.
     */
    @Deprecated
    public void reportFailure() {
        // Do not delegate to reportFailure(E): its non-null assertion would always fail for this variant when
        // assertions are enabled.
        markFailed(null);
    }

    /**
     * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set.
     *
     * @param failureException the exception causing this synchronization point to fail.
     */
    public void reportFailure(E failureException) {
        assert failureException != null;
        markFailed(failureException);
    }

    /**
     * Check if this synchronization point was successful.
     *
     * @return true if the synchronization point was successful, false otherwise.
     */
    public boolean wasSuccessful() {
        connectionLock.lock();
        try {
            return state == State.Success;
        }
        finally {
            connectionLock.unlock();
        }
    }

    /**
     * Check if this synchronization point has its request already sent.
     *
     * @return true if the request was already sent, false otherwise.
     */
    public boolean requestSent() {
        connectionLock.lock();
        try {
            return state == State.RequestSent;
        }
        finally {
            connectionLock.unlock();
        }
    }

    /**
     * Set the state to {@link State#Failure}, remember the given cause and wake up all waiting threads.
     *
     * @param cause the exception causing the failure, may be <code>null</code>.
     */
    private void markFailed(E cause) {
        connectionLock.lock();
        try {
            state = State.Failure;
            this.failureException = cause;
            condition.signalAll();
        }
        finally {
            connectionLock.unlock();
        }
    }

    /**
     * Throw the reported failure exception if the state is {@link State#Failure} and an exception was recorded.
     * Does nothing for any other state, or if the failure was reported without an exception.
     *
     * @throws E the recorded failure exception.
     */
    private void throwReportedFailure() throws E {
        final E reported;
        connectionLock.lock();
        try {
            reported = (state == State.Failure) ? failureException : null;
        }
        finally {
            connectionLock.unlock();
        }
        if (reported != null) {
            throw reported;
        }
    }

    /**
     * Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}.
     * {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this
     * synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the
     * connections reply timeout, this method will set the state of {@link State#NoResponse}.
     * <p>
     * Must be called while holding the connection lock. An interrupt does not abort the wait; the interrupt status
     * of the thread is restored once the wait has finished.
     * </p>
     */
    private void waitForConditionOrTimeout() {
        // Use an absolute deadline so that spurious wakeups and interrupts can not prolong the overall wait.
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(connection.getPacketReplyTimeout());
        boolean interrupted = false;
        try {
            while (state == State.RequestSent || state == State.Initial) {
                final long remainingWait = deadline - System.nanoTime();
                if (remainingWait <= 0) {
                    state = State.NoResponse;
                    break;
                }
                try {
                    condition.awaitNanos(remainingWait);
                } catch (InterruptedException e) {
                    LOGGER.log(Level.WARNING, "Thread interrupt while waiting for condition or timeout ignored", e);
                    // Re-interrupting right here would make the next awaitNanos() throw immediately, so only
                    // remember the interrupt and restore it after the wait.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Check for a response and throw a {@link NoResponseException} if there was none.
     * <p>
     * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent'
     * </p>
     *
     * @throws NoResponseException if no response was reported.
     */
    private void checkForResponse() throws NoResponseException {
        switch (state) {
        case Initial:
        case NoResponse:
        case RequestSent:
            throw NoResponseException.newWith(connection);
        default:
            // Do nothing
            break;
        }
    }

    /**
     * The states a synchronization point can be in.
     */
    private enum State {
        Initial,
        RequestSent,
        NoResponse,
        Success,
        Failure,
    }
}