< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocListener
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocListener.cs
Tag: 1321_24790053727
Line coverage
93%
Covered lines: 67
Uncovered lines: 5
Coverable lines: 72
Total lines: 147
Line coverage: 93%
Branch coverage
87%
Covered branches: 7
Total branches: 8
Branch coverage: 87.5%
Method coverage
100%
Covered methods: 5
Fully covered methods: 3
Total methods: 5
Method coverage: 100%
Full method coverage: 60%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_TransportAddress()100%11100%
.ctor(...)100%11100%
AcceptAsync()50%2281.48%
DisposeAsync()100%44100%
TryQueueConnect(...)100%22100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocListener.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Diagnostics.CodeAnalysis;
 5using System.IO.Pipelines;
 6using System.Net;
 7using System.Threading.Channels;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The listener implementation for the colocated transport.</summary>
 12internal class ColocListener : IListener<IDuplexConnection>
 13{
 96214    public TransportAddress TransportAddress { get; }
 15
 51616    private readonly CancellationTokenSource _disposeCts = new();
 17    private readonly Action<ColocListener> _onDispose;
 18    private readonly EndPoint _networkAddress;
 19    private readonly PipeOptions _pipeOptions;
 20
 21    // The channel used by the client connection ConnectAsync method to queue a connection establishment request. A
 22    // client connection establishment request is represented by:
 23    // - a TaskCompletionSource which is completed by AcceptAsync when the connection is accepted. The server connection
 24    //   pipe reader is set as the result. ClientColocConnection.ConnectAsync waits on the task completion source task.
 25    // - the client connection pipe reader provided to the server connection when the server connection is created by
 26    //   AcceptAsync.
 27    private readonly Channel<(TaskCompletionSource<PipeReader>, PipeReader)> _channel;
 28
 29    public async Task<(IDuplexConnection, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
 54730    {
 54731        ObjectDisposedException.ThrowIf(_disposeCts.IsCancellationRequested, this);
 32
 54533        using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken);
 34        try
 54535        {
 54536            while (true)
 54537            {
 54538                (TaskCompletionSource<PipeReader> tcs, PipeReader clientPipeReader) =
 54539                    await _channel.Reader.ReadAsync(cts.Token).ConfigureAwait(false);
 40
 46841                var serverPipe = new Pipe(_pipeOptions);
 46842                if (tcs.TrySetResult(serverPipe.Reader))
 46843                {
 46844                    var serverConnection = new ServerColocConnection(
 46845                        TransportAddress,
 46846                        serverPipe.Writer,
 46847                        clientPipeReader);
 46848                    return (serverConnection, _networkAddress);
 49                }
 50                else
 051                {
 52                    // The client connection establishment was canceled.
 053                    serverPipe.Writer.Complete();
 054                    serverPipe.Reader.Complete();
 055                }
 056            }
 57        }
 7758        catch (OperationCanceledException)
 7759        {
 7760            cancellationToken.ThrowIfCancellationRequested();
 61            // The accept operation was canceled because the listener was disposed.
 1162            Debug.Assert(_disposeCts.IsCancellationRequested);
 1163            throw new ObjectDisposedException($"{typeof(ColocListener)}");
 64        }
 46865    }
 66
 67    public ValueTask DisposeAsync()
 91968    {
 91969        if (_disposeCts.IsCancellationRequested)
 40570        {
 71            // Dispose already called.
 40572            return default;
 73        }
 74
 75        // Notify the owner (e.g. the server transport) so it can release its reference to this listener.
 51476        _onDispose(this);
 77
 78        // Cancel pending AcceptAsync.
 51479        _disposeCts.Cancel();
 80
 81        // Ensure no more client connection establishment request is queued.
 51482        _channel.Writer.Complete();
 83
 84        // Complete all the queued client connection establishment requests with IceRpcError.ConnectionRefused. Use
 85        // TrySetException in case the task has been already canceled.
 53386        while (_channel.Reader.TryRead(out (TaskCompletionSource<PipeReader> Tcs, PipeReader) item))
 1987        {
 1988            item.Tcs.TrySetException(new IceRpcException(IceRpcError.ConnectionRefused));
 1989        }
 90
 51491        _disposeCts.Dispose();
 92
 51493        return default;
 91994    }
 95
 51696    internal ColocListener(
 51697        TransportAddress transportAddress,
 51698        Action<ColocListener> onDispose,
 51699        ColocTransportOptions colocTransportOptions,
 516100        DuplexConnectionOptions duplexConnectionOptions)
 516101    {
 516102        TransportAddress = transportAddress;
 103
 516104        _onDispose = onDispose;
 516105        _networkAddress = new ColocEndPoint(transportAddress);
 516106        _pipeOptions = new PipeOptions(
 516107            pool: duplexConnectionOptions.Pool,
 516108            minimumSegmentSize: duplexConnectionOptions.MinSegmentSize,
 516109            pauseWriterThreshold: colocTransportOptions.PauseWriterThreshold,
 516110            resumeWriterThreshold: colocTransportOptions.ResumeWriterThreshold,
 516111            useSynchronizationContext: false);
 112
 113        // Create a bounded channel with a capacity that matches the listen backlog, and with
 114        // the default concurrency settings that allow multiple reader and writers.
 516115        _channel = Channel.CreateBounded<(TaskCompletionSource<PipeReader>, PipeReader)>(
 516116            new BoundedChannelOptions(colocTransportOptions.ListenBacklog));
 516117    }
 118
 119    /// <summary>Queue client connection establishment requests from the client.</summary>
 120    /// <param name="clientPipeReader">A <see cref="PipeReader"/> for reading from the client connection.</param>
 121    /// <param name="cancellationToken">>A cancellation token that receives the cancellation requests.</param>
 122    /// <param name="serverPipeReaderTask">A task that returns a <see cref="PipeReader"/> for reading from the server
 123    /// connection.</param>
 124    /// <returns>Returns true if the connection establishment request has been queue otherwise, false.</returns>
 125    internal bool TryQueueConnect(
 126        PipeReader clientPipeReader,
 127        CancellationToken cancellationToken,
 128        [NotNullWhen(true)] out Task<PipeReader>? serverPipeReaderTask)
 490129    {
 130        // Create a tcs that is completed by AcceptAsync when accepts the corresponding connection, at which point
 131        // the client side connect operation will complete.
 132        // We use RunContinuationsAsynchronously to avoid the ConnectAsync continuation end up running in the AcceptAsyn
 133        // loop that completes this tcs.
 490134        var tcs = new TaskCompletionSource<PipeReader>(TaskCreationOptions.RunContinuationsAsynchronously);
 490135        if (_channel.Writer.TryWrite((tcs, clientPipeReader)))
 487136        {
 523137            cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
 487138            serverPipeReaderTask = tcs.Task;
 487139            return true;
 140        }
 141        else
 3142        {
 3143            serverPipeReaderTask = null;
 3144            return false;
 145        }
 490146    }
 147}