ArrayBlockingQueueWithShutdown.java

/**
 *
 * Copyright 2014-2019 Florian Schmaus
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jivesoftware.smack.util;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will
 * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and
 * {@link #poll(long, TimeUnit)}.
 * <p>
 * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
 * domain).
 *
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    private final E[] items;

    private int takeIndex;

    private int putIndex;

    private int count;

    private final ReentrantLock lock;

    private final Condition notEmpty;

    private final Condition notFull;

    private volatile boolean isShutdown = false;

    private int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    private void insert(E e) {
        insert(e, true);
    }

    private void insert(E e, boolean signalNotEmpty) {
        items[putIndex] = e;
        putIndex = inc(putIndex);
        count++;
        if (signalNotEmpty) {
            notEmpty.signal();
        }
    }

    private E extract() {
        E e = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        count--;
        notFull.signal();
        return e;
    }

    private void removeAt(int i) {
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        }
        else {
            while (true) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                }
                else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        count--;
        notFull.signal();
    }

    private static void checkNotNull(Object o) {
        if (o == null) {
            throw new NullPointerException();
        }
    }

    private void checkNotShutdown() throws InterruptedException {
        if (isShutdown) {
            throw new InterruptedException("Queue was already shut down");
        }
    }

    private boolean hasNoElements() {
        return count == 0;
    }

    private boolean hasElements() {
        return !hasNoElements();
    }

    private boolean isFull() {
        return count == items.length;
    }

    private boolean isNotFull() {
        return !isFull();
    }

    public ArrayBlockingQueueWithShutdown(int capacity) {
        this(capacity, false);
    }

    @SuppressWarnings("unchecked")
    public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock
     * (and usually throw a InterruptedException).
     */
    public void shutdown() {
        lock.lock();
        try {
            isShutdown = true;
            notEmpty.signalAll();
            notFull.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Start the queue. Newly created instances will be started automatically, thus this only needs
     * to be called after {@link #shutdown()}.
     *
     * @return <code>true</code> if the queues was shutdown before, <code>false</code> if not.
     */
    public boolean start() {
        boolean previousIsShutdown;
        lock.lock();
        try {
            previousIsShutdown = isShutdown;
            isShutdown = false;
        }
        finally {
            lock.unlock();
        }
        return previousIsShutdown;
    }

    /**
     * Returns true if the queue is currently shut down.
     *
     * @return true if the queue is shut down.
     */
    public boolean isShutdown() {
        lock.lock();
        try {
            return isShutdown;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E poll() {
        lock.lock();
        try {
            if (hasNoElements()) {
                return null;
            }
            E e = extract();
            return e;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public E peek() {
        lock.lock();
        try {
            return hasNoElements() ? null : items[takeIndex];
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e) {
        checkNotNull(e);
        lock.lock();
        try {
            if (isFull() || isShutdown) {
                return false;
            }
            else {
                insert(e);
                return true;
            }
        }
        finally {
            lock.unlock();
        }
    }

    public boolean offerAndShutdown(E e) {
        checkNotNull(e);
        boolean res;
        lock.lock();
        try {
            res = offer(e);
            shutdown();
        } finally {
            lock.unlock();
        }
        return res;
    }

    private void putInternal(E e, boolean signalNotEmpty) throws InterruptedException {
        assert lock.isHeldByCurrentThread();

        while (isFull()) {
            try {
                notFull.await();
                checkNotShutdown();
            }
            catch (InterruptedException ie) {
                notFull.signal();
                throw ie;
            }
        }
        insert(e, signalNotEmpty);
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     * <p>
     * This may throw an {@link InterruptedException} in two cases
     * <ol>
     *  <li>If the queue was shut down.</li>
     *  <li>If the thread was was interrupted.</li>
     * </ol>
     * So you have to check which is the case, e.g. by calling {@link #isShutdown()}.
     *
     * @param e the element to add.
     * @throws InterruptedException if interrupted while waiting or if the queue was shut down.
     */
    @Override
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        lock.lockInterruptibly();

        try {
            putInternal(e, true);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Put if the queue has not been shutdown yet.
     *
     * @param e the element to put into the queue.
     * @return <code>true</code> if the element has been put into the queue, <code>false</code> if the queue was shutdown.
     * @throws InterruptedException if the calling thread was interrupted.
     * @since 4.4
     */
    public boolean putIfNotShutdown(E e) throws InterruptedException {
        checkNotNull(e);
        lock.lockInterruptibly();

        try {
            if (isShutdown) {
                return false;
            }

            putInternal(e, true);
            return true;
        } finally {
            lock.unlock();
        }
    }

    public void putAll(Collection<? extends E> elements) throws InterruptedException {
        checkNotNull(elements);
        lock.lockInterruptibly();

        try {
            for (E element : elements) {
                putInternal(element, false);
            }
        } finally {
            notEmpty.signalAll();
            lock.unlock();
        }
    }

    public enum TryPutResult {
        /**
         * The method was unable to acquire the queue lock.
         */
        couldNotLock,

        /**
         * The queue was shut down.
         */
        queueWasShutDown,

        /**
         * The method was unable to put another element into the queue because the queue was full.
         */
        queueWasFull,

        /**
         * The element was successfully placed into the queue.
         */
        putSuccessful,
    }

    public TryPutResult tryPut(E e) {
        checkNotNull(e);

        boolean locked = lock.tryLock();
        if (!locked) {
            return TryPutResult.couldNotLock;
        }
        try {
            if (isShutdown) {
                return TryPutResult.queueWasShutDown;
            }
            if (isFull()) {
                return TryPutResult.queueWasFull;
            }

            insert(e);
            return TryPutResult.putSuccessful;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (true) {
                if (isNotFull()) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                    checkNotShutdown();
                }
                catch (InterruptedException ie) {
                    notFull.signal();
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }

    }

    @Override
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            checkNotShutdown();
            try {
                while (hasNoElements()) {
                    notEmpty.await();
                    checkNotShutdown();
                }
            }
            catch (InterruptedException ie) {
                notEmpty.signal();
                throw ie;
            }
            E e = extract();
            return e;
        }
        finally {
            lock.unlock();
        }
    }

    public enum TryTakeResultCode {
        /**
         * The method was unable to acquire the queue lock.
         */
        couldNotLock,

        /**
         * The queue was shut down.
         */
        queueWasShutDown,

        /**
         * The queue was empty.
         */
        queueWasEmpty,

        /**
         * An element was successfully removed from the queue.
         */
        takeSuccessful,
    }

    public static final class TryTakeResult<E> {
        private final E element;
        private final TryTakeResultCode resultCode;

        private TryTakeResult(TryTakeResultCode resultCode) {
            assert resultCode != null;
            this.resultCode = resultCode;
            this.element = null;
        }

        private TryTakeResult(E element) {
            assert element != null;
            this.resultCode = TryTakeResultCode.takeSuccessful;
            this.element = element;
        }

        public TryTakeResultCode getResultCode() {
            return resultCode;
        }

        public E getElement() {
            return element;
        }
    }

    public TryTakeResult<E> tryTake() {
        boolean locked = lock.tryLock();
        if (!locked) {
            return new TryTakeResult<E>(TryTakeResultCode.couldNotLock);
        }
        try {
            if (isShutdown) {
                return new TryTakeResult<E>(TryTakeResultCode.queueWasShutDown);
            }
            if (hasNoElements()) {
                return new TryTakeResult<E>(TryTakeResultCode.queueWasEmpty);
            }
            E element = extract();
            return new TryTakeResult<E>(element);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            checkNotShutdown();
            while (true) {
                if (hasElements()) {
                    E e = extract();
                    return e;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                    checkNotShutdown();
                }
                catch (InterruptedException ie) {
                    notEmpty.signal();
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public int remainingCapacity() {
        lock.lock();
        try {
            return items.length - count;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        checkNotNull(c);
        if (c == this) {
            throw new IllegalArgumentException();
        }
        lock.lock();
        try {
            int i = takeIndex;
            int n = 0;
            for (; n < count; n++) {
                c.add(items[i]);
                items[i] = null;
                i = inc(i);
            }
            if (n > 0) {
                count = 0;
                putIndex = 0;
                takeIndex = 0;
                notFull.signalAll();
            }
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        lock.lock();
        try {
            int i = takeIndex;
            int n = 0;
            int max = (maxElements < count) ? maxElements : count;
            for (; n < max; n++) {
                c.add(items[i]);
                items[i] = null;
                i = inc(i);
            }
            if (n > 0) {
                count -= n;
                takeIndex = i;
                notFull.signalAll();
            }
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public int size() {
        lock.lock();
        try {
            return count;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr();
        }
        finally {
            lock.unlock();
        }
    }

    private class Itr implements Iterator<E> {
        private int nextIndex;
        private E nextItem;
        private int lastRet;

        Itr() {
            lastRet = -1;
            if (count == 0) {
                nextIndex = -1;
            }
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        @Override
        public boolean hasNext() {
            return nextIndex >= 0;
        }

        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            }
            else {
                nextItem = items[nextIndex];
                if (nextItem == null) {
                    nextIndex = -1;
                }
            }
        }

        @Override
        public E next() {
            lock.lock();
            try {
                if (nextIndex < 0) {
                    throw new NoSuchElementException();
                }
                lastRet = nextIndex;
                E e = nextItem;
                nextIndex = inc(nextIndex);
                checkNext();
                return e;
            }
            finally {
                lock.unlock();
            }
        }

        @Override
        public void remove() {
            lock.lock();
            try {
                int i = lastRet;
                if (i < 0) {
                    throw new IllegalStateException();
                }
                lastRet = -1;
                int ti = takeIndex;
                removeAt(i);
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            }
            finally {
                lock.unlock();
            }
        }
    }

}