001/**
002 *
003 * Copyright 2003-2006 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smackx.filetransfer;
018
019import org.jivesoftware.smack.PacketCollector;
020import org.jivesoftware.smack.SmackException;
021import org.jivesoftware.smack.SmackException.NoResponseException;
022import org.jivesoftware.smack.XMPPConnection;
023import org.jivesoftware.smack.XMPPException;
024import org.jivesoftware.smack.XMPPException.XMPPErrorException;
025import org.jivesoftware.smack.filter.OrFilter;
026import org.jivesoftware.smack.filter.PacketFilter;
027import org.jivesoftware.smack.packet.Packet;
028import org.jivesoftware.smackx.si.packet.StreamInitiation;
029
030import java.io.InputStream;
031import java.io.OutputStream;
032import java.util.concurrent.*;
033import java.util.List;
034import java.util.ArrayList;
035
036
037/**
038 * The fault tolerant negotiator takes two stream negotiators, the primary and the secondary
039 * negotiator. If the primary negotiator fails during the stream negotiaton process, the second
040 * negotiator is used.
041 */
042public class FaultTolerantNegotiator extends StreamNegotiator {
043
044    private StreamNegotiator primaryNegotiator;
045    private StreamNegotiator secondaryNegotiator;
046    private XMPPConnection connection;
047    private PacketFilter primaryFilter;
048    private PacketFilter secondaryFilter;
049
050    public FaultTolerantNegotiator(XMPPConnection connection, StreamNegotiator primary,
051            StreamNegotiator secondary) {
052        this.primaryNegotiator = primary;
053        this.secondaryNegotiator = secondary;
054        this.connection = connection;
055    }
056
057    public PacketFilter getInitiationPacketFilter(String from, String streamID) {
058        if (primaryFilter == null || secondaryFilter == null) {
059            primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID);
060            secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID);
061        }
062        return new OrFilter(primaryFilter, secondaryFilter);
063    }
064
065    InputStream negotiateIncomingStream(Packet streamInitiation) {
066        throw new UnsupportedOperationException("Negotiation only handled by create incoming " +
067                "stream method.");
068    }
069
070    final Packet initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) {
071        throw new UnsupportedOperationException("Initiation handled by createIncomingStream " +
072                "method");
073    }
074
075    public InputStream createIncomingStream(StreamInitiation initiation) throws SmackException {
076        PacketCollector collector = connection.createPacketCollector(
077                getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()));
078
079        connection.sendPacket(super.createInitiationAccept(initiation, getNamespaces()));
080
081        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
082        CompletionService<InputStream> service
083                = new ExecutorCompletionService<InputStream>(threadPoolExecutor);
084        List<Future<InputStream>> futures = new ArrayList<Future<InputStream>>();
085        InputStream stream = null;
086        SmackException exception = null;
087        try {
088            futures.add(service.submit(new NegotiatorService(collector)));
089            futures.add(service.submit(new NegotiatorService(collector)));
090
091            int i = 0;
092            while (stream == null && i < futures.size()) {
093                Future<InputStream> future;
094                try {
095                    i++;
096                    future = service.poll(10, TimeUnit.SECONDS);
097                }
098                catch (InterruptedException e) {
099                    continue;
100                }
101
102                if (future == null) {
103                    continue;
104                }
105
106                try {
107                    stream = future.get();
108                }
109                catch (InterruptedException e) {
110                    /* Do Nothing */
111                }
112                catch (ExecutionException e) {
113                    exception = new SmackException(e.getCause());
114                }
115            }
116        }
117        finally {
118            for (Future<InputStream> future : futures) {
119                future.cancel(true);
120            }
121            collector.cancel();
122            threadPoolExecutor.shutdownNow();
123        }
124        if (stream == null) {
125            if (exception != null) {
126                throw exception;
127            }
128            else {
129                throw new SmackException("File transfer negotiation failed.");
130            }
131        }
132
133        return stream;
134    }
135
136    private StreamNegotiator determineNegotiator(Packet streamInitiation) {
137        return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator;
138    }
139
140    public OutputStream createOutgoingStream(String streamID, String initiator, String target)
141                    throws SmackException, XMPPException {
142        OutputStream stream;
143        try {
144            stream = primaryNegotiator.createOutgoingStream(streamID, initiator, target);
145        }
146        catch (Exception ex) {
147            stream = secondaryNegotiator.createOutgoingStream(streamID, initiator, target);
148        }
149
150        return stream;
151    }
152
153    public String[] getNamespaces() {
154        String[] primary = primaryNegotiator.getNamespaces();
155        String[] secondary = secondaryNegotiator.getNamespaces();
156
157        String[] namespaces = new String[primary.length + secondary.length];
158        System.arraycopy(primary, 0, namespaces, 0, primary.length);
159        System.arraycopy(secondary, 0, namespaces, primary.length, secondary.length);
160
161        return namespaces;
162    }
163
164    public void cleanup() {
165    }
166
167    private class NegotiatorService implements Callable<InputStream> {
168
169        private PacketCollector collector;
170
171        NegotiatorService(PacketCollector collector) {
172            this.collector = collector;
173        }
174
175        public InputStream call() throws XMPPErrorException, InterruptedException, SmackException {
176            Packet streamInitiation = collector.nextResult();
177            if (streamInitiation == null) {
178                throw new NoResponseException();
179            }
180            StreamNegotiator negotiator = determineNegotiator(streamInitiation);
181            return negotiator.negotiateIncomingStream(streamInitiation);
182        }
183    }
184}