001/**
002 *
003 * Copyright 2015 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.util;
018
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
025
/**
 * A {@link ThreadPoolExecutor} whose {@link #executeBlocking(Runnable)} method blocks the submitting thread, instead
 * of rejecting the task, once {@code bound} tasks submitted through it are queued or running.
 * <p>
 * The limit is enforced with a {@link Semaphore} holding {@code bound} permits: a permit is taken before a task is
 * handed to the pool and returned once the task has finished running. Tasks submitted through the inherited
 * {@link #execute(Runnable)} method do not take a permit and are therefore not throttled.
 * </p>
 */
public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {

    /** Holds one permit per task that may be queued or running via {@link #executeBlocking(Runnable)}. */
    private final Semaphore semaphore;

    /**
     * Creates a new bounded executor.
     *
     * @param corePoolSize the core pool size, passed to {@link ThreadPoolExecutor}.
     * @param maximumPoolSize the maximum pool size, passed to {@link ThreadPoolExecutor}.
     * @param keepAliveTime the keep-alive time, passed to {@link ThreadPoolExecutor}.
     * @param unit the time unit of {@code keepAliveTime}.
     * @param bound the capacity of the work queue and the number of permits of the throttling semaphore.
     * @param threadFactory the factory used to create worker threads.
     */
    public BoundedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                    TimeUnit unit, int bound, ThreadFactory threadFactory) {
        // One could think that the array blocking queue bound should be "bound - 1" because the bound protected by the
        // Semaphore also includes the "slot" in the worker thread executing the Runnable. But using that as bound could
        // actually cause a RejectedExecutionException as the queue could fill up while the worker thread remains
        // unscheduled and is thus not removing any entries.
        super(corePoolSize, maximumPoolSize, keepAliveTime,
                        unit, new ArrayBlockingQueue<Runnable>(bound), threadFactory);
        semaphore = new Semaphore(bound);
    }

    /**
     * Submits {@code command} for execution, blocking the calling thread until a permit is available.
     * <p>
     * The permit is returned exactly once: either after {@code command} has run (whether it completed normally or
     * threw), or immediately if handing the task to the pool failed. Anything thrown by
     * {@link #execute(Runnable)} is propagated to the caller unchanged.
     * </p>
     * <p>
     * Note that a rejected execution handler which silently drops the task gives this method no chance to return the
     * permit.
     * </p>
     *
     * @param command the task to run.
     * @throws InterruptedException if the calling thread is interrupted while waiting for a permit.
     * @throws NullPointerException if {@code command} is {@code null}.
     * @throws RejectedExecutionException if the pool rejects the task.
     */
    public void executeBlocking(final Runnable command) throws InterruptedException {
        // Fail fast in the caller, like Executor.execute(), instead of with a late NPE on a worker thread.
        if (command == null) {
            throw new NullPointerException("command must not be null");
        }

        semaphore.acquire();

        // Guards the permit so that it is given back exactly once. This matters when execute() runs the task on the
        // calling thread (e.g. with a caller-runs rejection handler) and the task throws: the wrapper has then
        // already returned the permit by the time the exception reaches the finally block below.
        final AtomicBoolean permitReturned = new AtomicBoolean(false);
        boolean handedOver = false;
        try {
            execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        if (permitReturned.compareAndSet(false, true)) {
                            semaphore.release();
                        }
                    }
                }
            });
            handedOver = true;
        } finally {
            // Covers every Throwable thrown by execute(), including Errors, without altering what is thrown.
            if (!handedOver && permitReturned.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }
}