001/**
002 *
003 * Copyright © 2014-2015 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.util.concurrent.TimeUnit;
020import java.util.concurrent.locks.Condition;
021import java.util.concurrent.locks.Lock;
022import java.util.logging.Level;
023import java.util.logging.Logger;
024
025import org.jivesoftware.smack.SmackException.NoResponseException;
026import org.jivesoftware.smack.SmackException.NotConnectedException;
027import org.jivesoftware.smack.packet.TopLevelStreamElement;
028import org.jivesoftware.smack.packet.Stanza;
029import org.jivesoftware.smack.packet.PlainStreamElement;
030
031public class SynchronizationPoint<E extends Exception> {
032
033    private static final Logger LOGGER = Logger.getLogger(SynchronizationPoint.class.getName());
034
035    private final AbstractXMPPConnection connection;
036    private final Lock connectionLock;
037    private final Condition condition;
038
039    // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the
040    // same memory synchronization effects as synchronization block enter and leave.
041    private State state;
042    private E failureException;
043
044    /**
045     * Construct a new synchronization point for the given connection.
046     *
047     * @param connection the connection of this synchronization point.
048     */
049    public SynchronizationPoint(AbstractXMPPConnection connection) {
050        this.connection = connection;
051        this.connectionLock = connection.getConnectionLock();
052        this.condition = connection.getConnectionLock().newCondition();
053        init();
054    }
055
056    /**
057     * Initialize (or reset) this synchronization point.
058     */
059    public void init() {
060        connectionLock.lock();
061        state = State.Initial;
062        failureException = null;
063        connectionLock.unlock();
064    }
065
066    /**
067     * Send the given top level stream element and wait for a response.
068     *
069     * @param request the plain stream element to send.
070     * @throws NoResponseException if no response was received.
071     * @throws NotConnectedException if the connection is not connected.
072     */
073    public void sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException,
074                    NotConnectedException {
075        assert (state == State.Initial);
076        connectionLock.lock();
077        try {
078            if (request != null) {
079                if (request instanceof Stanza) {
080                    connection.sendStanza((Stanza) request);
081                }
082                else if (request instanceof PlainStreamElement){
083                    connection.send((PlainStreamElement) request);
084                } else {
085                    throw new IllegalStateException("Unsupported element type");
086                }
087                state = State.RequestSent;
088            }
089            waitForConditionOrTimeout();
090        }
091        finally {
092            connectionLock.unlock();
093        }
094        checkForResponse();
095    }
096
097    /**
098     * Send the given plain stream element and wait for a response.
099     *
100     * @param request the plain stream element to send.
101     * @throws E if an failure was reported.
102     * @throws NoResponseException if no response was received.
103     * @throws NotConnectedException if the connection is not connected.
104     */
105    public void sendAndWaitForResponseOrThrow(PlainStreamElement request) throws E, NoResponseException,
106                    NotConnectedException {
107        sendAndWaitForResponse(request);
108        switch (state) {
109        case Failure:
110            if (failureException != null) {
111                throw failureException;
112            }
113            break;
114        default:
115            // Success, do nothing
116        }
117    }
118
119    /**
120     * Check if this synchronization point is successful or wait the connections reply timeout.
121     * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
122     * @throws E if there was a failure
123     */
124    public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E {
125        checkIfSuccessOrWait();
126        if (state == State.Failure) {
127            throw failureException;
128        }
129    }
130
131    /**
132     * Check if this synchronization point is successful or wait the connections reply timeout.
133     * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
134     */
135    public void checkIfSuccessOrWait() throws NoResponseException {
136        connectionLock.lock();
137        try {
138            if (state == State.Success) {
139                // Return immediately
140                return;
141            }
142            waitForConditionOrTimeout();
143        } finally {
144            connectionLock.unlock();
145        }
146        checkForResponse();
147    }
148
149    /**
150     * Report this synchronization point as successful.
151     */
152    public void reportSuccess() {
153        connectionLock.lock();
154        try {
155            state = State.Success;
156            condition.signalAll();
157        }
158        finally {
159            connectionLock.unlock();
160        }
161    }
162
163    /**
164     * Deprecated
165     * @deprecated use {@link #reportFailure(Exception)} instead.
166     */
167    @Deprecated
168    public void reportFailure() {
169        reportFailure(null);
170    }
171
172    /**
173     * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set.
174     *
175     * @param failureException the exception causing this synchronization point to fail.
176     */
177    public void reportFailure(E failureException) {
178        assert failureException != null;
179        connectionLock.lock();
180        try {
181            state = State.Failure;
182            this.failureException = failureException;
183            condition.signalAll();
184        }
185        finally {
186            connectionLock.unlock();
187        }
188    }
189
190    /**
191     * Check if this synchronization point was successful.
192     *
193     * @return true if the synchronization point was successful, false otherwise.
194     */
195    public boolean wasSuccessful() {
196        connectionLock.lock();
197        try {
198            return state == State.Success;
199        }
200        finally {
201            connectionLock.unlock();
202        }
203    }
204
205    /**
206     * Check if this synchronization point has its request already sent.
207     *
208     * @return true if the request was already sent, false otherwise.
209     */
210    public boolean requestSent() {
211        connectionLock.lock();
212        try {
213            return state == State.RequestSent;
214        }
215        finally {
216            connectionLock.unlock();
217        }
218    }
219
220    /**
221     * Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}.
222     * {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this
223     * synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the
224     * connections reply timeout, this method will set the state of {@link State#NoResponse}.
225     */
226    private void waitForConditionOrTimeout() {
227        long remainingWait = TimeUnit.MILLISECONDS.toNanos(connection.getPacketReplyTimeout());
228        while (state == State.RequestSent || state == State.Initial) {
229            try {
230                if (remainingWait <= 0) {
231                    state = State.NoResponse;
232                    break;
233                }
234                remainingWait = condition.awaitNanos(remainingWait);
235            } catch (InterruptedException e) {
236                LOGGER.log(Level.WARNING, "Thread interrupt while waiting for condition or timeout ignored", e);
237            }
238        }
239    }
240
241    /**
242     * Check for a response and throw a {@link NoResponseException} if there was none.
243     * <p>
244     * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent'
245     * </p>
246     * @throws NoResponseException
247     */
248    private void checkForResponse() throws NoResponseException {
249        switch (state) {
250        case Initial:
251        case NoResponse:
252        case RequestSent:
253            throw NoResponseException.newWith(connection);
254        default:
255            // Do nothing
256            break;
257        }
258    }
259
260    private enum State {
261        Initial,
262        RequestSent,
263        NoResponse,
264        Success,
265        Failure,
266    }
267}