/**
 * A bounded, array-backed {@link BlockingQueue} like ArrayBlockingQueue but with additional
 * {@link #shutdown()} and {@link #start} methods. Will throw {@link InterruptedException} if the
 * queue has been shut down on {@link #take()} and {@link #poll(long, TimeUnit)}.
 * <p>
 * All state is guarded by a single {@link ReentrantLock}; waiting producers and consumers park on
 * the {@code notFull} and {@code notEmpty} conditions of that lock. {@code null} elements are
 * rejected by every insertion method.
 * </p>
 * <p>
 * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
 * domain).
 * </p>
 *
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    /** Circular buffer holding the queued elements; unused slots are {@code null}. */
    private final E[] items;

    /** Index of the next element to hand out. */
    private int takeIndex;

    /** Index of the slot the next inserted element goes into. */
    private int putIndex;

    /** Number of elements currently stored. */
    private int count;

    /** Guards every field above as well as the shutdown flag transitions. */
    private final ReentrantLock lock;

    /** Signalled when an element was inserted or the queue was shut down. */
    private final Condition notEmpty;

    /** Signalled when an element was removed or the queue was shut down. */
    private final Condition notFull;

    private volatile boolean isShutdown = false;

    /** Advances a buffer index by one, wrapping around at the end of the array. */
    private int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /** Stores the element and wakes one waiting consumer. Caller must hold the lock. */
    private void insert(E e) {
        insert(e, true);
    }

    /**
     * Stores the element at {@code putIndex}. Caller must hold the lock and must have verified
     * that the queue is not full.
     *
     * @param e the element to store.
     * @param signalNotEmpty whether to wake one waiting consumer right away.
     */
    private void insert(E e, boolean signalNotEmpty) {
        items[putIndex] = e;
        putIndex = inc(putIndex);
        count++;
        if (signalNotEmpty) {
            notEmpty.signal();
        }
    }

    /** Removes and returns the head element and wakes one waiting producer. Caller must hold the lock. */
    private E extract() {
        E e = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        count--;
        notFull.signal();
        return e;
    }

    /**
     * Removes the element stored at buffer index {@code i}, closing the gap by shifting all later
     * elements one slot towards the head. Caller must hold the lock.
     */
    private void removeAt(int i) {
        if (i == takeIndex) {
            // Removing the head needs no shifting.
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        }
        else {
            while (true) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                }
                else {
                    // Reached the tail: free the last slot and pull putIndex back onto it.
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        count--;
        notFull.signal();
    }

    private static void checkNotNull(Object o) {
        if (o == null) {
            throw new NullPointerException();
        }
    }

    private void checkNotShutdown() throws InterruptedException {
        if (isShutdown) {
            throw new InterruptedException("Queue was already shut down");
        }
    }

    private boolean hasNoElements() {
        return count == 0;
    }

    private boolean hasElements() {
        return !hasNoElements();
    }

    private boolean isFull() {
        return count == items.length;
    }

    private boolean isNotFull() {
        return !isFull();
    }

    /**
     * Creates a queue with the given fixed capacity and a non-fair lock.
     *
     * @param capacity the capacity of the queue, must be positive.
     * @throws IllegalArgumentException if {@code capacity} is not positive.
     */
    public ArrayBlockingQueueWithShutdown(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param capacity the capacity of the queue, must be positive.
     * @param fair passed on to {@link ReentrantLock#ReentrantLock(boolean)}.
     * @throws IllegalArgumentException if {@code capacity} is not positive.
     */
    @SuppressWarnings("unchecked")
    public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive, but was " + capacity);
        }
        // Safe: the array never leaves this class and only ever receives elements of type E.
        items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Shutdown the Queue. Any method currently waiting for a not full/empty condition will unblock
     * (and usually throw an InterruptedException).
     */
    public void shutdown() {
        lock.lock();
        try {
            isShutdown = true;
            notEmpty.signalAll();
            notFull.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Start the queue. Newly created instances will be started automatically, thus this only needs
     * to be called after {@link #shutdown()}.
     *
     * @return <code>true</code> if the queue was shut down before, <code>false</code> if not.
     */
    public boolean start() {
        boolean previousIsShutdown;
        lock.lock();
        try {
            previousIsShutdown = isShutdown;
            isShutdown = false;
        }
        finally {
            lock.unlock();
        }
        return previousIsShutdown;
    }

    /**
     * Returns true if the queue is currently shut down.
     *
     * @return true if the queue is shut down.
     */
    public boolean isShutdown() {
        lock.lock();
        try {
            return isShutdown;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue without waiting. The shutdown state is not
     * consulted.
     *
     * @return the head element or <code>null</code> if the queue is empty.
     */
    @Override
    public E poll() {
        lock.lock();
        try {
            if (hasNoElements()) {
                return null;
            }
            return extract();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the head of the queue without removing it. The shutdown state is not consulted.
     *
     * @return the head element or <code>null</code> if the queue is empty.
     */
    @Override
    public E peek() {
        lock.lock();
        try {
            return hasNoElements() ? null : items[takeIndex];
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the element if the queue is neither full nor shut down. Never waits for space.
     *
     * @param e the element to add.
     * @return <code>true</code> if the element was inserted, <code>false</code> otherwise.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    @Override
    public boolean offer(E e) {
        checkNotNull(e);
        lock.lock();
        try {
            if (isFull() || isShutdown) {
                return false;
            }
            else {
                insert(e);
                return true;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Performs {@link #offer(Object)} followed by {@link #shutdown()} while holding the queue lock
     * for both steps. The queue is shut down regardless of whether the element was accepted.
     *
     * @param e the element to add.
     * @return the result of the {@link #offer(Object)} call.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    public boolean offerAndShutdown(E e) {
        checkNotNull(e);
        boolean res;
        // The lock is reentrant, so offer() and shutdown() can re-acquire it below.
        lock.lock();
        try {
            res = offer(e);
            shutdown();
        } finally {
            lock.unlock();
        }
        return res;
    }

    /**
     * Waits until there is space and then inserts the element. Caller must hold the lock.
     * <p>
     * The shutdown state is only checked after having waited, i.e. if the queue has space the
     * element is inserted right away.
     * </p>
     *
     * @param e the element to insert.
     * @param signalNotEmpty whether to wake one waiting consumer after the insert.
     * @throws InterruptedException if interrupted while waiting, or if the queue was found shut
     *         down after waking up.
     */
    private void putInternal(E e, boolean signalNotEmpty) throws InterruptedException {
        assert lock.isHeldByCurrentThread();

        while (isFull()) {
            try {
                notFull.await();
                checkNotShutdown();
            }
            catch (InterruptedException ie) {
                // Pass the wake-up on to another waiting producer before bailing out.
                notFull.signal();
                throw ie;
            }
        }
        insert(e, signalNotEmpty);
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     * <p>
     * This may throw an {@link InterruptedException} in two cases
     * <ol>
     *  <li>If the queue was shut down.</li>
     *  <li>If the thread was interrupted.</li>
     * </ol>
     * So you have to check which is the case, e.g. by calling {@link #isShutdown()}.
     *
     * @param e the element to add.
     * @throws InterruptedException if interrupted while waiting or if the queue was shut down.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    @Override
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        lock.lockInterruptibly();

        try {
            putInternal(e, true);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Put if the queue has not been shutdown yet.
     *
     * @param e the element to put into the queue.
     * @return <code>true</code> if the element has been put into the queue, <code>false</code> if the queue was shutdown.
     * @throws InterruptedException if the calling thread was interrupted, or if the queue was shut
     *         down while this method was waiting for space.
     * @throws NullPointerException if the element is <code>null</code>.
     * @since 4.4
     */
    public boolean putIfNotShutdown(E e) throws InterruptedException {
        checkNotNull(e);
        lock.lockInterruptibly();

        try {
            if (isShutdown) {
                return false;
            }

            putInternal(e, true);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Puts all given elements into the queue, in iteration order, waiting for space where
     * necessary. The queue lock is held for the whole operation (except while waiting for space).
     * <p>
     * If this method terminates exceptionally, the elements preceding the offending one remain in
     * the queue.
     * </p>
     *
     * @param elements the elements to put into the queue.
     * @throws InterruptedException if interrupted while waiting, or if the queue was shut down
     *         while waiting for space.
     * @throws NullPointerException if the collection or one of its elements is <code>null</code>.
     */
    public void putAll(Collection<? extends E> elements) throws InterruptedException {
        checkNotNull(elements);
        lock.lockInterruptibly();

        try {
            for (E element : elements) {
                // A null slot means "no element" to poll(), peek() and the iterator, so a null
                // must never be stored.
                checkNotNull(element);
                putInternal(element, false);
            }
        } finally {
            // Individual signals were suppressed above; wake the consumers once for the batch.
            notEmpty.signalAll();
            lock.unlock();
        }
    }

    /**
     * The possible outcomes of {@link #tryPut(Object)}.
     */
    public enum TryPutResult {
        /**
         * The method was unable to acquire the queue lock.
         */
        couldNotLock,

        /**
         * The queue was shut down.
         */
        queueWasShutDown,

        /**
         * The method was unable to put another element into the queue because the queue was full.
         */
        queueWasFull,

        /**
         * The element was successfully placed into the queue.
         */
        putSuccessful,
    }

    /**
     * Tries to put the element into the queue without ever blocking, not even for the queue lock.
     *
     * @param e the element to put into the queue.
     * @return a result describing whether the element was inserted and, if not, why.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    public TryPutResult tryPut(E e) {
        checkNotNull(e);

        boolean locked = lock.tryLock();
        if (!locked) {
            return TryPutResult.couldNotLock;
        }
        try {
            if (isShutdown) {
                return TryPutResult.queueWasShutDown;
            }
            if (isFull()) {
                return TryPutResult.queueWasFull;
            }

            insert(e);
            return TryPutResult.putSuccessful;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the element, waiting up to the given time for space to become available. The
     * shutdown state is only checked after having waited.
     *
     * @param e the element to add.
     * @param timeout how long to wait at most, in units of {@code unit}.
     * @param unit the unit of {@code timeout}.
     * @return <code>true</code> if the element was inserted, <code>false</code> if the timeout
     *         elapsed first.
     * @throws InterruptedException if interrupted while waiting, or if the queue was found shut
     *         down after waking up.
     * @throws NullPointerException if the element is <code>null</code>.
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (true) {
                if (isNotFull()) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                    checkNotShutdown();
                }
                catch (InterruptedException ie) {
                    // Pass the wake-up on to another waiting producer before bailing out.
                    notFull.signal();
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting if necessary until an element becomes
     * available.
     *
     * @return the head element.
     * @throws InterruptedException if interrupted while waiting, or if the queue is or gets shut
     *         down. Use {@link #isShutdown()} to tell the cases apart.
     */
    @Override
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            checkNotShutdown();
            try {
                while (hasNoElements()) {
                    notEmpty.await();
                    checkNotShutdown();
                }
            }
            catch (InterruptedException ie) {
                // Pass the wake-up on to another waiting consumer before bailing out.
                notEmpty.signal();
                throw ie;
            }
            return extract();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * The possible outcomes of {@link #tryTake()}.
     */
    public enum TryTakeResultCode {
        /**
         * The method was unable to acquire the queue lock.
         */
        couldNotLock,

        /**
         * The queue was shut down.
         */
        queueWasShutDown,

        /**
         * The queue was empty.
         */
        queueWasEmpty,

        /**
         * An element was successfully removed from the queue.
         */
        takeSuccessful,
    }

    /**
     * The result of {@link #tryTake()}: a result code plus, if the code is
     * {@link TryTakeResultCode#takeSuccessful}, the element which was taken.
     *
     * @param <E> the type of the element.
     */
    public static final class TryTakeResult<E> {
        private final E element;
        private final TryTakeResultCode resultCode;

        private TryTakeResult(TryTakeResultCode resultCode) {
            assert resultCode != null;
            this.resultCode = resultCode;
            this.element = null;
        }

        private TryTakeResult(E element) {
            assert element != null;
            this.resultCode = TryTakeResultCode.takeSuccessful;
            this.element = element;
        }

        /**
         * Returns the result code.
         *
         * @return the result code.
         */
        public TryTakeResultCode getResultCode() {
            return resultCode;
        }

        /**
         * Returns the taken element.
         *
         * @return the element, or <code>null</code> if no element was taken.
         */
        public E getElement() {
            return element;
        }
    }

    /**
     * Tries to take the head of the queue without ever blocking, not even for the queue lock.
     *
     * @return a result carrying the taken element or the reason why none was taken.
     */
    public TryTakeResult<E> tryTake() {
        boolean locked = lock.tryLock();
        if (!locked) {
            return new TryTakeResult<E>(TryTakeResultCode.couldNotLock);
        }
        try {
            if (isShutdown) {
                return new TryTakeResult<E>(TryTakeResultCode.queueWasShutDown);
            }
            if (hasNoElements()) {
                return new TryTakeResult<E>(TryTakeResultCode.queueWasEmpty);
            }
            E element = extract();
            return new TryTakeResult<E>(element);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given time for an element to
     * become available.
     *
     * @param timeout how long to wait at most, in units of {@code unit}.
     * @param unit the unit of {@code timeout}.
     * @return the head element, or <code>null</code> if the timeout elapsed first.
     * @throws InterruptedException if interrupted while waiting, or if the queue is or gets shut
     *         down. Use {@link #isShutdown()} to tell the cases apart.
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            checkNotShutdown();
            while (true) {
                if (hasElements()) {
                    return extract();
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                    checkNotShutdown();
                }
                catch (InterruptedException ie) {
                    // Pass the wake-up on to another waiting consumer before bailing out.
                    notEmpty.signal();
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public int remainingCapacity() {
        lock.lock();
        try {
            return items.length - count;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Moves all elements of this queue into the given collection.
     * <p>
     * Should adding to the collection fail, the elements transferred so far are gone from this
     * queue while the element which could not be added, and all elements after it, remain queued.
     * </p>
     *
     * @param c the collection to transfer the elements into.
     * @return the number of elements transferred.
     * @throws NullPointerException if the collection is <code>null</code>.
     * @throws IllegalArgumentException if the collection is this queue.
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements of this queue into the given collection.
     * <p>
     * Should adding to the collection fail, the elements transferred so far are gone from this
     * queue while the element which could not be added, and all elements after it, remain queued.
     * </p>
     *
     * @param c the collection to transfer the elements into.
     * @param maxElements the maximum number of elements to transfer.
     * @return the number of elements transferred.
     * @throws NullPointerException if the collection is <code>null</code>.
     * @throws IllegalArgumentException if the collection is this queue.
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        lock.lock();
        try {
            final int max = Math.min(maxElements, count);
            int index = takeIndex;
            int drained = 0;
            try {
                while (drained < max) {
                    c.add(items[index]);
                    items[index] = null;
                    index = inc(index);
                    drained++;
                }
                return drained;
            }
            finally {
                // Runs even if c.add() threw, so that count and takeIndex always match the
                // slots which were actually cleared.
                if (drained > 0) {
                    count -= drained;
                    takeIndex = index;
                    notFull.signalAll();
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public int size() {
        lock.lock();
        try {
            return count;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements of this queue, from head to tail. The iterator
     * acquires the queue lock in {@link Iterator#next()} and {@link Iterator#remove()}.
     *
     * @return an iterator over the elements of this queue.
     */
    @Override
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Iterator over the circular buffer. The element to return next is captured ahead of time in
     * {@code nextItem}, so that an element announced by {@link #hasNext()} can still be returned
     * even if it was removed from the queue in the meantime.
     */
    private class Itr implements Iterator<E> {
        /** Buffer index of the element to return next, or -1 if the iteration is over. */
        private int nextIndex;

        /** The element to return next. */
        private E nextItem;

        /** Buffer index of the element returned last, or -1 if there is none to remove. */
        private int lastRet;

        Itr() {
            lastRet = -1;
            if (count == 0) {
                nextIndex = -1;
            }
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        @Override
        public boolean hasNext() {
            return nextIndex >= 0;
        }

        /**
         * Loads the element at {@code nextIndex} into {@code nextItem}, or ends the iteration if
         * the tail was reached or the slot has been emptied. Caller must hold the lock.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            }
            else {
                nextItem = items[nextIndex];
                if (nextItem == null) {
                    nextIndex = -1;
                }
            }
        }

        @Override
        public E next() {
            lock.lock();
            try {
                if (nextIndex < 0) {
                    throw new NoSuchElementException();
                }
                lastRet = nextIndex;
                E e = nextItem;
                nextIndex = inc(nextIndex);
                checkNext();
                return e;
            }
            finally {
                lock.unlock();
            }
        }

        @Override
        public void remove() {
            lock.lock();
            try {
                int i = lastRet;
                if (i < 0) {
                    throw new IllegalStateException();
                }
                lastRet = -1;
                int ti = takeIndex;
                removeAt(i);
                // Removing the head moves takeIndex forward; removing any other element shifts
                // its successor into slot i.
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            }
            finally {
                lock.unlock();
            }
        }
    }

}