001/** 002 * 003 * Copyright 2014 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.util; 018 019import java.util.AbstractQueue; 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.NoSuchElementException; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.locks.Condition; 026import java.util.concurrent.locks.ReentrantLock; 027 028/** 029 * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will 030 * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and 031 * {@link #poll(long, TimeUnit)}. 032 * <p> 033 * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public 034 * domain). 035 * 036 * @param <E> the type of elements held in this collection 037 */ 038public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> { 039 040 private final E[] items; 041 042 private int takeIndex; 043 044 private int putIndex; 045 046 private int count; 047 048 private final ReentrantLock lock; 049 050 private final Condition notEmpty; 051 052 private final Condition notFull; 053 054 private volatile boolean isShutdown = false; 055 056 private final int inc(int i) { 057 return (++i == items.length) ? 0 : i; 058 } 059 060 private final void insert(E e) { 061 items[putIndex] = e; 062 putIndex = inc(putIndex); 063 count++; 064 notEmpty.signal(); 065 } 066 067 private final E extract() { 068 E e = items[takeIndex]; 069 items[takeIndex] = null; 070 takeIndex = inc(takeIndex); 071 count--; 072 notFull.signal(); 073 return e; 074 } 075 076 private final void removeAt(int i) { 077 if (i == takeIndex) { 078 items[takeIndex] = null; 079 takeIndex = inc(takeIndex); 080 } 081 else { 082 while (true) { 083 int nexti = inc(i); 084 if (nexti != putIndex) { 085 items[i] = items[nexti]; 086 i = nexti; 087 } 088 else { 089 items[i] = null; 090 putIndex = i; 091 break; 092 } 093 } 094 } 095 count--; 096 notFull.signal(); 097 } 098 099 private final static void checkNotNull(Object o) { 100 if (o == null) { 101 throw new NullPointerException(); 102 } 103 } 104 105 private final void checkNotShutdown() throws InterruptedException { 106 if (isShutdown) { 107 throw new InterruptedException(); 108 } 109 } 110 111 private final boolean hasNoElements() { 112 return count == 0; 113 } 114 115 private final boolean hasElements() { 116 return !hasNoElements(); 117 } 118 119 private final boolean isFull() { 120 return count == items.length; 121 } 122 123 private final boolean isNotFull() { 124 return !isFull(); 125 } 126 127 public ArrayBlockingQueueWithShutdown(int capacity) { 128 this(capacity, false); 129 } 130 131 @SuppressWarnings("unchecked") 132 public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) { 133 if (capacity <= 0) 134 throw new IllegalArgumentException(); 135 items = (E[]) new Object[capacity]; 136 lock = new ReentrantLock(fair); 137 notEmpty = lock.newCondition(); 138 notFull = lock.newCondition(); 139 } 140 141 /** 142 * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock 143 * (and usually throw a InterruptedException). 144 */ 145 public void shutdown() { 146 lock.lock(); 147 try { 148 isShutdown = true; 149 notEmpty.signalAll(); 150 notFull.signalAll(); 151 } 152 finally { 153 lock.unlock(); 154 } 155 } 156 157 /** 158 * Start the queue. Newly created instances will be started automatically, thus this only needs 159 * to be called after {@link #shutdown()}. 160 */ 161 public void start() { 162 lock.lock(); 163 try { 164 isShutdown = false; 165 } 166 finally { 167 lock.unlock(); 168 } 169 } 170 171 /** 172 * Returns true if the queue is currently shut down. 173 * 174 * @return true if the queue is shut down. 175 */ 176 public boolean isShutdown() { 177 lock.lock(); 178 try { 179 return isShutdown; 180 } finally { 181 lock.unlock(); 182 } 183 } 184 185 @Override 186 public E poll() { 187 lock.lock(); 188 try { 189 if (hasNoElements()) { 190 return null; 191 } 192 E e = extract(); 193 return e; 194 } 195 finally { 196 lock.unlock(); 197 } 198 } 199 200 @Override 201 public E peek() { 202 lock.lock(); 203 try { 204 return hasNoElements() ? null : items[takeIndex]; 205 } 206 finally { 207 lock.unlock(); 208 } 209 } 210 211 @Override 212 public boolean offer(E e) { 213 checkNotNull(e); 214 lock.lock(); 215 try { 216 if (isFull() || isShutdown) { 217 return false; 218 } 219 else { 220 insert(e); 221 return true; 222 } 223 } 224 finally { 225 lock.unlock(); 226 } 227 } 228 229 @Override 230 public void put(E e) throws InterruptedException { 231 checkNotNull(e); 232 lock.lockInterruptibly(); 233 234 try { 235 while (isFull()) { 236 try { 237 notFull.await(); 238 checkNotShutdown(); 239 } 240 catch (InterruptedException ie) { 241 notFull.signal(); 242 throw ie; 243 } 244 } 245 insert(e); 246 } 247 finally { 248 lock.unlock(); 249 } 250 } 251 252 @Override 253 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 254 checkNotNull(e); 255 long nanos = unit.toNanos(timeout); 256 lock.lockInterruptibly(); 257 try { 258 while (true) { 259 if (isNotFull()) { 260 insert(e); 261 return true; 262 } 263 if (nanos <= 0) { 264 return false; 265 } 266 try { 267 nanos = notFull.awaitNanos(nanos); 268 checkNotShutdown(); 269 } 270 catch (InterruptedException ie) { 271 notFull.signal(); 272 throw ie; 273 } 274 } 275 } 276 finally { 277 lock.unlock(); 278 } 279 280 } 281 282 @Override 283 public E take() throws InterruptedException { 284 lock.lockInterruptibly(); 285 try { 286 checkNotShutdown(); 287 try { 288 while (hasNoElements()) { 289 notEmpty.await(); 290 checkNotShutdown(); 291 } 292 } 293 catch (InterruptedException ie) { 294 notEmpty.signal(); 295 throw ie; 296 } 297 E e = extract(); 298 return e; 299 } 300 finally { 301 lock.unlock(); 302 } 303 } 304 305 @Override 306 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 307 long nanos = unit.toNanos(timeout); 308 lock.lockInterruptibly(); 309 try { 310 checkNotShutdown(); 311 while (true) { 312 if (hasElements()) { 313 E e = extract(); 314 return e; 315 } 316 if (nanos <= 0) { 317 return null; 318 } 319 try { 320 nanos = notEmpty.awaitNanos(nanos); 321 checkNotShutdown(); 322 } 323 catch (InterruptedException ie) { 324 notEmpty.signal(); 325 throw ie; 326 } 327 } 328 } 329 finally { 330 lock.unlock(); 331 } 332 } 333 334 @Override 335 public int remainingCapacity() { 336 lock.lock(); 337 try { 338 return items.length - count; 339 } 340 finally { 341 lock.unlock(); 342 } 343 } 344 345 @Override 346 public int drainTo(Collection<? super E> c) { 347 checkNotNull(c); 348 if (c == this) { 349 throw new IllegalArgumentException(); 350 } 351 lock.lock(); 352 try { 353 int i = takeIndex; 354 int n = 0; 355 for (; n < count; n++) { 356 c.add(items[i]); 357 items[i] = null; 358 i = inc(i); 359 } 360 if (n > 0) { 361 count = 0; 362 putIndex = 0; 363 takeIndex = 0; 364 notFull.signalAll(); 365 } 366 return n; 367 } 368 finally { 369 lock.unlock(); 370 } 371 } 372 373 @Override 374 public int drainTo(Collection<? super E> c, int maxElements) { 375 checkNotNull(c); 376 if (c == this) { 377 throw new IllegalArgumentException(); 378 } 379 if (maxElements <= 0) { 380 return 0; 381 } 382 lock.lock(); 383 try { 384 int i = takeIndex; 385 int n = 0; 386 int max = (maxElements < count) ? maxElements : count; 387 for (; n < max; n++) { 388 c.add(items[i]); 389 items[i] = null; 390 i = inc(i); 391 } 392 if (n > 0) { 393 count -= n; 394 takeIndex = i; 395 notFull.signalAll(); 396 } 397 return n; 398 } 399 finally { 400 lock.unlock(); 401 } 402 } 403 404 @Override 405 public int size() { 406 lock.lock(); 407 try { 408 return count; 409 } 410 finally { 411 lock.unlock(); 412 } 413 } 414 415 @Override 416 public Iterator<E> iterator() { 417 lock.lock(); 418 try { 419 return new Itr(); 420 } 421 finally { 422 lock.unlock(); 423 } 424 } 425 426 private class Itr implements Iterator<E> { 427 private int nextIndex; 428 private E nextItem; 429 private int lastRet; 430 431 Itr() { 432 lastRet = -1; 433 if (count == 0) { 434 nextIndex = -1; 435 } 436 else { 437 nextIndex = takeIndex; 438 nextItem = items[takeIndex]; 439 } 440 } 441 442 public boolean hasNext() { 443 return nextIndex >= 0; 444 } 445 446 private void checkNext() { 447 if (nextIndex == putIndex) { 448 nextIndex = -1; 449 nextItem = null; 450 } 451 else { 452 nextItem = items[nextIndex]; 453 if (nextItem == null) { 454 nextIndex = -1; 455 } 456 } 457 } 458 459 public E next() { 460 lock.lock(); 461 try { 462 if (nextIndex < 0) { 463 throw new NoSuchElementException(); 464 } 465 lastRet = nextIndex; 466 E e = nextItem; 467 nextIndex = inc(nextIndex); 468 checkNext(); 469 return e; 470 } 471 finally { 472 lock.unlock(); 473 } 474 } 475 476 public void remove() { 477 lock.lock(); 478 try { 479 int i = lastRet; 480 if (i < 0) { 481 throw new IllegalStateException(); 482 } 483 lastRet = -1; 484 int ti = takeIndex; 485 removeAt(i); 486 nextIndex = (i == ti) ? takeIndex : i; 487 checkNext(); 488 } 489 finally { 490 lock.unlock(); 491 } 492 } 493 } 494 495}