001/**
002 *
003 * Copyright 2003-2007 Jive Software, 2016-2018 Florian Schmaus.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.concurrent.ArrayBlockingQueue;
023import java.util.concurrent.TimeUnit;
024
025import org.jivesoftware.smack.SmackException.NoResponseException;
026import org.jivesoftware.smack.SmackException.NotConnectedException;
027import org.jivesoftware.smack.XMPPException.XMPPErrorException;
028import org.jivesoftware.smack.filter.StanzaFilter;
029import org.jivesoftware.smack.packet.Stanza;
030
031/**
032 * Provides a mechanism to collect Stanzas into a result queue that pass a
033 * specified filter/matcher. The collector lets you perform blocking and polling
034 * operations on the result queue. So, a StanzaCollector is more suitable to
035 * use than a {@link StanzaListener} when you need to wait for a specific
036 * result.<p>
037 *
038 * Each stanza collector will queue up a configured number of packets for processing before
039 * older packets are automatically dropped.  The default number is retrieved by
040 * {@link SmackConfiguration#getStanzaCollectorSize()}.
041 *
042 * @see XMPPConnection#createStanzaCollector(StanzaFilter)
043 * @author Matt Tucker
044 */
045public class StanzaCollector {
046
047    private final StanzaFilter packetFilter;
048
049    private final ArrayBlockingQueue<Stanza> resultQueue;
050
051    /**
052     * The stanza collector which timeout for the next result will get reset once this collector collects a stanza.
053     */
054    private final StanzaCollector collectorToReset;
055
056    private final XMPPConnection connection;
057
058    private final Stanza request;
059
060    private volatile boolean cancelled = false;
061
062    /**
063     * Creates a new stanza collector. If the stanza filter is <tt>null</tt>, then
064     * all packets will match this collector.
065     *
066     * @param connection the connection the collector is tied to.
067     * @param configuration the configuration used to construct this collector
068     */
069    protected StanzaCollector(XMPPConnection connection, Configuration configuration) {
070        this.connection = connection;
071        this.packetFilter = configuration.packetFilter;
072        this.resultQueue = new ArrayBlockingQueue<>(configuration.size);
073        this.collectorToReset = configuration.collectorToReset;
074        this.request = configuration.request;
075    }
076
077    /**
078     * Explicitly cancels the stanza collector so that no more results are
079     * queued up. Once a stanza collector has been cancelled, it cannot be
080     * re-enabled. Instead, a new stanza collector must be created.
081     */
082    public void cancel() {
083        // If the packet collector has already been cancelled, do nothing.
084        if (!cancelled) {
085            cancelled = true;
086            connection.removeStanzaCollector(this);
087        }
088    }
089
090    /**
091     * Returns the stanza filter associated with this stanza collector. The packet
092     * filter is used to determine what packets are queued as results.
093     *
094     * @return the stanza filter.
095     * @deprecated use {@link #getStanzaFilter()} instead.
096     */
097    @Deprecated
098    public StanzaFilter getPacketFilter() {
099        return getStanzaFilter();
100    }
101
102    /**
103     * Returns the stanza filter associated with this stanza collector. The stanza
104     * filter is used to determine what stanzas are queued as results.
105     *
106     * @return the stanza filter.
107     */
108    public StanzaFilter getStanzaFilter() {
109        return packetFilter;
110    }
111
112    /**
113     * Polls to see if a stanza is currently available and returns it, or
114     * immediately returns <tt>null</tt> if no packets are currently in the
115     * result queue.
116     *
117     * @param <P> type of the result stanza.
118     * @return the next stanza result, or <tt>null</tt> if there are no more
119     *      results.
120     */
121    @SuppressWarnings("unchecked")
122    public <P extends Stanza> P pollResult() {
123        return (P) resultQueue.poll();
124    }
125
126    /**
127     * Polls to see if a stanza is currently available and returns it, or
128     * immediately returns <tt>null</tt> if no packets are currently in the
129     * result queue.
130     * <p>
131     * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError.
132     * </p>
133     *
134     * @param <P> type of the result stanza.
135     * @return the next available packet.
136     * @throws XMPPErrorException in case an error response.
137     */
138    public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
139        P result = pollResult();
140        if (result != null) {
141            XMPPErrorException.ifHasErrorThenThrow(result);
142        }
143        return result;
144    }
145
146    /**
147     * Returns the next available packet. The method call will block (not return) until a stanza is
148     * available.
149     *
150     * @param <P> type of the result stanza.
151     * @return the next available packet.
152     * @throws InterruptedException
153     */
154    @SuppressWarnings("unchecked")
155    public <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
156        throwIfCancelled();
157        P res = null;
158        while (res == null) {
159            res = (P) resultQueue.take();
160        }
161        return res;
162    }
163
164    /**
165     * Returns the next available packet. The method call will block until the connection's default
166     * timeout has elapsed.
167     *
168     * @param <P> type of the result stanza.
169     * @return the next available packet.
170     * @throws InterruptedException
171     */
172    public <P extends Stanza> P nextResult() throws InterruptedException {
173        return nextResult(connection.getReplyTimeout());
174    }
175
176    private volatile long waitStart;
177
178    /**
179     * Returns the next available packet. The method call will block (not return)
180     * until a stanza is available or the <tt>timeout</tt> has elapsed. If the
181     * timeout elapses without a result, <tt>null</tt> will be returned.
182     *
183     * @param <P> type of the result stanza.
184     * @param timeout the timeout in milliseconds.
185     * @return the next available packet.
186     * @throws InterruptedException
187     */
188    @SuppressWarnings("unchecked")
189    public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
190        throwIfCancelled();
191        P res = null;
192        long remainingWait = timeout;
193        waitStart = System.currentTimeMillis();
194        do {
195            res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS);
196            if (res != null) {
197                return res;
198            }
199            remainingWait = timeout - (System.currentTimeMillis() - waitStart);
200        } while (remainingWait > 0);
201        return null;
202    }
203
204    /**
205     * Returns the next available stanza. The method in equivalent to
206     * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of
207     * the connection associated with this collector.
208     *
209     * @param <P> type of the result stanza.
210     * @return the next available stanza.
211     * @throws XMPPErrorException in case an error response was received.
212     * @throws NoResponseException if there was no response from the server.
213     * @throws InterruptedException
214     * @throws NotConnectedException
215     * @see #nextResultOrThrow(long)
216     */
217    public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException,
218                    InterruptedException, NotConnectedException {
219        return nextResultOrThrow(connection.getReplyTimeout());
220    }
221
222    /**
223     * Returns the next available stanza. The method call will block until a stanza is
224     * available or the <tt>timeout</tt> has elapsed. This method does also cancel the
225     * collector in every case.
226     * <p>
227     * Three things can happen when waiting for an response:
228     * </p>
229     * <ol>
230     * <li>A result response arrives.</li>
231     * <li>An error response arrives.</li>
232     * <li>An timeout occurs.</li>
233     * <li>The thread is interrupted</li>
234     * </ol>
235     * <p>
236     * in which this method will
237     * </p>
238     * <ol>
239     * <li>return with the result.</li>
240     * <li>throw an {@link XMPPErrorException}.</li>
241     * <li>throw an {@link NoResponseException}.</li>
242     * <li>throw an {@link InterruptedException}.</li>
243     * </ol>
244     * <p>
245     * Additionally the method will throw a {@link NotConnectedException} if no response was
246     * received and the connection got disconnected.
247     * </p>
248     *
249     * @param timeout the amount of time to wait for the next stanza in milliseconds.
250     * @param <P> type of the result stanza.
251     * @return the next available stanza.
252     * @throws NoResponseException if there was no response from the server.
253     * @throws XMPPErrorException in case an error response was received.
254     * @throws InterruptedException if the calling thread was interrupted.
255     * @throws NotConnectedException if there was no response and the connection got disconnected.
256     */
257    public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException,
258                    XMPPErrorException, InterruptedException, NotConnectedException {
259        P result = nextResult(timeout);
260        cancel();
261        if (result == null) {
262            if (!connection.isConnected()) {
263                throw new NotConnectedException(connection, packetFilter);
264            }
265            throw NoResponseException.newWith(connection, this);
266        }
267
268        XMPPErrorException.ifHasErrorThenThrow(result);
269
270        return result;
271    }
272
273    private List<Stanza> collectedCache;
274
275    /**
276     * Return a list of all collected stanzas. This method must be invoked after the collector has been cancelled.
277     *
278     * @return a list of collected stanzas.
279     * @since 4.3.0
280     */
281    public List<Stanza> getCollectedStanzasAfterCancelled() {
282        if (!cancelled) {
283            throw new IllegalStateException("Stanza collector was not yet cancelled");
284        }
285
286        if (collectedCache == null) {
287            collectedCache = new ArrayList<>(getCollectedCount());
288            resultQueue.drainTo(collectedCache);
289        }
290
291        return collectedCache;
292    }
293
294    /**
295     * Get the number of collected stanzas this stanza collector has collected so far.
296     *
297     * @return the count of collected stanzas.
298     * @since 4.1
299     */
300    public int getCollectedCount() {
301        return resultQueue.size();
302    }
303
304    /**
305     * Processes a stanza to see if it meets the criteria for this stanza collector.
306     * If so, the stanza is added to the result queue.
307     *
308     * @param packet the stanza to process.
309     */
310    protected void processStanza(Stanza packet) {
311        if (packetFilter == null || packetFilter.accept(packet)) {
312            // CHECKSTYLE:OFF
313                while (!resultQueue.offer(packet)) {
314                        // Since we know the queue is full, this poll should never actually block.
315                        resultQueue.poll();
316                }
317            // CHECKSTYLE:ON
318            if (collectorToReset != null) {
319                collectorToReset.waitStart = System.currentTimeMillis();
320            }
321        }
322    }
323
324    private void throwIfCancelled() {
325        if (cancelled) {
326            throw new IllegalStateException("Stanza collector already cancelled");
327        }
328    }
329
330    /**
331     * Get a new stanza collector configuration instance.
332     *
333     * @return a new stanza collector configuration.
334     */
335    public static Configuration newConfiguration() {
336        return new Configuration();
337    }
338
339    public static final class Configuration {
340        private StanzaFilter packetFilter;
341        private int size = SmackConfiguration.getStanzaCollectorSize();
342        private StanzaCollector collectorToReset;
343        private Stanza request;
344
345        private Configuration() {
346        }
347
348        /**
349         * Set the stanza filter used by this collector. If <code>null</code>, then all packets will
350         * get collected by this collector.
351         *
352         * @param packetFilter
353         * @return a reference to this configuration.
354         * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead.
355         */
356        @Deprecated
357        public Configuration setPacketFilter(StanzaFilter packetFilter) {
358            return setStanzaFilter(packetFilter);
359        }
360
361        /**
362         * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
363         * get collected by this collector.
364         *
365         * @param stanzaFilter
366         * @return a reference to this configuration.
367         */
368        public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
369            this.packetFilter = stanzaFilter;
370            return this;
371        }
372
373        /**
374         * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
375         * before dropping old ones.
376         *
377         * @param size
378         * @return a reference to this configuration.
379         */
380        public Configuration setSize(int size) {
381            this.size = size;
382            return this;
383        }
384
385        /**
386         * Set the collector which timeout for the next result is reset once this collector collects
387         * a packet.
388         *
389         * @param collector
390         * @return a reference to this configuration.
391         */
392        public Configuration setCollectorToReset(StanzaCollector collector) {
393            this.collectorToReset = collector;
394            return this;
395        }
396
397        public Configuration setRequest(Stanza request) {
398            this.request = request;
399            return this;
400        }
401    }
402}