< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicStream
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs
Tag: 1856_27024993493
Line coverage
92%
Covered lines: 213
Uncovered lines: 18
Coverable lines: 231
Total lines: 448
Line coverage: 92.2%
Branch coverage
92%
Covered branches: 78
Total branches: 84
Branch coverage: 92.8%
Method coverage
100%
Covered methods: 28
Fully covered methods: 23
Total methods: 28
Method coverage: 100%
Full method coverage: 82.1%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_Id() | 50% | 2 | 2 | 71.42%
set_Id(...) | 100% | 1 | 1 | 100%
get_Input() | 50% | 2 | 2 | 100%
get_IsBidirectional() | 100% | 1 | 1 | 100%
get_IsRemote() | 100% | 1 | 1 | 100%
get_IsStarted() | 100% | 1 | 1 | 100%
get_Output() | 50% | 2 | 2 | 100%
get_WritesClosed() | 100% | 1 | 1 | 100%
get_WindowUpdateThreshold() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 12 | 12 | 100%
AcquireSendCreditAsync(...) | 100% | 1 | 1 | 100%
Close(...) | 100% | 4 | 4 | 100%
CloseReads(...) | 100% | 24 | 24 | 92.5%
CloseWrites(...) | 100% | 15 | 14 | 86.04%
ConsumedSendCredit(...) | 100% | 1 | 1 | 100%
FillBufferWriterAsync(...) | 100% | 1 | 1 | 100%
ReceivedDataFrameAsync(...) | 100% | 6 | 6 | 100%
ReceivedReadsClosedFrame() | 50% | 2 | 2 | 100%
ReceivedWindowUpdateFrame(...) | 50% | 3 | 2 | 50%
ReceivedWritesClosedFrame() | 50% | 2 | 2 | 100%
WindowUpdate(...) | 100% | 1 | 1 | 72.72%
WriteStreamFrameAsync(...) | 100% | 2 | 2 | 100%
WroteLastStreamFrame() | 100% | 2 | 2 | 100%
ThrowIfConnectionClosed() | 100% | 1 | 1 | 100%
TrySetReadsClosed() | 100% | 1 | 1 | 100%
TrySetWritesClosed() | 100% | 2 | 2 | 100%
TrySetState(...) | 100% | 6 | 6 | 100%
WriteStreamFrame(...) | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7using ZeroC.Slice.Codec;
 8
 9namespace IceRpc.Transports.Slic.Internal;
 10
/// <summary>The stream implementation for Slic.</summary>
/// <remarks>The stream implementation implements flow control to ensure data isn't buffered indefinitely if the
/// application doesn't consume it.</remarks>
internal class SlicStream : IMultiplexedStream
{
    /// <summary>Gets or sets the stream ID. The ID remains unset (<see cref="ulong.MaxValue" /> internally) until
    /// the setter is called.</summary>
    /// <exception cref="InvalidOperationException">Thrown by the getter if the ID has not been set yet.</exception>
    public ulong Id
    {
        get
        {
            ulong id = Volatile.Read(ref _id);
            if (id == ulong.MaxValue)
            {
                throw new InvalidOperationException("The stream ID isn't allocated yet.");
            }
            return id;
        }

        set
        {
            // The ID is expected to be assigned only once.
            Debug.Assert(_id == ulong.MaxValue);
            Volatile.Write(ref _id, value);
        }
    }

    /// <summary>Gets the pipe reader used to read the data received on this stream.</summary>
    /// <exception cref="InvalidOperationException">Thrown if this stream is a local unidirectional stream, which has
    /// no read-side.</exception>
    public PipeReader Input =>
        _inputPipeReader ?? throw new InvalidOperationException("A local unidirectional stream has no Input.");

    /// <inheritdoc/>
    public bool IsBidirectional { get; }

    /// <inheritdoc/>
    public bool IsRemote { get; }

    /// <inheritdoc/>
    public bool IsStarted => Volatile.Read(ref _id) != ulong.MaxValue;

