001/** 002 * 003 * Copyright 2016 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smackx.iot.data; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.Map; 022import java.util.WeakHashMap; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.atomic.AtomicInteger; 025import java.util.logging.Level; 026import java.util.logging.Logger; 027 028import org.jivesoftware.smack.ConnectionCreationListener; 029import org.jivesoftware.smack.SmackException.NoResponseException; 030import org.jivesoftware.smack.SmackException.NotConnectedException; 031import org.jivesoftware.smack.StanzaCollector; 032import org.jivesoftware.smack.XMPPConnection; 033import org.jivesoftware.smack.XMPPConnectionRegistry; 034import org.jivesoftware.smack.XMPPException.XMPPErrorException; 035import org.jivesoftware.smack.filter.StanzaFilter; 036import org.jivesoftware.smack.iqrequest.IQRequestHandler.Mode; 037import org.jivesoftware.smack.packet.IQ; 038import org.jivesoftware.smack.packet.Message; 039 040import org.jivesoftware.smackx.iot.IoTManager; 041import org.jivesoftware.smackx.iot.Thing; 042import org.jivesoftware.smackx.iot.data.element.IoTDataField; 043import org.jivesoftware.smackx.iot.data.element.IoTDataReadOutAccepted; 044import org.jivesoftware.smackx.iot.data.element.IoTDataRequest; 045import org.jivesoftware.smackx.iot.data.element.IoTFieldsExtension; 046import org.jivesoftware.smackx.iot.data.filter.IoTFieldsExtensionFilter; 047import org.jivesoftware.smackx.iot.element.NodeInfo; 048 049import org.jxmpp.jid.EntityFullJid; 050 051/** 052 * A manager for XEP-0323: Internet of Things - Sensor Data. 053 * 054 * @author Florian Schmaus {@literal <flo@geekplace.eu>} 055 * @see <a href="http://xmpp.org/extensions/xep-0323.html">XEP-0323: Internet of Things - Sensor Data</a> 056 */ 057public final class IoTDataManager extends IoTManager { 058 059 private static final Logger LOGGER = Logger.getLogger(IoTDataManager.class.getName()); 060 061 private static final Map<XMPPConnection, IoTDataManager> INSTANCES = new WeakHashMap<>(); 062 063 // Ensure a IoTDataManager exists for every connection. 064 static { 065 XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() { 066 @Override 067 public void connectionCreated(XMPPConnection connection) { 068 if (!isAutoEnableActive()) return; 069 getInstanceFor(connection); 070 } 071 }); 072 } 073 074 /** 075 * Get the manger instance responsible for the given connection. 076 * 077 * @param connection the XMPP connection. 078 * @return a manager instance. 079 */ 080 public static synchronized IoTDataManager getInstanceFor(XMPPConnection connection) { 081 IoTDataManager manager = INSTANCES.get(connection); 082 if (manager == null) { 083 manager = new IoTDataManager(connection); 084 INSTANCES.put(connection, manager); 085 } 086 return manager; 087 } 088 089 private final AtomicInteger nextSeqNr = new AtomicInteger(); 090 091 private final Map<NodeInfo, Thing> things = new ConcurrentHashMap<>(); 092 093 private IoTDataManager(XMPPConnection connection) { 094 super(connection); 095 connection.registerIQRequestHandler(new IoTIqRequestHandler(IoTDataRequest.ELEMENT, 096 IoTDataRequest.NAMESPACE, IQ.Type.get, Mode.async) { 097 @Override 098 public IQ handleIoTIqRequest(IQ iqRequest) { 099 final IoTDataRequest dataRequest = (IoTDataRequest) iqRequest; 100 101 if (!dataRequest.isMomentary()) { 102 // TODO return error IQ that non momentary requests are not implemented yet. 103 return null; 104 } 105 106 // TODO Add support for multiple things(/NodeInfos). 107 final Thing thing = things.get(NodeInfo.EMPTY); 108 if (thing == null) { 109 // TODO return error if not at least one thing registered. 110 return null; 111 } 112 113 ThingMomentaryReadOutRequest readOutRequest = thing.getMomentaryReadOutRequestHandler(); 114 if (readOutRequest == null) { 115 // TODO Thing does not provide momentary read-out 116 return null; 117 } 118 119 // Callback hell begins here. :) XEP-0323 decouples the read-out results from the IQ result. I'm not 120 // sure if I would have made the same design decision but the reasons where likely being able to get a 121 // fast read-out acknowledgement back to the requester even with sensors that take "a long time" to 122 // read-out their values. I had designed that as special case and made the "results in IQ response" the 123 // normal case. 124 readOutRequest.momentaryReadOutRequest(new ThingMomentaryReadOutResult() { 125 @Override 126 public void momentaryReadOut(List<? extends IoTDataField> results) { 127 IoTFieldsExtension iotFieldsExtension = IoTFieldsExtension.buildFor(dataRequest.getSequenceNr(), true, thing.getNodeInfo(), results); 128 129 XMPPConnection connection = connection(); 130 Message message = connection.getStanzaFactory().buildMessageStanza() 131 .to(dataRequest.getFrom()) 132 .addExtension(iotFieldsExtension) 133 .build(); 134 try { 135 connection.sendStanza(message); 136 } 137 catch (NotConnectedException | InterruptedException e) { 138 LOGGER.log(Level.SEVERE, "Could not send read-out response " + message, e); 139 } 140 } 141 }); 142 143 return new IoTDataReadOutAccepted(dataRequest); 144 } 145 }); 146 } 147 148 /** 149 * Install a thing in the manager. Activates data read out functionality (if provided by the 150 * thing). 151 * 152 * @param thing the thing to install. 153 */ 154 public void installThing(Thing thing) { 155 things.put(thing.getNodeInfo(), thing); 156 } 157 158 public Thing uninstallThing(Thing thing) { 159 return uninstallThing(thing.getNodeInfo()); 160 } 161 162 public Thing uninstallThing(NodeInfo nodeInfo) { 163 return things.remove(nodeInfo); 164 } 165 166 /** 167 * Try to read out a things momentary values. 168 * 169 * @param jid the full JID of the thing to read data from. 170 * @return a list with the read out data. 171 * @throws NoResponseException if there was no response from the remote entity. 172 * @throws XMPPErrorException if there was an XMPP error returned. 173 * @throws NotConnectedException if the XMPP connection is not connected. 174 * @throws InterruptedException if the calling thread was interrupted. 175 */ 176 public List<IoTFieldsExtension> requestMomentaryValuesReadOut(EntityFullJid jid) 177 throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException { 178 final XMPPConnection connection = connection(); 179 final int seqNr = nextSeqNr.incrementAndGet(); 180 IoTDataRequest iotDataRequest = new IoTDataRequest(seqNr, true); 181 iotDataRequest.setTo(jid); 182 183 StanzaFilter doneFilter = new IoTFieldsExtensionFilter(seqNr, true); 184 StanzaFilter dataFilter = new IoTFieldsExtensionFilter(seqNr, false); 185 186 // Setup the IoTFieldsExtension message collectors before sending the IQ to avoid a data race. 187 StanzaCollector doneCollector = connection.createStanzaCollector(doneFilter); 188 189 StanzaCollector.Configuration dataCollectorConfiguration = StanzaCollector.newConfiguration().setStanzaFilter( 190 dataFilter).setCollectorToReset(doneCollector); 191 StanzaCollector dataCollector = connection.createStanzaCollector(dataCollectorConfiguration); 192 193 try { 194 connection.sendIqRequestAndWaitForResponse(iotDataRequest); 195 // Wait until a message with an IoTFieldsExtension and the done flag comes in. 196 doneCollector.nextResult(); 197 } 198 finally { 199 // Canceling dataCollector will also cancel the doneCollector since it is configured as dataCollector's 200 // collector to reset. 201 dataCollector.cancel(); 202 } 203 204 int collectedCount = dataCollector.getCollectedCount(); 205 List<IoTFieldsExtension> res = new ArrayList<>(collectedCount); 206 for (int i = 0; i < collectedCount; i++) { 207 Message message = dataCollector.pollResult(); 208 IoTFieldsExtension iotFieldsExtension = IoTFieldsExtension.from(message); 209 res.add(iotFieldsExtension); 210 } 211 212 return res; 213 } 214}