< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocListener
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocListener.cs
Tag: 1856_27024993493
Line coverage
93%
Covered lines: 76
Uncovered lines: 5
Coverable lines: 81
Total lines: 162
Line coverage: 93.8%
Branch coverage
87%
Covered branches: 7
Total branches: 8
Branch coverage: 87.5%
Method coverage
100%
Covered methods: 6
Fully covered methods: 4
Total methods: 6
Method coverage: 100%
Full method coverage: 66.6%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_TransportAddress() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
AcceptAsync() | 50% | 2 | 2 | 83.33%
Dispose() | 100% | 4 | 4 | 100%
DisposeAsync() | 100% | 1 | 1 | 100%
TryQueueConnect(...) | 100% | 2 | 2 | 100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocListener.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Diagnostics.CodeAnalysis;
 5using System.IO.Pipelines;
 6using System.Net;
 7using System.Threading.Channels;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The listener implementation for the colocated transport.</summary>
 12internal class ColocListener : IListener<IDuplexConnection>, IDisposable
 13{
 101814    public TransportAddress TransportAddress { get; }
 15
 16    [SuppressMessage(
 17        "Usage",
 18        "CA2213:Disposable fields should be disposed",
 19        Justification = "Disposing this CTS races with AcceptAsync creating a linked token source from its Token; a CTS 
 54420    private readonly CancellationTokenSource _disposeCts = new();
 21    private bool _disposed;
 22    private readonly Action<ColocListener> _onDispose;
 54423    private readonly Lock _mutex = new();
 24    private readonly EndPoint _networkAddress;
 25    private readonly PipeOptions _pipeOptions;
 26
 27    // The channel used by the client connection ConnectAsync method to queue a connection establishment request. A
 28    // client connection establishment request is represented by:
 29    // - a TaskCompletionSource which is completed by AcceptAsync when the connection is accepted. The server connection
 30    //   pipe reader is set as the result. ClientColocConnection.ConnectAsync waits on the task completion source task.
 31    // - the client connection pipe reader provided to the server connection when the server connection is created by
 32    //   AcceptAsync.
 33    private readonly Channel<(TaskCompletionSource<PipeReader>, PipeReader)> _channel;
 34
 35    public async Task<(IDuplexConnection, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
 57836    {
 37        CancellationTokenSource cts;
 38        lock (_mutex)
 57839        {
 57840            ObjectDisposedException.ThrowIf(_disposed, this);
 57641            cts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken);
 57642        }
 57643        using var _ = cts;
 44        try
 57645        {
 57646            while (true)
 57647            {
 57648                (TaskCompletionSource<PipeReader> tcs, PipeReader clientPipeReader) =
 57649                    await _channel.Reader.ReadAsync(cts.Token).ConfigureAwait(false);
 50
 49651                var serverPipe = new Pipe(_pipeOptions);
 49652                if (tcs.TrySetResult(serverPipe.Reader))
 49653                {
 49654                    var serverConnection = new ServerColocConnection(
 49655                        TransportAddress,
 49656                        serverPipe.Writer,
 49657                        clientPipeReader);
 49658                    return (serverConnection, _networkAddress);
 59                }
 60                else
 061                {
 62                    // The client connection establishment was canceled.
 063                    serverPipe.Writer.Complete();
 064                    serverPipe.Reader.Complete();
 065                }
 066            }
 67        }
 8068        catch (OperationCanceledException)
 8069        {
 8070            cancellationToken.ThrowIfCancellationRequested();
 71            // The accept operation was canceled because the listener was disposed.
 1172            Debug.Assert(_disposeCts.IsCancellationRequested);
 1173            throw new ObjectDisposedException($"{typeof(ColocListener)}");
 74        }
 49675    }
 76
 77    public void Dispose()
 96878    {
 79        lock (_mutex)
 96880        {
 96881            if (_disposed)
 42482            {
 42483                return;
 84            }
 54485            _disposed = true;
 86
 87            // Notify the owner (e.g. the server transport) so it can release its reference to this listener.
 54488            _onDispose(this);
 89
 90            // Cancel pending AcceptAsync.
 54491            _disposeCts.Cancel();
 92
 93            // Ensure no more client connection establishment requests are queued.
 54494            _channel.Writer.Complete();
 95
 96            // Complete all the queued client connection establishment requests with IceRpcError.ConnectionRefused.
 97            // Use TrySetException in case the task has already been canceled.
 56398            while (_channel.Reader.TryRead(out (TaskCompletionSource<PipeReader> Tcs, PipeReader) item))
 1999            {
 19100                item.Tcs.TrySetException(new IceRpcException(IceRpcError.ConnectionRefused));
 19101            }
 544102        }
 968103    }
 104
 105    public ValueTask DisposeAsync()
 966106    {
 966107        Dispose();
 966108        return default;
 966109    }
 110
 544111    internal ColocListener(
 544112        TransportAddress transportAddress,
 544113        Action<ColocListener> onDispose,
 544114        ColocTransportOptions colocTransportOptions,
 544115        DuplexConnectionOptions duplexConnectionOptions)
 544116    {
 544117        TransportAddress = transportAddress;
 118
 544119        _onDispose = onDispose;
 544120        _networkAddress = new ColocEndPoint(transportAddress);
 544121        _pipeOptions = new PipeOptions(
 544122            pool: duplexConnectionOptions.Pool,
 544123            minimumSegmentSize: duplexConnectionOptions.MinSegmentSize,
 544124            pauseWriterThreshold: colocTransportOptions.PauseWriterThreshold,
 544125            resumeWriterThreshold: colocTransportOptions.ResumeWriterThreshold,
 544126            useSynchronizationContext: false);
 127
 128        // Create a bounded channel with a capacity that matches the listen backlog, and with
 129        // the default concurrency settings that allow multiple readers and writers.
 544130        _channel = Channel.CreateBounded<(TaskCompletionSource<PipeReader>, PipeReader)>(
 544131            new BoundedChannelOptions(colocTransportOptions.ListenBacklog));
 544132    }
 133
 134    /// <summary>Queue client connection establishment requests from the client.</summary>
 135    /// <param name="clientPipeReader">A <see cref="PipeReader"/> for reading from the client connection.</param>
 136    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 137    /// <param name="serverPipeReaderTask">A task that returns a <see cref="PipeReader"/> for reading from the server
 138    /// connection.</param>
 139    /// <returns>Returns true if the connection establishment request has been queued; otherwise, false.</returns>
 140    internal bool TryQueueConnect(
 141        PipeReader clientPipeReader,
 142        CancellationToken cancellationToken,
 143        [NotNullWhen(true)] out Task<PipeReader>? serverPipeReaderTask)
 518144    {
 145        // Create a tcs that is completed by AcceptAsync when it accepts the corresponding connection, at which point
 146        // the client side connect operation will complete.
 147        // We use RunContinuationsAsynchronously to avoid the ConnectAsync continuation ending up running in the AcceptAsync
 148        // loop that completes this tcs.
 518149        var tcs = new TaskCompletionSource<PipeReader>(TaskCreationOptions.RunContinuationsAsynchronously);
 518150        if (_channel.Writer.TryWrite((tcs, clientPipeReader)))
 515151        {
 551152            cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
 515153            serverPipeReaderTask = tcs.Task;
 515154            return true;
 155        }
 156        else
 3157        {
 3158            serverPipeReaderTask = null;
 3159            return false;
 160        }
 518161    }
 162}