001/**
002 *
003 * Copyright © 2014-2015 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.util.concurrent.TimeUnit;
020import java.util.concurrent.locks.Condition;
021import java.util.concurrent.locks.Lock;
022
023import org.jivesoftware.smack.SmackException.NoResponseException;
024import org.jivesoftware.smack.SmackException.NotConnectedException;
025import org.jivesoftware.smack.packet.Nonza;
026import org.jivesoftware.smack.packet.Stanza;
027import org.jivesoftware.smack.packet.TopLevelStreamElement;
028
029public class SynchronizationPoint<E extends Exception> {
030
031    private final AbstractXMPPConnection connection;
032    private final Lock connectionLock;
033    private final Condition condition;
034    private final String waitFor;
035
036    // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the
037    // same memory synchronization effects as synchronization block enter and leave.
038    private State state;
039    private E failureException;
040
041    /**
042     * Construct a new synchronization point for the given connection.
043     *
044     * @param connection the connection of this synchronization point.
045     * @param waitFor a description of the event this synchronization point handles.
046     */
047    public SynchronizationPoint(AbstractXMPPConnection connection, String waitFor) {
048        this.connection = connection;
049        this.connectionLock = connection.getConnectionLock();
050        this.condition = connection.getConnectionLock().newCondition();
051        this.waitFor = waitFor;
052        init();
053    }
054
055    /**
056     * Initialize (or reset) this synchronization point.
057     */
058    public void init() {
059        connectionLock.lock();
060        state = State.Initial;
061        failureException = null;
062        connectionLock.unlock();
063    }
064
065    /**
066     * Send the given top level stream element and wait for a response.
067     *
068     * @param request the plain stream element to send.
069     * @throws NoResponseException if no response was received.
070     * @throws NotConnectedException if the connection is not connected.
071     * @throws InterruptedException if the connection is interrupted.
072     * @return <code>null</code> if synchronization point was successful, or the failure Exception.
073     */
074    public E sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException,
075                    NotConnectedException, InterruptedException {
076        assert (state == State.Initial);
077        connectionLock.lock();
078        try {
079            if (request != null) {
080                if (request instanceof Stanza) {
081                    connection.sendStanza((Stanza) request);
082                }
083                else if (request instanceof Nonza) {
084                    connection.sendNonza((Nonza) request);
085                } else {
086                    throw new IllegalStateException("Unsupported element type");
087                }
088                state = State.RequestSent;
089            }
090            waitForConditionOrTimeout();
091        }
092        finally {
093            connectionLock.unlock();
094        }
095        return checkForResponse();
096    }
097
098    /**
099     * Send the given plain stream element and wait for a response.
100     *
101     * @param request the plain stream element to send.
102     * @throws E if an failure was reported.
103     * @throws NoResponseException if no response was received.
104     * @throws NotConnectedException if the connection is not connected.
105     * @throws InterruptedException if the connection is interrupted.
106     */
107    public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException,
108                    NotConnectedException, InterruptedException {
109        sendAndWaitForResponse(request);
110        switch (state) {
111        case Failure:
112            if (failureException != null) {
113                throw failureException;
114            }
115            break;
116        default:
117            // Success, do nothing
118        }
119    }
120
121    /**
122     * Check if this synchronization point is successful or wait the connections reply timeout.
123     * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
124     * @throws E if there was a failure
125     * @throws InterruptedException if the connection is interrupted.
126     */
127    public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException {
128        checkIfSuccessOrWait();
129        if (state == State.Failure) {
130            throw failureException;
131        }
132    }
133
134    /**
135     * Check if this synchronization point is successful or wait the connections reply timeout.
136     * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
137     * @throws InterruptedException
138     * @return <code>null</code> if synchronization point was successful, or the failure Exception.
139     */
140    public E checkIfSuccessOrWait() throws NoResponseException, InterruptedException {
141        connectionLock.lock();
142        try {
143            switch (state) {
144            // Return immediately on success or failure
145            case Success:
146                return null;
147            case Failure:
148                return failureException;
149            default:
150                // Do nothing
151                break;
152            }
153            waitForConditionOrTimeout();
154        } finally {
155            connectionLock.unlock();
156        }
157        return checkForResponse();
158    }
159
160    /**
161     * Report this synchronization point as successful.
162     */
163    public void reportSuccess() {
164        connectionLock.lock();
165        try {
166            state = State.Success;
167            condition.signalAll();
168        }
169        finally {
170            connectionLock.unlock();
171        }
172    }
173
174    /**
175     * Deprecated.
176     * @deprecated use {@link #reportFailure(Exception)} instead.
177     */
178    @Deprecated
179    public void reportFailure() {
180        reportFailure(null);
181    }
182
183    /**
184     * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set.
185     *
186     * @param failureException the exception causing this synchronization point to fail.
187     */
188    public void reportFailure(E failureException) {
189        assert failureException != null;
190        connectionLock.lock();
191        try {
192            state = State.Failure;
193            this.failureException = failureException;
194            condition.signalAll();
195        }
196        finally {
197            connectionLock.unlock();
198        }
199    }
200
201    /**
202     * Check if this synchronization point was successful.
203     *
204     * @return true if the synchronization point was successful, false otherwise.
205     */
206    public boolean wasSuccessful() {
207        connectionLock.lock();
208        try {
209            return state == State.Success;
210        }
211        finally {
212            connectionLock.unlock();
213        }
214    }
215
216    /**
217     * Check if this synchronization point has its request already sent.
218     *
219     * @return true if the request was already sent, false otherwise.
220     */
221    public boolean requestSent() {
222        connectionLock.lock();
223        try {
224            return state == State.RequestSent;
225        }
226        finally {
227            connectionLock.unlock();
228        }
229    }
230
231    public E getFailureException() {
232        connectionLock.lock();
233        try {
234            return failureException;
235        }
236        finally {
237            connectionLock.unlock();
238        }
239    }
240
241    /**
242     * Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}.
243     * {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this
244     * synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the
245     * connections reply timeout, this method will set the state of {@link State#NoResponse}.
246     * @throws InterruptedException 
247     */
248    private void waitForConditionOrTimeout() throws InterruptedException {
249        long remainingWait = TimeUnit.MILLISECONDS.toNanos(connection.getReplyTimeout());
250        while (state == State.RequestSent || state == State.Initial) {
251            if (remainingWait <= 0) {
252                state = State.NoResponse;
253                break;
254            }
255            remainingWait = condition.awaitNanos(remainingWait);
256        }
257    }
258
259    /**
260     * Check for a response and throw a {@link NoResponseException} if there was none.
261     * <p>
262     * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent'
263     * </p>
264     * @return <code>true</code> if synchronization point was successful, <code>false</code> on failure.
265     * @throws NoResponseException
266     */
267    private E checkForResponse() throws NoResponseException {
268        switch (state) {
269        case Initial:
270        case NoResponse:
271        case RequestSent:
272            throw NoResponseException.newWith(connection, waitFor);
273        case Success:
274            return null;
275        case Failure:
276            return failureException;
277        default:
278            throw new AssertionError("Unknown state " + state);
279        }
280    }
281
282    private enum State {
283        Initial,
284        RequestSent,
285        NoResponse,
286        Success,
287        Failure,
288    }
289}