001/**
002 *
003 * Copyright 2014-2019 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.util;
018
019import java.util.AbstractQueue;
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.NoSuchElementException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.locks.Condition;
026import java.util.concurrent.locks.ReentrantLock;
027
/**
 * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start()} methods. Will
 * throw {@link InterruptedException} if the queue has been shut down on {@link #take()} and
 * {@link #poll(long, TimeUnit)}.
 * <p>
 * This queue does not accept <code>null</code> elements: every inserting method throws a
 * {@link NullPointerException} when asked to insert <code>null</code>.
 * </p>
 * <p>
 * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
 * domain).
 * </p>
 *
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    /** Circular buffer holding the queued elements; slots that hold no element are <code>null</code>. */
    private final E[] items;

    /** Index into {@link #items} of the element returned by the next take, poll or peek. */
    private int takeIndex;

    /** Index into {@link #items} of the slot filled by the next insertion. */
    private int putIndex;

    /** Number of elements currently stored in the queue. */
    private int count;

    /** Lock held by the methods of this class while they read or modify the queue state. */
    private final ReentrantLock lock;

    /** Condition signalled when an element was inserted, and on shutdown. */
    private final Condition notEmpty;

    /** Condition signalled when an element was removed, and on shutdown. */
    private final Condition notFull;

    /** Whether the queue is currently shut down; newly created queues are not. */
    private volatile boolean isShutdown = false;

    /**
     * Advance a buffer index by one position, wrapping around at the end of the buffer.
     *
     * @param i the index to advance.
     * @return the index following <code>i</code> in the circular buffer.
     */
    private int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Insert the element at the current put position and signal one waiting consumer. The caller must
     * hold the lock and must have ensured that the queue is not full.
     *
     * @param e the element to insert.
     */
    private void insert(E e) {
        insert(e, true);
    }

    /**
     * Insert the element at the current put position. The caller must hold the lock and must have
     * ensured that the queue is not full.
     *
     * @param e the element to insert.
     * @param signalNotEmpty whether to signal one consumer waiting on the not empty condition.
     */
    private void insert(E e, boolean signalNotEmpty) {
        items[putIndex] = e;
        putIndex = inc(putIndex);
        count++;
        if (signalNotEmpty) {
            notEmpty.signal();
        }
    }

    /**
     * Remove and return the element at the current take position and signal one waiting producer. The
     * caller must hold the lock and must have ensured that the queue is not empty.
     *
     * @return the removed element.
     */
    private E extract() {
        E e = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        count--;
        notFull.signal();
        return e;
    }

    /**
     * Remove the element stored at the given buffer index and signal one waiting producer. The caller
     * must hold the lock.
     *
     * @param i the buffer index of the element to remove.
     */
    private void removeAt(int i) {
        if (i == takeIndex) {
            // Removing the head only requires advancing the take position.
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        }
        else {
            // Otherwise slide all elements behind i one slot towards the head and pull back putIndex.
            while (true) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                }
                else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        count--;
        notFull.signal();
    }

    /**
     * Throw a {@link NullPointerException} if the given reference is <code>null</code>.
     *
     * @param o the reference to check.
     */
    private static void checkNotNull(Object o) {
        if (o == null) {
            throw new NullPointerException();
        }
    }

    /**
     * Throw an {@link InterruptedException} if the queue is shut down.
     *
     * @throws InterruptedException if the queue is shut down.
     */
    private void checkNotShutdown() throws InterruptedException {
        if (isShutdown) {
            throw new InterruptedException("Queue was already shut down");
        }
    }

    private boolean hasNoElements() {
        return count == 0;
    }

    private boolean hasElements() {
        return !hasNoElements();
    }

    private boolean isFull() {
        return count == items.length;
    }

    private boolean isNotFull() {
        return !isFull();
    }

    /**
     * Create a new queue with the given fixed capacity and a non-fair lock.
     *
     * @param capacity the capacity of the queue, must be greater than zero.
     * @throws IllegalArgumentException if <code>capacity</code> is not greater than zero.
     */
    public ArrayBlockingQueueWithShutdown(int capacity) {
        this(capacity, false);
    }

    /**
     * Create a new queue with the given fixed capacity.
     *
     * @param capacity the capacity of the queue, must be greater than zero.
     * @param fair the fairness policy handed to the {@link ReentrantLock} guarding this queue.
     * @throws IllegalArgumentException if <code>capacity</code> is not greater than zero.
     */
    @SuppressWarnings("unchecked")
    public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        // Generic array creation is not possible; the array never leaves this class.
        items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Shutdown the queue. Any method currently waiting for a not full/empty condition will unblock
     * (and usually throw an InterruptedException).
     */
    public void shutdown() {
        lock.lock();
        try {
            isShutdown = true;
            notEmpty.signalAll();
            notFull.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Start the queue. Newly created instances will be started automatically, thus this only needs
     * to be called after {@link #shutdown()}.
     *
     * @return <code>true</code> if the queue was shut down before, <code>false</code> if not.
     */
    public boolean start() {
        boolean previousIsShutdown;
        lock.lock();
        try {
            previousIsShutdown = isShutdown;
            isShutdown = false;
        }
        finally {
            lock.unlock();
        }
        return previousIsShutdown;
    }

    /**
     * Returns true if the queue is currently shut down.
     *
     * @return true if the queue is shut down.
     */
    public boolean isShutdown() {
        lock.lock();
        try {
            return isShutdown;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve and remove the head of the queue without waiting. This method does not check whether
     * the queue is shut down.
     *
     * @return the head of the queue or <code>null</code> if the queue is empty.
     */
    @Override
    public E poll() {
        lock.lock();
        try {
            if (hasNoElements()) {
                return null;
            }
            return extract();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve, but do not remove, the head of the queue.
     *
     * @return the head of the queue or <code>null</code> if the queue is empty.
     */
    @Override
    public E peek() {
        lock.lock();
        try {
            return hasNoElements() ? null : items[takeIndex];
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Insert the element without waiting, if the queue is neither full nor shut down.
     *
     * @param e the element to insert.
     * @return <code>true</code> if the element was inserted, <code>false</code> if the queue was full or shut down.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    @Override
    public boolean offer(E e) {
        checkNotNull(e);
        lock.lock();
        try {
            if (isFull() || isShutdown) {
                return false;
            }
            else {
                insert(e);
                return true;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Offer the element like {@link #offer(Object)} and afterwards shut the queue down. Both steps are
     * performed while holding the queue lock, so no other thread can modify the queue in between.
     *
     * @param e the element to insert.
     * @return the result of the offer, i.e. <code>true</code> if the element was inserted.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    public boolean offerAndShutdown(E e) {
        checkNotNull(e);
        boolean res;
        lock.lock();
        try {
            // The lock is reentrant, so the nested lock acquisitions of offer() and shutdown() succeed.
            res = offer(e);
            shutdown();
        } finally {
            lock.unlock();
        }
        return res;
    }

    /**
     * Insert the element, waiting for free space if the queue is full. The caller must hold the lock.
     *
     * @param e the element to insert.
     * @param signalNotEmpty whether to signal a waiting consumer right after the insertion. If
     *        <code>false</code>, the caller is responsible for signalling the not empty condition later.
     * @throws InterruptedException if interrupted while waiting or if the queue is shut down after waiting.
     */
    private void putInternal(E e, boolean signalNotEmpty) throws InterruptedException {
        assert lock.isHeldByCurrentThread();

        while (isFull()) {
            if (!signalNotEmpty) {
                // Earlier insertions of this batch were not announced. Wake the consumers before
                // blocking, otherwise nobody may ever free the slot this thread is about to wait for.
                notEmpty.signalAll();
            }
            try {
                notFull.await();
                checkNotShutdown();
            }
            catch (InterruptedException ie) {
                // Pass the signal on to another waiting producer, as this thread will not use it.
                notFull.signal();
                throw ie;
            }
        }
        insert(e, signalNotEmpty);
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     * <p>
     * This may throw an {@link InterruptedException} in two cases
     * </p>
     * <ol>
     *  <li>If the queue was shut down.</li>
     *  <li>If the thread was interrupted.</li>
     * </ol>
     * So you have to check which is the case, e.g. by calling {@link #isShutdown()}.
     *
     * @param e the element to add.
     * @throws InterruptedException if interrupted while waiting or if the queue was shut down.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    @Override
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        lock.lockInterruptibly();

        try {
            putInternal(e, true);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Put if the queue has not been shutdown yet.
     *
     * @param e the element to put into the queue.
     * @return <code>true</code> if the element has been put into the queue, <code>false</code> if the queue was shutdown.
     * @throws InterruptedException if the calling thread was interrupted.
     * @throws NullPointerException if the element is <code>null</code>.
     * @since 4.4
     */
    public boolean putIfNotShutdown(E e) throws InterruptedException {
        checkNotNull(e);
        lock.lockInterruptibly();

        try {
            if (isShutdown) {
                return false;
            }

            putInternal(e, true);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Insert all elements of the collection in iteration order, waiting for space to become available
     * whenever the queue is full. The queue lock is held for the whole operation, except while waiting
     * for space. Consumers are woken up once after the last insertion (and whenever this method has
     * to wait for space) instead of after every single insertion.
     * <p>
     * If this method terminates with an exception, the elements inserted up to that point remain in
     * the queue.
     * </p>
     *
     * @param elements the elements to insert.
     * @throws InterruptedException if interrupted while waiting or if the queue is shut down after waiting.
     * @throws NullPointerException if the collection or one of its elements is <code>null</code>.
     */
    public void putAll(Collection<? extends E> elements) throws InterruptedException {
        checkNotNull(elements);
        lock.lockInterruptibly();

        try {
            for (E element : elements) {
                // A null element would be indistinguishable from an empty slot and corrupt the queue.
                checkNotNull(element);
                putInternal(element, false);
            }
        } finally {
            // Announce the insertions, including those made before a possible exception.
            notEmpty.signalAll();
            lock.unlock();
        }
    }

    public enum TryPutResult {
        /**
         * The method was unable to acquire the queue lock.
         */
        couldNotLock,

        /**
         * The queue was shut down.
         */
        queueWasShutDown,

        /**
         * The method was unable to put another element into the queue because the queue was full.
         */
        queueWasFull,

        /**
         * The element was successfully placed into the queue.
         */
        putSuccessful,
    }

    /**
     * Try to insert the element without blocking, not even for acquiring the queue lock.
     *
     * @param e the element to insert.
     * @return a result code describing whether the element was inserted and, if not, why.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    public TryPutResult tryPut(E e) {
        checkNotNull(e);

        boolean locked = lock.tryLock();
        if (!locked) {
            return TryPutResult.couldNotLock;
        }
        try {
            if (isShutdown) {
                return TryPutResult.queueWasShutDown;
            }
            if (isFull()) {
                return TryPutResult.queueWasFull;
            }

            insert(e);
            return TryPutResult.putSuccessful;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Insert the element, waiting up to the given time for space to become available.
     * <p>
     * Note that, unlike {@link #offer(Object)}, the shutdown state is only examined after this method
     * had to wait: if there is free space right away, the element is inserted even if the queue is
     * shut down.
     * </p>
     *
     * @param e the element to insert.
     * @param timeout how long to wait at most, in units of <code>unit</code>.
     * @param unit the time unit of <code>timeout</code>.
     * @return <code>true</code> if the element was inserted, <code>false</code> if the waiting time elapsed.
     * @throws InterruptedException if interrupted while waiting or if the queue is shut down after waiting.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (true) {
                if (isNotFull()) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                    checkNotShutdown();
                }
                catch (InterruptedException ie) {
                    // Pass the signal on to another waiting producer, as this thread will not use it.
                    notFull.signal();
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }

    }

    /**
     * Retrieve and remove the head of the queue, waiting if necessary until an element becomes
     * available.
     *
     * @return the head of the queue.
     * @throws InterruptedException if interrupted while waiting or if the queue is shut down.
     */
    @Override
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            checkNotShutdown();
            try {
                while (hasNoElements()) {
                    notEmpty.await();
                    checkNotShutdown();
                }
            }
            catch (InterruptedException ie) {
                // Pass the signal on to another waiting consumer, as this thread will not use it.
                notEmpty.signal();
                throw ie;
            }
            return extract();
        }
        finally {
            lock.unlock();
        }
    }

    public enum TryTakeResultCode {
        /**
         * The method was unable to acquire the queue lock.
         */
        couldNotLock,

        /**
         * The queue was shut down.
         */
        queueWasShutDown,

        /**
         * The queue was empty.
         */
        queueWasEmpty,

        /**
         * An element was successfully removed from the queue.
         */
        takeSuccessful,
    }

    /**
     * The outcome of {@link #tryTake()}: a result code and, if the take was successful, the element.
     *
     * @param <E> the type of the taken element.
     */
    public static final class TryTakeResult<E> {
        private final E element;
        private final TryTakeResultCode resultCode;

        private TryTakeResult(TryTakeResultCode resultCode) {
            assert resultCode != null;
            this.resultCode = resultCode;
            this.element = null;
        }

        private TryTakeResult(E element) {
            assert element != null;
            this.resultCode = TryTakeResultCode.takeSuccessful;
            this.element = element;
        }

        /**
         * Get the result code of the take attempt.
         *
         * @return the result code.
         */
        public TryTakeResultCode getResultCode() {
            return resultCode;
        }

        /**
         * Get the element which was taken from the queue.
         *
         * @return the taken element or <code>null</code> if the take was not successful.
         */
        public E getElement() {
            return element;
        }
    }

    /**
     * Try to remove the head of the queue without blocking, not even for acquiring the queue lock.
     *
     * @return a result carrying the removed element or the reason why no element was removed.
     */
    public TryTakeResult<E> tryTake() {
        boolean locked = lock.tryLock();
        if (!locked) {
            return new TryTakeResult<E>(TryTakeResultCode.couldNotLock);
        }
        try {
            if (isShutdown) {
                return new TryTakeResult<E>(TryTakeResultCode.queueWasShutDown);
            }
            if (hasNoElements()) {
                return new TryTakeResult<E>(TryTakeResultCode.queueWasEmpty);
            }
            E element = extract();
            return new TryTakeResult<E>(element);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve and remove the head of the queue, waiting up to the given time for an element to
     * become available.
     *
     * @param timeout how long to wait at most, in units of <code>unit</code>.
     * @param unit the time unit of <code>timeout</code>.
     * @return the head of the queue or <code>null</code> if the waiting time elapsed.
     * @throws InterruptedException if interrupted while waiting or if the queue is shut down.
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            checkNotShutdown();
            while (true) {
                if (hasElements()) {
                    return extract();
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                    checkNotShutdown();
                }
                catch (InterruptedException ie) {
                    // Pass the signal on to another waiting consumer, as this thread will not use it.
                    notEmpty.signal();
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Get the number of elements which can currently be inserted without waiting.
     *
     * @return the capacity of the queue minus its current size.
     */
    @Override
    public int remainingCapacity() {
        lock.lock();
        try {
            return items.length - count;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Move up to <code>maxElements</code> elements from the head of the queue into the collection. The
     * caller must hold the lock.
     * <p>
     * The queue state is brought up to date even if <code>c.add()</code> throws: elements added to the
     * collection before the failure are removed from the queue, the element whose addition failed and
     * all later ones stay in the queue.
     * </p>
     *
     * @param c the collection to move the elements into.
     * @param maxElements the maximum number of elements to move.
     * @return the number of moved elements.
     */
    private int drainInternal(Collection<? super E> c, int maxElements) {
        assert lock.isHeldByCurrentThread();

        int limit = Math.min(maxElements, count);
        int index = takeIndex;
        int drained = 0;
        try {
            while (drained < limit) {
                c.add(items[index]);
                // Only clear the slot once the collection has accepted the element.
                items[index] = null;
                index = inc(index);
                drained++;
            }
            return drained;
        }
        finally {
            if (drained > 0) {
                count -= drained;
                takeIndex = index;
                notFull.signalAll();
            }
        }
    }

    /**
     * Remove all elements from the queue and add them to the given collection.
     *
     * @param c the collection to move the elements into.
     * @return the number of moved elements.
     * @throws NullPointerException if the collection is <code>null</code>.
     * @throws IllegalArgumentException if the collection is this queue.
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        checkNotNull(c);
        if (c == this) {
            throw new IllegalArgumentException();
        }
        lock.lock();
        try {
            return drainInternal(c, Integer.MAX_VALUE);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Remove at most the given number of elements from the queue and add them to the given
     * collection.
     *
     * @param c the collection to move the elements into.
     * @param maxElements the maximum number of elements to move.
     * @return the number of moved elements.
     * @throws NullPointerException if the collection is <code>null</code>.
     * @throws IllegalArgumentException if the collection is this queue.
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        lock.lock();
        try {
            return drainInternal(c, maxElements);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Get the number of elements in the queue.
     *
     * @return the current size of the queue.
     */
    @Override
    public int size() {
        lock.lock();
        try {
            return count;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Get an iterator over the elements of the queue, starting at the head. The iterator is created
     * while holding the queue lock and acquires the lock again for {@link Iterator#next()} and
     * {@link Iterator#remove()}.
     *
     * @return an iterator over the elements of the queue.
     */
    @Override
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr();
        }
        finally {
            lock.unlock();
        }
    }

    private class Itr implements Iterator<E> {
        /** Buffer index of the element returned by the following next() call, negative if there is none. */
        private int nextIndex;

        /** The element returned by the following next() call, fetched ahead of time. */
        private E nextItem;

        /** Buffer index of the element returned by the most recent next() call, negative if not removable. */
        private int lastRet;

        Itr() {
            lastRet = -1;
            if (count == 0) {
                nextIndex = -1;
            }
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        @Override
        public boolean hasNext() {
            return nextIndex >= 0;
        }

        /**
         * Fetch the element at nextIndex, or mark the iteration as finished if nextIndex reached the
         * put position or points to an empty slot.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            }
            else {
                nextItem = items[nextIndex];
                if (nextItem == null) {
                    nextIndex = -1;
                }
            }
        }

        @Override
        public E next() {
            lock.lock();
            try {
                if (nextIndex < 0) {
                    throw new NoSuchElementException();
                }
                lastRet = nextIndex;
                E e = nextItem;
                nextIndex = inc(nextIndex);
                checkNext();
                return e;
            }
            finally {
                lock.unlock();
            }
        }

        @Override
        public void remove() {
            lock.lock();
            try {
                int i = lastRet;
                if (i < 0) {
                    throw new IllegalStateException();
                }
                lastRet = -1;
                int ti = takeIndex;
                removeAt(i);
                // Removing the head moves takeIndex forward; any other removal shifts the
                // following element into slot i.
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            }
            finally {
                lock.unlock();
            }
        }
    }

}