< Summary

Information
Class: IceRpc.Server
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Server.cs
Tag: 1321_24790053727
Line coverage
91%
Covered lines: 502
Uncovered lines: 49
Coverable lines: 551
Total lines: 1010
Line coverage: 91.1%
Branch coverage
89%
Covered branches: 79
Total branches: 88
Branch coverage: 89.7%
Method coverage
98%
Covered methods: 50
Fully covered methods: 41
Total methods: 51
Method coverage: 98%
Full method coverage: 80.3%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)91.66%242493.75%
.ctor(...)100%11100%
.ctor(...)100%11100%
.ctor(...)100%11100%
DisposeAsync()100%66100%
PerformDisposeAsync()100%4484.21%
Listen()75%4484.61%
ListenAsync()90%101098.27%
ConnectAsync()100%121290.56%
DisposeDetachedConnectionAsync()83.33%66100%
ShutdownWhenRequestedAsync()100%22100%
ShutdownAsync(...)75%4485.71%
PerformShutdownAsync()66.66%10653.84%
ToString()100%11100%
IsRetryableAcceptException(...)100%44100%
get_ServerAddress()100%11100%
AcceptAsync()100%1152.17%
DisposeAsync()100%11100%
.ctor(...)100%11100%
ConnectTransportConnectionAsync()100%11100%
CreateProtocolConnection(...)100%11100%
DisposeAsync()100%11100%
RefuseTransportConnectionAsync(...)100%210%
.ctor(...)100%11100%
get_ServerAddress()100%11100%
AcceptAsync()100%11100%
DisposeAsync()100%11100%
.ctor(...)100%11100%
ConnectTransportConnectionAsync()100%11100%
CreateProtocolConnection(...)100%11100%
DisposeAsync()100%11100%
RefuseTransportConnectionAsync()100%11100%
.ctor(...)100%11100%
get_ServerAddress()100%11100%
DisposeAsync()100%11100%
AcceptAsync()100%11100%
.ctor(...)100%11100%
ConnectTransportConnectionAsync(...)100%11100%
CreateProtocolConnection(...)100%11100%
DisposeAsync()100%22100%
RefuseTransportConnectionAsync(...)100%11100%
.ctor(...)100%11100%
get_ServerAddress()100%11100%
AcceptAsync()100%11100%
DisposeAsync()100%11100%
.ctor(...)100%11100%
ConnectTransportConnectionAsync(...)100%11100%
CreateProtocolConnection(...)100%11100%
DisposeAsync()100%22100%
RefuseTransportConnectionAsync(...)50%22100%
.ctor(...)100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Server.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports;
 5using IceRpc.Transports.Internal;
 6using Microsoft.Extensions.Logging;
 7using Microsoft.Extensions.Logging.Abstractions;
 8using System.Diagnostics;
 9using System.Net;
 10using System.Net.Security;
 11using System.Security.Authentication;
 12
 13namespace IceRpc;
 14
 15/// <summary>A server accepts connections from clients and dispatches the requests it receives over these connections.
 16/// </summary>
 17public sealed class Server : IAsyncDisposable
 18{
 9019    private readonly LinkedList<IProtocolConnection> _connections = new();
 20
 21    private readonly TimeSpan _connectTimeout;
 22
 23    // A detached connection is a protocol connection that we've decided to connect, or that is connecting, shutting
 24    // down or being disposed. It counts towards _maxConnections and both Server.ShutdownAsync and DisposeAsync wait for
 25    // detached connections to reach 0 using _detachedConnectionsTcs. Such a connection is "detached" because it's not
 26    // in _connections.
 27    private int _detachedConnectionCount;
 28
 9029    private readonly TaskCompletionSource _detachedConnectionsTcs = new();
 30
 31    // A cancellation token source that is canceled by DisposeAsync.
 9032    private readonly CancellationTokenSource _disposedCts = new();
 33
 34    private Task? _disposeTask;
 35
 36    private readonly Func<IConnectorListener> _listenerFactory;
 37
 38    private Task? _listenTask;
 39
 40    private readonly int _maxConnections;
 41
 42    private readonly int _maxPendingConnections;
 43
 9044    private readonly Lock _mutex = new();
 45
 46    private readonly ServerAddress _serverAddress;
 47
 48    // A cancellation token source canceled by ShutdownAsync and DisposeAsync.
 49    private readonly CancellationTokenSource _shutdownCts;
 50
 51    private Task? _shutdownTask;
 52
 53    private readonly TimeSpan _shutdownTimeout;
 54
 55    /// <summary>Constructs a server.</summary>
 56    /// <param name="options">The server options.</param>
 57    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. The <see
 58    /// langword="null" /> value is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 59    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. The <see
 60    /// langword="null" /> value is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 61    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 62    /// />.</param>
 9063    public Server(
 9064        ServerOptions options,
 9065        IDuplexServerTransport? duplexServerTransport = null,
 9066        IMultiplexedServerTransport? multiplexedServerTransport = null,
 9067        ILogger? logger = null)
 9068    {
 9069        if (options.ConnectionOptions.Dispatcher is null)
 070        {
 071            throw new ArgumentException($"{nameof(ServerOptions.ConnectionOptions.Dispatcher)} cannot be null");
 72        }
 73
 9074        logger ??= NullLogger.Instance;
 75
 9076        _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 77
 9078        duplexServerTransport ??= IDuplexServerTransport.Default;
 9079        multiplexedServerTransport ??= IMultiplexedServerTransport.Default;
 9080        _maxConnections = options.MaxConnections;
 9081        _maxPendingConnections = options.MaxPendingConnections;
 82
 9083        _connectTimeout = options.ConnectTimeout;
 9084        _shutdownTimeout = options.ShutdownTimeout;
 85
 9086        _serverAddress = options.ServerAddress;
 9087        if (_serverAddress.Transport is null)
 7988        {
 7989            _serverAddress = _serverAddress with
 7990            {
 7991                Transport = _serverAddress.Protocol == Protocol.Ice ?
 7992                    duplexServerTransport.DefaultName : multiplexedServerTransport.DefaultName
 7993            };
 7994        }
 95
 9096        if (options.ServerAuthenticationOptions?.ApplicationProtocols is not null)
 097        {
 098            throw new ArgumentException(
 099                "The ApplicationProtocols property of the SSL server authentication options must be null. The ALPN is se
 0100                nameof(options));
 101        }
 102
 90103        var transportAddress = new TransportAddress
 90104        {
 90105            Host = _serverAddress.Host,
 90106            Port = _serverAddress.Port,
 90107            TransportName = _serverAddress.Transport,
 90108            Params = _serverAddress.Params
 90109        };
 110
 90111        _listenerFactory = () =>
 89112        {
 90113            IConnectorListener listener;
 90114
 89115            SslServerAuthenticationOptions? serverAuthenticationOptions = options.ServerAuthenticationOptions;
 89116            if (serverAuthenticationOptions is not null)
 5117            {
 5118                serverAuthenticationOptions = serverAuthenticationOptions.ShallowClone();
 5119                serverAuthenticationOptions.ApplicationProtocols = [_serverAddress.Protocol.AlpnProtocol];
 5120            }
 90121
 89122            if (_serverAddress.Protocol == Protocol.Ice)
 25123            {
 25124                IListener<IDuplexConnection> transportListener = duplexServerTransport.Listen(
 25125                    transportAddress,
 25126                    new DuplexConnectionOptions
 25127                    {
 25128                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
 25129                        Pool = options.ConnectionOptions.Pool,
 25130                    },
 25131                    serverAuthenticationOptions);
 90132
 25133                listener = new IceConnectorListener(transportListener, _serverAddress, options.ConnectionOptions);
 25134            }
 90135            else
 64136            {
 64137                IListener<IMultiplexedConnection> transportListener = multiplexedServerTransport.Listen(
 64138                    transportAddress,
 64139                    new MultiplexedConnectionOptions
 64140                    {
 64141                        HandshakeTimeout = options.ConnectTimeout,
 64142                        MaxBidirectionalStreams = options.ConnectionOptions.MaxIceRpcBidirectionalStreams,
 64143                        // Add an additional stream for the icerpc protocol control stream.
 64144                        MaxUnidirectionalStreams = options.ConnectionOptions.MaxIceRpcUnidirectionalStreams + 1,
 64145                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
 64146                        Pool = options.ConnectionOptions.Pool
 64147                    },
 64148                    serverAuthenticationOptions);
 90149
 64150                listener = new IceRpcConnectorListener(
 64151                    transportListener,
 64152                    _serverAddress,
 64153                    options.ConnectionOptions,
 64154                    logger == NullLogger.Instance ? null : new LogTaskExceptionObserver(logger));
 64155            }
 90156
 89157            listener = new MetricsConnectorListenerDecorator(listener);
 89158            if (logger != NullLogger.Instance)
 10159            {
 10160                listener = new LogConnectorListenerDecorator(listener, logger);
 10161            }
 89162            return listener;
 179163        };
 90164    }
 165
 166    /// <summary>Constructs a server with the specified dispatcher and authentication options. All other properties
 167    /// use the <see cref="ServerOptions" /> defaults.</summary>
 168    /// <param name="dispatcher">The dispatcher of the server.</param>
 169    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 170    /// />, the server will accept only secure connections.</param>
 171    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 172    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 173    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 174    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 175    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 176    /// />.</param>
 177    public Server(
 178        IDispatcher dispatcher,
 179        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 180        IDuplexServerTransport? duplexServerTransport = null,
 181        IMultiplexedServerTransport? multiplexedServerTransport = null,
 182        ILogger? logger = null)
 1183        : this(
 1184            new ServerOptions
 1185            {
 1186                ServerAuthenticationOptions = serverAuthenticationOptions,
 1187                ConnectionOptions = new()
 1188                {
 1189                    Dispatcher = dispatcher,
 1190                }
 1191            },
 1192            duplexServerTransport,
 1193            multiplexedServerTransport,
 1194            logger)
 1195    {
 1196    }
 197
 198    /// <summary>Constructs a server with the specified dispatcher, server address and authentication options. All
 199    /// other properties use the <see cref="ServerOptions" /> defaults.</summary>
 200    /// <param name="dispatcher">The dispatcher of the server.</param>
 201    /// <param name="serverAddress">The server address of the server.</param>
 202    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 203    /// />, the server will accept only secure connections.</param>
 204    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 205    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 206    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 207    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 208    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 209    /// />.</param>
 210    public Server(
 211        IDispatcher dispatcher,
 212        ServerAddress serverAddress,
 213        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 214        IDuplexServerTransport? duplexServerTransport = null,
 215        IMultiplexedServerTransport? multiplexedServerTransport = null,
 216        ILogger? logger = null)
 34217        : this(
 34218            new ServerOptions
 34219            {
 34220                ServerAuthenticationOptions = serverAuthenticationOptions,
 34221                ConnectionOptions = new()
 34222                {
 34223                    Dispatcher = dispatcher,
 34224                },
 34225                ServerAddress = serverAddress
 34226            },
 34227            duplexServerTransport,
 34228            multiplexedServerTransport,
 34229            logger)
 34230    {
 34231    }
 232
 233    /// <summary>Constructs a server with the specified dispatcher, server address URI and authentication options. All
 234    /// other properties use the <see cref="ServerOptions" /> defaults.</summary>
 235    /// <param name="dispatcher">The dispatcher of the server.</param>
 236    /// <param name="serverAddressUri">A URI that represents the server address of the server.</param>
 237    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 238    /// />, the server will accept only secure connections.</param>
 239    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 240    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 241    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 242    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 243    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 244    /// />.</param>
 245    public Server(
 246        IDispatcher dispatcher,
 247        Uri serverAddressUri,
 248        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 249        IDuplexServerTransport? duplexServerTransport = null,
 250        IMultiplexedServerTransport? multiplexedServerTransport = null,
 251        ILogger? logger = null)
 14252        : this(
 14253            dispatcher,
 14254            new ServerAddress(serverAddressUri),
 14255            serverAuthenticationOptions,
 14256            duplexServerTransport,
 14257            multiplexedServerTransport,
 14258            logger)
 14259    {
 14260    }
 261
 262    /// <summary>Releases all resources allocated by this server. The server stops listening for new connections and
 263    /// disposes the connections it accepted from clients.</summary>
 264    /// <returns>A value task that completes when the disposal of all connections accepted by the server has completed.
 265    /// This includes connections that were active when this method is called and connections whose disposal was
 266    /// initiated prior to this call.</returns>
 267    /// <remarks>The disposal of an underlying connection of the server aborts invocations, cancels dispatches and
 268    /// disposes the underlying transport connection without waiting for the peer. To wait for invocations and
 269    /// dispatches to complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete
 270    /// promptly when its cancellation token is canceled, the disposal can hang.</remarks>
 271    public ValueTask DisposeAsync()
 91272    {
 273        lock (_mutex)
 91274        {
 91275            if (_disposeTask is null)
 90276            {
 90277                _shutdownTask ??= Task.CompletedTask;
 90278                if (_detachedConnectionCount == 0)
 82279                {
 82280                    _ = _detachedConnectionsTcs.TrySetResult();
 82281                }
 282
 90283                _disposeTask = PerformDisposeAsync();
 90284            }
 91285            return new(_disposeTask);
 286        }
 287
 288        async Task PerformDisposeAsync()
 90289        {
 90290            await Task.Yield(); // exit mutex lock
 291
 90292            _disposedCts.Cancel();
 293
 294            // _listenTask etc are immutable when _disposeTask is not null.
 295
 90296            if (_listenTask is not null)
 89297            {
 298                // Wait for shutdown before disposing connections.
 299                try
 89300                {
 89301                    await Task.WhenAll(_listenTask, _shutdownTask).ConfigureAwait(false);
 89302                }
 0303                catch
 0304                {
 305                    // Ignore exceptions.
 0306                }
 307
 89308                await Task.WhenAll(
 89309                    _connections
 46310                        .Select(connection => connection.DisposeAsync().AsTask())
 89311                        .Append(_detachedConnectionsTcs.Task)).ConfigureAwait(false);
 89312            }
 313
 90314            _disposedCts.Dispose();
 90315            _shutdownCts.Dispose();
 90316        }
 91317    }
 318
 319    /// <summary>Starts accepting connections on the configured server address. Requests received over these connections
 320    /// are then dispatched by the configured dispatcher.</summary>
 321    /// <returns>The server address this server is listening on and that a client would connect to. This address is the
 322    /// same as the <see cref="ServerOptions.ServerAddress" /> of <see cref="ServerOptions" /> except its
 323    /// <see cref="ServerAddress.Transport" /> property is always non-null and its port number is never 0 when the host
 324    /// is an IP address.</returns>
 325    /// <exception cref="IceRpcException">Thrown when the server transport fails to listen on the configured <see
 326    /// cref="ServerOptions.ServerAddress" />.</exception>
 327    /// <exception cref="InvalidOperationException">Thrown when the server is already listening, shut down or shutting
 328    /// down.</exception>
 329    /// <exception cref="ObjectDisposedException">Thrown when the server is disposed.</exception>
 330    /// <remarks><see cref="Listen" /> can also throw exceptions from the transport; for example, the transport can
 331    /// reject the server address.</remarks>
 332    public ServerAddress Listen()
 91333    {
 334        lock (_mutex)
 91335        {
 91336            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 337
 90338            if (_shutdownTask is not null)
 0339            {
 0340                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
 341            }
 90342            if (_listenTask is not null)
 1343            {
 1344                throw new InvalidOperationException($"Server '{this}' is already listening.");
 345            }
 346
 89347            IConnectorListener listener = _listenerFactory();
 89348            _listenTask = ListenAsync(listener); // _listenTask owns listener and must dispose it
 89349            return listener.ServerAddress;
 350        }
 351
 352        async Task ListenAsync(IConnectorListener listener)
 89353        {
 89354            await Task.Yield(); // exit mutex lock
 355
 356            try
 89357            {
 89358                using var pendingConnectionSemaphore = new SemaphoreSlim(
 89359                    _maxPendingConnections,
 89360                    _maxPendingConnections);
 361
 176362                while (!_shutdownCts.IsCancellationRequested)
 176363                {
 176364                    await pendingConnectionSemaphore.WaitAsync(_shutdownCts.Token).ConfigureAwait(false);
 365
 175366                    IConnector? connector = null;
 367                    do
 345368                    {
 369                        try
 345370                        {
 345371                            (connector, _) = await listener.AcceptAsync(_shutdownCts.Token).ConfigureAwait(false);
 87372                        }
 258373                        catch (Exception exception) when (IsRetryableAcceptException(exception))
 170374                        {
 375                            // continue
 170376                        }
 257377                    }
 257378                    while (connector is null);
 379
 380                    // We don't wait for the connection to be activated or shut down. This could take a while for some
 381                    // transports such as TLS based transports where the handshake requires a few round trips between the
 382                    // client and server. Waiting could also cause a security issue if the client doesn't respond to the
 383                    // connection initialization as we wouldn't be able to accept new connections in the meantime. The
 384                    // call will eventually time out if the ConnectTimeout expires.
 87385                    CancellationToken cancellationToken = _disposedCts.Token;
 87386                    _ = Task.Run(
 87387                        async () =>
 87388                        {
 87389                            try
 87390                            {
 87391                                await ConnectAsync(connector, cancellationToken).ConfigureAwait(false);
 82392                            }
 5393                            catch
 5394                            {
 87395                                // Ignore connection establishment failure. These failures are logged by the
 87396                                // LogConnectorDecorator.
 5397                            }
 87398                            finally
 87399                            {
 87400                                // The disposal of the connector disposes the transport connection if it has not
 87401                                // been adopted by the protocol connection.
 87402                                await connector.DisposeAsync().ConfigureAwait(false);
 87403
 87404                                // The pending connection semaphore is disposed by the listen task completion once
 87405                                // shutdown / dispose is initiated.
 87406                                lock (_mutex)
 87407                                {
 87408                                    if (_shutdownTask is null)
 83409                                    {
 83410                                        pendingConnectionSemaphore.Release();
 83411                                    }
 87412                                }
 87413                            }
 87414                        },
 87415                        CancellationToken.None); // the task must run to dispose the connector.
 87416                }
 0417            }
 89418            catch
 89419            {
 420                // Ignore. Exceptions thrown by listener.AcceptAsync are logged by the log decorator when appropriate.
 89421            }
 422            finally
 89423            {
 89424                await listener.DisposeAsync().ConfigureAwait(false);
 89425            }
 426
 427            async Task ConnectAsync(IConnector connector, CancellationToken cancellationToken)
 87428            {
 87429                using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 87430                connectCts.CancelAfter(_connectTimeout);
 431
 432                // Connect the transport connection first. This connection establishment can be interrupted by the
 433                // connect timeout or the server ShutdownAsync/DisposeAsync.
 87434                TransportConnectionInformation transportConnectionInformation =
 87435                    await connector.ConnectTransportConnectionAsync(connectCts.Token).ConfigureAwait(false);
 436
 82437                IProtocolConnection? protocolConnection = null;
 82438                bool serverBusy = false;
 439
 440                lock (_mutex)
 82441                {
 82442                    Debug.Assert(
 82443                        _maxConnections == 0 || _connections.Count + _detachedConnectionCount <= _maxConnections);
 444
 82445                    if (_shutdownTask is null)
 82446                    {
 82447                        if (_maxConnections > 0 && (_connections.Count + _detachedConnectionCount) == _maxConnections)
 7448                        {
 7449                            serverBusy = true;
 7450                        }
 451                        else
 75452                        {
 453                            // The protocol connection adopts the transport connection from the connector and it's
 454                            // now responsible for disposing of it.
 75455                            protocolConnection = connector.CreateProtocolConnection(transportConnectionInformation);
 75456                            _detachedConnectionCount++;
 75457                        }
 82458                    }
 82459                }
 460
 82461                if (protocolConnection is null)
 7462                {
 463                    try
 7464                    {
 7465                        await connector.RefuseTransportConnectionAsync(serverBusy, connectCts.Token)
 7466                            .ConfigureAwait(false);
 5467                    }
 2468                    catch
 2469                    {
 470                        // ignore and continue
 2471                    }
 472                    // The transport connection is disposed by the disposal of the connector.
 7473                }
 474                else
 75475                {
 476                    Task shutdownRequested;
 477                    try
 75478                    {
 75479                        (_, shutdownRequested) = await protocolConnection.ConnectAsync(connectCts.Token)
 75480                            .ConfigureAwait(false);
 75481                    }
 0482                    catch
 0483                    {
 0484                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: false)
 0485                            .ConfigureAwait(false);
 0486                        throw;
 487                    }
 488
 75489                    LinkedListNode<IProtocolConnection>? listNode = null;
 490
 491                    lock (_mutex)
 492                    {
 75493                        if (_shutdownTask is null)
 494                        {
 74495                            listNode = _connections.AddLast(protocolConnection);
 496
 497                            // protocolConnection is no longer a detached connection since it's now "attached" in
 498                            // _connections.
 74499                            _detachedConnectionCount--;
 500                        }
 75501                    }
 502
 75503                    if (listNode is null)
 504                    {
 1505                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: true)
 1506                            .ConfigureAwait(false);
 507                    }
 508                    else
 74509                    {
 510                        // Schedule removal after successful ConnectAsync.
 74511                        _ = ShutdownWhenRequestedAsync(protocolConnection, shutdownRequested, listNode);
 512                    }
 75513                }
 514            }
 515        }
 516
 517        async Task DisposeDetachedConnectionAsync(IProtocolConnection connection, bool withShutdown)
 29518        {
 29519            if (withShutdown)
 29520            {
 521                // _disposedCts is not disposed yet since this connection still counts towards _detachedConnectionCount.
 29522                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 29523                cts.CancelAfter(_shutdownTimeout);
 524
 525                try
 29526                {
 527                    // Can be canceled by DisposeAsync or the shutdown timeout.
 29528                    await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 22529                }
 7530                catch
 7531                {
 532                    // Ignore connection shutdown failures. connection.ShutdownAsync makes sure it's an "expected"
 533                    // exception.
 7534                }
 29535            }
 536
 29537            await connection.DisposeAsync().ConfigureAwait(false);
 538            lock (_mutex)
 29539            {
 29540                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
 15541                {
 15542                    _detachedConnectionsTcs.SetResult();
 15543                }
 29544            }
 29545        }
 546
 547        // Remove the connection from _connections after a successful ConnectAsync.
 548        async Task ShutdownWhenRequestedAsync(
 549            IProtocolConnection connection,
 550            Task shutdownRequested,
 551            LinkedListNode<IProtocolConnection> listNode)
 74552        {
 74553            await shutdownRequested.ConfigureAwait(false);
 554
 555            lock (_mutex)
 61556            {
 61557                if (_shutdownTask is null)
 28558                {
 28559                    _connections.Remove(listNode);
 28560                    _detachedConnectionCount++;
 28561                }
 562                else
 33563                {
 564                    // _connections is immutable and ShutdownAsync/DisposeAsync is responsible for shutting
 565                    // down/disposing this connection.
 33566                    return;
 567                }
 28568            }
 569
 28570            await DisposeDetachedConnectionAsync(connection, withShutdown: true).ConfigureAwait(false);
 61571        }
 260572    }
 573
 574    /// <summary>Gracefully shuts down this server: the server stops accepting new connections and shuts down gracefully
 575    /// all its connections.</summary>
 576    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 577    /// <returns>A task that completes successfully once the shutdown of all connections accepted by the server has
 578    /// completed. This includes connections that were active when this method is called and connections whose shutdown
 579    /// was initiated prior to this call.</returns>
 580    /// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
 581    /// <exception cref="ObjectDisposedException">Thrown if the server is disposed.</exception>
 582    /// <remarks><para>The returned task can also complete with one of the following exceptions:</para>
 583    /// <list type="bullet">
 584    /// <item><description><see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> if the
 585    /// server is disposed while being shut down.</description></item>
 586    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
 587    /// cancellation token.</description></item>
 588    /// <item><description><see cref="TimeoutException" /> if the shutdown timed out.</description></item>
 589    /// </list>
 590    /// </remarks>
 591    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 32592    {
 593        lock (_mutex)
 32594        {
 32595            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 596
 32597            if (_shutdownTask is not null)
 0598            {
 0599                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
 600            }
 601
 32602            if (_detachedConnectionCount == 0)
 25603            {
 25604                _detachedConnectionsTcs.SetResult();
 25605            }
 606
 32607            _shutdownTask = PerformShutdownAsync();
 32608        }
 32609        return _shutdownTask;
 610
 611        async Task PerformShutdownAsync()
 32612        {
 32613            await Task.Yield(); // exit mutex lock
 614
 32615            _shutdownCts.Cancel();
 616
 617            // _listenTask is immutable once _shutdownTask is not null.
 32618            if (_listenTask is not null)
 32619            {
 620                try
 32621                {
 32622                    using var cts = CancellationTokenSource.CreateLinkedTokenSource(
 32623                        cancellationToken,
 32624                        _disposedCts.Token);
 625
 32626                    cts.CancelAfter(_shutdownTimeout);
 627
 628                    try
 32629                    {
 32630                        await Task.WhenAll(
 32631                            _connections
 12632                                .Select(connection => connection.ShutdownAsync(cts.Token))
 32633                                .Append(_listenTask.WaitAsync(cts.Token))
 32634                                .Append(_detachedConnectionsTcs.Task.WaitAsync(cts.Token)))
 32635                            .ConfigureAwait(false);
 32636                    }
 0637                    catch (OperationCanceledException)
 0638                    {
 0639                        throw;
 640                    }
 0641                    catch
 0642                    {
 643                        // Ignore _listenTask and connection shutdown exceptions
 644
 645                        // Throw OperationCanceledException if this WhenAll exception is hiding an OCE.
 0646                        cts.Token.ThrowIfCancellationRequested();
 0647                    }
 32648                }
 0649                catch (OperationCanceledException)
 0650                {
 0651                    cancellationToken.ThrowIfCancellationRequested();
 652
 0653                    if (_disposedCts.IsCancellationRequested)
 0654                    {
 0655                        throw new IceRpcException(
 0656                            IceRpcError.OperationAborted,
 0657                            "The shutdown was aborted because the server was disposed.");
 658                    }
 659                    else
 0660                    {
 0661                        throw new TimeoutException(
 0662                            $"The server shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
 663                    }
 664                }
 32665            }
 32666        }
 32667    }
 668
 669    /// <summary>Returns a string that represents this server.</summary>
 670    /// <returns>A string that represents this server.</returns>
 1671    public override string ToString() => _serverAddress.ToString();
 672
 673    /// <summary>Returns true if the <see cref="IConnectorListener.AcceptAsync" /> failure can be retried.</summary>
 674    private static bool IsRetryableAcceptException(Exception exception) =>
 675        // Transports such as QUIC do the SSL handshake when the connection is accepted, this can throw
 676        // AuthenticationException if it fails.
 258677        exception is IceRpcException or AuthenticationException;
 678
 679    /// <summary>Provides a decorator that adds logging to a <see cref="IConnectorListener" />.</summary>
 680    private class LogConnectorListenerDecorator : IConnectorListener
 681    {
 46682        public ServerAddress ServerAddress => _decoratee.ServerAddress;
 683
 684        private readonly IConnectorListener _decoratee;
 685        private readonly ILogger _logger;
 686
 687        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
 18688        {
 689            try
 18690            {
 18691                (IConnector connector, EndPoint remoteNetworkAddress) =
 18692                    await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
 693
 8694                _logger.LogConnectionAccepted(ServerAddress, remoteNetworkAddress);
 8695                return (
 8696                    new LogConnectorDecorator(connector, ServerAddress, remoteNetworkAddress, _logger),
 8697                    remoteNetworkAddress);
 698            }
 10699            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 10700            {
 701                // Do not log this exception. The AcceptAsync call can fail with OperationCanceledException during
 702                // shutdown once the shutdown cancellation token is canceled.
 10703                throw;
 704            }
 0705            catch (ObjectDisposedException)
 0706            {
 707                // Do not log this exception. The AcceptAsync call can fail with ObjectDisposedException during
 708                // shutdown once the listener is disposed or if it is accepting a connection while the listener is
 709                // disposed.
 0710                throw;
 711            }
 0712            catch (Exception exception) when (IsRetryableAcceptException(exception))
 0713            {
 0714                _logger.LogConnectionAcceptFailedWithRetryableException(ServerAddress, exception);
 0715                throw;
 716            }
 0717            catch (Exception exception)
 0718            {
 0719                _logger.LogConnectionAcceptFailed(ServerAddress, exception);
 0720                throw;
 721            }
 8722        }
 723
 724        public ValueTask DisposeAsync()
 10725        {
 10726            _logger.LogStopAcceptingConnections(ServerAddress);
 10727            return _decoratee.DisposeAsync();
 10728        }
 729
 10730        internal LogConnectorListenerDecorator(IConnectorListener decoratee, ILogger logger)
 10731        {
 10732            _decoratee = decoratee;
 10733            _logger = logger;
 10734            _logger.LogStartAcceptingConnections(ServerAddress);
 10735        }
 736    }
 737
 738    private class LogConnectorDecorator : IConnector
 739    {
 740        private readonly IConnector _decoratee;
 741        private readonly ILogger _logger;
 742        private readonly EndPoint _remoteNetworkAddress;
 743        private readonly ServerAddress _serverAddress;
 744
 745        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 746            CancellationToken cancellationToken)
 8747        {
 748            try
 8749            {
 8750                return await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
 751            }
 2752            catch (Exception exception)
 2753            {
 2754                _logger.LogConnectionConnectFailed(_serverAddress, _remoteNetworkAddress, exception);
 2755                throw;
 756            }
 6757        }
 758
 759        public IProtocolConnection CreateProtocolConnection(
 760            TransportConnectionInformation transportConnectionInformation) =>
 6761            new LogProtocolConnectionDecorator(
 6762                _decoratee.CreateProtocolConnection(transportConnectionInformation),
 6763                _serverAddress,
 6764                _remoteNetworkAddress,
 6765                _logger);
 766
 8767        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 768
 769        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel) =>
 0770            _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel);
 771
 8772        internal LogConnectorDecorator(
 8773            IConnector decoratee,
 8774            ServerAddress serverAddress,
 8775            EndPoint remoteNetworkAddress,
 8776            ILogger logger)
 8777        {
 8778            _decoratee = decoratee;
 8779            _logger = logger;
 8780            _serverAddress = serverAddress;
 8781            _remoteNetworkAddress = remoteNetworkAddress;
 8782        }
 783    }
 784
 785    /// <summary>Provides a decorator that adds metrics to a <see cref="IConnectorListener" />.</summary>
 786    private class MetricsConnectorListenerDecorator : IConnectorListener
 787    {
 125788        public ServerAddress ServerAddress => _decoratee.ServerAddress;
 789
 790        private readonly IConnectorListener _decoratee;
 791
 792        public async Task<(IConnector, EndPoint)> AcceptAsync(
 793            CancellationToken cancellationToken)
 345794        {
 345795            (IConnector connector, EndPoint remoteNetworkAddress) =
 345796                await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
 87797            return (new MetricsConnectorDecorator(connector), remoteNetworkAddress);
 87798        }
 799
 89800        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 801
 89802        internal MetricsConnectorListenerDecorator(IConnectorListener decoratee) =>
 89803            _decoratee = decoratee;
 804    }
 805
 806    private class MetricsConnectorDecorator : IConnector
 807    {
 808        private readonly IConnector _decoratee;
 809
 810        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 811            CancellationToken cancellationToken)
 87812        {
 87813            Metrics.ServerMetrics.ConnectStart();
 814            try
 87815            {
 87816                return await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
 817            }
 5818            catch
 5819            {
 5820                Metrics.ServerMetrics.ConnectStop();
 5821                Metrics.ServerMetrics.ConnectionFailure();
 5822                throw;
 823            }
 82824        }
 825
 826        public IProtocolConnection CreateProtocolConnection(
 827            TransportConnectionInformation transportConnectionInformation) =>
 75828                new MetricsProtocolConnectionDecorator(
 75829                    _decoratee.CreateProtocolConnection(transportConnectionInformation),
 75830                    Metrics.ServerMetrics,
 75831                    connectStarted: true);
 832
 87833        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 834
 835        public async Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel)
 7836        {
 837            try
 7838            {
 7839                await _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel).ConfigureAwait(false);
 5840            }
 841            finally
 7842            {
 7843                Metrics.ServerMetrics.ConnectionFailure();
 7844                Metrics.ServerMetrics.ConnectStop();
 7845            }
 5846        }
 847
 174848        internal MetricsConnectorDecorator(IConnector decoratee) => _decoratee = decoratee;
 849    }
 850
    /// <summary>A connector listener accepts a transport connection and returns a <see cref="IConnector" />. The
    /// connector is used to refuse the transport connection or obtain a protocol connection once the transport
    /// connection is connected.</summary>
    private interface IConnectorListener : IAsyncDisposable
    {
        /// <summary>Gets the server address of this listener.</summary>
        ServerAddress ServerAddress { get; }

        /// <summary>Accepts a transport connection.</summary>
        /// <param name="cancel">A cancellation token that receives the cancellation requests.</param>
        /// <returns>The connector for the accepted transport connection together with the network address of the
        /// remote peer.</returns>
        Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel);
    }
 860
    /// <summary>A connector is returned by <see cref="IConnectorListener" />. The connector is used to connect the
    /// transport connection. If successful, the transport connection can either be refused or accepted by creating the
    /// protocol connection out of it.</summary>
    private interface IConnector : IAsyncDisposable
    {
        /// <summary>Connects the transport connection.</summary>
        /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
        /// <returns>The information about the connected transport connection.</returns>
        Task<TransportConnectionInformation> ConnectTransportConnectionAsync(CancellationToken cancellationToken);

        /// <summary>Accepts the transport connection by creating a protocol connection out of it.</summary>
        /// <param name="transportConnectionInformation">The transport connection information.</param>
        /// <returns>The new protocol connection.</returns>
        IProtocolConnection CreateProtocolConnection(TransportConnectionInformation transportConnectionInformation);

        /// <summary>Refuses the transport connection.</summary>
        /// <param name="serverBusy">Whether the connection is refused because the server is busy.</param>
        /// <param name="cancel">A cancellation token that receives the cancellation requests.</param>
        /// <returns>A task that completes once the transport connection is refused.</returns>
        Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel);
    }
 872
 873    private class IceConnectorListener : IConnectorListener
 874    {
 43875        public ServerAddress ServerAddress { get; }
 876
 877        private readonly IListener<IDuplexConnection> _listener;
 878        private readonly ConnectionOptions _options;
 879
 25880        public ValueTask DisposeAsync() => _listener.DisposeAsync();
 881
 882        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
 51883        {
 51884            (IDuplexConnection transportConnection, EndPoint remoteNetworkAddress) = await _listener.AcceptAsync(
 51885                cancel).ConfigureAwait(false);
 26886            return (new IceConnector(transportConnection, _options), remoteNetworkAddress);
 26887        }
 888
 25889        internal IceConnectorListener(
 25890            IListener<IDuplexConnection> listener,
 25891            ServerAddress serverAddress,
 25892            ConnectionOptions options)
 25893        {
 25894            _listener = listener;
 25895            ServerAddress = serverAddress with { Port = listener.TransportAddress.Port };
 25896            _options = options;
 25897        }
 898    }
 899
 900    private class IceConnector : IConnector
 901    {
 902        private readonly ConnectionOptions _options;
 903        private IDuplexConnection? _transportConnection;
 904
 905        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 906            CancellationToken cancellationToken) =>
 26907            _transportConnection!.ConnectAsync(cancellationToken);
 908
 909        public IProtocolConnection CreateProtocolConnection(
 910            TransportConnectionInformation transportConnectionInformation)
 23911        {
 912            // The protocol connection takes ownership of the transport connection.
 23913            var protocolConnection = new IceProtocolConnection(
 23914                _transportConnection!,
 23915                transportConnectionInformation,
 23916                _options);
 23917            _transportConnection = null;
 23918            return protocolConnection;
 23919        }
 920
 921        public ValueTask DisposeAsync()
 26922        {
 26923            _transportConnection?.Dispose();
 26924            return new();
 26925        }
 926
 927        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken)
 2928        {
 2929            _transportConnection!.Dispose();
 2930            return Task.CompletedTask;
 2931        }
 932
 26933        internal IceConnector(IDuplexConnection transportConnection, ConnectionOptions options)
 26934        {
 26935            _transportConnection = transportConnection;
 26936            _options = options;
 26937        }
 938    }
 939
 940    private class IceRpcConnectorListener : IConnectorListener
 941    {
 82942        public ServerAddress ServerAddress { get; }
 943
 944        private readonly IListener<IMultiplexedConnection> _listener;
 945        private readonly ConnectionOptions _options;
 946        private readonly ITaskExceptionObserver? _taskExceptionObserver;
 947
 948        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
 294949        {
 294950            (IMultiplexedConnection transportConnection, EndPoint remoteNetworkAddress) = await _listener.AcceptAsync(
 294951                cancel).ConfigureAwait(false);
 61952            return (new IceRpcConnector(transportConnection, _options, _taskExceptionObserver), remoteNetworkAddress);
 61953        }
 954
 64955        public ValueTask DisposeAsync() => _listener.DisposeAsync();
 956
 64957        internal IceRpcConnectorListener(
 64958            IListener<IMultiplexedConnection> listener,
 64959            ServerAddress serverAddress,
 64960            ConnectionOptions options,
 64961            ITaskExceptionObserver? taskExceptionObserver)
 64962        {
 64963            _listener = listener;
 64964            ServerAddress = serverAddress with { Port = listener.TransportAddress.Port };
 64965            _options = options;
 64966            _taskExceptionObserver = taskExceptionObserver;
 64967        }
 968    }
 969
 970    private class IceRpcConnector : IConnector
 971    {
 972        private readonly ConnectionOptions _options;
 973        private readonly ITaskExceptionObserver? _taskExceptionObserver;
 974        private IMultiplexedConnection? _transportConnection;
 975
 976        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 977            CancellationToken cancellationToken) =>
 61978            _transportConnection!.ConnectAsync(cancellationToken);
 979
 980        public IProtocolConnection CreateProtocolConnection(
 981            TransportConnectionInformation transportConnectionInformation)
 52982        {
 983            // The protocol connection takes ownership of the transport connection.
 52984            var protocolConnection = new IceRpcProtocolConnection(
 52985                _transportConnection!,
 52986                transportConnectionInformation,
 52987                _options,
 52988                _taskExceptionObserver);
 52989            _transportConnection = null;
 52990            return protocolConnection;
 52991        }
 992
 61993        public ValueTask DisposeAsync() => _transportConnection?.DisposeAsync() ?? new();
 994
 995        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken) =>
 5996            _transportConnection!.CloseAsync(
 5997                serverBusy ? MultiplexedConnectionCloseError.ServerBusy : MultiplexedConnectionCloseError.Refused,
 5998                cancellationToken);
 999
 611000        internal IceRpcConnector(
 611001            IMultiplexedConnection transportConnection,
 611002            ConnectionOptions options,
 611003            ITaskExceptionObserver? taskExceptionObserver)
 611004        {
 611005            _transportConnection = transportConnection;
 611006            _options = options;
 611007            _taskExceptionObserver = taskExceptionObserver;
 611008        }
 1009    }
 1010}

