AsyncButOrdered.java

  1. /**
  2.  *
  3.  * Copyright 2018-2019 Florian Schmaus
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smack;

  18. import java.util.HashMap;
  19. import java.util.Map;
  20. import java.util.Queue;
  21. import java.util.WeakHashMap;
  22. import java.util.concurrent.ConcurrentLinkedQueue;
  23. import java.util.concurrent.Executor;

  24. /**
  25.  * Helper class to perform an operation asynchronous but keeping the order in respect to a given key.
  26.  * <p>
  27.  * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks
  28.  * matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use
  29.  * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user
  30.  * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled,
  31.  * since synchronous connection listeners are invoked from the main event loop.
  32.  * </p>
  33.  * <p>
  34.  * It is common for those situations that the order of callbacks is not globally important, but only important in
  35.  * respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which
  36.  * send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is
  37.  * called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed
  38.  * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which
  39.  * the listeners for those two contacts are invoked does not matter.
  40.  * </p>
  41.  * <p>
  42.  * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts
  43.  * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that
  44.  * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same
  45.  * key.
  46.  * </p>
  47.  *
  48.  * @param <K> the type of the key
  49.  * @since 4.3
  50.  */
  51. public class AsyncButOrdered<K> {

  52.     /**
  53.      * A map with the currently pending runnables for a given key. Note that this is a weak hash map, so we do not have
  54.      * to take care of removing the keys ourselves from the map.
  55.      */
  56.     private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();

  57.     /**
  58.      * A marker map if there is an active thread for the given key. Holds the responsible handler thread if one is
  59.      * active, otherwise the key is non-existent in the map.
  60.      */
  61.     private final Map<K, Handler> threadActiveMap = new HashMap<>();

  62.     private final Executor executor;

  63.     public AsyncButOrdered() {
  64.         this(null);
  65.     }

  66.     public AsyncButOrdered(Executor executor) {
  67.         this.executor = executor;
  68.     }

  69.     private void scheduleHandler(Handler handler) {
  70.         if (executor == null) {
  71.             AbstractXMPPConnection.asyncGo(handler);
  72.         } else {
  73.             executor.execute(handler);
  74.         }
  75.     }

  76.     /**
  77.      * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
  78.      *
  79.      * @param key the key deriving the order
  80.      * @param runnable the {@link Runnable} to run
  81.      * @return true if a new thread was created
  82.      */
  83.     public boolean performAsyncButOrdered(K key, Runnable runnable) {
  84.         // First check if a key queue already exists, create one if not.
  85.         Queue<Runnable> keyQueue;
  86.         synchronized (pendingRunnables) {
  87.             keyQueue = pendingRunnables.get(key);
  88.             if (keyQueue == null) {
  89.                 keyQueue = new ConcurrentLinkedQueue<>();
  90.                 pendingRunnables.put(key, keyQueue);
  91.             }
  92.         }

  93.         // Then add the task to the queue.
  94.         keyQueue.add(runnable);

  95.         // Finally check if there is already a handler working on that queue, create one if not.
  96.         Handler newlyCreatedHandler = null;
  97.         synchronized (threadActiveMap) {
  98.             if (!threadActiveMap.containsKey(key)) {
  99.                 newlyCreatedHandler = new Handler(keyQueue, key);

  100.                 // Mark that there is thread active for the given key. Note that this has to be done before scheduling
  101.                 // the handler thread.
  102.                 threadActiveMap.put(key, newlyCreatedHandler);
  103.             }
  104.         }

  105.         if (newlyCreatedHandler != null) {
  106.             scheduleHandler(newlyCreatedHandler);
  107.             return true;
  108.         }

  109.         return false;
  110.     }

  111.     public Executor asExecutorFor(final K key) {
  112.         return new Executor() {
  113.             @Override
  114.             public void execute(Runnable runnable) {
  115.                 performAsyncButOrdered(key, runnable);
  116.             }
  117.         };
  118.     }

  119.     private class Handler implements Runnable {
  120.         private final Queue<Runnable> keyQueue;
  121.         private final K key;

  122.         Handler(Queue<Runnable> keyQueue, K key) {
  123.             this.keyQueue = keyQueue;
  124.             this.key = key;
  125.         }

  126.         @Override
  127.         public void run() {
  128.             mainloop:
  129.             while (true) {
  130.                 Runnable runnable = null;
  131.                 while ((runnable = keyQueue.poll()) != null) {
  132.                     try {
  133.                         runnable.run();
  134.                     } catch (Throwable t) {
  135.                         // The run() method threw, this handler thread is going to terminate because of that. We create
  136.                         // a new handler to continue working on the queue while throwing the throwable so that the
  137.                         // executor can handle it.
  138.                         Handler newlyCreatedHandler = new Handler(keyQueue, key);
  139.                         synchronized (threadActiveMap) {
  140.                             threadActiveMap.put(key, newlyCreatedHandler);
  141.                         }
  142.                         scheduleHandler(newlyCreatedHandler);
  143.                         throw t;
  144.                     }
  145.                 }

  146.                 synchronized (threadActiveMap) {
  147.                     // If the queue is empty, stop this handler, otherwise continue looping.
  148.                     if (keyQueue.isEmpty()) {
  149.                         threadActiveMap.remove(key);
  150.                         break mainloop;
  151.                     }
  152.                 }
  153.             }
  154.         }
  155.     }
  156. }