001/** 002 * 003 * Copyright 2003-2007 Jive Software, 2016-2017 Florian Schmaus. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.jivesoftware.smack; 019 020import java.util.concurrent.ArrayBlockingQueue; 021import java.util.concurrent.TimeUnit; 022 023import org.jivesoftware.smack.SmackException.NoResponseException; 024import org.jivesoftware.smack.SmackException.NotConnectedException; 025import org.jivesoftware.smack.XMPPException.XMPPErrorException; 026import org.jivesoftware.smack.filter.StanzaFilter; 027import org.jivesoftware.smack.packet.Stanza; 028 029/** 030 * Provides a mechanism to collect Stanzas into a result queue that pass a 031 * specified filter/matcher. The collector lets you perform blocking and polling 032 * operations on the result queue. So, a StanzaCollector is more suitable to 033 * use than a {@link StanzaListener} when you need to wait for a specific 034 * result.<p> 035 * 036 * Each stanza(/packet) collector will queue up a configured number of packets for processing before 037 * older packets are automatically dropped. The default number is retrieved by 038 * {@link SmackConfiguration#getStanzaCollectorSize()}. 039 * 040 * @see XMPPConnection#createStanzaCollector(StanzaFilter) 041 * @author Matt Tucker 042 */ 043public class StanzaCollector { 044 045 private final StanzaFilter packetFilter; 046 047 private final ArrayBlockingQueue<Stanza> resultQueue; 048 049 /** 050 * The stanza(/packet) collector which timeout for the next result will get reset once this collector collects a stanza. 051 */ 052 private final StanzaCollector collectorToReset; 053 054 private final XMPPConnection connection; 055 056 private boolean cancelled = false; 057 058 /** 059 * Creates a new stanza(/packet) collector. If the stanza(/packet) filter is <tt>null</tt>, then 060 * all packets will match this collector. 061 * 062 * @param connection the connection the collector is tied to. 063 * @param configuration the configuration used to construct this collector 064 */ 065 protected StanzaCollector(XMPPConnection connection, Configuration configuration) { 066 this.connection = connection; 067 this.packetFilter = configuration.packetFilter; 068 this.resultQueue = new ArrayBlockingQueue<>(configuration.size); 069 this.collectorToReset = configuration.collectorToReset; 070 } 071 072 /** 073 * Explicitly cancels the stanza(/packet) collector so that no more results are 074 * queued up. Once a stanza(/packet) collector has been cancelled, it cannot be 075 * re-enabled. Instead, a new stanza(/packet) collector must be created. 076 */ 077 public void cancel() { 078 // If the packet collector has already been cancelled, do nothing. 079 if (!cancelled) { 080 cancelled = true; 081 connection.removeStanzaCollector(this); 082 } 083 } 084 085 /** 086 * Returns the stanza(/packet) filter associated with this stanza(/packet) collector. The packet 087 * filter is used to determine what packets are queued as results. 088 * 089 * @return the stanza(/packet) filter. 090 * @deprecated use {@link #getStanzaFilter()} instead. 091 */ 092 @Deprecated 093 public StanzaFilter getPacketFilter() { 094 return getStanzaFilter(); 095 } 096 097 /** 098 * Returns the stanza filter associated with this stanza collector. The stanza 099 * filter is used to determine what stanzas are queued as results. 100 * 101 * @return the stanza filter. 102 */ 103 public StanzaFilter getStanzaFilter() { 104 return packetFilter; 105 } 106 107 /** 108 * Polls to see if a stanza(/packet) is currently available and returns it, or 109 * immediately returns <tt>null</tt> if no packets are currently in the 110 * result queue. 111 * 112 * @param <P> type of the result stanza. 113 * @return the next stanza(/packet) result, or <tt>null</tt> if there are no more 114 * results. 115 */ 116 @SuppressWarnings("unchecked") 117 public <P extends Stanza> P pollResult() { 118 return (P) resultQueue.poll(); 119 } 120 121 /** 122 * Polls to see if a stanza(/packet) is currently available and returns it, or 123 * immediately returns <tt>null</tt> if no packets are currently in the 124 * result queue. 125 * <p> 126 * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError. 127 * </p> 128 * 129 * @param <P> type of the result stanza. 130 * @return the next available packet. 131 * @throws XMPPErrorException in case an error response. 132 */ 133 public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException { 134 P result = pollResult(); 135 if (result != null) { 136 XMPPErrorException.ifHasErrorThenThrow(result); 137 } 138 return result; 139 } 140 141 /** 142 * Returns the next available packet. The method call will block (not return) until a stanza(/packet) is 143 * available. 144 * 145 * @param <P> type of the result stanza. 146 * @return the next available packet. 147 * @throws InterruptedException 148 */ 149 @SuppressWarnings("unchecked") 150 public <P extends Stanza> P nextResultBlockForever() throws InterruptedException { 151 throwIfCancelled(); 152 P res = null; 153 while (res == null) { 154 res = (P) resultQueue.take(); 155 } 156 return res; 157 } 158 159 /** 160 * Returns the next available packet. The method call will block until the connection's default 161 * timeout has elapsed. 162 * 163 * @param <P> type of the result stanza. 164 * @return the next available packet. 165 * @throws InterruptedException 166 */ 167 public <P extends Stanza> P nextResult() throws InterruptedException { 168 return nextResult(connection.getReplyTimeout()); 169 } 170 171 private volatile long waitStart; 172 173 /** 174 * Returns the next available packet. The method call will block (not return) 175 * until a stanza(/packet) is available or the <tt>timeout</tt> has elapsed. If the 176 * timeout elapses without a result, <tt>null</tt> will be returned. 177 * 178 * @param <P> type of the result stanza. 179 * @param timeout the timeout in milliseconds. 180 * @return the next available packet. 181 * @throws InterruptedException 182 */ 183 @SuppressWarnings("unchecked") 184 public <P extends Stanza> P nextResult(long timeout) throws InterruptedException { 185 throwIfCancelled(); 186 P res = null; 187 long remainingWait = timeout; 188 waitStart = System.currentTimeMillis(); 189 do { 190 res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS); 191 if (res != null) { 192 return res; 193 } 194 remainingWait = timeout - (System.currentTimeMillis() - waitStart); 195 } while (remainingWait > 0); 196 return null; 197 } 198 199 /** 200 * Returns the next available stanza. The method in equivalent to 201 * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of 202 * the connection associated with this collector. 203 * 204 * @param <P> type of the result stanza. 205 * @return the next available stanza. 206 * @throws XMPPErrorException in case an error response was received. 207 * @throws NoResponseException if there was no response from the server. 208 * @throws InterruptedException 209 * @throws NotConnectedException 210 * @see #nextResultOrThrow(long) 211 */ 212 public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException, 213 InterruptedException, NotConnectedException { 214 return nextResultOrThrow(connection.getReplyTimeout()); 215 } 216 217 /** 218 * Returns the next available stanza. The method call will block until a stanza is 219 * available or the <tt>timeout</tt> has elapsed. This method does also cancel the 220 * collector in every case. 221 * <p> 222 * Three things can happen when waiting for an response: 223 * </p> 224 * <ol> 225 * <li>A result response arrives.</li> 226 * <li>An error response arrives.</li> 227 * <li>An timeout occurs.</li> 228 * <li>The thread is interrupted</li> 229 * </ol> 230 * <p> 231 * in which this method will 232 * </p> 233 * <ol> 234 * <li>return with the result.</li> 235 * <li>throw an {@link XMPPErrorException}.</li> 236 * <li>throw an {@link NoResponseException}.</li> 237 * <li>throw an {@link InterruptedException}.</li> 238 * </ol> 239 * <p> 240 * Additionally the method will throw a {@link NotConnectedException} if no response was 241 * received and the connection got disconnected. 242 * </p> 243 * 244 * @param timeout the amount of time to wait for the next stanza in milliseconds. 245 * @param <P> type of the result stanza. 246 * @return the next available stanza. 247 * @throws NoResponseException if there was no response from the server. 248 * @throws XMPPErrorException in case an error response was received. 249 * @throws InterruptedException if the calling thread was interrupted. 250 * @throws NotConnectedException if there was no response and the connection got disconnected. 251 */ 252 public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException, 253 XMPPErrorException, InterruptedException, NotConnectedException { 254 P result = nextResult(timeout); 255 cancel(); 256 if (result == null) { 257 if (!connection.isConnected()) { 258 throw new NotConnectedException(connection, packetFilter); 259 } 260 throw NoResponseException.newWith(connection, this); 261 } 262 263 XMPPErrorException.ifHasErrorThenThrow(result); 264 265 return result; 266 } 267 268 /** 269 * Get the number of collected stanzas this stanza(/packet) collector has collected so far. 270 * 271 * @return the count of collected stanzas. 272 * @since 4.1 273 */ 274 public int getCollectedCount() { 275 return resultQueue.size(); 276 } 277 278 /** 279 * Processes a stanza(/packet) to see if it meets the criteria for this stanza(/packet) collector. 280 * If so, the stanza(/packet) is added to the result queue. 281 * 282 * @param packet the stanza(/packet) to process. 283 */ 284 protected void processStanza(Stanza packet) { 285 if (packetFilter == null || packetFilter.accept(packet)) { 286 // CHECKSTYLE:OFF 287 while (!resultQueue.offer(packet)) { 288 // Since we know the queue is full, this poll should never actually block. 289 resultQueue.poll(); 290 } 291 // CHECKSTYLE:ON 292 if (collectorToReset != null) { 293 collectorToReset.waitStart = System.currentTimeMillis(); 294 } 295 } 296 } 297 298 private void throwIfCancelled() { 299 if (cancelled) { 300 throw new IllegalStateException("Packet collector already cancelled"); 301 } 302 } 303 304 /** 305 * Get a new stanza(/packet) collector configuration instance. 306 * 307 * @return a new stanza(/packet) collector configuration. 308 */ 309 public static Configuration newConfiguration() { 310 return new Configuration(); 311 } 312 313 public static final class Configuration { 314 private StanzaFilter packetFilter; 315 private int size = SmackConfiguration.getStanzaCollectorSize(); 316 private StanzaCollector collectorToReset; 317 318 private Configuration() { 319 } 320 321 /** 322 * Set the stanza(/packet) filter used by this collector. If <code>null</code>, then all packets will 323 * get collected by this collector. 324 * 325 * @param packetFilter 326 * @return a reference to this configuration. 327 * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead. 328 */ 329 @Deprecated 330 public Configuration setPacketFilter(StanzaFilter packetFilter) { 331 return setStanzaFilter(packetFilter); 332 } 333 334 /** 335 * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will 336 * get collected by this collector. 337 * 338 * @param stanzaFilter 339 * @return a reference to this configuration. 340 */ 341 public Configuration setStanzaFilter(StanzaFilter stanzaFilter) { 342 this.packetFilter = stanzaFilter; 343 return this; 344 } 345 346 /** 347 * Set the maximum size of this collector, i.e. how many stanzas this collector will collect 348 * before dropping old ones. 349 * 350 * @param size 351 * @return a reference to this configuration. 352 */ 353 public Configuration setSize(int size) { 354 this.size = size; 355 return this; 356 } 357 358 /** 359 * Set the collector which timeout for the next result is reset once this collector collects 360 * a packet. 361 * 362 * @param collector 363 * @return a reference to this configuration. 364 */ 365 public Configuration setCollectorToReset(StanzaCollector collector) { 366 this.collectorToReset = collector; 367 return this; 368 } 369 } 370}