001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.jivesoftware.smack; 019 020import java.util.concurrent.ArrayBlockingQueue; 021import java.util.concurrent.TimeUnit; 022import java.util.logging.Level; 023import java.util.logging.Logger; 024 025import org.jivesoftware.smack.SmackException.NoResponseException; 026import org.jivesoftware.smack.XMPPException.XMPPErrorException; 027import org.jivesoftware.smack.filter.StanzaFilter; 028import org.jivesoftware.smack.packet.Stanza; 029 030/** 031 * Provides a mechanism to collect packets into a result queue that pass a 032 * specified filter. The collector lets you perform blocking and polling 033 * operations on the result queue. So, a PacketCollector is more suitable to 034 * use than a {@link StanzaListener} when you need to wait for a specific 035 * result.<p> 036 * 037 * Each stanza(/packet) collector will queue up a configured number of packets for processing before 038 * older packets are automatically dropped. The default number is retrieved by 039 * {@link SmackConfiguration#getPacketCollectorSize()}. 040 * 041 * @see XMPPConnection#createPacketCollector(StanzaFilter) 042 * @author Matt Tucker 043 */ 044public class PacketCollector { 045 046 private static final Logger LOGGER = Logger.getLogger(PacketCollector.class.getName()); 047 048 private final StanzaFilter packetFilter; 049 private final ArrayBlockingQueue<Stanza> resultQueue; 050 051 /** 052 * The stanza(/packet) collector which timeout for the next result will get reset once this collector collects a stanza. 053 */ 054 private final PacketCollector collectorToReset; 055 056 private final XMPPConnection connection; 057 058 private boolean cancelled = false; 059 060 /** 061 * Creates a new stanza(/packet) collector. If the stanza(/packet) filter is <tt>null</tt>, then 062 * all packets will match this collector. 063 * 064 * @param connection the connection the collector is tied to. 065 * @param configuration the configuration used to construct this collector 066 */ 067 protected PacketCollector(XMPPConnection connection, Configuration configuration) { 068 this.connection = connection; 069 this.packetFilter = configuration.packetFilter; 070 this.resultQueue = new ArrayBlockingQueue<>(configuration.size); 071 this.collectorToReset = configuration.collectorToReset; 072 } 073 074 /** 075 * Explicitly cancels the stanza(/packet) collector so that no more results are 076 * queued up. Once a stanza(/packet) collector has been cancelled, it cannot be 077 * re-enabled. Instead, a new stanza(/packet) collector must be created. 078 */ 079 public void cancel() { 080 // If the packet collector has already been cancelled, do nothing. 081 if (!cancelled) { 082 cancelled = true; 083 connection.removePacketCollector(this); 084 } 085 } 086 087 /** 088 * Returns the stanza(/packet) filter associated with this stanza(/packet) collector. The packet 089 * filter is used to determine what packets are queued as results. 090 * 091 * @return the stanza(/packet) filter. 092 * @deprecated use {@link #getStanzaFilter()} instead. 093 */ 094 @Deprecated 095 public StanzaFilter getPacketFilter() { 096 return getStanzaFilter(); 097 } 098 099 /** 100 * Returns the stanza filter associated with this stanza collector. The stanza 101 * filter is used to determine what stanzas are queued as results. 102 * 103 * @return the stanza filter. 104 */ 105 public StanzaFilter getStanzaFilter() { 106 return packetFilter; 107 } 108 109 /** 110 * Polls to see if a stanza(/packet) is currently available and returns it, or 111 * immediately returns <tt>null</tt> if no packets are currently in the 112 * result queue. 113 * 114 * @return the next stanza(/packet) result, or <tt>null</tt> if there are no more 115 * results. 116 */ 117 @SuppressWarnings("unchecked") 118 public <P extends Stanza> P pollResult() { 119 return (P) resultQueue.poll(); 120 } 121 122 /** 123 * Polls to see if a stanza(/packet) is currently available and returns it, or 124 * immediately returns <tt>null</tt> if no packets are currently in the 125 * result queue. 126 * <p> 127 * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError. 128 * </p> 129 * 130 * @return the next available packet. 131 * @throws XMPPErrorException in case an error response. 132 */ 133 public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException { 134 P result = pollResult(); 135 if (result != null) { 136 XMPPErrorException.ifHasErrorThenThrow(result); 137 } 138 return result; 139 } 140 141 /** 142 * Returns the next available packet. The method call will block (not return) until a stanza(/packet) is 143 * available. 144 * 145 * @return the next available packet. 146 */ 147 @SuppressWarnings("unchecked") 148 public <P extends Stanza> P nextResultBlockForever() { 149 throwIfCancelled(); 150 P res = null; 151 while (res == null) { 152 try { 153 res = (P) resultQueue.take(); 154 } catch (InterruptedException e) { 155 LOGGER.log(Level.FINE, 156 "nextResultBlockForever was interrupted", e); 157 } 158 } 159 return res; 160 } 161 162 /** 163 * Returns the next available packet. The method call will block until the connection's default 164 * timeout has elapsed. 165 * 166 * @return the next available packet. 167 */ 168 public <P extends Stanza> P nextResult() { 169 return nextResult(connection.getPacketReplyTimeout()); 170 } 171 172 private volatile long waitStart; 173 174 /** 175 * Returns the next available packet. The method call will block (not return) 176 * until a stanza(/packet) is available or the <tt>timeout</tt> has elapsed. If the 177 * timeout elapses without a result, <tt>null</tt> will be returned. 178 * 179 * @param timeout the timeout in milliseconds. 180 * @return the next available packet. 181 */ 182 @SuppressWarnings("unchecked") 183 public <P extends Stanza> P nextResult(long timeout) { 184 throwIfCancelled(); 185 P res = null; 186 long remainingWait = timeout; 187 waitStart = System.currentTimeMillis(); 188 do { 189 try { 190 res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS); 191 } 192 catch (InterruptedException e) { 193 LOGGER.log(Level.FINE, "nextResult was interrupted", e); 194 } 195 if (res != null) { 196 return res; 197 } 198 remainingWait = timeout - (System.currentTimeMillis() - waitStart); 199 } while (remainingWait > 0); 200 return null; 201 } 202 203 /** 204 * Returns the next available packet. The method call will block until a stanza(/packet) is available or 205 * the connections reply timeout has elapsed. If the timeout elapses without a result, 206 * <tt>null</tt> will be returned. This method does also cancel the PacketCollector. 207 * 208 * @return the next available packet. 209 * @throws XMPPErrorException in case an error response. 210 * @throws NoResponseException if there was no response from the server. 211 */ 212 public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException { 213 return nextResultOrThrow(connection.getPacketReplyTimeout()); 214 } 215 216 /** 217 * Returns the next available packet. The method call will block until a stanza(/packet) is available or 218 * the <tt>timeout</tt> has elapsed. This method does also cancel the PacketCollector. 219 * 220 * @param timeout the amount of time to wait for the next stanza(/packet) (in milleseconds). 221 * @return the next available packet. 222 * @throws NoResponseException if there was no response from the server. 223 * @throws XMPPErrorException in case an error response. 224 */ 225 public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException, XMPPErrorException { 226 P result = nextResult(timeout); 227 cancel(); 228 if (result == null) { 229 throw NoResponseException.newWith(connection, this); 230 } 231 232 XMPPErrorException.ifHasErrorThenThrow(result); 233 234 return result; 235 } 236 237 /** 238 * Get the number of collected stanzas this stanza(/packet) collector has collected so far. 239 * 240 * @return the count of collected stanzas. 241 * @since 4.1 242 */ 243 public int getCollectedCount() { 244 return resultQueue.size(); 245 } 246 247 /** 248 * Processes a stanza(/packet) to see if it meets the criteria for this stanza(/packet) collector. 249 * If so, the stanza(/packet) is added to the result queue. 250 * 251 * @param packet the stanza(/packet) to process. 252 */ 253 protected void processPacket(Stanza packet) { 254 if (packetFilter == null || packetFilter.accept(packet)) { 255 while (!resultQueue.offer(packet)) { 256 // Since we know the queue is full, this poll should never actually block. 257 resultQueue.poll(); 258 } 259 if (collectorToReset != null) { 260 collectorToReset.waitStart = System.currentTimeMillis(); 261 } 262 } 263 } 264 265 private final void throwIfCancelled() { 266 if (cancelled) { 267 throw new IllegalStateException("Packet collector already cancelled"); 268 } 269 } 270 271 /** 272 * Get a new stanza(/packet) collector configuration instance. 273 * 274 * @return a new stanza(/packet) collector configuration. 275 */ 276 public static Configuration newConfiguration() { 277 return new Configuration(); 278 } 279 280 public static class Configuration { 281 private StanzaFilter packetFilter; 282 private int size = SmackConfiguration.getPacketCollectorSize(); 283 private PacketCollector collectorToReset; 284 285 private Configuration() { 286 } 287 288 /** 289 * Set the stanza(/packet) filter used by this collector. If <code>null</code>, then all packets will 290 * get collected by this collector. 291 * 292 * @param packetFilter 293 * @return a reference to this configuration. 294 * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead. 295 */ 296 @Deprecated 297 public Configuration setPacketFilter(StanzaFilter packetFilter) { 298 return setStanzaFilter(packetFilter); 299 } 300 301 /** 302 * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will 303 * get collected by this collector. 304 * 305 * @param stanzaFilter 306 * @return a reference to this configuration. 307 */ 308 public Configuration setStanzaFilter(StanzaFilter stanzaFilter) { 309 this.packetFilter = stanzaFilter; 310 return this; 311 } 312 313 /** 314 * Set the maximum size of this collector, i.e. how many stanzas this collector will collect 315 * before dropping old ones. 316 * 317 * @param size 318 * @return a reference to this configuration. 319 */ 320 public Configuration setSize(int size) { 321 this.size = size; 322 return this; 323 } 324 325 /** 326 * Set the collector which timeout for the next result is reset once this collector collects 327 * a packet. 328 * 329 * @param collector 330 * @return a reference to this configuration. 331 */ 332 public Configuration setCollectorToReset(PacketCollector collector) { 333 this.collectorToReset = collector; 334 return this; 335 } 336 } 337}