/**
 *
 * Copyright 2003-2007 Jive Software, 2016-2019 Florian Schmaus.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.jivesoftware.smack;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.packet.Stanza;

/**
 * Provides a mechanism to collect Stanzas into a result queue that pass a
 * specified filter/matcher. The collector lets you perform blocking and polling
 * operations on the result queue. So, a StanzaCollector is more suitable to
 * use than a {@link StanzaListener} when you need to wait for a specific
 * result.<p>
 *
 * Each stanza collector will queue up a configured number of stanzas for processing before
 * older stanzas are automatically dropped. The default number is retrieved by
 * {@link SmackConfiguration#getStanzaCollectorSize()}.
 * </p>
 * <p>
 * All access to the result queue is guarded by the monitor of the collector instance; the same
 * monitor is used to signal waiting threads when a stanza arrives, when the collector is
 * cancelled, or when a connection error is reported.
 * </p>
 *
 * @see XMPPConnection#createStanzaCollector(StanzaFilter)
 * @author Matt Tucker
 */
public final class StanzaCollector implements AutoCloseable {

    private final StanzaFilter packetFilter;

    /** Collected stanzas, oldest first. Guarded by {@code this}. */
    private final ArrayDeque<Stanza> resultQueue;

    private final int maxQueueSize;

    /**
     * The stanza collector which timeout for the next result will get reset once this collector collects a stanza.
     */
    private final StanzaCollector collectorToReset;

    private final XMPPConnection connection;

    private final Stanza request;

    private volatile boolean cancelled;

    /**
     * Set by {@link #notifyConnectionError(Exception)}. Volatile because it is also read outside of the
     * monitor in {@link #nextResultOrThrow(long)}.
     */
    private volatile Exception connectionException;

    /**
     * Creates a new stanza collector. If the stanza filter is <code>null</code>, then
     * all stanzas will match this collector.
     *
     * @param connection the connection the collector is tied to.
     * @param configuration the configuration used to construct this collector
     */
    StanzaCollector(XMPPConnection connection, Configuration configuration) {
        this.connection = connection;
        this.packetFilter = configuration.packetFilter;
        this.resultQueue = new ArrayDeque<>(configuration.size);
        this.maxQueueSize = configuration.size;
        this.collectorToReset = configuration.collectorToReset;
        this.request = configuration.request;
    }

    /**
     * Explicitly cancels the stanza collector so that no more results are
     * queued up. Once a stanza collector has been cancelled, it cannot be
     * re-enabled. Instead, a new stanza collector must be created.
     * <p>
     * Cancelling also wakes up all threads waiting for a result and cancels the collector to reset, if
     * one was configured. Calling this method on an already cancelled collector has no effect.
     * </p>
     */
    public synchronized void cancel() {
        // If the stanza collector has already been cancelled, do nothing.
        if (cancelled) {
            return;
        }

        cancelled = true;
        connection.removeStanzaCollector(this);
        // Wake up every thread blocked in one of the nextResult*() methods.
        notifyAll();

        if (collectorToReset != null) {
            collectorToReset.cancel();
        }
    }

    /**
     * Returns the stanza filter associated with this stanza collector. The stanza
     * filter is used to determine what stanzas are queued as results.
     *
     * @return the stanza filter.
     */
    public StanzaFilter getStanzaFilter() {
        return packetFilter;
    }

    /**
     * Polls to see if a stanza is currently available and returns it, or
     * immediately returns <code>null</code> if no stanzas are currently in the
     * result queue.
     *
     * @param <P> type of the result stanza.
     * @return the next stanza result, or <code>null</code> if there are no more
     *         results.
     */
    @SuppressWarnings("unchecked")
    public synchronized <P extends Stanza> P pollResult() {
        return (P) resultQueue.poll();
    }

