001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack;
019
020import java.util.concurrent.ArrayBlockingQueue;
021import java.util.concurrent.TimeUnit;
022import java.util.logging.Level;
023import java.util.logging.Logger;
024
025import org.jivesoftware.smack.SmackException.NoResponseException;
026import org.jivesoftware.smack.XMPPException.XMPPErrorException;
027import org.jivesoftware.smack.filter.StanzaFilter;
028import org.jivesoftware.smack.packet.Stanza;
029
030/**
031 * Provides a mechanism to collect packets into a result queue that pass a
032 * specified filter. The collector lets you perform blocking and polling
033 * operations on the result queue. So, a PacketCollector is more suitable to
034 * use than a {@link StanzaListener} when you need to wait for a specific
035 * result.<p>
036 *
037 * Each stanza(/packet) collector will queue up a configured number of packets for processing before
038 * older packets are automatically dropped.  The default number is retrieved by 
039 * {@link SmackConfiguration#getPacketCollectorSize()}.
040 *
041 * @see XMPPConnection#createPacketCollector(StanzaFilter)
042 * @author Matt Tucker
043 */
044public class PacketCollector {
045
046    private static final Logger LOGGER = Logger.getLogger(PacketCollector.class.getName());
047
048    private final StanzaFilter packetFilter;
049    private final ArrayBlockingQueue<Stanza> resultQueue;
050
051    /**
052     * The stanza(/packet) collector which timeout for the next result will get reset once this collector collects a stanza.
053     */
054    private final PacketCollector collectorToReset;
055
056    private final XMPPConnection connection;
057
058    private boolean cancelled = false;
059
060    /**
061     * Creates a new stanza(/packet) collector. If the stanza(/packet) filter is <tt>null</tt>, then
062     * all packets will match this collector.
063     *
064     * @param connection the connection the collector is tied to.
065     * @param configuration the configuration used to construct this collector
066     */
067    protected PacketCollector(XMPPConnection connection, Configuration configuration) {
068        this.connection = connection;
069        this.packetFilter = configuration.packetFilter;
070        this.resultQueue = new ArrayBlockingQueue<>(configuration.size);
071        this.collectorToReset = configuration.collectorToReset;
072    }
073
074    /**
075     * Explicitly cancels the stanza(/packet) collector so that no more results are
076     * queued up. Once a stanza(/packet) collector has been cancelled, it cannot be
077     * re-enabled. Instead, a new stanza(/packet) collector must be created.
078     */
079    public void cancel() {
080        // If the packet collector has already been cancelled, do nothing.
081        if (!cancelled) {
082            cancelled = true;
083            connection.removePacketCollector(this);
084        }
085    }
086
087    /**
088     * Returns the stanza(/packet) filter associated with this stanza(/packet) collector. The packet
089     * filter is used to determine what packets are queued as results.
090     *
091     * @return the stanza(/packet) filter.
092     * @deprecated use {@link #getStanzaFilter()} instead.
093     */
094    @Deprecated
095    public StanzaFilter getPacketFilter() {
096        return getStanzaFilter();
097    }
098
099    /**
100     * Returns the stanza filter associated with this stanza collector. The stanza
101     * filter is used to determine what stanzas are queued as results.
102     *
103     * @return the stanza filter.
104     */
105    public StanzaFilter getStanzaFilter() {
106        return packetFilter;
107    }
108 
109    /**
110     * Polls to see if a stanza(/packet) is currently available and returns it, or
111     * immediately returns <tt>null</tt> if no packets are currently in the
112     * result queue.
113     *
114     * @return the next stanza(/packet) result, or <tt>null</tt> if there are no more
115     *      results.
116     */
117    @SuppressWarnings("unchecked")
118    public <P extends Stanza> P pollResult() {
119        return (P) resultQueue.poll();
120    }
121
122    /**
123     * Polls to see if a stanza(/packet) is currently available and returns it, or
124     * immediately returns <tt>null</tt> if no packets are currently in the
125     * result queue.
126     * <p>
127     * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError.
128     * </p>
129     * 
130     * @return the next available packet.
131     * @throws XMPPErrorException in case an error response.
132     */
133    public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
134        P result = pollResult();
135        if (result != null) {
136            XMPPErrorException.ifHasErrorThenThrow(result);
137        }
138        return result;
139    }
140
141    /**
142     * Returns the next available packet. The method call will block (not return) until a stanza(/packet) is
143     * available.
144     * 
145     * @return the next available packet.
146     */
147    @SuppressWarnings("unchecked")
148    public <P extends Stanza> P nextResultBlockForever() {
149        throwIfCancelled();
150        P res = null;
151        while (res == null) {
152            try {
153                res = (P) resultQueue.take();
154            } catch (InterruptedException e) {
155                LOGGER.log(Level.FINE,
156                                "nextResultBlockForever was interrupted", e);
157            }
158        }
159        return res;
160    }
161
162    /**
163     * Returns the next available packet. The method call will block until the connection's default
164     * timeout has elapsed.
165     * 
166     * @return the next available packet.
167     */
168    public <P extends Stanza> P nextResult() {
169        return nextResult(connection.getPacketReplyTimeout());
170    }
171
172    private volatile long waitStart;
173
174    /**
175     * Returns the next available packet. The method call will block (not return)
176     * until a stanza(/packet) is available or the <tt>timeout</tt> has elapsed. If the
177     * timeout elapses without a result, <tt>null</tt> will be returned.
178     *
179     * @param timeout the timeout in milliseconds.
180     * @return the next available packet.
181     */
182    @SuppressWarnings("unchecked")
183    public <P extends Stanza> P nextResult(long timeout) {
184        throwIfCancelled();
185        P res = null;
186        long remainingWait = timeout;
187        waitStart = System.currentTimeMillis();
188        do {
189            try {
190                res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS);
191            }
192            catch (InterruptedException e) {
193                LOGGER.log(Level.FINE, "nextResult was interrupted", e);
194            }
195            if (res != null) {
196                return res;
197            }
198            remainingWait = timeout - (System.currentTimeMillis() - waitStart);
199        } while (remainingWait > 0);
200        return null;
201    }
202
203    /**
204     * Returns the next available packet. The method call will block until a stanza(/packet) is available or
205     * the connections reply timeout has elapsed. If the timeout elapses without a result,
206     * <tt>null</tt> will be returned. This method does also cancel the PacketCollector.
207     * 
208     * @return the next available packet.
209     * @throws XMPPErrorException in case an error response.
210     * @throws NoResponseException if there was no response from the server.
211     */
212    public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException {
213        return nextResultOrThrow(connection.getPacketReplyTimeout());
214    }
215
216    /**
217     * Returns the next available packet. The method call will block until a stanza(/packet) is available or
218     * the <tt>timeout</tt> has elapsed. This method does also cancel the PacketCollector.
219     * 
220     * @param timeout the amount of time to wait for the next stanza(/packet) (in milleseconds).
221     * @return the next available packet.
222     * @throws NoResponseException if there was no response from the server.
223     * @throws XMPPErrorException in case an error response.
224     */
225    public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException, XMPPErrorException {
226        P result = nextResult(timeout);
227        cancel();
228        if (result == null) {
229            throw NoResponseException.newWith(connection, this);
230        }
231
232        XMPPErrorException.ifHasErrorThenThrow(result);
233
234        return result;
235    }
236
237    /**
238     * Get the number of collected stanzas this stanza(/packet) collector has collected so far.
239     * 
240     * @return the count of collected stanzas.
241     * @since 4.1
242     */
243    public int getCollectedCount() {
244        return resultQueue.size();
245    }
246
247    /**
248     * Processes a stanza(/packet) to see if it meets the criteria for this stanza(/packet) collector.
249     * If so, the stanza(/packet) is added to the result queue.
250     *
251     * @param packet the stanza(/packet) to process.
252     */
253    protected void processPacket(Stanza packet) {
254        if (packetFilter == null || packetFilter.accept(packet)) {
255                while (!resultQueue.offer(packet)) {
256                        // Since we know the queue is full, this poll should never actually block.
257                        resultQueue.poll();
258                }
259            if (collectorToReset != null) {
260                collectorToReset.waitStart = System.currentTimeMillis();
261            }
262        }
263    }
264
265    private final void throwIfCancelled() {
266        if (cancelled) {
267            throw new IllegalStateException("Packet collector already cancelled");
268        }
269    }
270
271    /**
272     * Get a new stanza(/packet) collector configuration instance.
273     * 
274     * @return a new stanza(/packet) collector configuration.
275     */
276    public static Configuration newConfiguration() {
277        return new Configuration();
278    }
279
280    public static class Configuration {
281        private StanzaFilter packetFilter;
282        private int size = SmackConfiguration.getPacketCollectorSize();
283        private PacketCollector collectorToReset;
284
285        private Configuration() {
286        }
287
288        /**
289         * Set the stanza(/packet) filter used by this collector. If <code>null</code>, then all packets will
290         * get collected by this collector.
291         * 
292         * @param packetFilter
293         * @return a reference to this configuration.
294         * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead.
295         */
296        @Deprecated
297        public Configuration setPacketFilter(StanzaFilter packetFilter) {
298            return setStanzaFilter(packetFilter);
299        }
300
301        /**
302         * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
303         * get collected by this collector.
304         * 
305         * @param stanzaFilter
306         * @return a reference to this configuration.
307         */
308        public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
309            this.packetFilter = stanzaFilter;
310            return this;
311        }
312
313        /**
314         * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
315         * before dropping old ones.
316         * 
317         * @param size
318         * @return a reference to this configuration.
319         */
320        public Configuration setSize(int size) {
321            this.size = size;
322            return this;
323        }
324
325        /**
326         * Set the collector which timeout for the next result is reset once this collector collects
327         * a packet.
328         * 
329         * @param collector
330         * @return a reference to this configuration.
331         */
332        public Configuration setCollectorToReset(PacketCollector collector) {
333            this.collectorToReset = collector;
334            return this;
335        }
336    }
337}