001/** 002 * 003 * Copyright 2014 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.util; 018 019import java.util.AbstractQueue; 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.NoSuchElementException; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.locks.Condition; 026import java.util.concurrent.locks.ReentrantLock; 027 028/** 029 * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will 030 * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and 031 * {@link #poll(long, TimeUnit)}. 032 * <p> 033 * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public 034 * domain). 035 * 036 * @param <E> the type of elements held in this collection 037 */ 038public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> { 039 040 private final E[] items; 041 042 private int takeIndex; 043 044 private int putIndex; 045 046 private int count; 047 048 private final ReentrantLock lock; 049 050 private final Condition notEmpty; 051 052 private final Condition notFull; 053 054 private volatile boolean isShutdown = false; 055 056 private int inc(int i) { 057 return (++i == items.length) ? 0 : i; 058 } 059 060 private void insert(E e) { 061 items[putIndex] = e; 062 putIndex = inc(putIndex); 063 count++; 064 notEmpty.signal(); 065 } 066 067 private E extract() { 068 E e = items[takeIndex]; 069 items[takeIndex] = null; 070 takeIndex = inc(takeIndex); 071 count--; 072 notFull.signal(); 073 return e; 074 } 075 076 private void removeAt(int i) { 077 if (i == takeIndex) { 078 items[takeIndex] = null; 079 takeIndex = inc(takeIndex); 080 } 081 else { 082 while (true) { 083 int nexti = inc(i); 084 if (nexti != putIndex) { 085 items[i] = items[nexti]; 086 i = nexti; 087 } 088 else { 089 items[i] = null; 090 putIndex = i; 091 break; 092 } 093 } 094 } 095 count--; 096 notFull.signal(); 097 } 098 099 private static void checkNotNull(Object o) { 100 if (o == null) { 101 throw new NullPointerException(); 102 } 103 } 104 105 private void checkNotShutdown() throws InterruptedException { 106 if (isShutdown) { 107 throw new InterruptedException(); 108 } 109 } 110 111 private boolean hasNoElements() { 112 return count == 0; 113 } 114 115 private boolean hasElements() { 116 return !hasNoElements(); 117 } 118 119 private boolean isFull() { 120 return count == items.length; 121 } 122 123 private boolean isNotFull() { 124 return !isFull(); 125 } 126 127 public ArrayBlockingQueueWithShutdown(int capacity) { 128 this(capacity, false); 129 } 130 131 @SuppressWarnings("unchecked") 132 public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) { 133 if (capacity <= 0) 134 throw new IllegalArgumentException(); 135 items = (E[]) new Object[capacity]; 136 lock = new ReentrantLock(fair); 137 notEmpty = lock.newCondition(); 138 notFull = lock.newCondition(); 139 } 140 141 /** 142 * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock 143 * (and usually throw a InterruptedException). 144 */ 145 public void shutdown() { 146 lock.lock(); 147 try { 148 isShutdown = true; 149 notEmpty.signalAll(); 150 notFull.signalAll(); 151 } 152 finally { 153 lock.unlock(); 154 } 155 } 156 157 /** 158 * Start the queue. Newly created instances will be started automatically, thus this only needs 159 * to be called after {@link #shutdown()}. 160 */ 161 public void start() { 162 lock.lock(); 163 try { 164 isShutdown = false; 165 } 166 finally { 167 lock.unlock(); 168 } 169 } 170 171 /** 172 * Returns true if the queue is currently shut down. 173 * 174 * @return true if the queue is shut down. 175 */ 176 public boolean isShutdown() { 177 lock.lock(); 178 try { 179 return isShutdown; 180 } finally { 181 lock.unlock(); 182 } 183 } 184 185 @Override 186 public E poll() { 187 lock.lock(); 188 try { 189 if (hasNoElements()) { 190 return null; 191 } 192 E e = extract(); 193 return e; 194 } 195 finally { 196 lock.unlock(); 197 } 198 } 199 200 @Override 201 public E peek() { 202 lock.lock(); 203 try { 204 return hasNoElements() ? null : items[takeIndex]; 205 } 206 finally { 207 lock.unlock(); 208 } 209 } 210 211 @Override 212 public boolean offer(E e) { 213 checkNotNull(e); 214 lock.lock(); 215 try { 216 if (isFull() || isShutdown) { 217 return false; 218 } 219 else { 220 insert(e); 221 return true; 222 } 223 } 224 finally { 225 lock.unlock(); 226 } 227 } 228 229 /** 230 * Inserts the specified element into this queue, waiting if necessary 231 * for space to become available. 232 * <p> 233 * This may throw an {@link InterruptedException} in two cases 234 * <ol> 235 * <li>If the queue was shut down.</li> 236 * <li>If the thread was was interrupted.</li> 237 * </ol> 238 * So you have to check which is the case, e.g. by calling {@link #isShutdown()}. 239 * 240 * @param e the element to add. 241 * @throws InterruptedException if interrupted while waiting or if the queue was shut down. 242 */ 243 @Override 244 public void put(E e) throws InterruptedException { 245 checkNotNull(e); 246 lock.lockInterruptibly(); 247 248 try { 249 while (isFull()) { 250 try { 251 notFull.await(); 252 checkNotShutdown(); 253 } 254 catch (InterruptedException ie) { 255 notFull.signal(); 256 throw ie; 257 } 258 } 259 insert(e); 260 } 261 finally { 262 lock.unlock(); 263 } 264 } 265 266 @Override 267 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 268 checkNotNull(e); 269 long nanos = unit.toNanos(timeout); 270 lock.lockInterruptibly(); 271 try { 272 while (true) { 273 if (isNotFull()) { 274 insert(e); 275 return true; 276 } 277 if (nanos <= 0) { 278 return false; 279 } 280 try { 281 nanos = notFull.awaitNanos(nanos); 282 checkNotShutdown(); 283 } 284 catch (InterruptedException ie) { 285 notFull.signal(); 286 throw ie; 287 } 288 } 289 } 290 finally { 291 lock.unlock(); 292 } 293 294 } 295 296 @Override 297 public E take() throws InterruptedException { 298 lock.lockInterruptibly(); 299 try { 300 checkNotShutdown(); 301 try { 302 while (hasNoElements()) { 303 notEmpty.await(); 304 checkNotShutdown(); 305 } 306 } 307 catch (InterruptedException ie) { 308 notEmpty.signal(); 309 throw ie; 310 } 311 E e = extract(); 312 return e; 313 } 314 finally { 315 lock.unlock(); 316 } 317 } 318 319 @Override 320 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 321 long nanos = unit.toNanos(timeout); 322 lock.lockInterruptibly(); 323 try { 324 checkNotShutdown(); 325 while (true) { 326 if (hasElements()) { 327 E e = extract(); 328 return e; 329 } 330 if (nanos <= 0) { 331 return null; 332 } 333 try { 334 nanos = notEmpty.awaitNanos(nanos); 335 checkNotShutdown(); 336 } 337 catch (InterruptedException ie) { 338 notEmpty.signal(); 339 throw ie; 340 } 341 } 342 } 343 finally { 344 lock.unlock(); 345 } 346 } 347 348 @Override 349 public int remainingCapacity() { 350 lock.lock(); 351 try { 352 return items.length - count; 353 } 354 finally { 355 lock.unlock(); 356 } 357 } 358 359 @Override 360 public int drainTo(Collection<? super E> c) { 361 checkNotNull(c); 362 if (c == this) { 363 throw new IllegalArgumentException(); 364 } 365 lock.lock(); 366 try { 367 int i = takeIndex; 368 int n = 0; 369 for (; n < count; n++) { 370 c.add(items[i]); 371 items[i] = null; 372 i = inc(i); 373 } 374 if (n > 0) { 375 count = 0; 376 putIndex = 0; 377 takeIndex = 0; 378 notFull.signalAll(); 379 } 380 return n; 381 } 382 finally { 383 lock.unlock(); 384 } 385 } 386 387 @Override 388 public int drainTo(Collection<? super E> c, int maxElements) { 389 checkNotNull(c); 390 if (c == this) { 391 throw new IllegalArgumentException(); 392 } 393 if (maxElements <= 0) { 394 return 0; 395 } 396 lock.lock(); 397 try { 398 int i = takeIndex; 399 int n = 0; 400 int max = (maxElements < count) ? maxElements : count; 401 for (; n < max; n++) { 402 c.add(items[i]); 403 items[i] = null; 404 i = inc(i); 405 } 406 if (n > 0) { 407 count -= n; 408 takeIndex = i; 409 notFull.signalAll(); 410 } 411 return n; 412 } 413 finally { 414 lock.unlock(); 415 } 416 } 417 418 @Override 419 public int size() { 420 lock.lock(); 421 try { 422 return count; 423 } 424 finally { 425 lock.unlock(); 426 } 427 } 428 429 @Override 430 public Iterator<E> iterator() { 431 lock.lock(); 432 try { 433 return new Itr(); 434 } 435 finally { 436 lock.unlock(); 437 } 438 } 439 440 private class Itr implements Iterator<E> { 441 private int nextIndex; 442 private E nextItem; 443 private int lastRet; 444 445 Itr() { 446 lastRet = -1; 447 if (count == 0) { 448 nextIndex = -1; 449 } 450 else { 451 nextIndex = takeIndex; 452 nextItem = items[takeIndex]; 453 } 454 } 455 456 @Override 457 public boolean hasNext() { 458 return nextIndex >= 0; 459 } 460 461 private void checkNext() { 462 if (nextIndex == putIndex) { 463 nextIndex = -1; 464 nextItem = null; 465 } 466 else { 467 nextItem = items[nextIndex]; 468 if (nextItem == null) { 469 nextIndex = -1; 470 } 471 } 472 } 473 474 @Override 475 public E next() { 476 lock.lock(); 477 try { 478 if (nextIndex < 0) { 479 throw new NoSuchElementException(); 480 } 481 lastRet = nextIndex; 482 E e = nextItem; 483 nextIndex = inc(nextIndex); 484 checkNext(); 485 return e; 486 } 487 finally { 488 lock.unlock(); 489 } 490 } 491 492 @Override 493 public void remove() { 494 lock.lock(); 495 try { 496 int i = lastRet; 497 if (i < 0) { 498 throw new IllegalStateException(); 499 } 500 lastRet = -1; 501 int ti = takeIndex; 502 removeAt(i); 503 nextIndex = (i == ti) ? takeIndex : i; 504 checkNext(); 505 } 506 finally { 507 lock.unlock(); 508 } 509 } 510 } 511 512}