IoTDataManager.java
- /**
- *
- * Copyright 2016 Florian Schmaus
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.jivesoftware.smackx.iot.data;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.WeakHashMap;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- import org.jivesoftware.smack.ConnectionCreationListener;
- import org.jivesoftware.smack.SmackException.NoResponseException;
- import org.jivesoftware.smack.SmackException.NotConnectedException;
- import org.jivesoftware.smack.StanzaCollector;
- import org.jivesoftware.smack.XMPPConnection;
- import org.jivesoftware.smack.XMPPConnectionRegistry;
- import org.jivesoftware.smack.XMPPException.XMPPErrorException;
- import org.jivesoftware.smack.filter.StanzaFilter;
- import org.jivesoftware.smack.iqrequest.IQRequestHandler.Mode;
- import org.jivesoftware.smack.packet.IQ;
- import org.jivesoftware.smack.packet.Message;
- import org.jivesoftware.smackx.iot.IoTManager;
- import org.jivesoftware.smackx.iot.Thing;
- import org.jivesoftware.smackx.iot.data.element.IoTDataField;
- import org.jivesoftware.smackx.iot.data.element.IoTDataReadOutAccepted;
- import org.jivesoftware.smackx.iot.data.element.IoTDataRequest;
- import org.jivesoftware.smackx.iot.data.element.IoTFieldsExtension;
- import org.jivesoftware.smackx.iot.data.filter.IoTFieldsExtensionFilter;
- import org.jivesoftware.smackx.iot.element.NodeInfo;
- import org.jxmpp.jid.EntityFullJid;
- /**
- * A manager for XEP-0323: Internet of Things - Sensor Data.
- *
- * @author Florian Schmaus {@literal <flo@geekplace.eu>}
- * @see <a href="http://xmpp.org/extensions/xep-0323.html">XEP-0323: Internet of Things - Sensor Data</a>
- */
- public final class IoTDataManager extends IoTManager {
- private static final Logger LOGGER = Logger.getLogger(IoTDataManager.class.getName());
- private static final Map<XMPPConnection, IoTDataManager> INSTANCES = new WeakHashMap<>();
- // Ensure a IoTDataManager exists for every connection.
- static {
- XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() {
- @Override
- public void connectionCreated(XMPPConnection connection) {
- if (!isAutoEnableActive()) return;
- getInstanceFor(connection);
- }
- });
- }
- /**
- * Get the manger instance responsible for the given connection.
- *
- * @param connection the XMPP connection.
- * @return a manager instance.
- */
- public static synchronized IoTDataManager getInstanceFor(XMPPConnection connection) {
- IoTDataManager manager = INSTANCES.get(connection);
- if (manager == null) {
- manager = new IoTDataManager(connection);
- INSTANCES.put(connection, manager);
- }
- return manager;
- }
- private final AtomicInteger nextSeqNr = new AtomicInteger();
- private final Map<NodeInfo, Thing> things = new ConcurrentHashMap<>();
- private IoTDataManager(XMPPConnection connection) {
- super(connection);
- connection.registerIQRequestHandler(new IoTIqRequestHandler(IoTDataRequest.ELEMENT,
- IoTDataRequest.NAMESPACE, IQ.Type.get, Mode.async) {
- @Override
- public IQ handleIoTIqRequest(IQ iqRequest) {
- final IoTDataRequest dataRequest = (IoTDataRequest) iqRequest;
- if (!dataRequest.isMomentary()) {
- // TODO return error IQ that non momentary requests are not implemented yet.
- return null;
- }
- // TODO Add support for multiple things(/NodeInfos).
- final Thing thing = things.get(NodeInfo.EMPTY);
- if (thing == null) {
- // TODO return error if not at least one thing registered.
- return null;
- }
- ThingMomentaryReadOutRequest readOutRequest = thing.getMomentaryReadOutRequestHandler();
- if (readOutRequest == null) {
- // TODO Thing does not provide momentary read-out
- return null;
- }
- // Callback hell begins here. :) XEP-0323 decouples the read-out results from the IQ result. I'm not
- // sure if I would have made the same design decision but the reasons where likely being able to get a
- // fast read-out acknowledgement back to the requester even with sensors that take "a long time" to
- // read-out their values. I had designed that as special case and made the "results in IQ response" the
- // normal case.
- readOutRequest.momentaryReadOutRequest(new ThingMomentaryReadOutResult() {
- @Override
- public void momentaryReadOut(List<? extends IoTDataField> results) {
- IoTFieldsExtension iotFieldsExtension = IoTFieldsExtension.buildFor(dataRequest.getSequenceNr(), true, thing.getNodeInfo(), results);
- XMPPConnection connection = connection();
- Message message = connection.getStanzaFactory().buildMessageStanza()
- .to(dataRequest.getFrom())
- .addExtension(iotFieldsExtension)
- .build();
- try {
- connection.sendStanza(message);
- }
- catch (NotConnectedException | InterruptedException e) {
- LOGGER.log(Level.SEVERE, "Could not send read-out response " + message, e);
- }
- }
- });
- return new IoTDataReadOutAccepted(dataRequest);
- }
- });
- }
- /**
- * Install a thing in the manager. Activates data read out functionality (if provided by the
- * thing).
- *
- * @param thing the thing to install.
- */
- public void installThing(Thing thing) {
- things.put(thing.getNodeInfo(), thing);
- }
- public Thing uninstallThing(Thing thing) {
- return uninstallThing(thing.getNodeInfo());
- }
- public Thing uninstallThing(NodeInfo nodeInfo) {
- return things.remove(nodeInfo);
- }
- /**
- * Try to read out a things momentary values.
- *
- * @param jid the full JID of the thing to read data from.
- * @return a list with the read out data.
- * @throws NoResponseException if there was no response from the remote entity.
- * @throws XMPPErrorException if there was an XMPP error returned.
- * @throws NotConnectedException if the XMPP connection is not connected.
- * @throws InterruptedException if the calling thread was interrupted.
- */
- public List<IoTFieldsExtension> requestMomentaryValuesReadOut(EntityFullJid jid)
- throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException {
- final XMPPConnection connection = connection();
- final int seqNr = nextSeqNr.incrementAndGet();
- IoTDataRequest iotDataRequest = new IoTDataRequest(seqNr, true);
- iotDataRequest.setTo(jid);
- StanzaFilter doneFilter = new IoTFieldsExtensionFilter(seqNr, true);
- StanzaFilter dataFilter = new IoTFieldsExtensionFilter(seqNr, false);
- // Setup the IoTFieldsExtension message collectors before sending the IQ to avoid a data race.
- StanzaCollector doneCollector = connection.createStanzaCollector(doneFilter);
- StanzaCollector.Configuration dataCollectorConfiguration = StanzaCollector.newConfiguration().setStanzaFilter(
- dataFilter).setCollectorToReset(doneCollector);
- StanzaCollector dataCollector = connection.createStanzaCollector(dataCollectorConfiguration);
- try {
- connection.sendIqRequestAndWaitForResponse(iotDataRequest);
- // Wait until a message with an IoTFieldsExtension and the done flag comes in.
- doneCollector.nextResult();
- }
- finally {
- // Canceling dataCollector will also cancel the doneCollector since it is configured as dataCollector's
- // collector to reset.
- dataCollector.cancel();
- }
- int collectedCount = dataCollector.getCollectedCount();
- List<IoTFieldsExtension> res = new ArrayList<>(collectedCount);
- for (int i = 0; i < collectedCount; i++) {
- Message message = dataCollector.pollResult();
- IoTFieldsExtension iotFieldsExtension = IoTFieldsExtension.from(message);
- res.add(iotFieldsExtension);
- }
- return res;
- }
- }