StanzaCollector.java

/**
 *
 * Copyright 2003-2007 Jive Software, 2016-2019 Florian Schmaus.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

  17. package org.jivesoftware.smack;

  18. import java.util.ArrayDeque;
  19. import java.util.ArrayList;
  20. import java.util.List;

  21. import org.jivesoftware.smack.SmackException.NoResponseException;
  22. import org.jivesoftware.smack.SmackException.NotConnectedException;
  23. import org.jivesoftware.smack.XMPPException.XMPPErrorException;
  24. import org.jivesoftware.smack.filter.StanzaFilter;
  25. import org.jivesoftware.smack.packet.Stanza;

  26. /**
  27.  * Provides a mechanism to collect Stanzas into a result queue that pass a
  28.  * specified filter/matcher. The collector lets you perform blocking and polling
  29.  * operations on the result queue. So, a StanzaCollector is more suitable to
  30.  * use than a {@link StanzaListener} when you need to wait for a specific
  31.  * result.<p>
  32.  *
  33.  * Each stanza collector will queue up a configured number of packets for processing before
  34.  * older packets are automatically dropped.  The default number is retrieved by
  35.  * {@link SmackConfiguration#getStanzaCollectorSize()}.
  36.  *
  37.  * @see XMPPConnection#createStanzaCollector(StanzaFilter)
  38.  * @author Matt Tucker
  39.  */
  40. public final class StanzaCollector implements AutoCloseable {

  41.     private final StanzaFilter packetFilter;

  42.     private final ArrayDeque<Stanza> resultQueue;

  43.     private final int maxQueueSize;

  44.     /**
  45.      * The stanza collector which timeout for the next result will get reset once this collector collects a stanza.
  46.      */
  47.     private final StanzaCollector collectorToReset;

  48.     private final XMPPConnection connection;

  49.     private final Stanza request;

  50.     private volatile boolean cancelled;

  51.     private Exception connectionException;

  52.     /**
  53.      * Creates a new stanza collector. If the stanza filter is <code>null</code>, then
  54.      * all packets will match this collector.
  55.      *
  56.      * @param connection the connection the collector is tied to.
  57.      * @param configuration the configuration used to construct this collector
  58.      */
  59.     StanzaCollector(XMPPConnection connection, Configuration configuration) {
  60.         this.connection = connection;
  61.         this.packetFilter = configuration.packetFilter;
  62.         this.resultQueue = new ArrayDeque<>(configuration.size);
  63.         this.maxQueueSize = configuration.size;
  64.         this.collectorToReset = configuration.collectorToReset;
  65.         this.request = configuration.request;
  66.     }

  67.     /**
  68.      * Explicitly cancels the stanza collector so that no more results are
  69.      * queued up. Once a stanza collector has been cancelled, it cannot be
  70.      * re-enabled. Instead, a new stanza collector must be created.
  71.      */
  72.     public synchronized void cancel() {
  73.         // If the packet collector has already been cancelled, do nothing.
  74.         if (cancelled) {
  75.             return;
  76.         }

  77.         cancelled = true;
  78.         connection.removeStanzaCollector(this);
  79.         notifyAll();

  80.         if (collectorToReset != null) {
  81.             collectorToReset.cancel();
  82.         }
  83.     }

  84.     /**
  85.      * Returns the stanza filter associated with this stanza collector. The stanza
  86.      * filter is used to determine what stanzas are queued as results.
  87.      *
  88.      * @return the stanza filter.
  89.      */
  90.     public StanzaFilter getStanzaFilter() {
  91.         return packetFilter;
  92.     }

  93.     /**
  94.      * Polls to see if a stanza is currently available and returns it, or
  95.      * immediately returns <code>null</code> if no packets are currently in the
  96.      * result queue.
  97.      *
  98.      * @param <P> type of the result stanza.
  99.      * @return the next stanza result, or <code>null</code> if there are no more
  100.      *      results.
  101.      */
  102.     @SuppressWarnings("unchecked")
  103.     public synchronized <P extends Stanza> P pollResult() {
  104.         return (P) resultQueue.poll();
  105.     }

  106.     /**
  107.      * Polls to see if a stanza is currently available and returns it, or
  108.      * immediately returns <code>null</code> if no packets are currently in the
  109.      * result queue.
  110.      * <p>
  111.      * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError.
  112.      * </p>
  113.      *
  114.      * @param <P> type of the result stanza.
  115.      * @return the next available packet.
  116.      * @throws XMPPErrorException in case an error response.
  117.      */
  118.     public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
  119.         P result = pollResult();
  120.         if (result != null) {
  121.             XMPPErrorException.ifHasErrorThenThrow(result);
  122.         }
  123.         return result;
  124.     }

  125.     /**
  126.      * Returns the next available packet. The method call will block (not return) until a stanza is
  127.      * available.
  128.      *
  129.      * @param <P> type of the result stanza.
  130.      * @return the next available packet.
  131.      * @throws InterruptedException if the calling thread was interrupted.
  132.      */
  133.     @SuppressWarnings("unchecked")
  134.     // TODO: Consider removing this method as it is hardly ever useful.
  135.     public synchronized <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
  136.         throwIfCancelled();

  137.         while (true) {
  138.             P res = (P) resultQueue.poll();
  139.             if (res != null) {
  140.                 return res;
  141.             }
  142.             if (cancelled) {
  143.                 return null;
  144.             }
  145.             wait();
  146.         }
  147.     }

  148.     /**
  149.      * Returns the next available packet. The method call will block until the connection's default
  150.      * timeout has elapsed.
  151.      *
  152.      * @param <P> type of the result stanza.
  153.      * @return the next available packet.
  154.      * @throws InterruptedException if the calling thread was interrupted.
  155.      */
  156.     public <P extends Stanza> P nextResult() throws InterruptedException {
  157.         return nextResult(connection.getReplyTimeout());
  158.     }

  159.     private volatile long waitStart;

  160.     /**
  161.      * Returns the next available stanza. The method call will block (not return) until a stanza is available or the
  162.      * <code>timeout</code> has elapsed or if the connection was terminated because of an error. If the timeout elapses without a
  163.      * result or if there was an connection error, <code>null</code> will be returned.
  164.      *
  165.      * @param <P> type of the result stanza.
  166.      * @param timeout the timeout in milliseconds.
  167.      * @return the next available stanza or <code>null</code> on timeout or connection error.
  168.      * @throws InterruptedException if the calling thread was interrupted.
  169.      */
  170.     @SuppressWarnings("unchecked")
  171.     public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
  172.         throwIfCancelled();
  173.         P res = null;
  174.         long remainingWait = timeout;
  175.         waitStart = System.currentTimeMillis();
  176.         while (remainingWait > 0 && connectionException == null && !cancelled) {
  177.             synchronized (this) {
  178.                 res = (P) resultQueue.poll();
  179.                 if (res != null) {
  180.                     return res;
  181.                 }
  182.                 wait(remainingWait);
  183.             }
  184.             remainingWait = timeout - (System.currentTimeMillis() - waitStart);
  185.         }
  186.         return res;
  187.     }

  188.     /**
  189.      * Returns the next available stanza. The method in equivalent to
  190.      * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of
  191.      * the connection associated with this collector.
  192.      *
  193.      * @param <P> type of the result stanza.
  194.      * @return the next available stanza.
  195.      * @throws XMPPErrorException in case an error response was received.
  196.      * @throws NoResponseException if there was no response from the server.
  197.      * @throws InterruptedException if the calling thread was interrupted.
  198.      * @throws NotConnectedException if the XMPP connection is not connected.
  199.      * @see #nextResultOrThrow(long)
  200.      */
  201.     public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException,
  202.                     InterruptedException, NotConnectedException {
  203.         return nextResultOrThrow(connection.getReplyTimeout());
  204.     }

  205.     /**
  206.      * Returns the next available stanza. The method call will block until a stanza is
  207.      * available or the <code>timeout</code> has elapsed. This method does also cancel the
  208.      * collector in every case.
  209.      * <p>
  210.      * Three things can happen when waiting for an response:
  211.      * </p>
  212.      * <ol>
  213.      * <li>A result response arrives.</li>
  214.      * <li>An error response arrives.</li>
  215.      * <li>An timeout occurs.</li>
  216.      * <li>The thread is interrupted</li>
  217.      * </ol>
  218.      * <p>
  219.      * in which this method will
  220.      * </p>
  221.      * <ol>
  222.      * <li>return with the result.</li>
  223.      * <li>throw an {@link XMPPErrorException}.</li>
  224.      * <li>throw an {@link NoResponseException}.</li>
  225.      * <li>throw an {@link InterruptedException}.</li>
  226.      * </ol>
  227.      * <p>
  228.      * Additionally the method will throw a {@link NotConnectedException} if no response was
  229.      * received and the connection got disconnected.
  230.      * </p>
  231.      *
  232.      * @param timeout the amount of time to wait for the next stanza in milliseconds.
  233.      * @param <P> type of the result stanza.
  234.      * @return the next available stanza.
  235.      * @throws NoResponseException if there was no response from the server.
  236.      * @throws XMPPErrorException in case an error response was received.
  237.      * @throws InterruptedException if the calling thread was interrupted.
  238.      * @throws NotConnectedException if there was no response and the connection got disconnected.
  239.      */
  240.     public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException,
  241.                     XMPPErrorException, InterruptedException, NotConnectedException {
  242.         P result;
  243.         try {
  244.             result = nextResult(timeout);
  245.         } finally {
  246.             cancel();
  247.         }
  248.         if (result == null) {
  249.             if (connectionException != null) {
  250.                 throw new NotConnectedException(connection, packetFilter, connectionException);
  251.             }
  252.             if (!connection.isConnected()) {
  253.                 throw new NotConnectedException(connection, packetFilter);
  254.             }
  255.             throw NoResponseException.newWith(timeout, this, cancelled);
  256.         }

  257.         XMPPErrorException.ifHasErrorThenThrow(result);

  258.         return result;
  259.     }

  260.     private List<Stanza> collectedCache;

  261.     /**
  262.      * Return a list of all collected stanzas. This method must be invoked after the collector has been cancelled.
  263.      *
  264.      * @return a list of collected stanzas.
  265.      * @since 4.3.0
  266.      */
  267.     public List<Stanza> getCollectedStanzasAfterCancelled() {
  268.         if (!cancelled) {
  269.             throw new IllegalStateException("Stanza collector was not yet cancelled");
  270.         }

  271.         if (collectedCache == null) {
  272.             collectedCache = new ArrayList<>(getCollectedCount());
  273.             collectedCache.addAll(resultQueue);
  274.         }

  275.         return collectedCache;
  276.     }

  277.     /**
  278.      * Get the number of collected stanzas this stanza collector has collected so far.
  279.      *
  280.      * @return the count of collected stanzas.
  281.      * @since 4.1
  282.      */
  283.     public synchronized int getCollectedCount() {
  284.         return resultQueue.size();
  285.     }

  286.     private String stringCache;

  287.     @Override
  288.     public String toString() {
  289.         if (stringCache == null) {
  290.             StringBuilder sb = new StringBuilder(128);
  291.             sb.append("Stanza Collector filter='").append(packetFilter).append('\'');
  292.             if (request != null) {
  293.                 sb.append(" request='").append(request).append('\'');
  294.             }
  295.             stringCache = sb.toString();
  296.         }
  297.         return stringCache;
  298.     }

  299.     synchronized void notifyConnectionError(Exception exception) {
  300.         connectionException = exception;
  301.         notifyAll();
  302.     }

  303.     /**
  304.      * Processes a stanza to see if it meets the criteria for this stanza collector.
  305.      * If so, the stanza is added to the result queue.
  306.      *
  307.      * @param packet the stanza to process.
  308.      */
  309.     void processStanza(Stanza packet) {
  310.         if (packetFilter == null || packetFilter.accept(packet)) {
  311.             synchronized (this) {
  312.                 if (resultQueue.size() == maxQueueSize) {
  313.                     Stanza rolledOverStanza = resultQueue.poll();
  314.                     assert rolledOverStanza != null;
  315.                 }
  316.                 resultQueue.add(packet);
  317.                 notifyAll();
  318.             }
  319.             if (collectorToReset != null) {
  320.                 collectorToReset.waitStart = System.currentTimeMillis();
  321.             }
  322.         }
  323.     }

  324.     private void throwIfCancelled() {
  325.         if (cancelled) {
  326.             throw new IllegalStateException("Stanza collector already cancelled");
  327.         }
  328.     }

  329.     /**
  330.      * Get a new stanza collector configuration instance.
  331.      *
  332.      * @return a new stanza collector configuration.
  333.      */
  334.     public static Configuration newConfiguration() {
  335.         return new Configuration();
  336.     }

  337.     public static final class Configuration {
  338.         private StanzaFilter packetFilter;
  339.         private int size = SmackConfiguration.getStanzaCollectorSize();
  340.         private StanzaCollector collectorToReset;
  341.         private Stanza request;

  342.         private Configuration() {
  343.         }

  344.         /**
  345.          * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
  346.          * get collected by this collector.
  347.          *
  348.          * @param stanzaFilter TODO javadoc me please
  349.          * @return a reference to this configuration.
  350.          */
  351.         public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
  352.             this.packetFilter = stanzaFilter;
  353.             return this;
  354.         }

  355.         /**
  356.          * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
  357.          * before dropping old ones.
  358.          *
  359.          * @param size TODO javadoc me please
  360.          * @return a reference to this configuration.
  361.          */
  362.         public Configuration setSize(int size) {
  363.             this.size = size;
  364.             return this;
  365.         }

  366.         /**
  367.          * Set the collector which timeout for the next result is reset once this collector collects
  368.          * a packet.
  369.          *
  370.          * @param collector TODO javadoc me please
  371.          * @return a reference to this configuration.
  372.          */
  373.         public Configuration setCollectorToReset(StanzaCollector collector) {
  374.             this.collectorToReset = collector;
  375.             return this;
  376.         }

  377.         public Configuration setRequest(Stanza request) {
  378.             this.request = request;
  379.             return this;
  380.         }
  381.     }

  382.     @Override
  383.     public void close() {
  384.         cancel();
  385.     }

  386. }