< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicDuplexConnectionWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs
Tag: 592_20856082467
Line coverage
98%
Covered lines: 61
Uncovered lines: 1
Coverable lines: 62
Total lines: 113
Line coverage: 98.3%
Branch coverage
100%
Covered branches: 2
Total branches: 2
Branch coverage: 100%
Method coverage
88%
Covered methods: 8
Total methods: 9
Method coverage: 88.8%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_WriterTask()100%11100%
.ctor(...)100%11100%
Advance(...)100%11100%
DisposeAsync()100%22100%
PerformDisposeAsync()100%11100%
GetMemory(...)100%210%
GetSpan(...)100%11100%
Flush()100%11100%
Shutdown()100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6
 7namespace IceRpc.Transports.Slic.Internal;
 8
 9/// <summary>A helper class to write data to a duplex connection. Its methods shouldn't be called concurrently. The data
 10/// written to this writer is copied and buffered with an internal pipe. The data from the pipe is written on the duplex
 11/// connection with a background task.</summary>
 12internal class SlicDuplexConnectionWriter : IBufferWriter<byte>, IAsyncDisposable
 13{
 154014    internal Task WriterTask { get; private init; }
 15
 16    private readonly IDuplexConnection _connection;
 70017    private readonly CancellationTokenSource _disposeCts = new();
 18    private Task? _disposeTask;
 19    private readonly Pipe _pipe;
 20
 8983321    public void Advance(int bytes) => _pipe.Writer.Advance(bytes);
 22
 23    /// <inheritdoc/>
 24    public ValueTask DisposeAsync()
 69925    {
 69926        _disposeTask ??= PerformDisposeAsync();
 69927        return new(_disposeTask);
 28
 29        async Task PerformDisposeAsync()
 69930        {
 69931            _disposeCts.Cancel();
 32
 69933            await WriterTask.ConfigureAwait(false);
 34
 69935            _pipe.Reader.Complete();
 69936            _pipe.Writer.Complete();
 37
 69938            _disposeCts.Dispose();
 69939        }
 69940    }
 41
 42    /// <inheritdoc/>
 043    public Memory<byte> GetMemory(int sizeHint = 0) => _pipe.Writer.GetMemory(sizeHint);
 44
 45    /// <inheritdoc/>
 8983346    public Span<byte> GetSpan(int sizeHint = 0) => _pipe.Writer.GetSpan(sizeHint);
 47
 48    /// <summary>Constructs a duplex connection writer.</summary>
 49    /// <param name="connection">The duplex connection to write to.</param>
 50    /// <param name="pool">The memory pool to use.</param>
 51    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref
 52    /// name="pool"/>.</param>
 70053    internal SlicDuplexConnectionWriter(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 70054    {
 70055        _connection = connection;
 56
 57        // We set pauseWriterThreshold to 0 because Slic implements flow-control at the stream level. So there's no need
 58        // to limit the amount of data buffered by the writer pipe. The amount of data buffered is limited to
 59        // (MaxBidirectionalStreams + MaxUnidirectionalStreams) * PeerPauseWriterThreshold bytes.
 70060        _pipe = new Pipe(new PipeOptions(
 70061            pool: pool,
 70062            minimumSegmentSize: minimumSegmentSize,
 70063            pauseWriterThreshold: 0,
 70064            useSynchronizationContext: false));
 65
 70066        WriterTask = Task.Run(
 70067            async () =>
 70068            {
 70069                try
 70070                {
 483671                    while (true)
 483672                    {
 483673                        ReadResult readResult = await _pipe.Reader.ReadAsync(_disposeCts.Token).ConfigureAwait(false);
 70074
 428875                        if (!readResult.Buffer.IsEmpty)
 418976                        {
 418977                            await _connection.WriteAsync(readResult.Buffer, _disposeCts.Token).ConfigureAwait(false);
 417878                            _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 417879                        }
 70080
 427781                        if (readResult.IsCompleted)
 14182                        {
 14183                            await _connection.ShutdownWriteAsync(_disposeCts.Token).ConfigureAwait(false);
 13984                            break;
 70085                        }
 413686                    }
 13987                    _pipe.Reader.Complete();
 13988                }
 55689                catch (OperationCanceledException)
 55690                {
 70091                    // DisposeAsync was called.
 55692                }
 493                catch (Exception exception)
 494                {
 495                    _pipe.Reader.Complete(exception);
 496                }
 139997            });
 70098    }
 99
 100    internal void Flush()
 12854101    {
 12854102        ValueTask<FlushResult> flushResult = _pipe.Writer.FlushAsync(CancellationToken.None);
 103
 104        // PauseWriterThreshold is 0 so FlushAsync should always complete synchronously.
 12854105        Debug.Assert(flushResult.IsCompletedSuccessfully);
 12854106    }
 107
 108    /// <summary>Requests the shut down of the duplex connection writes after the buffered data is written on the
 109    /// duplex connection.</summary>
 110    internal void Shutdown() =>
 111        // Completing the pipe writer makes the background write task complete successfully.
 141112        _pipe.Writer.Complete();
 113}