001/** 002 * 003 * Copyright © 2014 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.util.concurrent.TimeUnit; 020import java.util.concurrent.locks.Condition; 021import java.util.concurrent.locks.Lock; 022import java.util.logging.Level; 023import java.util.logging.Logger; 024 025import org.jivesoftware.smack.SmackException.NoResponseException; 026import org.jivesoftware.smack.SmackException.NotConnectedException; 027import org.jivesoftware.smack.packet.TopLevelStreamElement; 028import org.jivesoftware.smack.packet.Stanza; 029import org.jivesoftware.smack.packet.PlainStreamElement; 030 031public class SynchronizationPoint<E extends Exception> { 032 033 private static final Logger LOGGER = Logger.getLogger(SynchronizationPoint.class.getName()); 034 035 private final AbstractXMPPConnection connection; 036 private final Lock connectionLock; 037 private final Condition condition; 038 039 // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the 040 // same memory synchronization effects as synchronization block enter and leave. 041 private State state; 042 private E failureException; 043 044 public SynchronizationPoint(AbstractXMPPConnection connection) { 045 this.connection = connection; 046 this.connectionLock = connection.getConnectionLock(); 047 this.condition = connection.getConnectionLock().newCondition(); 048 init(); 049 } 050 051 public void init() { 052 connectionLock.lock(); 053 state = State.Initial; 054 failureException = null; 055 connectionLock.unlock(); 056 } 057 058 public void sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, 059 NotConnectedException { 060 assert (state == State.Initial); 061 connectionLock.lock(); 062 try { 063 if (request != null) { 064 if (request instanceof Stanza) { 065 connection.sendStanza((Stanza) request); 066 } 067 else if (request instanceof PlainStreamElement){ 068 connection.send((PlainStreamElement) request); 069 } else { 070 throw new IllegalStateException("Unsupported element type"); 071 } 072 state = State.RequestSent; 073 } 074 waitForConditionOrTimeout(); 075 } 076 finally { 077 connectionLock.unlock(); 078 } 079 checkForResponse(); 080 } 081 082 public void sendAndWaitForResponseOrThrow(PlainStreamElement request) throws E, NoResponseException, 083 NotConnectedException { 084 sendAndWaitForResponse(request); 085 switch (state) { 086 case Failure: 087 if (failureException != null) { 088 throw failureException; 089 } 090 break; 091 default: 092 // Success, do nothing 093 } 094 } 095 096 public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E { 097 checkIfSuccessOrWait(); 098 if (state == State.Failure) { 099 throw failureException; 100 } 101 } 102 103 public void checkIfSuccessOrWait() throws NoResponseException { 104 connectionLock.lock(); 105 try { 106 if (state == State.Success) { 107 // Return immediately 108 return; 109 } 110 waitForConditionOrTimeout(); 111 } finally { 112 connectionLock.unlock(); 113 } 114 checkForResponse(); 115 } 116 117 public void reportSuccess() { 118 connectionLock.lock(); 119 try { 120 state = State.Success; 121 condition.signal(); 122 } 123 finally { 124 connectionLock.unlock(); 125 } 126 } 127 128 public void reportFailure() { 129 reportFailure(null); 130 } 131 132 public void reportFailure(E failureException) { 133 connectionLock.lock(); 134 try { 135 state = State.Failure; 136 this.failureException = failureException; 137 condition.signal(); 138 } 139 finally { 140 connectionLock.unlock(); 141 } 142 } 143 144 public boolean wasSuccessful() { 145 connectionLock.lock(); 146 try { 147 return state == State.Success; 148 } 149 finally { 150 connectionLock.unlock(); 151 } 152 } 153 154 public boolean requestSent() { 155 connectionLock.lock(); 156 try { 157 return state == State.RequestSent; 158 } 159 finally { 160 connectionLock.unlock(); 161 } 162 } 163 164 private void waitForConditionOrTimeout() { 165 long remainingWait = TimeUnit.MILLISECONDS.toNanos(connection.getPacketReplyTimeout()); 166 while (state == State.RequestSent || state == State.Initial) { 167 try { 168 remainingWait = condition.awaitNanos( 169 remainingWait); 170 if (remainingWait <= 0) { 171 state = State.NoResponse; 172 break; 173 } 174 } catch (InterruptedException e) { 175 LOGGER.log(Level.WARNING, "Thread interrupt while waiting for condition or timeout ignored", e); 176 } 177 } 178 } 179 180 /** 181 * Check for a response and throw a {@link NoResponseException} if there was none. 182 * <p> 183 * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent' 184 * </p> 185 * @throws NoResponseException 186 */ 187 private void checkForResponse() throws NoResponseException { 188 switch (state) { 189 case Initial: 190 case NoResponse: 191 case RequestSent: 192 throw NoResponseException.newWith(connection); 193 default: 194 // Do nothing 195 break; 196 } 197 } 198 199 private enum State { 200 Initial, 201 RequestSent, 202 NoResponse, 203 Success, 204 Failure, 205 } 206}