ArrayBlockingQueueWithShutdown.java

  1. /**
  2.  *
  3.  * Copyright 2014 Florian Schmaus
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smack.util;

  18. import java.util.AbstractQueue;
  19. import java.util.Collection;
  20. import java.util.Iterator;
  21. import java.util.NoSuchElementException;
  22. import java.util.concurrent.BlockingQueue;
  23. import java.util.concurrent.TimeUnit;
  24. import java.util.concurrent.locks.Condition;
  25. import java.util.concurrent.locks.ReentrantLock;

  26. /**
  27.  * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will
  28.  * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and
  29.  * {@link #poll(long, TimeUnit)}.
  30.  * <p>
  31.  * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
  32.  * domain).
  33.  *
  34.  * @param <E> the type of elements held in this collection
  35.  */
  36. public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {

  37.     private final E[] items;

  38.     private int takeIndex;

  39.     private int putIndex;

  40.     private int count;

  41.     private final ReentrantLock lock;

  42.     private final Condition notEmpty;

  43.     private final Condition notFull;

  44.     private volatile boolean isShutdown = false;

  45.     private final int inc(int i) {
  46.         return (++i == items.length) ? 0 : i;
  47.     }

  48.     private final void insert(E e) {
  49.         items[putIndex] = e;
  50.         putIndex = inc(putIndex);
  51.         count++;
  52.         notEmpty.signal();
  53.     }

  54.     private final E extract() {
  55.         E e = items[takeIndex];
  56.         items[takeIndex] = null;
  57.         takeIndex = inc(takeIndex);
  58.         count--;
  59.         notFull.signal();
  60.         return e;
  61.     }

  62.     private final void removeAt(int i) {
  63.         if (i == takeIndex) {
  64.             items[takeIndex] = null;
  65.             takeIndex = inc(takeIndex);
  66.         }
  67.         else {
  68.             while (true) {
  69.                 int nexti = inc(i);
  70.                 if (nexti != putIndex) {
  71.                     items[i] = items[nexti];
  72.                     i = nexti;
  73.                 }
  74.                 else {
  75.                     items[i] = null;
  76.                     putIndex = i;
  77.                     break;
  78.                 }
  79.             }
  80.         }
  81.         count--;
  82.         notFull.signal();
  83.     }

  84.     private final static void checkNotNull(Object o) {
  85.         if (o == null) {
  86.             throw new NullPointerException();
  87.         }
  88.     }

  89.     private final void checkNotShutdown() throws InterruptedException {
  90.         if (isShutdown) {
  91.             throw new InterruptedException();
  92.         }
  93.     }

  94.     private final boolean hasNoElements() {
  95.         return count == 0;
  96.     }

  97.     private final boolean hasElements() {
  98.         return !hasNoElements();
  99.     }

  100.     private final boolean isFull() {
  101.         return count == items.length;
  102.     }

  103.     private final boolean isNotFull() {
  104.         return !isFull();
  105.     }

  106.     public ArrayBlockingQueueWithShutdown(int capacity) {
  107.         this(capacity, false);
  108.     }

  109.     @SuppressWarnings("unchecked")
  110.     public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
  111.         if (capacity <= 0)
  112.             throw new IllegalArgumentException();
  113.         items = (E[]) new Object[capacity];
  114.         lock = new ReentrantLock(fair);
  115.         notEmpty = lock.newCondition();
  116.         notFull = lock.newCondition();
  117.     }

  118.     /**
  119.      * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock
  120.      * (and usually throw a InterruptedException).
  121.      */
  122.     public void shutdown() {
  123.         lock.lock();
  124.         try {
  125.             isShutdown = true;
  126.             notEmpty.signalAll();
  127.             notFull.signalAll();
  128.         }
  129.         finally {
  130.             lock.unlock();
  131.         }
  132.     }

  133.     /**
  134.      * Start the queue. Newly created instances will be started automatically, thus this only needs
  135.      * to be called after {@link #shutdown()}.
  136.      */
  137.     public void start() {
  138.         lock.lock();
  139.         try {
  140.             isShutdown = false;
  141.         }
  142.         finally {
  143.             lock.unlock();
  144.         }
  145.     }

  146.     /**
  147.      * Returns true if the queue is currently shut down.
  148.      *
  149.      * @return true if the queue is shut down.
  150.      */
  151.     public boolean isShutdown() {
  152.         lock.lock();
  153.         try {
  154.             return isShutdown;
  155.         } finally {
  156.             lock.unlock();
  157.         }
  158.     }

  159.     @Override
  160.     public E poll() {
  161.         lock.lock();
  162.         try {
  163.             if (hasNoElements()) {
  164.                 return null;
  165.             }
  166.             E e = extract();
  167.             return e;
  168.         }
  169.         finally {
  170.             lock.unlock();
  171.         }
  172.     }

  173.     @Override
  174.     public E peek() {
  175.         lock.lock();
  176.         try {
  177.             return hasNoElements() ? null : items[takeIndex];
  178.         }
  179.         finally {
  180.             lock.unlock();
  181.         }
  182.     }

  183.     @Override
  184.     public boolean offer(E e) {
  185.         checkNotNull(e);
  186.         lock.lock();
  187.         try {
  188.             if (isFull() || isShutdown) {
  189.                 return false;
  190.             }
  191.             else {
  192.                 insert(e);
  193.                 return true;
  194.             }
  195.         }
  196.         finally {
  197.             lock.unlock();
  198.         }
  199.     }

  200.     /**
  201.      * Inserts the specified element into this queue, waiting if necessary
  202.      * for space to become available.
  203.      * <p>
  204.      * This may throw an {@link InterruptedException} in two cases
  205.      * <ol>
  206.      *  <li>If the queue was shut down.</li>
  207.      *  <li>If the thread was was interrupted.</li>
  208.      * </ol>
  209.      * So you have to check which is the case, e.g. by calling {@link #isShutdown()}.
  210.      * </p>
  211.      *
  212.      * @param e the element to add.
  213.      * @throws InterruptedException if interrupted while waiting or if the queue was shut down.
  214.      */
  215.     public void put(E e) throws InterruptedException {
  216.         checkNotNull(e);
  217.         lock.lockInterruptibly();

  218.         try {
  219.             while (isFull()) {
  220.                 try {
  221.                     notFull.await();
  222.                     checkNotShutdown();
  223.                 }
  224.                 catch (InterruptedException ie) {
  225.                     notFull.signal();
  226.                     throw ie;
  227.                 }
  228.             }
  229.             insert(e);
  230.         }
  231.         finally {
  232.             lock.unlock();
  233.         }
  234.     }

  235.     @Override
  236.     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  237.         checkNotNull(e);
  238.         long nanos = unit.toNanos(timeout);
  239.         lock.lockInterruptibly();
  240.         try {
  241.             while (true) {
  242.                 if (isNotFull()) {
  243.                     insert(e);
  244.                     return true;
  245.                 }
  246.                 if (nanos <= 0) {
  247.                     return false;
  248.                 }
  249.                 try {
  250.                     nanos = notFull.awaitNanos(nanos);
  251.                     checkNotShutdown();
  252.                 }
  253.                 catch (InterruptedException ie) {
  254.                     notFull.signal();
  255.                     throw ie;
  256.                 }
  257.             }
  258.         }
  259.         finally {
  260.             lock.unlock();
  261.         }

  262.     }

  263.     @Override
  264.     public E take() throws InterruptedException {
  265.         lock.lockInterruptibly();
  266.         try {
  267.             checkNotShutdown();
  268.             try {
  269.                 while (hasNoElements()) {
  270.                     notEmpty.await();
  271.                     checkNotShutdown();
  272.                 }
  273.             }
  274.             catch (InterruptedException ie) {
  275.                 notEmpty.signal();
  276.                 throw ie;
  277.             }
  278.             E e = extract();
  279.             return e;
  280.         }
  281.         finally {
  282.             lock.unlock();
  283.         }
  284.     }

  285.     @Override
  286.     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  287.         long nanos = unit.toNanos(timeout);
  288.         lock.lockInterruptibly();
  289.         try {
  290.             checkNotShutdown();
  291.             while (true) {
  292.                 if (hasElements()) {
  293.                     E e = extract();
  294.                     return e;
  295.                 }
  296.                 if (nanos <= 0) {
  297.                     return null;
  298.                 }
  299.                 try {
  300.                     nanos = notEmpty.awaitNanos(nanos);
  301.                     checkNotShutdown();
  302.                 }
  303.                 catch (InterruptedException ie) {
  304.                     notEmpty.signal();
  305.                     throw ie;
  306.                 }
  307.             }
  308.         }
  309.         finally {
  310.             lock.unlock();
  311.         }
  312.     }

  313.     @Override
  314.     public int remainingCapacity() {
  315.         lock.lock();
  316.         try {
  317.             return items.length - count;
  318.         }
  319.         finally {
  320.             lock.unlock();
  321.         }
  322.     }

  323.     @Override
  324.     public int drainTo(Collection<? super E> c) {
  325.         checkNotNull(c);
  326.         if (c == this) {
  327.             throw new IllegalArgumentException();
  328.         }
  329.         lock.lock();
  330.         try {
  331.             int i = takeIndex;
  332.             int n = 0;
  333.             for (; n < count; n++) {
  334.                 c.add(items[i]);
  335.                 items[i] = null;
  336.                 i = inc(i);
  337.             }
  338.             if (n > 0) {
  339.                 count = 0;
  340.                 putIndex = 0;
  341.                 takeIndex = 0;
  342.                 notFull.signalAll();
  343.             }
  344.             return n;
  345.         }
  346.         finally {
  347.             lock.unlock();
  348.         }
  349.     }

  350.     @Override
  351.     public int drainTo(Collection<? super E> c, int maxElements) {
  352.         checkNotNull(c);
  353.         if (c == this) {
  354.             throw new IllegalArgumentException();
  355.         }
  356.         if (maxElements <= 0) {
  357.             return 0;
  358.         }
  359.         lock.lock();
  360.         try {
  361.             int i = takeIndex;
  362.             int n = 0;
  363.             int max = (maxElements < count) ? maxElements : count;
  364.             for (; n < max; n++) {
  365.                 c.add(items[i]);
  366.                 items[i] = null;
  367.                 i = inc(i);
  368.             }
  369.             if (n > 0) {
  370.                 count -= n;
  371.                 takeIndex = i;
  372.                 notFull.signalAll();
  373.             }
  374.             return n;
  375.         }
  376.         finally {
  377.             lock.unlock();
  378.         }
  379.     }

  380.     @Override
  381.     public int size() {
  382.         lock.lock();
  383.         try {
  384.             return count;
  385.         }
  386.         finally {
  387.             lock.unlock();
  388.         }
  389.     }

  390.     @Override
  391.     public Iterator<E> iterator() {
  392.         lock.lock();
  393.         try {
  394.             return new Itr();
  395.         }
  396.         finally {
  397.             lock.unlock();
  398.         }
  399.     }

  400.     private class Itr implements Iterator<E> {
  401.         private int nextIndex;
  402.         private E nextItem;
  403.         private int lastRet;

  404.         Itr() {
  405.             lastRet = -1;
  406.             if (count == 0) {
  407.                 nextIndex = -1;
  408.             }
  409.             else {
  410.                 nextIndex = takeIndex;
  411.                 nextItem = items[takeIndex];
  412.             }
  413.         }

  414.         public boolean hasNext() {
  415.             return nextIndex >= 0;
  416.         }

  417.         private void checkNext() {
  418.             if (nextIndex == putIndex) {
  419.                 nextIndex = -1;
  420.                 nextItem = null;
  421.             }
  422.             else {
  423.                 nextItem = items[nextIndex];
  424.                 if (nextItem == null) {
  425.                     nextIndex = -1;
  426.                 }
  427.             }
  428.         }

  429.         public E next() {
  430.             lock.lock();
  431.             try {
  432.                 if (nextIndex < 0) {
  433.                     throw new NoSuchElementException();
  434.                 }
  435.                 lastRet = nextIndex;
  436.                 E e = nextItem;
  437.                 nextIndex = inc(nextIndex);
  438.                 checkNext();
  439.                 return e;
  440.             }
  441.             finally {
  442.                 lock.unlock();
  443.             }
  444.         }

  445.         public void remove() {
  446.             lock.lock();
  447.             try {
  448.                 int i = lastRet;
  449.                 if (i < 0) {
  450.                     throw new IllegalStateException();
  451.                 }
  452.                 lastRet = -1;
  453.                 int ti = takeIndex;
  454.                 removeAt(i);
  455.                 nextIndex = (i == ti) ? takeIndex : i;
  456.                 checkNext();
  457.             }
  458.             finally {
  459.                 lock.unlock();
  460.             }
  461.         }
  462.     }

  463. }