StanzaCollector.java
- /**
- *
- * Copyright 2003-2007 Jive Software, 2016-2019 Florian Schmaus.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.jivesoftware.smack;
- import java.util.ArrayDeque;
- import java.util.ArrayList;
- import java.util.List;
- import org.jivesoftware.smack.SmackException.NoResponseException;
- import org.jivesoftware.smack.SmackException.NotConnectedException;
- import org.jivesoftware.smack.XMPPException.XMPPErrorException;
- import org.jivesoftware.smack.filter.StanzaFilter;
- import org.jivesoftware.smack.packet.Stanza;
- /**
- * Provides a mechanism to collect Stanzas into a result queue that pass a
- * specified filter/matcher. The collector lets you perform blocking and polling
- * operations on the result queue. So, a StanzaCollector is more suitable to
- * use than a {@link StanzaListener} when you need to wait for a specific
- * result.<p>
- *
- * Each stanza collector will queue up a configured number of packets for processing before
- * older packets are automatically dropped. The default number is retrieved by
- * {@link SmackConfiguration#getStanzaCollectorSize()}.
- *
- * @see XMPPConnection#createStanzaCollector(StanzaFilter)
- * @author Matt Tucker
- */
- public final class StanzaCollector implements AutoCloseable {
- private final StanzaFilter packetFilter;
- private final ArrayDeque<Stanza> resultQueue;
- private final int maxQueueSize;
- /**
- * The stanza collector which timeout for the next result will get reset once this collector collects a stanza.
- */
- private final StanzaCollector collectorToReset;
- private final XMPPConnection connection;
- private final Stanza request;
- private volatile boolean cancelled;
- private Exception connectionException;
- /**
- * Creates a new stanza collector. If the stanza filter is <code>null</code>, then
- * all packets will match this collector.
- *
- * @param connection the connection the collector is tied to.
- * @param configuration the configuration used to construct this collector
- */
- StanzaCollector(XMPPConnection connection, Configuration configuration) {
- this.connection = connection;
- this.packetFilter = configuration.packetFilter;
- this.resultQueue = new ArrayDeque<>(configuration.size);
- this.maxQueueSize = configuration.size;
- this.collectorToReset = configuration.collectorToReset;
- this.request = configuration.request;
- }
- /**
- * Explicitly cancels the stanza collector so that no more results are
- * queued up. Once a stanza collector has been cancelled, it cannot be
- * re-enabled. Instead, a new stanza collector must be created.
- */
- public synchronized void cancel() {
- // If the packet collector has already been cancelled, do nothing.
- if (cancelled) {
- return;
- }
- cancelled = true;
- connection.removeStanzaCollector(this);
- notifyAll();
- if (collectorToReset != null) {
- collectorToReset.cancel();
- }
- }
- /**
- * Returns the stanza filter associated with this stanza collector. The stanza
- * filter is used to determine what stanzas are queued as results.
- *
- * @return the stanza filter.
- */
- public StanzaFilter getStanzaFilter() {
- return packetFilter;
- }
- /**
- * Polls to see if a stanza is currently available and returns it, or
- * immediately returns <code>null</code> if no packets are currently in the
- * result queue.
- *
- * @param <P> type of the result stanza.
- * @return the next stanza result, or <code>null</code> if there are no more
- * results.
- */
- @SuppressWarnings("unchecked")
- public synchronized <P extends Stanza> P pollResult() {
- return (P) resultQueue.poll();
- }
- /**
- * Polls to see if a stanza is currently available and returns it, or
- * immediately returns <code>null</code> if no packets are currently in the
- * result queue.
- * <p>
- * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError.
- * </p>
- *
- * @param <P> type of the result stanza.
- * @return the next available packet.
- * @throws XMPPErrorException in case an error response.
- */
- public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
- P result = pollResult();
- if (result != null) {
- XMPPErrorException.ifHasErrorThenThrow(result);
- }
- return result;
- }
- /**
- * Returns the next available packet. The method call will block (not return) until a stanza is
- * available.
- *
- * @param <P> type of the result stanza.
- * @return the next available packet.
- * @throws InterruptedException if the calling thread was interrupted.
- */
- @SuppressWarnings("unchecked")
- // TODO: Consider removing this method as it is hardly ever useful.
- public synchronized <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
- throwIfCancelled();
- while (true) {
- P res = (P) resultQueue.poll();
- if (res != null) {
- return res;
- }
- if (cancelled) {
- return null;
- }
- wait();
- }
- }
- /**
- * Returns the next available packet. The method call will block until the connection's default
- * timeout has elapsed.
- *
- * @param <P> type of the result stanza.
- * @return the next available packet.
- * @throws InterruptedException if the calling thread was interrupted.
- */
- public <P extends Stanza> P nextResult() throws InterruptedException {
- return nextResult(connection.getReplyTimeout());
- }
- private volatile long waitStart;
- /**
- * Returns the next available stanza. The method call will block (not return) until a stanza is available or the
- * <code>timeout</code> has elapsed or if the connection was terminated because of an error. If the timeout elapses without a
- * result or if there was an connection error, <code>null</code> will be returned.
- *
- * @param <P> type of the result stanza.
- * @param timeout the timeout in milliseconds.
- * @return the next available stanza or <code>null</code> on timeout or connection error.
- * @throws InterruptedException if the calling thread was interrupted.
- */
- @SuppressWarnings("unchecked")
- public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
- throwIfCancelled();
- P res = null;
- long remainingWait = timeout;
- waitStart = System.currentTimeMillis();
- while (remainingWait > 0 && connectionException == null && !cancelled) {
- synchronized (this) {
- res = (P) resultQueue.poll();
- if (res != null) {
- return res;
- }
- wait(remainingWait);
- }
- remainingWait = timeout - (System.currentTimeMillis() - waitStart);
- }
- return res;
- }
- /**
- * Returns the next available stanza. The method in equivalent to
- * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of
- * the connection associated with this collector.
- *
- * @param <P> type of the result stanza.
- * @return the next available stanza.
- * @throws XMPPErrorException in case an error response was received.
- * @throws NoResponseException if there was no response from the server.
- * @throws InterruptedException if the calling thread was interrupted.
- * @throws NotConnectedException if the XMPP connection is not connected.
- * @see #nextResultOrThrow(long)
- */
- public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException,
- InterruptedException, NotConnectedException {
- return nextResultOrThrow(connection.getReplyTimeout());
- }
- /**
- * Returns the next available stanza. The method call will block until a stanza is
- * available or the <code>timeout</code> has elapsed. This method does also cancel the
- * collector in every case.
- * <p>
- * Three things can happen when waiting for an response:
- * </p>
- * <ol>
- * <li>A result response arrives.</li>
- * <li>An error response arrives.</li>
- * <li>An timeout occurs.</li>
- * <li>The thread is interrupted</li>
- * </ol>
- * <p>
- * in which this method will
- * </p>
- * <ol>
- * <li>return with the result.</li>
- * <li>throw an {@link XMPPErrorException}.</li>
- * <li>throw an {@link NoResponseException}.</li>
- * <li>throw an {@link InterruptedException}.</li>
- * </ol>
- * <p>
- * Additionally the method will throw a {@link NotConnectedException} if no response was
- * received and the connection got disconnected.
- * </p>
- *
- * @param timeout the amount of time to wait for the next stanza in milliseconds.
- * @param <P> type of the result stanza.
- * @return the next available stanza.
- * @throws NoResponseException if there was no response from the server.
- * @throws XMPPErrorException in case an error response was received.
- * @throws InterruptedException if the calling thread was interrupted.
- * @throws NotConnectedException if there was no response and the connection got disconnected.
- */
- public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException,
- XMPPErrorException, InterruptedException, NotConnectedException {
- P result;
- try {
- result = nextResult(timeout);
- } finally {
- cancel();
- }
- if (result == null) {
- if (connectionException != null) {
- throw new NotConnectedException(connection, packetFilter, connectionException);
- }
- if (!connection.isConnected()) {
- throw new NotConnectedException(connection, packetFilter);
- }
- throw NoResponseException.newWith(timeout, this, cancelled);
- }
- XMPPErrorException.ifHasErrorThenThrow(result);
- return result;
- }
- private List<Stanza> collectedCache;
- /**
- * Return a list of all collected stanzas. This method must be invoked after the collector has been cancelled.
- *
- * @return a list of collected stanzas.
- * @since 4.3.0
- */
- public List<Stanza> getCollectedStanzasAfterCancelled() {
- if (!cancelled) {
- throw new IllegalStateException("Stanza collector was not yet cancelled");
- }
- if (collectedCache == null) {
- collectedCache = new ArrayList<>(getCollectedCount());
- collectedCache.addAll(resultQueue);
- }
- return collectedCache;
- }
- /**
- * Get the number of collected stanzas this stanza collector has collected so far.
- *
- * @return the count of collected stanzas.
- * @since 4.1
- */
- public synchronized int getCollectedCount() {
- return resultQueue.size();
- }
- private String stringCache;
- @Override
- public String toString() {
- if (stringCache == null) {
- StringBuilder sb = new StringBuilder(128);
- sb.append("Stanza Collector filter='").append(packetFilter).append('\'');
- if (request != null) {
- sb.append(" request='").append(request).append('\'');
- }
- stringCache = sb.toString();
- }
- return stringCache;
- }
- synchronized void notifyConnectionError(Exception exception) {
- connectionException = exception;
- notifyAll();
- }
- /**
- * Processes a stanza to see if it meets the criteria for this stanza collector.
- * If so, the stanza is added to the result queue.
- *
- * @param packet the stanza to process.
- */
- void processStanza(Stanza packet) {
- if (packetFilter == null || packetFilter.accept(packet)) {
- synchronized (this) {
- if (resultQueue.size() == maxQueueSize) {
- Stanza rolledOverStanza = resultQueue.poll();
- assert rolledOverStanza != null;
- }
- resultQueue.add(packet);
- notifyAll();
- }
- if (collectorToReset != null) {
- collectorToReset.waitStart = System.currentTimeMillis();
- }
- }
- }
- private void throwIfCancelled() {
- if (cancelled) {
- throw new IllegalStateException("Stanza collector already cancelled");
- }
- }
- /**
- * Get a new stanza collector configuration instance.
- *
- * @return a new stanza collector configuration.
- */
- public static Configuration newConfiguration() {
- return new Configuration();
- }
- public static final class Configuration {
- private StanzaFilter packetFilter;
- private int size = SmackConfiguration.getStanzaCollectorSize();
- private StanzaCollector collectorToReset;
- private Stanza request;
- private Configuration() {
- }
- /**
- * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
- * get collected by this collector.
- *
- * @param stanzaFilter TODO javadoc me please
- * @return a reference to this configuration.
- */
- public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
- this.packetFilter = stanzaFilter;
- return this;
- }
- /**
- * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
- * before dropping old ones.
- *
- * @param size TODO javadoc me please
- * @return a reference to this configuration.
- */
- public Configuration setSize(int size) {
- this.size = size;
- return this;
- }
- /**
- * Set the collector which timeout for the next result is reset once this collector collects
- * a packet.
- *
- * @param collector TODO javadoc me please
- * @return a reference to this configuration.
- */
- public Configuration setCollectorToReset(StanzaCollector collector) {
- this.collectorToReset = collector;
- return this;
- }
- public Configuration setRequest(Stanza request) {
- this.request = request;
- return this;
- }
- }
- @Override
- public void close() {
- cancel();
- }
- }