< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocListener
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocListener.cs
Tag: 275_13775359185
Line coverage
100%
Covered lines: 69
Uncovered lines: 0
Coverable lines: 69
Total lines: 141
Line coverage: 100%
Branch coverage
100%
Covered branches: 8
Total branches: 8
Branch coverage: 100%
Method coverage
100%
Covered methods: 5
Total methods: 5
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_ServerAddress()100%11100%
.ctor(...)100%11100%
AcceptAsync()100%22100%
DisposeAsync()100%44100%
TryQueueConnect(...)100%22100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocListener.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Diagnostics.CodeAnalysis;
 5using System.IO.Pipelines;
 6using System.Net;
 7using System.Threading.Channels;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The listener implementation for the colocated transport.</summary>
 12internal class ColocListener : IListener<IDuplexConnection>
 13{
 289714    public ServerAddress ServerAddress { get; }
 15
 98616    private readonly CancellationTokenSource _disposeCts = new();
 17    private readonly EndPoint _networkAddress;
 18    private readonly PipeOptions _pipeOptions;
 19
 20    // The channel used by the client connection ConnectAsync method to queue a connection establishment request. A
 21    // client connection establishment request is represented by:
 22    // - a TaskCompletionSource which is completed by AcceptAsync when the connection is accepted. The server connection
 23    //   pipe reader is set as the result. ClientColocConnection.ConnectAsync waits on the task completion source task.
 24    // - the client connection pipe reader provided to the server connection when the server connection is created by
 25    //   AcceptAsync.
 26    private readonly Channel<(TaskCompletionSource<PipeReader>, PipeReader)> _channel;
 27
 28    public async Task<(IDuplexConnection, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
 103229    {
 103230        ObjectDisposedException.ThrowIf(_disposeCts.IsCancellationRequested, this);
 31
 102832        using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken);
 33        try
 102834        {
 103035            while (true)
 103036            {
 103037                (TaskCompletionSource<PipeReader> tcs, PipeReader clientPipeReader) =
 103038                    await _channel.Reader.ReadAsync(cts.Token).ConfigureAwait(false);
 39
 89440                var serverPipe = new Pipe(_pipeOptions);
 89441                if (tcs.TrySetResult(serverPipe.Reader))
 89242                {
 89243                    var serverConnection = new ServerColocConnection(
 89244                        ServerAddress,
 89245                        serverPipe.Writer,
 89246                        clientPipeReader);
 89247                    return (serverConnection, _networkAddress);
 48                }
 49                else
 250                {
 51                    // The client connection establishment was canceled.
 252                    serverPipe.Writer.Complete();
 253                    serverPipe.Reader.Complete();
 254                }
 255            }
 56        }
 13657        catch (OperationCanceledException)
 13658        {
 13659            cancellationToken.ThrowIfCancellationRequested();
 60            // The accept operation was canceled because the listener was disposed.
 2161            Debug.Assert(_disposeCts.IsCancellationRequested);
 2162            throw new ObjectDisposedException($"{typeof(ColocListener)}");
 63        }
 89264    }
 65
 66    public ValueTask DisposeAsync()
 176667    {
 176668        if (_disposeCts.IsCancellationRequested)
 78469        {
 70            // Dispose already called.
 78471            return default;
 72        }
 73
 74        // Cancel pending AcceptAsync.
 98275        _disposeCts.Cancel();
 76
 77        // Ensure no more client connection establishment request is queued.
 98278        _channel.Writer.Complete();
 79
 80        // Complete all the queued client connection establishment requests with IceRpcError.ConnectionRefused. Use
 81        // TrySetException in case the task has been already canceled.
 101982        while (_channel.Reader.TryRead(out (TaskCompletionSource<PipeReader> Tcs, PipeReader) item))
 3783        {
 3784            item.Tcs.TrySetException(new IceRpcException(IceRpcError.ConnectionRefused));
 3785        }
 86
 98287        _disposeCts.Dispose();
 88
 98289        return default;
 176690    }
 91
 98692    internal ColocListener(
 98693        ServerAddress serverAddress,
 98694        ColocTransportOptions colocTransportOptions,
 98695        DuplexConnectionOptions duplexConnectionOptions)
 98696    {
 98697        ServerAddress = serverAddress;
 98
 98699        _networkAddress = new ColocEndPoint(serverAddress);
 986100        _pipeOptions = new PipeOptions(
 986101            pool: duplexConnectionOptions.Pool,
 986102            minimumSegmentSize: duplexConnectionOptions.MinSegmentSize,
 986103            pauseWriterThreshold: colocTransportOptions.PauseWriterThreshold,
 986104            resumeWriterThreshold: colocTransportOptions.ResumeWriterThreshold,
 986105            useSynchronizationContext: false);
 106
 107        // Create a bounded channel with a capacity that matches the listen backlog, and with
 108        // the default concurrency settings that allow multiple reader and writers.
 986109        _channel = Channel.CreateBounded<(TaskCompletionSource<PipeReader>, PipeReader)>(
 986110            new BoundedChannelOptions(colocTransportOptions.ListenBacklog));
 986111    }
 112
 113    /// <summary>Queue client connection establishment requests from the client.</summary>
 114    /// <param name="clientPipeReader">A <see cref="PipeReader"/> for reading from the client connection.</param>
 115    /// <param name="cancellationToken">>A cancellation token that receives the cancellation requests.</param>
 116    /// <param name="serverPipeReaderTask">A task that returns a <see cref="PipeReader"/> for reading from the server
 117    /// connection.</param>
 118    /// <returns>Returns true if the connection establishment request has been queue otherwise, false.</returns>
 119    internal bool TryQueueConnect(
 120        PipeReader clientPipeReader,
 121        CancellationToken cancellationToken,
 122        [NotNullWhen(true)] out Task<PipeReader>? serverPipeReaderTask)
 943123    {
 124        // Create a tcs that is completed by AcceptAsync when accepts the corresponding connection, at which point
 125        // the client side connect operation will complete.
 126        // We use RunContinuationsAsynchronously to avoid the ConnectAsync continuation end up running in the AcceptAsyn
 127        // loop that completes this tcs.
 943128        var tcs = new TaskCompletionSource<PipeReader>(TaskCreationOptions.RunContinuationsAsynchronously);
 943129        if (_channel.Writer.TryWrite((tcs, clientPipeReader)))
 931130        {
 1003131            cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
 931132            serverPipeReaderTask = tcs.Task;
 931133            return true;
 134        }
 135        else
 12136        {
 12137            serverPipeReaderTask = null;
 12138            return false;
 139        }
 943140    }
 141}