001/**
002 *
003 * Copyright 2003-2007 Jive Software, 2016-2019 Florian Schmaus.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.List;
023
024import org.jivesoftware.smack.SmackException.NoResponseException;
025import org.jivesoftware.smack.SmackException.NotConnectedException;
026import org.jivesoftware.smack.XMPPException.XMPPErrorException;
027import org.jivesoftware.smack.filter.StanzaFilter;
028import org.jivesoftware.smack.packet.Stanza;
029
030/**
031 * Provides a mechanism to collect Stanzas into a result queue that pass a
032 * specified filter/matcher. The collector lets you perform blocking and polling
033 * operations on the result queue. So, a StanzaCollector is more suitable to
034 * use than a {@link StanzaListener} when you need to wait for a specific
035 * result.<p>
036 *
037 * Each stanza collector will queue up a configured number of packets for processing before
038 * older packets are automatically dropped.  The default number is retrieved by
039 * {@link SmackConfiguration#getStanzaCollectorSize()}.
040 *
041 * @see XMPPConnection#createStanzaCollector(StanzaFilter)
042 * @author Matt Tucker
043 */
044public final class StanzaCollector implements AutoCloseable {
045
046    private final StanzaFilter packetFilter;
047
048    private final ArrayDeque<Stanza> resultQueue;
049
050    private final int maxQueueSize;
051
052    /**
053     * The stanza collector which timeout for the next result will get reset once this collector collects a stanza.
054     */
055    private final StanzaCollector collectorToReset;
056
057    private final XMPPConnection connection;
058
059    private final Stanza request;
060
061    private volatile boolean cancelled;
062
063    private Exception connectionException;
064
065    /**
066     * Creates a new stanza collector. If the stanza filter is <code>null</code>, then
067     * all packets will match this collector.
068     *
069     * @param connection the connection the collector is tied to.
070     * @param configuration the configuration used to construct this collector
071     */
072    StanzaCollector(XMPPConnection connection, Configuration configuration) {
073        this.connection = connection;
074        this.packetFilter = configuration.packetFilter;
075        this.resultQueue = new ArrayDeque<>(configuration.size);
076        this.maxQueueSize = configuration.size;
077        this.collectorToReset = configuration.collectorToReset;
078        this.request = configuration.request;
079    }
080
081    /**
082     * Explicitly cancels the stanza collector so that no more results are
083     * queued up. Once a stanza collector has been cancelled, it cannot be
084     * re-enabled. Instead, a new stanza collector must be created.
085     */
086    public synchronized void cancel() {
087        // If the packet collector has already been cancelled, do nothing.
088        if (cancelled) {
089            return;
090        }
091
092        cancelled = true;
093        connection.removeStanzaCollector(this);
094        notifyAll();
095
096        if (collectorToReset != null) {
097            collectorToReset.cancel();
098        }
099    }
100
101    /**
102     * Returns the stanza filter associated with this stanza collector. The stanza
103     * filter is used to determine what stanzas are queued as results.
104     *
105     * @return the stanza filter.
106     */
107    public StanzaFilter getStanzaFilter() {
108        return packetFilter;
109    }
110
111    /**
112     * Polls to see if a stanza is currently available and returns it, or
113     * immediately returns <code>null</code> if no packets are currently in the
114     * result queue.
115     *
116     * @param <P> type of the result stanza.
117     * @return the next stanza result, or <code>null</code> if there are no more
118     *      results.
119     */
120    @SuppressWarnings("unchecked")
121    public synchronized <P extends Stanza> P pollResult() {
122        return (P) resultQueue.poll();
123    }
124
125    /**
126     * Polls to see if a stanza is currently available and returns it, or
127     * immediately returns <code>null</code> if no packets are currently in the
128     * result queue.
129     * <p>
130     * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError.
131     * </p>
132     *
133     * @param <P> type of the result stanza.
134     * @return the next available packet.
135     * @throws XMPPErrorException in case an error response.
136     */
137    public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
138        P result = pollResult();
139        if (result != null) {
140            XMPPErrorException.ifHasErrorThenThrow(result);
141        }
142        return result;
143    }
144
145    /**
146     * Returns the next available packet. The method call will block (not return) until a stanza is
147     * available.
148     *
149     * @param <P> type of the result stanza.
150     * @return the next available packet.
151     * @throws InterruptedException if the calling thread was interrupted.
152     */
153    @SuppressWarnings("unchecked")
154    // TODO: Consider removing this method as it is hardly ever useful.
155    public synchronized <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
156        throwIfCancelled();
157
158        while (true) {
159            P res = (P) resultQueue.poll();
160            if (res != null) {
161                return res;
162            }
163            if (cancelled) {
164                return null;
165            }
166            wait();
167        }
168    }
169
170    /**
171     * Returns the next available packet. The method call will block until the connection's default
172     * timeout has elapsed.
173     *
174     * @param <P> type of the result stanza.
175     * @return the next available packet.
176     * @throws InterruptedException if the calling thread was interrupted.
177     */
178    public <P extends Stanza> P nextResult() throws InterruptedException {
179        return nextResult(connection.getReplyTimeout());
180    }
181
182    private volatile long waitStart;
183
184    /**
185     * Returns the next available stanza. The method call will block (not return) until a stanza is available or the
186     * <code>timeout</code> has elapsed or if the connection was terminated because of an error. If the timeout elapses without a
187     * result or if there was an connection error, <code>null</code> will be returned.
188     *
189     * @param <P> type of the result stanza.
190     * @param timeout the timeout in milliseconds.
191     * @return the next available stanza or <code>null</code> on timeout or connection error.
192     * @throws InterruptedException if the calling thread was interrupted.
193     */
194    @SuppressWarnings("unchecked")
195    public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
196        throwIfCancelled();
197        P res = null;
198        long remainingWait = timeout;
199        waitStart = System.currentTimeMillis();
200        while (remainingWait > 0 && connectionException == null && !cancelled) {
201            synchronized (this) {
202                res = (P) resultQueue.poll();
203                if (res != null) {
204                    return res;
205                }
206                wait(remainingWait);
207            }
208            remainingWait = timeout - (System.currentTimeMillis() - waitStart);
209        }
210        return res;
211    }
212
213    /**
214     * Returns the next available stanza. The method in equivalent to
215     * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of
216     * the connection associated with this collector.
217     *
218     * @param <P> type of the result stanza.
219     * @return the next available stanza.
220     * @throws XMPPErrorException in case an error response was received.
221     * @throws NoResponseException if there was no response from the server.
222     * @throws InterruptedException if the calling thread was interrupted.
223     * @throws NotConnectedException if the XMPP connection is not connected.
224     * @see #nextResultOrThrow(long)
225     */
226    public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException,
227                    InterruptedException, NotConnectedException {
228        return nextResultOrThrow(connection.getReplyTimeout());
229    }
230
231    /**
232     * Returns the next available stanza. The method call will block until a stanza is
233     * available or the <code>timeout</code> has elapsed. This method does also cancel the
234     * collector in every case.
235     * <p>
236     * Three things can happen when waiting for an response:
237     * </p>
238     * <ol>
239     * <li>A result response arrives.</li>
240     * <li>An error response arrives.</li>
241     * <li>An timeout occurs.</li>
242     * <li>The thread is interrupted</li>
243     * </ol>
244     * <p>
245     * in which this method will
246     * </p>
247     * <ol>
248     * <li>return with the result.</li>
249     * <li>throw an {@link XMPPErrorException}.</li>
250     * <li>throw an {@link NoResponseException}.</li>
251     * <li>throw an {@link InterruptedException}.</li>
252     * </ol>
253     * <p>
254     * Additionally the method will throw a {@link NotConnectedException} if no response was
255     * received and the connection got disconnected.
256     * </p>
257     *
258     * @param timeout the amount of time to wait for the next stanza in milliseconds.
259     * @param <P> type of the result stanza.
260     * @return the next available stanza.
261     * @throws NoResponseException if there was no response from the server.
262     * @throws XMPPErrorException in case an error response was received.
263     * @throws InterruptedException if the calling thread was interrupted.
264     * @throws NotConnectedException if there was no response and the connection got disconnected.
265     */
266    public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException,
267                    XMPPErrorException, InterruptedException, NotConnectedException {
268        P result;
269        try {
270            result = nextResult(timeout);
271        } finally {
272            cancel();
273        }
274        if (result == null) {
275            if (connectionException != null) {
276                throw new NotConnectedException(connection, packetFilter, connectionException);
277            }
278            if (!connection.isConnected()) {
279                throw new NotConnectedException(connection, packetFilter);
280            }
281            throw NoResponseException.newWith(timeout, this, cancelled);
282        }
283
284        XMPPErrorException.ifHasErrorThenThrow(result);
285
286        return result;
287    }
288
289    private List<Stanza> collectedCache;
290
291    /**
292     * Return a list of all collected stanzas. This method must be invoked after the collector has been cancelled.
293     *
294     * @return a list of collected stanzas.
295     * @since 4.3.0
296     */
297    public List<Stanza> getCollectedStanzasAfterCancelled() {
298        if (!cancelled) {
299            throw new IllegalStateException("Stanza collector was not yet cancelled");
300        }
301
302        if (collectedCache == null) {
303            collectedCache = new ArrayList<>(getCollectedCount());
304            collectedCache.addAll(resultQueue);
305        }
306
307        return collectedCache;
308    }
309
310    /**
311     * Get the number of collected stanzas this stanza collector has collected so far.
312     *
313     * @return the count of collected stanzas.
314     * @since 4.1
315     */
316    public synchronized int getCollectedCount() {
317        return resultQueue.size();
318    }
319
320    private String stringCache;
321
322    @Override
323    public String toString() {
324        if (stringCache == null) {
325            StringBuilder sb = new StringBuilder(128);
326            sb.append("Stanza Collector filter='").append(packetFilter).append('\'');
327            if (request != null) {
328                sb.append(" request='").append(request).append('\'');
329            }
330            stringCache = sb.toString();
331        }
332        return stringCache;
333    }
334
335    synchronized void notifyConnectionError(Exception exception) {
336        connectionException = exception;
337        notifyAll();
338    }
339
340    /**
341     * Processes a stanza to see if it meets the criteria for this stanza collector.
342     * If so, the stanza is added to the result queue.
343     *
344     * @param packet the stanza to process.
345     */
346    protected void processStanza(Stanza packet) {
347        if (packetFilter == null || packetFilter.accept(packet)) {
348            synchronized (this) {
349                if (resultQueue.size() == maxQueueSize) {
350                    Stanza rolledOverStanza = resultQueue.poll();
351                    assert rolledOverStanza != null;
352                }
353                resultQueue.add(packet);
354                notifyAll();
355            }
356            if (collectorToReset != null) {
357                collectorToReset.waitStart = System.currentTimeMillis();
358            }
359        }
360    }
361
362    private void throwIfCancelled() {
363        if (cancelled) {
364            throw new IllegalStateException("Stanza collector already cancelled");
365        }
366    }
367
368    /**
369     * Get a new stanza collector configuration instance.
370     *
371     * @return a new stanza collector configuration.
372     */
373    public static Configuration newConfiguration() {
374        return new Configuration();
375    }
376
377    public static final class Configuration {
378        private StanzaFilter packetFilter;
379        private int size = SmackConfiguration.getStanzaCollectorSize();
380        private StanzaCollector collectorToReset;
381        private Stanza request;
382
383        private Configuration() {
384        }
385
386        /**
387         * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
388         * get collected by this collector.
389         *
390         * @param stanzaFilter TODO javadoc me please
391         * @return a reference to this configuration.
392         */
393        public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
394            this.packetFilter = stanzaFilter;
395            return this;
396        }
397
398        /**
399         * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
400         * before dropping old ones.
401         *
402         * @param size TODO javadoc me please
403         * @return a reference to this configuration.
404         */
405        public Configuration setSize(int size) {
406            this.size = size;
407            return this;
408        }
409
410        /**
411         * Set the collector which timeout for the next result is reset once this collector collects
412         * a packet.
413         *
414         * @param collector TODO javadoc me please
415         * @return a reference to this configuration.
416         */
417        public Configuration setCollectorToReset(StanzaCollector collector) {
418            this.collectorToReset = collector;
419            return this;
420        }
421
422        public Configuration setRequest(Stanza request) {
423            this.request = request;
424            return this;
425        }
426    }
427
428    @Override
429    public void close() {
430        cancel();
431    }
432
433}