AsyncButOrdered.java
- /**
- *
- * Copyright 2018-2019 Florian Schmaus
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.jivesoftware.smack;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Queue;
- import java.util.WeakHashMap;
- import java.util.concurrent.ConcurrentLinkedQueue;
- import java.util.concurrent.Executor;
- /**
- * Helper class to perform an operation asynchronous but keeping the order in respect to a given key.
- * <p>
- * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks
- * matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use
- * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user
- * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled,
- * since synchronous connection listeners are invoked from the main event loop.
- * </p>
- * <p>
- * It is common for those situations that the order of callbacks is not globally important, but only important in
- * respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which
- * send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is
- * called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed
- * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which
- * the listeners for those two contacts are invoked does not matter.
- * </p>
- * <p>
- * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts
- * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that
- * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same
- * key.
- * </p>
- *
- * @param <K> the type of the key
- * @since 4.3
- */
- public class AsyncButOrdered<K> {
- /**
- * A map with the currently pending runnables for a given key. Note that this is a weak hash map, so we do not have
- * to take care of removing the keys ourselves from the map.
- */
- private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();
- /**
- * A marker map if there is an active thread for the given key. Holds the responsible handler thread if one is
- * active, otherwise the key is non-existent in the map.
- */
- private final Map<K, Handler> threadActiveMap = new HashMap<>();
- private final Executor executor;
- public AsyncButOrdered() {
- this(null);
- }
- public AsyncButOrdered(Executor executor) {
- this.executor = executor;
- }
- private void scheduleHandler(Handler handler) {
- if (executor == null) {
- AbstractXMPPConnection.asyncGo(handler);
- } else {
- executor.execute(handler);
- }
- }
- /**
- * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
- *
- * @param key the key deriving the order
- * @param runnable the {@link Runnable} to run
- * @return true if a new thread was created
- */
- public boolean performAsyncButOrdered(K key, Runnable runnable) {
- // First check if a key queue already exists, create one if not.
- Queue<Runnable> keyQueue;
- synchronized (pendingRunnables) {
- keyQueue = pendingRunnables.get(key);
- if (keyQueue == null) {
- keyQueue = new ConcurrentLinkedQueue<>();
- pendingRunnables.put(key, keyQueue);
- }
- }
- // Then add the task to the queue.
- keyQueue.add(runnable);
- // Finally check if there is already a handler working on that queue, create one if not.
- Handler newlyCreatedHandler = null;
- synchronized (threadActiveMap) {
- if (!threadActiveMap.containsKey(key)) {
- newlyCreatedHandler = new Handler(keyQueue, key);
- // Mark that there is thread active for the given key. Note that this has to be done before scheduling
- // the handler thread.
- threadActiveMap.put(key, newlyCreatedHandler);
- }
- }
- if (newlyCreatedHandler != null) {
- scheduleHandler(newlyCreatedHandler);
- return true;
- }
- return false;
- }
- public Executor asExecutorFor(final K key) {
- return new Executor() {
- @Override
- public void execute(Runnable runnable) {
- performAsyncButOrdered(key, runnable);
- }
- };
- }
- private class Handler implements Runnable {
- private final Queue<Runnable> keyQueue;
- private final K key;
- Handler(Queue<Runnable> keyQueue, K key) {
- this.keyQueue = keyQueue;
- this.key = key;
- }
- @Override
- public void run() {
- mainloop:
- while (true) {
- Runnable runnable = null;
- while ((runnable = keyQueue.poll()) != null) {
- try {
- runnable.run();
- } catch (Throwable t) {
- // The run() method threw, this handler thread is going to terminate because of that. We create
- // a new handler to continue working on the queue while throwing the throwable so that the
- // executor can handle it.
- Handler newlyCreatedHandler = new Handler(keyQueue, key);
- synchronized (threadActiveMap) {
- threadActiveMap.put(key, newlyCreatedHandler);
- }
- scheduleHandler(newlyCreatedHandler);
- throw t;
- }
- }
- synchronized (threadActiveMap) {
- // If the queue is empty, stop this handler, otherwise continue looping.
- if (keyQueue.isEmpty()) {
- threadActiveMap.remove(key);
- break mainloop;
- }
- }
- }
- }
- }
- }