001/**
002 *
003 * Copyright 2003-2007 Jive Software, 2016-2017 Florian Schmaus.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack;
019
020import java.util.concurrent.ArrayBlockingQueue;
021import java.util.concurrent.TimeUnit;
022
023import org.jivesoftware.smack.SmackException.NoResponseException;
024import org.jivesoftware.smack.SmackException.NotConnectedException;
025import org.jivesoftware.smack.XMPPException.XMPPErrorException;
026import org.jivesoftware.smack.filter.StanzaFilter;
027import org.jivesoftware.smack.packet.Stanza;
028
029/**
030 * Provides a mechanism to collect Stanzas into a result queue that pass a
031 * specified filter/matcher. The collector lets you perform blocking and polling
032 * operations on the result queue. So, a StanzaCollector is more suitable to
033 * use than a {@link StanzaListener} when you need to wait for a specific
034 * result.<p>
035 *
036 * Each stanza(/packet) collector will queue up a configured number of packets for processing before
037 * older packets are automatically dropped.  The default number is retrieved by 
038 * {@link SmackConfiguration#getStanzaCollectorSize()}.
039 *
040 * @see XMPPConnection#createStanzaCollector(StanzaFilter)
041 * @author Matt Tucker
042 */
043public class StanzaCollector {
044
045    private final StanzaFilter packetFilter;
046
047    private final ArrayBlockingQueue<Stanza> resultQueue;
048
049    /**
050     * The stanza(/packet) collector which timeout for the next result will get reset once this collector collects a stanza.
051     */
052    private final StanzaCollector collectorToReset;
053
054    private final XMPPConnection connection;
055
056    private boolean cancelled = false;
057
058    /**
059     * Creates a new stanza(/packet) collector. If the stanza(/packet) filter is <tt>null</tt>, then
060     * all packets will match this collector.
061     *
062     * @param connection the connection the collector is tied to.
063     * @param configuration the configuration used to construct this collector
064     */
065    protected StanzaCollector(XMPPConnection connection, Configuration configuration) {
066        this.connection = connection;
067        this.packetFilter = configuration.packetFilter;
068        this.resultQueue = new ArrayBlockingQueue<>(configuration.size);
069        this.collectorToReset = configuration.collectorToReset;
070    }
071
072    /**
073     * Explicitly cancels the stanza(/packet) collector so that no more results are
074     * queued up. Once a stanza(/packet) collector has been cancelled, it cannot be
075     * re-enabled. Instead, a new stanza(/packet) collector must be created.
076     */
077    public void cancel() {
078        // If the packet collector has already been cancelled, do nothing.
079        if (!cancelled) {
080            cancelled = true;
081            connection.removeStanzaCollector(this);
082        }
083    }
084
085    /**
086     * Returns the stanza(/packet) filter associated with this stanza(/packet) collector. The packet
087     * filter is used to determine what packets are queued as results.
088     *
089     * @return the stanza(/packet) filter.
090     * @deprecated use {@link #getStanzaFilter()} instead.
091     */
092    @Deprecated
093    public StanzaFilter getPacketFilter() {
094        return getStanzaFilter();
095    }
096
097    /**
098     * Returns the stanza filter associated with this stanza collector. The stanza
099     * filter is used to determine what stanzas are queued as results.
100     *
101     * @return the stanza filter.
102     */
103    public StanzaFilter getStanzaFilter() {
104        return packetFilter;
105    }
106
107    /**
108     * Polls to see if a stanza(/packet) is currently available and returns it, or
109     * immediately returns <tt>null</tt> if no packets are currently in the
110     * result queue.
111     *
112     * @param <P> type of the result stanza.
113     * @return the next stanza(/packet) result, or <tt>null</tt> if there are no more
114     *      results.
115     */
116    @SuppressWarnings("unchecked")
117    public <P extends Stanza> P pollResult() {
118        return (P) resultQueue.poll();
119    }
120
121    /**
122     * Polls to see if a stanza(/packet) is currently available and returns it, or
123     * immediately returns <tt>null</tt> if no packets are currently in the
124     * result queue.
125     * <p>
126     * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError.
127     * </p>
128     *
129     * @param <P> type of the result stanza.
130     * @return the next available packet.
131     * @throws XMPPErrorException in case an error response.
132     */
133    public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
134        P result = pollResult();
135        if (result != null) {
136            XMPPErrorException.ifHasErrorThenThrow(result);
137        }
138        return result;
139    }
140
141    /**
142     * Returns the next available packet. The method call will block (not return) until a stanza(/packet) is
143     * available.
144     *
145     * @param <P> type of the result stanza.
146     * @return the next available packet.
147     * @throws InterruptedException 
148     */
149    @SuppressWarnings("unchecked")
150    public <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
151        throwIfCancelled();
152        P res = null;
153        while (res == null) {
154            res = (P) resultQueue.take();
155        }
156        return res;
157    }
158
159    /**
160     * Returns the next available packet. The method call will block until the connection's default
161     * timeout has elapsed.
162     *
163     * @param <P> type of the result stanza.
164     * @return the next available packet.
165     * @throws InterruptedException 
166     */
167    public <P extends Stanza> P nextResult() throws InterruptedException {
168        return nextResult(connection.getReplyTimeout());
169    }
170
171    private volatile long waitStart;
172
173    /**
174     * Returns the next available packet. The method call will block (not return)
175     * until a stanza(/packet) is available or the <tt>timeout</tt> has elapsed. If the
176     * timeout elapses without a result, <tt>null</tt> will be returned.
177     *
178     * @param <P> type of the result stanza.
179     * @param timeout the timeout in milliseconds.
180     * @return the next available packet.
181     * @throws InterruptedException 
182     */
183    @SuppressWarnings("unchecked")
184    public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
185        throwIfCancelled();
186        P res = null;
187        long remainingWait = timeout;
188        waitStart = System.currentTimeMillis();
189        do {
190            res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS);
191            if (res != null) {
192                return res;
193            }
194            remainingWait = timeout - (System.currentTimeMillis() - waitStart);
195        } while (remainingWait > 0);
196        return null;
197    }
198
199    /**
200     * Returns the next available stanza. The method in equivalent to
201     * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of
202     * the connection associated with this collector.
203     *
204     * @param <P> type of the result stanza.
205     * @return the next available stanza.
206     * @throws XMPPErrorException in case an error response was received.
207     * @throws NoResponseException if there was no response from the server.
208     * @throws InterruptedException
209     * @throws NotConnectedException
210     * @see #nextResultOrThrow(long)
211     */
212    public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException,
213                    InterruptedException, NotConnectedException {
214        return nextResultOrThrow(connection.getReplyTimeout());
215    }
216
217    /**
218     * Returns the next available stanza. The method call will block until a stanza is
219     * available or the <tt>timeout</tt> has elapsed. This method does also cancel the
220     * collector in every case.
221     * <p>
222     * Three things can happen when waiting for an response:
223     * </p>
224     * <ol>
225     * <li>A result response arrives.</li>
226     * <li>An error response arrives.</li>
227     * <li>An timeout occurs.</li>
228     * <li>The thread is interrupted</li>
229     * </ol>
230     * <p>
231     * in which this method will
232     * </p>
233     * <ol>
234     * <li>return with the result.</li>
235     * <li>throw an {@link XMPPErrorException}.</li>
236     * <li>throw an {@link NoResponseException}.</li>
237     * <li>throw an {@link InterruptedException}.</li>
238     * </ol>
239     * <p>
240     * Additionally the method will throw a {@link NotConnectedException} if no response was
241     * received and the connection got disconnected.
242     * </p>
243     *
244     * @param timeout the amount of time to wait for the next stanza in milliseconds.
245     * @param <P> type of the result stanza.
246     * @return the next available stanza.
247     * @throws NoResponseException if there was no response from the server.
248     * @throws XMPPErrorException in case an error response was received.
249     * @throws InterruptedException if the calling thread was interrupted.
250     * @throws NotConnectedException if there was no response and the connection got disconnected.
251     */
252    public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException,
253                    XMPPErrorException, InterruptedException, NotConnectedException {
254        P result = nextResult(timeout);
255        cancel();
256        if (result == null) {
257            if (!connection.isConnected()) {
258                throw new NotConnectedException(connection, packetFilter);
259            }
260            throw NoResponseException.newWith(connection, this);
261        }
262
263        XMPPErrorException.ifHasErrorThenThrow(result);
264
265        return result;
266    }
267
268    /**
269     * Get the number of collected stanzas this stanza(/packet) collector has collected so far.
270     * 
271     * @return the count of collected stanzas.
272     * @since 4.1
273     */
274    public int getCollectedCount() {
275        return resultQueue.size();
276    }
277
278    /**
279     * Processes a stanza(/packet) to see if it meets the criteria for this stanza(/packet) collector.
280     * If so, the stanza(/packet) is added to the result queue.
281     *
282     * @param packet the stanza(/packet) to process.
283     */
284    protected void processStanza(Stanza packet) {
285        if (packetFilter == null || packetFilter.accept(packet)) {
286            // CHECKSTYLE:OFF
287                while (!resultQueue.offer(packet)) {
288                        // Since we know the queue is full, this poll should never actually block.
289                        resultQueue.poll();
290                }
291            // CHECKSTYLE:ON
292            if (collectorToReset != null) {
293                collectorToReset.waitStart = System.currentTimeMillis();
294            }
295        }
296    }
297
298    private void throwIfCancelled() {
299        if (cancelled) {
300            throw new IllegalStateException("Packet collector already cancelled");
301        }
302    }
303
304    /**
305     * Get a new stanza(/packet) collector configuration instance.
306     * 
307     * @return a new stanza(/packet) collector configuration.
308     */
309    public static Configuration newConfiguration() {
310        return new Configuration();
311    }
312
313    public static final class Configuration {
314        private StanzaFilter packetFilter;
315        private int size = SmackConfiguration.getStanzaCollectorSize();
316        private StanzaCollector collectorToReset;
317
318        private Configuration() {
319        }
320
321        /**
322         * Set the stanza(/packet) filter used by this collector. If <code>null</code>, then all packets will
323         * get collected by this collector.
324         * 
325         * @param packetFilter
326         * @return a reference to this configuration.
327         * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead.
328         */
329        @Deprecated
330        public Configuration setPacketFilter(StanzaFilter packetFilter) {
331            return setStanzaFilter(packetFilter);
332        }
333
334        /**
335         * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
336         * get collected by this collector.
337         * 
338         * @param stanzaFilter
339         * @return a reference to this configuration.
340         */
341        public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
342            this.packetFilter = stanzaFilter;
343            return this;
344        }
345
346        /**
347         * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
348         * before dropping old ones.
349         * 
350         * @param size
351         * @return a reference to this configuration.
352         */
353        public Configuration setSize(int size) {
354            this.size = size;
355            return this;
356        }
357
358        /**
359         * Set the collector which timeout for the next result is reset once this collector collects
360         * a packet.
361         * 
362         * @param collector
363         * @return a reference to this configuration.
364         */
365        public Configuration setCollectorToReset(StanzaCollector collector) {
366            this.collectorToReset = collector;
367            return this;
368        }
369    }
370}