< Summary

Information
Class: IceRpc.Server
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Server.cs
Tag: 275_13775359185
Line coverage
92%
Covered lines: 494
Uncovered lines: 40
Coverable lines: 534
Total lines: 979
Line coverage: 92.5%
Branch coverage
90%
Covered branches: 74
Total branches: 82
Branch coverage: 90.2%
Method coverage
98%
Covered methods: 50
Total methods: 51
Method coverage: 98%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)94.44%18.011897.36%
.ctor(...)100%11100%
.ctor(...)100%11100%
.ctor(...)100%11100%
DisposeAsync()100%66100%
PerformDisposeAsync()100%4.06484.21%
Listen()75%4.05485.71%
ListenAsync()90%101098.3%
ConnectAsync()100%1212100%
DisposeDetachedConnectionAsync()83.33%66100%
ShutdownWhenRequestedAsync()100%22100%
ShutdownAsync(...)75%4.04486.66%
PerformShutdownAsync()66.66%9.54653.84%
ToString()100%11100%
IsRetryableAcceptException(...)100%44100%
get_ServerAddress()100%11100%
AcceptAsync()100%1.11152.17%
DisposeAsync()100%11100%
.ctor(...)100%11100%
ConnectTransportConnectionAsync()100%11100%
CreateProtocolConnection(...)100%11100%
DisposeAsync()100%11100%
RefuseTransportConnectionAsync(...)100%210%
.ctor(...)100%11100%
get_ServerAddress()100%11100%
AcceptAsync()100%11100%
DisposeAsync()100%11100%
.ctor(...)100%11100%
ConnectTransportConnectionAsync()100%11100%
CreateProtocolConnection(...)100%11100%
DisposeAsync()100%11100%
RefuseTransportConnectionAsync()100%11100%
.ctor(...)100%11100%
get_ServerAddress()100%11100%
DisposeAsync()100%11100%
AcceptAsync()100%11100%
.ctor(...)100%11100%
ConnectTransportConnectionAsync(...)100%11100%
CreateProtocolConnection(...)100%11100%
DisposeAsync()100%22100%
RefuseTransportConnectionAsync(...)100%11100%
.ctor(...)100%11100%
get_ServerAddress()100%11100%
AcceptAsync()100%11100%
DisposeAsync()100%11100%
.ctor(...)100%11100%
ConnectTransportConnectionAsync(...)100%11100%
CreateProtocolConnection(...)100%11100%
DisposeAsync()100%22100%
RefuseTransportConnectionAsync(...)50%22100%
.ctor(...)100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Server.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports;
 5using Microsoft.Extensions.Logging;
 6using Microsoft.Extensions.Logging.Abstractions;
 7using System.Diagnostics;
 8using System.Net;
 9using System.Net.Security;
 10using System.Security.Authentication;
 11
 12namespace IceRpc;
 13
 14/// <summary>A server accepts connections from clients and dispatches the requests it receives over these connections.
 15/// </summary>
 16public sealed class Server : IAsyncDisposable
 17{
 14218    private readonly LinkedList<IProtocolConnection> _connections = new();
 19
 20    private readonly TimeSpan _connectTimeout;
 21
 22    // A detached connection is a protocol connection that we've decided to connect, or that is connecting, shutting
 23    // down or being disposed. It counts towards _maxConnections and both Server.ShutdownAsync and DisposeAsync wait for
 24    // detached connections to reach 0 using _detachedConnectionsTcs. Such a connection is "detached" because it's not
 25    // in _connections.
 26    private int _detachedConnectionCount;
 27
 14228    private readonly TaskCompletionSource _detachedConnectionsTcs = new();
 29
 30    // A cancellation token source that is canceled by DisposeAsync.
 14231    private readonly CancellationTokenSource _disposedCts = new();
 32
 33    private Task? _disposeTask;
 34
 35    private readonly Func<IConnectorListener> _listenerFactory;
 36
 37    private Task? _listenTask;
 38
 39    private readonly int _maxConnections;
 40
 41    private readonly int _maxPendingConnections;
 42
 14243    private readonly object _mutex = new();
 44
 45    private readonly ServerAddress _serverAddress;
 46
 47    // A cancellation token source canceled by ShutdownAsync and DisposeAsync.
 48    private readonly CancellationTokenSource _shutdownCts;
 49
 50    private Task? _shutdownTask;
 51
 52    private readonly TimeSpan _shutdownTimeout;
 53
 54    /// <summary>Constructs a server.</summary>
 55    /// <param name="options">The server options.</param>
 56    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. The <see
 57    /// langword="null" /> value is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 58    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. The <see
 59    /// langword="null" /> value is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 60    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 61    /// />.</param>
 14262    public Server(
 14263        ServerOptions options,
 14264        IDuplexServerTransport? duplexServerTransport = null,
 14265        IMultiplexedServerTransport? multiplexedServerTransport = null,
 14266        ILogger? logger = null)
 14267    {
 14268        if (options.ConnectionOptions.Dispatcher is null)
 069        {
 070            throw new ArgumentException($"{nameof(ServerOptions.ConnectionOptions.Dispatcher)} cannot be null");
 71        }
 72
 14273        logger ??= NullLogger.Instance;
 74
 14275        _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 76
 14277        duplexServerTransport ??= IDuplexServerTransport.Default;
 14278        multiplexedServerTransport ??= IMultiplexedServerTransport.Default;
 14279        _maxConnections = options.MaxConnections;
 14280        _maxPendingConnections = options.MaxPendingConnections;
 81
 14282        _connectTimeout = options.ConnectTimeout;
 14283        _shutdownTimeout = options.ShutdownTimeout;
 84
 14285        _serverAddress = options.ServerAddress;
 14286        if (_serverAddress.Transport is null)
 13287        {
 13288            _serverAddress = _serverAddress with
 13289            {
 13290                Transport = _serverAddress.Protocol == Protocol.Ice ?
 13291                    duplexServerTransport.Name : multiplexedServerTransport.Name
 13292            };
 13293        }
 94
 14295        _listenerFactory = () =>
 14096        {
 14297            IConnectorListener listener;
 14098            if (_serverAddress.Protocol == Protocol.Ice)
 3799            {
 37100                IListener<IDuplexConnection> transportListener = duplexServerTransport.Listen(
 37101                    _serverAddress,
 37102                    new DuplexConnectionOptions
 37103                    {
 37104                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
 37105                        Pool = options.ConnectionOptions.Pool,
 37106                    },
 37107                    options.ServerAuthenticationOptions);
 142108
 37109                listener = new IceConnectorListener(
 37110                    transportListener,
 37111                    options.ConnectionOptions);
 37112            }
 142113            else
 103114            {
 103115                IListener<IMultiplexedConnection> transportListener = multiplexedServerTransport.Listen(
 103116                    _serverAddress,
 103117                    new MultiplexedConnectionOptions
 103118                    {
 103119                        MaxBidirectionalStreams = options.ConnectionOptions.MaxIceRpcBidirectionalStreams,
 103120                        // Add an additional stream for the icerpc protocol control stream.
 103121                        MaxUnidirectionalStreams = options.ConnectionOptions.MaxIceRpcUnidirectionalStreams + 1,
 103122                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
 103123                        Pool = options.ConnectionOptions.Pool
 103124                    },
 103125                    options.ServerAuthenticationOptions);
 142126
 103127                listener = new IceRpcConnectorListener(
 103128                    transportListener,
 103129                    options.ConnectionOptions,
 103130                    logger == NullLogger.Instance ? null : new LogTaskExceptionObserver(logger));
 103131            }
 142132
 140133            listener = new MetricsConnectorListenerDecorator(listener);
 140134            if (logger != NullLogger.Instance)
 20135            {
 20136                listener = new LogConnectorListenerDecorator(listener, logger);
 20137            }
 140138            return listener;
 282139        };
 142140    }
 141
 142    /// <summary>Constructs a server with the specified dispatcher and authentication options. All other properties
 143    /// use the <see cref="ServerOptions" /> defaults.</summary>
 144    /// <param name="dispatcher">The dispatcher of the server.</param>
 145    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 146    /// />, the server will accept only secure connections.</param>
 147    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 148    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 149    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 150    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 151    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 152    /// />.</param>
 153    public Server(
 154        IDispatcher dispatcher,
 155        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 156        IDuplexServerTransport? duplexServerTransport = null,
 157        IMultiplexedServerTransport? multiplexedServerTransport = null,
 158        ILogger? logger = null)
 2159        : this(
 2160            new ServerOptions
 2161            {
 2162                ServerAuthenticationOptions = serverAuthenticationOptions,
 2163                ConnectionOptions = new()
 2164                {
 2165                    Dispatcher = dispatcher,
 2166                }
 2167            },
 2168            duplexServerTransport,
 2169            multiplexedServerTransport,
 2170            logger)
 2171    {
 2172    }
 173
 174    /// <summary>Constructs a server with the specified dispatcher, server address and authentication options. All
 175    /// other properties use the <see cref="ServerOptions" /> defaults.</summary>
 176    /// <param name="dispatcher">The dispatcher of the server.</param>
 177    /// <param name="serverAddress">The server address of the server.</param>
 178    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 179    /// />, the server will accept only secure connections.</param>
 180    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 181    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 182    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 183    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 184    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 185    /// />.</param>
 186    public Server(
 187        IDispatcher dispatcher,
 188        ServerAddress serverAddress,
 189        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 190        IDuplexServerTransport? duplexServerTransport = null,
 191        IMultiplexedServerTransport? multiplexedServerTransport = null,
 192        ILogger? logger = null)
 50193        : this(
 50194            new ServerOptions
 50195            {
 50196                ServerAuthenticationOptions = serverAuthenticationOptions,
 50197                ConnectionOptions = new()
 50198                {
 50199                    Dispatcher = dispatcher,
 50200                },
 50201                ServerAddress = serverAddress
 50202            },
 50203            duplexServerTransport,
 50204            multiplexedServerTransport,
 50205            logger)
 50206    {
 50207    }
 208
 209    /// <summary>Constructs a server with the specified dispatcher, server address URI and authentication options. All
 210    /// other properties use the <see cref="ServerOptions" /> defaults.</summary>
 211    /// <param name="dispatcher">The dispatcher of the server.</param>
 212    /// <param name="serverAddressUri">A URI that represents the server address of the server.</param>
 213    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 214    /// />, the server will accept only secure connections.</param>
 215    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 216    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 217    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 218    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 219    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 220    /// />.</param>
 221    public Server(
 222        IDispatcher dispatcher,
 223        Uri serverAddressUri,
 224        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 225        IDuplexServerTransport? duplexServerTransport = null,
 226        IMultiplexedServerTransport? multiplexedServerTransport = null,
 227        ILogger? logger = null)
 10228        : this(
 10229            dispatcher,
 10230            new ServerAddress(serverAddressUri),
 10231            serverAuthenticationOptions,
 10232            duplexServerTransport,
 10233            multiplexedServerTransport,
 10234            logger)
 10235    {
 10236    }
 237
 238    /// <summary>Releases all resources allocated by this server. The server stops listening for new connections and
 239    /// disposes the connections it accepted from clients.</summary>
 240    /// <returns>A value task that completes when the disposal of all connections accepted by the server has completed.
 241    /// This includes connections that were active when this method is called and connections whose disposal was
 242    /// initiated prior to this call.</returns>
 243    /// <remarks>The disposal of an underlying connection of the server aborts invocations, cancels dispatches and
 244    /// disposes the underlying transport connection without waiting for the peer. To wait for invocations and
 245    /// dispatches to complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete
 246    /// promptly when its cancellation token is canceled, the disposal can hang.</remarks>
 247    public ValueTask DisposeAsync()
 144248    {
 144249        lock (_mutex)
 144250        {
 144251            if (_disposeTask is null)
 142252            {
 142253                _shutdownTask ??= Task.CompletedTask;
 142254                if (_detachedConnectionCount == 0)
 133255                {
 133256                    _ = _detachedConnectionsTcs.TrySetResult();
 133257                }
 258
 142259                _disposeTask = PerformDisposeAsync();
 142260            }
 144261            return new(_disposeTask);
 262        }
 263
 264        async Task PerformDisposeAsync()
 142265        {
 142266            await Task.Yield(); // exit mutex lock
 267
 142268            _disposedCts.Cancel();
 269
 270            // _listenTask etc are immutable when _disposeTask is not null.
 271
 142272            if (_listenTask is not null)
 140273            {
 274                // Wait for shutdown before disposing connections.
 275                try
 140276                {
 140277                    await Task.WhenAll(_listenTask, _shutdownTask).ConfigureAwait(false);
 140278                }
 0279                catch
 0280                {
 281                    // Ignore exceptions.
 0282                }
 283
 140284                await Task.WhenAll(
 140285                    _connections
 85286                        .Select(connection => connection.DisposeAsync().AsTask())
 140287                        .Append(_detachedConnectionsTcs.Task)).ConfigureAwait(false);
 140288            }
 289
 142290            _disposedCts.Dispose();
 142291            _shutdownCts.Dispose();
 142292        }
 144293    }
 294
 295    /// <summary>Starts accepting connections on the configured server address. Requests received over these connections
 296    /// are then dispatched by the configured dispatcher.</summary>
 297    /// <returns>The server address this server is listening on and that a client would connect to. This address is the
 298    /// same as the <see cref="ServerOptions.ServerAddress" /> of <see cref="ServerOptions" /> except its
 299    /// <see cref="ServerAddress.Transport" /> property is always non-null and its port number is never 0 when the host
 300    /// is an IP address.</returns>
 301    /// <exception cref="IceRpcException">Thrown when the server transport fails to listen on the configured <see
 302    /// cref="ServerOptions.ServerAddress" />.</exception>
 303    /// <exception cref="InvalidOperationException">Thrown when the server is already listening, shut down or shutting
 304    /// down.</exception>
 305    /// <exception cref="ObjectDisposedException">Throw when the server is disposed.</exception>
 306    /// <remarks><see cref="Listen" /> can also throw exceptions from the transport; for example, the transport can
 307    /// reject the server address.</remarks>
 308    public ServerAddress Listen()
 144309    {
 144310        lock (_mutex)
 144311        {
 144312            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 313
 142314            if (_shutdownTask is not null)
 0315            {
 0316                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
 317            }
 142318            if (_listenTask is not null)
 2319            {
 2320                throw new InvalidOperationException($"Server '{this}' is already listening.");
 321            }
 322
 140323            IConnectorListener listener = _listenerFactory();
 140324            _listenTask = ListenAsync(listener); // _listenTask owns listener and must dispose it
 140325            return listener.ServerAddress;
 326        }
 327
 328        async Task ListenAsync(IConnectorListener listener)
 140329        {
 140330            await Task.Yield(); // exit mutex lock
 331
 332            try
 140333            {
 140334                using var pendingConnectionSemaphore = new SemaphoreSlim(
 140335                    _maxPendingConnections,
 140336                    _maxPendingConnections);
 337
 279338                while (!_shutdownCts.IsCancellationRequested)
 279339                {
 279340                    await pendingConnectionSemaphore.WaitAsync(_shutdownCts.Token).ConfigureAwait(false);
 341
 278342                    IConnector? connector = null;
 343                    do
 451344                    {
 345                        try
 451346                        {
 451347                            (connector, _) = await listener.AcceptAsync(_shutdownCts.Token).ConfigureAwait(false);
 139348                        }
 312349                        catch (Exception exception) when (IsRetryableAcceptException(exception))
 173350                        {
 351                            // continue
 173352                        }
 312353                    }
 312354                    while (connector is null);
 355
 356                    // We don't wait for the connection to be activated or shutdown. This could take a while for some
 357                    // transports such as TLS based transports where the handshake requires few round trips between the
 358                    // client and server. Waiting could also cause a security issue if the client doesn't respond to the
 359                    // connection initialization as we wouldn't be able to accept new connections in the meantime. The
 360                    // call will eventually timeout if the ConnectTimeout expires.
 139361                    CancellationToken cancellationToken = _disposedCts.Token;
 139362                    _ = Task.Run(
 139363                        async () =>
 139364                        {
 139365                            try
 139366                            {
 139367                                await ConnectAsync(connector, cancellationToken).ConfigureAwait(false);
 127368                            }
 12369                            catch
 12370                            {
 139371                                // Ignore connection establishment failure. This failures are logged by the
 139372                                // LogConnectorDecorator
 12373                            }
 139374                            finally
 139375                            {
 139376                                // The connection dispose will dispose the transport connection if it has not been
 139377                                // adopted by the protocol connection.
 139378                                await connector.DisposeAsync().ConfigureAwait(false);
 139379
 139380                                // The pending connection semaphore is disposed by the listen task completion once
 139381                                // shutdown / dispose is initiated.
 139382                                lock (_mutex)
 139383                                {
 139384                                    if (_shutdownTask is null)
 129385                                    {
 129386                                        pendingConnectionSemaphore.Release();
 129387                                    }
 139388                                }
 139389                            }
 139390                        },
 139391                        CancellationToken.None); // the task must run to dispose the connector.
 139392                }
 0393            }
 140394            catch
 140395            {
 396                // Ignore. Exceptions thrown by listener.AcceptAsync are logged by the log decorator when appropriate.
 140397            }
 398            finally
 140399            {
 140400                await listener.DisposeAsync().ConfigureAwait(false);
 140401            }
 402
 403            async Task ConnectAsync(IConnector connector, CancellationToken cancellationToken)
 139404            {
 139405                using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 139406                connectCts.CancelAfter(_connectTimeout);
 407
 408                // Connect the transport connection first. This connection establishment can be interrupted by the
 409                // connect timeout or the server ShutdownAsync/DisposeAsync.
 139410                TransportConnectionInformation transportConnectionInformation =
 139411                    await connector.ConnectTransportConnectionAsync(connectCts.Token).ConfigureAwait(false);
 412
 130413                IProtocolConnection? protocolConnection = null;
 130414                bool serverBusy = false;
 415
 130416                lock (_mutex)
 130417                {
 130418                    Debug.Assert(
 130419                        _maxConnections == 0 || _connections.Count + _detachedConnectionCount <= _maxConnections);
 420
 130421                    if (_shutdownTask is null)
 130422                    {
 130423                        if (_maxConnections > 0 && (_connections.Count + _detachedConnectionCount) == _maxConnections)
 14424                        {
 14425                            serverBusy = true;
 14426                        }
 427                        else
 116428                        {
 429                            // The protocol connection adopts the transport connection from the connector and it's
 430                            // now responsible for disposing of it.
 116431                            protocolConnection = connector.CreateProtocolConnection(transportConnectionInformation);
 116432                            _detachedConnectionCount++;
 116433                        }
 130434                    }
 130435                }
 436
 130437                if (protocolConnection is null)
 14438                {
 439                    try
 14440                    {
 14441                        await connector.RefuseTransportConnectionAsync(serverBusy, connectCts.Token)
 14442                            .ConfigureAwait(false);
 10443                    }
 4444                    catch
 4445                    {
 446                        // ignore and continue
 4447                    }
 448                    // The transport connection is disposed by the disposal of the connector.
 14449                }
 450                else
 116451                {
 452                    Task shutdownRequested;
 453                    try
 116454                    {
 116455                        (_, shutdownRequested) = await protocolConnection.ConnectAsync(connectCts.Token)
 116456                            .ConfigureAwait(false);
 113457                    }
 3458                    catch
 3459                    {
 3460                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: false)
 3461                            .ConfigureAwait(false);
 3462                        throw;
 463                    }
 464
 113465                    LinkedListNode<IProtocolConnection>? listNode = null;
 466
 113467                    lock (_mutex)
 468                    {
 113469                        if (_shutdownTask is null)
 470                        {
 112471                            listNode = _connections.AddLast(protocolConnection);
 472
 473                            // protocolConnection is no longer a detached connection since it's now "attached" in
 474                            // _connections.
 112475                            _detachedConnectionCount--;
 476                        }
 113477                    }
 478
 113479                    if (listNode is null)
 480                    {
 1481                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: true)
 1482                            .ConfigureAwait(false);
 483                    }
 484                    else
 112485                    {
 486                        // Schedule removal after successful ConnectAsync.
 112487                        _ = ShutdownWhenRequestedAsync(protocolConnection, shutdownRequested, listNode);
 488                    }
 113489                }
 127490            }
 140491        }
 492
 493        async Task DisposeDetachedConnectionAsync(IProtocolConnection connection, bool withShutdown)
 31494        {
 31495            if (withShutdown)
 28496            {
 497                // _disposedCts is not disposed since we own a _backgroundConnectionDisposeCount.
 28498                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 28499                cts.CancelAfter(_shutdownTimeout);
 500
 501                try
 28502                {
 503                    // Can be canceled by DisposeAsync or the shutdown timeout.
 28504                    await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 17505                }
 11506                catch
 11507                {
 508                    // Ignore connection shutdown failures. connection.ShutdownAsync makes sure it's an "expected"
 509                    // exception.
 11510                }
 28511            }
 512
 31513            await connection.DisposeAsync().ConfigureAwait(false);
 31514            lock (_mutex)
 31515            {
 31516                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
 11517                {
 11518                    _detachedConnectionsTcs.SetResult();
 11519                }
 31520            }
 31521        }
 522
 523        // Remove the connection from _connections after a successful ConnectAsync.
 524        async Task ShutdownWhenRequestedAsync(
 525            IProtocolConnection connection,
 526            Task shutdownRequested,
 527            LinkedListNode<IProtocolConnection> listNode)
 112528        {
 112529            await shutdownRequested.ConfigureAwait(false);
 530
 91531            lock (_mutex)
 91532            {
 91533                if (_shutdownTask is null)
 27534                {
 27535                    _connections.Remove(listNode);
 27536                    _detachedConnectionCount++;
 27537                }
 538                else
 64539                {
 540                    // _connections is immutable and ShutdownAsync/DisposeAsync is responsible to shutdown/dispose
 541                    // this connection.
 64542                    return;
 543                }
 27544            }
 545
 27546            await DisposeDetachedConnectionAsync(connection, withShutdown: true).ConfigureAwait(false);
 91547        }
 140548    }
 549
 550    /// <summary>Gracefully shuts down this server: the server stops accepting new connections and shuts down gracefully
 551    /// all its connections.</summary>
 552    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 553    /// <returns>A task that completes successfully once the shutdown of all connections accepted by the server has
 554    /// completed. This includes connections that were active when this method is called and connections whose shutdown
 555    /// was initiated prior to this call. This task can also complete with one of the following exceptions:
 556    /// <list type="bullet">
 557    /// <item><description><see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> if the
 558    /// server is disposed while being shut down.</description></item>
 559    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
 560    /// cancellation token.</description></item>
 561    /// <item><description><see cref="TimeoutException" /> if the shutdown timed out.</description></item>
 562    /// </list>
 563    /// </returns>
 564    /// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
 565    /// <exception cref="ObjectDisposedException">Thrown if the server is disposed.</exception>
 566    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 36567    {
 36568        lock (_mutex)
 36569        {
 36570            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 571
 36572            if (_shutdownTask is not null)
 0573            {
 0574                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
 575            }
 576
 36577            if (_detachedConnectionCount == 0)
 34578            {
 34579                _detachedConnectionsTcs.SetResult();
 34580            }
 581
 36582            _shutdownTask = PerformShutdownAsync();
 36583        }
 36584        return _shutdownTask;
 585
 586        async Task PerformShutdownAsync()
 36587        {
 36588            await Task.Yield(); // exit mutex lock
 589
 36590            _shutdownCts.Cancel();
 591
 592            // _listenTask is immutable once _shutdownTask is not null.
 36593            if (_listenTask is not null)
 36594            {
 595                try
 36596                {
 36597                    using var cts = CancellationTokenSource.CreateLinkedTokenSource(
 36598                        cancellationToken,
 36599                        _disposedCts.Token);
 600
 36601                    cts.CancelAfter(_shutdownTimeout);
 602
 603                    try
 36604                    {
 36605                        await Task.WhenAll(
 36606                            _connections
 23607                                .Select(connection => connection.ShutdownAsync(cts.Token))
 36608                                .Append(_listenTask.WaitAsync(cts.Token))
 36609                                .Append(_detachedConnectionsTcs.Task.WaitAsync(cts.Token)))
 36610                            .ConfigureAwait(false);
 36611                    }
 0612                    catch (OperationCanceledException)
 0613                    {
 0614                        throw;
 615                    }
 0616                    catch
 0617                    {
 618                        // Ignore _listenTask and connection shutdown exceptions
 619
 620                        // Throw OperationCanceledException if this WhenAll exception is hiding an OCE.
 0621                        cts.Token.ThrowIfCancellationRequested();
 0622                    }
 36623                }
 0624                catch (OperationCanceledException)
 0625                {
 0626                    cancellationToken.ThrowIfCancellationRequested();
 627
 0628                    if (_disposedCts.IsCancellationRequested)
 0629                    {
 0630                        throw new IceRpcException(
 0631                            IceRpcError.OperationAborted,
 0632                            "The shutdown was aborted because the server was disposed.");
 633                    }
 634                    else
 0635                    {
 0636                        throw new TimeoutException(
 0637                            $"The server shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
 638                    }
 639                }
 36640            }
 36641        }
 36642    }
 643
 644    /// <summary>Returns a string that represents this server.</summary>
 645    /// <returns>A string that represents this server.</returns>
 2646    public override string ToString() => _serverAddress.ToString();
 647
 648    /// <summary>Returns true if the <see cref="IConnectorListener.AcceptAsync" /> failure can be retried.</summary>
 649    private static bool IsRetryableAcceptException(Exception exception) =>
 650        // Transports such as Quic do the SSL handshake when the connection is accepted, this can throw
 651        // AuthenticationException if it fails.
 312652        exception is IceRpcException or AuthenticationException;
 653
 654    /// <summary>Provides a decorator that adds logging to a <see cref="IConnectorListener" />.</summary>
 655    private class LogConnectorListenerDecorator : IConnectorListener
 656    {
 92657        public ServerAddress ServerAddress => _decoratee.ServerAddress;
 658
 659        private readonly IConnectorListener _decoratee;
 660        private readonly ILogger _logger;
 661
 662        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
 36663        {
 664            try
 36665            {
 36666                (IConnector connector, EndPoint remoteNetworkAddress) =
 36667                    await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
 668
 16669                _logger.LogConnectionAccepted(ServerAddress, remoteNetworkAddress);
 16670                return (
 16671                    new LogConnectorDecorator(connector, ServerAddress, remoteNetworkAddress, _logger),
 16672                    remoteNetworkAddress);
 673            }
 20674            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 20675            {
 676                // Do not log this exception. The AcceptAsync call can fail with OperationCanceledException during
 677                // shutdown once the shutdown cancellation token is canceled.
 20678                throw;
 679            }
 0680            catch (ObjectDisposedException)
 0681            {
 682                // Do not log this exception. The AcceptAsync call can fail with ObjectDisposedException during
 683                // shutdown once the listener is disposed or if it is accepting a connection while the listener is
 684                // disposed.
 0685                throw;
 686            }
 0687            catch (Exception exception) when (IsRetryableAcceptException(exception))
 0688            {
 0689                _logger.LogConnectionAcceptFailedWithRetryableException(ServerAddress, exception);
 0690                throw;
 691            }
 0692            catch (Exception exception)
 0693            {
 0694                _logger.LogConnectionAcceptFailed(ServerAddress, exception);
 0695                throw;
 696            }
 16697        }
 698
 699        public ValueTask DisposeAsync()
 20700        {
 20701            _logger.LogStopAcceptingConnections(ServerAddress);
 20702            return _decoratee.DisposeAsync();
 20703        }
 704
 20705        internal LogConnectorListenerDecorator(IConnectorListener decoratee, ILogger logger)
 20706        {
 20707            _decoratee = decoratee;
 20708            _logger = logger;
 20709            _logger.LogStartAcceptingConnections(ServerAddress);
 20710        }
 711    }
 712
 713    private class LogConnectorDecorator : IConnector
 714    {
 715        private readonly IConnector _decoratee;
 716        private readonly ILogger _logger;
 717        private readonly EndPoint _remoteNetworkAddress;
 718        private readonly ServerAddress _serverAddress;
 719
 720        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 721            CancellationToken cancellationToken)
 16722        {
 723            try
 16724            {
 16725                return await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
 726            }
 4727            catch (Exception exception)
 4728            {
 4729                _logger.LogConnectionConnectFailed(_serverAddress, _remoteNetworkAddress, exception);
 4730                throw;
 731            }
 12732        }
 733
 734        public IProtocolConnection CreateProtocolConnection(
 735            TransportConnectionInformation transportConnectionInformation) =>
 12736            new LogProtocolConnectionDecorator(
 12737                _decoratee.CreateProtocolConnection(transportConnectionInformation),
 12738                _serverAddress,
 12739                _remoteNetworkAddress,
 12740                _logger);
 741
 16742        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 743
 744        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel) =>
 0745            _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel);
 746
 16747        internal LogConnectorDecorator(
 16748            IConnector decoratee,
 16749            ServerAddress serverAddress,
 16750            EndPoint remoteNetworkAddress,
 16751            ILogger logger)
 16752        {
 16753            _decoratee = decoratee;
 16754            _logger = logger;
 16755            _serverAddress = serverAddress;
 16756            _remoteNetworkAddress = remoteNetworkAddress;
 16757        }
 758    }
 759
 760    /// <summary>Provides a decorator that adds metrics to a <see cref="IConnectorListener" />.</summary>
 761    private class MetricsConnectorListenerDecorator : IConnectorListener
 762    {
 212763        public ServerAddress ServerAddress => _decoratee.ServerAddress;
 764
 765        private readonly IConnectorListener _decoratee;
 766
 767        public async Task<(IConnector, EndPoint)> AcceptAsync(
 768            CancellationToken cancellationToken)
 451769        {
 451770            (IConnector connector, EndPoint remoteNetworkAddress) =
 451771                await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
 139772            return (new MetricsConnectorDecorator(connector), remoteNetworkAddress);
 139773        }
 774
 140775        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 776
 140777        internal MetricsConnectorListenerDecorator(IConnectorListener decoratee) =>
 140778            _decoratee = decoratee;
 779    }
 780
 781    private class MetricsConnectorDecorator : IConnector
 782    {
 783        private readonly IConnector _decoratee;
 784
 785        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 786            CancellationToken cancellationToken)
 139787        {
 139788            Metrics.ServerMetrics.ConnectStart();
 789            try
 139790            {
 139791                return await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
 792            }
 9793            catch
 9794            {
 9795                Metrics.ServerMetrics.ConnectStop();
 9796                Metrics.ServerMetrics.ConnectionFailure();
 9797                throw;
 798            }
 130799        }
 800
 801        public IProtocolConnection CreateProtocolConnection(
 802            TransportConnectionInformation transportConnectionInformation) =>
 116803                new MetricsProtocolConnectionDecorator(
 116804                    _decoratee.CreateProtocolConnection(transportConnectionInformation),
 116805                    Metrics.ServerMetrics,
 116806                    connectStarted: true);
 807
 139808        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 809
 810        public async Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel)
 14811        {
 812            try
 14813            {
 14814                await _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel).ConfigureAwait(false);
 10815            }
 816            finally
 14817            {
 14818                Metrics.ServerMetrics.ConnectionFailure();
 14819                Metrics.ServerMetrics.ConnectStop();
 14820            }
 10821        }
 822
 278823        internal MetricsConnectorDecorator(IConnector decoratee) => _decoratee = decoratee;
 824    }
 825
 826    /// <summary>A connector listener accepts a transport connection and returns a <see cref="IConnector" />. The
 827    /// connector is used to refuse the transport connection or obtain a protocol connection once the transport
 828    /// connection is connected.</summary>
 829    private interface IConnectorListener : IAsyncDisposable
 830    {
 831        ServerAddress ServerAddress { get; }
 832
 833        Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel);
 834    }
 835
 836    /// <summary>A connector is returned by <see cref="IConnectorListener" />. The connector allows to connect the
 837    /// transport connection. If successful, the transport connection can either be refused or accepted by creating the
 838    /// protocol connection out of it.</summary>
 839    private interface IConnector : IAsyncDisposable
 840    {
 841        Task<TransportConnectionInformation> ConnectTransportConnectionAsync(CancellationToken cancellationToken);
 842
 843        IProtocolConnection CreateProtocolConnection(TransportConnectionInformation transportConnectionInformation);
 844
 845        Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel);
 846    }
 847
 848    private class IceConnectorListener : IConnectorListener
 849    {
 73850        public ServerAddress ServerAddress => _listener.ServerAddress;
 851
 852        private readonly IListener<IDuplexConnection> _listener;
 853        private readonly ConnectionOptions _options;
 854
 37855        public ValueTask DisposeAsync() => _listener.DisposeAsync();
 856
 857        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
 76858        {
 76859            (IDuplexConnection transportConnection, EndPoint remoteNetworkAddress) = await _listener.AcceptAsync(
 76860                cancel).ConfigureAwait(false);
 39861            return (new IceConnector(transportConnection, _options), remoteNetworkAddress);
 39862        }
 863
 37864        internal IceConnectorListener(IListener<IDuplexConnection> listener, ConnectionOptions options)
 37865        {
 37866            _listener = listener;
 37867            _options = options;
 37868        }
 869    }
 870
 871    private class IceConnector : IConnector
 872    {
 873        private readonly ConnectionOptions _options;
 874        private IDuplexConnection? _transportConnection;
 875
 876        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 877            CancellationToken cancellationToken) =>
 39878            _transportConnection!.ConnectAsync(cancellationToken);
 879
 880        public IProtocolConnection CreateProtocolConnection(
 881            TransportConnectionInformation transportConnectionInformation)
 33882        {
 883            // The protocol connection takes ownership of the transport connection.
 33884            var protocolConnection = new IceProtocolConnection(
 33885                _transportConnection!,
 33886                transportConnectionInformation,
 33887                _options);
 33888            _transportConnection = null;
 33889            return protocolConnection;
 33890        }
 891
 892        public ValueTask DisposeAsync()
 39893        {
 39894            _transportConnection?.Dispose();
 39895            return new();
 39896        }
 897
 898        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken)
 4899        {
 4900            _transportConnection!.Dispose();
 4901            return Task.CompletedTask;
 4902        }
 903
 39904        internal IceConnector(IDuplexConnection transportConnection, ConnectionOptions options)
 39905        {
 39906            _transportConnection = transportConnection;
 39907            _options = options;
 39908        }
 909    }
 910
 911    private class IceRpcConnectorListener : IConnectorListener
 912    {
 139913        public ServerAddress ServerAddress => _listener.ServerAddress;
 914
 915        private readonly IListener<IMultiplexedConnection> _listener;
 916        private readonly ConnectionOptions _options;
 917        private readonly ITaskExceptionObserver? _taskExceptionObserver;
 918
 919        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
 375920        {
 375921            (IMultiplexedConnection transportConnection, EndPoint remoteNetworkAddress) = await _listener.AcceptAsync(
 375922                cancel).ConfigureAwait(false);
 100923            return (new IceRpcConnector(transportConnection, _options, _taskExceptionObserver), remoteNetworkAddress);
 100924        }
 925
 103926        public ValueTask DisposeAsync() => _listener.DisposeAsync();
 927
 103928        internal IceRpcConnectorListener(
 103929            IListener<IMultiplexedConnection> listener,
 103930            ConnectionOptions options,
 103931            ITaskExceptionObserver? taskExceptionObserver)
 103932        {
 103933            _listener = listener;
 103934            _options = options;
 103935            _taskExceptionObserver = taskExceptionObserver;
 103936        }
 937    }
 938
 939    private class IceRpcConnector : IConnector
 940    {
 941        private readonly ConnectionOptions _options;
 942        private readonly ITaskExceptionObserver? _taskExceptionObserver;
 943        private IMultiplexedConnection? _transportConnection;
 944
 945        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 946            CancellationToken cancellationToken) =>
 100947            _transportConnection!.ConnectAsync(cancellationToken);
 948
 949        public IProtocolConnection CreateProtocolConnection(
 950            TransportConnectionInformation transportConnectionInformation)
 83951        {
 952            // The protocol connection takes ownership of the transport connection.
 83953            var protocolConnection = new IceRpcProtocolConnection(
 83954                _transportConnection!,
 83955                transportConnectionInformation,
 83956                _options,
 83957                _taskExceptionObserver);
 83958            _transportConnection = null;
 83959            return protocolConnection;
 83960        }
 961
 100962        public ValueTask DisposeAsync() => _transportConnection?.DisposeAsync() ?? new();
 963
 964        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken) =>
 10965            _transportConnection!.CloseAsync(
 10966                serverBusy ? MultiplexedConnectionCloseError.ServerBusy : MultiplexedConnectionCloseError.Refused,
 10967                cancellationToken);
 968
 100969        internal IceRpcConnector(
 100970            IMultiplexedConnection transportConnection,
 100971            ConnectionOptions options,
 100972            ITaskExceptionObserver? taskExceptionObserver)
 100973        {
 100974            _transportConnection = transportConnection;
 100975            _options = options;
 100976            _taskExceptionObserver = taskExceptionObserver;
 100977        }
 978    }
 979}

