ArrayBlockingQueueWithShutdown.java

  1. /**
  2.  *
  3.  * Copyright 2014-2019 Florian Schmaus
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smack.util;

  18. import java.util.AbstractQueue;
  19. import java.util.Collection;
  20. import java.util.Iterator;
  21. import java.util.NoSuchElementException;
  22. import java.util.concurrent.BlockingQueue;
  23. import java.util.concurrent.TimeUnit;
  24. import java.util.concurrent.locks.Condition;
  25. import java.util.concurrent.locks.ReentrantLock;

  26. /**
  27.  * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will
  28.  * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and
  29.  * {@link #poll(long, TimeUnit)}.
  30.  * <p>
  31.  * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
  32.  * domain).
  33.  *
  34.  * @param <E> the type of elements held in this collection
  35.  */
  36. public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {

  37.     private final E[] items;

  38.     private int takeIndex;

  39.     private int putIndex;

  40.     private int count;

  41.     private final ReentrantLock lock;

  42.     private final Condition notEmpty;

  43.     private final Condition notFull;

  44.     private volatile boolean isShutdown = false;

  45.     private int inc(int i) {
  46.         return (++i == items.length) ? 0 : i;
  47.     }

  48.     private void insert(E e) {
  49.         insert(e, true);
  50.     }

  51.     private void insert(E e, boolean signalNotEmpty) {
  52.         items[putIndex] = e;
  53.         putIndex = inc(putIndex);
  54.         count++;
  55.         if (signalNotEmpty) {
  56.             notEmpty.signal();
  57.         }
  58.     }

  59.     private E extract() {
  60.         E e = items[takeIndex];
  61.         items[takeIndex] = null;
  62.         takeIndex = inc(takeIndex);
  63.         count--;
  64.         notFull.signal();
  65.         return e;
  66.     }

  67.     private void removeAt(int i) {
  68.         if (i == takeIndex) {
  69.             items[takeIndex] = null;
  70.             takeIndex = inc(takeIndex);
  71.         }
  72.         else {
  73.             while (true) {
  74.                 int nexti = inc(i);
  75.                 if (nexti != putIndex) {
  76.                     items[i] = items[nexti];
  77.                     i = nexti;
  78.                 }
  79.                 else {
  80.                     items[i] = null;
  81.                     putIndex = i;
  82.                     break;
  83.                 }
  84.             }
  85.         }
  86.         count--;
  87.         notFull.signal();
  88.     }

  89.     private static void checkNotNull(Object o) {
  90.         if (o == null) {
  91.             throw new NullPointerException();
  92.         }
  93.     }

  94.     private void checkNotShutdown() throws InterruptedException {
  95.         if (isShutdown) {
  96.             throw new InterruptedException("Queue was already shut down");
  97.         }
  98.     }

  99.     private boolean hasNoElements() {
  100.         return count == 0;
  101.     }

  102.     private boolean hasElements() {
  103.         return !hasNoElements();
  104.     }

  105.     private boolean isFull() {
  106.         return count == items.length;
  107.     }

  108.     private boolean isNotFull() {
  109.         return !isFull();
  110.     }

  111.     public ArrayBlockingQueueWithShutdown(int capacity) {
  112.         this(capacity, false);
  113.     }

  114.     @SuppressWarnings("unchecked")
  115.     public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
  116.         if (capacity <= 0)
  117.             throw new IllegalArgumentException();
  118.         items = (E[]) new Object[capacity];
  119.         lock = new ReentrantLock(fair);
  120.         notEmpty = lock.newCondition();
  121.         notFull = lock.newCondition();
  122.     }

  123.     /**
  124.      * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock
  125.      * (and usually throw a InterruptedException).
  126.      */
  127.     public void shutdown() {
  128.         lock.lock();
  129.         try {
  130.             isShutdown = true;
  131.             notEmpty.signalAll();
  132.             notFull.signalAll();
  133.         }
  134.         finally {
  135.             lock.unlock();
  136.         }
  137.     }

  138.     /**
  139.      * Start the queue. Newly created instances will be started automatically, thus this only needs
  140.      * to be called after {@link #shutdown()}.
  141.      *
  142.      * @return <code>true</code> if the queues was shutdown before, <code>false</code> if not.
  143.      */
  144.     public boolean start() {
  145.         boolean previousIsShutdown;
  146.         lock.lock();
  147.         try {
  148.             previousIsShutdown = isShutdown;
  149.             isShutdown = false;
  150.         }
  151.         finally {
  152.             lock.unlock();
  153.         }
  154.         return previousIsShutdown;
  155.     }

  156.     /**
  157.      * Returns true if the queue is currently shut down.
  158.      *
  159.      * @return true if the queue is shut down.
  160.      */
  161.     public boolean isShutdown() {
  162.         lock.lock();
  163.         try {
  164.             return isShutdown;
  165.         } finally {
  166.             lock.unlock();
  167.         }
  168.     }

  169.     @Override
  170.     public E poll() {
  171.         lock.lock();
  172.         try {
  173.             if (hasNoElements()) {
  174.                 return null;
  175.             }
  176.             E e = extract();
  177.             return e;
  178.         }
  179.         finally {
  180.             lock.unlock();
  181.         }
  182.     }

  183.     @Override
  184.     public E peek() {
  185.         lock.lock();
  186.         try {
  187.             return hasNoElements() ? null : items[takeIndex];
  188.         }
  189.         finally {
  190.             lock.unlock();
  191.         }
  192.     }

  193.     @Override
  194.     public boolean offer(E e) {
  195.         checkNotNull(e);
  196.         lock.lock();
  197.         try {
  198.             if (isFull() || isShutdown) {
  199.                 return false;
  200.             }
  201.             else {
  202.                 insert(e);
  203.                 return true;
  204.             }
  205.         }
  206.         finally {
  207.             lock.unlock();
  208.         }
  209.     }

  210.     public boolean offerAndShutdown(E e) {
  211.         checkNotNull(e);
  212.         boolean res;
  213.         lock.lock();
  214.         try {
  215.             res = offer(e);
  216.             shutdown();
  217.         } finally {
  218.             lock.unlock();
  219.         }
  220.         return res;
  221.     }

  222.     private void putInternal(E e, boolean signalNotEmpty) throws InterruptedException {
  223.         assert lock.isHeldByCurrentThread();

  224.         while (isFull()) {
  225.             try {
  226.                 notFull.await();
  227.                 checkNotShutdown();
  228.             }
  229.             catch (InterruptedException ie) {
  230.                 notFull.signal();
  231.                 throw ie;
  232.             }
  233.         }
  234.         insert(e, signalNotEmpty);
  235.     }

  236.     /**
  237.      * Inserts the specified element into this queue, waiting if necessary
  238.      * for space to become available.
  239.      * <p>
  240.      * This may throw an {@link InterruptedException} in two cases
  241.      * <ol>
  242.      *  <li>If the queue was shut down.</li>
  243.      *  <li>If the thread was was interrupted.</li>
  244.      * </ol>
  245.      * So you have to check which is the case, e.g. by calling {@link #isShutdown()}.
  246.      *
  247.      * @param e the element to add.
  248.      * @throws InterruptedException if interrupted while waiting or if the queue was shut down.
  249.      */
  250.     @Override
  251.     public void put(E e) throws InterruptedException {
  252.         checkNotNull(e);
  253.         lock.lockInterruptibly();

  254.         try {
  255.             putInternal(e, true);
  256.         }
  257.         finally {
  258.             lock.unlock();
  259.         }
  260.     }

  261.     /**
  262.      * Put if the queue has not been shutdown yet.
  263.      *
  264.      * @param e the element to put into the queue.
  265.      * @return <code>true</code> if the element has been put into the queue, <code>false</code> if the queue was shutdown.
  266.      * @throws InterruptedException if the calling thread was interrupted.
  267.      * @since 4.4
  268.      */
  269.     public boolean putIfNotShutdown(E e) throws InterruptedException {
  270.         checkNotNull(e);
  271.         lock.lockInterruptibly();

  272.         try {
  273.             if (isShutdown) {
  274.                 return false;
  275.             }

  276.             putInternal(e, true);
  277.             return true;
  278.         } finally {
  279.             lock.unlock();
  280.         }
  281.     }

  282.     public void putAll(Collection<? extends E> elements) throws InterruptedException {
  283.         checkNotNull(elements);
  284.         lock.lockInterruptibly();

  285.         try {
  286.             for (E element : elements) {
  287.                 putInternal(element, false);
  288.             }
  289.         } finally {
  290.             notEmpty.signalAll();
  291.             lock.unlock();
  292.         }
  293.     }

  294.     public enum TryPutResult {
  295.         /**
  296.          * The method was unable to acquire the queue lock.
  297.          */
  298.         couldNotLock,

  299.         /**
  300.          * The queue was shut down.
  301.          */
  302.         queueWasShutDown,

  303.         /**
  304.          * The method was unable to put another element into the queue because the queue was full.
  305.          */
  306.         queueWasFull,

  307.         /**
  308.          * The element was successfully placed into the queue.
  309.          */
  310.         putSuccessful,
  311.     }

  312.     public TryPutResult tryPut(E e) {
  313.         checkNotNull(e);

  314.         boolean locked = lock.tryLock();
  315.         if (!locked) {
  316.             return TryPutResult.couldNotLock;
  317.         }
  318.         try {
  319.             if (isShutdown) {
  320.                 return TryPutResult.queueWasShutDown;
  321.             }
  322.             if (isFull()) {
  323.                 return TryPutResult.queueWasFull;
  324.             }

  325.             insert(e);
  326.             return TryPutResult.putSuccessful;
  327.         } finally {
  328.             lock.unlock();
  329.         }
  330.     }

  331.     @Override
  332.     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  333.         checkNotNull(e);
  334.         long nanos = unit.toNanos(timeout);
  335.         lock.lockInterruptibly();
  336.         try {
  337.             while (true) {
  338.                 if (isNotFull()) {
  339.                     insert(e);
  340.                     return true;
  341.                 }
  342.                 if (nanos <= 0) {
  343.                     return false;
  344.                 }
  345.                 try {
  346.                     nanos = notFull.awaitNanos(nanos);
  347.                     checkNotShutdown();
  348.                 }
  349.                 catch (InterruptedException ie) {
  350.                     notFull.signal();
  351.                     throw ie;
  352.                 }
  353.             }
  354.         }
  355.         finally {
  356.             lock.unlock();
  357.         }

  358.     }

  359.     @Override
  360.     public E take() throws InterruptedException {
  361.         lock.lockInterruptibly();
  362.         try {
  363.             checkNotShutdown();
  364.             try {
  365.                 while (hasNoElements()) {
  366.                     notEmpty.await();
  367.                     checkNotShutdown();
  368.                 }
  369.             }
  370.             catch (InterruptedException ie) {
  371.                 notEmpty.signal();
  372.                 throw ie;
  373.             }
  374.             E e = extract();
  375.             return e;
  376.         }
  377.         finally {
  378.             lock.unlock();
  379.         }
  380.     }

  381.     public enum TryTakeResultCode {
  382.         /**
  383.          * The method was unable to acquire the queue lock.
  384.          */
  385.         couldNotLock,

  386.         /**
  387.          * The queue was shut down.
  388.          */
  389.         queueWasShutDown,

  390.         /**
  391.          * The queue was empty.
  392.          */
  393.         queueWasEmpty,

  394.         /**
  395.          * An element was successfully removed from the queue.
  396.          */
  397.         takeSuccessful,
  398.     }

  399.     public static final class TryTakeResult<E> {
  400.         private final E element;
  401.         private final TryTakeResultCode resultCode;

  402.         private TryTakeResult(TryTakeResultCode resultCode) {
  403.             assert resultCode != null;
  404.             this.resultCode = resultCode;
  405.             this.element = null;
  406.         }

  407.         private TryTakeResult(E element) {
  408.             assert element != null;
  409.             this.resultCode = TryTakeResultCode.takeSuccessful;
  410.             this.element = element;
  411.         }

  412.         public TryTakeResultCode getResultCode() {
  413.             return resultCode;
  414.         }

  415.         public E getElement() {
  416.             return element;
  417.         }
  418.     }

  419.     public TryTakeResult<E> tryTake() {
  420.         boolean locked = lock.tryLock();
  421.         if (!locked) {
  422.             return new TryTakeResult<E>(TryTakeResultCode.couldNotLock);
  423.         }
  424.         try {
  425.             if (isShutdown) {
  426.                 return new TryTakeResult<E>(TryTakeResultCode.queueWasShutDown);
  427.             }
  428.             if (hasNoElements()) {
  429.                 return new TryTakeResult<E>(TryTakeResultCode.queueWasEmpty);
  430.             }
  431.             E element = extract();
  432.             return new TryTakeResult<E>(element);
  433.         } finally {
  434.             lock.unlock();
  435.         }
  436.     }

  437.     @Override
  438.     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  439.         long nanos = unit.toNanos(timeout);
  440.         lock.lockInterruptibly();
  441.         try {
  442.             checkNotShutdown();
  443.             while (true) {
  444.                 if (hasElements()) {
  445.                     E e = extract();
  446.                     return e;
  447.                 }
  448.                 if (nanos <= 0) {
  449.                     return null;
  450.                 }
  451.                 try {
  452.                     nanos = notEmpty.awaitNanos(nanos);
  453.                     checkNotShutdown();
  454.                 }
  455.                 catch (InterruptedException ie) {
  456.                     notEmpty.signal();
  457.                     throw ie;
  458.                 }
  459.             }
  460.         }
  461.         finally {
  462.             lock.unlock();
  463.         }
  464.     }

  465.     @Override
  466.     public int remainingCapacity() {
  467.         lock.lock();
  468.         try {
  469.             return items.length - count;
  470.         }
  471.         finally {
  472.             lock.unlock();
  473.         }
  474.     }

  475.     @Override
  476.     public int drainTo(Collection<? super E> c) {
  477.         checkNotNull(c);
  478.         if (c == this) {
  479.             throw new IllegalArgumentException();
  480.         }
  481.         lock.lock();
  482.         try {
  483.             int i = takeIndex;
  484.             int n = 0;
  485.             for (; n < count; n++) {
  486.                 c.add(items[i]);
  487.                 items[i] = null;
  488.                 i = inc(i);
  489.             }
  490.             if (n > 0) {
  491.                 count = 0;
  492.                 putIndex = 0;
  493.                 takeIndex = 0;
  494.                 notFull.signalAll();
  495.             }
  496.             return n;
  497.         }
  498.         finally {
  499.             lock.unlock();
  500.         }
  501.     }

  502.     @Override
  503.     public int drainTo(Collection<? super E> c, int maxElements) {
  504.         checkNotNull(c);
  505.         if (c == this) {
  506.             throw new IllegalArgumentException();
  507.         }
  508.         if (maxElements <= 0) {
  509.             return 0;
  510.         }
  511.         lock.lock();
  512.         try {
  513.             int i = takeIndex;
  514.             int n = 0;
  515.             int max = (maxElements < count) ? maxElements : count;
  516.             for (; n < max; n++) {
  517.                 c.add(items[i]);
  518.                 items[i] = null;
  519.                 i = inc(i);
  520.             }
  521.             if (n > 0) {
  522.                 count -= n;
  523.                 takeIndex = i;
  524.                 notFull.signalAll();
  525.             }
  526.             return n;
  527.         }
  528.         finally {
  529.             lock.unlock();
  530.         }
  531.     }

  532.     @Override
  533.     public int size() {
  534.         lock.lock();
  535.         try {
  536.             return count;
  537.         }
  538.         finally {
  539.             lock.unlock();
  540.         }
  541.     }

  542.     @Override
  543.     public Iterator<E> iterator() {
  544.         lock.lock();
  545.         try {
  546.             return new Itr();
  547.         }
  548.         finally {
  549.             lock.unlock();
  550.         }
  551.     }

  552.     private class Itr implements Iterator<E> {
  553.         private int nextIndex;
  554.         private E nextItem;
  555.         private int lastRet;

  556.         Itr() {
  557.             lastRet = -1;
  558.             if (count == 0) {
  559.                 nextIndex = -1;
  560.             }
  561.             else {
  562.                 nextIndex = takeIndex;
  563.                 nextItem = items[takeIndex];
  564.             }
  565.         }

  566.         @Override
  567.         public boolean hasNext() {
  568.             return nextIndex >= 0;
  569.         }

  570.         private void checkNext() {
  571.             if (nextIndex == putIndex) {
  572.                 nextIndex = -1;
  573.                 nextItem = null;
  574.             }
  575.             else {
  576.                 nextItem = items[nextIndex];
  577.                 if (nextItem == null) {
  578.                     nextIndex = -1;
  579.                 }
  580.             }
  581.         }

  582.         @Override
  583.         public E next() {
  584.             lock.lock();
  585.             try {
  586.                 if (nextIndex < 0) {
  587.                     throw new NoSuchElementException();
  588.                 }
  589.                 lastRet = nextIndex;
  590.                 E e = nextItem;
  591.                 nextIndex = inc(nextIndex);
  592.                 checkNext();
  593.                 return e;
  594.             }
  595.             finally {
  596.                 lock.unlock();
  597.             }
  598.         }

  599.         @Override
  600.         public void remove() {
  601.             lock.lock();
  602.             try {
  603.                 int i = lastRet;
  604.                 if (i < 0) {
  605.                     throw new IllegalStateException();
  606.                 }
  607.                 lastRet = -1;
  608.                 int ti = takeIndex;
  609.                 removeAt(i);
  610.                 nextIndex = (i == ti) ? takeIndex : i;
  611.                 checkNext();
  612.             }
  613.             finally {
  614.                 lock.unlock();
  615.             }
  616.         }
  617.     }

  618. }