001/**
002 *
003 * Copyright 2018-2019 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.Queue;
022import java.util.WeakHashMap;
023import java.util.concurrent.ConcurrentLinkedQueue;
024import java.util.concurrent.Executor;
025
026/**
027 * Helper class to perform an operation asynchronously while keeping the order with respect to a given key.
028 * <p>
029 * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks
030 * matters, which eventually call user code in form of listeners. Since the order of the callbacks matters, you need to use
031 * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user
032 * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled,
033 * since synchronous connection listeners are invoked from the main event loop.
034 * </p>
035 * <p>
036 * It is common for those situations that the order of callbacks is not globally important, but only important in
037 * respect to a particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which
038 * send you CSNs. If a contact sends you first 'active' and then 'inactive', it is crucial that first the listener is
039 * called with 'active' and afterwards with 'inactive'. But if another contact is sending 'composing' followed
040 * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which
041 * the listeners for those two contacts are invoked does not matter.
042 * </p>
043 * <p>
044 * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} with the remote contact's
045 * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that
046 * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same
047 * key.
048 * </p>
049 *
050 * @param <K> the type of the key
051 * @since 4.3
052 */
053public class AsyncButOrdered<K> {
054
055    /**
056     * A map with the currently pending runnables for a given key. Note that this is a weak hash map so we do not have
057     * to take care of removing the keys ourselfs from the map.
058     */
059    private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();
060
061    /**
062     * A marker map if there is an active thread for the given key. Holds the responsible handler thread if one is
063     * active, otherwise the key is non-existend in the map.
064     */
065    private final Map<K, Handler> threadActiveMap = new HashMap<>();
066
067    private final Executor executor;
068
069    public AsyncButOrdered() {
070        this(null);
071    }
072
073    public AsyncButOrdered(Executor executor) {
074        this.executor = executor;
075    }
076
077    private void scheduleHandler(Handler handler) {
078        if (executor == null) {
079            AbstractXMPPConnection.asyncGo(handler);
080        } else {
081            executor.execute(handler);
082        }
083    }
084
085    /**
086     * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
087     *
088     * @param key the key deriving the order
089     * @param runnable the {@link Runnable} to run
090     * @return true if a new thread was created
091     */
092    public boolean performAsyncButOrdered(K key, Runnable runnable) {
093        // First check if a key queue already exists, create one if not.
094        Queue<Runnable> keyQueue;
095        synchronized (pendingRunnables) {
096            keyQueue = pendingRunnables.get(key);
097            if (keyQueue == null) {
098                keyQueue = new ConcurrentLinkedQueue<>();
099                pendingRunnables.put(key, keyQueue);
100            }
101        }
102
103        // Then add the task to the queue.
104        keyQueue.add(runnable);
105
106        // Finally check if there is already a handler working on that queue, create one if not.
107        Handler newlyCreatedHandler = null;
108        synchronized (threadActiveMap) {
109            if (!threadActiveMap.containsKey(key)) {
110                newlyCreatedHandler = new Handler(keyQueue, key);
111
112                // Mark that there is thread active for the given key. Note that this has to be done before scheduling
113                // the handler thread.
114                threadActiveMap.put(key, newlyCreatedHandler);
115            }
116        }
117
118        if (newlyCreatedHandler != null) {
119            scheduleHandler(newlyCreatedHandler);
120            return true;
121        }
122
123        return false;
124    }
125
126    public Executor asExecutorFor(final K key) {
127        return new Executor() {
128            @Override
129            public void execute(Runnable runnable) {
130                performAsyncButOrdered(key, runnable);
131            }
132        };
133    }
134
135    private class Handler implements Runnable {
136        private final Queue<Runnable> keyQueue;
137        private final K key;
138
139        Handler(Queue<Runnable> keyQueue, K key) {
140            this.keyQueue = keyQueue;
141            this.key = key;
142        }
143
144        @Override
145        public void run() {
146            mainloop:
147            while (true) {
148                Runnable runnable = null;
149                while ((runnable = keyQueue.poll()) != null) {
150                    try {
151                        runnable.run();
152                    } catch (Throwable t) {
153                        // The run() method threw, this handler thread is going to terminate because of that. We create
154                        // a new handler to continue working on the queue while throwing the throwable so that the
155                        // executor can handle it.
156                        Handler newlyCreatedHandler = new Handler(keyQueue, key);
157                        synchronized (threadActiveMap) {
158                            threadActiveMap.put(key, newlyCreatedHandler);
159                        }
160                        scheduleHandler(newlyCreatedHandler);
161                        throw t;
162                    }
163                }
164
165                synchronized (threadActiveMap) {
166                    // If the queue is empty, stop this handler, otherwise continue looping.
167                    if (keyQueue.isEmpty()) {
168                        threadActiveMap.remove(key);
169                        break mainloop;
170                    }
171                }
172            }
173        }
174    }
175}