< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicDuplexConnectionWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs
Tag: 275_13775359185
Line coverage
98%
Covered lines: 61
Uncovered lines: 1
Coverable lines: 62
Total lines: 113
Line coverage: 98.3%
Branch coverage
100%
Covered branches: 2
Total branches: 2
Branch coverage: 100%
Method coverage
88%
Covered methods: 8
Total methods: 9
Method coverage: 88.8%

Metrics

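| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| get_WriterTask() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Advance(...) | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 2 | 2 | 100% |
| PerformDisposeAsync() | 100% | 1 | 1 | 100% |
| GetMemory(...) | 100% | 2 | 1 | 0% |
| GetSpan(...) | 100% | 1 | 1 | 100% |
| Flush() | 100% | 1 | 1 | 100% |
| Shutdown() | 100% | 1 | 1 | 100% |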

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6
 7namespace IceRpc.Transports.Slic.Internal;
 8
/// <summary>Buffers outgoing data in an internal pipe and forwards it to a duplex connection from a background task.
/// The data given to this writer is copied into the pipe; the background task then writes it on the duplex
/// connection. The methods of this class shouldn't be called concurrently.</summary>
internal class SlicDuplexConnectionWriter : IBufferWriter<byte>, IAsyncDisposable
{
    /// <summary>Gets the background task, started by the constructor, that reads the buffered data from the internal
    /// pipe and writes it on the duplex connection.</summary>
    internal Task WriterTask { get; private init; }

    private readonly IDuplexConnection _connection;
    private readonly CancellationTokenSource _disposeCts = new();
    private Task? _disposeTask;
    private readonly Pipe _pipe;

    /// <inheritdoc/>
    public void Advance(int bytes)
    {
        _pipe.Writer.Advance(bytes);
    }

    /// <inheritdoc/>
    public ValueTask DisposeAsync()
    {
        // Only the first call starts the disposal; any later call gets a value task wrapping the same task.
        if (_disposeTask is null)
        {
            _disposeTask = PerformDisposeAsync();
        }
        return new ValueTask(_disposeTask);

        async Task PerformDisposeAsync()
        {
            // Cancel the token used by the background writer task for its pipe reads and connection writes.
            _disposeCts.Cancel();

            // Wait for the background writer task to finish before completing both ends of the pipe.
            await WriterTask.ConfigureAwait(false);

            _pipe.Reader.Complete();
            _pipe.Writer.Complete();

            _disposeCts.Dispose();
        }
    }

    /// <inheritdoc/>
    public Memory<byte> GetMemory(int sizeHint = 0)
    {
        return _pipe.Writer.GetMemory(sizeHint);
    }

    /// <inheritdoc/>
    public Span<byte> GetSpan(int sizeHint = 0)
    {
        return _pipe.Writer.GetSpan(sizeHint);
    }

    /// <summary>Constructs a duplex connection writer and starts the background task that writes the buffered data on
    /// the duplex connection.</summary>
    /// <param name="connection">The duplex connection to write to.</param>
    /// <param name="pool">The memory pool to use.</param>
    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref
    /// name="pool"/>.</param>
    internal SlicDuplexConnectionWriter(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
    {
        _connection = connection;

        // pauseWriterThreshold is 0 because Slic implements flow-control at the stream level: the writer pipe doesn't
        // need to limit the amount of data it buffers. This amount is limited to
        // (MaxBidirectionalStreams + MaxUnidirectionalStreams) * PeerPauseWriterThreshold bytes.
        var pipeOptions = new PipeOptions(
            pool: pool,
            minimumSegmentSize: minimumSegmentSize,
            pauseWriterThreshold: 0,
            useSynchronizationContext: false);
        _pipe = new Pipe(pipeOptions);

        WriterTask = Task.Run(() => WriteBufferedDataAsync());

        // Forwards the data buffered by the pipe to the duplex connection until the pipe writer is completed, the
        // dispose token is canceled or a write fails.
        async Task WriteBufferedDataAsync()
        {
            PipeReader reader = _pipe.Reader;
            CancellationToken cancellationToken = _disposeCts.Token;

            try
            {
                ReadResult result;
                do
                {
                    result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);

                    if (!result.Buffer.IsEmpty)
                    {
                        await _connection.WriteAsync(result.Buffer, cancellationToken).ConfigureAwait(false);
                        reader.AdvanceTo(result.Buffer.End);
                    }
                }
                while (!result.IsCompleted);

                // The pipe writer was completed (see Shutdown) and all the buffered data was written: shut down the
                // connection writes.
                await _connection.ShutdownWriteAsync(cancellationToken).ConfigureAwait(false);
                reader.Complete();
            }
            catch (OperationCanceledException)
            {
                // DisposeAsync was called.
            }
            catch (Exception failure)
            {
                // Complete the pipe reader with the failure; the exception does not fault the task.
                reader.Complete(failure);
            }
        }
    }

    /// <summary>Flushes the data written to this writer to the internal pipe, which makes it available to the
    /// background writer task.</summary>
    internal void Flush()
    {
        ValueTask<FlushResult> pendingFlush = _pipe.Writer.FlushAsync(CancellationToken.None);

        // PauseWriterThreshold is 0 so FlushAsync should always complete synchronously.
        Debug.Assert(pendingFlush.IsCompletedSuccessfully);
    }

    /// <summary>Requests the shut down of the duplex connection writes after the buffered data is written on the
    /// duplex connection.</summary>
    internal void Shutdown()
    {
        // Completing the pipe writer makes the background write task complete successfully.
        _pipe.Writer.Complete();
    }
}