    /// <summary>Gets the pipe writer used to write data on this stream.</summary>
    /// <exception cref="InvalidOperationException">Thrown if this stream is a remote unidirectional stream, which
    /// has no write-side.</exception>
    public PipeWriter Output =>
        _outputPipeWriter ?? throw new InvalidOperationException("A remote unidirectional stream has no Output.");

    /// <summary>Gets a task that completes when the writes are marked as closed or when the
    /// <see cref="FrameType.StreamLast" /> frame was written (see <see cref="WroteLastStreamFrame" />).</summary>
    public Task WritesClosed => _writesClosedTcs.Task;

    /// <summary>Gets the window update threshold configured on the connection that owns this stream.</summary>
    internal int WindowUpdateThreshold => _connection.StreamWindowUpdateThreshold;

    // Set by CloseReads when the sending of the StreamReadsClosed frame is deferred until writes are closed. Read by
    // CloseWrites and WriteStreamFrameAsync to bundle the StreamReadsClosed frame with the last write.
    private bool _closeReadsOnWritesClosure;
    private readonly SlicConnection _connection;
    // ulong.MaxValue is the sentinel for "no ID assigned yet" (see Id and IsStarted).
    private ulong _id = ulong.MaxValue;
    // Null for a local unidirectional stream.
    private readonly SlicPipeReader? _inputPipeReader;
    // This mutex protects _writesClosePending, _closeReadsOnWritesClosure.
    private readonly Lock _mutex = new();
    // Null for a remote unidirectional stream.
    private readonly SlicPipeWriter? _outputPipeWriter;
    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
    // locking.
    private int _state;
    private readonly TaskCompletionSource _writesClosedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
    // Set once the sending of the StreamLast or StreamWritesClosed frame has been requested.
    private bool _writesClosePending;

    /// <summary>Constructs a Slic stream.</summary>
    /// <param name="connection">The connection that owns this stream.</param>
    /// <param name="isBidirectional"><see langword="true" /> to create a bidirectional stream; <see langword="false" />
    /// to create a unidirectional stream.</param>
    /// <param name="isRemote"><see langword="true" /> if the stream is a remote stream; <see langword="false" /> if
    /// it is a local stream.</param>
    internal SlicStream(SlicConnection connection, bool isBidirectional, bool isRemote)
    {
        _connection = connection;

        IsBidirectional = isBidirectional;
        IsRemote = isRemote;

        if (!IsBidirectional)
        {
            if (IsRemote)
            {
                // Write-side of remote unidirectional stream is marked as closed.
                TrySetWritesClosed();
            }
            else
            {
                // Read-side of local unidirectional stream is marked as closed.
                TrySetReadsClosed();
            }
        }

        // The pipe reader only exists when the stream has a read-side.
        if (IsRemote || IsBidirectional)
        {
            _inputPipeReader = new SlicPipeReader(this, _connection);
        }

        // The pipe writer only exists when the stream has a write-side.
        if (!IsRemote || IsBidirectional)
        {
            _outputPipeWriter = new SlicPipeWriter(this, _connection);
        }
    }

