001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack;
019
020import java.util.concurrent.ArrayBlockingQueue;
021import java.util.concurrent.TimeUnit;
022import java.util.logging.Level;
023import java.util.logging.Logger;
024
025import org.jivesoftware.smack.SmackException.NoResponseException;
026import org.jivesoftware.smack.XMPPException.XMPPErrorException;
027import org.jivesoftware.smack.filter.PacketFilter;
028import org.jivesoftware.smack.packet.Packet;
029import org.jivesoftware.smack.packet.XMPPError;
030
031/**
032 * Provides a mechanism to collect packets into a result queue that pass a
033 * specified filter. The collector lets you perform blocking and polling
034 * operations on the result queue. So, a PacketCollector is more suitable to
035 * use than a {@link PacketListener} when you need to wait for a specific
036 * result.<p>
037 *
038 * Each packet collector will queue up a configured number of packets for processing before
039 * older packets are automatically dropped.  The default number is retrieved by 
040 * {@link SmackConfiguration#getPacketCollectorSize()}.
041 *
042 * @see XMPPConnection#createPacketCollector(PacketFilter)
043 * @author Matt Tucker
044 */
045public class PacketCollector {
046
047    private static final Logger LOGGER = Logger.getLogger(PacketCollector.class.getName());
048
049    private PacketFilter packetFilter;
050    private ArrayBlockingQueue<Packet> resultQueue;
051    private XMPPConnection connection;
052    private boolean cancelled = false;
053
054    /**
055     * Creates a new packet collector. If the packet filter is <tt>null</tt>, then
056     * all packets will match this collector.
057     *
058     * @param connection the connection the collector is tied to.
059     * @param packetFilter determines which packets will be returned by this collector.
060     */
061    protected PacketCollector(XMPPConnection connection, PacketFilter packetFilter) {
062        this(connection, packetFilter, SmackConfiguration.getPacketCollectorSize());
063    }
064
065    /**
066     * Creates a new packet collector. If the packet filter is <tt>null</tt>, then
067     * all packets will match this collector.
068     *
069     * @param connection the connection the collector is tied to.
070     * @param packetFilter determines which packets will be returned by this collector.
071     * @param maxSize the maximum number of packets that will be stored in the collector.
072     */
073    protected PacketCollector(XMPPConnection connection, PacketFilter packetFilter, int maxSize) {
074        this.connection = connection;
075        this.packetFilter = packetFilter;
076        this.resultQueue = new ArrayBlockingQueue<Packet>(maxSize);
077    }
078
079    /**
080     * Explicitly cancels the packet collector so that no more results are
081     * queued up. Once a packet collector has been cancelled, it cannot be
082     * re-enabled. Instead, a new packet collector must be created.
083     */
084    public void cancel() {
085        // If the packet collector has already been cancelled, do nothing.
086        if (!cancelled) {
087            cancelled = true;
088            connection.removePacketCollector(this);
089        }
090    }
091
092    /**
093     * Returns the packet filter associated with this packet collector. The packet
094     * filter is used to determine what packets are queued as results.
095     *
096     * @return the packet filter.
097     */
098    public PacketFilter getPacketFilter() {
099        return packetFilter;
100    }
101
102    /**
103     * Polls to see if a packet is currently available and returns it, or
104     * immediately returns <tt>null</tt> if no packets are currently in the
105     * result queue.
106     *
107     * @return the next packet result, or <tt>null</tt> if there are no more
108     *      results.
109     */
110    public Packet pollResult() {
111        return resultQueue.poll();
112    }
113
114    /**
115     * Returns the next available packet. The method call will block (not return) until a packet is
116     * available.
117     * 
118     * @return the next available packet.
119     */
120    public Packet nextResultBlockForever() {
121        Packet res = null;
122        while (res == null) {
123            try {
124                res = resultQueue.take();
125            } catch (InterruptedException e) {
126                LOGGER.log(Level.FINE,
127                                "nextResultBlockForever was interrupted", e);
128            }
129        }
130        return res;
131    }
132
133    /**
134     * Returns the next available packet. The method call will block until the connection's default
135     * timeout has elapsed.
136     * 
137     * @return the next availabe packet.
138     */
139    public Packet nextResult() {
140        return nextResult(connection.getPacketReplyTimeout());
141    }
142
143    /**
144     * Returns the next available packet. The method call will block (not return)
145     * until a packet is available or the <tt>timeout</tt> has elapsed. If the
146     * timeout elapses without a result, <tt>null</tt> will be returned.
147     *
148     * @return the next available packet.
149     */
150    public Packet nextResult(final long timeout) {
151        Packet res = null;
152        long remainingWait = timeout;
153        final long waitStart = System.currentTimeMillis();
154        while (res == null && remainingWait > 0) {
155            try {
156                res = resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS);
157                remainingWait = timeout - (System.currentTimeMillis() - waitStart);
158            } catch (InterruptedException e) {
159                LOGGER.log(Level.FINE, "nextResult was interrupted", e);
160            }
161        }
162        return res;
163    }
164
165    /**
166     * Returns the next available packet. The method call will block until a packet is available or
167     * the connections reply timeout has elapsed. If the timeout elapses without a result,
168     * <tt>null</tt> will be returned. This method does also cancel the PacketCollector.
169     * 
170     * @return the next available packet.
171     * @throws XMPPErrorException in case an error response.
172     * @throws NoResponseException if there was no response from the server.
173     */
174    public Packet nextResultOrThrow() throws NoResponseException, XMPPErrorException {
175        return nextResultOrThrow(connection.getPacketReplyTimeout());
176    }
177
178    /**
179     * Returns the next available packet. The method call will block until a packet is available or
180     * the <tt>timeout</tt> has elapsed. This method does also cancel the PacketCollector.
181     * 
182     * @param timeout the amount of time to wait for the next packet (in milleseconds).
183     * @return the next available packet.
184     * @throws NoResponseException if there was no response from the server.
185     * @throws XMPPErrorException in case an error response.
186     */
187    public Packet nextResultOrThrow(long timeout) throws NoResponseException, XMPPErrorException {
188        Packet result = nextResult(timeout);
189        cancel();
190        if (result == null) {
191            throw new NoResponseException();
192        }
193
194        XMPPError xmppError = result.getError();
195        if (xmppError != null) {
196            throw new XMPPErrorException(xmppError);
197        }
198
199        return result;
200    }
201
202    /**
203     * Processes a packet to see if it meets the criteria for this packet collector.
204     * If so, the packet is added to the result queue.
205     *
206     * @param packet the packet to process.
207     */
208    protected void processPacket(Packet packet) {
209        if (packet == null) {
210            return;
211        }
212        
213        if (packetFilter == null || packetFilter.accept(packet)) {
214                while (!resultQueue.offer(packet)) {
215                        // Since we know the queue is full, this poll should never actually block.
216                        resultQueue.poll();
217                }
218        }
219    }
220}