mirror of
https://github.com/wiremock/WireMock.Net.git
synced 2026-04-19 15:31:39 +02:00
broadcast
This commit is contained in:
@@ -211,7 +211,7 @@ public static class Program
|
|||||||
var broadcastMessage = $"[{timestamp}] Broadcast: {text}";
|
var broadcastMessage = $"[{timestamp}] Broadcast: {text}";
|
||||||
|
|
||||||
// Broadcast to all connected clients
|
// Broadcast to all connected clients
|
||||||
await context.BroadcastTextAsync(broadcastMessage);
|
await context.BroadcastAsync(broadcastMessage);
|
||||||
|
|
||||||
Console.WriteLine($"Broadcasted to {server.GetWebSocketConnections(broadcastMappingGuid).Count} clients: {text}");
|
Console.WriteLine($"Broadcasted to {server.GetWebSocketConnections(broadcastMappingGuid).Count} clients: {text}");
|
||||||
}
|
}
|
||||||
@@ -428,7 +428,7 @@ public static class Program
|
|||||||
{
|
{
|
||||||
if (message.MessageType == WebSocketMessageType.Text)
|
if (message.MessageType == WebSocketMessageType.Text)
|
||||||
{
|
{
|
||||||
await context.BroadcastTextAsync($"[Broadcast] {message.Text}");
|
await context.BroadcastAsync($"[Broadcast] {message.Text}");
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -35,39 +35,64 @@ public partial class WireMockServer
|
|||||||
public async Task CloseWebSocketConnectionAsync(
|
public async Task CloseWebSocketConnectionAsync(
|
||||||
Guid connectionId,
|
Guid connectionId,
|
||||||
WebSocketCloseStatus closeStatus = WebSocketCloseStatus.NormalClosure,
|
WebSocketCloseStatus closeStatus = WebSocketCloseStatus.NormalClosure,
|
||||||
string statusDescription = "Closed by server")
|
string statusDescription = "Closed by server",
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
foreach (var registry in _options.WebSocketRegistries.Values)
|
foreach (var registry in _options.WebSocketRegistries.Values)
|
||||||
{
|
{
|
||||||
if (registry.TryGetConnection(connectionId, out var connection))
|
if (registry.TryGetConnection(connectionId, out var connection) && !cancellationToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
await connection.CloseAsync(closeStatus, statusDescription);
|
await connection.CloseAsync(closeStatus, statusDescription, cancellationToken);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Broadcast a message to all WebSocket connections in a specific mapping
|
/// Broadcast a text message to all WebSocket connections in a specific mapping
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[PublicAPI]
|
[PublicAPI]
|
||||||
public async Task BroadcastToWebSocketsAsync(Guid mappingGuid, string text)
|
public async Task BroadcastToWebSocketsAsync(Guid mappingGuid, string text, Guid? excludeConnectionId = null, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
if (_options.WebSocketRegistries.TryGetValue(mappingGuid, out var registry))
|
if (_options.WebSocketRegistries.TryGetValue(mappingGuid, out var registry))
|
||||||
{
|
{
|
||||||
await registry.BroadcastTextAsync(text);
|
await registry.BroadcastAsync(text, excludeConnectionId, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Broadcast a message to all WebSocket connections
|
/// Broadcast a text message to all WebSocket connections
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[PublicAPI]
|
[PublicAPI]
|
||||||
public async Task BroadcastToAllWebSocketsAsync(string text)
|
public async Task BroadcastToAllWebSocketsAsync(string text, Guid? excludeConnectionId = null, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
foreach (var registry in _options.WebSocketRegistries.Values)
|
foreach (var registry in _options.WebSocketRegistries.Values)
|
||||||
{
|
{
|
||||||
await registry.BroadcastTextAsync(text);
|
await registry.BroadcastAsync(text, excludeConnectionId, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Broadcast a binary message to all WebSocket connections in a specific mapping
|
||||||
|
/// </summary>
|
||||||
|
[PublicAPI]
|
||||||
|
public async Task BroadcastToWebSocketsAsync(Guid mappingGuid, byte[] bytes, Guid? excludeConnectionId = null, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (_options.WebSocketRegistries.TryGetValue(mappingGuid, out var registry))
|
||||||
|
{
|
||||||
|
await registry.BroadcastAsync(bytes, excludeConnectionId, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Broadcast a binary message to all WebSocket connections
|
||||||
|
/// </summary>
|
||||||
|
[PublicAPI]
|
||||||
|
public async Task BroadcastToAllWebSocketsAsync(byte[] bytes, Guid? excludeConnectionId = null, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
foreach (var registry in _options.WebSocketRegistries.Values)
|
||||||
|
{
|
||||||
|
await registry.BroadcastAsync(bytes, excludeConnectionId, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -3,6 +3,7 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Diagnostics.CodeAnalysis;
|
using System.Diagnostics.CodeAnalysis;
|
||||||
using System.Net.WebSockets;
|
using System.Net.WebSockets;
|
||||||
|
using static System.Net.Mime.MediaTypeNames;
|
||||||
|
|
||||||
namespace WireMock.WebSockets;
|
namespace WireMock.WebSockets;
|
||||||
|
|
||||||
@@ -48,12 +49,24 @@ internal class WebSocketConnectionRegistry
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Broadcast text to all connections
|
/// Broadcast text to all connections
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public async Task BroadcastTextAsync(string text, CancellationToken cancellationToken = default)
|
public async Task BroadcastAsync(string text, Guid? excludeConnectionId, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var tasks = _connections.Values
|
var tasks = Filter(excludeConnectionId).Select(c => c.SendAsync(text, cancellationToken));
|
||||||
.Where(c => c.WebSocket.State == WebSocketState.Open)
|
|
||||||
.Select(c => c.SendAsync(text, cancellationToken));
|
|
||||||
|
|
||||||
await Task.WhenAll(tasks);
|
await Task.WhenAll(tasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Broadcast binary to all connections
|
||||||
|
/// </summary>
|
||||||
|
public async Task BroadcastAsync(byte[] bytes, Guid? excludeConnectionId, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var tasks = Filter(excludeConnectionId).Select(c => c.SendAsync(bytes, cancellationToken));
|
||||||
|
await Task.WhenAll(tasks);
|
||||||
|
}
|
||||||
|
|
||||||
|
private IEnumerable<WireMockWebSocketContext> Filter(Guid? excludeConnectionId)
|
||||||
|
{
|
||||||
|
return _connections.Values
|
||||||
|
.Where(c =>c.WebSocket.State == WebSocketState.Open && (!excludeConnectionId.HasValue || c.ConnectionId != excludeConnectionId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -94,19 +94,30 @@ public class WireMockWebSocketContext : IWebSocketContext
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription)
|
public async Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
await WebSocket.CloseAsync(closeStatus, statusDescription, CancellationToken.None);
|
await WebSocket.CloseAsync(closeStatus, statusDescription, cancellationToken);
|
||||||
|
|
||||||
LogWebSocketMessage(WebSocketMessageDirection.Send, WebSocketMessageType.Close, $"CloseStatus: {closeStatus}, Description: {statusDescription}", null);
|
LogWebSocketMessage(WebSocketMessageDirection.Send, WebSocketMessageType.Close, $"CloseStatus: {closeStatus}, Description: {statusDescription}", null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task BroadcastTextAsync(string text, CancellationToken cancellationToken = default)
|
public async Task BroadcastAsync(string text, bool excludeSender = false, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
if (Registry != null)
|
if (Registry != null)
|
||||||
{
|
{
|
||||||
await Registry.BroadcastTextAsync(text, cancellationToken);
|
Guid? excludeConnectionId = excludeSender ? ConnectionId : null;
|
||||||
|
await Registry.BroadcastAsync(text, excludeConnectionId, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task BroadcastAsync(byte[] bytes, bool excludeSender = false, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (Registry != null)
|
||||||
|
{
|
||||||
|
Guid? excludeConnectionId = excludeSender ? ConnectionId : null;
|
||||||
|
await Registry.BroadcastAsync(bytes, excludeConnectionId, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -48,10 +48,15 @@ public interface IWebSocketContext
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Close the WebSocket connection
|
/// Close the WebSocket connection
|
||||||
/// </summary>
|
/// </summary>
|
||||||
Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription);
|
Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Broadcast text message to all connections in this mapping
|
/// Broadcast text message to all connections in this mapping
|
||||||
/// </summary>
|
/// </summary>
|
||||||
Task BroadcastTextAsync(string text, CancellationToken cancellationToken = default);
|
Task BroadcastAsync(string text, bool excludeSender = false, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Broadcast binary message to all connections in this mapping
|
||||||
|
/// </summary>
|
||||||
|
Task BroadcastAsync(byte[] bytes, bool excludeSender = false, CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
@@ -780,4 +780,458 @@ public class WebSocketIntegrationTests(ITestOutputHelper output, ITestContextAcc
|
|||||||
|
|
||||||
await Task.Delay(100, _ct);
|
await Task.Delay(100, _ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Broadcast_Should_Send_TextMessage_To_Multiple_Connected_Clients()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var server = WireMockServer.Start(new WireMockServerSettings
|
||||||
|
{
|
||||||
|
Logger = new TestOutputHelperWireMockLogger(output),
|
||||||
|
Urls = ["ws://localhost:0"]
|
||||||
|
});
|
||||||
|
|
||||||
|
var broadcastMessage = "Broadcast to all clients";
|
||||||
|
|
||||||
|
server
|
||||||
|
.Given(Request.Create()
|
||||||
|
.WithPath("/ws/broadcast")
|
||||||
|
.WithWebSocketUpgrade()
|
||||||
|
)
|
||||||
|
.RespondWith(Response.Create()
|
||||||
|
.WithWebSocket(ws => ws
|
||||||
|
.WithCloseTimeout(TimeSpan.FromSeconds(10))
|
||||||
|
.WithBroadcast()
|
||||||
|
.WithMessageHandler(async (message, context) =>
|
||||||
|
{
|
||||||
|
if (message.MessageType == WebSocketMessageType.Text)
|
||||||
|
{
|
||||||
|
var text = message.Text ?? string.Empty;
|
||||||
|
|
||||||
|
if (text == "register")
|
||||||
|
{
|
||||||
|
await context.SendAsync($"Registered: {context.ConnectionId}");
|
||||||
|
}
|
||||||
|
else if (text.StartsWith("broadcast:"))
|
||||||
|
{
|
||||||
|
var broadcastText = text.Substring(10);
|
||||||
|
await context.BroadcastAsync(broadcastText);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
using var client1 = new ClientWebSocket();
|
||||||
|
using var client2 = new ClientWebSocket();
|
||||||
|
using var client3 = new ClientWebSocket();
|
||||||
|
|
||||||
|
var uri = new Uri($"{server.Url}/ws/broadcast");
|
||||||
|
|
||||||
|
// Act
|
||||||
|
await client1.ConnectAsync(uri, _ct);
|
||||||
|
await client2.ConnectAsync(uri, _ct);
|
||||||
|
await client3.ConnectAsync(uri, _ct);
|
||||||
|
|
||||||
|
await client1.SendAsync("register", cancellationToken: _ct);
|
||||||
|
await client2.SendAsync("register", cancellationToken: _ct);
|
||||||
|
await client3.SendAsync("register", cancellationToken: _ct);
|
||||||
|
|
||||||
|
// Receive registration confirmations
|
||||||
|
var reg1 = await client1.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var reg2 = await client2.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var reg3 = await client3.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
|
||||||
|
reg1.Should().StartWith("Registered: ");
|
||||||
|
reg2.Should().StartWith("Registered: ");
|
||||||
|
reg3.Should().StartWith("Registered: ");
|
||||||
|
|
||||||
|
// Send broadcast from client1
|
||||||
|
await client1.SendAsync($"broadcast:{broadcastMessage}", cancellationToken: _ct);
|
||||||
|
|
||||||
|
// Assert - all clients should receive the broadcast
|
||||||
|
var received1 = await client1.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var received2 = await client2.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var received3 = await client3.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
|
||||||
|
received1.Should().Be(broadcastMessage);
|
||||||
|
received2.Should().Be(broadcastMessage);
|
||||||
|
received3.Should().Be(broadcastMessage);
|
||||||
|
|
||||||
|
await client1.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
await client2.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
await client3.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
|
||||||
|
await Task.Delay(300, _ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Broadcast_Should_Send_BinaryMessage_To_Multiple_Connected_Clients()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var server = WireMockServer.Start(new WireMockServerSettings
|
||||||
|
{
|
||||||
|
Logger = new TestOutputHelperWireMockLogger(output),
|
||||||
|
Urls = ["ws://localhost:0"]
|
||||||
|
});
|
||||||
|
|
||||||
|
var message = new byte[] { 0x00, 0x01, 0x02, 0x03 };
|
||||||
|
var broadcastMessageFromWireMock = new byte[] { 0x01, 0x02, 0x03, 0x04 };
|
||||||
|
|
||||||
|
server
|
||||||
|
.Given(Request.Create()
|
||||||
|
.WithPath("/ws/broadcast")
|
||||||
|
.WithWebSocketUpgrade()
|
||||||
|
)
|
||||||
|
.RespondWith(Response.Create()
|
||||||
|
.WithWebSocket(ws => ws
|
||||||
|
.WithCloseTimeout(TimeSpan.FromSeconds(10))
|
||||||
|
.WithBroadcast()
|
||||||
|
.WithMessageHandler(async (message, context) =>
|
||||||
|
{
|
||||||
|
if (message.MessageType == WebSocketMessageType.Text && message.Text == "register")
|
||||||
|
{
|
||||||
|
await context.SendAsync($"Registered: {context.ConnectionId}");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.MessageType == WebSocketMessageType.Binary)
|
||||||
|
{
|
||||||
|
await context.BroadcastAsync(broadcastMessageFromWireMock);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
using var client1 = new ClientWebSocket();
|
||||||
|
using var client2 = new ClientWebSocket();
|
||||||
|
using var client3 = new ClientWebSocket();
|
||||||
|
|
||||||
|
var uri = new Uri($"{server.Url}/ws/broadcast");
|
||||||
|
|
||||||
|
// Act
|
||||||
|
await client1.ConnectAsync(uri, _ct);
|
||||||
|
await client2.ConnectAsync(uri, _ct);
|
||||||
|
await client3.ConnectAsync(uri, _ct);
|
||||||
|
|
||||||
|
await client1.SendAsync("register", cancellationToken: _ct);
|
||||||
|
await client2.SendAsync("register", cancellationToken: _ct);
|
||||||
|
await client3.SendAsync("register", cancellationToken: _ct);
|
||||||
|
|
||||||
|
// Receive registration confirmations
|
||||||
|
var reg1 = await client1.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var reg2 = await client2.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var reg3 = await client3.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
|
||||||
|
reg1.Should().StartWith("Registered: ");
|
||||||
|
reg2.Should().StartWith("Registered: ");
|
||||||
|
reg3.Should().StartWith("Registered: ");
|
||||||
|
|
||||||
|
// Send broadcast from client1
|
||||||
|
await client1.SendAsync(new ArraySegment<byte>(message), WebSocketMessageType.Binary, true, cancellationToken: _ct);
|
||||||
|
|
||||||
|
// Assert - all clients should receive the broadcast
|
||||||
|
var received1 = await client1.ReceiveAsBytesAsync(cancellationToken: _ct);
|
||||||
|
var received2 = await client2.ReceiveAsBytesAsync(cancellationToken: _ct);
|
||||||
|
var received3 = await client3.ReceiveAsBytesAsync(cancellationToken: _ct);
|
||||||
|
|
||||||
|
received1.Should().BeEquivalentTo(broadcastMessageFromWireMock);
|
||||||
|
received2.Should().BeEquivalentTo(broadcastMessageFromWireMock);
|
||||||
|
received3.Should().BeEquivalentTo(broadcastMessageFromWireMock);
|
||||||
|
|
||||||
|
await client1.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
await client2.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
await client3.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
|
||||||
|
await Task.Delay(300, _ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Broadcast_Should_Handle_Multiple_Broadcast_Messages()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var server = WireMockServer.Start(new WireMockServerSettings
|
||||||
|
{
|
||||||
|
Logger = new TestOutputHelperWireMockLogger(output),
|
||||||
|
Urls = ["ws://localhost:0"]
|
||||||
|
});
|
||||||
|
|
||||||
|
server
|
||||||
|
.Given(Request.Create()
|
||||||
|
.WithPath("/ws/broadcast-multi")
|
||||||
|
.WithWebSocketUpgrade()
|
||||||
|
)
|
||||||
|
.RespondWith(Response.Create()
|
||||||
|
.WithWebSocket(ws => ws
|
||||||
|
.WithBroadcast()
|
||||||
|
.WithCloseTimeout(TimeSpan.FromSeconds(10))
|
||||||
|
.WithMessageHandler(async (message, context) =>
|
||||||
|
{
|
||||||
|
if (message.MessageType == WebSocketMessageType.Text)
|
||||||
|
{
|
||||||
|
var text = message.Text ?? string.Empty;
|
||||||
|
await context.BroadcastAsync(text);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
using var client1 = new ClientWebSocket();
|
||||||
|
using var client2 = new ClientWebSocket();
|
||||||
|
|
||||||
|
var uri = new Uri($"{server.Url}/ws/broadcast-multi");
|
||||||
|
|
||||||
|
await client1.ConnectAsync(uri, _ct);
|
||||||
|
await client2.ConnectAsync(uri, _ct);
|
||||||
|
|
||||||
|
var messages = new[] { "Message 1", "Message 2", "Message 3" };
|
||||||
|
|
||||||
|
// Act & Assert
|
||||||
|
foreach (var message in messages)
|
||||||
|
{
|
||||||
|
await client1.SendAsync(message, cancellationToken: _ct);
|
||||||
|
|
||||||
|
var received1 = await client1.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var received2 = await client2.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
|
||||||
|
received1.Should().Be(message);
|
||||||
|
received2.Should().Be(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
await client1.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
await client2.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
|
||||||
|
await Task.Delay(300, _ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Broadcast_Should_Exclude_Sender_When_ExcludeSender_Is_True()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var server = WireMockServer.Start(new WireMockServerSettings
|
||||||
|
{
|
||||||
|
Logger = new TestOutputHelperWireMockLogger(output),
|
||||||
|
Urls = ["ws://localhost:0"]
|
||||||
|
});
|
||||||
|
|
||||||
|
server
|
||||||
|
.Given(Request.Create()
|
||||||
|
.WithPath("/ws/broadcast-exclude")
|
||||||
|
.WithWebSocketUpgrade()
|
||||||
|
)
|
||||||
|
.RespondWith(Response.Create()
|
||||||
|
.WithWebSocket(ws => ws
|
||||||
|
.WithBroadcast()
|
||||||
|
.WithCloseTimeout(TimeSpan.FromSeconds(10))
|
||||||
|
.WithMessageHandler(async (message, context) =>
|
||||||
|
{
|
||||||
|
if (message.MessageType == WebSocketMessageType.Text)
|
||||||
|
{
|
||||||
|
var text = message.Text ?? string.Empty;
|
||||||
|
|
||||||
|
if (text.StartsWith("send:"))
|
||||||
|
{
|
||||||
|
var broadcastText = text.Substring(5);
|
||||||
|
await context.BroadcastAsync(broadcastText, excludeSender: true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
using var client1 = new ClientWebSocket();
|
||||||
|
using var client2 = new ClientWebSocket();
|
||||||
|
|
||||||
|
var uri = new Uri($"{server.Url}/ws/broadcast-exclude");
|
||||||
|
|
||||||
|
await client1.ConnectAsync(uri, _ct);
|
||||||
|
await client2.ConnectAsync(uri, _ct);
|
||||||
|
|
||||||
|
var broadcastMessage = "Exclusive broadcast";
|
||||||
|
|
||||||
|
// Act
|
||||||
|
await client1.SendAsync($"send:{broadcastMessage}", cancellationToken: _ct);
|
||||||
|
|
||||||
|
// Assert - only client2 should receive the message
|
||||||
|
var received2 = await client2.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
received2.Should().Be(broadcastMessage);
|
||||||
|
|
||||||
|
// client1 should not receive anything (or should timeout)
|
||||||
|
var receiveTask1 = client1.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var delayTask = Task.Delay(500, _ct);
|
||||||
|
|
||||||
|
var completedTask = await Task.WhenAny(receiveTask1, delayTask);
|
||||||
|
completedTask.Should().Be(delayTask, "client1 should not receive the exclusive broadcast");
|
||||||
|
|
||||||
|
await client1.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
await client2.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
|
||||||
|
await Task.Delay(200, _ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Broadcast_Should_Work_With_Single_Client()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var server = WireMockServer.Start(new WireMockServerSettings
|
||||||
|
{
|
||||||
|
Logger = new TestOutputHelperWireMockLogger(output),
|
||||||
|
Urls = ["ws://localhost:0"]
|
||||||
|
});
|
||||||
|
|
||||||
|
var broadcastMessage = "Single client broadcast";
|
||||||
|
|
||||||
|
server
|
||||||
|
.Given(Request.Create()
|
||||||
|
.WithPath("/ws/broadcast-single")
|
||||||
|
.WithWebSocketUpgrade()
|
||||||
|
)
|
||||||
|
.RespondWith(Response.Create()
|
||||||
|
.WithWebSocket(ws => ws
|
||||||
|
.WithCloseTimeout(TimeSpan.FromSeconds(10))
|
||||||
|
.WithBroadcast()
|
||||||
|
.WithMessageHandler(async (message, context) =>
|
||||||
|
{
|
||||||
|
if (message.MessageType == WebSocketMessageType.Text)
|
||||||
|
{
|
||||||
|
var text = message.Text ?? string.Empty;
|
||||||
|
await context.BroadcastAsync(text);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
using var client = new ClientWebSocket();
|
||||||
|
var uri = new Uri($"{server.Url}/ws/broadcast-single");
|
||||||
|
|
||||||
|
// Act
|
||||||
|
await client.ConnectAsync(uri, _ct);
|
||||||
|
await client.SendAsync(broadcastMessage, cancellationToken: _ct);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
var received = await client.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
received.Should().Be(broadcastMessage);
|
||||||
|
|
||||||
|
await client.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
|
||||||
|
await Task.Delay(100, _ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Broadcast_Should_Handle_Client_Disconnect_During_Broadcast()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var server = WireMockServer.Start(new WireMockServerSettings
|
||||||
|
{
|
||||||
|
Logger = new TestOutputHelperWireMockLogger(output),
|
||||||
|
Urls = ["ws://localhost:0"]
|
||||||
|
});
|
||||||
|
|
||||||
|
var broadcastMessage = "Message after disconnect";
|
||||||
|
|
||||||
|
server
|
||||||
|
.Given(Request.Create()
|
||||||
|
.WithPath("/ws/broadcast-disconnect")
|
||||||
|
.WithWebSocketUpgrade()
|
||||||
|
)
|
||||||
|
.RespondWith(Response.Create()
|
||||||
|
.WithWebSocket(ws => ws
|
||||||
|
.WithCloseTimeout(TimeSpan.FromSeconds(10))
|
||||||
|
.WithBroadcast()
|
||||||
|
.WithMessageHandler(async (message, context) =>
|
||||||
|
{
|
||||||
|
if (message.MessageType == WebSocketMessageType.Text)
|
||||||
|
{
|
||||||
|
var text = message.Text ?? string.Empty;
|
||||||
|
await context.BroadcastAsync(text);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
using var client1 = new ClientWebSocket();
|
||||||
|
using var client2 = new ClientWebSocket();
|
||||||
|
|
||||||
|
var uri = new Uri($"{server.Url}/ws/broadcast-disconnect");
|
||||||
|
|
||||||
|
await client1.ConnectAsync(uri, _ct);
|
||||||
|
await client2.ConnectAsync(uri, _ct);
|
||||||
|
|
||||||
|
// Act - disconnect client1
|
||||||
|
await client1.CloseAsync(WebSocketCloseStatus.NormalClosure, "Disconnecting", _ct);
|
||||||
|
|
||||||
|
// Send broadcast from client2 - should handle disconnected client gracefully
|
||||||
|
await client2.SendAsync(broadcastMessage, cancellationToken: _ct);
|
||||||
|
|
||||||
|
// Assert - client2 should still receive the broadcast
|
||||||
|
var received2 = await client2.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
received2.Should().Be(broadcastMessage);
|
||||||
|
|
||||||
|
await client2.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
|
||||||
|
await Task.Delay(200, _ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Broadcast_Should_Support_Targeted_Broadcasting_Based_On_Condition()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var server = WireMockServer.Start(new WireMockServerSettings
|
||||||
|
{
|
||||||
|
Logger = new TestOutputHelperWireMockLogger(output),
|
||||||
|
Urls = ["ws://localhost:0"]
|
||||||
|
});
|
||||||
|
|
||||||
|
server
|
||||||
|
.Given(Request.Create()
|
||||||
|
.WithPath("/ws/broadcast-conditional")
|
||||||
|
.WithWebSocketUpgrade()
|
||||||
|
)
|
||||||
|
.RespondWith(Response.Create()
|
||||||
|
.WithWebSocket(ws => ws
|
||||||
|
.WithCloseTimeout(TimeSpan.FromSeconds(10))
|
||||||
|
.WithBroadcast()
|
||||||
|
.WithMessageHandler(async (message, context) =>
|
||||||
|
{
|
||||||
|
if (message.MessageType == WebSocketMessageType.Text)
|
||||||
|
{
|
||||||
|
var text = message.Text ?? string.Empty;
|
||||||
|
|
||||||
|
if (text.StartsWith("to-admins:"))
|
||||||
|
{
|
||||||
|
var adminMessage = text.Substring(10);
|
||||||
|
await context.SendAsync($"Admin broadcast: {adminMessage}");
|
||||||
|
}
|
||||||
|
else if (text.StartsWith("to-all:"))
|
||||||
|
{
|
||||||
|
var allMessage = text.Substring(7);
|
||||||
|
await context.BroadcastAsync(allMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
using var client1 = new ClientWebSocket();
|
||||||
|
using var client2 = new ClientWebSocket();
|
||||||
|
|
||||||
|
var uri = new Uri($"{server.Url}/ws/broadcast-conditional");
|
||||||
|
|
||||||
|
await client1.ConnectAsync(uri, _ct);
|
||||||
|
await client2.ConnectAsync(uri, _ct);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
await client1.SendAsync("to-all:General message", cancellationToken: _ct);
|
||||||
|
|
||||||
|
// Assert - both clients receive the broadcast
|
||||||
|
var received1 = await client1.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
var received2 = await client2.ReceiveAsTextAsync(cancellationToken: _ct);
|
||||||
|
|
||||||
|
received1.Should().Be("General message");
|
||||||
|
received2.Should().Be("General message");
|
||||||
|
|
||||||
|
await client1.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
await client2.CloseAsync(WebSocketCloseStatus.NormalClosure, "Test complete", _ct);
|
||||||
|
|
||||||
|
await Task.Delay(200, _ct);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user