/**
 *
 * Copyright 2016 Florian Schmaus
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.jivesoftware.smackx.iot.data;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.WeakHashMap;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.logging.Level;
026import java.util.logging.Logger;
027
028import org.jivesoftware.smack.ConnectionCreationListener;
029import org.jivesoftware.smack.SmackException.NoResponseException;
030import org.jivesoftware.smack.SmackException.NotConnectedException;
031import org.jivesoftware.smack.StanzaCollector;
032import org.jivesoftware.smack.XMPPConnection;
033import org.jivesoftware.smack.XMPPConnectionRegistry;
034import org.jivesoftware.smack.XMPPException.XMPPErrorException;
035import org.jivesoftware.smack.filter.StanzaFilter;
036import org.jivesoftware.smack.iqrequest.IQRequestHandler.Mode;
037import org.jivesoftware.smack.packet.IQ;
038import org.jivesoftware.smack.packet.Message;
039
040import org.jivesoftware.smackx.iot.IoTManager;
041import org.jivesoftware.smackx.iot.Thing;
042import org.jivesoftware.smackx.iot.data.element.IoTDataField;
043import org.jivesoftware.smackx.iot.data.element.IoTDataReadOutAccepted;
044import org.jivesoftware.smackx.iot.data.element.IoTDataRequest;
045import org.jivesoftware.smackx.iot.data.element.IoTFieldsExtension;
046import org.jivesoftware.smackx.iot.data.filter.IoTFieldsExtensionFilter;
047import org.jivesoftware.smackx.iot.element.NodeInfo;
048
049import org.jxmpp.jid.EntityFullJid;
050
051/**
052 * A manager for XEP-0323: Internet of Things - Sensor Data.
053 *
054 * @author Florian Schmaus {@literal <flo@geekplace.eu>}
055 * @see <a href="http://xmpp.org/extensions/xep-0323.html">XEP-0323: Internet of Things - Sensor Data</a>
056 */
057public final class IoTDataManager extends IoTManager {
058
059    private static final Logger LOGGER = Logger.getLogger(IoTDataManager.class.getName());
060
061    private static final Map<XMPPConnection, IoTDataManager> INSTANCES = new WeakHashMap<>();
062
063    // Ensure a IoTDataManager exists for every connection.
064    static {
065        XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() {
066            @Override
067            public void connectionCreated(XMPPConnection connection) {
068                if (!isAutoEnableActive()) return;
069                getInstanceFor(connection);
070            }
071        });
072    }
073
074    /**
075     * Get the manger instance responsible for the given connection.
076     *
077     * @param connection the XMPP connection.
078     * @return a manager instance.
079     */
080    public static synchronized IoTDataManager getInstanceFor(XMPPConnection connection) {
081        IoTDataManager manager = INSTANCES.get(connection);
082        if (manager == null) {
083            manager = new IoTDataManager(connection);
084            INSTANCES.put(connection, manager);
085        }
086        return manager;
087    }
088
089    private final AtomicInteger nextSeqNr = new AtomicInteger();
090
091    private final Map<NodeInfo, Thing> things = new ConcurrentHashMap<>();
092
093    private IoTDataManager(XMPPConnection connection) {
094        super(connection);
095        connection.registerIQRequestHandler(new IoTIqRequestHandler(IoTDataRequest.ELEMENT,
096                        IoTDataRequest.NAMESPACE, IQ.Type.get, Mode.async) {
097            @Override
098            public IQ handleIoTIqRequest(IQ iqRequest) {
099                final IoTDataRequest dataRequest = (IoTDataRequest) iqRequest;
100
101                if (!dataRequest.isMomentary()) {
102                    // TODO return error IQ that non momentary requests are not implemented yet.
103                    return null;
104                }
105
106                // TODO Add support for multiple things(/NodeInfos).
107                final Thing thing = things.get(NodeInfo.EMPTY);
108                if (thing == null) {
109                    // TODO return error if not at least one thing registered.
110                    return null;
111                }
112
113                ThingMomentaryReadOutRequest readOutRequest = thing.getMomentaryReadOutRequestHandler();
114                if (readOutRequest == null) {
115                    // TODO Thing does not provide momentary read-out
116                    return null;
117                }
118
119                // Callback hell begins here. :) XEP-0323 decouples the read-out results from the IQ result. I'm not
120                // sure if I would have made the same design decision but the reasons where likely being able to get a
121                // fast read-out acknowledgement back to the requester even with sensors that take "a long time" to
122                // read-out their values. I had designed that as special case and made the "results in IQ response" the
123                // normal case.
124                readOutRequest.momentaryReadOutRequest(new ThingMomentaryReadOutResult() {
125                    @Override
126                    public void momentaryReadOut(List<? extends IoTDataField> results) {
127                        IoTFieldsExtension iotFieldsExtension = IoTFieldsExtension.buildFor(dataRequest.getSequenceNr(), true, thing.getNodeInfo(), results);
128
129                        XMPPConnection connection = connection();
130                        Message message = connection.getStanzaFactory().buildMessageStanza()
131                                .to(dataRequest.getFrom())
132                                .addExtension(iotFieldsExtension)
133                                .build();
134                        try {
135                            connection.sendStanza(message);
136                        }
137                        catch (NotConnectedException | InterruptedException e) {
138                            LOGGER.log(Level.SEVERE, "Could not send read-out response " + message, e);
139                        }
140                    }
141                });
142
143                return new IoTDataReadOutAccepted(dataRequest);
144            }
145        });
146    }
147
148    /**
149     * Install a thing in the manager. Activates data read out functionality (if provided by the
150     * thing).
151     *
152     * @param thing the thing to install.
153     */
154    public void installThing(Thing thing) {
155        things.put(thing.getNodeInfo(), thing);
156    }
157
158    public Thing uninstallThing(Thing thing) {
159        return uninstallThing(thing.getNodeInfo());
160    }
161
162    public Thing uninstallThing(NodeInfo nodeInfo) {
163        return things.remove(nodeInfo);
164    }
165
166    /**
167     * Try to read out a things momentary values.
168     *
169     * @param jid the full JID of the thing to read data from.
170     * @return a list with the read out data.
171     * @throws NoResponseException if there was no response from the remote entity.
172     * @throws XMPPErrorException if there was an XMPP error returned.
173     * @throws NotConnectedException if the XMPP connection is not connected.
174     * @throws InterruptedException if the calling thread was interrupted.
175     */
176    public List<IoTFieldsExtension> requestMomentaryValuesReadOut(EntityFullJid jid)
177                    throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException {
178        final XMPPConnection connection = connection();
179        final int seqNr = nextSeqNr.incrementAndGet();
180        IoTDataRequest iotDataRequest = new IoTDataRequest(seqNr, true);
181        iotDataRequest.setTo(jid);
182
183        StanzaFilter doneFilter = new IoTFieldsExtensionFilter(seqNr, true);
184        StanzaFilter dataFilter = new IoTFieldsExtensionFilter(seqNr, false);
185
186        // Setup the IoTFieldsExtension message collectors before sending the IQ to avoid a data race.
187        StanzaCollector doneCollector = connection.createStanzaCollector(doneFilter);
188
189        StanzaCollector.Configuration dataCollectorConfiguration = StanzaCollector.newConfiguration().setStanzaFilter(
190                        dataFilter).setCollectorToReset(doneCollector);
191        StanzaCollector dataCollector = connection.createStanzaCollector(dataCollectorConfiguration);
192
193        try {
194            connection.sendIqRequestAndWaitForResponse(iotDataRequest);
195            // Wait until a message with an IoTFieldsExtension and the done flag comes in.
196            doneCollector.nextResult();
197        }
198        finally {
199            // Canceling dataCollector will also cancel the doneCollector since it is configured as dataCollector's
200            // collector to reset.
201            dataCollector.cancel();
202        }
203
204        int collectedCount = dataCollector.getCollectedCount();
205        List<IoTFieldsExtension> res = new ArrayList<>(collectedCount);
206        for (int i = 0; i < collectedCount; i++) {
207            Message message = dataCollector.pollResult();
208            IoTFieldsExtension iotFieldsExtension = IoTFieldsExtension.from(message);
209            res.add(iotFieldsExtension);
210        }
211
212        return res;
213    }
214}