    /**
     * Polls to see if a stanza is currently available and returns it, or
     * immediately returns <code>null</code> if no stanzas are currently in the
     * result queue.
     * <p>
     * Throws an XMPPErrorException in case the polled stanza did contain an XMPPError.
     * </p>
     *
     * @param <P> type of the result stanza.
     * @return the next available stanza, or <code>null</code> if the result queue is empty.
     * @throws XMPPErrorException in case of an error response.
     */
    public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
        P result = pollResult();
        if (result != null) {
            XMPPErrorException.ifHasErrorThenThrow(result);
        }
        return result;
    }

    /**
     * Returns the next available stanza. The method call will block (not return) until a stanza is
     * available or the collector is cancelled.
     *
     * @param <P> type of the result stanza.
     * @return the next available stanza, or <code>null</code> if the collector was cancelled while
     *         waiting and no stanza is queued.
     * @throws InterruptedException if the calling thread was interrupted.
     * @throws IllegalStateException if the collector is already cancelled when the method is invoked.
     */
    @SuppressWarnings("unchecked")
    // TODO: Consider removing this method as it is hardly ever useful.
    public synchronized <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
        throwIfCancelled();

        while (true) {
            P res = (P) resultQueue.poll();
            if (res != null) {
                return res;
            }
            if (cancelled) {
                return null;
            }
            wait();
        }
    }

    /**
     * Returns the next available stanza. The method call will block until the connection's default
     * timeout has elapsed.
     *
     * @param <P> type of the result stanza.
     * @return the next available stanza or <code>null</code> on timeout or connection error.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    public <P extends Stanza> P nextResult() throws InterruptedException {
        return nextResult(connection.getReplyTimeout());
    }

    /**
     * The point in time (milliseconds) from which the timeout of {@link #nextResult(long)} is measured.
     * Volatile because another collector, which has this collector configured as collector to reset,
     * updates it from {@link #processStanza(Stanza)} without holding this collector's monitor.
     */
    private volatile long waitStart;

    /**
     * Returns the next available stanza. The method call will block (not return) until a stanza is available or the
     * <code>timeout</code> has elapsed or if the connection was terminated because of an error or if the collector
     * got cancelled. If the timeout elapses without a result, if there was a connection error or if the collector
     * got cancelled, <code>null</code> will be returned.
     * <p>
     * A stanza that is already queued is always returned, even if the timeout has elapsed in the meantime.
     * </p>
     *
     * @param <P> type of the result stanza.
     * @param timeout the timeout in milliseconds.
     * @return the next available stanza or <code>null</code> on timeout, connection error or cancellation.
     * @throws InterruptedException if the calling thread was interrupted.
     * @throws IllegalStateException if the collector is already cancelled when the method is invoked.
     */
    @SuppressWarnings("unchecked")
    public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
        throwIfCancelled();
        waitStart = System.currentTimeMillis();

        // Evaluate every exit condition while holding the monitor. cancel(), notifyConnectionError() and
        // processStanza() all change state and call notifyAll() under the same monitor, so no state change
        // can slip in between a check and the subsequent wait().
        synchronized (this) {
            while (true) {
                // Poll first, so that an already queued stanza is never lost to a timeout or cancellation.
                P res = (P) resultQueue.poll();
                if (res != null) {
                    return res;
                }
                if (cancelled || connectionException != null) {
                    return null;
                }
                // waitStart is re-read on every iteration because another collector may have reset it.
                long remainingWait = timeout - (System.currentTimeMillis() - waitStart);
                if (remainingWait <= 0) {
                    return null;
                }
                wait(remainingWait);
            }
        }
    }

    /**
     * Returns the next available stanza. The method is equivalent to
     * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of
     * the connection associated with this collector.
     *
     * @param <P> type of the result stanza.
     * @return the next available stanza.
     * @throws XMPPErrorException in case an error response was received.
     * @throws NoResponseException if there was no response from the server.
     * @throws InterruptedException if the calling thread was interrupted.
     * @throws NotConnectedException if the XMPP connection is not connected.
     * @see #nextResultOrThrow(long)
     */
    public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException,
                    InterruptedException, NotConnectedException {
        return nextResultOrThrow(connection.getReplyTimeout());
    }

    /**
     * Returns the next available stanza. The method call will block until a stanza is
     * available or the <code>timeout</code> has elapsed. This method does also cancel the
     * collector in every case.
     * <p>
     * Four things can happen when waiting for a response:
     * </p>
     * <ol>
     * <li>A result response arrives.</li>
     * <li>An error response arrives.</li>
     * <li>A timeout occurs.</li>
     * <li>The thread is interrupted.</li>
     * </ol>
     * <p>
     * in which this method will
     * </p>
     * <ol>
     * <li>return with the result.</li>
     * <li>throw an {@link XMPPErrorException}.</li>
     * <li>throw a {@link NoResponseException}.</li>
     * <li>throw an {@link InterruptedException}.</li>
     * </ol>
     * <p>
     * Additionally the method will throw a {@link NotConnectedException} if no response was
     * received and the connection got disconnected.
     * </p>
     *
     * @param timeout the amount of time to wait for the next stanza in milliseconds.
     * @param <P> type of the result stanza.
     * @return the next available stanza.
     * @throws NoResponseException if there was no response from the server.
     * @throws XMPPErrorException in case an error response was received.
     * @throws InterruptedException if the calling thread was interrupted.
     * @throws NotConnectedException if there was no response and the connection got disconnected.
     */
    public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException,
                    XMPPErrorException, InterruptedException, NotConnectedException {
        P result;
        boolean cancelledWhileWaiting = false;
        try {
            result = nextResult(timeout);
            // Capture the flag before the unconditional cancel() below; afterwards it is always true
            // and would carry no information.
            // NOTE(review): assumes the flag handed to NoResponseException.newWith() is meant to report
            // a cancellation that happened during the wait - confirm against NoResponseException.
            cancelledWhileWaiting = cancelled;
        } finally {
            cancel();
        }
        if (result == null) {
            Exception cause = connectionException;
            if (cause != null) {
                throw new NotConnectedException(connection, packetFilter, cause);
            }
            if (!connection.isConnected()) {
                throw new NotConnectedException(connection, packetFilter);
            }
            throw NoResponseException.newWith(timeout, this, cancelledWhileWaiting);
        }

        XMPPErrorException.ifHasErrorThenThrow(result);

        return result;
    }

    /** Lazily created snapshot returned by {@link #getCollectedStanzasAfterCancelled()}. Guarded by {@code this}. */
    private List<Stanza> collectedCache;

    /**
     * Return a list of all collected stanzas. This method must be invoked after the collector has been cancelled.
     * <p>
     * The list is created on the first invocation from the stanzas that are in the result queue at that time;
     * subsequent invocations return the same list instance.
     * </p>
     *
     * @return a list of collected stanzas.
     * @throws IllegalStateException if the collector has not been cancelled yet.
     * @since 4.3.0
     */
    public synchronized List<Stanza> getCollectedStanzasAfterCancelled() {
        if (!cancelled) {
            throw new IllegalStateException("Stanza collector was not yet cancelled");
        }

        // The monitor is held here because the result queue is not thread-safe and processStanza() may
        // still be running on another thread.
        if (collectedCache == null) {
            collectedCache = new ArrayList<>(getCollectedCount());
            collectedCache.addAll(resultQueue);
        }

        return collectedCache;
    }

    /**
     * Get the number of stanzas currently held in the result queue of this stanza collector.
     *
     * @return the count of collected stanzas.
     * @since 4.1
     */
    public synchronized int getCollectedCount() {
        return resultQueue.size();
    }

    /** Lazily built result of {@link #toString()}; the filter and the request are final, so it never changes. */
    private String stringCache;

    @Override
    public String toString() {
        if (stringCache == null) {
            StringBuilder sb = new StringBuilder(128);
            sb.append("Stanza Collector filter='").append(packetFilter).append('\'');
            if (request != null) {
                sb.append(" request='").append(request).append('\'');
            }
            stringCache = sb.toString();
        }
        return stringCache;
    }

    /**
     * Records a connection error and wakes up all threads waiting for a result.
     *
     * @param exception the exception describing the connection error.
     */
    synchronized void notifyConnectionError(Exception exception) {
        connectionException = exception;
        notifyAll();
    }

    /**
     * Processes a stanza to see if it meets the criteria for this stanza collector.
     * If so, the stanza is added to the result queue. If the queue already holds the configured
     * maximum number of stanzas, the oldest one is dropped first.
     *
     * @param packet the stanza to process.
     */
    void processStanza(Stanza packet) {
        if (packetFilter == null || packetFilter.accept(packet)) {
            synchronized (this) {
                if (resultQueue.size() == maxQueueSize) {
                    // Queue is full: roll over by discarding the oldest stanza.
                    Stanza rolledOverStanza = resultQueue.poll();
                    assert rolledOverStanza != null;
                }
                resultQueue.add(packet);
                notifyAll();
            }
            if (collectorToReset != null) {
                // Restart the timeout measurement of the other collector.
                collectorToReset.waitStart = System.currentTimeMillis();
            }
        }
    }

    /**
     * Throws if this collector has already been cancelled.
     *
     * @throws IllegalStateException if the collector is cancelled.
     */
    private void throwIfCancelled() {
        if (cancelled) {
            throw new IllegalStateException("Stanza collector already cancelled");
        }
    }

    /**
     * Get a new stanza collector configuration instance.
     *
     * @return a new stanza collector configuration.
     */
    public static Configuration newConfiguration() {
        return new Configuration();
    }

    /**
     * Mutable configuration used to construct a {@link StanzaCollector}. Obtain an instance via
     * {@link StanzaCollector#newConfiguration()}.
     */
    public static final class Configuration {
        private StanzaFilter packetFilter;
        private int size = SmackConfiguration.getStanzaCollectorSize();
        private StanzaCollector collectorToReset;
        private Stanza request;

        private Configuration() {
        }

        /**
         * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
         * get collected by this collector.
         *
         * @param stanzaFilter the filter deciding which stanzas are collected, or <code>null</code> to
         *        collect all stanzas.
         * @return a reference to this configuration.
         */
        public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
            this.packetFilter = stanzaFilter;
            return this;
        }

        /**
         * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
         * before dropping old ones.
         *
         * @param size the maximum number of stanzas held in the result queue.
         * @return a reference to this configuration.
         */
        public Configuration setSize(int size) {
            this.size = size;
            return this;
        }

        /**
         * Set the collector which timeout for the next result is reset once this collector collects
         * a stanza.
         *
         * @param collector the collector whose timeout gets reset, or <code>null</code> for none.
         * @return a reference to this configuration.
         */
        public Configuration setCollectorToReset(StanzaCollector collector) {
            this.collectorToReset = collector;
            return this;
        }

        /**
         * Set the request stanza associated with the collector. If set, it is included in the
         * collector's string representation.
         *
         * @param request the request stanza, or <code>null</code> for none.
         * @return a reference to this configuration.
         */
        public Configuration setRequest(Stanza request) {
            this.request = request;
            return this;
        }
    }

    /**
     * Cancels this collector, see {@link #cancel()}.
     */
    @Override
    public void close() {
        cancel();
    }

}