< Summary

Information
Class: IceRpc.ClientConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/ClientConnection.cs
Tag: 275_13775359185
Line coverage
83%
Covered lines: 304
Uncovered lines: 60
Coverable lines: 364
Total lines: 683
Line coverage: 83.5%
Branch coverage
78%
Covered branches: 77
Total branches: 98
Branch coverage: 78.5%
Method coverage
100%
Covered methods: 19
Total methods: 19
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)90%1010100%
.ctor(...)100%11100%
.ctor(...)100%11100%
ConnectAsync(...)66.66%6.37678.26%
PerformConnectAsync()100%1.03170%
DisposeAsync()100%66100%
PerformDisposeAsync()100%44100%
InvokeAsync(...)57.14%22.251465.21%
CheckRequestServerAddresses()83.33%6.02692.3%
PerformInvokeAsync()75%5.41455.55%
ShutdownAsync(...)75%4.05485.71%
PerformShutdownAsync()90%11.951073.07%
CreateConnectTask()66.66%12.181289.28%
DisposePendingConnectionAsync()100%4.15478.94%
ShutdownWhenRequestedAsync()100%11100%
RemoveFromActiveAsync(...)83.33%66100%
ShutdownAndDisposeConnectionAsync()100%44100%
GetActiveConnectionAsync(...)62.5%9875%
PerformGetActiveConnectionAsync()100%1.13150%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/ClientConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Features;
 4using IceRpc.Transports;
 5using Microsoft.Extensions.Logging;
 6using Microsoft.Extensions.Logging.Abstractions;
 7using System.Collections.Immutable;
 8using System.Diagnostics;
 9using System.Net.Security;
 10using System.Security.Authentication;
 11
 12namespace IceRpc;
 13
 14/// <summary>Represents a client connection used to send requests to a server and receive the corresponding responses.
 15/// </summary>
 16/// <remarks>This client connection can also dispatch requests ("callbacks") received from the server. The client
 17/// connection's underlying connection is recreated and reconnected automatically when it's closed by any event other
 18/// than a call to <see cref="ShutdownAsync" /> or <see cref="DisposeAsync" />.</remarks>
 19public sealed class ClientConnection : IInvoker, IAsyncDisposable
 20{
 21    // The underlying protocol connection once successfully established.
 22    private (IProtocolConnection Connection, TransportConnectionInformation ConnectionInformation)? _activeConnection;
 23
 24    private readonly IClientProtocolConnectionFactory _clientProtocolConnectionFactory;
 25
 26    private readonly TimeSpan _connectTimeout;
 27
 28    // A detached connection is a protocol connection that is connecting, shutting down or being disposed. Both
 29    // ShutdownAsync and DisposeAsync wait for detached connections to reach 0 using _detachedConnectionsTcs. Such a
 30    // connection is "detached" because it's not in _activeConnection.
 31    private int _detachedConnectionCount;
 32
 13733    private readonly TaskCompletionSource _detachedConnectionsTcs = new();
 34
 35    // A cancellation token source that is canceled when DisposeAsync is called.
 13736    private readonly CancellationTokenSource _disposedCts = new();
 37    private Task? _disposeTask;
 38
 13739    private readonly object _mutex = new();
 40
 41    // A connection being established and its associated connect task. When non-null, _activeConnection is null.
 42    private (IProtocolConnection Connection, Task<TransportConnectionInformation> ConnectTask)? _pendingConnection;
 43
 44    private Task? _shutdownTask;
 45
 46    private readonly TimeSpan _shutdownTimeout;
 47
 48    private readonly ServerAddress _serverAddress;
 49
 50    /// <summary>Constructs a client connection.</summary>
 51    /// <param name="options">The client connection options.</param>
 52    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
 53    /// cref="IDuplexClientTransport.Default" />.</param>
 54    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
 55    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
 56    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
 57    /// </param>
 13758    public ClientConnection(
 13759        ClientConnectionOptions options,
 13760        IDuplexClientTransport? duplexClientTransport = null,
 13761        IMultiplexedClientTransport? multiplexedClientTransport = null,
 13762        ILogger? logger = null)
 13763    {
 13764        _connectTimeout = options.ConnectTimeout;
 13765        _shutdownTimeout = options.ShutdownTimeout;
 66
 13767        duplexClientTransport ??= IDuplexClientTransport.Default;
 13768        multiplexedClientTransport ??= IMultiplexedClientTransport.Default;
 69
 13770        _serverAddress = options.ServerAddress ??
 13771            throw new ArgumentException(
 13772                $"{nameof(ClientConnectionOptions.ServerAddress)} is not set",
 13773                nameof(options));
 74
 13775        if (_serverAddress.Transport is null)
 6776        {
 6777            _serverAddress = _serverAddress with
 6778            {
 6779                Transport = _serverAddress.Protocol == Protocol.Ice ?
 6780                    duplexClientTransport.Name : multiplexedClientTransport.Name
 6781            };
 6782        }
 83
 13784        _clientProtocolConnectionFactory = new ClientProtocolConnectionFactory(
 13785            options,
 13786            options.ClientAuthenticationOptions,
 13787            duplexClientTransport,
 13788            multiplexedClientTransport,
 13789            logger);
 13790    }
 91
 92    /// <summary>Constructs a client connection with the specified server address and client authentication options.
 93    /// All other properties use the <see cref="ClientConnectionOptions" /> defaults.</summary>
 94    /// <param name="serverAddress">The connection's server address.</param>
 95    /// <param name="clientAuthenticationOptions">The SSL client authentication options. When not <see langword="null"
 96    /// />, <see cref="ConnectAsync(CancellationToken)" /> will either establish a secure connection or fail.</param>
 97    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
 98    /// cref="IDuplexClientTransport.Default" />.</param>
 99    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
 100    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
 101    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
 102    /// </param>
 103    public ClientConnection(
 104        ServerAddress serverAddress,
 105        SslClientAuthenticationOptions? clientAuthenticationOptions = null,
 106        IDuplexClientTransport? duplexClientTransport = null,
 107        IMultiplexedClientTransport? multiplexedClientTransport = null,
 108        ILogger? logger = null)
 63109        : this(
 63110            new ClientConnectionOptions
 63111            {
 63112                ClientAuthenticationOptions = clientAuthenticationOptions,
 63113                ServerAddress = serverAddress
 63114            },
 63115            duplexClientTransport,
 63116            multiplexedClientTransport,
 63117            logger)
 63118    {
 63119    }
 120
 121    /// <summary>Constructs a client connection with the specified server address URI and client authentication options.
 122    /// All other properties use the <see cref="ClientConnectionOptions" /> defaults.</summary>
 123    /// <param name="serverAddressUri">The connection's server address URI.</param>
 124    /// <param name="clientAuthenticationOptions">The SSL client authentication options. When not <see langword="null"
 125    /// />, <see cref="ConnectAsync(CancellationToken)" /> will either establish a secure connection or fail.</param>
 126    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
 127    /// cref="IDuplexClientTransport.Default" />.</param>
 128    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
 129    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
 130    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
 131    /// </param>
 132    public ClientConnection(
 133        Uri serverAddressUri,
 134        SslClientAuthenticationOptions? clientAuthenticationOptions = null,
 135        IDuplexClientTransport? duplexClientTransport = null,
 136        IMultiplexedClientTransport? multiplexedClientTransport = null,
 137        ILogger? logger = null)
 3138        : this(
 3139            new ServerAddress(serverAddressUri),
 3140            clientAuthenticationOptions,
 3141            duplexClientTransport,
 3142            multiplexedClientTransport,
 3143            logger)
 3144    {
 3145    }
 146
 147    /// <summary>Establishes the connection.</summary>
 148    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 149    /// <returns>A task that provides the <see cref="TransportConnectionInformation" /> of the transport connection,
 150    /// once this connection is established. This task can also complete with one of the following exceptions:
 151    /// <list type="bullet">
 152    /// <item><description><see cref="AuthenticationException" /> if authentication failed.</description></item>
 153    /// <item><description><see cref="IceRpcException" /> if the connection establishment failed.</description>
 154    /// </item>
 155    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
 156    /// cancellation token.</description></item>
 157    /// <item><description><see cref="TimeoutException" /> if this connection attempt or a previous attempt exceeded
 158    /// <see cref="ClientConnectionOptions.ConnectTimeout" />.</description></item>
 159    /// </list>
 160    /// </returns>
 161    /// <exception cref="InvalidOperationException">Thrown if this client connection is shut down or shutting down.
 162    /// </exception>
 163    /// <exception cref="ObjectDisposedException">Thrown if this client connection is disposed.</exception>
 164    /// <remarks>This method can be called multiple times and concurrently. If the connection is not established, it
 165    /// will be connected or reconnected.</remarks>
 166    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken = default)
 98167    {
 168        Task<TransportConnectionInformation> connectTask;
 169
 98170        lock (_mutex)
 98171        {
 98172            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 98173            if (_shutdownTask is not null)
 0174            {
 0175                throw new InvalidOperationException("Cannot connect a client connection after shutting it down.");
 176            }
 177
 98178            if (_activeConnection is not null)
 2179            {
 2180                return Task.FromResult(_activeConnection.Value.ConnectionInformation);
 181            }
 182
 96183            if (_pendingConnection is null)
 96184            {
 96185                IProtocolConnection newConnection = _clientProtocolConnectionFactory.CreateConnection(_serverAddress);
 96186                _detachedConnectionCount++;
 96187                connectTask = CreateConnectTask(newConnection, cancellationToken);
 96188                _pendingConnection = (newConnection, connectTask);
 96189            }
 190            else
 0191            {
 0192                connectTask = _pendingConnection.Value.ConnectTask.WaitAsync(cancellationToken);
 0193            }
 96194        }
 195
 96196        return PerformConnectAsync();
 197
 198        async Task<TransportConnectionInformation> PerformConnectAsync()
 96199        {
 200            try
 96201            {
 96202                return await connectTask.ConfigureAwait(false);
 203            }
 6204            catch (OperationCanceledException)
 6205            {
 206                // Canceled via the cancellation token given to ConnectAsync, but not necessarily this ConnectAsync
 207                // call.
 208
 6209                cancellationToken.ThrowIfCancellationRequested();
 210
 0211                throw new IceRpcException(
 0212                    IceRpcError.ConnectionAborted,
 0213                    "The connection establishment was canceled by another concurrent attempt.");
 214            }
 54215        }
 98216    }
 217
 218    /// <summary>Releases all resources allocated by the connection. The connection disposes all the underlying
 219    /// connections it created.</summary>
 220    /// <returns>A value task that completes when the disposal of all the underlying connections has
 221    /// completed.</returns>
 222    /// <remarks>The disposal of an underlying connection aborts invocations, cancels dispatches and disposes the
 223    /// underlying transport connection without waiting for the peer. To wait for invocations and dispatches to
 224    /// complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete promptly when
 225    /// its cancellation token is canceled, the disposal can hang.</remarks>
 226    public ValueTask DisposeAsync()
 149227    {
 149228        lock (_mutex)
 149229        {
 149230            if (_disposeTask is null)
 137231            {
 137232                _shutdownTask ??= Task.CompletedTask;
 137233                if (_detachedConnectionCount == 0)
 125234                {
 125235                    _ = _detachedConnectionsTcs.TrySetResult();
 125236                }
 237
 137238                _disposeTask = PerformDisposeAsync();
 137239            }
 149240        }
 149241        return new(_disposeTask);
 242
 243        async Task PerformDisposeAsync()
 137244        {
 137245            await Task.Yield(); // Exit mutex lock
 246
 137247            _disposedCts.Cancel();
 248
 249            // Wait for shutdown before disposing connections.
 250            try
 137251            {
 137252                await _shutdownTask.ConfigureAwait(false);
 133253            }
 4254            catch
 4255            {
 256                // ignore exceptions.
 4257            }
 258
 259            // Since a pending connection is "detached", it's disposed via the connectTask, not directly by this method.
 137260            if (_activeConnection is not null)
 78261            {
 78262                await _activeConnection.Value.Connection.DisposeAsync().ConfigureAwait(false);
 78263            }
 264
 137265            await _detachedConnectionsTcs.Task.ConfigureAwait(false);
 266
 137267            _disposedCts.Dispose();
 137268        }
 149269    }
 270
 271    /// <summary>Sends an outgoing request and returns the corresponding incoming response.</summary>
 272    /// <param name="request">The outgoing request being sent.</param>
 273    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 274    /// <returns>The corresponding <see cref="IncomingResponse" />.</returns>
 275    /// <exception cref="InvalidOperationException">Thrown if none of the request's server addresses matches this
 276    /// connection's server address.</exception>
 277    /// <exception cref="IceRpcException">Thrown with error <see cref="IceRpcError.InvocationRefused" /> if this client
 278    /// connection is shutdown.</exception>
 279    /// <exception cref="ObjectDisposedException">Thrown if this client connection is disposed.</exception>
 280    /// <remarks>If the connection is not established, it will be connected or reconnected.</remarks>
 281    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 70282    {
 70283        if (request.Features.Get<IServerAddressFeature>() is IServerAddressFeature serverAddressFeature)
 0284        {
 0285            if (serverAddressFeature.ServerAddress is ServerAddress mainServerAddress)
 0286            {
 0287                CheckRequestServerAddresses(mainServerAddress, serverAddressFeature.AltServerAddresses);
 0288            }
 0289        }
 70290        else if (request.ServiceAddress.ServerAddress is ServerAddress mainServerAddress)
 34291        {
 34292            CheckRequestServerAddresses(mainServerAddress, request.ServiceAddress.AltServerAddresses);
 22293        }
 294        // It's ok if the request has no server address at all.
 295
 58296        IProtocolConnection? activeConnection = null;
 297
 58298        lock (_mutex)
 58299        {
 58300            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 301
 58302            if (_shutdownTask is not null)
 0303            {
 0304                throw new IceRpcException(IceRpcError.InvocationRefused, "The client connection was shut down.");
 305            }
 306
 58307            activeConnection = _activeConnection?.Connection;
 58308        }
 309
 58310        return PerformInvokeAsync(activeConnection);
 311
 312        void CheckRequestServerAddresses(
 313            ServerAddress mainServerAddress,
 314            ImmutableList<ServerAddress> altServerAddresses)
 34315        {
 34316            if (ServerAddressComparer.OptionalTransport.Equals(mainServerAddress, _serverAddress))
 20317            {
 20318                return;
 319            }
 320
 44321            foreach (ServerAddress serverAddress in altServerAddresses)
 2322            {
 2323                if (ServerAddressComparer.OptionalTransport.Equals(serverAddress, _serverAddress))
 2324                {
 2325                    return;
 326                }
 0327            }
 328
 12329            throw new InvalidOperationException(
 12330                $"None of the request's server addresses matches this connection's server address: {_serverAddress}");
 22331        }
 332
 333        async Task<IncomingResponse> PerformInvokeAsync(IProtocolConnection? connection)
 58334        {
 335            // When InvokeAsync throws an IceRpcException(InvocationRefused) we retry unless the client connection is
 336            // being shutdown or disposed.
 58337            while (true)
 58338            {
 58339                connection ??= await GetActiveConnectionAsync(cancellationToken).ConfigureAwait(false);
 340
 341                try
 58342                {
 58343                    return await connection.InvokeAsync(request, cancellationToken).ConfigureAwait(false);
 344                }
 0345                catch (ObjectDisposedException)
 0346                {
 347                    // This can occasionally happen if we find a connection that was just closed and then automatically
 348                    // disposed by this client connection.
 0349                }
 4350                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.InvocationRefused)
 0351                {
 352                    // The connection is refusing new invocations.
 0353                }
 4354                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.OperationAborted)
 4355                {
 4356                    lock (_mutex)
 4357                    {
 4358                        if (_disposeTask is null)
 0359                        {
 0360                            throw new IceRpcException(
 0361                                IceRpcError.ConnectionAborted,
 0362                                "The underlying connection was disposed while the invocation was in progress.");
 363                        }
 364                        else
 4365                        {
 4366                            throw;
 367                        }
 368                    }
 369                }
 370
 371                // Make sure connection is no longer in _activeConnection before we retry.
 0372                _ = RemoveFromActiveAsync(connection);
 0373                connection = null;
 0374            }
 50375        }
 58376    }
 377
 378    /// <summary>Gracefully shuts down the connection. The shutdown waits for pending invocations and dispatches to
 379    /// complete.</summary>
 380    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 381    /// <returns>A task that completes once the shutdown is complete. This task can also complete with one of the
 382    /// following exceptions:
 383    /// <list type="bullet">
 384    /// <item><description><see cref="IceRpcException" /> if the connection shutdown failed.</description></item>
 385    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
 386    /// cancellation token.</description></item>
 387    /// <item><description><see cref="TimeoutException" /> if this shutdown attempt or a previous attempt exceeded <see
 388    /// cref="ClientConnectionOptions.ShutdownTimeout" />.</description></item>
 389    /// </list>
 390    /// </returns>
 391    /// <exception cref="InvalidOperationException">Thrown if this connection is already shut down or shutting down.
 392    /// </exception>
 393    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
 394    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 16395    {
 16396        lock (_mutex)
 16397        {
 16398            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 16399            if (_shutdownTask is not null)
 0400            {
 0401                throw new InvalidOperationException("The client connection is already shut down or shutting down.");
 402            }
 403
 16404            if (_detachedConnectionCount == 0)
 16405            {
 16406                _ = _detachedConnectionsTcs.TrySetResult();
 16407            }
 408
 16409            _shutdownTask = PerformShutdownAsync();
 16410            return _shutdownTask;
 411        }
 412
 413        async Task PerformShutdownAsync()
 16414        {
 16415            await Task.Yield(); // exit mutex lock
 416
 16417            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 16418            cts.CancelAfter(_shutdownTimeout);
 419
 420            // Since a pending connection is "detached", it's shutdown and disposed via the connectTask, not directly by
 421            // this method.
 422            try
 16423            {
 16424                if (_activeConnection is not null)
 12425                {
 12426                    await _activeConnection.Value.Connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 8427                }
 428
 12429                await _detachedConnectionsTcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 12430            }
 4431            catch (OperationCanceledException)
 4432            {
 4433                cancellationToken.ThrowIfCancellationRequested();
 434
 2435                if (_disposedCts.IsCancellationRequested)
 0436                {
 0437                    throw new IceRpcException(
 0438                        IceRpcError.OperationAborted,
 0439                        "The shutdown was aborted because the client connection was disposed.");
 440                }
 441                else
 2442                {
 2443                    throw new TimeoutException(
 2444                        $"The client connection shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
 445                }
 446            }
 0447            catch
 0448            {
 449                // ignore other shutdown exception
 0450            }
 12451        }
 16452    }
 453
 454    /// <summary>Creates the connection establishment task for a pending connection.</summary>
 455    /// <param name="connection">The new pending connection to connect.</param>
 456    /// <param name="cancellationToken">The cancellation token that can cancel this task.</param>
 457    /// <returns>A task that completes successfully when the connection is connected.</returns>
 458    private async Task<TransportConnectionInformation> CreateConnectTask(
 459        IProtocolConnection connection,
 460        CancellationToken cancellationToken)
 134461    {
 134462        await Task.Yield(); // exit mutex lock
 463
 464        // This task "owns" a detachedConnectionCount and as a result _disposedCts can't be disposed.
 134465        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 134466        cts.CancelAfter(_connectTimeout);
 467
 468        TransportConnectionInformation connectionInformation;
 469        Task shutdownRequested;
 134470        Task? connectTask = null;
 471
 472        try
 134473        {
 474            try
 134475            {
 134476                (connectionInformation, shutdownRequested) = await connection.ConnectAsync(cts.Token)
 134477                    .ConfigureAwait(false);
 92478            }
 10479            catch (OperationCanceledException)
 10480            {
 10481                cancellationToken.ThrowIfCancellationRequested();
 482
 4483                if (_disposedCts.IsCancellationRequested)
 2484                {
 2485                    throw new IceRpcException(
 2486                        IceRpcError.OperationAborted,
 2487                        "The connection establishment was aborted because the client connection was disposed.");
 488                }
 489                else
 2490                {
 2491                    throw new TimeoutException(
 2492                        $"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
 493                }
 494            }
 92495        }
 42496        catch
 42497        {
 42498            lock (_mutex)
 42499            {
 42500                Debug.Assert(_pendingConnection is not null && _pendingConnection.Value.Connection == connection);
 42501                Debug.Assert(_activeConnection is null);
 502
 503                // connectTask is executing this method and about to throw.
 42504                connectTask = _pendingConnection.Value.ConnectTask;
 42505                _pendingConnection = null;
 42506            }
 507
 42508            _ = DisposePendingConnectionAsync(connection, connectTask);
 42509            throw;
 510        }
 511
 92512        lock (_mutex)
 92513        {
 92514            Debug.Assert(_pendingConnection is not null && _pendingConnection.Value.Connection == connection);
 92515            Debug.Assert(_activeConnection is null);
 516
 92517            if (_shutdownTask is null)
 92518            {
 519                // the connection is now "attached" in _activeConnection
 92520                _activeConnection = (connection, connectionInformation);
 92521                _detachedConnectionCount--;
 92522            }
 523            else
 0524            {
 0525                connectTask = _pendingConnection.Value.ConnectTask;
 0526            }
 92527            _pendingConnection = null;
 92528        }
 529
 92530        if (connectTask is null)
 92531        {
 92532            _ = ShutdownWhenRequestedAsync(connection, shutdownRequested);
 92533        }
 534        else
 0535        {
 536            // As soon as this method completes successfully, we shut down then dispose the connection.
 0537            _ = DisposePendingConnectionAsync(connection, connectTask);
 0538        }
 92539        return connectionInformation;
 540
 541        async Task DisposePendingConnectionAsync(IProtocolConnection connection, Task connectTask)
 42542        {
 543            try
 42544            {
 42545                await connectTask.ConfigureAwait(false);
 546
 547                // Since we own a detachedConnectionCount, _disposedCts is not disposed.
 0548                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 0549                cts.CancelAfter(_shutdownTimeout);
 0550                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 0551            }
 42552            catch
 42553            {
 554                // Observe and ignore exceptions.
 42555            }
 556
 42557            await connection.DisposeAsync().ConfigureAwait(false);
 558
 42559            lock (_mutex)
 42560            {
 42561                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
 9562                {
 9563                    _detachedConnectionsTcs.SetResult();
 9564                }
 42565            }
 42566        }
 567
 568        async Task ShutdownWhenRequestedAsync(IProtocolConnection connection, Task shutdownRequested)
 92569        {
 92570            await shutdownRequested.ConfigureAwait(false);
 20571            await RemoveFromActiveAsync(connection).ConfigureAwait(false);
 20572        }
 92573    }
 574
 575    /// <summary>Removes the connection from _activeConnection, and when successful, shuts down and disposes this
 576    /// connection.</summary>
 577    /// <param name="connection">The connected connection to shutdown and dispose.</param>
 578    private Task RemoveFromActiveAsync(IProtocolConnection connection)
 20579    {
 20580        lock (_mutex)
 20581        {
 20582            if (_shutdownTask is null && _activeConnection?.Connection == connection)
 14583            {
 14584                _activeConnection = null; // it's now our connection.
 14585                _detachedConnectionCount++;
 14586            }
 587            else
 6588            {
 589                // Another task owns this connection
 6590                return Task.CompletedTask;
 591            }
 14592        }
 593
 14594        return ShutdownAndDisposeConnectionAsync();
 595
 596        async Task ShutdownAndDisposeConnectionAsync()
 14597        {
 598            // _disposedCts is not disposed since we own a detachedConnectionCount
 14599            using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 14600            cts.CancelAfter(_shutdownTimeout);
 601
 602            try
 14603            {
 14604                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 8605            }
 6606            catch
 6607            {
 608                // Ignore connection shutdown failures
 6609            }
 610
 14611            await connection.DisposeAsync().ConfigureAwait(false);
 612
 14613            lock (_mutex)
 14614            {
 14615                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
 3616                {
 3617                    _detachedConnectionsTcs.SetResult();
 3618                }
 14619            }
 14620        }
 20621    }
 622
 623    /// <summary>Gets an active connection, by creating and connecting (if necessary) a new protocol connection.
 624    /// </summary>
 625    /// <param name="cancellationToken">The cancellation token of the invocation calling this method.</param>
 626    /// <returns>A connected connection.</returns>
 627    /// <remarks>This method is called exclusively by <see cref="InvokeAsync" />.</remarks>
 628    private ValueTask<IProtocolConnection> GetActiveConnectionAsync(CancellationToken cancellationToken)
 38629    {
 630        (IProtocolConnection Connection, Task<TransportConnectionInformation> ConnectTask) pendingConnectionValue;
 631
 38632        lock (_mutex)
 38633        {
 38634            if (_disposeTask is not null)
 0635            {
 0636                throw new IceRpcException(IceRpcError.OperationAborted, "The client connection was disposed.");
 637            }
 38638            if (_shutdownTask is not null)
 0639            {
 0640                throw new IceRpcException(IceRpcError.InvocationRefused, "The client connection was shut down.");
 641            }
 642
 38643            if (_activeConnection is not null)
 0644            {
 0645                return new(_activeConnection.Value.Connection);
 646            }
 647
 38648            if (_pendingConnection is null)
 38649            {
 38650                IProtocolConnection connection = _clientProtocolConnectionFactory.CreateConnection(_serverAddress);
 38651                _detachedConnectionCount++;
 652
 653                // We pass CancellationToken.None because the invocation cancellation should not cancel the connection
 654                // establishment.
 38655                Task<TransportConnectionInformation> connectTask =
 38656                    CreateConnectTask(connection, CancellationToken.None);
 38657                _pendingConnection = (connection, connectTask);
 38658            }
 38659            pendingConnectionValue = _pendingConnection.Value;
 38660        }
 661
 38662        return PerformGetActiveConnectionAsync();
 663
 664        async ValueTask<IProtocolConnection> PerformGetActiveConnectionAsync()
 38665        {
 666            // ConnectTask itself takes care of scheduling its exception observation when it fails.
 667            try
 38668            {
 38669                _ = await pendingConnectionValue.ConnectTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 38670            }
 0671            catch (OperationCanceledException)
 0672            {
 0673                cancellationToken.ThrowIfCancellationRequested();
 674
 675                // Canceled by the cancellation token given to ClientConnection.ConnectAsync.
 0676                throw new IceRpcException(
 0677                    IceRpcError.ConnectionAborted,
 0678                    "The connection establishment was canceled by another concurrent attempt.");
 679            }
 38680            return pendingConnectionValue.Connection;
 38681        }
 38682    }
 683}