< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicDuplexConnectionWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs
Tag: 276_17717543480
Line coverage
98%
Covered lines: 61
Uncovered lines: 1
Coverable lines: 62
Total lines: 113
Line coverage: 98.3%
Branch coverage
100%
Covered branches: 2
Total branches: 2
Branch coverage: 100%
Method coverage
88%
Covered methods: 8
Total methods: 9
Method coverage: 88.8%

Metrics

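| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| get_WriterTask() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Advance(...) | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 2 | 2 | 100% |
| PerformDisposeAsync() | 100% | 1 | 1 | 100% |
| GetMemory(...) | 100% | 2 | 1 | 0% |
| GetSpan(...) | 100% | 1 | 1 | 100% |
| Flush() | 100% | 1 | 1 | 100% |
| Shutdown() | 100% | 1 | 1 | 100% |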

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6
 7namespace IceRpc.Transports.Slic.Internal;
 8
 9/// <summary>A helper class to write data to a duplex connection. Its methods shouldn't be called concurrently. The data
 10/// written to this writer is copied and buffered with an internal pipe. The data from the pipe is written on the duplex
 11/// connection with a background task.</summary>
 12internal class SlicDuplexConnectionWriter : IBufferWriter<byte>, IAsyncDisposable
 13{
 300814    internal Task WriterTask { get; private init; }
 15
 16    private readonly IDuplexConnection _connection;
 136917    private readonly CancellationTokenSource _disposeCts = new();
 18    private Task? _disposeTask;
 19    private readonly Pipe _pipe;
 20
 17861721    public void Advance(int bytes) => _pipe.Writer.Advance(bytes);
 22
 23    /// <inheritdoc/>
 24    public ValueTask DisposeAsync()
 136725    {
 136726        _disposeTask ??= PerformDisposeAsync();
 136727        return new(_disposeTask);
 28
 29        async Task PerformDisposeAsync()
 136730        {
 136731            _disposeCts.Cancel();
 32
 136733            await WriterTask.ConfigureAwait(false);
 34
 136735            _pipe.Reader.Complete();
 136736            _pipe.Writer.Complete();
 37
 136738            _disposeCts.Dispose();
 136739        }
 136740    }
 41
 42    /// <inheritdoc/>
 043    public Memory<byte> GetMemory(int sizeHint = 0) => _pipe.Writer.GetMemory(sizeHint);
 44
 45    /// <inheritdoc/>
 17861746    public Span<byte> GetSpan(int sizeHint = 0) => _pipe.Writer.GetSpan(sizeHint);
 47
 48    /// <summary>Constructs a duplex connection writer.</summary>
 49    /// <param name="connection">The duplex connection to write to.</param>
 50    /// <param name="pool">The memory pool to use.</param>
 51    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref
 52    /// name="pool"/>.</param>
 136953    internal SlicDuplexConnectionWriter(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 136954    {
 136955        _connection = connection;
 56
 57        // We set pauseWriterThreshold to 0 because Slic implements flow-control at the stream level. So there's no need
 58        // to limit the amount of data buffered by the writer pipe. The amount of data buffered is limited to
 59        // (MaxBidirectionalStreams + MaxUnidirectionalStreams) * PeerPauseWriterThreshold bytes.
 136960        _pipe = new Pipe(new PipeOptions(
 136961            pool: pool,
 136962            minimumSegmentSize: minimumSegmentSize,
 136963            pauseWriterThreshold: 0,
 136964            useSynchronizationContext: false));
 65
 136966        WriterTask = Task.Run(
 136967            async () =>
 136968            {
 136969                try
 136970                {
 997571                    while (true)
 997572                    {
 997573                        ReadResult readResult = await _pipe.Reader.ReadAsync(_disposeCts.Token).ConfigureAwait(false);
 136974
 889875                        if (!readResult.Buffer.IsEmpty)
 871676                        {
 871677                            await _connection.WriteAsync(readResult.Buffer, _disposeCts.Token).ConfigureAwait(false);
 869678                            _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 869679                        }
 136980
 887881                        if (readResult.IsCompleted)
 27282                        {
 27283                            await _connection.ShutdownWriteAsync(_disposeCts.Token).ConfigureAwait(false);
 26884                            break;
 136985                        }
 860686                    }
 26887                    _pipe.Reader.Complete();
 26888                }
 109389                catch (OperationCanceledException)
 109390                {
 136991                    // DisposeAsync was called.
 109392                }
 693                catch (Exception exception)
 694                {
 695                    _pipe.Reader.Complete(exception);
 696                }
 273697            });
 136998    }
 99
 100    internal void Flush()
 25592101    {
 25592102        ValueTask<FlushResult> flushResult = _pipe.Writer.FlushAsync(CancellationToken.None);
 103
 104        // PauseWriterThreshold is 0 so FlushAsync should always complete synchronously.
 25592105        Debug.Assert(flushResult.IsCompletedSuccessfully);
 25592106    }
 107
 108    /// <summary>Requests the shut down of the duplex connection writes after the buffered data is written on the
 109    /// duplex connection.</summary>
 110    internal void Shutdown() =>
 111        // Completing the pipe writer makes the background write task complete successfully.
 272112        _pipe.Writer.Complete();
 113}