Methods/Properties

.ctor(IceRpc.ServerOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,IceRpc.ServerAddress,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,System.Uri,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
DisposeAsync()
PerformDisposeAsync()
Listen()
ListenAsync()
ConnectAsync()
DisposeDetachedConnectionAsync()
ShutdownWhenRequestedAsync()
ShutdownAsync(System.Threading.CancellationToken)
PerformShutdownAsync()
ToString()
IsRetryableAcceptException(System.Exception)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Server/IConnectorListener,Microsoft.Extensions.Logging.ILogger)
ConnectTransportConnectionAsync()
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Server/IConnector,IceRpc.ServerAddress,System.Net.EndPoint,Microsoft.Extensions.Logging.ILogger)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Server/IConnectorListener)
ConnectTransportConnectionAsync()
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync()
.ctor(IceRpc.Server/IConnector)
get_ServerAddress()
DisposeAsync()
AcceptAsync()
.ctor(IceRpc.Transports.IListener`1<IceRpc.Transports.IDuplexConnection>,IceRpc.ConnectionOptions)
ConnectTransportConnectionAsync(System.Threading.CancellationToken)
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.ConnectionOptions)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Transports.IListener`1<IceRpc.Transports.IMultiplexedConnection>,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)
ConnectTransportConnectionAsync(System.Threading.CancellationToken)
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Transports.IMultiplexedConnection,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)