001/** 002 * 003 * Copyright 2003-2007 Jive Software, 2016-2018 Florian Schmaus. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.jivesoftware.smack; 019 020import java.util.ArrayList; 021import java.util.List; 022import java.util.concurrent.ArrayBlockingQueue; 023import java.util.concurrent.TimeUnit; 024 025import org.jivesoftware.smack.SmackException.NoResponseException; 026import org.jivesoftware.smack.SmackException.NotConnectedException; 027import org.jivesoftware.smack.XMPPException.XMPPErrorException; 028import org.jivesoftware.smack.filter.StanzaFilter; 029import org.jivesoftware.smack.packet.Stanza; 030 031/** 032 * Provides a mechanism to collect Stanzas into a result queue that pass a 033 * specified filter/matcher. The collector lets you perform blocking and polling 034 * operations on the result queue. So, a StanzaCollector is more suitable to 035 * use than a {@link StanzaListener} when you need to wait for a specific 036 * result.<p> 037 * 038 * Each stanza collector will queue up a configured number of packets for processing before 039 * older packets are automatically dropped. The default number is retrieved by 040 * {@link SmackConfiguration#getStanzaCollectorSize()}. 041 * 042 * @see XMPPConnection#createStanzaCollector(StanzaFilter) 043 * @author Matt Tucker 044 */ 045public class StanzaCollector { 046 047 private final StanzaFilter packetFilter; 048 049 private final ArrayBlockingQueue<Stanza> resultQueue; 050 051 /** 052 * The stanza collector which timeout for the next result will get reset once this collector collects a stanza. 053 */ 054 private final StanzaCollector collectorToReset; 055 056 private final XMPPConnection connection; 057 058 private final Stanza request; 059 060 private volatile boolean cancelled = false; 061 062 /** 063 * Creates a new stanza collector. If the stanza filter is <tt>null</tt>, then 064 * all packets will match this collector. 065 * 066 * @param connection the connection the collector is tied to. 067 * @param configuration the configuration used to construct this collector 068 */ 069 protected StanzaCollector(XMPPConnection connection, Configuration configuration) { 070 this.connection = connection; 071 this.packetFilter = configuration.packetFilter; 072 this.resultQueue = new ArrayBlockingQueue<>(configuration.size); 073 this.collectorToReset = configuration.collectorToReset; 074 this.request = configuration.request; 075 } 076 077 /** 078 * Explicitly cancels the stanza collector so that no more results are 079 * queued up. Once a stanza collector has been cancelled, it cannot be 080 * re-enabled. Instead, a new stanza collector must be created. 081 */ 082 public void cancel() { 083 // If the packet collector has already been cancelled, do nothing. 084 if (!cancelled) { 085 cancelled = true; 086 connection.removeStanzaCollector(this); 087 } 088 } 089 090 /** 091 * Returns the stanza filter associated with this stanza collector. The packet 092 * filter is used to determine what packets are queued as results. 093 * 094 * @return the stanza filter. 095 * @deprecated use {@link #getStanzaFilter()} instead. 096 */ 097 @Deprecated 098 public StanzaFilter getPacketFilter() { 099 return getStanzaFilter(); 100 } 101 102 /** 103 * Returns the stanza filter associated with this stanza collector. The stanza 104 * filter is used to determine what stanzas are queued as results. 105 * 106 * @return the stanza filter. 107 */ 108 public StanzaFilter getStanzaFilter() { 109 return packetFilter; 110 } 111 112 /** 113 * Polls to see if a stanza is currently available and returns it, or 114 * immediately returns <tt>null</tt> if no packets are currently in the 115 * result queue. 116 * 117 * @param <P> type of the result stanza. 118 * @return the next stanza result, or <tt>null</tt> if there are no more 119 * results. 120 */ 121 @SuppressWarnings("unchecked") 122 public <P extends Stanza> P pollResult() { 123 return (P) resultQueue.poll(); 124 } 125 126 /** 127 * Polls to see if a stanza is currently available and returns it, or 128 * immediately returns <tt>null</tt> if no packets are currently in the 129 * result queue. 130 * <p> 131 * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError. 132 * </p> 133 * 134 * @param <P> type of the result stanza. 135 * @return the next available packet. 136 * @throws XMPPErrorException in case an error response. 137 */ 138 public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException { 139 P result = pollResult(); 140 if (result != null) { 141 XMPPErrorException.ifHasErrorThenThrow(result); 142 } 143 return result; 144 } 145 146 /** 147 * Returns the next available packet. The method call will block (not return) until a stanza is 148 * available. 149 * 150 * @param <P> type of the result stanza. 151 * @return the next available packet. 152 * @throws InterruptedException 153 */ 154 @SuppressWarnings("unchecked") 155 public <P extends Stanza> P nextResultBlockForever() throws InterruptedException { 156 throwIfCancelled(); 157 P res = null; 158 while (res == null) { 159 res = (P) resultQueue.take(); 160 } 161 return res; 162 } 163 164 /** 165 * Returns the next available packet. The method call will block until the connection's default 166 * timeout has elapsed. 167 * 168 * @param <P> type of the result stanza. 169 * @return the next available packet. 170 * @throws InterruptedException 171 */ 172 public <P extends Stanza> P nextResult() throws InterruptedException { 173 return nextResult(connection.getReplyTimeout()); 174 } 175 176 private volatile long waitStart; 177 178 /** 179 * Returns the next available packet. The method call will block (not return) 180 * until a stanza is available or the <tt>timeout</tt> has elapsed. If the 181 * timeout elapses without a result, <tt>null</tt> will be returned. 182 * 183 * @param <P> type of the result stanza. 184 * @param timeout the timeout in milliseconds. 185 * @return the next available packet. 186 * @throws InterruptedException 187 */ 188 @SuppressWarnings("unchecked") 189 public <P extends Stanza> P nextResult(long timeout) throws InterruptedException { 190 throwIfCancelled(); 191 P res = null; 192 long remainingWait = timeout; 193 waitStart = System.currentTimeMillis(); 194 do { 195 res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS); 196 if (res != null) { 197 return res; 198 } 199 remainingWait = timeout - (System.currentTimeMillis() - waitStart); 200 } while (remainingWait > 0); 201 return null; 202 } 203 204 /** 205 * Returns the next available stanza. The method in equivalent to 206 * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of 207 * the connection associated with this collector. 208 * 209 * @param <P> type of the result stanza. 210 * @return the next available stanza. 211 * @throws XMPPErrorException in case an error response was received. 212 * @throws NoResponseException if there was no response from the server. 213 * @throws InterruptedException 214 * @throws NotConnectedException 215 * @see #nextResultOrThrow(long) 216 */ 217 public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException, 218 InterruptedException, NotConnectedException { 219 return nextResultOrThrow(connection.getReplyTimeout()); 220 } 221 222 /** 223 * Returns the next available stanza. The method call will block until a stanza is 224 * available or the <tt>timeout</tt> has elapsed. This method does also cancel the 225 * collector in every case. 226 * <p> 227 * Three things can happen when waiting for an response: 228 * </p> 229 * <ol> 230 * <li>A result response arrives.</li> 231 * <li>An error response arrives.</li> 232 * <li>An timeout occurs.</li> 233 * <li>The thread is interrupted</li> 234 * </ol> 235 * <p> 236 * in which this method will 237 * </p> 238 * <ol> 239 * <li>return with the result.</li> 240 * <li>throw an {@link XMPPErrorException}.</li> 241 * <li>throw an {@link NoResponseException}.</li> 242 * <li>throw an {@link InterruptedException}.</li> 243 * </ol> 244 * <p> 245 * Additionally the method will throw a {@link NotConnectedException} if no response was 246 * received and the connection got disconnected. 247 * </p> 248 * 249 * @param timeout the amount of time to wait for the next stanza in milliseconds. 250 * @param <P> type of the result stanza. 251 * @return the next available stanza. 252 * @throws NoResponseException if there was no response from the server. 253 * @throws XMPPErrorException in case an error response was received. 254 * @throws InterruptedException if the calling thread was interrupted. 255 * @throws NotConnectedException if there was no response and the connection got disconnected. 256 */ 257 public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException, 258 XMPPErrorException, InterruptedException, NotConnectedException { 259 P result = nextResult(timeout); 260 cancel(); 261 if (result == null) { 262 if (!connection.isConnected()) { 263 throw new NotConnectedException(connection, packetFilter); 264 } 265 throw NoResponseException.newWith(connection, this); 266 } 267 268 XMPPErrorException.ifHasErrorThenThrow(result); 269 270 return result; 271 } 272 273 private List<Stanza> collectedCache; 274 275 /** 276 * Return a list of all collected stanzas. This method must be invoked after the collector has been cancelled. 277 * 278 * @return a list of collected stanzas. 279 * @since 4.3.0 280 */ 281 public List<Stanza> getCollectedStanzasAfterCancelled() { 282 if (!cancelled) { 283 throw new IllegalStateException("Stanza collector was not yet cancelled"); 284 } 285 286 if (collectedCache == null) { 287 collectedCache = new ArrayList<>(getCollectedCount()); 288 resultQueue.drainTo(collectedCache); 289 } 290 291 return collectedCache; 292 } 293 294 /** 295 * Get the number of collected stanzas this stanza collector has collected so far. 296 * 297 * @return the count of collected stanzas. 298 * @since 4.1 299 */ 300 public int getCollectedCount() { 301 return resultQueue.size(); 302 } 303 304 /** 305 * Processes a stanza to see if it meets the criteria for this stanza collector. 306 * If so, the stanza is added to the result queue. 307 * 308 * @param packet the stanza to process. 309 */ 310 protected void processStanza(Stanza packet) { 311 if (packetFilter == null || packetFilter.accept(packet)) { 312 // CHECKSTYLE:OFF 313 while (!resultQueue.offer(packet)) { 314 // Since we know the queue is full, this poll should never actually block. 315 resultQueue.poll(); 316 } 317 // CHECKSTYLE:ON 318 if (collectorToReset != null) { 319 collectorToReset.waitStart = System.currentTimeMillis(); 320 } 321 } 322 } 323 324 private void throwIfCancelled() { 325 if (cancelled) { 326 throw new IllegalStateException("Stanza collector already cancelled"); 327 } 328 } 329 330 /** 331 * Get a new stanza collector configuration instance. 332 * 333 * @return a new stanza collector configuration. 334 */ 335 public static Configuration newConfiguration() { 336 return new Configuration(); 337 } 338 339 public static final class Configuration { 340 private StanzaFilter packetFilter; 341 private int size = SmackConfiguration.getStanzaCollectorSize(); 342 private StanzaCollector collectorToReset; 343 private Stanza request; 344 345 private Configuration() { 346 } 347 348 /** 349 * Set the stanza filter used by this collector. If <code>null</code>, then all packets will 350 * get collected by this collector. 351 * 352 * @param packetFilter 353 * @return a reference to this configuration. 354 * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead. 355 */ 356 @Deprecated 357 public Configuration setPacketFilter(StanzaFilter packetFilter) { 358 return setStanzaFilter(packetFilter); 359 } 360 361 /** 362 * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will 363 * get collected by this collector. 364 * 365 * @param stanzaFilter 366 * @return a reference to this configuration. 367 */ 368 public Configuration setStanzaFilter(StanzaFilter stanzaFilter) { 369 this.packetFilter = stanzaFilter; 370 return this; 371 } 372 373 /** 374 * Set the maximum size of this collector, i.e. how many stanzas this collector will collect 375 * before dropping old ones. 376 * 377 * @param size 378 * @return a reference to this configuration. 379 */ 380 public Configuration setSize(int size) { 381 this.size = size; 382 return this; 383 } 384 385 /** 386 * Set the collector which timeout for the next result is reset once this collector collects 387 * a packet. 388 * 389 * @param collector 390 * @return a reference to this configuration. 391 */ 392 public Configuration setCollectorToReset(StanzaCollector collector) { 393 this.collectorToReset = collector; 394 return this; 395 } 396 397 public Configuration setRequest(Stanza request) { 398 this.request = request; 399 return this; 400 } 401 } 402}