< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocListener
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocListener.cs
Tag: 592_20856082467
Line coverage
100%
Covered lines: 69
Uncovered lines: 0
Coverable lines: 69
Total lines: 141
Line coverage: 100%
Branch coverage
100%
Covered branches: 8
Total branches: 8
Branch coverage: 100%
Method coverage
100%
Covered methods: 5
Total methods: 5
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_ServerAddress()100%11100%
.ctor(...)100%11100%
AcceptAsync()100%22100%
DisposeAsync()100%44100%
TryQueueConnect(...)100%22100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocListener.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Diagnostics.CodeAnalysis;
 5using System.IO.Pipelines;
 6using System.Net;
 7using System.Threading.Channels;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The listener implementation for the colocated transport. It accepts the connection establishment requests that client connections queue with <see cref="TryQueueConnect"/>.</summary>
 12internal class ColocListener : IListener<IDuplexConnection>
 13{
 147614    public ServerAddress ServerAddress { get; }
 15
 50316    private readonly CancellationTokenSource _disposeCts = new();
 17    private readonly EndPoint _networkAddress;
 18    private readonly PipeOptions _pipeOptions;
 19
 20    // The channel used by the client connection ConnectAsync method to queue a connection establishment request. A
 21    // client connection establishment request is represented by:
 22    // - a TaskCompletionSource which is completed by AcceptAsync when the connection is accepted. The server connection
 23    //   pipe reader is set as the result. ClientColocConnection.ConnectAsync waits on the task completion source task.
 24    // - the client connection pipe reader provided to the server connection when the server connection is created by
 25    //   AcceptAsync.
 26    private readonly Channel<(TaskCompletionSource<PipeReader>, PipeReader)> _channel;
 27
 28    public async Task<(IDuplexConnection, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
 53429    {
 53430        ObjectDisposedException.ThrowIf(_disposeCts.IsCancellationRequested, this);
 31
 53232        using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken);
 33        try
 53234        {
 53435            while (true)
 53436            {
 53437                (TaskCompletionSource<PipeReader> tcs, PipeReader clientPipeReader) =
 53438                    await _channel.Reader.ReadAsync(cts.Token).ConfigureAwait(false);
 39
 45640                var serverPipe = new Pipe(_pipeOptions);
 45641                if (tcs.TrySetResult(serverPipe.Reader))
 45442                {
 45443                    var serverConnection = new ServerColocConnection(
 45444                        ServerAddress,
 45445                        serverPipe.Writer,
 45446                        clientPipeReader);
 45447                    return (serverConnection, _networkAddress);
 48                }
 49                else
 250                {
 51                    // The client connection establishment was canceled (tcs is already completed): release the unused server pipe and read the next request.
 252                    serverPipe.Writer.Complete();
 253                    serverPipe.Reader.Complete();
 254                }
 255            }
 56        }
 7857        catch (OperationCanceledException)
 7858        {
 7859            cancellationToken.ThrowIfCancellationRequested();
 60            // The accept operation was canceled because the listener was disposed.
 1161            Debug.Assert(_disposeCts.IsCancellationRequested);
 1162            throw new ObjectDisposedException($"{typeof(ColocListener)}");
 63        }
 45464    }
 65
 66    public ValueTask DisposeAsync()
 89467    {
 89468        if (_disposeCts.IsCancellationRequested)
 39369        {
 70            // DisposeAsync was already called.
 39371            return default;
 72        }
 73
 74        // Cancel pending AcceptAsync.
 50175        _disposeCts.Cancel();
 76
 77        // Ensure no more client connection establishment requests are queued.
 50178        _channel.Writer.Complete();
 79
 80        // Complete all the queued client connection establishment requests with IceRpcError.ConnectionRefused. Use
 81        // TrySetException in case the task has already been canceled.
 51982        while (_channel.Reader.TryRead(out (TaskCompletionSource<PipeReader> Tcs, PipeReader) item))
 1883        {
 1884            item.Tcs.TrySetException(new IceRpcException(IceRpcError.ConnectionRefused));
 1885        }
 86
 50187        _disposeCts.Dispose();
 88
 50189        return default;
 89490    }
 91
 50392    internal ColocListener(
 50393        ServerAddress serverAddress,
 50394        ColocTransportOptions colocTransportOptions,
 50395        DuplexConnectionOptions duplexConnectionOptions)
 50396    {
 50397        ServerAddress = serverAddress;
 98
 50399        _networkAddress = new ColocEndPoint(serverAddress);
 503100        _pipeOptions = new PipeOptions(
 503101            pool: duplexConnectionOptions.Pool,
 503102            minimumSegmentSize: duplexConnectionOptions.MinSegmentSize,
 503103            pauseWriterThreshold: colocTransportOptions.PauseWriterThreshold,
 503104            resumeWriterThreshold: colocTransportOptions.ResumeWriterThreshold,
 503105            useSynchronizationContext: false);
 106
 107        // Create a bounded channel with a capacity that matches the listen backlog, and with
 108        // the default concurrency settings that allow multiple readers and writers.
 503109        _channel = Channel.CreateBounded<(TaskCompletionSource<PipeReader>, PipeReader)>(
 503110            new BoundedChannelOptions(colocTransportOptions.ListenBacklog));
 503111    }
 112
 113    /// <summary>Queues a client connection establishment request.</summary>
 114    /// <param name="clientPipeReader">A <see cref="PipeReader"/> for reading from the client connection.</param>
 115    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 116    /// <param name="serverPipeReaderTask">A task that returns a <see cref="PipeReader"/> for reading from the server
 117    /// connection; <see langword="null"/> when the request is not queued.</param>
 118    /// <returns><see langword="true"/> if the connection establishment request has been queued; otherwise, <see langword="false"/>.</returns>
 119    internal bool TryQueueConnect(
 120        PipeReader clientPipeReader,
 121        CancellationToken cancellationToken,
 122        [NotNullWhen(true)] out Task<PipeReader>? serverPipeReaderTask)
 480123    {
 124        // Create a tcs that is completed by AcceptAsync when it accepts the corresponding connection, at which point
 125        // the client side connect operation will complete.
 126        // We use RunContinuationsAsynchronously to prevent the ConnectAsync continuation from running in the AcceptAsync
 127        // loop that completes this tcs. NOTE(review): the registration returned by Register below is never disposed - confirm this is acceptable for long-lived tokens.
 480128        var tcs = new TaskCompletionSource<PipeReader>(TaskCreationOptions.RunContinuationsAsynchronously);
 480129        if (_channel.Writer.TryWrite((tcs, clientPipeReader)))
 474130        {
 510131            cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
 474132            serverPipeReaderTask = tcs.Task;
 474133            return true;
 134        }
 135        else
 6136        {
 6137            serverPipeReaderTask = null;
 6138            return false;
 139        }
 480140    }
 141}