< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicStream
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs
Tag: 592_20856082467
Line coverage
92%
Covered lines: 213
Uncovered lines: 18
Coverable lines: 231
Total lines: 448
Line coverage: 92.2%
Branch coverage
92%
Covered branches: 78
Total branches: 84
Branch coverage: 92.8%
Method coverage
100%
Covered methods: 28
Total methods: 28
Method coverage: 100%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_Id() | 50% | 2.09 | 2 | 71.42%
set_Id(...) | 100% | 1 | 1 | 100%
get_Input() | 50% | 2 | 2 | 100%
get_IsBidirectional() | 100% | 1 | 1 | 100%
get_IsRemote() | 100% | 1 | 1 | 100%
get_IsStarted() | 100% | 1 | 1 | 100%
get_Output() | 50% | 2 | 2 | 100%
get_WritesClosed() | 100% | 1 | 1 | 100%
get_WindowUpdateThreshold() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 12 | 12 | 100%
AcquireSendCreditAsync(...) | 100% | 1 | 1 | 100%
Close(...) | 100% | 4 | 4 | 100%
CloseReads(...) | 100% | 24.24 | 24 | 92.5%
CloseWrites(...) | 100% | 14.53 | 14 | 86.04%
ConsumedSendCredit(...) | 100% | 1 | 1 | 100%
FillBufferWriterAsync(...) | 100% | 1 | 1 | 100%
ReceivedDataFrameAsync(...) | 100% | 6 | 6 | 100%
ReceivedReadsClosedFrame() | 50% | 2 | 2 | 100%
ReceivedWindowUpdateFrame(...) | 50% | 2.5 | 2 | 50%
ReceivedWritesClosedFrame() | 50% | 2 | 2 | 100%
WindowUpdate(...) | 100% | 1.02 | 1 | 72.72%
WriteStreamFrameAsync(...) | 100% | 2 | 2 | 100%
WroteLastStreamFrame() | 100% | 2 | 2 | 100%
ThrowIfConnectionClosed() | 100% | 1 | 1 | 100%
TrySetReadsClosed() | 100% | 1 | 1 | 100%
TrySetWritesClosed() | 100% | 2 | 2 | 100%
TrySetState(...) | 100% | 6 | 6 | 100%
WriteStreamFrame(...) | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7using ZeroC.Slice;
 8
 9namespace IceRpc.Transports.Slic.Internal;
 10
 11/// <summary>The stream implementation for Slic.</summary>
 12/// <remarks>The stream implementation implements flow control to ensure data isn't buffered indefinitely if the
 13/// application doesn't consume it.</remarks>
 14internal class SlicStream : IMultiplexedStream
 15{
    /// <summary>Gets or sets the stream ID. The ID remains <see cref="ulong.MaxValue" /> until it is set.</summary>
    /// <exception cref="InvalidOperationException">Thrown by the getter if the ID is not set yet.</exception>
 16    public ulong Id
 17    {
 18        get
 1821319        {
 1821320            ulong id = Volatile.Read(ref _id);
 1821321            if (id == ulong.MaxValue)
 022            {
 023                throw new InvalidOperationException("The stream ID isn't allocated yet.");
 24            }
 1821325            return id;
 1821326        }
 27
 28        set
 410129        {
            // The ID is expected to be assigned only once, while it still holds the "not allocated" marker.
 410130            Debug.Assert(_id == ulong.MaxValue);
 410131            Volatile.Write(ref _id, value);
 410132        }
 33    }
 34
    /// <summary>Gets the pipe reader of this stream.</summary>
    /// <exception cref="InvalidOperationException">Thrown if the stream has no pipe reader; the constructor creates one
    /// only for remote or bidirectional streams.</exception>
 35    public PipeReader Input =>
 1170636        _inputPipeReader ?? throw new InvalidOperationException("A local unidirectional stream has no Input.");
 37
 38    /// <inheritdoc/>
 3500139    public bool IsBidirectional { get; }
 40
 41    /// <inheritdoc/>
 3161842    public bool IsRemote { get; }
 43
 44    /// <inheritdoc/>
 2764845    public bool IsStarted => Volatile.Read(ref _id) != ulong.MaxValue;
 46
    /// <summary>Gets the pipe writer of this stream.</summary>
    /// <exception cref="InvalidOperationException">Thrown if the stream has no pipe writer; the constructor creates one
    /// only for local or bidirectional streams.</exception>
 47    public PipeWriter Output =>
 897948        _outputPipeWriter ?? throw new InvalidOperationException("A remote unidirectional stream has no Output.");
 49
    /// <summary>Gets a task that completes once the writes-closed state is set or <see cref="WroteLastStreamFrame" />
    /// is called.</summary>
 224950    public Task WritesClosed => _writesClosedTcs.Task;
 51
    /// <summary>Gets the connection's <c>StreamWindowUpdateThreshold</c>.</summary>
 636352    internal int WindowUpdateThreshold => _connection.StreamWindowUpdateThreshold;
 53
    // Set by CloseReads to postpone the StreamReadsClosed frame until the frame that closes writes is sent.
 54    private bool _closeReadsOnWritesClosure;
 55    private readonly SlicConnection _connection;
    // ulong.MaxValue means the ID isn't allocated yet.
 411856    private ulong _id = ulong.MaxValue;
 57    private readonly SlicPipeReader? _inputPipeReader;
 58    // This mutex protects _writesClosePending and _closeReadsOnWritesClosure.
 411859    private readonly Lock _mutex = new();
 60    private readonly SlicPipeWriter? _outputPipeWriter;
 61    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 62    // locking.
 63    private int _state;
 411864    private readonly TaskCompletionSource _writesClosedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
    // Set once CloseWrites or WriteStreamFrameAsync has initiated the sending of the frame that closes writes.
 65    private bool _writesClosePending;
 66
    /// <summary>Constructs a Slic stream.</summary>
    /// <param name="connection">The Slic connection used by this stream.</param>
    /// <param name="isBidirectional"><see langword="true" /> to create a bidirectional stream; <see langword="false" />
    /// to create a unidirectional stream.</param>
    /// <param name="isRemote"><see langword="true" /> if the stream is a remote stream; otherwise, <see
    /// langword="false" />.</param>
 411867    internal SlicStream(SlicConnection connection, bool isBidirectional, bool isRemote)
 411868    {
 411869        _connection = connection;
 70
 411871        IsBidirectional = isBidirectional;
 411872        IsRemote = isRemote;
 73
 411874        if (!IsBidirectional)
 279975        {
 279976            if (IsRemote)
 139477            {
 78                // Write-side of remote unidirectional stream is marked as closed.
 139479                TrySetWritesClosed();
 139480            }
 81            else
 140582            {
 83                // Read-side of local unidirectional stream is marked as closed.
 140584                TrySetReadsClosed();
 140585            }
 279986        }
 87
        // Only a remote or bidirectional stream gets a pipe reader.
 411888        if (IsRemote || IsBidirectional)
 271389        {
 271390            _inputPipeReader = new SlicPipeReader(this, _connection);
 271391        }
 92
        // Only a local or bidirectional stream gets a pipe writer.
 411893        if (!IsRemote || IsBidirectional)
 272494        {
 272495            _outputPipeWriter = new SlicPipeWriter(this, _connection);
 272496        }
 411897    }
 98
 99    /// <summary>Acquires send credit.</summary>
 100    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 101    /// <returns>The available send credit.</returns>
 102    /// <remarks>This method should be called before sending a <see cref="FrameType.Stream"/> or <see
 103    /// cref="FrameType.StreamLast"/> frame to ensure enough send credit is available. If no send credit is available,
 104    /// it will block until send credit is available. The send credit matches the size of the peer's flow-control
 105    /// window.</remarks>
 106    internal ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken) =>
 10178107        _outputPipeWriter!.AcquireSendCreditAsync(cancellationToken);
 108
 109    /// <summary>Closes the read and write sides of the stream and notifies the stream <see cref="Input" /> and <see
 110    /// cref="Output" /> of the reads and writes closure.</summary>
    /// <param name="closeException">The exception used to complete the pipe reader and the pipe writer.</param>
 111    internal void Close(Exception closeException)
 673112    {
        // Each side is completed only if this call is the one that marked it as closed.
 673113        if (TrySetReadsClosed())
 310114        {
 310115            Debug.Assert(_inputPipeReader is not null);
 310116            _inputPipeReader.CompleteReads(closeException);
 310117        }
 673118        if (TrySetWritesClosed())
 396119        {
 396120            Debug.Assert(_outputPipeWriter is not null);
 396121            _outputPipeWriter.CompleteWrites(closeException);
 396122        }
 673123    }
 124
 125    /// <summary>Closes the read-side of the stream. It's called by <see cref="SlicPipeReader.Complete" />, <see
 126    /// cref="SlicPipeReader.TryRead" /> or <see cref="SlicPipeReader.ReadAsync" />, and by <see
 127    /// cref="ReceivedDataFrameAsync" /> when a local stream receives its last data frame.</summary>
    /// <remarks>NOTE(review): this method was documented as never called concurrently; confirm this holds given that
    /// <see cref="ReceivedDataFrameAsync" /> also calls it.</remarks>
 128    /// <param name="graceful"><see langword="true" /> if the application consumed all the stream data from the stream
 129    /// <see cref="Input" />; otherwise, <see langword="false" />.</param>
 130    internal void CloseReads(bool graceful)
 4596131    {
 4596132        bool writeReadsClosedFrame = false;
 133
 134        lock (_mutex)
 4596135        {
 4596136            if (IsStarted && !_state.HasFlag(State.ReadsClosed))
 2354137            {
 138                // As an optimization, if reads are gracefully closed once the buffered data is consumed but before
 139                // writes are closed, we don't send the StreamReadsClosed frame just yet. Instead, when writes are
 140                // closed, CloseWrites will bundle the sending of the StreamReadsClosed with the sending of the
 141                // StreamLast or StreamWritesClosed frame. This allows to send both frames with a single write on the
 142                // duplex connection.
 2354143                if (graceful &&
 2354144                    IsBidirectional &&
 2354145                    IsRemote &&
 2354146                    !_state.HasFlag(State.WritesClosed) &&
 2354147                    !_writesClosePending)
 459148                {
 459149                    _closeReadsOnWritesClosure = true;
 459150                }
 1895151                else if (!graceful || IsRemote)
 1340152                {
 153                    // If forcefully closed because the input was completed before the data was fully read or if writes
 154                    // are already closed and the stream is a remote stream, we send the StreamReadsClosed frame to
 155                    // notify the peer that reads are closed.
 1340156                    writeReadsClosedFrame = true;
 1340157                }
 2354158            }
 4596159        }
 160
 4596161        if (writeReadsClosedFrame)
 1340162        {
 1340163            if (IsRemote)
 1275164            {
 165                // If it's a remote stream, we close reads before sending the StreamReadsClosed frame to ensure
 166                // _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount is decreased before
 167                // the peer receives the frame. This is necessary to prevent a race condition where the peer could
 168                // release the connection's bidirectional or unidirectional stream semaphore before this connection's
 169                // stream count is actually decreased.
 1275170                TrySetReadsClosed();
 1275171            }
 172
 173            try
 1340174            {
 1340175                WriteStreamFrame(FrameType.StreamReadsClosed, encode: null, writeReadsClosedFrame: false);
 1340176            }
 0177            catch (IceRpcException)
 0178            {
 179                // Ignore connection failures.
 0180            }
 181
 1340182            if (!IsRemote)
 65183            {
 184                // We can now close reads to permit a new stream to be started. The peer will receive the
 185                // StreamReadsClosed frame before the new stream sends a Stream frame.
 65186                TrySetReadsClosed();
 65187            }
 1340188        }
 189        else
 3256190        {
            // No StreamReadsClosed frame is written by this call: just mark reads as closed.
 3256191            TrySetReadsClosed();
 3256192        }
 4596193    }
 194
 195    /// <summary>Closes the write-side of the stream. It's only called by <see cref="SlicPipeWriter.Complete" /> and
 196    /// never called concurrently.</summary>
 197    /// <param name="graceful"><see langword="true" /> if the application wrote all the stream data on the stream <see
 198    /// cref="Output" />; otherwise, <see langword="false" />.</param>
 199    internal void CloseWrites(bool graceful)
 2696200    {
 2696201        bool writeWritesClosedFrame = false;
 2696202        bool writeReadsClosedFrame = false;
 203
 204        lock (_mutex)
 2696205        {
 2696206            if (IsStarted && !_state.HasFlag(State.WritesClosed) && !_writesClosePending)
 609207            {
                // Pick up the StreamReadsClosed frame postponed by CloseReads, if any, and record that the closure of
                // writes is now underway.
 609208                writeReadsClosedFrame = _closeReadsOnWritesClosure;
 609209                _writesClosePending = true;
 609210                writeWritesClosedFrame = true;
 609211            }
 2696212        }
 213
 2696214        if (writeWritesClosedFrame)
 609215        {
 609216            if (IsRemote)
 245217            {
 218                // If it's a remote stream, we close writes before sending the StreamLast or StreamWritesClosed
 219                // frame to ensure _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount
 220                // is decreased before the peer receives the frame. This is necessary to prevent a race condition
 221                // where the peer could release the connection's bidirectional or unidirectional stream semaphore
 222                // before this connection's stream count is actually decreased.
 245223                TrySetWritesClosed();
 245224            }
 225
 609226            if (graceful)
 566227            {
 228                try
 566229                {
 566230                    WriteStreamFrame(FrameType.StreamLast, encode: null, writeReadsClosedFrame);
 566231                }
 0232                catch (IceRpcException)
 0233                {
 234                    // Ignore connection failures.
 0235                }
 236
 237                // If the stream is a local stream, writes are not closed until the StreamReadsClosed frame is
 238                // received from the peer (see ReceivedReadsClosedFrame). This ensures that the connection's
 239                // bidirectional or unidirectional stream semaphore is released only once the peer consumed the
 240                // buffered data.
 566241            }
 242            else
 43243            {
 244                try
 43245                {
 43246                    WriteStreamFrame(FrameType.StreamWritesClosed, encode: null, writeReadsClosedFrame);
 43247                }
 0248                catch (IceRpcException)
 0249                {
 250                    // Ignore connection failures.
 0251                }
 252
 43253                if (!IsRemote)
 27254                {
 255                    // We can now close writes to allow starting a new stream. Since the sending of frames is
 256                    // serialized over the connection, the peer will receive this StreamWritesClosed frame before
 257                    // a new stream sends a Stream frame.
 27258                    TrySetWritesClosed();
 27259                }
 43260            }
 609261        }
 262        else
 2087263        {
            // No frame is written by this call: just mark writes as closed.
 2087264            TrySetWritesClosed();
 2087265        }
 2696266    }
 267
 268    /// <summary>Notifies the stream of the amount of data consumed by the connection to send a <see
 269    /// cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame.</summary>
 270    /// <param name="size">The size of the stream frame.</param>
 9164271    internal void ConsumedSendCredit(int size) => _outputPipeWriter!.ConsumedSendCredit(size);
 272
 273    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 274    /// <param name="bufferWriter">The destination buffer writer.</param>
 275    /// <param name="byteCount">The amount of stream data to read.</param>
 276    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 277    internal ValueTask FillBufferWriterAsync(
 278        IBufferWriter<byte> bufferWriter,
 279        int byteCount,
 280        CancellationToken cancellationToken) =>
 9536281        _connection.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 282
 283    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.Stream" /> or <see
 284    /// cref="FrameType.StreamLast" /> frame.</summary>
 285    /// <param name="size">The size of the data carried by the stream frame.</param>
 286    /// <param name="endStream"><see langword="true" /> if the received stream frame is the <see
 287    /// cref="FrameType.StreamLast" /> frame; otherwise, <see langword="false" />.</param>
 288    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns><see langword="false" /> if reads are already closed; otherwise, the value returned by the pipe reader's
    /// <c>ReceivedDataFrameAsync</c>.</returns>
 289    internal ValueTask<bool> ReceivedDataFrameAsync(int size, bool endStream, CancellationToken cancellationToken)
 9547290    {
 9547291        Debug.Assert(_inputPipeReader is not null);
 9547292        if (_state.HasFlag(State.ReadsClosed))
 10293        {
 10294            return new(false);
 295        }
 296        else
 9537297        {
 9537298            if (endStream && !IsRemote)
 555299            {
 300                // For a local stream we can close reads after we have received the StreamLast frame. For remote
 301                // streams reads are closed after the application has consumed all the data.
 555302                CloseReads(graceful: true);
 555303            }
 9537304            return _inputPipeReader.ReceivedDataFrameAsync(size, endStream, cancellationToken);
 305        }
 9547306    }
 307
 308    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamReadsClosed" /> frame.</summary>
 309    internal void ReceivedReadsClosedFrame()
 1264310    {
        // Mark writes as closed and complete the pipe writer (if any) without an exception.
 1264311        TrySetWritesClosed();
 1264312        _outputPipeWriter?.CompleteWrites(exception: null);
 1264313    }
 314
 315    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 316    /// <param name="frame">The body of the <see cref="FrameType.StreamWindowUpdate" /> frame.</param>
    /// <exception cref="IceRpcException">Thrown with <see cref="IceRpcError.IceRpcError" /> if the window size increment
    /// is larger than <see cref="SlicTransportOptions.MaxWindowSize" />.</exception>
 317    internal void ReceivedWindowUpdateFrame(StreamWindowUpdateBody frame)
 968318    {
 968319        if (frame.WindowSizeIncrement > SlicTransportOptions.MaxWindowSize)
 0320        {
 0321            throw new IceRpcException(
 0322                IceRpcError.IceRpcError,
 0323                $"The window update is trying to increase the window size to a value larger than allowed.");
 324        }
 968325        _outputPipeWriter!.ReceivedWindowUpdateFrame((int)frame.WindowSizeIncrement);
 968326    }
 327
 328    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWritesClosed" /> frame.</summary>
 329    internal void ReceivedWritesClosedFrame()
 40330    {
 40331        TrySetReadsClosed();
 332
 333        // Read operations will return a TruncatedData error if the peer closed writes.
 40334        _inputPipeReader?.CompleteReads(new IceRpcException(IceRpcError.TruncatedData));
 40335    }
 336
 337    /// <summary>Notifies the stream of the window update.</summary>
 338    /// <param name="size">The amount of data consumed by the application on the stream <see cref="Input" />.</param>
    /// <remarks>Writes a <see cref="FrameType.StreamWindowUpdate" /> frame carrying <paramref name="size" />; an
    /// <see cref="IceRpcException" /> thrown by this write is ignored.</remarks>
 339    internal void WindowUpdate(int size)
 1010340    {
 341        try
 1010342        {
 343            // Notify the sender of the window update to permit the sending of additional data.
 1010344            WriteStreamFrame(
 1010345                FrameType.StreamWindowUpdate,
 1010346                new StreamWindowUpdateBody((ulong)size).Encode,
 1010347                writeReadsClosedFrame: false);
 1010348        }
 0349        catch (IceRpcException)
 0350        {
 351            // Ignore connection failures.
 0352        }
 1010353    }
 354
 355    /// <summary>Writes a <see cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame on the
 356    /// connection.</summary>
 357    /// <param name="source1">The first stream frame data source.</param>
 358    /// <param name="source2">The second stream frame data source.</param>
 359    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame; otherwise,
 360    /// <see langword="false" />.</param>
 361    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The value task returned by the connection's <c>WriteStreamDataFrameAsync</c>.</returns>
 362    internal ValueTask<FlushResult> WriteStreamFrameAsync(
 363        ReadOnlySequence<byte> source1,
 364        ReadOnlySequence<byte> source2,
 365        bool endStream,
 366        CancellationToken cancellationToken)
 7869367    {
 7869368        bool writeReadsClosedFrame = false;
 7869369        if (endStream)
 1786370        {
 371            lock (_mutex)
 1786372            {
                // Pick up the StreamReadsClosed frame postponed by CloseReads, if any, and record that the closure of
                // writes is now underway.
 1786373                writeReadsClosedFrame = _closeReadsOnWritesClosure;
 1786374                _writesClosePending = true;
 1786375            }
 1786376        }
 377
 7869378        return _connection.WriteStreamDataFrameAsync(
 7869379            this,
 7869380            source1,
 7869381            source2,
 7869382            endStream,
 7869383            writeReadsClosedFrame,
 7869384            cancellationToken);
 7869385    }
 386
 387    /// <summary>Notifies the stream that the <see cref="FrameType.StreamLast" /> was written by the
 388    /// connection.</summary>
 389    internal void WroteLastStreamFrame()
 1347390    {
 1347391        if (IsRemote)
 606392        {
 606393            TrySetWritesClosed();
 606394        }
 395        // For local streams, writes will be closed only once the peer sends the StreamReadsClosed frame.
 396
        // The WritesClosed task is completed for local and remote streams alike.
 1347397        _writesClosedTcs.TrySetResult();
 1347398    }
 399
 400    /// <summary>Throws the connection closure exception if the connection is closed.</summary>
 7899401    internal void ThrowIfConnectionClosed() => _connection.ThrowIfClosed();
 402
    /// <summary>Attempts to set the <see cref="State.ReadsClosed" /> flag.</summary>
    /// <returns>The value returned by <see cref="TrySetState" />.</returns>
 6714403    private bool TrySetReadsClosed() => TrySetState(State.ReadsClosed);
 404
    /// <summary>Attempts to set the <see cref="State.WritesClosed" /> flag and, on success, completes the
    /// <see cref="WritesClosed" /> task.</summary>
    /// <returns>The value returned by <see cref="TrySetState" />.</returns>
 405    private bool TrySetWritesClosed()
 6296406    {
 6296407        if (TrySetState(State.WritesClosed))
 4113408        {
 4113409            _writesClosedTcs.TrySetResult();
 4113410            return true;
 411        }
 412        else
 2183413        {
 2183414            return false;
 415        }
 6296416    }
 417
    /// <summary>Attempts to set the given flag on the stream state. When this succeeds and both the reads-closed and
    /// writes-closed flags are then set, a started stream is released from the connection.</summary>
    /// <param name="state">The flag to set.</param>
    /// <returns><see langword="true" /> if <c>TrySetFlag</c> succeeded; otherwise, <see langword="false" />.</returns>
 418    private bool TrySetState(State state)
 13010419    {
 13010420        if (_state.TrySetFlag(state, out int newState))
 8229421        {
 8229422            if (newState.HasFlag(State.ReadsClosed | State.WritesClosed))
 4113423            {
 424                // The stream reads and writes are closed, it's time to release the stream to either allow creating or
 425                // accepting a new stream.
 4113426                if (IsStarted)
 4101427                {
 4101428                    _connection.ReleaseStream(this);
 4101429                }
 4113430            }
 8229431            return true;
 432        }
 433        else
 4781434        {
 4781435            return false;
 436        }
 13010437    }
 438
    /// <summary>Forwards the writing of a stream frame for this stream to the connection.</summary>
 439    private void WriteStreamFrame(FrameType frameType, EncodeAction? encode, bool writeReadsClosedFrame) =>
 2959440        _connection.WriteStreamFrame(stream: this, frameType, encode, writeReadsClosedFrame);
 441
    /// <summary>The flags stored in the stream state field.</summary>
 442    [Flags]
 443    private enum State : int
 444    {
 445        ReadsClosed = 1,
 446        WritesClosed = 2
 447    }
 448}