< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicDuplexConnectionWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs
Tag: 1321_24790053727
Line coverage
98%
Covered lines: 61
Uncovered lines: 1
Coverable lines: 62
Total lines: 113
Line coverage: 98.3%
Branch coverage
100%
Covered branches: 2
Total branches: 2
Branch coverage: 100%
Method coverage
88%
Covered methods: 8
Fully covered methods: 7
Total methods: 9
Method coverage: 88.8%
Full method coverage: 77.7%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_WriterTask()100%11100%
.ctor(...)100%11100%
Advance(...)100%11100%
DisposeAsync()100%22100%
PerformDisposeAsync()100%11100%
GetMemory(...)100%210%
GetSpan(...)100%11100%
Flush()100%11100%
Shutdown()100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6
 7namespace IceRpc.Transports.Slic.Internal;
 8
 9/// <summary>A helper class to write data to a duplex connection. Its methods shouldn't be called concurrently. The data
 10/// written to this writer is copied and buffered with an internal pipe. The data from the pipe is written on the duplex
 11/// connection with a background task.</summary>
 12internal class SlicDuplexConnectionWriter : IBufferWriter<byte>, IAsyncDisposable
 13{
 155614    internal Task WriterTask { get; private init; }
 15
 16    private readonly IDuplexConnection _connection;
 70617    private readonly CancellationTokenSource _disposeCts = new();
 18    private Task? _disposeTask;
 19    private readonly Pipe _pipe;
 20
 9056221    public void Advance(int bytes) => _pipe.Writer.Advance(bytes);
 22
 23    /// <inheritdoc/>
 24    public ValueTask DisposeAsync()
 70525    {
 70526        _disposeTask ??= PerformDisposeAsync();
 70527        return new(_disposeTask);
 28
 29        async Task PerformDisposeAsync()
 70530        {
 70531            _disposeCts.Cancel();
 32
 70533            await WriterTask.ConfigureAwait(false);
 34
 70535            _pipe.Reader.Complete();
 70536            _pipe.Writer.Complete();
 37
 70538            _disposeCts.Dispose();
 70539        }
 70540    }
 41
 42    /// <inheritdoc/>
 043    public Memory<byte> GetMemory(int sizeHint = 0) => _pipe.Writer.GetMemory(sizeHint);
 44
 45    /// <inheritdoc/>
 9056246    public Span<byte> GetSpan(int sizeHint = 0) => _pipe.Writer.GetSpan(sizeHint);
 47
 48    /// <summary>Constructs a duplex connection writer.</summary>
 49    /// <param name="connection">The duplex connection to write to.</param>
 50    /// <param name="pool">The memory pool to use.</param>
 51    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref
 52    /// name="pool"/>.</param>
 70653    internal SlicDuplexConnectionWriter(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 70654    {
 70655        _connection = connection;
 56
 57        // We set pauseWriterThreshold to 0 because Slic implements flow-control at the stream level. So there's no need
 58        // to limit the amount of data buffered by the writer pipe. The amount of data buffered is limited to
 59        // (MaxBidirectionalStreams + MaxUnidirectionalStreams) * PeerPauseWriterThreshold bytes.
 70660        _pipe = new Pipe(new PipeOptions(
 70661            pool: pool,
 70662            minimumSegmentSize: minimumSegmentSize,
 70663            pauseWriterThreshold: 0,
 70664            useSynchronizationContext: false));
 65
 70666        WriterTask = Task.Run(
 70667            async () =>
 70668            {
 70669                try
 70670                {
 604271                    while (true)
 604272                    {
 604273                        ReadResult readResult = await _pipe.Reader.ReadAsync(_disposeCts.Token).ConfigureAwait(false);
 70674
 549075                        if (!readResult.Buffer.IsEmpty)
 539576                        {
 539577                            await _connection.WriteAsync(readResult.Buffer, _disposeCts.Token).ConfigureAwait(false);
 538678                            _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 538679                        }
 70680
 548181                        if (readResult.IsCompleted)
 14582                        {
 14583                            await _connection.ShutdownWriteAsync(_disposeCts.Token).ConfigureAwait(false);
 14384                            break;
 70685                        }
 533686                    }
 14387                    _pipe.Reader.Complete();
 14388                }
 55989                catch (OperationCanceledException)
 55990                {
 70691                    // DisposeAsync was called.
 55992                }
 393                catch (Exception exception)
 394                {
 395                    _pipe.Reader.Complete(exception);
 396                }
 141197            });
 70698    }
 99
 100    internal void Flush()
 13004101    {
 13004102        ValueTask<FlushResult> flushResult = _pipe.Writer.FlushAsync(CancellationToken.None);
 103
 104        // PauseWriterThreshold is 0 so FlushAsync should always complete synchronously.
 13004105        Debug.Assert(flushResult.IsCompletedSuccessfully);
 13004106    }
 107
 108    /// <summary>Requests the shut down of the duplex connection writes after the buffered data is written on the
 109    /// duplex connection.</summary>
 110    internal void Shutdown() =>
 111        // Completing the pipe writer makes the background write task complete successfully.
 145112        _pipe.Writer.Complete();
 113}