ModularXmppClientToServerConnection.java
/**
*
* Copyright 2018-2020 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack.c2s;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.SSLSession;
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackFuture;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
import org.jivesoftware.smack.XMPPException.StreamErrorException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.XmppInputOutputFilter;
import org.jivesoftware.smack.c2s.XmppClientToServerTransport.LookupConnectionEndpointsFailed;
import org.jivesoftware.smack.c2s.XmppClientToServerTransport.LookupConnectionEndpointsResult;
import org.jivesoftware.smack.c2s.XmppClientToServerTransport.LookupConnectionEndpointsSuccess;
import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext;
import org.jivesoftware.smack.fsm.ConnectionStateEvent;
import org.jivesoftware.smack.fsm.ConnectionStateMachineListener;
import org.jivesoftware.smack.fsm.LoginContext;
import org.jivesoftware.smack.fsm.NoOpState;
import org.jivesoftware.smack.fsm.State;
import org.jivesoftware.smack.fsm.StateDescriptor;
import org.jivesoftware.smack.fsm.StateDescriptorGraph;
import org.jivesoftware.smack.fsm.StateDescriptorGraph.GraphVertex;
import org.jivesoftware.smack.fsm.StateMachineException;
import org.jivesoftware.smack.fsm.StateTransitionResult;
import org.jivesoftware.smack.fsm.StateTransitionResult.AttemptResult;
import org.jivesoftware.smack.internal.AbstractStats;
import org.jivesoftware.smack.internal.SmackTlsContext;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Nonza;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StreamClose;
import org.jivesoftware.smack.packet.StreamError;
import org.jivesoftware.smack.packet.TopLevelStreamElement;
import org.jivesoftware.smack.packet.XmlEnvironment;
import org.jivesoftware.smack.parsing.SmackParsingException;
import org.jivesoftware.smack.sasl.SASLErrorException;
import org.jivesoftware.smack.sasl.SASLMechanism;
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import org.jivesoftware.smack.util.ExtendedAppendable;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.Supplier;
import org.jivesoftware.smack.xml.XmlPullParser;
import org.jivesoftware.smack.xml.XmlPullParserException;
import org.jxmpp.jid.parts.Resourcepart;
public final class ModularXmppClientToServerConnection extends AbstractXMPPConnection {
private static final Logger LOGGER = Logger.getLogger(ModularXmppClientToServerConnectionConfiguration.class.getName());
private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingElementsQueue = new ArrayBlockingQueueWithShutdown<>(
100, true);
private XmppClientToServerTransport activeTransport;
private final List<ConnectionStateMachineListener> connectionStateMachineListeners = new CopyOnWriteArrayList<>();
private boolean featuresReceived;
protected boolean streamResumed;
private GraphVertex<State> currentStateVertex;
private List<State> walkFromDisconnectToAuthenticated;
private final ModularXmppClientToServerConnectionConfiguration configuration;
private final ModularXmppClientToServerConnectionInternal connectionInternal;
private final Map<Class<? extends ModularXmppClientToServerConnectionModuleDescriptor>, ModularXmppClientToServerConnectionModule<? extends ModularXmppClientToServerConnectionModuleDescriptor>> connectionModules = new HashMap<>();
private final Map<Class<? extends ModularXmppClientToServerConnectionModuleDescriptor>, XmppClientToServerTransport> transports = new HashMap<>();
/**
* This is one of those cases where the field is modified by one thread and read by another. We currently use
* CopyOnWriteArrayList but should potentially use a VarHandle once Smack supports them.
*/
private final List<XmppInputOutputFilter> inputOutputFilters = new CopyOnWriteArrayList<>();
private List<XmppInputOutputFilter> previousInputOutputFilters;
public ModularXmppClientToServerConnection(ModularXmppClientToServerConnectionConfiguration configuration) {
super(configuration);
this.configuration = configuration;
// Construct the internal connection API.
connectionInternal = new ModularXmppClientToServerConnectionInternal(this, getReactor(), debugger, outgoingElementsQueue) {
@Override
public void parseAndProcessElement(String wrappedCompleteElement) {
ModularXmppClientToServerConnection.this.parseAndProcessElement(wrappedCompleteElement);
}
@Override
public void notifyConnectionError(Exception e) {
ModularXmppClientToServerConnection.this.notifyConnectionError(e);
}
@Override
public void onStreamOpen(XmlPullParser parser) {
ModularXmppClientToServerConnection.this.onStreamOpen(parser);
}
@Override
public void onStreamClosed() {
ModularXmppClientToServerConnection.this.closingStreamReceived = true;
notifyWaitingThreads();
}
@Override
public void fireFirstLevelElementSendListeners(TopLevelStreamElement element) {
ModularXmppClientToServerConnection.this.firePacketSendingListeners(element);
}
@Override
public void invokeConnectionStateMachineListener(ConnectionStateEvent connectionStateEvent) {
ModularXmppClientToServerConnection.this.invokeConnectionStateMachineListener(connectionStateEvent);
}
@Override
public XmlEnvironment getOutgoingStreamXmlEnvironment() {
return outgoingStreamXmlEnvironment;
}
@Override
public void addXmppInputOutputFilter(XmppInputOutputFilter xmppInputOutputFilter) {
inputOutputFilters.add(0, xmppInputOutputFilter);
}
@Override
public ListIterator<XmppInputOutputFilter> getXmppInputOutputFilterBeginIterator() {
return inputOutputFilters.listIterator();
}
@Override
public ListIterator<XmppInputOutputFilter> getXmppInputOutputFilterEndIterator() {
return inputOutputFilters.listIterator(inputOutputFilters.size());
}
@Override
public void newStreamOpenWaitForFeaturesSequence(String waitFor) throws InterruptedException,
SmackException, XMPPException {
ModularXmppClientToServerConnection.this.newStreamOpenWaitForFeaturesSequence(waitFor);
}
@Override
public SmackTlsContext getSmackTlsContext() {
return ModularXmppClientToServerConnection.this.getSmackTlsContext();
}
@Override
public <SN extends Nonza, FN extends Nonza> SN sendAndWaitForResponse(Nonza nonza, Class<SN> successNonzaClass,
Class<FN> failedNonzaClass) throws NoResponseException, NotConnectedException, FailedNonzaException, InterruptedException {
return ModularXmppClientToServerConnection.this.sendAndWaitForResponse(nonza, successNonzaClass, failedNonzaClass);
}
@Override
public void asyncGo(Runnable runnable) {
AbstractXMPPConnection.asyncGo(runnable);
}
@Override
public void waitForConditionOrThrowConnectionException(Supplier<Boolean> condition, String waitFor)
throws InterruptedException, SmackException, XMPPException {
ModularXmppClientToServerConnection.this.waitForConditionOrThrowConnectionException(condition, waitFor);
}
@Override
public void notifyWaitingThreads() {
ModularXmppClientToServerConnection.this.notifyWaitingThreads();
}
@Override
public void setCompressionEnabled(boolean compressionEnabled) {
ModularXmppClientToServerConnection.this.compressionEnabled = compressionEnabled;
}
@Override
public void setTransport(XmppClientToServerTransport xmppTransport) {
ModularXmppClientToServerConnection.this.activeTransport = xmppTransport;
ModularXmppClientToServerConnection.this.connected = true;
}
};
// Construct the modules from the module descriptor. We do this before constructing the state graph, as the
// modules are sometimes used to construct the states.
for (ModularXmppClientToServerConnectionModuleDescriptor moduleDescriptor : configuration.moduleDescriptors) {
Class<? extends ModularXmppClientToServerConnectionModuleDescriptor> moduleDescriptorClass = moduleDescriptor.getClass();
ModularXmppClientToServerConnectionModule<? extends ModularXmppClientToServerConnectionModuleDescriptor> connectionModule = moduleDescriptor.constructXmppConnectionModule(connectionInternal);
connectionModules.put(moduleDescriptorClass, connectionModule);
XmppClientToServerTransport transport = connectionModule.getTransport();
// Not every module may provide a transport.
if (transport != null) {
transports.put(moduleDescriptorClass, transport);
}
}
GraphVertex<StateDescriptor> initialStateDescriptorVertex = configuration.initialStateDescriptorVertex;
// Convert the graph of state descriptors to a graph of states, bound to this very connection.
currentStateVertex = StateDescriptorGraph.convertToStateGraph(initialStateDescriptorVertex, connectionInternal);
}
@SuppressWarnings("unchecked")
public <CM extends ModularXmppClientToServerConnectionModule<? extends ModularXmppClientToServerConnectionModuleDescriptor>> CM getConnectionModuleFor(
Class<? extends ModularXmppClientToServerConnectionModuleDescriptor> descriptorClass) {
return (CM) connectionModules.get(descriptorClass);
}
@Override
protected void loginInternal(String username, String password, Resourcepart resource)
throws XMPPException, SmackException, IOException, InterruptedException {
WalkStateGraphContext walkStateGraphContext = buildNewWalkTo(
AuthenticatedAndResourceBoundStateDescriptor.class).withLoginContext(username, password,
resource).build();
walkStateGraph(walkStateGraphContext);
}
protected WalkStateGraphContext.Builder buildNewWalkTo(Class<? extends StateDescriptor> finalStateClass) {
return WalkStateGraphContext.builder(currentStateVertex.getElement().getStateDescriptor().getClass(), finalStateClass);
}
/**
* Unwind the state. This will revert the effects of the state by calling {@link State#resetState()} prior issuing a
* connection state event of {@link ConnectionStateEvent#StateRevertBackwardsWalk}.
*
* @param revertedState the state which is going to get reverted.
*/
private void unwindState(State revertedState) {
invokeConnectionStateMachineListener(new ConnectionStateEvent.StateRevertBackwardsWalk(revertedState));
revertedState.resetState();
}
protected void walkStateGraph(WalkStateGraphContext walkStateGraphContext)
throws XMPPException, IOException, SmackException, InterruptedException {
// Save a copy of the current state
GraphVertex<State> previousStateVertex = currentStateVertex;
try {
walkStateGraphInternal(walkStateGraphContext);
} catch (IOException | SmackException | InterruptedException | XMPPException e) {
currentStateVertex = previousStateVertex;
// Unwind the state.
State revertedState = currentStateVertex.getElement();
unwindState(revertedState);
throw e;
}
}
private void walkStateGraphInternal(WalkStateGraphContext walkStateGraphContext)
throws IOException, SmackException, InterruptedException, XMPPException {
// Save a copy of the current state
final GraphVertex<State> initialStateVertex = currentStateVertex;
final State initialState = initialStateVertex.getElement();
final StateDescriptor initialStateDescriptor = initialState.getStateDescriptor();
walkStateGraphContext.recordWalkTo(initialState);
// Check if this is the walk's final state.
if (walkStateGraphContext.isWalksFinalState(initialStateDescriptor)) {
// If this is used as final state, then it should be marked as such.
assert initialStateDescriptor.isFinalState();
// We reached the final state.
invokeConnectionStateMachineListener(new ConnectionStateEvent.FinalStateReached(initialState));
return;
}
List<GraphVertex<State>> outgoingStateEdges = initialStateVertex.getOutgoingEdges();
// See if we need to handle mandatory intermediate states.
GraphVertex<State> mandatoryIntermediateStateVertex = walkStateGraphContext.maybeReturnMandatoryImmediateState(outgoingStateEdges);
if (mandatoryIntermediateStateVertex != null) {
StateTransitionResult reason = attemptEnterState(mandatoryIntermediateStateVertex, walkStateGraphContext);
if (reason instanceof StateTransitionResult.Success) {
walkStateGraph(walkStateGraphContext);
return;
}
// We could not enter a mandatory intermediate state. Throw here.
throw new StateMachineException.SmackMandatoryStateFailedException(
mandatoryIntermediateStateVertex.getElement(), reason);
}
for (Iterator<GraphVertex<State>> it = outgoingStateEdges.iterator(); it.hasNext();) {
GraphVertex<State> successorStateVertex = it.next();
State successorState = successorStateVertex.getElement();
// Ignore successorStateVertex if the only way to the final state is via the initial state. This happens
// typically if we are in the ConnectedButUnauthenticated state on the way to ResourceboundAndAuthenticated,
// where we do not want to walk via InstantShutdown/Shtudown in a cycle over the initial state towards this
// state.
if (walkStateGraphContext.wouldCauseCycle(successorStateVertex)) {
// Ignore this successor.
invokeConnectionStateMachineListener(new ConnectionStateEvent.TransitionIgnoredDueCycle(initialStateVertex, successorStateVertex));
} else {
StateTransitionResult result = attemptEnterState(successorStateVertex, walkStateGraphContext);
if (result instanceof StateTransitionResult.Success) {
break;
}
// If attemptEnterState did not throw and did not return a value of type TransitionSuccessResult, then we
// just record this value and go on from there. Note that reason may be null, which is returned by
// attemptEnterState in case the state was already successfully handled. If this is the case, then we don't
// record it.
if (result != null) {
walkStateGraphContext.recordFailedState(successorState, result);
}
}
if (!it.hasNext()) {
throw StateMachineException.SmackStateGraphDeadEndException.from(walkStateGraphContext,
initialStateVertex);
}
}
// Walk the state graph by recursion.
walkStateGraph(walkStateGraphContext);
}
/**
* Attempt to enter a state. Note that this method may return <code>null</code> if this state can be safely ignored.
*
* @param successorStateVertex the successor state vertex.
* @param walkStateGraphContext the "walk state graph" context.
* @return A state transition result or <code>null</code> if this state can be ignored.
* @throws SmackException if Smack detected an exceptional situation.
* @throws XMPPException if an XMPP protocol error was received.
* @throws IOException if an I/O error occurred.
* @throws InterruptedException if the calling thread was interrupted.
*/
private StateTransitionResult attemptEnterState(GraphVertex<State> successorStateVertex,
WalkStateGraphContext walkStateGraphContext) throws SmackException, XMPPException,
IOException, InterruptedException {
final GraphVertex<State> initialStateVertex = currentStateVertex;
final State initialState = initialStateVertex.getElement();
final State successorState = successorStateVertex.getElement();
final StateDescriptor successorStateDescriptor = successorState.getStateDescriptor();
if (!successorStateDescriptor.isMultiVisitState()
&& walkStateGraphContext.stateAlreadyVisited(successorState)) {
// This can happen if a state leads back to the state where it originated from. See for example the
// 'Compression' state. We return 'null' here to signal that the state can safely be ignored.
return null;
}
if (successorStateDescriptor.isNotImplemented()) {
StateTransitionResult.TransitionImpossibleBecauseNotImplemented transtionImpossibleBecauseNotImplemented = new StateTransitionResult.TransitionImpossibleBecauseNotImplemented(
successorStateDescriptor);
invokeConnectionStateMachineListener(new ConnectionStateEvent.TransitionNotPossible(initialState, successorState,
transtionImpossibleBecauseNotImplemented));
return transtionImpossibleBecauseNotImplemented;
}
final StateTransitionResult.AttemptResult transitionAttemptResult;
try {
StateTransitionResult.TransitionImpossible transitionImpossible = successorState.isTransitionToPossible(
walkStateGraphContext);
if (transitionImpossible != null) {
invokeConnectionStateMachineListener(new ConnectionStateEvent.TransitionNotPossible(initialState, successorState,
transitionImpossible));
return transitionImpossible;
}
invokeConnectionStateMachineListener(new ConnectionStateEvent.AboutToTransitionInto(initialState, successorState));
transitionAttemptResult = successorState.transitionInto(walkStateGraphContext);
} catch (SmackException | IOException | InterruptedException | XMPPException e) {
// Unwind the state here too, since this state will not be unwound by walkStateGraph(), as it will not
// become a predecessor state in the walk.
unwindState(successorState);
throw e;
}
if (transitionAttemptResult instanceof StateTransitionResult.Failure) {
StateTransitionResult.Failure transitionFailureResult = (StateTransitionResult.Failure) transitionAttemptResult;
invokeConnectionStateMachineListener(
new ConnectionStateEvent.TransitionFailed(initialState, successorState, transitionFailureResult));
return transitionAttemptResult;
}
// If transitionAttemptResult is not an instance of TransitionFailureResult, then it has to be of type
// TransitionSuccessResult.
StateTransitionResult.Success transitionSuccessResult = (StateTransitionResult.Success) transitionAttemptResult;
currentStateVertex = successorStateVertex;
invokeConnectionStateMachineListener(
new ConnectionStateEvent.SuccessfullyTransitionedInto(successorState, transitionSuccessResult));
return transitionSuccessResult;
}
@Override
protected void sendStanzaInternal(Stanza stanza) throws NotConnectedException, InterruptedException {
sendTopLevelStreamElement(stanza);
}
@Override
public void sendNonza(Nonza nonza) throws NotConnectedException, InterruptedException {
sendTopLevelStreamElement(nonza);
}
private void sendTopLevelStreamElement(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
final XmppClientToServerTransport transport = activeTransport;
if (transport == null) {
throw new NotConnectedException();
}
outgoingElementsQueue.put(element);
transport.notifyAboutNewOutgoingElements();
}
@Override
protected void shutdown() {
shutdown(false);
}
@Override
public synchronized void instantShutdown() {
shutdown(true);
}
@Override
public ModularXmppClientToServerConnectionConfiguration getConfiguration() {
return configuration;
}
private void shutdown(boolean instant) {
Class<? extends StateDescriptor> mandatoryIntermediateState;
if (instant) {
mandatoryIntermediateState = InstantShutdownStateDescriptor.class;
} else {
mandatoryIntermediateState = ShutdownStateDescriptor.class;
}
WalkStateGraphContext context = buildNewWalkTo(
DisconnectedStateDescriptor.class).withMandatoryIntermediateState(
mandatoryIntermediateState).build();
try {
walkStateGraph(context);
} catch (IOException | SmackException | InterruptedException | XMPPException e) {
throw new IllegalStateException("A walk to disconnected state should never throw", e);
}
}
protected SSLSession getSSLSession() {
final XmppClientToServerTransport transport = activeTransport;
if (transport == null) {
return null;
}
return transport.getSslSession();
}
@Override
protected void afterFeaturesReceived() {
featuresReceived = true;
notifyWaitingThreads();
}
protected void parseAndProcessElement(String element) {
try {
XmlPullParser parser = PacketParserUtils.getParserFor(element);
// Skip the enclosing stream open what is guaranteed to be there.
parser.next();
XmlPullParser.Event event = parser.getEventType();
outerloop: while (true) {
switch (event) {
case START_ELEMENT:
final String name = parser.getName();
// Note that we don't handle "stream" here as it's done in the splitter.
switch (name) {
case Message.ELEMENT:
case IQ.IQ_ELEMENT:
case Presence.ELEMENT:
try {
parseAndProcessStanza(parser);
} finally {
// TODO: Here would be the following stream management code.
// clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
}
break;
case "error":
StreamError streamError = PacketParserUtils.parseStreamError(parser, null);
StreamErrorException streamErrorException = new StreamErrorException(streamError);
currentXmppException = streamErrorException;
notifyWaitingThreads();
throw streamErrorException;
case "features":
parseFeatures(parser);
afterFeaturesReceived();
break;
default:
parseAndProcessNonza(parser);
break;
}
break;
case END_DOCUMENT:
break outerloop;
default: // fall out
}
event = parser.next();
}
} catch (XmlPullParserException | IOException | InterruptedException | StreamErrorException
| SmackParsingException e) {
notifyConnectionError(e);
}
}
protected synchronized void prepareToWaitForFeaturesReceived() {
featuresReceived = false;
}
protected void waitForFeaturesReceived(String waitFor)
throws InterruptedException, SmackException, XMPPException {
waitForConditionOrThrowConnectionException(() -> featuresReceived, waitFor);
}
protected void newStreamOpenWaitForFeaturesSequence(String waitFor) throws InterruptedException,
SmackException, XMPPException {
prepareToWaitForFeaturesReceived();
sendStreamOpen();
waitForFeaturesReceived(waitFor);
}
public static class DisconnectedStateDescriptor extends StateDescriptor {
protected DisconnectedStateDescriptor() {
super(DisconnectedState.class, StateDescriptor.Property.finalState);
addSuccessor(LookupRemoteConnectionEndpointsStateDescriptor.class);
}
}
private final class DisconnectedState extends State {
private DisconnectedState(StateDescriptor stateDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
super(stateDescriptor, connectionInternal);
}
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) {
synchronized (ModularXmppClientToServerConnection.this) {
if (inputOutputFilters.isEmpty()) {
previousInputOutputFilters = null;
} else {
previousInputOutputFilters = new ArrayList<>(inputOutputFilters.size());
previousInputOutputFilters.addAll(inputOutputFilters);
inputOutputFilters.clear();
}
}
// Reset all states we have visited when transitioning from disconnected to authenticated. This assumes that
// every state after authenticated does not need to be reset.
ListIterator<State> it = walkFromDisconnectToAuthenticated.listIterator(
walkFromDisconnectToAuthenticated.size());
while (it.hasPrevious()) {
State stateToReset = it.previous();
stateToReset.resetState();
}
walkFromDisconnectToAuthenticated = null;
return StateTransitionResult.Success.EMPTY_INSTANCE;
}
}
public static final class LookupRemoteConnectionEndpointsStateDescriptor extends StateDescriptor {
private LookupRemoteConnectionEndpointsStateDescriptor() {
super(LookupRemoteConnectionEndpointsState.class);
}
}
private final class LookupRemoteConnectionEndpointsState extends State {
boolean outgoingElementsQueueWasShutdown;
private LookupRemoteConnectionEndpointsState(StateDescriptor stateDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
super(stateDescriptor, connectionInternal);
}
@Override
public AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) throws XMPPErrorException,
SASLErrorException, IOException, SmackException, InterruptedException, FailedNonzaException {
// There is a challenge here: We are going to trigger the discovery of endpoints which will run
// asynchronously. After a timeout, all discovered endpoints are collected. To prevent stale results from
// previous discover runs, the results are communicated via SmackFuture, so that we always handle the most
// up-to-date results.
Map<XmppClientToServerTransport, List<SmackFuture<LookupConnectionEndpointsResult, Exception>>> lookupFutures = new HashMap<>(
transports.size());
final int numberOfFutures;
{
List<SmackFuture<?, ?>> allFutures = new ArrayList<>();
for (XmppClientToServerTransport transport : transports.values()) {
// First we clear the transport of any potentially previously discovered connection endpoints.
transport.resetDiscoveredConnectionEndpoints();
// Ask the transport to start the discovery of remote connection endpoints asynchronously.
List<SmackFuture<LookupConnectionEndpointsResult, Exception>> transportFutures = transport.lookupConnectionEndpoints();
lookupFutures.put(transport, transportFutures);
allFutures.addAll(transportFutures);
}
numberOfFutures = allFutures.size();
// Wait until all features are ready or if the timeout occurs. Note that we do not inspect and react the
// return value of SmackFuture.await() as we want to collect the LookupConnectionEndpointsFailed later.
SmackFuture.await(allFutures, getReplyTimeout(), TimeUnit.MILLISECONDS);
}
// Note that we do not pass the lookupFailures in case there is at least one successful transport. The
// lookup failures are also recording in LookupConnectionEndpointsSuccess, e.g. as part of
// RemoteXmppTcpConnectionEndpoints.Result.
List<LookupConnectionEndpointsFailed> lookupFailures = new ArrayList<>(numberOfFutures);
boolean atLeastOneConnectionEndpointDiscovered = false;
for (Map.Entry<XmppClientToServerTransport, List<SmackFuture<LookupConnectionEndpointsResult, Exception>>> entry : lookupFutures.entrySet()) {
XmppClientToServerTransport transport = entry.getKey();
for (SmackFuture<LookupConnectionEndpointsResult, Exception> future : entry.getValue()) {
LookupConnectionEndpointsResult result = future.getIfAvailable();
if (result == null) {
continue;
}
if (result instanceof LookupConnectionEndpointsFailed) {
LookupConnectionEndpointsFailed lookupFailure = (LookupConnectionEndpointsFailed) result;
lookupFailures.add(lookupFailure);
continue;
}
LookupConnectionEndpointsSuccess successResult = (LookupConnectionEndpointsSuccess) result;
// Arm the transport with the success result, so that its information can be used by the transport
// to establish the connection.
transport.loadConnectionEndpoints(successResult);
// Mark that the connection attempt can continue.
atLeastOneConnectionEndpointDiscovered = true;
}
}
if (!atLeastOneConnectionEndpointDiscovered) {
throw SmackException.NoEndpointsDiscoveredException.from(lookupFailures);
}
// Even though the outgoing elements queue is unrelated to the lookup remote connection endpoints state, we
// do start the queue at this point. The transports will need it available, and we use the state's reset()
// function to close the queue again on failure.
outgoingElementsQueueWasShutdown = outgoingElementsQueue.start();
return StateTransitionResult.Success.EMPTY_INSTANCE;
}
@Override
public void resetState() {
for (XmppClientToServerTransport transport : transports.values()) {
transport.resetDiscoveredConnectionEndpoints();
}
if (outgoingElementsQueueWasShutdown) {
// Reset the outgoing elements queue in this state, since we also start it in this state.
outgoingElementsQueue.shutdown();
}
}
}
public static final class ConnectedButUnauthenticatedStateDescriptor extends StateDescriptor {
private ConnectedButUnauthenticatedStateDescriptor() {
super(ConnectedButUnauthenticatedState.class, StateDescriptor.Property.finalState);
addSuccessor(SaslAuthenticationStateDescriptor.class);
addSuccessor(InstantShutdownStateDescriptor.class);
addSuccessor(ShutdownStateDescriptor.class);
}
}
private final class ConnectedButUnauthenticatedState extends State {
private ConnectedButUnauthenticatedState(StateDescriptor stateDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
super(stateDescriptor, connectionInternal);
}
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) {
assert walkFromDisconnectToAuthenticated == null;
if (walkStateGraphContext.isWalksFinalState(getStateDescriptor())) {
// If this is the final state, then record the walk so far.
walkFromDisconnectToAuthenticated = walkStateGraphContext.getWalk();
}
connected = true;
return StateTransitionResult.Success.EMPTY_INSTANCE;
}
@Override
public void resetState() {
connected = false;
}
}
public static final class SaslAuthenticationStateDescriptor extends StateDescriptor {
private SaslAuthenticationStateDescriptor() {
super(SaslAuthenticationState.class, "RFC 6120 § 6");
addSuccessor(AuthenticatedButUnboundStateDescriptor.class);
}
}
private final class SaslAuthenticationState extends State {
private SaslAuthenticationState(StateDescriptor stateDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
super(stateDescriptor, connectionInternal);
}
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
throws IOException, SmackException, InterruptedException, XMPPException {
prepareToWaitForFeaturesReceived();
LoginContext loginContext = walkStateGraphContext.getLoginContext();
SASLMechanism usedSaslMechanism = authenticate(loginContext.username, loginContext.password,
config.getAuthzid(), getSSLSession());
// authenticate() will only return if the SASL authentication was successful, but we also need to wait for
// the next round of stream features.
waitForFeaturesReceived("server stream features after SASL authentication");
return new SaslAuthenticationSuccessResult(usedSaslMechanism);
}
}
public static final class SaslAuthenticationSuccessResult extends StateTransitionResult.Success {
private final String saslMechanismName;
private SaslAuthenticationSuccessResult(SASLMechanism usedSaslMechanism) {
super("SASL authentication successfull using " + usedSaslMechanism.getName());
this.saslMechanismName = usedSaslMechanism.getName();
}
public String getSaslMechanismName() {
return saslMechanismName;
}
}
public static final class AuthenticatedButUnboundStateDescriptor extends StateDescriptor {
private AuthenticatedButUnboundStateDescriptor() {
super(StateDescriptor.Property.multiVisitState);
addSuccessor(ResourceBindingStateDescriptor.class);
}
}
public static final class ResourceBindingStateDescriptor extends StateDescriptor {
private ResourceBindingStateDescriptor() {
super(ResourceBindingState.class, "RFC 6120 § 7");
addSuccessor(AuthenticatedAndResourceBoundStateDescriptor.class);
}
}
private final class ResourceBindingState extends State {
private ResourceBindingState(StateDescriptor stateDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
super(stateDescriptor, connectionInternal);
}
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
throws IOException, SmackException, InterruptedException, XMPPException {
// Calling bindResourceAndEstablishSession() below requires the lastFeaturesReceived sync point to be signaled.
// Since we entered this state, the FSM has decided that the last features have been received, hence signal
// the sync point.
lastFeaturesReceived = true;
notifyWaitingThreads();
LoginContext loginContext = walkStateGraphContext.getLoginContext();
Resourcepart resource = bindResourceAndEstablishSession(loginContext.resource);
// TODO: This should be a field in the Stream Management (SM) module. Here should be hook which the SM
// module can use to set the field instead.
streamResumed = false;
return new ResourceBoundResult(resource, loginContext.resource);
}
}
public static final class ResourceBoundResult extends StateTransitionResult.Success {
private final Resourcepart resource;
private ResourceBoundResult(Resourcepart boundResource, Resourcepart requestedResource) {
super("Resource '" + boundResource + "' bound (requested: '" + requestedResource + "')");
this.resource = boundResource;
}
public Resourcepart getResource() {
return resource;
}
}
private boolean compressionEnabled;
@Override
public boolean isUsingCompression() {
return compressionEnabled;
}
public static final class AuthenticatedAndResourceBoundStateDescriptor extends StateDescriptor {
private AuthenticatedAndResourceBoundStateDescriptor() {
super(AuthenticatedAndResourceBoundState.class, StateDescriptor.Property.finalState);
addSuccessor(InstantShutdownStateDescriptor.class);
addSuccessor(ShutdownStateDescriptor.class);
}
}
private final class AuthenticatedAndResourceBoundState extends State {
private AuthenticatedAndResourceBoundState(StateDescriptor stateDescriptor,
ModularXmppClientToServerConnectionInternal connectionInternal) {
super(stateDescriptor, connectionInternal);
}
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
throws NotConnectedException, InterruptedException {
if (walkFromDisconnectToAuthenticated != null) {
// If there was already a previous walk to ConnectedButUnauthenticated, then the context of the current
// walk must not start from the 'Disconnected' state.
assert walkStateGraphContext.getWalk().get(0).getStateDescriptor().getClass()
!= DisconnectedStateDescriptor.class;
// Append the current walk to the previous one.
walkStateGraphContext.appendWalkTo(walkFromDisconnectToAuthenticated);
} else {
walkFromDisconnectToAuthenticated = new ArrayList<>(
walkStateGraphContext.getWalkLength() + 1);
walkStateGraphContext.appendWalkTo(walkFromDisconnectToAuthenticated);
}
walkFromDisconnectToAuthenticated.add(this);
afterSuccessfulLogin(streamResumed);
return StateTransitionResult.Success.EMPTY_INSTANCE;
}
@Override
public void resetState() {
authenticated = false;
}
}
static final class ShutdownStateDescriptor extends StateDescriptor {
private ShutdownStateDescriptor() {
super(ShutdownState.class);
addSuccessor(CloseConnectionStateDescriptor.class);
}
}
private final class ShutdownState extends State {
private ShutdownState(StateDescriptor stateDescriptor,
ModularXmppClientToServerConnectionInternal connectionInternal) {
super(stateDescriptor, connectionInternal);
}
@Override
public StateTransitionResult.TransitionImpossible isTransitionToPossible(WalkStateGraphContext walkStateGraphContext) {
ensureNotOnOurWayToAuthenticatedAndResourceBound(walkStateGraphContext);
return null;
}
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) {
closingStreamReceived = false;
boolean streamCloseIssued = outgoingElementsQueue.offerAndShutdown(StreamClose.INSTANCE);
if (streamCloseIssued) {
activeTransport.notifyAboutNewOutgoingElements();
boolean successfullyReceivedStreamClose = waitForClosingStreamTagFromServer();
if (successfullyReceivedStreamClose) {
for (Iterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) {
XmppInputOutputFilter filter = it.next();
filter.closeInputOutput();
}
// Closing the filters may produced new outgoing data. Notify the transport about it.
activeTransport.afterFiltersClosed();
for (Iterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) {
XmppInputOutputFilter filter = it.next();
try {
filter.waitUntilInputOutputClosed();
} catch (IOException | CertificateException | InterruptedException | SmackException | XMPPException e) {
LOGGER.log(Level.WARNING, "waitUntilInputOutputClosed() threw", e);
}
}
// For correctness we set authenticated to false here, even though we will later again set it to
// false in the disconnected state.
authenticated = false;
}
}
return StateTransitionResult.Success.EMPTY_INSTANCE;
}
}
static final class InstantShutdownStateDescriptor extends StateDescriptor {
private InstantShutdownStateDescriptor() {
super(InstantShutdownState.class);
addSuccessor(CloseConnectionStateDescriptor.class);
}
}
private static final class InstantShutdownState extends NoOpState {
private InstantShutdownState(ModularXmppClientToServerConnection connection, StateDescriptor stateDescriptor, ModularXmppClientToServerConnectionInternal connectionInternal) {
super(connection, stateDescriptor, connectionInternal);
}
@Override
public StateTransitionResult.TransitionImpossible isTransitionToPossible(WalkStateGraphContext walkStateGraphContext) {
ensureNotOnOurWayToAuthenticatedAndResourceBound(walkStateGraphContext);
return null;
}
}
private static final class CloseConnectionStateDescriptor extends StateDescriptor {
private CloseConnectionStateDescriptor() {
super(CloseConnectionState.class);
addSuccessor(DisconnectedStateDescriptor.class);
}
}
private final class CloseConnectionState extends State {
private CloseConnectionState(StateDescriptor stateDescriptor,
ModularXmppClientToServerConnectionInternal connectionInternal) {
super(stateDescriptor, connectionInternal);
}
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) {
activeTransport.disconnect();
activeTransport = null;
authenticated = connected = false;
return StateTransitionResult.Success.EMPTY_INSTANCE;
}
}
public void addConnectionStateMachineListener(ConnectionStateMachineListener connectionStateMachineListener) {
connectionStateMachineListeners.add(connectionStateMachineListener);
}
public boolean removeConnectionStateMachineListener(ConnectionStateMachineListener connectionStateMachineListener) {
return connectionStateMachineListeners.remove(connectionStateMachineListener);
}
protected void invokeConnectionStateMachineListener(ConnectionStateEvent connectionStateEvent) {
if (connectionStateMachineListeners.isEmpty()) {
return;
}
ASYNC_BUT_ORDERED.performAsyncButOrdered(this, () -> {
for (ConnectionStateMachineListener connectionStateMachineListener : connectionStateMachineListeners) {
connectionStateMachineListener.onConnectionStateEvent(connectionStateEvent, this);
}
});
}
@Override
public boolean isSecureConnection() {
final XmppClientToServerTransport transport = activeTransport;
if (transport == null) {
return false;
}
return transport.isTransportSecured();
}
@Override
protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
WalkStateGraphContext walkStateGraphContext = buildNewWalkTo(ConnectedButUnauthenticatedStateDescriptor.class)
.build();
walkStateGraph(walkStateGraphContext);
}
protected Map<String, Object> getFilterStats() {
Collection<XmppInputOutputFilter> filters;
synchronized (this) {
if (inputOutputFilters.isEmpty() && previousInputOutputFilters != null) {
filters = previousInputOutputFilters;
} else {
filters = inputOutputFilters;
}
}
Map<String, Object> filterStats = new HashMap<>(filters.size());
for (XmppInputOutputFilter xmppInputOutputFilter : filters) {
Object stats = xmppInputOutputFilter.getStats();
String filterName = xmppInputOutputFilter.getFilterName();
filterStats.put(filterName, stats);
}
return filterStats;
}
public Stats getStats() {
Map<Class<? extends ModularXmppClientToServerConnectionModuleDescriptor>, XmppClientToServerTransport.Stats> transportsStats = new HashMap<>(
transports.size());
for (Map.Entry<Class<? extends ModularXmppClientToServerConnectionModuleDescriptor>, XmppClientToServerTransport> entry : transports.entrySet()) {
XmppClientToServerTransport.Stats transportStats = entry.getValue().getStats();
transportsStats.put(entry.getKey(), transportStats);
}
Map<String, Object> filterStats = getFilterStats();
return new Stats(transportsStats, filterStats);
}
public static final class Stats extends AbstractStats {
public final Map<Class<? extends ModularXmppClientToServerConnectionModuleDescriptor>, XmppClientToServerTransport.Stats> transportsStats;
public final Map<String, Object> filtersStats;
private Stats(Map<Class<? extends ModularXmppClientToServerConnectionModuleDescriptor>, XmppClientToServerTransport.Stats> transportsStats,
Map<String, Object> filtersStats) {
this.transportsStats = Collections.unmodifiableMap(transportsStats);
this.filtersStats = Collections.unmodifiableMap(filtersStats);
}
@Override
public void appendStatsTo(ExtendedAppendable appendable) throws IOException {
StringUtils.appendHeading(appendable, "Connection stats", '#').append('\n');
for (Map.Entry<Class<? extends ModularXmppClientToServerConnectionModuleDescriptor>, XmppClientToServerTransport.Stats> entry : transportsStats.entrySet()) {
Class<? extends ModularXmppClientToServerConnectionModuleDescriptor> transportClass = entry.getKey();
XmppClientToServerTransport.Stats stats = entry.getValue();
StringUtils.appendHeading(appendable, transportClass.getName());
appendable.append(stats.toString()).append('\n');
}
for (Map.Entry<String, Object> entry : filtersStats.entrySet()) {
String filterName = entry.getKey();
Object filterStats = entry.getValue();
StringUtils.appendHeading(appendable, filterName);
appendable.append(filterStats.toString()).append('\n');
}
}
}
}