/**
 *
 * Copyright 2003-2006 Jive Software.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jivesoftware.smackx.filetransfer;

import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.OrFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smackx.si.packet.StreamInitiation;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;


/**
 * The fault tolerant negotiator takes two stream negotiators, the primary and the secondary
 * negotiator. If the primary negotiator fails during the stream negotiation process, the second
 * negotiator is used.
 */
public class FaultTolerantNegotiator extends StreamNegotiator {

    /** Number of negotiation tasks submitted for one incoming stream request. */
    private static final int NEGOTIATION_TASKS = 2;

    /** Longest time, in seconds, to wait for any single negotiation task to finish. */
    private static final long NEGOTIATION_POLL_TIMEOUT_SECONDS = 10;

    private final StreamNegotiator primaryNegotiator;
    private final StreamNegotiator secondaryNegotiator;
    private final XMPPConnection connection;

    // Assigned lazily by getInitiationPacketFilter(); determineNegotiator() reads primaryFilter.
    private PacketFilter primaryFilter;
    private PacketFilter secondaryFilter;

    /**
     * Creates a negotiator that delegates to the given primary and secondary negotiators.
     *
     * @param connection the connection used to send the initiation accept and to collect the
     *                   initiation packets.
     * @param primary    the negotiator that is tried first.
     * @param secondary  the negotiator that is used when the primary one does not apply or fails.
     */
    public FaultTolerantNegotiator(XMPPConnection connection, StreamNegotiator primary,
            StreamNegotiator secondary) {
        this.primaryNegotiator = primary;
        this.secondaryNegotiator = secondary;
        this.connection = connection;
    }

