< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Tag: 1856_27024993493
Line coverage
90%
Covered lines: 979
Uncovered lines: 105
Coverable lines: 1084
Total lines: 1728
Line coverage: 90.3%
Branch coverage
90%
Covered branches: 310
Total branches: 344
Branch coverage: 90.1%
Method coverage
97%
Covered methods: 47
Fully covered methods: 25
Total methods: 48
Method coverage: 97.9%
Full method coverage: 52%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_IsServer()100%11100%
get_MinSegmentSize()100%11100%
get_PeerInitialStreamWindowSize()100%11100%
get_PeerMaxStreamFrameSize()100%11100%
get_Pool()100%11100%
get_InitialStreamWindowSize()100%11100%
get_PauseWriterThreshold()100%11100%
get_StreamWindowUpdateThreshold()100%11100%
.ctor(...)100%44100%
AcceptStreamAsync()100%66100%
ConnectAsync(...)75%4484.61%
PerformConnectAsync()100%262696.19%
DecodeInitialize()75%4489.47%
DecodeInitializeAckOrVersion()66.66%6690%
ReadFrameAsync()100%88100%
CloseAsync()92.85%141495.45%
CreateStreamAsync()100%141497.61%
DisposeAsync()100%22100%
PerformDisposeAsync()93.75%161690.69%
SendPingAsync()100%1144.44%
SendReadPing()50%22100%
SendWritePing()0%620%
FillBufferWriterAsync(...)100%11100%
ReleaseStream(...)100%88100%
ThrowIfClosed()100%22100%
WriteConnectionFrameAsync()50%2284.61%
WriteStreamFrame(...)50%22100%
WriteStreamFrameAsync()83.33%6685.29%
WriteStreamDataFrameAsync()94.44%363693.18%
EncodeStreamFrameHeader()100%22100%
AddStream(...)100%66100%
DecodeParameters(...)76.92%362675.4%
DecodeParamValue()100%1166.66%
EncodeParameters()83.33%66100%
EncodeParameter()100%11100%
IsUnknownStream(...)100%1212100%
ReadFrameAsync(...)95.65%232394.87%
ReadCloseFrameAsync()100%1313100%
ReadPingFrameAndWritePongFrameAsync()100%11100%
ReadPongFrameAsync()66.66%6686.66%
ReadStreamWindowUpdateFrameAsync()100%22100%
ReadFrameBodyAsync()100%44100%
ReadFrameHeaderAsync()83.33%6681.81%
TryDecodeHeader()81.25%181680.55%
ReadFramesAsync()83.33%6685.71%
ReadStreamDataFrameAsync()88.09%584279.16%
TryClose(...)100%66100%
WriteFrame(...)100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Collections.Concurrent;
 7using System.Diagnostics;
 8using System.IO.Pipelines;
 9using System.Security.Authentication;
 10using System.Threading.Channels;
 11using ZeroC.Slice.Codec;
 12
 13namespace IceRpc.Transports.Slic.Internal;
 14
 15/// <summary>The Slic connection implements an <see cref="IMultiplexedConnection" /> on top of a <see
 16/// cref="IDuplexConnection" />.</summary>
 17internal class SlicConnection : IMultiplexedConnection
 18{
 19    /// <summary>Gets a value indicating whether or not this is the server-side of the connection.</summary>
 1720120    internal bool IsServer { get; }
 21
 22    /// <summary>Gets the minimum size of the segment requested from <see cref="Pool" />.</summary>
 568623    internal int MinSegmentSize { get; }
 24
 25    /// <summary>Gets the peer's initial stream window size. This property is set to the <see
 26    /// cref="ParameterKey.InitialStreamWindowSize"/> value carried by the <see cref="FrameType.Initialize" />
 27    /// frame.</summary>
 348028    internal int PeerInitialStreamWindowSize { get; private set; }
 29
 30    /// <summary>Gets the maximum size of stream frames accepted by the peer. This property is set to the <see
 31    /// cref="ParameterKey.MaxStreamFrameSize"/> value carried by the <see cref="FrameType.Initialize" />
 32    /// frame.</summary>
 988033    internal int PeerMaxStreamFrameSize { get; private set; }
 34
 35    /// <summary>Gets the <see cref="MemoryPool{T}" /> used for obtaining memory buffers.</summary>
 568636    internal MemoryPool<byte> Pool { get; }
 37
 38    /// <summary>Gets the initial stream window size.</summary>
 1092439    internal int InitialStreamWindowSize { get; }
 40
 41    /// <summary>Gets the pause writer threshold for the connection's outbound pipe.</summary>
 75342    internal int PauseWriterThreshold { get; }
 43
 44    /// <summary>Gets the window update threshold. When the window size is increased and this threshold reached, a <see
 45    /// cref="FrameType.StreamWindowUpdate" /> frame is sent.</summary>
 743046    internal int StreamWindowUpdateThreshold => InitialStreamWindowSize / StreamWindowUpdateRatio;
 47
 48    // The maximum body size for non-stream frames (Initialize, InitializeAck, Version, Close, Ping, Pong). This
 49    // value is the maximum value that can be encoded as a 2-byte varuint62, which allows WriteFrame to use a 2-byte
 50    // size placeholder. Stream data frames are not subject to this limit; they are gated by per-stream flow control.
 51    private const int MaxControlFrameBodySize = 16_383;
 52
 53    // The ratio used to compute the StreamWindowUpdateThreshold. For now, the stream window update is sent when the
 54    // window size grows over InitialStreamWindowSize / StreamWindowUpdateRatio.
 55    private const int StreamWindowUpdateRatio = 2;
 56
 57    private readonly Channel<IMultiplexedStream> _acceptStreamChannel;
 58    private int _bidirectionalStreamCount;
 59    private SemaphoreSlim? _bidirectionalStreamSemaphore;
 60    private readonly CancellationToken _closedCancellationToken;
 75361    private readonly CancellationTokenSource _closedCts = new();
 62    private string? _closedMessage;
 63    private Task<TransportConnectionInformation>? _connectTask;
 75364    private readonly CancellationTokenSource _disposedCts = new();
 65    private Task? _disposeTask;
 66    private readonly SlicDuplexConnectionDecorator _duplexConnection;
 67    private readonly DuplexConnectionReader _duplexConnectionReader;
 68    private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
 69
 70    // Invariant: _isClosed only ever transitions false -> true (under _mutex, by TryClose). Every writer site
 71    // (WriteConnectionFrameAsync, WriteStreamFrame, WriteStreamDataFrameAsync, CloseAsync) re-checks _isClosed under
 72    // _mutex *after* acquiring _writeSemaphore, so it bails out before issuing any new Write/WriteFrame on
 73    // _duplexConnectionWriter once _isClosed has been observed true.
 74    private bool _isClosed;
 75    private ulong? _lastRemoteBidirectionalStreamId;
 76    private ulong? _lastRemoteUnidirectionalStreamId;
 77    private readonly TimeSpan _localIdleTimeout;
 78    private readonly int _maxBidirectionalStreams;
 79    private readonly int _maxStreamFrameSize;
 80    private readonly int _maxUnidirectionalStreams;
 81    // _mutex ensure the assignment of _lastRemoteXxx members and the addition of the stream to _streams is
 82    // an atomic operation.
 75383    private readonly Lock _mutex = new();
 84    private ulong _nextBidirectionalId;
 85    private ulong _nextUnidirectionalId;
 86    private IceRpcError? _peerCloseError;
 75387    private TimeSpan _peerIdleTimeout = Timeout.InfiniteTimeSpan;
 88    private int _pendingPongCount;
 89    private Task? _readFramesTask;
 90
 75391    private readonly ConcurrentDictionary<ulong, SlicStream> _streams = new();
 92    private int _streamSemaphoreWaitCount;
 75393    private readonly TaskCompletionSource _streamSemaphoreWaitClosed =
 75394        new(TaskCreationOptions.RunContinuationsAsynchronously);
 95
 96    private int _unidirectionalStreamCount;
 97    private SemaphoreSlim? _unidirectionalStreamSemaphore;
 98
 99    // Serializes writes to _duplexConnectionWriter so that frame bytes are appended to the outbound pipe in order and
 100    // the pipe's pauseWriterThreshold is observed strictly. This async lock is held across the FlushAsync call, so a
 101    // single parked flush blocks all other connection writers until the background writer task drains enough data.
 102    // Not disposed: background fire-and-forget writes (e.g. StreamWindowUpdate from sync code paths) may attempt to
 103    // acquire it after DisposeAsync, and we don't want to have to handle ObjectDisposedException at every call site.
 104    // Skipping Dispose is harmless here because we never access SemaphoreSlim.AvailableWaitHandle, so no unmanaged
 105    // wait handle is ever allocated.
 106#pragma warning disable CA2213
 753107    private readonly SemaphoreSlim _writeSemaphore = new(1, 1);
 108#pragma warning restore CA2213
 109
 110    // This is only set for server connections to ensure that _duplexConnectionWriter.Write is not called after
 111    // _duplexConnectionWriter.Shutdown. This can occur if the client-side of the connection sends the close frame
 112    // followed by the shutdown of the duplex connection and if CloseAsync is called at the same time on the server
 113    // connection. Guarded by _writeSemaphore.
 114    private bool _writerIsShutdown;
 115
 116    public async ValueTask<IMultiplexedStream> AcceptStreamAsync(CancellationToken cancellationToken)
 2458117    {
 118        lock (_mutex)
 2458119        {
 2458120            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 121
 2457122            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 1123            {
 1124                throw new InvalidOperationException("Cannot accept stream before connecting the Slic connection.");
 125            }
 2456126            if (_isClosed)
 12127            {
 12128                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 129            }
 2444130        }
 131
 132        try
 2444133        {
 2444134            return await _acceptStreamChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 135        }
 114136        catch (ChannelClosedException exception)
 114137        {
 114138            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 113139            Debug.Assert(exception.InnerException is not null);
 140            // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 113141            throw ExceptionUtil.Throw(exception.InnerException);
 142        }
 2077143    }
 144
 145    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 734146    {
 147        lock (_mutex)
 734148        {
 734149            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 150
 734151            if (_connectTask is not null)
 1152            {
 1153                throw new InvalidOperationException("Cannot connect twice a Slic connection.");
 154            }
 733155            if (_isClosed)
 0156            {
 0157                throw new InvalidOperationException("Cannot connect a closed Slic connection.");
 158            }
 733159            _connectTask = PerformConnectAsync();
 733160        }
 733161        return _connectTask;
 162
 163        async Task<TransportConnectionInformation> PerformConnectAsync()
 733164        {
 733165            await Task.Yield(); // Exit mutex lock
 166
 167            // Connect the duplex connection.
 168            TransportConnectionInformation transportConnectionInformation;
 733169            TimeSpan peerIdleTimeout = TimeSpan.MaxValue;
 170
 171            try
 733172            {
 733173                transportConnectionInformation = await _duplexConnection.ConnectAsync(cancellationToken)
 733174                    .ConfigureAwait(false);
 175
 176                // Initialize the Slic connection.
 710177                if (IsServer)
 360178                {
 179                    // Read the Initialize frame.
 360180                    (ulong version, InitializeBody? initializeBody) = await ReadFrameAsync(
 360181                        DecodeInitialize,
 360182                        cancellationToken).ConfigureAwait(false);
 183
 353184                    if (initializeBody is null)
 2185                    {
 186                        // Unsupported version, try to negotiate another version by sending a Version frame with the
 187                        // Slic versions supported by this server.
 2188                        ulong[] supportedVersions = new ulong[] { SlicDefinitions.V1 };
 189
 2190                        await WriteConnectionFrameAsync(
 2191                            FrameType.Version,
 2192                            new VersionBody(supportedVersions).Encode,
 2193                            cancellationToken).ConfigureAwait(false);
 194
 2195                        (version, initializeBody) = await ReadFrameAsync(
 2196                            (frameType, buffer) =>
 2197                            {
 2198                                if (frameType is null)
 1199                                {
 2200                                    // The client shut down the connection because it doesn't support any of the
 2201                                    // server's supported Slic versions.
 1202                                    throw new IceRpcException(
 1203                                        IceRpcError.ConnectionRefused,
 1204                                        $"The connection was refused because the client Slic version {version} is not su
 2205                                }
 2206                                else
 1207                                {
 1208                                    return DecodeInitialize(frameType, buffer);
 2209                                }
 1210                            },
 2211                            cancellationToken).ConfigureAwait(false);
 1212                    }
 213
 352214                    Debug.Assert(initializeBody is not null);
 215
 352216                    DecodeParameters(initializeBody.Value.Parameters);
 217
 218                    // Write back an InitializeAck frame.
 351219                    await WriteConnectionFrameAsync(
 351220                        FrameType.InitializeAck,
 351221                        new InitializeAckBody(EncodeParameters()).Encode,
 351222                        cancellationToken).ConfigureAwait(false);
 351223                }
 224                else
 350225                {
 226                    // Write the Initialize frame.
 350227                    await WriteConnectionFrameAsync(
 350228                        FrameType.Initialize,
 350229                        (ref SliceEncoder encoder) =>
 350230                        {
 350231                            encoder.EncodeVarUInt62(SlicDefinitions.V1);
 350232                            new InitializeBody(EncodeParameters()).Encode(ref encoder);
 350233                        },
 350234                        cancellationToken).ConfigureAwait(false);
 235
 236                    // Read and decode the InitializeAck or Version frame.
 350237                    (InitializeAckBody? initializeAckBody, VersionBody? versionBody) = await ReadFrameAsync(
 350238                        DecodeInitializeAckOrVersion,
 350239                        cancellationToken).ConfigureAwait(false);
 240
 327241                    Debug.Assert(initializeAckBody is not null || versionBody is not null);
 242
 327243                    if (initializeAckBody is not null)
 325244                    {
 325245                        DecodeParameters(initializeAckBody.Value.Parameters);
 325246                    }
 247
 327248                    if (versionBody is not null)
 2249                    {
 2250                        if (versionBody.Value.Versions.Contains(SlicDefinitions.V1))
 1251                        {
 1252                            throw new InvalidDataException(
 1253                                "The server supported versions include the version initially requested.");
 254                        }
 255                        else
 1256                        {
 257                            // We only support V1 and the peer rejected V1.
 1258                            throw new IceRpcException(
 1259                                IceRpcError.ConnectionRefused,
 1260                                $"The connection was refused because the server only supports Slic version(s) {string.Jo
 261                        }
 262                    }
 325263                }
 676264            }
 8265            catch (InvalidDataException exception)
 8266            {
 8267                throw new IceRpcException(
 8268                    IceRpcError.IceRpcError,
 8269                    "The connection was aborted by a Slic protocol error.",
 8270                    exception);
 271            }
 25272            catch (OperationCanceledException)
 25273            {
 25274                throw;
 275            }
 4276            catch (AuthenticationException)
 4277            {
 4278                throw;
 279            }
 20280            catch (IceRpcException)
 20281            {
 20282                throw;
 283            }
 0284            catch (Exception exception)
 0285            {
 0286                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0287                throw;
 288            }
 289
 290            // Enable the idle timeout checks after the connection establishment. The Ping frames sent by the keep alive
 291            // check are not expected until the Slic connection initialization completes. The idle timeout check uses
 292            // the smallest idle timeout. Timeout.InfiniteTimeSpan is -1 ms so we can't compare it directly with
 293            // positive timeouts.
 294            TimeSpan idleTimeout;
 676295            if (_localIdleTimeout == Timeout.InfiniteTimeSpan)
 2296            {
 2297                idleTimeout = _peerIdleTimeout;
 2298            }
 674299            else if (_peerIdleTimeout == Timeout.InfiniteTimeSpan)
 23300            {
 23301                idleTimeout = _localIdleTimeout;
 23302            }
 303            else
 651304            {
 651305                idleTimeout = _peerIdleTimeout < _localIdleTimeout ? _peerIdleTimeout : _localIdleTimeout;
 651306            }
 307
 676308            if (idleTimeout != Timeout.InfiniteTimeSpan)
 674309            {
 674310                _duplexConnection.Enable(idleTimeout);
 674311            }
 312
 676313            _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 314
 676315            return transportConnectionInformation;
 676316        }
 317
 318        static (ulong, InitializeBody?) DecodeInitialize(FrameType? frameType, ReadOnlySequence<byte> buffer)
 355319        {
 355320            if (frameType != FrameType.Initialize)
 0321            {
 0322                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 323            }
 324
 355325            return buffer.DecodeSliceBuffer<(ulong, InitializeBody?)>(
 355326                (ref SliceDecoder decoder) =>
 355327                {
 355328                    ulong version = decoder.DecodeVarUInt62();
 354329                    if (version == SlicDefinitions.V1)
 352330                    {
 352331                        return (version, new InitializeBody(ref decoder));
 355332                    }
 355333                    else
 2334                    {
 2335                        decoder.Skip((int)(buffer.Length - decoder.Consumed));
 2336                        return (version, null);
 355337                    }
 709338                });
 354339        }
 340
 341        static (InitializeAckBody?, VersionBody?) DecodeInitializeAckOrVersion(
 342            FrameType? frameType,
 343            ReadOnlySequence<byte> buffer) =>
 329344            frameType switch
 329345            {
 326346                FrameType.InitializeAck => (
 326347                    buffer.DecodeSliceBuffer((ref SliceDecoder decoder) => new InitializeAckBody(ref decoder)),
 326348                    null),
 3349                FrameType.Version => (
 3350                    null,
 6351                    buffer.DecodeSliceBuffer((ref SliceDecoder decoder) => new VersionBody(ref decoder))),
 0352                _ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
 329353            };
 354
 355        async ValueTask<T> ReadFrameAsync<T>(
 356            Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
 357            CancellationToken cancellationToken)
 712358        {
 712359            (FrameType FrameType, int FrameSize, ulong?)? header =
 712360                await ReadFrameHeaderAsync(cancellationToken).ConfigureAwait(false);
 361
 362            ReadOnlySequence<byte> buffer;
 685363            if (header is null || header.Value.FrameSize == 0)
 4364            {
 4365                buffer = ReadOnlySequence<byte>.Empty;
 4366            }
 367            else
 681368            {
 681369                buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 681370                    header.Value.FrameSize,
 681371                    cancellationToken).ConfigureAwait(false);
 681372                if (buffer.Length > header.Value.FrameSize)
 7373                {
 7374                    buffer = buffer.Slice(0, header.Value.FrameSize);
 7375                }
 681376            }
 377
 685378            T decodedFrame = decodeFunc(header?.FrameType, buffer);
 681379            _duplexConnectionReader.AdvanceTo(buffer.End);
 681380            return decodedFrame;
 681381        }
 733382    }
 383
 384    public async Task CloseAsync(MultiplexedConnectionCloseError closeError, CancellationToken cancellationToken)
 109385    {
 386        lock (_mutex)
 109387        {
 109388            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 389
 109390            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 1391            {
 1392                throw new InvalidOperationException("Cannot close a Slic connection before connecting it.");
 393            }
 108394        }
 395
 108396        bool waitForWriterShutdown = false;
 108397        if (TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was closed."))
 93398        {
 93399            using (await _writeSemaphore.AcquireAsync(cancellationToken).ConfigureAwait(false))
 93400            {
 93401                if (IsServer && _writerIsShutdown)
 0402                {
 403                    // ReadFramesAsync already shut down the writer because the client-side sent its Close frame and
 404                    // shut down the duplex connection. Nothing more to send. The client-side is unaffected: it never
 405                    // shuts down the writer from ReadFramesAsync.
 0406                }
 407                else
 93408                {
 93409                    WriteFrame(FrameType.Close, streamId: null, new CloseBody((ulong)closeError).Encode);
 93410                    if (IsServer)
 47411                    {
 412                        // Link with _disposedCts so a concurrent DisposeAsync can break out of a flush parked on
 413                        // PauseWriterThreshold. Without the link, server CloseAsync(None) would deadlock with
 414                        // DisposeAsync: CloseAsync holds _writeSemaphore across this flush while DisposeAsync waits to
 415                        // acquire it before disposing the writer (which is what would otherwise unblock the flush).
 47416                        using var flushCts = CancellationTokenSource.CreateLinkedTokenSource(
 47417                            cancellationToken,
 47418                            _disposedCts.Token);
 419                        try
 47420                        {
 47421                            await _duplexConnectionWriter.FlushAsync(flushCts.Token).ConfigureAwait(false);
 46422                        }
 1423                        catch (OperationCanceledException) when (
 1424                            _disposedCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
 1425                        {
 1426                            throw new IceRpcException(IceRpcError.OperationAborted, "The connection was disposed.");
 427                        }
 46428                    }
 429                    else
 46430                    {
 431                        // The sending of the client-side Close frame is followed by the shutdown of the duplex
 432                        // connection. For TCP, it's important to always shutdown the connection on the client-side firs
 433                        // to avoid TIME_WAIT states on the server-side.
 46434                        _duplexConnectionWriter.Shutdown();
 46435                        waitForWriterShutdown = true;
 46436                    }
 92437                }
 92438            }
 92439        }
 440
 107441        if (waitForWriterShutdown)
 46442        {
 46443            await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 46444        }
 445
 446        // Now, wait for the peer to close the write side of the connection, which will terminate the read frames task.
 107447        Debug.Assert(_readFramesTask is not null);
 107448        await _readFramesTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 104449    }
 450
 451    public async ValueTask<IMultiplexedStream> CreateStreamAsync(
 452        bool bidirectional,
 453        CancellationToken cancellationToken)
 2139454    {
 455        lock (_mutex)
 2139456        {
 2139457            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 458
 2136459            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 2460            {
 2461                throw new InvalidOperationException("Cannot create stream before connecting the Slic connection.");
 462            }
 2134463            if (_isClosed)
 6464            {
 6465                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 466            }
 467
 2128468            ++_streamSemaphoreWaitCount;
 2128469        }
 470
 471        try
 2128472        {
 2128473            using var createStreamCts = CancellationTokenSource.CreateLinkedTokenSource(
 2128474                _closedCancellationToken,
 2128475                cancellationToken);
 476
 2128477            SemaphoreSlim? streamCountSemaphore = bidirectional ?
 2128478                _bidirectionalStreamSemaphore :
 2128479                _unidirectionalStreamSemaphore;
 480
 2128481            if (streamCountSemaphore is null)
 1482            {
 483                // The stream semaphore is null if the peer's max streams configuration is 0. In this case, we let
 484                // CreateStreamAsync hang indefinitely until the connection is closed.
 1485                await Task.Delay(-1, createStreamCts.Token).ConfigureAwait(false);
 0486            }
 487            else
 2127488            {
 2127489                await streamCountSemaphore.WaitAsync(createStreamCts.Token).ConfigureAwait(false);
 2118490            }
 491
 2118492            return new SlicStream(this, bidirectional, isRemote: false);
 493        }
 10494        catch (OperationCanceledException)
 10495        {
 10496            cancellationToken.ThrowIfCancellationRequested();
 7497            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 6498            Debug.Assert(_isClosed);
 6499            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 500        }
 501        finally
 2128502        {
 503            lock (_mutex)
 2128504            {
 2128505                --_streamSemaphoreWaitCount;
 2128506                if (_isClosed && _streamSemaphoreWaitCount == 0)
 7507                {
 7508                    _streamSemaphoreWaitClosed.SetResult();
 7509                }
 2128510            }
 2128511        }
 2118512    }
 513
 514    public ValueTask DisposeAsync()
 1031515    {
 516        lock (_mutex)
 1031517        {
 1031518            _disposeTask ??= PerformDisposeAsync();
 1031519        }
 1031520        return new(_disposeTask);
 521
 522        async Task PerformDisposeAsync()
 752523        {
 524            // Make sure we execute the code below without holding the mutex lock.
 752525            await Task.Yield();
 752526            TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was disposed.");
 527
 752528            _disposedCts.Cancel();
 529
 530            try
 752531            {
 752532                await Task.WhenAll(
 752533                    _connectTask ?? Task.CompletedTask,
 752534                    _readFramesTask ?? Task.CompletedTask,
 752535                    _streamSemaphoreWaitClosed.Task).ConfigureAwait(false);
 418536            }
 334537            catch
 334538            {
 539                // Expected if any of these tasks failed or was canceled. Each task takes care of handling unexpected
 540                // exceptions so there's no need to handle them here.
 334541            }
 542
 543            // Clean-up the streams that might still be queued on the channel.
 777544            while (_acceptStreamChannel.Reader.TryRead(out IMultiplexedStream? stream))
 25545            {
 25546                if (stream.IsBidirectional)
 5547                {
 5548                    stream.Output.Complete();
 5549                    stream.Input.Complete();
 5550                }
 20551                else if (stream.IsRemote)
 20552                {
 20553                    stream.Input.Complete();
 20554                }
 555                else
 0556                {
 0557                    stream.Output.Complete();
 0558                }
 25559            }
 560
 561            try
 752562            {
 563                // Prevents unobserved task exceptions.
 752564                await _acceptStreamChannel.Reader.Completion.ConfigureAwait(false);
 0565            }
 752566            catch
 752567            {
 752568            }
 569
 570            // Acquire (and never release) the write semaphore so no in-flight writer (e.g. a stream frame parked on
 571            // FlushAsync due to PauseWriterThreshold) can race with the writer disposal below. The wait is bounded:
 572            // every writer site uses a cancellation token derived from _closedCancellationToken or _disposedCts.Token,
 573            // both of which are cancelled by the time we reach this point.
 752574            await _writeSemaphore.WaitAsync(CancellationToken.None).ConfigureAwait(false);
 575
 752576            await _duplexConnectionWriter.DisposeAsync().ConfigureAwait(false);
 752577            _duplexConnectionReader.Dispose();
 752578            _duplexConnection.Dispose();
 579
 752580            _disposedCts.Dispose();
 752581            _bidirectionalStreamSemaphore?.Dispose();
 752582            _unidirectionalStreamSemaphore?.Dispose();
 752583            _closedCts.Dispose();
 752584        }
 1031585    }
 586
 753587    internal SlicConnection(
 753588        IDuplexConnection duplexConnection,
 753589        MultiplexedConnectionOptions options,
 753590        SlicTransportOptions slicOptions,
 753591        bool isServer)
 753592    {
 753593        IsServer = isServer;
 594
 753595        Pool = options.Pool;
 753596        MinSegmentSize = options.MinSegmentSize;
 753597        _maxBidirectionalStreams = options.MaxBidirectionalStreams;
 753598        _maxUnidirectionalStreams = options.MaxUnidirectionalStreams;
 599
 753600        InitialStreamWindowSize = slicOptions.InitialStreamWindowSize;
 753601        PauseWriterThreshold = slicOptions.PauseWriterThreshold;
 753602        _localIdleTimeout = slicOptions.IdleTimeout;
 753603        _maxStreamFrameSize = slicOptions.MaxStreamFrameSize;
 604
 753605        _acceptStreamChannel = Channel.CreateUnbounded<IMultiplexedStream>(new UnboundedChannelOptions
 753606        {
 753607            SingleReader = true,
 753608            SingleWriter = true
 753609        });
 610
 753611        _closedCancellationToken = _closedCts.Token;
 612
 613        // Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
 753614        _duplexConnection = IsServer ?
 753615            new SlicDuplexConnectionDecorator(duplexConnection) :
 753616            new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);
 617
 753618        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 753619        _duplexConnectionWriter = new SlicDuplexConnectionWriter(
 753620            _duplexConnection,
 753621            options.Pool,
 753622            options.MinSegmentSize,
 753623            PauseWriterThreshold);
 624
 625        // We use the same stream ID numbering scheme as QUIC.
 753626        if (IsServer)
 375627        {
 375628            _nextBidirectionalId = 1;
 375629            _nextUnidirectionalId = 3;
 375630        }
 631        else
 378632        {
 378633            _nextBidirectionalId = 0;
 378634            _nextUnidirectionalId = 2;
 378635        }
 636
 637        async Task SendPingAsync(long payload)
 14638        {
 639            try
 14640            {
 14641                await WriteConnectionFrameAsync(
 14642                    FrameType.Ping,
 14643                    new PingBody(payload).Encode,
 14644                    _closedCancellationToken).ConfigureAwait(false);
 14645            }
 0646            catch (IceRpcException)
 0647            {
 648                // Expected if the connection is closed.
 0649            }
 0650            catch (OperationCanceledException)
 0651            {
 652                // Expected if the connection is closed.
 0653            }
 0654            catch (Exception exception)
 0655            {
 0656                Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}");
 0657            }
 14658        }
 659
 660        void SendReadPing()
 14661        {
 662            // No-op if there is already a pending Pong.
 14663            if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) == 0)
 14664            {
 665                // Timer callbacks cannot await; fire-and-forget. SendPingAsync swallows expected exceptions and
 666                // Debug.Fails on unexpected ones, so the unobserved task carries no exception.
 14667                _ = SendPingAsync(1L);
 14668            }
 14669        }
 670
 671        void SendWritePing()
 0672        {
 673            // _pendingPongCount can be <= 0 if an unexpected pong is received. If it's the case, the connection is
 674            // being torn down and there's no point in sending a ping frame.
 0675            if (Interlocked.Increment(ref _pendingPongCount) > 0)
 0676            {
 0677                _ = SendPingAsync(0L);
 0678            }
 0679        }
 753680    }
 681
 682    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 683    /// <param name="bufferWriter">The destination buffer writer.</param>
 684    /// <param name="byteCount">The amount of stream data to read.</param>
 685    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 686    internal ValueTask FillBufferWriterAsync(
 687        IBufferWriter<byte> bufferWriter,
 688        int byteCount,
 689        CancellationToken cancellationToken) =>
 8684690        _duplexConnectionReader.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 691
 692    /// <summary>Releases a stream from the connection. The connection stream count is decremented and if this is a
 693    /// client allow a new stream to be started.</summary>
 694    /// <param name="stream">The released stream.</param>
 695    internal void ReleaseStream(SlicStream stream)
 4205696    {
 4205697        Debug.Assert(stream.IsStarted);
 698
 4205699        _streams.Remove(stream.Id, out SlicStream? _);
 700
 4205701        if (stream.IsRemote)
 2102702        {
 2102703            if (stream.IsBidirectional)
 684704            {
 684705                Interlocked.Decrement(ref _bidirectionalStreamCount);
 684706            }
 707            else
 1418708            {
 1418709                Interlocked.Decrement(ref _unidirectionalStreamCount);
 1418710            }
 2102711        }
 2103712        else if (!_isClosed)
 1719713        {
 1719714            if (stream.IsBidirectional)
 620715            {
 620716                _bidirectionalStreamSemaphore!.Release();
 620717            }
 718            else
 1099719            {
 1099720                _unidirectionalStreamSemaphore!.Release();
 1099721            }
 1719722        }
 4205723    }
 724
 725    /// <summary>Throws the connection closure exception if the connection is already closed.</summary>
 726    internal void ThrowIfClosed()
 7968727    {
 728        lock (_mutex)
 7968729        {
 7968730            if (_isClosed)
 8731            {
 8732                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 733            }
 7960734        }
 7960735    }
 736
 737    /// <summary>Writes a connection frame.</summary>
 738    /// <param name="frameType">The frame type.</param>
 739    /// <param name="encode">The action to encode the frame.</param>
 740    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 741    internal async ValueTask WriteConnectionFrameAsync(
 742        FrameType frameType,
 743        EncodeAction? encode,
 744        CancellationToken cancellationToken)
 731745    {
 731746        Debug.Assert(frameType < FrameType.Stream);
 747
 731748        using (await _writeSemaphore.AcquireAsync(cancellationToken).ConfigureAwait(false))
 731749        {
 750            lock (_mutex)
 731751            {
 731752                if (_isClosed)
 0753                {
 0754                    throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 755                }
 731756            }
 731757            WriteFrame(frameType, streamId: null, encode);
 731758            await _duplexConnectionWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
 731759        }
 731760    }
 761
 762    /// <summary>Writes a stream frame as a fire-and-forget operation. Used by sync code paths (e.g.
 763    /// <see cref="SlicPipeReader.Complete"/>, <see cref="SlicPipeWriter.Complete"/>, window updates) that cannot
 764    /// await.</summary>
 765    /// <param name="stream">The stream to write the frame for.</param>
 766    /// <param name="frameType">The frame type.</param>
 767    /// <param name="encode">The action to encode the frame.</param>
 768    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 769    /// frame should be written after the stream frame.</param>
 770    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 771    /// also be closed concurrently while it's in progress.</remarks>
 772    internal void WriteStreamFrame(
 773        SlicStream stream,
 774        FrameType frameType,
 775        EncodeAction? encode,
 776        bool writeReadsClosedFrame)
 3243777    {
 778        // Ensure that this method is called for any FrameType.StreamXxx frame type except FrameType.Stream.
 3243779        Debug.Assert(frameType >= FrameType.StreamLast && stream.IsStarted);
 780
 781        // SemaphoreSlim.WaitAsync atomically updates the semaphore state (acquires it or enqueues the waiter)
 782        // synchronously, before the await can yield. Two sequential calls from the same thread therefore enqueue
 783        // in call order, preserving wire ordering of the resulting frames.
 3243784        _ = WriteStreamFrameAsync();
 785
 786        async Task WriteStreamFrameAsync()
 3243787        {
 788            SemaphoreLock semaphoreLock;
 789            try
 3243790            {
 3243791                semaphoreLock = await _writeSemaphore.AcquireAsync(_closedCancellationToken).ConfigureAwait(false);
 3242792            }
 1793            catch (OperationCanceledException)
 1794            {
 795                // The connection was closed while waiting for the semaphore.
 1796                return;
 797            }
 798
 3242799            using (semaphoreLock)
 3242800            {
 801                lock (_mutex)
 3242802                {
 3242803                    if (_isClosed)
 0804                    {
 0805                        return;
 806                    }
 3242807                }
 808
 3242809                WriteFrame(frameType, stream.Id, encode);
 3242810                if (writeReadsClosedFrame)
 105811                {
 105812                    WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 105813                }
 3242814                if (frameType == FrameType.StreamLast)
 594815                {
 816                    // Notify the stream that the last stream frame is considered sent at this point. This will
 817                    // close writes on the stream and allow the stream to be released if reads are also closed.
 594818                    stream.WroteLastStreamFrame();
 594819                }
 820
 821                try
 3242822                {
 3242823                    await _duplexConnectionWriter.FlushAsync(_closedCancellationToken).ConfigureAwait(false);
 3241824                }
 1825                catch (OperationCanceledException)
 1826                {
 827                    // The connection was closed while flushing.
 1828                }
 0829                catch (InvalidOperationException)
 0830                {
 831                    // The pipe writer was completed (Shutdown called) â€” connection is going away.
 0832                }
 3242833            }
 3243834        }
 3243835    }
 836
 837    /// <summary>Writes a stream data frame.</summary>
 838    /// <param name="stream">The stream to write the frame for.</param>
 839    /// <param name="source1">The first stream frame data source.</param>
 840    /// <param name="source2">The second stream frame data source.</param>
 841    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame and
 842    /// <see langword="false" /> to write a <see cref="FrameType.Stream" /> frame.</param>
 843    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 844    /// frame should be written after the stream frame.</param>
 845    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 846    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 847    /// also be closed concurrently while it's in progress.</remarks>
 848    internal async ValueTask<FlushResult> WriteStreamDataFrameAsync(
 849        SlicStream stream,
 850        ReadOnlySequence<byte> source1,
 851        ReadOnlySequence<byte> source2,
 852        bool endStream,
 853        bool writeReadsClosedFrame,
 854        CancellationToken cancellationToken)
 7942855    {
 7942856        Debug.Assert(!source1.IsEmpty || endStream);
 857
 7942858        if (_connectTask is null)
 0859        {
 0860            throw new InvalidOperationException("Cannot send a stream frame before calling ConnectAsync.");
 861        }
 862
 7942863        using var writeCts = CancellationTokenSource.CreateLinkedTokenSource(
 7942864            _closedCancellationToken,
 7942865            cancellationToken);
 866
 867        try
 7942868        {
 869            do
 9228870            {
 871                // Next, ensure send credit is available. If not, this will block until the receiver allows sending
 872                // additional data.
 9228873                int sendCredit = 0;
 9228874                if (!source1.IsEmpty || !source2.IsEmpty)
 9223875                {
 9223876                    sendCredit = await stream.AcquireSendCreditAsync(writeCts.Token).ConfigureAwait(false);
 9197877                    Debug.Assert(sendCredit > 0);
 9197878                }
 879
 880                // Gather data from source1 or source2 up to sendCredit bytes or the peer maximum stream frame size.
 9202881                int sendMaxSize = Math.Min(sendCredit, PeerMaxStreamFrameSize);
 882                ReadOnlySequence<byte> sendSource1;
 883                ReadOnlySequence<byte> sendSource2;
 9202884                if (!source1.IsEmpty)
 8211885                {
 8211886                    int length = Math.Min((int)source1.Length, sendMaxSize);
 8211887                    sendSource1 = source1.Slice(0, length);
 8211888                    source1 = source1.Slice(length);
 8211889                }
 890                else
 991891                {
 991892                    sendSource1 = ReadOnlySequence<byte>.Empty;
 991893                }
 894
 9202895                if (source1.IsEmpty && !source2.IsEmpty)
 2045896                {
 2045897                    int length = Math.Min((int)source2.Length, sendMaxSize - (int)sendSource1.Length);
 2045898                    sendSource2 = source2.Slice(0, length);
 2045899                    source2 = source2.Slice(length);
 2045900                }
 901                else
 7157902                {
 7157903                    sendSource2 = ReadOnlySequence<byte>.Empty;
 7157904                }
 905
 906                // If there's no data left to send and endStream is true, it's the last stream frame.
 9202907                bool lastStreamFrame = endStream && source1.IsEmpty && source2.IsEmpty;
 908
 9202909                using (await _writeSemaphore.AcquireAsync(writeCts.Token).ConfigureAwait(false))
 8219910                {
 911                    lock (_mutex)
 8219912                    {
 8219913                        if (_isClosed)
 0914                        {
 0915                            throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 916                        }
 8219917                    }
 918
 8219919                    if (!stream.IsStarted)
 2103920                    {
 2103921                        if (stream.IsBidirectional)
 682922                        {
 682923                            AddStream(_nextBidirectionalId, stream);
 682924                            _nextBidirectionalId += 4;
 682925                        }
 926                        else
 1421927                        {
 1421928                            AddStream(_nextUnidirectionalId, stream);
 1421929                            _nextUnidirectionalId += 4;
 1421930                        }
 2103931                    }
 932
 933                    // Notify the stream that we're consuming sendSize credit. It's important to call this before
 934                    // sending the stream frame to avoid race conditions where the StreamWindowUpdate frame could
 935                    // be received before the send credit was updated.
 8219936                    if (sendCredit > 0)
 8214937                    {
 8214938                        stream.ConsumedSendCredit((int)(sendSource1.Length + sendSource2.Length));
 8214939                    }
 940
 8219941                    EncodeStreamFrameHeader(stream.Id, sendSource1.Length + sendSource2.Length, lastStreamFrame);
 942
 8219943                    if (lastStreamFrame)
 799944                    {
 945                        // Notify the stream that the last stream frame is considered sent at this point. This
 946                        // will complete writes on the stream and allow the stream to be released if reads are
 947                        // also completed.
 799948                        stream.WroteLastStreamFrame();
 799949                    }
 950
 951                    // Write the stream frame.
 8219952                    if (!sendSource1.IsEmpty)
 8211953                    {
 8211954                        _duplexConnectionWriter.Write(sendSource1);
 8211955                    }
 8219956                    if (!sendSource2.IsEmpty)
 1062957                    {
 1062958                        _duplexConnectionWriter.Write(sendSource2);
 1062959                    }
 960
 8219961                    if (writeReadsClosedFrame)
 379962                    {
 379963                        WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 379964                    }
 965
 966                    // Flush the stream frame. This may block if the outbound pipe's pauseWriterThreshold has been
 967                    // reached â€” the connection's write semaphore is held during the await, so all other connection
 968                    // writers wait until the background writer task drains enough data.
 8219969                    await _duplexConnectionWriter.FlushAsync(writeCts.Token).ConfigureAwait(false);
 8214970                }
 8214971            }
 8214972            while (!source1.IsEmpty || !source2.IsEmpty); // Loop until there's no data left to send.
 6928973        }
 1014974        catch (OperationCanceledException)
 1014975        {
 1014976            cancellationToken.ThrowIfCancellationRequested();
 977
 0978            Debug.Assert(_isClosed);
 0979            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 980        }
 981
 6928982        return new FlushResult(isCanceled: false, isCompleted: false);
 983
 984        void EncodeStreamFrameHeader(ulong streamId, long size, bool lastStreamFrame)
 8219985        {
 8219986            var encoder = new SliceEncoder(_duplexConnectionWriter);
 8219987            encoder.EncodeFrameType(!lastStreamFrame ? FrameType.Stream : FrameType.StreamLast);
 8219988            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 8219989            int startPos = encoder.EncodedByteCount;
 8219990            encoder.EncodeVarUInt62(streamId);
 8219991            SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos + size), sizePlaceholder);
 8219992        }
 6928993    }
 994
 995    private void AddStream(ulong id, SlicStream stream)
 4206996    {
 997        lock (_mutex)
 4206998        {
 4206999            if (_isClosed)
 11000            {
 11001                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 1002            }
 1003
 42051004            _streams[id] = stream;
 1005
 1006            // Assign the stream ID within the mutex to ensure that the addition of the stream to the connection and the
 1007            // stream ID assignment are atomic.
 42051008            stream.Id = id;
 1009
 1010            // Keep track of the last assigned stream ID. This is used to figure out if the stream is known or unknown.
 42051011            if (stream.IsRemote)
 21021012            {
 21021013                if (stream.IsBidirectional)
 6841014                {
 6841015                    _lastRemoteBidirectionalStreamId = id;
 6841016                }
 1017                else
 14181018                {
 14181019                    _lastRemoteUnidirectionalStreamId = id;
 14181020                }
 21021021            }
 42051022        }
 42051023    }
 1024
 1025    private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
 6771026    {
 6771027        int? maxStreamFrameSize = null;
 6771028        int? peerInitialStreamWindowSize = null;
 85341029        foreach ((ParameterKey key, IList<byte> buffer) in parameters)
 32521030        {
 32521031            switch (key)
 1032            {
 1033                case ParameterKey.MaxBidirectionalStreams:
 5951034                {
 5951035                    int value = DecodeParamValue(buffer);
 5951036                    if (value > 0)
 5951037                    {
 5951038                        _bidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 5951039                    }
 5951040                    break;
 1041                }
 1042                case ParameterKey.MaxUnidirectionalStreams:
 6531043                {
 6531044                    int value = DecodeParamValue(buffer);
 6531045                    if (value > 0)
 6531046                    {
 6531047                        _unidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 6531048                    }
 6531049                    break;
 1050                }
 1051                case ParameterKey.IdleTimeout:
 6511052                {
 6511053                    _peerIdleTimeout = TimeSpan.FromMilliseconds(DecodeParamValue(buffer));
 6511054                    if (_peerIdleTimeout == TimeSpan.Zero)
 01055                    {
 01056                        throw new InvalidDataException(
 01057                            "The IdleTimeout Slic connection parameter is invalid, it must be greater than 0 s.");
 1058                    }
 6511059                    break;
 1060                }
 1061                case ParameterKey.MaxStreamFrameSize:
 6771062                {
 6771063                    maxStreamFrameSize = DecodeParamValue(buffer);
 6771064                    if (maxStreamFrameSize < 1024)
 01065                    {
 01066                        throw new InvalidDataException(
 01067                            "The MaxStreamFrameSize connection parameter is invalid, it must be at least 1 KB.");
 1068                    }
 6771069                    if (maxStreamFrameSize > SlicTransportOptions.MaxStreamFrameSizeCeiling)
 11070                    {
 11071                        throw new InvalidDataException(
 11072                            $"The MaxStreamFrameSize connection parameter is invalid, it cannot exceed {SlicTransportOpt
 1073                    }
 6761074                    break;
 1075                }
 1076                case ParameterKey.InitialStreamWindowSize:
 6761077                {
 6761078                    peerInitialStreamWindowSize = DecodeParamValue(buffer);
 6761079                    if (peerInitialStreamWindowSize < 1024)
 01080                    {
 01081                        throw new InvalidDataException(
 01082                            "The InitialStreamWindowSize connection parameter is invalid, it must be at least 1 KB.");
 1083                    }
 6761084                    break;
 1085                }
 1086                // Ignore unsupported parameter.
 1087            }
 32511088        }
 1089
 6761090        if (maxStreamFrameSize is null)
 01091        {
 01092            throw new InvalidDataException(
 01093                "The peer didn't send the required MaxStreamFrameSize connection parameter.");
 1094        }
 1095        else
 6761096        {
 6761097            PeerMaxStreamFrameSize = maxStreamFrameSize.Value;
 6761098        }
 1099
 6761100        if (peerInitialStreamWindowSize is null)
 01101        {
 01102            throw new InvalidDataException(
 01103                "The peer didn't send the required InitialStreamWindowSize connection parameter.");
 1104        }
 1105        else
 6761106        {
 6761107            PeerInitialStreamWindowSize = peerInitialStreamWindowSize.Value;
 6761108        }
 1109
 1110        // all parameter values are currently integers in the range 0..Int32Max encoded as varuint62.
 1111        static int DecodeParamValue(IList<byte> buffer)
 32521112        {
 1113            // The IList<byte> decoded by the IceRPC + Slice integration is backed by an array
 32521114            ulong value = new ReadOnlySequence<byte>((byte[])buffer).DecodeSliceBuffer(
 65041115                (ref SliceDecoder decoder) => decoder.DecodeVarUInt62());
 1116            try
 32521117            {
 32521118                return checked((int)value);
 1119            }
 01120            catch (OverflowException exception)
 01121            {
 01122                throw new InvalidDataException("The value is out of the varuint32 accepted range.", exception);
 1123            }
 32521124        }
 6761125    }
 1126
 1127    private Dictionary<ParameterKey, IList<byte>> EncodeParameters()
 7011128    {
 7011129        var parameters = new List<KeyValuePair<ParameterKey, IList<byte>>>
 7011130        {
 7011131            // Required parameters.
 7011132            EncodeParameter(ParameterKey.MaxStreamFrameSize, (ulong)_maxStreamFrameSize),
 7011133            EncodeParameter(ParameterKey.InitialStreamWindowSize, (ulong)InitialStreamWindowSize)
 7011134        };
 1135
 1136        // Optional parameters.
 7011137        if (_localIdleTimeout != Timeout.InfiniteTimeSpan)
 6991138        {
 6991139            parameters.Add(EncodeParameter(ParameterKey.IdleTimeout, (ulong)_localIdleTimeout.TotalMilliseconds));
 6991140        }
 7011141        if (_maxBidirectionalStreams > 0)
 6351142        {
 6351143            parameters.Add(EncodeParameter(ParameterKey.MaxBidirectionalStreams, (ulong)_maxBidirectionalStreams));
 6351144        }
 7011145        if (_maxUnidirectionalStreams > 0)
 7011146        {
 7011147            parameters.Add(EncodeParameter(ParameterKey.MaxUnidirectionalStreams, (ulong)_maxUnidirectionalStreams));
 7011148        }
 1149
 7011150        return new Dictionary<ParameterKey, IList<byte>>(parameters);
 1151
 1152        static KeyValuePair<ParameterKey, IList<byte>> EncodeParameter(ParameterKey key, ulong value)
 34371153        {
 34371154            int sizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 34371155            byte[] buffer = new byte[sizeLength];
 34371156            SliceEncoder.EncodeVarUInt62(value, buffer);
 34371157            return new(key, buffer);
 34371158        }
 7011159    }
 1160
 1161    private bool IsUnknownStream(ulong streamId)
 52561162    {
 52561163        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 52561164        bool isBidirectional = streamId % 4 < 2;
 52561165        if (isRemote)
 28011166        {
 28011167            if (isBidirectional)
 13581168            {
 13581169                return _lastRemoteBidirectionalStreamId is null || streamId > _lastRemoteBidirectionalStreamId;
 1170            }
 1171            else
 14431172            {
 14431173                return _lastRemoteUnidirectionalStreamId is null || streamId > _lastRemoteUnidirectionalStreamId;
 1174            }
 1175        }
 1176        else
 24551177        {
 24551178            if (isBidirectional)
 12881179            {
 12881180                return streamId >= _nextBidirectionalId;
 1181            }
 1182            else
 11671183            {
 11671184                return streamId >= _nextUnidirectionalId;
 1185            }
 1186        }
 52561187    }
 1188
 1189    private Task ReadFrameAsync(FrameType frameType, int size, ulong? streamId, CancellationToken cancellationToken)
 120101190    {
 120101191        if (frameType >= FrameType.Stream && streamId is null)
 01192        {
 01193            throw new InvalidDataException("Received stream frame without stream ID.");
 1194        }
 1195
 120101196        switch (frameType)
 1197        {
 1198            case FrameType.Close:
 941199            {
 941200                return ReadCloseFrameAsync(size, cancellationToken);
 1201            }
 1202            case FrameType.Ping:
 161203            {
 161204                return ReadPingFrameAndWritePongFrameAsync(size, cancellationToken);
 1205            }
 1206            case FrameType.Pong:
 171207            {
 171208                return ReadPongFrameAsync(size, cancellationToken);
 1209            }
 1210            case FrameType.Stream:
 1211            case FrameType.StreamLast:
 87791212            {
 87791213                return ReadStreamDataFrameAsync(frameType, size, streamId!.Value, cancellationToken);
 1214            }
 1215            case FrameType.StreamWindowUpdate:
 12571216            {
 12571217                if (IsUnknownStream(streamId!.Value))
 11218                {
 11219                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1220                }
 1221
 12561222                return ReadStreamWindowUpdateFrameAsync(size, streamId!.Value, cancellationToken);
 1223            }
 1224            case FrameType.StreamReadsClosed:
 1225            case FrameType.StreamWritesClosed:
 18441226            {
 18441227                if (size > 0)
 21228                {
 21229                    throw new InvalidDataException($"Unexpected body for {frameType} frame.");
 1230                }
 18421231                if (IsUnknownStream(streamId!.Value))
 21232                {
 21233                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1234                }
 1235
 18401236                if (_streams.TryGetValue(streamId.Value, out SlicStream? stream))
 13601237                {
 13601238                    if (frameType == FrameType.StreamWritesClosed)
 471239                    {
 471240                        stream.ReceivedWritesClosedFrame();
 471241                    }
 1242                    else
 13131243                    {
 13131244                        stream.ReceivedReadsClosedFrame();
 13131245                    }
 13601246                }
 18401247                return Task.CompletedTask;
 1248            }
 1249            default:
 31250            {
 31251                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 1252            }
 1253        }
 1254
 1255        async Task ReadCloseFrameAsync(int size, CancellationToken cancellationToken)
 941256        {
 941257            CloseBody closeBody = await ReadFrameBodyAsync(
 941258                FrameType.Close,
 941259                size,
 931260                (ref SliceDecoder decoder) => new CloseBody(ref decoder),
 941261                cancellationToken).ConfigureAwait(false);
 1262
 921263            IceRpcError? peerCloseError = closeBody.ApplicationErrorCode switch
 921264            {
 701265                (ulong)MultiplexedConnectionCloseError.NoError => IceRpcError.ConnectionClosedByPeer,
 41266                (ulong)MultiplexedConnectionCloseError.Refused => IceRpcError.ConnectionRefused,
 81267                (ulong)MultiplexedConnectionCloseError.ServerBusy => IceRpcError.ServerBusy,
 51268                (ulong)MultiplexedConnectionCloseError.Aborted => IceRpcError.ConnectionAborted,
 51269                _ => null
 921270            };
 1271
 1272            bool notAlreadyClosed;
 921273            if (peerCloseError is null)
 51274            {
 51275                notAlreadyClosed = TryClose(
 51276                    new IceRpcException(IceRpcError.ConnectionAborted),
 51277                    $"The connection was closed by the peer with an unknown application error code: '{closeBody.Applicat
 51278                    IceRpcError.ConnectionAborted);
 51279            }
 1280            else
 871281            {
 871282                notAlreadyClosed = TryClose(
 871283                    new IceRpcException(peerCloseError.Value),
 871284                    "The connection was closed by the peer.",
 871285                    peerCloseError);
 871286            }
 1287
 1288            // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When using
 1289            // TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the server-side.
 921290            if (notAlreadyClosed && !IsServer)
 261291            {
 1292                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1293                // _writeSemaphore alone serializes access to the writer.
 261294                using (await _writeSemaphore.AcquireAsync(cancellationToken).ConfigureAwait(false))
 261295                {
 261296                    _duplexConnectionWriter.Shutdown();
 261297                }
 261298                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 261299            }
 921300        }
 1301
 1302        async Task ReadPingFrameAndWritePongFrameAsync(int size, CancellationToken cancellationToken)
 161303        {
 1304            // Read the ping frame.
 161305            PingBody pingBody = await ReadFrameBodyAsync(
 161306                FrameType.Ping,
 161307                size,
 151308                (ref SliceDecoder decoder) => new PingBody(ref decoder),
 161309                cancellationToken).ConfigureAwait(false);
 1310
 1311            // Return a pong frame with the ping payload.
 141312            await WriteConnectionFrameAsync(
 141313                FrameType.Pong,
 141314                new PongBody(pingBody.Payload).Encode,
 141315                cancellationToken).ConfigureAwait(false);
 141316        }
 1317
 1318        async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
 171319        {
 171320            if (Interlocked.Decrement(ref _pendingPongCount) >= 0)
 141321            {
 1322                // Ensure the pong frame payload value is expected.
 1323
 141324                PongBody pongBody = await ReadFrameBodyAsync(
 141325                    FrameType.Pong,
 141326                    size,
 141327                    (ref SliceDecoder decoder) => new PongBody(ref decoder),
 141328                    cancellationToken).ConfigureAwait(false);
 1329
 1330                // For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
 141331                if (pongBody.Payload != 0L && pongBody.Payload != 1L)
 01332                {
 01333                    throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
 1334                }
 141335            }
 1336            else
 31337            {
 1338                // If not waiting for a pong frame, this pong frame is unexpected.
 31339                throw new InvalidDataException($"Received unexpected {nameof(FrameType.Pong)} frame.");
 1340            }
 141341        }
 1342
 1343        async Task ReadStreamWindowUpdateFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
 12561344        {
 12561345            StreamWindowUpdateBody frame = await ReadFrameBodyAsync(
 12561346                FrameType.StreamWindowUpdate,
 12561347                size,
 12561348                (ref SliceDecoder decoder) => new StreamWindowUpdateBody(ref decoder),
 12561349                cancellationToken).ConfigureAwait(false);
 12561350            if (_streams.TryGetValue(streamId, out SlicStream? stream))
 12161351            {
 12161352                stream.ReceivedWindowUpdateFrame(frame);
 12151353            }
 12551354        }
 1355
 1356        async Task<T> ReadFrameBodyAsync<T>(
 1357            FrameType frameType,
 1358            int size,
 1359            DecodeFunc<T> decodeFunc,
 1360            CancellationToken cancellationToken)
 13801361        {
 13801362            if (size <= 0)
 21363            {
 21364                throw new InvalidDataException($"Unexpected empty body for {frameType} frame.");
 1365            }
 1366
 13781367            ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(size, cancellationToken)
 13781368                .ConfigureAwait(false);
 1369
 13781370            if (buffer.Length > size)
 9541371            {
 9541372                buffer = buffer.Slice(0, size);
 9541373            }
 1374
 13781375            T decodedFrame = buffer.DecodeSliceBuffer(decodeFunc);
 13761376            _duplexConnectionReader.AdvanceTo(buffer.End);
 13761377            return decodedFrame;
 13761378        }
 120021379    }
 1380
 1381    private async ValueTask<(FrameType FrameType, int FrameSize, ulong? StreamId)?> ReadFrameHeaderAsync(
 1382        CancellationToken cancellationToken)
 133781383    {
 133781384        while (true)
 133781385        {
 1386            // Read data from the pipe reader.
 133781387            if (!_duplexConnectionReader.TryRead(out ReadOnlySequence<byte> buffer))
 84921388            {
 84921389                buffer = await _duplexConnectionReader.ReadAsync(cancellationToken).ConfigureAwait(false);
 79531390            }
 1391
 128391392            if (buffer.IsEmpty)
 1391393            {
 1391394                return null;
 1395            }
 1396
 127001397            if (TryDecodeHeader(
 127001398                buffer,
 127001399                out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 127001400                out int consumed))
 126941401            {
 126941402                _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed));
 126941403                return header;
 1404            }
 1405            else
 01406            {
 01407                _duplexConnectionReader.AdvanceTo(buffer.Start, buffer.End);
 01408            }
 01409        }
 1410
 1411        static bool TryDecodeHeader(
 1412            ReadOnlySequence<byte> buffer,
 1413            out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 1414            out int consumed)
 127001415        {
 127001416            header = default;
 127001417            consumed = default;
 1418
 127001419            var decoder = new SliceDecoder(buffer);
 1420
 1421            // Decode the frame type and frame size.
 127001422            if (!decoder.TryDecodeUInt8(out byte frameType) || !decoder.TryDecodeVarUInt62(out ulong frameSize))
 01423            {
 01424                return false;
 1425            }
 1426
 127001427            header.FrameType = frameType.AsFrameType();
 1428            try
 126971429            {
 126971430                header.FrameSize = checked((int)frameSize);
 126971431            }
 01432            catch (OverflowException exception)
 01433            {
 01434                throw new InvalidDataException("The frame size can't be larger than int.MaxValue.", exception);
 1435            }
 1436
 1437            // Reject oversized control frame bodies before any buffering occurs.
 126971438            if (header.FrameType < FrameType.Stream && header.FrameSize > MaxControlFrameBodySize)
 11439            {
 11440                throw new InvalidDataException(
 11441                    $"The {header.FrameType} frame body size ({header.FrameSize}) exceeds the maximum allowed size ({Max
 1442            }
 1443
 1444            // If it's a stream frame, try to decode the stream ID
 126961445            if (header.FrameType >= FrameType.Stream)
 118821446            {
 118821447                if (header.FrameSize == 0)
 11448                {
 11449                    throw new InvalidDataException("Invalid stream frame size.");
 1450                }
 1451
 118811452                consumed = (int)decoder.Consumed;
 118811453                if (!decoder.TryDecodeVarUInt62(out ulong streamId))
 01454                {
 01455                    return false;
 1456                }
 118811457                header.StreamId = streamId;
 118811458                header.FrameSize -= (int)decoder.Consumed - consumed;
 1459
 118811460                if (header.FrameSize < 0)
 11461                {
 11462                    throw new InvalidDataException("Invalid stream frame size.");
 1463                }
 118801464            }
 1465
 126941466            consumed = (int)decoder.Consumed;
 126941467            return true;
 126941468        }
 128331469    }
 1470
 1471    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 6761472    {
 1473        try
 6761474        {
 126661475            while (true)
 126661476            {
 126661477                (FrameType Type, int Size, ulong? StreamId)? header = await ReadFrameHeaderAsync(cancellationToken)
 126661478                    .ConfigureAwait(false);
 1479
 121481480                if (header is null)
 1381481                {
 1482                    lock (_mutex)
 1381483                    {
 1381484                        if (!_isClosed)
 01485                        {
 1486                            // Unexpected duplex connection shutdown.
 01487                            throw new IceRpcException(IceRpcError.ConnectionAborted);
 1488                        }
 1381489                    }
 1490                    // The peer has shut down the duplex connection.
 1381491                    break;
 1492                }
 1493
 120101494                await ReadFrameAsync(header.Value.Type, header.Value.Size, header.Value.StreamId, cancellationToken)
 120101495                    .ConfigureAwait(false);
 119901496            }
 1497
 1381498            if (IsServer)
 711499            {
 711500                Debug.Assert(_isClosed);
 1501
 1502                // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When
 1503                // using TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the
 1504                // server-side.
 1505
 1506                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1507                // _writeSemaphore alone serializes access to the writer and guards _writerIsShutdown.
 711508                using (await _writeSemaphore.AcquireAsync(cancellationToken).ConfigureAwait(false))
 711509                {
 711510                    _duplexConnectionWriter.Shutdown();
 1511
 1512                    // Make sure that CloseAsync doesn't call Write on the writer if it's called shortly after the peer
 1513                    // shutdown its side of the connection (which triggers ReadFrameHeaderAsync to return null).
 711514                    _writerIsShutdown = true;
 711515                }
 1516
 711517                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 701518            }
 1371519        }
 2611520        catch (OperationCanceledException)
 2611521        {
 1522            // Expected, DisposeAsync was called.
 2611523        }
 2561524        catch (IceRpcException exception)
 2561525        {
 2561526            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 2561527            throw;
 1528        }
 221529        catch (InvalidDataException exception)
 221530        {
 221531            var rpcException = new IceRpcException(
 221532                IceRpcError.IceRpcError,
 221533                "The connection was aborted by a Slic protocol error.",
 221534                exception);
 221535            TryClose(rpcException, rpcException.Message, IceRpcError.IceRpcError);
 221536            throw rpcException;
 1537        }
 01538        catch (Exception exception)
 01539        {
 01540            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01541            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 01542            throw;
 1543        }
 3981544    }
 1545
 1546    private async Task ReadStreamDataFrameAsync(
 1547        FrameType type,
 1548        int size,
 1549        ulong streamId,
 1550        CancellationToken cancellationToken)
 87791551    {
 87791552        bool endStream = type == FrameType.StreamLast;
 87791553        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 87791554        bool isBidirectional = streamId % 4 < 2;
 1555
 87791556        if (!isBidirectional && !isRemote)
 01557        {
 01558            throw new InvalidDataException(
 01559                "Received unexpected stream frame on local unidirectional stream.");
 1560        }
 87791561        else if (size == 0 && !endStream)
 11562        {
 11563            throw new InvalidDataException($"Received invalid {nameof(FrameType.Stream)} frame.");
 1564        }
 87781565        else if (size > _maxStreamFrameSize)
 11566        {
 11567            throw new InvalidDataException(
 11568                $"Received stream frame with size {size} exceeding the advertised maximum of {_maxStreamFrameSize} bytes
 1569        }
 1570
 87771571        if (!_streams.TryGetValue(streamId, out SlicStream? stream) && isRemote && IsUnknownStream(streamId))
 21051572        {
 1573            // Create a new remote stream.
 1574
 21051575            if (size == 0)
 01576            {
 01577                throw new InvalidDataException("Received empty stream frame on new stream.");
 1578            }
 1579
 21051580            if (isBidirectional)
 6851581            {
 6851582                ulong expectedStreamId = _lastRemoteBidirectionalStreamId is ulong lastId
 6851583                    ? lastId + 4
 6851584                    : (IsServer ? 0ul : 1ul);
 6851585                if (streamId != expectedStreamId)
 11586                {
 11587                    throw new InvalidDataException("Invalid stream ID.");
 1588                }
 1589
 6841590                if (_bidirectionalStreamCount == _maxBidirectionalStreams)
 01591                {
 01592                    throw new IceRpcException(
 01593                        IceRpcError.IceRpcError,
 01594                        $"The maximum bidirectional stream count {_maxBidirectionalStreams} was reached.");
 1595                }
 6841596                Interlocked.Increment(ref _bidirectionalStreamCount);
 6841597            }
 1598            else
 14201599            {
 14201600                ulong expectedStreamId = _lastRemoteUnidirectionalStreamId is ulong lastId
 14201601                    ? lastId + 4
 14201602                    : (IsServer ? 2ul : 3ul);
 14201603                if (streamId != expectedStreamId)
 11604                {
 11605                    throw new InvalidDataException("Invalid stream ID.");
 1606                }
 1607
 14191608                if (_unidirectionalStreamCount == _maxUnidirectionalStreams)
 01609                {
 01610                    throw new IceRpcException(
 01611                        IceRpcError.IceRpcError,
 01612                        $"The maximum unidirectional stream count {_maxUnidirectionalStreams} was reached.");
 1613                }
 14191614                Interlocked.Increment(ref _unidirectionalStreamCount);
 14191615            }
 1616
 1617            // The stream is registered with the connection and queued on the channel. The caller of AcceptStreamAsync
 1618            // is responsible for cleaning up the stream.
 21031619            stream = new SlicStream(this, isBidirectional, isRemote: true);
 1620
 1621            try
 21031622            {
 21031623                AddStream(streamId, stream);
 1624
 1625                try
 21021626                {
 21021627                    await _acceptStreamChannel.Writer.WriteAsync(
 21021628                        stream,
 21021629                        cancellationToken).ConfigureAwait(false);
 21021630                }
 01631                catch (ChannelClosedException exception)
 01632                {
 1633                    // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 01634                    Debug.Assert(exception.InnerException is not null);
 01635                    throw ExceptionUtil.Throw(exception.InnerException);
 1636                }
 21021637            }
 11638            catch (IceRpcException)
 11639            {
 1640                // The two methods above throw IceRpcException if the connection has been closed (either by CloseAsync
 1641                // or because the close frame was received). We cleanup up the stream but don't throw to not abort the
 1642                // reading. The connection graceful closure still needs to read on the connection to figure out when the
 1643                // peer shuts down the duplex connection.
 11644                Debug.Assert(_isClosed);
 11645                stream.Input.Complete();
 11646                if (isBidirectional)
 01647                {
 01648                    stream.Output.Complete();
 01649                }
 11650            }
 21031651        }
 1652
 87751653        bool isDataConsumed = false;
 87751654        if (stream is not null)
 87091655        {
 1656            // Let the stream consume the stream frame data.
 87091657            isDataConsumed = await stream.ReceivedDataFrameAsync(
 87091658                size,
 87091659                endStream,
 87091660                cancellationToken).ConfigureAwait(false);
 87091661        }
 1662
 87751663        if (!isDataConsumed)
 911664        {
 1665            // The stream (if any) didn't consume the data. Read and ignore the data using a helper pipe.
 911666            var pipe = new Pipe(
 911667                new PipeOptions(
 911668                    pool: Pool,
 911669                    pauseWriterThreshold: 0,
 911670                    minimumSegmentSize: MinSegmentSize,
 911671                    useSynchronizationContext: false));
 1672
 911673            await _duplexConnectionReader.FillBufferWriterAsync(
 911674                    pipe.Writer,
 911675                    size,
 911676                    cancellationToken).ConfigureAwait(false);
 1677
 911678            pipe.Writer.Complete();
 911679            pipe.Reader.Complete();
 911680        }
 87751681    }
 1682
 1683    private bool TryClose(Exception exception, string closeMessage, IceRpcError? peerCloseError = null)
 12301684    {
 1685        lock (_mutex)
 12301686        {
 12301687            if (_isClosed)
 4781688            {
 4781689                return false;
 1690            }
 7521691            _isClosed = true;
 7521692            _closedMessage = closeMessage;
 7521693            _peerCloseError = peerCloseError;
 7521694            if (_streamSemaphoreWaitCount == 0)
 7451695            {
 7451696                _streamSemaphoreWaitClosed.SetResult();
 7451697            }
 7521698        }
 1699
 1700        // Cancel pending CreateStreamAsync, AcceptStreamAsync and WriteStreamDataFrameAsync operations.
 7521701        _closedCts.Cancel();
 7521702        _acceptStreamChannel.Writer.TryComplete(exception);
 1703
 1704        // Close streams.
 36381705        foreach (SlicStream stream in _streams.Values)
 6911706        {
 6911707            stream.Close(exception);
 6911708        }
 1709
 7521710        return true;
 12301711    }
 1712
 1713    private void WriteFrame(FrameType frameType, ulong? streamId, EncodeAction? encode)
 45501714    {
 45501715        var encoder = new SliceEncoder(_duplexConnectionWriter);
 45501716        encoder.EncodeFrameType(frameType);
 1717        // 2 bytes is sufficient: control frame bodies are limited to MaxControlFrameBodySize (16,383) and the
 1718        // stream frames encoded by WriteFrame carry at most a stream ID + a small body (e.g., StreamWindowUpdate).
 45501719        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(2);
 45501720        int startPos = encoder.EncodedByteCount;
 45501721        if (streamId is not null)
 37261722        {
 37261723            encoder.EncodeVarUInt62(streamId.Value);
 37261724        }
 45501725        encode?.Invoke(ref encoder);
 45501726        SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos), sizePlaceholder);
 45501727    }
 1728}

