< Summary

Information
Class: IceRpc.Server
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Server.cs
Tag: 701_22528036593
Line coverage
92%
Covered lines: 486
Uncovered lines: 40
Coverable lines: 526
Total lines: 980
Line coverage: 92.3%
Branch coverage
90%
Covered branches: 74
Total branches: 82
Branch coverage: 90.2%
Method coverage
98%
Covered methods: 50
Total methods: 51
Method coverage: 98%

Metrics

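| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 94.44% | 18.01 | 18 | 97.4% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 6 | 6 | 100% |
| PerformDisposeAsync() | 100% | 4.06 | 4 | 84.21% |
| Listen() | 75% | 4.06 | 4 | 84.61% |
| ListenAsync() | 90% | 10 | 10 | 98.27% |
| ConnectAsync() | 100% | 12 | 12 | 100% |
| DisposeDetachedConnectionAsync() | 83.33% | 6 | 6 | 100% |
| ShutdownWhenRequestedAsync() | 100% | 2 | 2 | 100% |
| ShutdownAsync(...) | 75% | 4.05 | 4 | 85.71% |
| PerformShutdownAsync() | 66.66% | 9.54 | 6 | 53.84% |
| ToString() | 100% | 1 | 1 | 100% |
| IsRetryableAcceptException(...) | 100% | 4 | 4 | 100% |
| get_ServerAddress() | 100% | 1 | 1 | 100% |
| AcceptAsync() | 100% | 1.11 | 1 | 52.17% |
| DisposeAsync() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ConnectTransportConnectionAsync() | 100% | 1 | 1 | 100% |
| CreateProtocolConnection(...) | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 1 | 1 | 100% |
| RefuseTransportConnectionAsync(...) | 100% | 2 | 1 | 0% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| get_ServerAddress() | 100% | 1 | 1 | 100% |
| AcceptAsync() | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ConnectTransportConnectionAsync() | 100% | 1 | 1 | 100% |
| CreateProtocolConnection(...) | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 1 | 1 | 100% |
| RefuseTransportConnectionAsync() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| get_ServerAddress() | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 1 | 1 | 100% |
| AcceptAsync() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ConnectTransportConnectionAsync(...) | 100% | 1 | 1 | 100% |
| CreateProtocolConnection(...) | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 2 | 2 | 100% |
| RefuseTransportConnectionAsync(...) | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| get_ServerAddress() | 100% | 1 | 1 | 100% |
| AcceptAsync() | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ConnectTransportConnectionAsync(...) | 100% | 1 | 1 | 100% |
| CreateProtocolConnection(...) | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 2 | 2 | 100% |
| RefuseTransportConnectionAsync(...) | 50% | 2 | 2 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |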

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Server.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports;
 5using Microsoft.Extensions.Logging;
 6using Microsoft.Extensions.Logging.Abstractions;
 7using System.Diagnostics;
 8using System.Net;
 9using System.Net.Security;
 10using System.Security.Authentication;
 11
 12namespace IceRpc;
 13
 14/// <summary>A server accepts connections from clients and dispatches the requests it receives over these connections.
 15/// </summary>
 16public sealed class Server : IAsyncDisposable
 17{
 8118    private readonly LinkedList<IProtocolConnection> _connections = new();
 19
 20    private readonly TimeSpan _connectTimeout;
 21
 22    // A detached connection is a protocol connection that we've decided to connect, or that is connecting, shutting
 23    // down or being disposed. It counts towards _maxConnections and both Server.ShutdownAsync and DisposeAsync wait for
 24    // detached connections to reach 0 using _detachedConnectionsTcs. Such a connection is "detached" because it's not
 25    // in _connections.
 26    private int _detachedConnectionCount;
 27
 8128    private readonly TaskCompletionSource _detachedConnectionsTcs = new();
 29
 30    // A cancellation token source that is canceled by DisposeAsync.
 8131    private readonly CancellationTokenSource _disposedCts = new();
 32
 33    private Task? _disposeTask;
 34
 35    private readonly Func<IConnectorListener> _listenerFactory;
 36
 37    private Task? _listenTask;
 38
 39    private readonly int _maxConnections;
 40
 41    private readonly int _maxPendingConnections;
 42
 8143    private readonly Lock _mutex = new();
 44
 45    private readonly ServerAddress _serverAddress;
 46
 47    // A cancellation token source canceled by ShutdownAsync and DisposeAsync.
 48    private readonly CancellationTokenSource _shutdownCts;
 49
 50    private Task? _shutdownTask;
 51
 52    private readonly TimeSpan _shutdownTimeout;
 53
 54    /// <summary>Constructs a server.</summary>
 55    /// <param name="options">The server options.</param>
 56    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. The <see
 57    /// langword="null" /> value is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 58    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. The <see
 59    /// langword="null" /> value is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 60    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 61    /// />.</param>
 8162    public Server(
 8163        ServerOptions options,
 8164        IDuplexServerTransport? duplexServerTransport = null,
 8165        IMultiplexedServerTransport? multiplexedServerTransport = null,
 8166        ILogger? logger = null)
 8167    {
 8168        if (options.ConnectionOptions.Dispatcher is null)
 069        {
 070            throw new ArgumentException($"{nameof(ServerOptions.ConnectionOptions.Dispatcher)} cannot be null");
 71        }
 72
 8173        logger ??= NullLogger.Instance;
 74
 8175        _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 76
 8177        duplexServerTransport ??= IDuplexServerTransport.Default;
 8178        multiplexedServerTransport ??= IMultiplexedServerTransport.Default;
 8179        _maxConnections = options.MaxConnections;
 8180        _maxPendingConnections = options.MaxPendingConnections;
 81
 8182        _connectTimeout = options.ConnectTimeout;
 8183        _shutdownTimeout = options.ShutdownTimeout;
 84
 8185        _serverAddress = options.ServerAddress;
 8186        if (_serverAddress.Transport is null)
 7587        {
 7588            _serverAddress = _serverAddress with
 7589            {
 7590                Transport = _serverAddress.Protocol == Protocol.Ice ?
 7591                    duplexServerTransport.Name : multiplexedServerTransport.Name
 7592            };
 7593        }
 94
 8195        _listenerFactory = () =>
 8096        {
 8197            IConnectorListener listener;
 8098            if (_serverAddress.Protocol == Protocol.Ice)
 2199            {
 21100                IListener<IDuplexConnection> transportListener = duplexServerTransport.Listen(
 21101                    _serverAddress,
 21102                    new DuplexConnectionOptions
 21103                    {
 21104                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
 21105                        Pool = options.ConnectionOptions.Pool,
 21106                    },
 21107                    options.ServerAuthenticationOptions);
 81108
 21109                listener = new IceConnectorListener(
 21110                    transportListener,
 21111                    options.ConnectionOptions);
 21112            }
 81113            else
 59114            {
 59115                IListener<IMultiplexedConnection> transportListener = multiplexedServerTransport.Listen(
 59116                    _serverAddress,
 59117                    new MultiplexedConnectionOptions
 59118                    {
 59119                        HandshakeTimeout = options.ConnectTimeout,
 59120                        MaxBidirectionalStreams = options.ConnectionOptions.MaxIceRpcBidirectionalStreams,
 59121                        // Add an additional stream for the icerpc protocol control stream.
 59122                        MaxUnidirectionalStreams = options.ConnectionOptions.MaxIceRpcUnidirectionalStreams + 1,
 59123                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
 59124                        Pool = options.ConnectionOptions.Pool
 59125                    },
 59126                    options.ServerAuthenticationOptions);
 81127
 59128                listener = new IceRpcConnectorListener(
 59129                    transportListener,
 59130                    options.ConnectionOptions,
 59131                    logger == NullLogger.Instance ? null : new LogTaskExceptionObserver(logger));
 59132            }
 81133
 80134            listener = new MetricsConnectorListenerDecorator(listener);
 80135            if (logger != NullLogger.Instance)
 10136            {
 10137                listener = new LogConnectorListenerDecorator(listener, logger);
 10138            }
 80139            return listener;
 161140        };
 81141    }
 142
 143    /// <summary>Constructs a server with the specified dispatcher and authentication options. All other properties
 144    /// use the <see cref="ServerOptions" /> defaults.</summary>
 145    /// <param name="dispatcher">The dispatcher of the server.</param>
 146    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 147    /// />, the server will accept only secure connections.</param>
 148    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 149    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 150    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 151    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 152    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 153    /// />.</param>
 154    public Server(
 155        IDispatcher dispatcher,
 156        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 157        IDuplexServerTransport? duplexServerTransport = null,
 158        IMultiplexedServerTransport? multiplexedServerTransport = null,
 159        ILogger? logger = null)
 1160        : this(
 1161            new ServerOptions
 1162            {
 1163                ServerAuthenticationOptions = serverAuthenticationOptions,
 1164                ConnectionOptions = new()
 1165                {
 1166                    Dispatcher = dispatcher,
 1167                }
 1168            },
 1169            duplexServerTransport,
 1170            multiplexedServerTransport,
 1171            logger)
 1172    {
 1173    }
 174
 175    /// <summary>Constructs a server with the specified dispatcher, server address and authentication options. All
 176    /// other properties use the <see cref="ServerOptions" /> defaults.</summary>
 177    /// <param name="dispatcher">The dispatcher of the server.</param>
 178    /// <param name="serverAddress">The server address of the server.</param>
 179    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 180    /// />, the server will accept only secure connections.</param>
 181    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 182    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 183    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 184    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 185    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 186    /// />.</param>
 187    public Server(
 188        IDispatcher dispatcher,
 189        ServerAddress serverAddress,
 190        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 191        IDuplexServerTransport? duplexServerTransport = null,
 192        IMultiplexedServerTransport? multiplexedServerTransport = null,
 193        ILogger? logger = null)
 25194        : this(
 25195            new ServerOptions
 25196            {
 25197                ServerAuthenticationOptions = serverAuthenticationOptions,
 25198                ConnectionOptions = new()
 25199                {
 25200                    Dispatcher = dispatcher,
 25201                },
 25202                ServerAddress = serverAddress
 25203            },
 25204            duplexServerTransport,
 25205            multiplexedServerTransport,
 25206            logger)
 25207    {
 25208    }
 209
 210    /// <summary>Constructs a server with the specified dispatcher, server address URI and authentication options. All
 211    /// other properties use the <see cref="ServerOptions" /> defaults.</summary>
 212    /// <param name="dispatcher">The dispatcher of the server.</param>
 213    /// <param name="serverAddressUri">A URI that represents the server address of the server.</param>
 214    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
 215    /// />, the server will accept only secure connections.</param>
 216    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
 217    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
 218    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
 219    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
 220    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
 221    /// />.</param>
 222    public Server(
 223        IDispatcher dispatcher,
 224        Uri serverAddressUri,
 225        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
 226        IDuplexServerTransport? duplexServerTransport = null,
 227        IMultiplexedServerTransport? multiplexedServerTransport = null,
 228        ILogger? logger = null)
 5229        : this(
 5230            dispatcher,
 5231            new ServerAddress(serverAddressUri),
 5232            serverAuthenticationOptions,
 5233            duplexServerTransport,
 5234            multiplexedServerTransport,
 5235            logger)
 5236    {
 5237    }
 238
 239    /// <summary>Releases all resources allocated by this server. The server stops listening for new connections and
 240    /// disposes the connections it accepted from clients.</summary>
 241    /// <returns>A value task that completes when the disposal of all connections accepted by the server has completed.
 242    /// This includes connections that were active when this method is called and connections whose disposal was
 243    /// initiated prior to this call.</returns>
 244    /// <remarks>The disposal of an underlying connection of the server aborts invocations, cancels dispatches and
 245    /// disposes the underlying transport connection without waiting for the peer. To wait for invocations and
 246    /// dispatches to complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete
 247    /// promptly when its cancellation token is canceled, the disposal can hang.</remarks>
 248    public ValueTask DisposeAsync()
 82249    {
 250        lock (_mutex)
 82251        {
 82252            if (_disposeTask is null)
 81253            {
 81254                _shutdownTask ??= Task.CompletedTask;
 81255                if (_detachedConnectionCount == 0)
 73256                {
 73257                    _ = _detachedConnectionsTcs.TrySetResult();
 73258                }
 259
 81260                _disposeTask = PerformDisposeAsync();
 81261            }
 82262            return new(_disposeTask);
 263        }
 264
 265        async Task PerformDisposeAsync()
 81266        {
 81267            await Task.Yield(); // exit mutex lock
 268
 81269            _disposedCts.Cancel();
 270
 271            // _listenTask etc are immutable when _disposeTask is not null.
 272
 81273            if (_listenTask is not null)
 80274            {
 275                // Wait for shutdown before disposing connections.
 276                try
 80277                {
 80278                    await Task.WhenAll(_listenTask, _shutdownTask).ConfigureAwait(false);
 80279                }
 0280                catch
 0281                {
 282                    // Ignore exceptions.
 0283                }
 284
 80285                await Task.WhenAll(
 80286                    _connections
 42287                        .Select(connection => connection.DisposeAsync().AsTask())
 80288                        .Append(_detachedConnectionsTcs.Task)).ConfigureAwait(false);
 80289            }
 290
 81291            _disposedCts.Dispose();
 81292            _shutdownCts.Dispose();
 81293        }
 82294    }
 295
 296    /// <summary>Starts accepting connections on the configured server address. Requests received over these connections
 297    /// are then dispatched by the configured dispatcher.</summary>
 298    /// <returns>The server address this server is listening on and that a client would connect to. This address is the
 299    /// same as the <see cref="ServerOptions.ServerAddress" /> of <see cref="ServerOptions" /> except its
 300    /// <see cref="ServerAddress.Transport" /> property is always non-null and its port number is never 0 when the host
 301    /// is an IP address.</returns>
 302    /// <exception cref="IceRpcException">Thrown when the server transport fails to listen on the configured <see
 303    /// cref="ServerOptions.ServerAddress" />.</exception>
 304    /// <exception cref="InvalidOperationException">Thrown when the server is already listening, shut down or shutting
 305    /// down.</exception>
 306    /// <exception cref="ObjectDisposedException">Thrown when the server is disposed.</exception>
 307    /// <remarks><see cref="Listen" /> can also throw exceptions from the transport; for example, the transport can
 308    /// reject the server address.</remarks>
 309    public ServerAddress Listen()
 82310    {
 311        lock (_mutex)
 82312        {
 82313            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 314
 81315            if (_shutdownTask is not null)
 0316            {
 0317                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
 318            }
 81319            if (_listenTask is not null)
 1320            {
 1321                throw new InvalidOperationException($"Server '{this}' is already listening.");
 322            }
 323
 80324            IConnectorListener listener = _listenerFactory();
 80325            _listenTask = ListenAsync(listener); // _listenTask owns listener and must dispose it
 80326            return listener.ServerAddress;
 327        }
 328
 329        async Task ListenAsync(IConnectorListener listener)
 80330        {
 80331            await Task.Yield(); // exit mutex lock
 332
 333            try
 80334            {
 80335                using var pendingConnectionSemaphore = new SemaphoreSlim(
 80336                    _maxPendingConnections,
 80337                    _maxPendingConnections);
 338
 160339                while (!_shutdownCts.IsCancellationRequested)
 160340                {
 160341                    await pendingConnectionSemaphore.WaitAsync(_shutdownCts.Token).ConfigureAwait(false);
 342
 159343                    IConnector? connector = null;
 344                    do
 249345                    {
 346                        try
 249347                        {
 249348                            (connector, _) = await listener.AcceptAsync(_shutdownCts.Token).ConfigureAwait(false);
 80349                        }
 169350                        catch (Exception exception) when (IsRetryableAcceptException(exception))
 90351                        {
 352                            // continue
 90353                        }
 170354                    }
 170355                    while (connector is null);
 356
 357                    // We don't wait for the connection to be activated or shut down. This could take a while for some
 358                    // transports, such as TLS-based transports, where the handshake requires a few round trips between
 359                    // the client and server. Waiting could also cause a security issue if the client doesn't respond to
 360                    // the connection initialization, as we wouldn't be able to accept new connections in the meantime.
 361                    // The call will eventually time out if the ConnectTimeout expires.
 80362                    CancellationToken cancellationToken = _disposedCts.Token;
 80363                    _ = Task.Run(
 80364                        async () =>
 80365                        {
 80366                            try
 80367                            {
 80368                                await ConnectAsync(connector, cancellationToken).ConfigureAwait(false);
 73369                            }
 7370                            catch
 7371                            {
 80372                                // Ignore connection establishment failures. These failures are logged by the
 80373                                // LogConnectorListenerDecorator.
 7374                            }
 80375                            finally
 80376                            {
 80377                                // Disposing the connector disposes the transport connection if it has not been
 80378                                // adopted by the protocol connection.
 80379                                await connector.DisposeAsync().ConfigureAwait(false);
 80380
 80381                                // The pending connection semaphore is disposed by the listen task completion once
 80382                                // shutdown / dispose is initiated.
 80383                                lock (_mutex)
 80384                                {
 80385                                    if (_shutdownTask is null)
 73386                                    {
 73387                                        pendingConnectionSemaphore.Release();
 73388                                    }
 80389                                }
 80390                            }
 80391                        },
 80392                        CancellationToken.None); // the task must run to dispose the connector.
 80393                }
 0394            }
 80395            catch
 80396            {
 397                // Ignore. Exceptions thrown by listener.AcceptAsync are logged by the log decorator when appropriate.
 80398            }
 399            finally
 80400            {
 80401                await listener.DisposeAsync().ConfigureAwait(false);
 80402            }
 403
 404            async Task ConnectAsync(IConnector connector, CancellationToken cancellationToken)
 80405            {
 80406                using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 80407                connectCts.CancelAfter(_connectTimeout);
 408
 409                // Connect the transport connection first. This connection establishment can be interrupted by the
 410                // connect timeout or the server ShutdownAsync/DisposeAsync.
 80411                TransportConnectionInformation transportConnectionInformation =
 80412                    await connector.ConnectTransportConnectionAsync(connectCts.Token).ConfigureAwait(false);
 413
 75414                IProtocolConnection? protocolConnection = null;
 75415                bool serverBusy = false;
 416
 417                lock (_mutex)
 75418                {
 75419                    Debug.Assert(
 75420                        _maxConnections == 0 || _connections.Count + _detachedConnectionCount <= _maxConnections);
 421
 75422                    if (_shutdownTask is null)
 75423                    {
 75424                        if (_maxConnections > 0 && (_connections.Count + _detachedConnectionCount) == _maxConnections)
 7425                        {
 7426                            serverBusy = true;
 7427                        }
 428                        else
 68429                        {
 430                            // The protocol connection adopts the transport connection from the connector and it's
 431                            // now responsible for disposing of it.
 68432                            protocolConnection = connector.CreateProtocolConnection(transportConnectionInformation);
 68433                            _detachedConnectionCount++;
 68434                        }
 75435                    }
 75436                }
 437
 75438                if (protocolConnection is null)
 7439                {
 440                    try
 7441                    {
 7442                        await connector.RefuseTransportConnectionAsync(serverBusy, connectCts.Token)
 7443                            .ConfigureAwait(false);
 5444                    }
 2445                    catch
 2446                    {
 447                        // ignore and continue
 2448                    }
 449                    // The transport connection is disposed by the disposal of the connector.
 7450                }
 451                else
 68452                {
 453                    Task shutdownRequested;
 454                    try
 68455                    {
 68456                        (_, shutdownRequested) = await protocolConnection.ConnectAsync(connectCts.Token)
 68457                            .ConfigureAwait(false);
 66458                    }
 2459                    catch
 2460                    {
 2461                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: false)
 2462                            .ConfigureAwait(false);
 2463                        throw;
 464                    }
 465
 66466                    LinkedListNode<IProtocolConnection>? listNode = null;
 467
 468                    lock (_mutex)
 469                    {
 66470                        if (_shutdownTask is null)
 471                        {
 65472                            listNode = _connections.AddLast(protocolConnection);
 473
 474                            // protocolConnection is no longer a detached connection since it's now "attached" in
 475                            // _connections.
 65476                            _detachedConnectionCount--;
 477                        }
 66478                    }
 479
 66480                    if (listNode is null)
 481                    {
 1482                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: true)
 1483                            .ConfigureAwait(false);
 484                    }
 485                    else
 65486                    {
 487                        // Schedule removal after successful ConnectAsync.
 65488                        _ = ShutdownWhenRequestedAsync(protocolConnection, shutdownRequested, listNode);
 489                    }
 66490                }
 491            }
 492        }
 493
 494        async Task DisposeDetachedConnectionAsync(IProtocolConnection connection, bool withShutdown)
 26495        {
 26496            if (withShutdown)
 24497            {
 498                // _disposedCts is not disposed since this connection is counted in _detachedConnectionCount.
 24499                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 24500                cts.CancelAfter(_shutdownTimeout);
 501
 502                try
 24503                {
 504                    // Can be canceled by DisposeAsync or the shutdown timeout.
 24505                    await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 13506                }
 11507                catch
 11508                {
 509                    // Ignore connection shutdown failures. connection.ShutdownAsync makes sure it's an "expected"
 510                    // exception.
 11511                }
 24512            }
 513
 26514            await connection.DisposeAsync().ConfigureAwait(false);
 515            lock (_mutex)
 26516            {
 26517                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
 11518                {
 11519                    _detachedConnectionsTcs.SetResult();
 11520                }
 26521            }
 26522        }
 523
 524        // Remove the connection from _connections after a successful ConnectAsync.
 525        async Task ShutdownWhenRequestedAsync(
 526            IProtocolConnection connection,
 527            Task shutdownRequested,
 528            LinkedListNode<IProtocolConnection> listNode)
 65529        {
 65530            await shutdownRequested.ConfigureAwait(false);
 531
 532            lock (_mutex)
 51533            {
 51534                if (_shutdownTask is null)
 23535                {
 23536                    _connections.Remove(listNode);
 23537                    _detachedConnectionCount++;
 23538                }
 539                else
 28540                {
 541                    // _connections is immutable and ShutdownAsync/DisposeAsync is responsible to shutdown/dispose
 542                    // this connection.
 28543                    return;
 544                }
 23545            }
 546
 23547            await DisposeDetachedConnectionAsync(connection, withShutdown: true).ConfigureAwait(false);
 51548        }
 233549    }
 550
 551    /// <summary>Gracefully shuts down this server: the server stops accepting new connections and shuts down gracefully
 552    /// all its connections.</summary>
 553    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 554    /// <returns>A task that completes successfully once the shutdown of all connections accepted by the server has
 555    /// completed. This includes connections that were active when this method is called and connections whose shutdown
 556    /// was initiated prior to this call. This task can also complete with one of the following exceptions:
 557    /// <list type="bullet">
 558    /// <item><description><see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> if the
 559    /// server is disposed while being shut down.</description></item>
 560    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
 561    /// cancellation token.</description></item>
 562    /// <item><description><see cref="TimeoutException" /> if the shutdown timed out.</description></item>
 563    /// </list>
 564    /// </returns>
 565    /// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
 566    /// <exception cref="ObjectDisposedException">Thrown if the server is disposed.</exception>
 567    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 23568    {
 569        lock (_mutex)
 23570        {
 23571            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 572
 23573            if (_shutdownTask is not null)
 0574            {
 0575                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
 576            }
 577
 23578            if (_detachedConnectionCount == 0)
 20579            {
 20580                _detachedConnectionsTcs.SetResult();
 20581            }
 582
 23583            _shutdownTask = PerformShutdownAsync();
 23584        }
 23585        return _shutdownTask;
 586
 587        async Task PerformShutdownAsync()
 23588        {
 23589            await Task.Yield(); // exit mutex lock
 590
 23591            _shutdownCts.Cancel();
 592
 593            // _listenTask is immutable once _shutdownTask is not null.
 23594            if (_listenTask is not null)
 23595            {
 596                try
 23597                {
 23598                    using var cts = CancellationTokenSource.CreateLinkedTokenSource(
 23599                        cancellationToken,
 23600                        _disposedCts.Token);
 601
 23602                    cts.CancelAfter(_shutdownTimeout);
 603
 604                    try
 23605                    {
 23606                        await Task.WhenAll(
 23607                            _connections
 12608                                .Select(connection => connection.ShutdownAsync(cts.Token))
 23609                                .Append(_listenTask.WaitAsync(cts.Token))
 23610                                .Append(_detachedConnectionsTcs.Task.WaitAsync(cts.Token)))
 23611                            .ConfigureAwait(false);
 23612                    }
 0613                    catch (OperationCanceledException)
 0614                    {
 0615                        throw;
 616                    }
 0617                    catch
 0618                    {
 619                        // Ignore _listenTask and connection shutdown exceptions
 620
 621                        // Throw OperationCanceledException if this WhenAll exception is hiding an OCE.
 0622                        cts.Token.ThrowIfCancellationRequested();
 0623                    }
 23624                }
 0625                catch (OperationCanceledException)
 0626                {
 0627                    cancellationToken.ThrowIfCancellationRequested();
 628
 0629                    if (_disposedCts.IsCancellationRequested)
 0630                    {
 0631                        throw new IceRpcException(
 0632                            IceRpcError.OperationAborted,
 0633                            "The shutdown was aborted because the server was disposed.");
 634                    }
 635                    else
 0636                    {
 0637                        throw new TimeoutException(
 0638                            $"The server shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
 639                    }
 640                }
 23641            }
 23642        }
 23643    }
 644
 645    /// <summary>Returns a string that represents this server.</summary>
 646    /// <returns>A string that represents this server.</returns>
 1647    public override string ToString() => _serverAddress.ToString();
 648
 649    /// <summary>Returns true if the <see cref="IConnectorListener.AcceptAsync" /> failure can be retried.</summary>
 650    private static bool IsRetryableAcceptException(Exception exception) =>
 651        // Transports such as QUIC do the SSL handshake when the connection is accepted, this can throw
 652        // AuthenticationException if it fails.
 169653        exception is IceRpcException or AuthenticationException;
 654
 655    /// <summary>Provides a decorator that adds logging to a <see cref="IConnectorListener" />.</summary>
 656    private class LogConnectorListenerDecorator : IConnectorListener
 657    {
 46658        public ServerAddress ServerAddress => _decoratee.ServerAddress;
 659
 660        private readonly IConnectorListener _decoratee;
 661        private readonly ILogger _logger;
 662
 663        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
 18664        {
 665            try
 18666            {
 18667                (IConnector connector, EndPoint remoteNetworkAddress) =
 18668                    await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
 669
 8670                _logger.LogConnectionAccepted(ServerAddress, remoteNetworkAddress);
 8671                return (
 8672                    new LogConnectorDecorator(connector, ServerAddress, remoteNetworkAddress, _logger),
 8673                    remoteNetworkAddress);
 674            }
 10675            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 10676            {
 677                // Do not log this exception. The AcceptAsync call can fail with OperationCanceledException during
 678                // shutdown once the shutdown cancellation token is canceled.
 10679                throw;
 680            }
 0681            catch (ObjectDisposedException)
 0682            {
 683                // Do not log this exception. The AcceptAsync call can fail with ObjectDisposedException during
 684                // shutdown once the listener is disposed or if it is accepting a connection while the listener is
 685                // disposed.
 0686                throw;
 687            }
 0688            catch (Exception exception) when (IsRetryableAcceptException(exception))
 0689            {
 0690                _logger.LogConnectionAcceptFailedWithRetryableException(ServerAddress, exception);
 0691                throw;
 692            }
 0693            catch (Exception exception)
 0694            {
 0695                _logger.LogConnectionAcceptFailed(ServerAddress, exception);
 0696                throw;
 697            }
 8698        }
 699
 700        public ValueTask DisposeAsync()
 10701        {
 10702            _logger.LogStopAcceptingConnections(ServerAddress);
 10703            return _decoratee.DisposeAsync();
 10704        }
 705
 10706        internal LogConnectorListenerDecorator(IConnectorListener decoratee, ILogger logger)
 10707        {
 10708            _decoratee = decoratee;
 10709            _logger = logger;
 10710            _logger.LogStartAcceptingConnections(ServerAddress);
 10711        }
 712    }
 713
 714    private class LogConnectorDecorator : IConnector
 715    {
 716        private readonly IConnector _decoratee;
 717        private readonly ILogger _logger;
 718        private readonly EndPoint _remoteNetworkAddress;
 719        private readonly ServerAddress _serverAddress;
 720
 721        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 722            CancellationToken cancellationToken)
 8723        {
 724            try
 8725            {
 8726                return await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
 727            }
 2728            catch (Exception exception)
 2729            {
 2730                _logger.LogConnectionConnectFailed(_serverAddress, _remoteNetworkAddress, exception);
 2731                throw;
 732            }
 6733        }
 734
 735        public IProtocolConnection CreateProtocolConnection(
 736            TransportConnectionInformation transportConnectionInformation) =>
 6737            new LogProtocolConnectionDecorator(
 6738                _decoratee.CreateProtocolConnection(transportConnectionInformation),
 6739                _serverAddress,
 6740                _remoteNetworkAddress,
 6741                _logger);
 742
 8743        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 744
 745        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel) =>
 0746            _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel);
 747
 8748        internal LogConnectorDecorator(
 8749            IConnector decoratee,
 8750            ServerAddress serverAddress,
 8751            EndPoint remoteNetworkAddress,
 8752            ILogger logger)
 8753        {
 8754            _decoratee = decoratee;
 8755            _logger = logger;
 8756            _serverAddress = serverAddress;
 8757            _remoteNetworkAddress = remoteNetworkAddress;
 8758        }
 759    }
 760
 761    /// <summary>Provides a decorator that adds metrics to a <see cref="IConnectorListener" />.</summary>
 762    private class MetricsConnectorListenerDecorator : IConnectorListener
 763    {
 116764        public ServerAddress ServerAddress => _decoratee.ServerAddress;
 765
 766        private readonly IConnectorListener _decoratee;
 767
 768        public async Task<(IConnector, EndPoint)> AcceptAsync(
 769            CancellationToken cancellationToken)
 249770        {
 249771            (IConnector connector, EndPoint remoteNetworkAddress) =
 249772                await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
 80773            return (new MetricsConnectorDecorator(connector), remoteNetworkAddress);
 80774        }
 775
 80776        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 777
 80778        internal MetricsConnectorListenerDecorator(IConnectorListener decoratee) =>
 80779            _decoratee = decoratee;
 780    }
 781
 782    private class MetricsConnectorDecorator : IConnector
 783    {
 784        private readonly IConnector _decoratee;
 785
 786        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 787            CancellationToken cancellationToken)
 80788        {
 80789            Metrics.ServerMetrics.ConnectStart();
 790            try
 80791            {
 80792                return await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
 793            }
 5794            catch
 5795            {
 5796                Metrics.ServerMetrics.ConnectStop();
 5797                Metrics.ServerMetrics.ConnectionFailure();
 5798                throw;
 799            }
 75800        }
 801
 802        public IProtocolConnection CreateProtocolConnection(
 803            TransportConnectionInformation transportConnectionInformation) =>
 68804                new MetricsProtocolConnectionDecorator(
 68805                    _decoratee.CreateProtocolConnection(transportConnectionInformation),
 68806                    Metrics.ServerMetrics,
 68807                    connectStarted: true);
 808
 80809        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
 810
 811        public async Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel)
 7812        {
 813            try
 7814            {
 7815                await _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel).ConfigureAwait(false);
 5816            }
 817            finally
 7818            {
 7819                Metrics.ServerMetrics.ConnectionFailure();
 7820                Metrics.ServerMetrics.ConnectStop();
 7821            }
 5822        }
 823
 160824        internal MetricsConnectorDecorator(IConnector decoratee) => _decoratee = decoratee;
 825    }
 826
 827    /// <summary>A connector listener accepts a transport connection and returns a <see cref="IConnector" />. The
 828    /// connector is used to refuse the transport connection or obtain a protocol connection once the transport
 829    /// connection is connected.</summary>
 830    private interface IConnectorListener : IAsyncDisposable
 831    {
 832        ServerAddress ServerAddress { get; }
 833
 834        Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel);
 835    }
 836
 837    /// <summary>A connector is returned by <see cref="IConnectorListener" />. The connector allows to connect the
 838    /// transport connection. If successful, the transport connection can either be refused or accepted by creating the
 839    /// protocol connection out of it.</summary>
 840    private interface IConnector : IAsyncDisposable
 841    {
 842        Task<TransportConnectionInformation> ConnectTransportConnectionAsync(CancellationToken cancellationToken);
 843
 844        IProtocolConnection CreateProtocolConnection(TransportConnectionInformation transportConnectionInformation);
 845
 846        Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel);
 847    }
 848
 849    private class IceConnectorListener : IConnectorListener
 850    {
 39851        public ServerAddress ServerAddress => _listener.ServerAddress;
 852
 853        private readonly IListener<IDuplexConnection> _listener;
 854        private readonly ConnectionOptions _options;
 855
 21856        public ValueTask DisposeAsync() => _listener.DisposeAsync();
 857
 858        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
 43859        {
 43860            (IDuplexConnection transportConnection, EndPoint remoteNetworkAddress) = await _listener.AcceptAsync(
 43861                cancel).ConfigureAwait(false);
 22862            return (new IceConnector(transportConnection, _options), remoteNetworkAddress);
 22863        }
 864
 21865        internal IceConnectorListener(IListener<IDuplexConnection> listener, ConnectionOptions options)
 21866        {
 21867            _listener = listener;
 21868            _options = options;
 21869        }
 870    }
 871
 872    private class IceConnector : IConnector
 873    {
 874        private readonly ConnectionOptions _options;
 875        private IDuplexConnection? _transportConnection;
 876
 877        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 878            CancellationToken cancellationToken) =>
 22879            _transportConnection!.ConnectAsync(cancellationToken);
 880
 881        public IProtocolConnection CreateProtocolConnection(
 882            TransportConnectionInformation transportConnectionInformation)
 19883        {
 884            // The protocol connection takes ownership of the transport connection.
 19885            var protocolConnection = new IceProtocolConnection(
 19886                _transportConnection!,
 19887                transportConnectionInformation,
 19888                _options);
 19889            _transportConnection = null;
 19890            return protocolConnection;
 19891        }
 892
 893        public ValueTask DisposeAsync()
 22894        {
 22895            _transportConnection?.Dispose();
 22896            return new();
 22897        }
 898
 899        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken)
 2900        {
 2901            _transportConnection!.Dispose();
 2902            return Task.CompletedTask;
 2903        }
 904
 22905        internal IceConnector(IDuplexConnection transportConnection, ConnectionOptions options)
 22906        {
 22907            _transportConnection = transportConnection;
 22908            _options = options;
 22909        }
 910    }
 911
 912    private class IceRpcConnectorListener : IConnectorListener
 913    {
 77914        public ServerAddress ServerAddress => _listener.ServerAddress;
 915
 916        private readonly IListener<IMultiplexedConnection> _listener;
 917        private readonly ConnectionOptions _options;
 918        private readonly ITaskExceptionObserver? _taskExceptionObserver;
 919
 920        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
 206921        {
 206922            (IMultiplexedConnection transportConnection, EndPoint remoteNetworkAddress) = await _listener.AcceptAsync(
 206923                cancel).ConfigureAwait(false);
 58924            return (new IceRpcConnector(transportConnection, _options, _taskExceptionObserver), remoteNetworkAddress);
 58925        }
 926
 59927        public ValueTask DisposeAsync() => _listener.DisposeAsync();
 928
 59929        internal IceRpcConnectorListener(
 59930            IListener<IMultiplexedConnection> listener,
 59931            ConnectionOptions options,
 59932            ITaskExceptionObserver? taskExceptionObserver)
 59933        {
 59934            _listener = listener;
 59935            _options = options;
 59936            _taskExceptionObserver = taskExceptionObserver;
 59937        }
 938    }
 939
 940    private class IceRpcConnector : IConnector
 941    {
 942        private readonly ConnectionOptions _options;
 943        private readonly ITaskExceptionObserver? _taskExceptionObserver;
 944        private IMultiplexedConnection? _transportConnection;
 945
 946        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
 947            CancellationToken cancellationToken) =>
 58948            _transportConnection!.ConnectAsync(cancellationToken);
 949
 950        public IProtocolConnection CreateProtocolConnection(
 951            TransportConnectionInformation transportConnectionInformation)
 49952        {
 953            // The protocol connection takes ownership of the transport connection.
 49954            var protocolConnection = new IceRpcProtocolConnection(
 49955                _transportConnection!,
 49956                transportConnectionInformation,
 49957                _options,
 49958                _taskExceptionObserver);
 49959            _transportConnection = null;
 49960            return protocolConnection;
 49961        }
 962
 58963        public ValueTask DisposeAsync() => _transportConnection?.DisposeAsync() ?? new();
 964
 965        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken) =>
 5966            _transportConnection!.CloseAsync(
 5967                serverBusy ? MultiplexedConnectionCloseError.ServerBusy : MultiplexedConnectionCloseError.Refused,
 5968                cancellationToken);
 969
 58970        internal IceRpcConnector(
 58971            IMultiplexedConnection transportConnection,
 58972            ConnectionOptions options,
 58973            ITaskExceptionObserver? taskExceptionObserver)
 58974        {
 58975            _transportConnection = transportConnection;
 58976            _options = options;
 58977            _taskExceptionObserver = taskExceptionObserver;
 58978        }
 979    }
 980}

Methods/Properties

.ctor(IceRpc.ServerOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,IceRpc.ServerAddress,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
.ctor(IceRpc.IDispatcher,System.Uri,System.Net.Security.SslServerAuthenticationOptions,IceRpc.Transports.IDuplexServerTransport,IceRpc.Transports.IMultiplexedServerTransport,Microsoft.Extensions.Logging.ILogger)
DisposeAsync()
PerformDisposeAsync()
Listen()
ListenAsync()
ConnectAsync()
DisposeDetachedConnectionAsync()
ShutdownWhenRequestedAsync()
ShutdownAsync(System.Threading.CancellationToken)
PerformShutdownAsync()
ToString()
IsRetryableAcceptException(System.Exception)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Server/IConnectorListener,Microsoft.Extensions.Logging.ILogger)
ConnectTransportConnectionAsync()
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Server/IConnector,IceRpc.ServerAddress,System.Net.EndPoint,Microsoft.Extensions.Logging.ILogger)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Server/IConnectorListener)
ConnectTransportConnectionAsync()
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync()
.ctor(IceRpc.Server/IConnector)
get_ServerAddress()
DisposeAsync()
AcceptAsync()
.ctor(IceRpc.Transports.IListener`1<IceRpc.Transports.IDuplexConnection>,IceRpc.ConnectionOptions)
ConnectTransportConnectionAsync(System.Threading.CancellationToken)
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.ConnectionOptions)
get_ServerAddress()
AcceptAsync()
DisposeAsync()
.ctor(IceRpc.Transports.IListener`1<IceRpc.Transports.IMultiplexedConnection>,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)
ConnectTransportConnectionAsync(System.Threading.CancellationToken)
CreateProtocolConnection(IceRpc.Transports.TransportConnectionInformation)
DisposeAsync()
RefuseTransportConnectionAsync(System.Boolean,System.Threading.CancellationToken)
.ctor(IceRpc.Transports.IMultiplexedConnection,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)