Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
"Authorization error connecting to SSE stream",
responseEvent.responseInfo()));
}
else if (statusCode == METHOD_NOT_ALLOWED) {
logger.debug("The server does not support SSE streams, using request-response mode.");
return Flux.empty();
}

if (!(responseEvent instanceof ResponseSubscribers.SseResponseEvent sseResponseEvent)) {
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
Expand Down Expand Up @@ -344,10 +348,6 @@ else if (statusCode >= 200 && statusCode < 300) {
return Flux.empty();
}
}
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
logger.debug("The server does not support SSE streams, using request-response mode.");
return Flux.empty();
}
else if (statusCode == NOT_FOUND) {

if (transportSession != null && transportSession.sessionId().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.sun.net.httpserver.HttpServer;
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
import io.modelcontextprotocol.common.McpTransportContext;
import org.reactivestreams.Publisher;
import io.modelcontextprotocol.server.transport.TomcatTestUtil;
import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
Expand All @@ -34,6 +33,7 @@
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -369,6 +369,26 @@ else if (status == 404) {
StepVerifier.create(transport.closeGracefully()).verifyComplete();
}

@Test
void test405OnConnectReturnsEmptyFlux() {
serverSseResponseStatus.set(405);
AtomicReference<Throwable> capturedException = new AtomicReference<>();
var transport = HttpClientStreamableHttpTransport.builder(HOST).openConnectionOnStartup(true).build();
transport.setExceptionHandler(capturedException::set);

var messages = new ArrayList<McpSchema.JSONRPCMessage>();
StepVerifier.create(transport.connect(msg -> msg.doOnNext(messages::add))).verifyComplete();

Awaitility.await()
.atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(processedSseConnectCount.get()).isEqualTo(1));

assertThat(messages).isEmpty();
assertThat(capturedException.get()).isNull();

StepVerifier.create(transport.closeGracefully()).verifyComplete();
}

@Nested
class AuthorizationError {

Expand Down
Loading