001/** 002 * 003 * Copyright 2018-2019 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.Queue; 022import java.util.WeakHashMap; 023import java.util.concurrent.ConcurrentLinkedQueue; 024import java.util.concurrent.Executor; 025 026/** 027 * Helper class to perform an operation asynchronous but keeping the order in respect to a given key. 028 * <p> 029 * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks 030 * matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use 031 * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user 032 * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled, 033 * since synchronous connection listeners are invoked from the main event loop. 034 * </p> 035 * <p> 036 * It is common for those situations that the order of callbacks is not globally important, but only important in 037 * respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which 038 * send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is 039 * called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed 040 * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which 041 * the listeners for those two contacts are invoked does not matter. 042 * </p> 043 * <p> 044 * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts 045 * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that 046 * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same 047 * key. 048 * </p> 049 * 050 * @param <K> the type of the key 051 * @since 4.3 052 */ 053public class AsyncButOrdered<K> { 054 055 /** 056 * A map with the currently pending runnables for a given key. Note that this is a weak hash map so we do not have 057 * to take care of removing the keys ourselfs from the map. 058 */ 059 private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>(); 060 061 /** 062 * A marker map if there is an active thread for the given key. Holds the responsible handler thread if one is 063 * active, otherwise the key is non-existend in the map. 064 */ 065 private final Map<K, Handler> threadActiveMap = new HashMap<>(); 066 067 private final Executor executor; 068 069 public AsyncButOrdered() { 070 this(null); 071 } 072 073 public AsyncButOrdered(Executor executor) { 074 this.executor = executor; 075 } 076 077 private void scheduleHandler(Handler handler) { 078 if (executor == null) { 079 AbstractXMPPConnection.asyncGo(handler); 080 } else { 081 executor.execute(handler); 082 } 083 } 084 085 /** 086 * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key. 087 * 088 * @param key the key deriving the order 089 * @param runnable the {@link Runnable} to run 090 * @return true if a new thread was created 091 */ 092 public boolean performAsyncButOrdered(K key, Runnable runnable) { 093 // First check if a key queue already exists, create one if not. 094 Queue<Runnable> keyQueue; 095 synchronized (pendingRunnables) { 096 keyQueue = pendingRunnables.get(key); 097 if (keyQueue == null) { 098 keyQueue = new ConcurrentLinkedQueue<>(); 099 pendingRunnables.put(key, keyQueue); 100 } 101 } 102 103 // Then add the task to the queue. 104 keyQueue.add(runnable); 105 106 // Finally check if there is already a handler working on that queue, create one if not. 107 Handler newlyCreatedHandler = null; 108 synchronized (threadActiveMap) { 109 if (!threadActiveMap.containsKey(key)) { 110 newlyCreatedHandler = new Handler(keyQueue, key); 111 112 // Mark that there is thread active for the given key. Note that this has to be done before scheduling 113 // the handler thread. 114 threadActiveMap.put(key, newlyCreatedHandler); 115 } 116 } 117 118 if (newlyCreatedHandler != null) { 119 scheduleHandler(newlyCreatedHandler); 120 return true; 121 } 122 123 return false; 124 } 125 126 public Executor asExecutorFor(final K key) { 127 return new Executor() { 128 @Override 129 public void execute(Runnable runnable) { 130 performAsyncButOrdered(key, runnable); 131 } 132 }; 133 } 134 135 private class Handler implements Runnable { 136 private final Queue<Runnable> keyQueue; 137 private final K key; 138 139 Handler(Queue<Runnable> keyQueue, K key) { 140 this.keyQueue = keyQueue; 141 this.key = key; 142 } 143 144 @Override 145 public void run() { 146 mainloop: 147 while (true) { 148 Runnable runnable = null; 149 while ((runnable = keyQueue.poll()) != null) { 150 try { 151 runnable.run(); 152 } catch (Throwable t) { 153 // The run() method threw, this handler thread is going to terminate because of that. We create 154 // a new handler to continue working on the queue while throwing the throwable so that the 155 // executor can handle it. 156 Handler newlyCreatedHandler = new Handler(keyQueue, key); 157 synchronized (threadActiveMap) { 158 threadActiveMap.put(key, newlyCreatedHandler); 159 } 160 scheduleHandler(newlyCreatedHandler); 161 throw t; 162 } 163 } 164 165 synchronized (threadActiveMap) { 166 // If the queue is empty, stop this handler, otherwise continue looping. 167 if (keyQueue.isEmpty()) { 168 threadActiveMap.remove(key); 169 break mainloop; 170 } 171 } 172 } 173 } 174 } 175}