IoTDataManager.java

  1. /**
  2.  *
  3.  * Copyright 2016 Florian Schmaus
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.jivesoftware.smackx.iot.data;

  18. import java.util.ArrayList;
  19. import java.util.List;
  20. import java.util.Map;
  21. import java.util.WeakHashMap;
  22. import java.util.concurrent.ConcurrentHashMap;
  23. import java.util.concurrent.atomic.AtomicInteger;
  24. import java.util.logging.Level;
  25. import java.util.logging.Logger;

  26. import org.jivesoftware.smack.ConnectionCreationListener;
  27. import org.jivesoftware.smack.SmackException.NoResponseException;
  28. import org.jivesoftware.smack.SmackException.NotConnectedException;
  29. import org.jivesoftware.smack.StanzaCollector;
  30. import org.jivesoftware.smack.XMPPConnection;
  31. import org.jivesoftware.smack.XMPPConnectionRegistry;
  32. import org.jivesoftware.smack.XMPPException.XMPPErrorException;
  33. import org.jivesoftware.smack.filter.StanzaFilter;
  34. import org.jivesoftware.smack.iqrequest.IQRequestHandler.Mode;
  35. import org.jivesoftware.smack.packet.IQ;
  36. import org.jivesoftware.smack.packet.Message;

  37. import org.jivesoftware.smackx.iot.IoTManager;
  38. import org.jivesoftware.smackx.iot.Thing;
  39. import org.jivesoftware.smackx.iot.data.element.IoTDataField;
  40. import org.jivesoftware.smackx.iot.data.element.IoTDataReadOutAccepted;
  41. import org.jivesoftware.smackx.iot.data.element.IoTDataRequest;
  42. import org.jivesoftware.smackx.iot.data.element.IoTFieldsExtension;
  43. import org.jivesoftware.smackx.iot.data.filter.IoTFieldsExtensionFilter;
  44. import org.jivesoftware.smackx.iot.element.NodeInfo;

  45. import org.jxmpp.jid.EntityFullJid;

  46. /**
  47.  * A manager for XEP-0323: Internet of Things - Sensor Data.
  48.  *
  49.  * @author Florian Schmaus {@literal <flo@geekplace.eu>}
  50.  * @see <a href="http://xmpp.org/extensions/xep-0323.html">XEP-0323: Internet of Things - Sensor Data</a>
  51.  */
  52. public final class IoTDataManager extends IoTManager {

  53.     private static final Logger LOGGER = Logger.getLogger(IoTDataManager.class.getName());

  54.     private static final Map<XMPPConnection, IoTDataManager> INSTANCES = new WeakHashMap<>();

  55.     // Ensure a IoTDataManager exists for every connection.
  56.     static {
  57.         XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() {
  58.             @Override
  59.             public void connectionCreated(XMPPConnection connection) {
  60.                 if (!isAutoEnableActive()) return;
  61.                 getInstanceFor(connection);
  62.             }
  63.         });
  64.     }

  65.     /**
  66.      * Get the manger instance responsible for the given connection.
  67.      *
  68.      * @param connection the XMPP connection.
  69.      * @return a manager instance.
  70.      */
  71.     public static synchronized IoTDataManager getInstanceFor(XMPPConnection connection) {
  72.         IoTDataManager manager = INSTANCES.get(connection);
  73.         if (manager == null) {
  74.             manager = new IoTDataManager(connection);
  75.             INSTANCES.put(connection, manager);
  76.         }
  77.         return manager;
  78.     }

  79.     private final AtomicInteger nextSeqNr = new AtomicInteger();

  80.     private final Map<NodeInfo, Thing> things = new ConcurrentHashMap<>();

  81.     private IoTDataManager(XMPPConnection connection) {
  82.         super(connection);
  83.         connection.registerIQRequestHandler(new IoTIqRequestHandler(IoTDataRequest.ELEMENT,
  84.                         IoTDataRequest.NAMESPACE, IQ.Type.get, Mode.async) {
  85.             @Override
  86.             public IQ handleIoTIqRequest(IQ iqRequest) {
  87.                 final IoTDataRequest dataRequest = (IoTDataRequest) iqRequest;

  88.                 if (!dataRequest.isMomentary()) {
  89.                     // TODO return error IQ that non momentary requests are not implemented yet.
  90.                     return null;
  91.                 }

  92.                 // TODO Add support for multiple things(/NodeInfos).
  93.                 final Thing thing = things.get(NodeInfo.EMPTY);
  94.                 if (thing == null) {
  95.                     // TODO return error if not at least one thing registered.
  96.                     return null;
  97.                 }

  98.                 ThingMomentaryReadOutRequest readOutRequest = thing.getMomentaryReadOutRequestHandler();
  99.                 if (readOutRequest == null) {
  100.                     // TODO Thing does not provide momentary read-out
  101.                     return null;
  102.                 }

  103.                 // Callback hell begins here. :) XEP-0323 decouples the read-out results from the IQ result. I'm not
  104.                 // sure if I would have made the same design decision but the reasons where likely being able to get a
  105.                 // fast read-out acknowledgement back to the requester even with sensors that take "a long time" to
  106.                 // read-out their values. I had designed that as special case and made the "results in IQ response" the
  107.                 // normal case.
  108.                 readOutRequest.momentaryReadOutRequest(new ThingMomentaryReadOutResult() {
  109.                     @Override
  110.                     public void momentaryReadOut(List<? extends IoTDataField> results) {
  111.                         IoTFieldsExtension iotFieldsExtension = IoTFieldsExtension.buildFor(dataRequest.getSequenceNr(), true, thing.getNodeInfo(), results);

  112.                         XMPPConnection connection = connection();
  113.                         Message message = connection.getStanzaFactory().buildMessageStanza()
  114.                                 .to(dataRequest.getFrom())
  115.                                 .addExtension(iotFieldsExtension)
  116.                                 .build();
  117.                         try {
  118.                             connection.sendStanza(message);
  119.                         }
  120.                         catch (NotConnectedException | InterruptedException e) {
  121.                             LOGGER.log(Level.SEVERE, "Could not send read-out response " + message, e);
  122.                         }
  123.                     }
  124.                 });

  125.                 return new IoTDataReadOutAccepted(dataRequest);
  126.             }
  127.         });
  128.     }

  129.     /**
  130.      * Install a thing in the manager. Activates data read out functionality (if provided by the
  131.      * thing).
  132.      *
  133.      * @param thing the thing to install.
  134.      */
  135.     public void installThing(Thing thing) {
  136.         things.put(thing.getNodeInfo(), thing);
  137.     }

  138.     public Thing uninstallThing(Thing thing) {
  139.         return uninstallThing(thing.getNodeInfo());
  140.     }

  141.     public Thing uninstallThing(NodeInfo nodeInfo) {
  142.         return things.remove(nodeInfo);
  143.     }

  144.     /**
  145.      * Try to read out a things momentary values.
  146.      *
  147.      * @param jid the full JID of the thing to read data from.
  148.      * @return a list with the read out data.
  149.      * @throws NoResponseException if there was no response from the remote entity.
  150.      * @throws XMPPErrorException if there was an XMPP error returned.
  151.      * @throws NotConnectedException if the XMPP connection is not connected.
  152.      * @throws InterruptedException if the calling thread was interrupted.
  153.      */
  154.     public List<IoTFieldsExtension> requestMomentaryValuesReadOut(EntityFullJid jid)
  155.                     throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException {
  156.         final XMPPConnection connection = connection();
  157.         final int seqNr = nextSeqNr.incrementAndGet();
  158.         IoTDataRequest iotDataRequest = new IoTDataRequest(seqNr, true);
  159.         iotDataRequest.setTo(jid);

  160.         StanzaFilter doneFilter = new IoTFieldsExtensionFilter(seqNr, true);
  161.         StanzaFilter dataFilter = new IoTFieldsExtensionFilter(seqNr, false);

  162.         // Setup the IoTFieldsExtension message collectors before sending the IQ to avoid a data race.
  163.         StanzaCollector doneCollector = connection.createStanzaCollector(doneFilter);

  164.         StanzaCollector.Configuration dataCollectorConfiguration = StanzaCollector.newConfiguration().setStanzaFilter(
  165.                         dataFilter).setCollectorToReset(doneCollector);
  166.         StanzaCollector dataCollector = connection.createStanzaCollector(dataCollectorConfiguration);

  167.         try {
  168.             connection.sendIqRequestAndWaitForResponse(iotDataRequest);
  169.             // Wait until a message with an IoTFieldsExtension and the done flag comes in.
  170.             doneCollector.nextResult();
  171.         }
  172.         finally {
  173.             // Canceling dataCollector will also cancel the doneCollector since it is configured as dataCollector's
  174.             // collector to reset.
  175.             dataCollector.cancel();
  176.         }

  177.         int collectedCount = dataCollector.getCollectedCount();
  178.         List<IoTFieldsExtension> res = new ArrayList<>(collectedCount);
  179.         for (int i = 0; i < collectedCount; i++) {
  180.             Message message = dataCollector.pollResult();
  181.             IoTFieldsExtension iotFieldsExtension = IoTFieldsExtension.from(message);
  182.             res.add(iotFieldsExtension);
  183.         }

  184.         return res;
  185.     }
  186. }