    /// <summary>Acquires send credit.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The available send credit.</returns>
    /// <remarks>This method should be called before sending a <see cref="FrameType.Stream"/> or <see
    /// cref="FrameType.StreamLast"/> frame to ensure enough send credit is available. If no send credit is available,
    /// it will block until send credit is available. The send credit matches the size of the peer's flow-control
    /// window.</remarks>
    internal ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken) =>
        _outputPipeWriter!.AcquireSendCreditAsync(cancellationToken);

    /// <summary>Closes the read and write sides of the stream and notifies the stream <see cref="Input" /> and <see
    /// cref="Output" /> of the reads and writes closure.</summary>
    /// <param name="closeException">The exception used to complete the reads and writes.</param>
    internal void Close(Exception closeException)
    {
        if (TrySetReadsClosed())
        {
            Debug.Assert(_inputPipeReader is not null);
            _inputPipeReader.CompleteReads(closeException);
        }
        if (TrySetWritesClosed())
        {
            Debug.Assert(_outputPipeWriter is not null);
            _outputPipeWriter.CompleteWrites(closeException);
        }
    }

    /// <summary>Closes the read-side of the stream. It's only called by <see cref="SlicPipeReader.Complete" />, <see
    /// cref="SlicPipeReader.TryRead" /> or <see cref="SlicPipeReader.ReadAsync" /> and never called concurrently.
    /// </summary>
    /// <param name="graceful"><see langword="true" /> if the application consumed all the stream data from the stream
    /// <see cref="Input" />; otherwise, <see langword="false" />.</param>
    internal void CloseReads(bool graceful)
    {
        bool writeReadsClosedFrame = false;

        lock (_mutex)
        {
            if (IsStarted && !_state.HasFlag(State.ReadsClosed))
            {
                // As an optimization, if reads are gracefully closed once the buffered data is consumed but before
                // writes are closed, we don't send the StreamReadsClosed frame just yet. Instead, when writes are
                // closed, CloseWrites will bundle the sending of the StreamReadsClosed with the sending of the
                // StreamLast or StreamWritesClosed frame. This allows to send both frames with a single write on the
                // duplex connection.
                if (graceful &&
                    IsBidirectional &&
                    IsRemote &&
                    !_state.HasFlag(State.WritesClosed) &&
                    !_writesClosePending)
                {
                    _closeReadsOnWritesClosure = true;
                }
                else if (!graceful || IsRemote)
                {
                    // If forcefully closed because the input was completed before the data was fully read or if writes
                    // are already closed and the stream is a remote stream, we send the StreamReadsClosed frame to
                    // notify the peer that reads are closed.
                    writeReadsClosedFrame = true;
                }
            }
        }

        if (writeReadsClosedFrame)
        {
            if (IsRemote)
            {
                // If it's a remote stream, we close reads before sending the StreamReadsClosed frame to ensure
                // _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount is decreased before
                // the peer receives the frame. This is necessary to prevent a race condition where the peer could
                // release the connection's bidirectional or unidirectional stream semaphore before this connection's
                // stream count is actually decreased.
                TrySetReadsClosed();
            }

            try
            {
                WriteStreamFrame(FrameType.StreamReadsClosed, encode: null, writeReadsClosedFrame: false);
            }
            catch (IceRpcException)
            {
                // Ignore connection failures.
            }

            if (!IsRemote)
            {
                // We can now close reads to permit a new stream to be started. The peer will receive the
                // StreamReadsClosed frame before the new stream sends a Stream frame.
                TrySetReadsClosed();
            }
        }
        else
        {
            TrySetReadsClosed();
        }
    }

    /// <summary>Closes the write-side of the stream. It's only called by <see cref="SlicPipeWriter.Complete" /> and
    /// never called concurrently.</summary>
    /// <param name="graceful"><see langword="true" /> if the application wrote all the stream data on the stream <see
    /// cref="Output" />; otherwise, <see langword="false" />.</param>
    internal void CloseWrites(bool graceful)
    {
        bool writeWritesClosedFrame = false;
        bool writeReadsClosedFrame = false;

        lock (_mutex)
        {
            if (IsStarted && !_state.HasFlag(State.WritesClosed) && !_writesClosePending)
            {
                // Pick up the StreamReadsClosed frame deferred by CloseReads, if any, so that it's bundled with the
                // frame written below.
                writeReadsClosedFrame = _closeReadsOnWritesClosure;
                _writesClosePending = true;
                writeWritesClosedFrame = true;
            }
        }

        if (writeWritesClosedFrame)
        {
            if (IsRemote)
            {
                // If it's a remote stream, we close writes before sending the StreamLast or StreamWritesClosed
                // frame to ensure _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount
                // is decreased before the peer receives the frame. This is necessary to prevent a race condition
                // where the peer could release the connection's bidirectional or unidirectional stream semaphore
                // before this connection's stream count is actually decreased.
                TrySetWritesClosed();
            }

            try
            {
                // A graceful closure sends StreamLast; a forceful closure sends StreamWritesClosed.
                WriteStreamFrame(
                    graceful ? FrameType.StreamLast : FrameType.StreamWritesClosed,
                    encode: null,
                    writeReadsClosedFrame);
            }
            catch (IceRpcException)
            {
                // Ignore connection failures.
            }

            if (!graceful && !IsRemote)
            {
                // We can now close writes to allow starting a new stream. Since the sending of frames is
                // serialized over the connection, the peer will receive this StreamWritesClosed frame before
                // a new stream sends a StreamFrame frame.
                TrySetWritesClosed();
            }

            // If writes are gracefully closed and the stream is a local stream, writes are not closed until the
            // StreamReadsClosed frame is received from the peer (see ReceivedReadsClosedFrame). This ensures that
            // the connection's bidirectional or unidirectional stream semaphore is released only once the peer
            // consumed the buffered data.
        }
        else
        {
            TrySetWritesClosed();
        }
    }

    /// <summary>Notifies the stream of the amount of data consumed by the connection to send a <see
    /// cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame.</summary>
    /// <param name="size">The size of the stream frame.</param>
    internal void ConsumedSendCredit(int size) => _outputPipeWriter!.ConsumedSendCredit(size);

    /// <summary>Fills the given writer with stream data received on the connection.</summary>
    /// <param name="bufferWriter">The destination buffer writer.</param>
    /// <param name="byteCount">The amount of stream data to read.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    internal ValueTask FillBufferWriterAsync(
        IBufferWriter<byte> bufferWriter,
        int byteCount,
        CancellationToken cancellationToken) =>
        _connection.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);

    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.Stream" /> or <see
    /// cref="FrameType.StreamLast" /> frame.</summary>
    /// <param name="size">The size of the data carried by the stream frame.</param>
    /// <param name="endStream"><see langword="true" /> if the received stream frame is the <see
    /// cref="FrameType.StreamLast" /> frame; otherwise, <see langword="false" />.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns><see langword="false" /> if reads are already closed, in which case the frame is not forwarded to
    /// the stream's pipe reader; otherwise, the result of the pipe reader's <c>ReceivedDataFrameAsync</c>.</returns>
    internal ValueTask<bool> ReceivedDataFrameAsync(int size, bool endStream, CancellationToken cancellationToken)
    {
        Debug.Assert(_inputPipeReader is not null);
        if (_state.HasFlag(State.ReadsClosed))
        {
            return new(false);
        }
        else
        {
            if (endStream && !IsRemote)
            {
                // For a local stream we can close reads after we have received the StreamLast frame. For remote
                // streams reads are closed after the application has consumed all the data.
                CloseReads(graceful: true);
            }
            return _inputPipeReader.ReceivedDataFrameAsync(size, endStream, cancellationToken);
        }
    }

    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamReadsClosed" /> frame.</summary>
    internal void ReceivedReadsClosedFrame()
    {
        TrySetWritesClosed();
        _outputPipeWriter?.CompleteWrites(exception: null);
    }

    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
    /// <param name="frame">The body of the <see cref="FrameType.StreamWindowUpdate" /> frame.</param>
    /// <exception cref="IceRpcException">Thrown if the window size increment carried by the frame is larger than
    /// <see cref="SlicTransportOptions.MaxWindowSize" />.</exception>
    internal void ReceivedWindowUpdateFrame(StreamWindowUpdateBody frame)
    {
        if (frame.WindowSizeIncrement > SlicTransportOptions.MaxWindowSize)
        {
            throw new IceRpcException(
                IceRpcError.IceRpcError,
                "The window update is trying to increase the window size to a value larger than allowed.");
        }
        // The cast is safe: the increment was checked against MaxWindowSize above.
        _outputPipeWriter!.ReceivedWindowUpdateFrame((int)frame.WindowSizeIncrement);
    }

    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWritesClosed" /> frame.</summary>
    internal void ReceivedWritesClosedFrame()
    {
        TrySetReadsClosed();

        // Read operations will return a TruncatedData error if the peer closed writes.
        _inputPipeReader?.CompleteReads(new IceRpcException(IceRpcError.TruncatedData));
    }

    /// <summary>Notifies the stream of the window update.</summary>
    /// <param name="size">The amount of data consumed by the application on the stream <see cref="Input" />.</param>
    internal void WindowUpdate(int size)
    {
        try
        {
            // Notify the sender of the window update to permit the sending of additional data.
            WriteStreamFrame(
                FrameType.StreamWindowUpdate,
                new StreamWindowUpdateBody((ulong)size).Encode,
                writeReadsClosedFrame: false);
        }
        catch (IceRpcException)
        {
            // Ignore connection failures.
        }
    }

    /// <summary>Writes a <see cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame on the
    /// connection.</summary>
    /// <param name="source1">The first stream frame data source.</param>
    /// <param name="source2">The second stream frame data source.</param>
    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame; otherwise,
    /// <see langword="false" />.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The value task returned by the connection's <c>WriteStreamDataFrameAsync</c>.</returns>
    internal ValueTask<FlushResult> WriteStreamFrameAsync(
        ReadOnlySequence<byte> source1,
        ReadOnlySequence<byte> source2,
        bool endStream,
        CancellationToken cancellationToken)
    {
        bool writeReadsClosedFrame = false;
        if (endStream)
        {
            lock (_mutex)
            {
                // Bundle the StreamReadsClosed frame deferred by CloseReads, if any, with the StreamLast frame and
                // record that the closure of writes is now pending.
                writeReadsClosedFrame = _closeReadsOnWritesClosure;
                _writesClosePending = true;
            }
        }

        return _connection.WriteStreamDataFrameAsync(
            this,
            source1,
            source2,
            endStream,
            writeReadsClosedFrame,
            cancellationToken);
    }

    /// <summary>Notifies the stream that the <see cref="FrameType.StreamLast" /> was written by the
    /// connection.</summary>
    internal void WroteLastStreamFrame()
    {
        if (IsRemote)
        {
            TrySetWritesClosed();
        }
        // For local streams, writes will be closed only once the peer sends the StreamReadsClosed frame.

        _writesClosedTcs.TrySetResult();
    }

    /// <summary>Throws the connection closure exception if the connection is closed.</summary>
    internal void ThrowIfConnectionClosed() => _connection.ThrowIfClosed();

    /// <summary>Marks reads as closed.</summary>
    /// <returns><see langword="true" /> if this call set the <see cref="State.ReadsClosed" /> flag; otherwise,
    /// <see langword="false" />.</returns>
    private bool TrySetReadsClosed() => TrySetState(State.ReadsClosed);

    /// <summary>Marks writes as closed and, if this call set the flag, completes <see cref="WritesClosed" />.
    /// </summary>
    /// <returns><see langword="true" /> if this call set the <see cref="State.WritesClosed" /> flag; otherwise,
    /// <see langword="false" />.</returns>
    private bool TrySetWritesClosed()
    {
        if (TrySetState(State.WritesClosed))
        {
            _writesClosedTcs.TrySetResult();
            return true;
        }
        else
        {
            return false;
        }
    }

    /// <summary>Sets the given state flag and releases the stream from the connection once both reads and writes are
    /// closed.</summary>
    /// <param name="state">The state flag to set.</param>
    /// <returns><see langword="true" /> if this call set the flag; otherwise, <see langword="false" />.</returns>
    private bool TrySetState(State state)
    {
        if (_state.TrySetFlag(state, out int newState))
        {
            if (newState.HasFlag(State.ReadsClosed | State.WritesClosed))
            {
                // The stream reads and writes are closed, it's time to release the stream to either allow creating or
                // accepting a new stream.
                if (IsStarted)
                {
                    _connection.ReleaseStream(this);
                }
            }
            return true;
        }
        else
        {
            return false;
        }
    }

    /// <summary>Writes a stream frame other than <see cref="FrameType.Stream" /> or
    /// <see cref="FrameType.StreamLast" /> data frames by forwarding the call to the connection.</summary>
    private void WriteStreamFrame(FrameType frameType, EncodeAction? encode, bool writeReadsClosedFrame) =>
        _connection.WriteStreamFrame(stream: this, frameType, encode, writeReadsClosedFrame);

    /// <summary>The flags stored in <see cref="_state" />.</summary>
    [Flags]
    private enum State : int
    {
        /// <summary>The read-side of the stream is closed.</summary>
        ReadsClosed = 1,

        /// <summary>The write-side of the stream is closed.</summary>
        WritesClosed = 2
    }
}