Methods/Properties

get_IsServer()
get_MinSegmentSize()
get_PeerInitialStreamWindowSize()
get_PeerMaxStreamFrameSize()
get_Pool()
get_InitialStreamWindowSize()
get_PauseWriterThreshold()
get_StreamWindowUpdateThreshold()
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.Transports.MultiplexedConnectionOptions,IceRpc.Transports.Slic.SlicTransportOptions,System.Boolean)
AcceptStreamAsync()
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DecodeInitialize()
DecodeInitializeAckOrVersion()
ReadFrameAsync()
CloseAsync()
CreateStreamAsync()
DisposeAsync()
PerformDisposeAsync()
SendPingAsync()
SendReadPing()
SendWritePing()
FillBufferWriterAsync(System.Buffers.IBufferWriter`1<System.Byte>,System.Int32,System.Threading.CancellationToken)
ReleaseStream(IceRpc.Transports.Slic.Internal.SlicStream)
ThrowIfClosed()
WriteConnectionFrameAsync()
WriteStreamFrame(IceRpc.Transports.Slic.Internal.SlicStream,IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.Codec.EncodeAction,System.Boolean)
WriteStreamFrameAsync()
WriteStreamDataFrameAsync()
EncodeStreamFrameHeader()
AddStream(System.UInt64,IceRpc.Transports.Slic.Internal.SlicStream)
DecodeParameters(System.Collections.Generic.IDictionary`2<IceRpc.Transports.Slic.Internal.ParameterKey,System.Collections.Generic.IList`1<System.Byte>>)
DecodeParamValue()
EncodeParameters()
EncodeParameter()
IsUnknownStream(System.UInt64)
ReadFrameAsync(IceRpc.Transports.Slic.Internal.FrameType,System.Int32,System.Nullable`1<System.UInt64>,System.Threading.CancellationToken)
ReadCloseFrameAsync()
ReadPingFrameAndWritePongFrameAsync()
ReadPongFrameAsync()
ReadStreamWindowUpdateFrameAsync()
ReadFrameBodyAsync()
ReadFrameHeaderAsync()
TryDecodeHeader()
ReadFramesAsync()
ReadStreamDataFrameAsync()
TryClose(System.Exception,System.String,System.Nullable`1<IceRpc.IceRpcError>)
WriteFrame(IceRpc.Transports.Slic.Internal.FrameType,System.Nullable`1<System.UInt64>,ZeroC.Slice.Codec.EncodeAction)