001/**
002 *
003 * Copyright 2018 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.util.Map;
020import java.util.Queue;
021import java.util.WeakHashMap;
022import java.util.concurrent.ConcurrentLinkedQueue;
023import java.util.concurrent.Executor;
024
025/**
026 * Helper class to perform an operation asynchronous but keeping the order in respect to a given key.
027 * <p>
028 * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks
029 * matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use
030 * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user
031 * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled,
032 * since synchronous connection listeners are invoked from the main event loop.
033 * </p>
034 * <p>
035 * It is common for those situations that the order of callbacks is not globally important, but only important in
036 * respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which
037 * send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is
038 * called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed
039 * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which
040 * the listeners for those two contacts are invoked does not matter.
041 * </p>
042 * <p>
043 * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts
044 * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that
045 * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same
046 * key.
047 * </p>
048 *
049 * @param <K> the type of the key
050 * @since 4.3
051 */
052public class AsyncButOrdered<K> {
053
054    private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();
055
056    private final Map<K, Boolean> threadActiveMap = new WeakHashMap<>();
057
058    /**
059     * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
060     *
061     * @param key the key deriving the order
062     * @param runnable the {@link Runnable} to run
063     * @return true if a new thread was created
064     */
065    public boolean performAsyncButOrdered(K key, Runnable runnable) {
066        Queue<Runnable> keyQueue;
067        synchronized (pendingRunnables) {
068            keyQueue = pendingRunnables.get(key);
069            if (keyQueue == null) {
070                keyQueue = new ConcurrentLinkedQueue<>();
071                pendingRunnables.put(key, keyQueue);
072            }
073        }
074
075        keyQueue.add(runnable);
076
077        boolean newHandler;
078        synchronized (threadActiveMap) {
079            Boolean threadActive = threadActiveMap.get(key);
080            if (threadActive == null) {
081                threadActive = false;
082                threadActiveMap.put(key, threadActive);
083            }
084
085            newHandler = !threadActive;
086            if (newHandler) {
087                Handler handler = new Handler(keyQueue, key);
088                threadActiveMap.put(key, true);
089                AbstractXMPPConnection.asyncGo(handler);
090            }
091        }
092
093        return newHandler;
094    }
095
096    public Executor asExecutorFor(final K key) {
097        return new Executor() {
098            @Override
099            public void execute(Runnable runnable) {
100                performAsyncButOrdered(key, runnable);
101            }
102        };
103    }
104
105    private class Handler implements Runnable {
106        private final Queue<Runnable> keyQueue;
107        private final K key;
108
109        Handler(Queue<Runnable> keyQueue, K key) {
110            this.keyQueue = keyQueue;
111            this.key = key;
112        }
113
114        @Override
115        public void run() {
116            mainloop:
117            while (true) {
118                Runnable runnable = null;
119                while ((runnable = keyQueue.poll()) != null) {
120                    try {
121                        runnable.run();
122                    } catch (Throwable t) {
123                        // The run() method threw, this handler thread is going to terminate because of that. Ensure we note
124                        // that in the map.
125                        synchronized (threadActiveMap) {
126                            threadActiveMap.put(key, false);
127                        }
128                        throw t;
129                    }
130                }
131
132                synchronized (threadActiveMap) {
133                    // If the queue is empty, stop this handler, otherwise continue looping.
134                    if (keyQueue.isEmpty()) {
135                        threadActiveMap.put(key, false);
136                        break mainloop;
137                    }
138                }
139            }
140        }
141    }
142}