    /**
     * Returns a filter that accepts any packet accepted by either delegate's initiation filter.
     * <p>
     * The delegate filters are obtained on the first call and then reused: once both have been
     * set, later calls return a filter built from the stored ones regardless of the arguments
     * passed.
     *
     * @param from     the sender passed on to the delegates' filters.
     * @param streamID the stream id passed on to the delegates' filters.
     * @return an {@link OrFilter} over the primary and secondary initiation filters.
     */
    public PacketFilter getInitiationPacketFilter(String from, String streamID) {
        if (primaryFilter == null || secondaryFilter == null) {
            primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID);
            secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID);
        }
        return new OrFilter(primaryFilter, secondaryFilter);
    }

    /**
     * Not supported by this negotiator; use {@link #createIncomingStream(StreamInitiation)}.
     *
     * @param streamInitiation ignored.
     * @return never returns normally.
     * @throws UnsupportedOperationException always.
     */
    InputStream negotiateIncomingStream(Packet streamInitiation) {
        throw new UnsupportedOperationException("Negotiation only handled by create incoming " +
                "stream method.");
    }

    /**
     * Not supported by this negotiator; use {@link #createIncomingStream(StreamInitiation)}.
     *
     * @param connection ignored.
     * @param initiation ignored.
     * @return never returns normally.
     * @throws UnsupportedOperationException always.
     */
    final Packet initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) {
        throw new UnsupportedOperationException("Initiation handled by createIncomingStream " +
                "method");
    }

    /**
     * Accepts the given stream initiation, offering the namespaces of both delegates, and
     * negotiates the incoming stream with whichever delegate matches the initiation packet that
     * arrives in response.
     *
     * @param initiation the stream initiation request being accepted.
     * @return the negotiated input stream; never <code>null</code>.
     * @throws SmackException if no stream could be negotiated. When a negotiation task failed,
     *                        the exception wraps the cause of the most recently observed failure.
     */
    public InputStream createIncomingStream(StreamInitiation initiation) throws SmackException {
        PacketCollector collector = connection.createPacketCollector(
                getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()));
        try {
            // Sending happens inside the try block so that the collector is also released when
            // building or sending the accept packet fails.
            connection.sendPacket(super.createInitiationAccept(initiation, getNamespaces()));
            return awaitNegotiatedStream(collector);
        }
        finally {
            collector.cancel();
        }
    }

    /**
     * Runs the negotiation tasks against the collector and returns the first stream produced.
     * <p>
     * Waits at most {@link #NEGOTIATION_POLL_TIMEOUT_SECONDS} seconds per task. If the calling
     * thread is interrupted, its interrupt status is restored and waiting stops.
     *
     * @param collector the collector from which the tasks take initiation packets.
     * @return the first successfully negotiated stream.
     * @throws SmackException if no task produced a stream.
     */
    private InputStream awaitNegotiatedStream(PacketCollector collector) throws SmackException {
        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(NEGOTIATION_TASKS);
        CompletionService<InputStream> service
                = new ExecutorCompletionService<InputStream>(threadPoolExecutor);
        List<Future<InputStream>> futures = new ArrayList<Future<InputStream>>();
        InputStream stream = null;
        SmackException exception = null;
        try {
            // NOTE(review): both tasks read from the same collector, presumably so that a second
            // initiation packet can still be handled after the first attempt fails - confirm.
            for (int task = 0; task < NEGOTIATION_TASKS; task++) {
                futures.add(service.submit(new NegotiatorService(collector)));
            }

            for (int attempt = 0; stream == null && attempt < futures.size(); attempt++) {
                try {
                    Future<InputStream> finished
                            = service.poll(NEGOTIATION_POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    if (finished != null) {
                        stream = finished.get();
                    }
                }
                catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller instead of silently dropping it.
                    Thread.currentThread().interrupt();
                    break;
                }
                catch (ExecutionException e) {
                    exception = new SmackException(e.getCause());
                }
            }
        }
        finally {
            for (Future<InputStream> future : futures) {
                future.cancel(true);
            }
            threadPoolExecutor.shutdownNow();
        }

        if (stream != null) {
            return stream;
        }
        if (exception != null) {
            throw exception;
        }
        throw new SmackException("File transfer negotiation failed.");
    }

    /**
     * Chooses the delegate for an initiation packet: the primary negotiator when the primary
     * filter accepts the packet, otherwise the secondary negotiator.
     * <p>
     * Reads the filter stored by {@link #getInitiationPacketFilter(String, String)}, so that
     * method must have been called before this one.
     *
     * @param streamInitiation the initiation packet taken from the collector.
     * @return the negotiator that should handle the packet.
     */
    private StreamNegotiator determineNegotiator(Packet streamInitiation) {
        if (primaryFilter.accept(streamInitiation)) {
            return primaryNegotiator;
        }
        return secondaryNegotiator;
    }

    /**
     * Creates the outgoing stream with the primary negotiator, falling back to the secondary
     * negotiator when the primary one throws.
     *
     * @param streamID  the stream id passed on to the delegates.
     * @param initiator the initiator passed on to the delegates.
     * @param target    the target passed on to the delegates.
     * @return the stream created by the primary negotiator, or by the secondary one on fallback.
     * @throws SmackException if the secondary negotiator throws it during fallback.
     * @throws XMPPException  if the secondary negotiator throws it during fallback.
     */
    public OutputStream createOutgoingStream(String streamID, String initiator, String target)
            throws SmackException, XMPPException {
        try {
            return primaryNegotiator.createOutgoingStream(streamID, initiator, target);
        }
        catch (Exception primaryFailure) {
            // Deliberately best effort: any failure of the primary negotiator triggers the
            // fallback, and only a failure of the secondary negotiator reaches the caller.
            return secondaryNegotiator.createOutgoingStream(streamID, initiator, target);
        }
    }

    /**
     * Returns the namespaces of the primary negotiator followed by those of the secondary one.
     *
     * @return a new array holding the namespaces of both delegates.
     */
    public String[] getNamespaces() {
        String[] primary = primaryNegotiator.getNamespaces();
        String[] secondary = secondaryNegotiator.getNamespaces();

        String[] namespaces = new String[primary.length + secondary.length];
        System.arraycopy(primary, 0, namespaces, 0, primary.length);
        System.arraycopy(secondary, 0, namespaces, primary.length, secondary.length);

        return namespaces;
    }

    /**
     * Does nothing.
     */
    public void cleanup() {
    }

    /**
     * Task that takes the next initiation packet from a collector and negotiates the incoming
     * stream with the delegate matching that packet.
     */
    private class NegotiatorService implements Callable<InputStream> {

        private final PacketCollector collector;

        /**
         * @param collector the collector from which the initiation packet is taken.
         */
        NegotiatorService(PacketCollector collector) {
            this.collector = collector;
        }

        /**
         * Takes the next packet from the collector and hands it to the matching negotiator.
         *
         * @return the stream returned by the chosen negotiator.
         * @throws NoResponseException if the collector returned no packet.
         */
        public InputStream call() throws XMPPErrorException, InterruptedException, SmackException {
            Packet streamInitiation = collector.nextResult();
            if (streamInitiation == null) {
                throw new NoResponseException();
            }
            StreamNegotiator negotiator = determineNegotiator(streamInitiation);
            return negotiator.negotiateIncomingStream(streamInitiation);
        }
    }
}