001/**
002 *
003 * Copyright 2014 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.util;
018
019import java.util.AbstractQueue;
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.NoSuchElementException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.locks.Condition;
026import java.util.concurrent.locks.ReentrantLock;
027
028/**
029 * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will
030 * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and
031 * {@link #poll(long, TimeUnit)}.
032 * <p>
033 * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
034 * domain).
035 * 
036 * @param <E> the type of elements held in this collection
037 */
038public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {
039
040    private final E[] items;
041
042    private int takeIndex;
043
044    private int putIndex;
045
046    private int count;
047
048    private final ReentrantLock lock;
049
050    private final Condition notEmpty;
051
052    private final Condition notFull;
053
054    private volatile boolean isShutdown = false;
055
056    private final int inc(int i) {
057        return (++i == items.length) ? 0 : i;
058    }
059
060    private final void insert(E e) {
061        items[putIndex] = e;
062        putIndex = inc(putIndex);
063        count++;
064        notEmpty.signal();
065    }
066
067    private final E extract() {
068        E e = items[takeIndex];
069        items[takeIndex] = null;
070        takeIndex = inc(takeIndex);
071        count--;
072        notFull.signal();
073        return e;
074    }
075
076    private final void removeAt(int i) {
077        if (i == takeIndex) {
078            items[takeIndex] = null;
079            takeIndex = inc(takeIndex);
080        }
081        else {
082            while (true) {
083                int nexti = inc(i);
084                if (nexti != putIndex) {
085                    items[i] = items[nexti];
086                    i = nexti;
087                }
088                else {
089                    items[i] = null;
090                    putIndex = i;
091                    break;
092                }
093            }
094        }
095        count--;
096        notFull.signal();
097    }
098
099    private final static void checkNotNull(Object o) {
100        if (o == null) {
101            throw new NullPointerException();
102        }
103    }
104
105    private final void checkNotShutdown() throws InterruptedException {
106        if (isShutdown) {
107            throw new InterruptedException();
108        }
109    }
110
111    private final boolean hasNoElements() {
112        return count == 0;
113    }
114
115    private final boolean hasElements() {
116        return !hasNoElements();
117    }
118
119    private final boolean isFull() {
120        return count == items.length;
121    }
122
123    private final boolean isNotFull() {
124        return !isFull();
125    }
126
127    public ArrayBlockingQueueWithShutdown(int capacity) {
128        this(capacity, false);
129    }
130
131    @SuppressWarnings("unchecked")
132    public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
133        if (capacity <= 0)
134            throw new IllegalArgumentException();
135        items = (E[]) new Object[capacity];
136        lock = new ReentrantLock(fair);
137        notEmpty = lock.newCondition();
138        notFull = lock.newCondition();
139    }
140
141    /**
142     * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock
143     * (and usually throw a InterruptedException).
144     */
145    public void shutdown() {
146        lock.lock();
147        try {
148            isShutdown = true;
149            notEmpty.signalAll();
150            notFull.signalAll();
151        }
152        finally {
153            lock.unlock();
154        }
155    }
156
157    /**
158     * Start the queue. Newly created instances will be started automatically, thus this only needs
159     * to be called after {@link #shutdown()}.
160     */
161    public void start() {
162        lock.lock();
163        try {
164            isShutdown = false;
165        }
166        finally {
167            lock.unlock();
168        }
169    }
170
171    /**
172     * Returns true if the queue is currently shut down.
173     * 
174     * @return true if the queue is shut down.
175     */
176    public boolean isShutdown() {
177        lock.lock();
178        try {
179            return isShutdown;
180        } finally {
181            lock.unlock();
182        }
183    }
184
185    @Override
186    public E poll() {
187        lock.lock();
188        try {
189            if (hasNoElements()) {
190                return null;
191            }
192            E e = extract();
193            return e;
194        }
195        finally {
196            lock.unlock();
197        }
198    }
199
200    @Override
201    public E peek() {
202        lock.lock();
203        try {
204            return hasNoElements() ? null : items[takeIndex];
205        }
206        finally {
207            lock.unlock();
208        }
209    }
210
211    @Override
212    public boolean offer(E e) {
213        checkNotNull(e);
214        lock.lock();
215        try {
216            if (isFull() || isShutdown) {
217                return false;
218            }
219            else {
220                insert(e);
221                return true;
222            }
223        }
224        finally {
225            lock.unlock();
226        }
227    }
228
229    @Override
230    public void put(E e) throws InterruptedException {
231        checkNotNull(e);
232        lock.lockInterruptibly();
233
234        try {
235            while (isFull()) {
236                try {
237                    notFull.await();
238                    checkNotShutdown();
239                }
240                catch (InterruptedException ie) {
241                    notFull.signal();
242                    throw ie;
243                }
244            }
245            insert(e);
246        }
247        finally {
248            lock.unlock();
249        }
250    }
251
252    @Override
253    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
254        checkNotNull(e);
255        long nanos = unit.toNanos(timeout);
256        lock.lockInterruptibly();
257        try {
258            while (true) {
259                if (isNotFull()) {
260                    insert(e);
261                    return true;
262                }
263                if (nanos <= 0) {
264                    return false;
265                }
266                try {
267                    nanos = notFull.awaitNanos(nanos);
268                    checkNotShutdown();
269                }
270                catch (InterruptedException ie) {
271                    notFull.signal();
272                    throw ie;
273                }
274            }
275        }
276        finally {
277            lock.unlock();
278        }
279
280    }
281
282    @Override
283    public E take() throws InterruptedException {
284        lock.lockInterruptibly();
285        try {
286            checkNotShutdown();
287            try {
288                while (hasNoElements()) {
289                    notEmpty.await();
290                    checkNotShutdown();
291                }
292            }
293            catch (InterruptedException ie) {
294                notEmpty.signal();
295                throw ie;
296            }
297            E e = extract();
298            return e;
299        }
300        finally {
301            lock.unlock();
302        }
303    }
304
305    @Override
306    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
307        long nanos = unit.toNanos(timeout);
308        lock.lockInterruptibly();
309        try {
310            checkNotShutdown();
311            while (true) {
312                if (hasElements()) {
313                    E e = extract();
314                    return e;
315                }
316                if (nanos <= 0) {
317                    return null;
318                }
319                try {
320                    nanos = notEmpty.awaitNanos(nanos);
321                    checkNotShutdown();
322                }
323                catch (InterruptedException ie) {
324                    notEmpty.signal();
325                    throw ie;
326                }
327            }
328        }
329        finally {
330            lock.unlock();
331        }
332    }
333
334    @Override
335    public int remainingCapacity() {
336        lock.lock();
337        try {
338            return items.length - count;
339        }
340        finally {
341            lock.unlock();
342        }
343    }
344
345    @Override
346    public int drainTo(Collection<? super E> c) {
347        checkNotNull(c);
348        if (c == this) {
349            throw new IllegalArgumentException();
350        }
351        lock.lock();
352        try {
353            int i = takeIndex;
354            int n = 0;
355            for (; n < count; n++) {
356                c.add(items[i]);
357                items[i] = null;
358                i = inc(i);
359            }
360            if (n > 0) {
361                count = 0;
362                putIndex = 0;
363                takeIndex = 0;
364                notFull.signalAll();
365            }
366            return n;
367        }
368        finally {
369            lock.unlock();
370        }
371    }
372
373    @Override
374    public int drainTo(Collection<? super E> c, int maxElements) {
375        checkNotNull(c);
376        if (c == this) {
377            throw new IllegalArgumentException();
378        }
379        if (maxElements <= 0) {
380            return 0;
381        }
382        lock.lock();
383        try {
384            int i = takeIndex;
385            int n = 0;
386            int max = (maxElements < count) ? maxElements : count;
387            for (; n < max; n++) {
388                c.add(items[i]);
389                items[i] = null;
390                i = inc(i);
391            }
392            if (n > 0) {
393                count -= n;
394                takeIndex = i;
395                notFull.signalAll();
396            }
397            return n;
398        }
399        finally {
400            lock.unlock();
401        }
402    }
403
404    @Override
405    public int size() {
406        lock.lock();
407        try {
408            return count;
409        }
410        finally {
411            lock.unlock();
412        }
413    }
414
415    @Override
416    public Iterator<E> iterator() {
417        lock.lock();
418        try {
419            return new Itr();
420        }
421        finally {
422            lock.unlock();
423        }
424    }
425
426    private class Itr implements Iterator<E> {
427        private int nextIndex;
428        private E nextItem;
429        private int lastRet;
430
431        Itr() {
432            lastRet = -1;
433            if (count == 0) {
434                nextIndex = -1;
435            }
436            else {
437                nextIndex = takeIndex;
438                nextItem = items[takeIndex];
439            }
440        }
441
442        public boolean hasNext() {
443            return nextIndex >= 0;
444        }
445
446        private void checkNext() {
447            if (nextIndex == putIndex) {
448                nextIndex = -1;
449                nextItem = null;
450            }
451            else {
452                nextItem = items[nextIndex];
453                if (nextItem == null) {
454                    nextIndex = -1;
455                }
456            }
457        }
458
459        public E next() {
460            lock.lock();
461            try {
462                if (nextIndex < 0) {
463                    throw new NoSuchElementException();
464                }
465                lastRet = nextIndex;
466                E e = nextItem;
467                nextIndex = inc(nextIndex);
468                checkNext();
469                return e;
470            }
471            finally {
472                lock.unlock();
473            }
474        }
475
476        public void remove() {
477            lock.lock();
478            try {
479                int i = lastRet;
480                if (i < 0) {
481                    throw new IllegalStateException();
482                }
483                lastRet = -1;
484                int ti = takeIndex;
485                removeAt(i);
486                nextIndex = (i == ti) ? takeIndex : i;
487                checkNext();
488            }
489            finally {
490                lock.unlock();
491            }
492        }
493    }
494
495}