Methods/Properties

.ctor(IceRpc.ServerOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,IceRpc.ServerAddress,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,System.Uri,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
DisposeAsync()
PerformDisposeAsync()
Listen()
ListenAsync()
ConnectAsync()
DisposeDetachedConnectionAsync()
ShutdownWhenRequestedAsync()
ShutdownAsync(System.Threading.CancellationToken)
PerformShutdownAsync()
ToString()
IsRetryableAcceptException(System.Exception)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Server/IConnectorListener,Microsoft.Extensions.Logging.ILogger)
ConnectTransportConnectionAsync()
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Server/IConnector,IceRpc.ServerAddress,System.Net.EndPoint,Microsoft.Extensions.Logging.ILogger)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Server/IConnectorListener)
ConnectTransportConnectionAsync()
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync()
.ctor(IceRpc.Server/IConnector)
get_ServerAddress()
DisposeAsync()
AcceptAsync()
.ctor(IceRpc.Transports.IListener`1<IceRpc.Transports.IDuplexConnection>,IceRpc.ServerAddress,IceRpc.ConnectionOptions)
ConnectTransportConnectionAsync(System.Threading.CancellationToken)
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.ConnectionOptions)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Transports.IListener`1<IceRpc.Transports.IMultiplexedConnection>,IceRpc.ServerAddress,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)
ConnectTransportConnectionAsync(System.Threading.CancellationToken)
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Transports.IMultiplexedConnection,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)