PacketCollector.java

  1. /**
  2.  *
  3.  * Copyright 2003-2007 Jive Software.
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */

  17. package org.jivesoftware.smack;

  18. import java.util.concurrent.ArrayBlockingQueue;
  19. import java.util.concurrent.TimeUnit;

  20. import org.jivesoftware.smack.SmackException.NoResponseException;
  21. import org.jivesoftware.smack.XMPPException.XMPPErrorException;
  22. import org.jivesoftware.smack.filter.StanzaFilter;
  23. import org.jivesoftware.smack.packet.Stanza;

  24. /**
  25.  * Provides a mechanism to collect packets into a result queue that pass a
  26.  * specified filter. The collector lets you perform blocking and polling
  27.  * operations on the result queue. So, a PacketCollector is more suitable to
  28.  * use than a {@link StanzaListener} when you need to wait for a specific
  29.  * result.<p>
  30.  *
  31.  * Each packet collector will queue up a configured number of packets for processing before
  32.  * older packets are automatically dropped.  The default number is retrieved by
  33.  * {@link SmackConfiguration#getPacketCollectorSize()}.
  34.  *
  35.  * @see XMPPConnection#createPacketCollector(StanzaFilter)
  36.  * @author Matt Tucker
  37.  */
  38. public class PacketCollector {

  39.     private final StanzaFilter packetFilter;

  40.     private final ArrayBlockingQueue<Stanza> resultQueue;

  41.     /**
  42.      * The packet collector which timeout for the next result will get reset once this collector collects a stanza.
  43.      */
  44.     private final PacketCollector collectorToReset;

  45.     private final XMPPConnection connection;

  46.     private boolean cancelled = false;

  47.     /**
  48.      * Creates a new packet collector. If the packet filter is <tt>null</tt>, then
  49.      * all packets will match this collector.
  50.      *
  51.      * @param connection the connection the collector is tied to.
  52.      * @param configuration the configuration used to construct this collector
  53.      */
  54.     protected PacketCollector(XMPPConnection connection, Configuration configuration) {
  55.         this.connection = connection;
  56.         this.packetFilter = configuration.packetFilter;
  57.         this.resultQueue = new ArrayBlockingQueue<>(configuration.size);
  58.         this.collectorToReset = configuration.collectorToReset;
  59.     }

  60.     /**
  61.      * Explicitly cancels the packet collector so that no more results are
  62.      * queued up. Once a packet collector has been cancelled, it cannot be
  63.      * re-enabled. Instead, a new packet collector must be created.
  64.      */
  65.     public void cancel() {
  66.         // If the packet collector has already been cancelled, do nothing.
  67.         if (!cancelled) {
  68.             cancelled = true;
  69.             connection.removePacketCollector(this);
  70.         }
  71.     }

  72.     /**
  73.      * Returns the packet filter associated with this packet collector. The packet
  74.      * filter is used to determine what packets are queued as results.
  75.      *
  76.      * @return the packet filter.
  77.      * @deprecated use {@link #getStanzaFilter()} instead.
  78.      */
  79.     @Deprecated
  80.     public StanzaFilter getPacketFilter() {
  81.         return getStanzaFilter();
  82.     }

  83.     /**
  84.      * Returns the stanza filter associated with this stanza collector. The stanza
  85.      * filter is used to determine what stanzas are queued as results.
  86.      *
  87.      * @return the stanza filter.
  88.      */
  89.     public StanzaFilter getStanzaFilter() {
  90.         return packetFilter;
  91.     }
  92.  
  93.     /**
  94.      * Polls to see if a packet is currently available and returns it, or
  95.      * immediately returns <tt>null</tt> if no packets are currently in the
  96.      * result queue.
  97.      *
  98.      * @return the next packet result, or <tt>null</tt> if there are no more
  99.      *      results.
  100.      */
  101.     @SuppressWarnings("unchecked")
  102.     public <P extends Stanza> P pollResult() {
  103.         return (P) resultQueue.poll();
  104.     }

  105.     /**
  106.      * Polls to see if a packet is currently available and returns it, or
  107.      * immediately returns <tt>null</tt> if no packets are currently in the
  108.      * result queue.
  109.      * <p>
  110.      * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError.
  111.      * </p>
  112.      *
  113.      * @return the next available packet.
  114.      * @throws XMPPErrorException in case an error response.
  115.      */
  116.     public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
  117.         P result = pollResult();
  118.         if (result != null) {
  119.             XMPPErrorException.ifHasErrorThenThrow(result);
  120.         }
  121.         return result;
  122.     }

  123.     /**
  124.      * Returns the next available packet. The method call will block (not return) until a packet is
  125.      * available.
  126.      *
  127.      * @return the next available packet.
  128.      * @throws InterruptedException
  129.      */
  130.     @SuppressWarnings("unchecked")
  131.     public <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
  132.         throwIfCancelled();
  133.         P res = null;
  134.         while (res == null) {
  135.             res = (P) resultQueue.take();
  136.         }
  137.         return res;
  138.     }

  139.     /**
  140.      * Returns the next available packet. The method call will block until the connection's default
  141.      * timeout has elapsed.
  142.      *
  143.      * @return the next available packet.
  144.      * @throws InterruptedException
  145.      */
  146.     public <P extends Stanza> P nextResult() throws InterruptedException {
  147.         return nextResult(connection.getPacketReplyTimeout());
  148.     }

  149.     private volatile long waitStart;

  150.     /**
  151.      * Returns the next available packet. The method call will block (not return)
  152.      * until a packet is available or the <tt>timeout</tt> has elapsed. If the
  153.      * timeout elapses without a result, <tt>null</tt> will be returned.
  154.      *
  155.      * @param timeout the timeout in milliseconds.
  156.      * @return the next available packet.
  157.      * @throws InterruptedException
  158.      */
  159.     @SuppressWarnings("unchecked")
  160.     public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
  161.         throwIfCancelled();
  162.         P res = null;
  163.         long remainingWait = timeout;
  164.         waitStart = System.currentTimeMillis();
  165.         do {
  166.             res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS);
  167.             if (res != null) {
  168.                 return res;
  169.             }
  170.             remainingWait = timeout - (System.currentTimeMillis() - waitStart);
  171.         } while (remainingWait > 0);
  172.         return null;
  173.     }

  174.     /**
  175.      * Returns the next available packet. The method call will block until a packet is available or
  176.      * the connections reply timeout has elapsed. If the timeout elapses without a result,
  177.      * <tt>null</tt> will be returned. This method does also cancel the PacketCollector.
  178.      *
  179.      * @return the next available packet.
  180.      * @throws XMPPErrorException in case an error response.
  181.      * @throws NoResponseException if there was no response from the server.
  182.      * @throws InterruptedException
  183.      */
  184.     public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException, InterruptedException {
  185.         return nextResultOrThrow(connection.getPacketReplyTimeout());
  186.     }

  187.     /**
  188.      * Returns the next available packet. The method call will block until a packet is available or
  189.      * the <tt>timeout</tt> has elapsed. This method does also cancel the PacketCollector.
  190.      *
  191.      * @param timeout the amount of time to wait for the next packet (in milleseconds).
  192.      * @return the next available packet.
  193.      * @throws NoResponseException if there was no response from the server.
  194.      * @throws XMPPErrorException in case an error response.
  195.      * @throws InterruptedException
  196.      */
  197.     public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException, XMPPErrorException, InterruptedException {
  198.         P result = nextResult(timeout);
  199.         cancel();
  200.         if (result == null) {
  201.             throw NoResponseException.newWith(connection, this);
  202.         }

  203.         XMPPErrorException.ifHasErrorThenThrow(result);

  204.         return result;
  205.     }

  206.     /**
  207.      * Get the number of collected stanzas this packet collector has collected so far.
  208.      *
  209.      * @return the count of collected stanzas.
  210.      * @since 4.1
  211.      */
  212.     public int getCollectedCount() {
  213.         return resultQueue.size();
  214.     }

  215.     /**
  216.      * Processes a packet to see if it meets the criteria for this packet collector.
  217.      * If so, the packet is added to the result queue.
  218.      *
  219.      * @param packet the packet to process.
  220.      */
  221.     protected void processPacket(Stanza packet) {
  222.         if (packetFilter == null || packetFilter.accept(packet)) {
  223.             while (!resultQueue.offer(packet)) {
  224.                 // Since we know the queue is full, this poll should never actually block.
  225.                 resultQueue.poll();
  226.             }
  227.             if (collectorToReset != null) {
  228.                 collectorToReset.waitStart = System.currentTimeMillis();
  229.             }
  230.         }
  231.     }

  232.     private final void throwIfCancelled() {
  233.         if (cancelled) {
  234.             throw new IllegalStateException("Packet collector already cancelled");
  235.         }
  236.     }

  237.     /**
  238.      * Get a new packet collector configuration instance.
  239.      *
  240.      * @return a new packet collector configuration.
  241.      */
  242.     public static Configuration newConfiguration() {
  243.         return new Configuration();
  244.     }

  245.     public static class Configuration {
  246.         private StanzaFilter packetFilter;
  247.         private int size = SmackConfiguration.getPacketCollectorSize();
  248.         private PacketCollector collectorToReset;

  249.         private Configuration() {
  250.         }

  251.         /**
  252.          * Set the packet filter used by this collector. If <code>null</code>, then all packets will
  253.          * get collected by this collector.
  254.          *
  255.          * @param packetFilter
  256.          * @return a reference to this configuration.
  257.          * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead.
  258.          */
  259.         @Deprecated
  260.         public Configuration setPacketFilter(StanzaFilter packetFilter) {
  261.             return setStanzaFilter(packetFilter);
  262.         }

  263.         /**
  264.          * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
  265.          * get collected by this collector.
  266.          *
  267.          * @param stanzaFilter
  268.          * @return a reference to this configuration.
  269.          */
  270.         public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
  271.             this.packetFilter = stanzaFilter;
  272.             return this;
  273.         }

  274.         /**
  275.          * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
  276.          * before dropping old ones.
  277.          *
  278.          * @param size
  279.          * @return a reference to this configuration.
  280.          */
  281.         public Configuration setSize(int size) {
  282.             this.size = size;
  283.             return this;
  284.         }

  285.         /**
  286.          * Set the collector which timeout for the next result is reset once this collector collects
  287.          * a packet.
  288.          *
  289.          * @param collector
  290.          * @return a reference to this configuration.
  291.          */
  292.         public Configuration setCollectorToReset(PacketCollector collector) {
  293.             this.collectorToReset = collector;
  294.             return this;
  295.         }
  296.     }
  297. }