001/** 002 * 003 * Copyright 2018 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.util.Map; 020import java.util.Queue; 021import java.util.WeakHashMap; 022import java.util.concurrent.ConcurrentLinkedQueue; 023import java.util.concurrent.Executor; 024 025/** 026 * Helper class to perform an operation asynchronous but keeping the order in respect to a given key. 027 * <p> 028 * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks 029 * matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use 030 * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user 031 * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled, 032 * since synchronous connection listeners are invoked from the main event loop. 033 * </p> 034 * <p> 035 * It is common for those situations that the order of callbacks is not globally important, but only important in 036 * respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which 037 * send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is 038 * called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed 039 * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which 040 * the listeners for those two contacts are invoked does not matter. 041 * </p> 042 * <p> 043 * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts 044 * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that 045 * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same 046 * key. 047 * </p> 048 * 049 * @param <K> the type of the key 050 * @since 4.3 051 */ 052public class AsyncButOrdered<K> { 053 054 private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>(); 055 056 private final Map<K, Boolean> threadActiveMap = new WeakHashMap<>(); 057 058 /** 059 * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key. 060 * 061 * @param key the key deriving the order 062 * @param runnable the {@link Runnable} to run 063 * @return true if a new thread was created 064 */ 065 public boolean performAsyncButOrdered(K key, Runnable runnable) { 066 Queue<Runnable> keyQueue; 067 synchronized (pendingRunnables) { 068 keyQueue = pendingRunnables.get(key); 069 if (keyQueue == null) { 070 keyQueue = new ConcurrentLinkedQueue<>(); 071 pendingRunnables.put(key, keyQueue); 072 } 073 } 074 075 keyQueue.add(runnable); 076 077 boolean newHandler; 078 synchronized (threadActiveMap) { 079 Boolean threadActive = threadActiveMap.get(key); 080 if (threadActive == null) { 081 threadActive = false; 082 threadActiveMap.put(key, threadActive); 083 } 084 085 newHandler = !threadActive; 086 if (newHandler) { 087 Handler handler = new Handler(keyQueue, key); 088 threadActiveMap.put(key, true); 089 AbstractXMPPConnection.asyncGo(handler); 090 } 091 } 092 093 return newHandler; 094 } 095 096 public Executor asExecutorFor(final K key) { 097 return new Executor() { 098 @Override 099 public void execute(Runnable runnable) { 100 performAsyncButOrdered(key, runnable); 101 } 102 }; 103 } 104 105 private class Handler implements Runnable { 106 private final Queue<Runnable> keyQueue; 107 private final K key; 108 109 Handler(Queue<Runnable> keyQueue, K key) { 110 this.keyQueue = keyQueue; 111 this.key = key; 112 } 113 114 @Override 115 public void run() { 116 mainloop: 117 while (true) { 118 Runnable runnable = null; 119 while ((runnable = keyQueue.poll()) != null) { 120 try { 121 runnable.run(); 122 } catch (Throwable t) { 123 // The run() method threw, this handler thread is going to terminate because of that. Ensure we note 124 // that in the map. 125 synchronized (threadActiveMap) { 126 threadActiveMap.put(key, false); 127 } 128 throw t; 129 } 130 } 131 132 synchronized (threadActiveMap) { 133 // If the queue is empty, stop this handler, otherwise continue looping. 134 if (keyQueue.isEmpty()) { 135 threadActiveMap.put(key, false); 136 break mainloop; 137 } 138 } 139 } 140 } 141 } 142}