< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Tag: 276_17717543480
Line coverage
89%
Covered lines: 916
Uncovered lines: 102
Coverable lines: 1018
Total lines: 1578
Line coverage: 89.9%
Branch coverage
90%
Covered branches: 297
Total branches: 330
Branch coverage: 90%
Method coverage
97%
Covered methods: 45
Total methods: 46
Method coverage: 97.8%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_IsServer()100%11100%
get_MinSegmentSize()100%11100%
get_PeerInitialStreamWindowSize()100%11100%
get_PeerMaxStreamFrameSize()100%11100%
get_Pool()100%11100%
get_InitialStreamWindowSize()100%11100%
get_StreamWindowUpdateThreshold()100%11100%
.ctor(...)100%44100%
AcceptStreamAsync()100%66100%
ConnectAsync(...)75%4.05485.71%
PerformConnectAsync()100%24.052495.5%
DecodeInitialize()75%4.02490%
DecodeInitializeAckOrVersion()66.66%6.01692.85%
ReadFrameAsync()100%88100%
CloseAsync()100%1414100%
CreateStreamAsync()100%141497.72%
DisposeAsync()100%22100%
PerformDisposeAsync()93.75%16.221690.47%
SendPing()100%1.04166.66%
SendReadPing()100%22100%
SendWritePing()0%620%
FillBufferWriterAsync(...)100%11100%
ReleaseStream(...)100%88100%
ThrowIfClosed()100%22100%
WriteConnectionFrame(...)100%22100%
WriteStreamFrame(...)87.5%88100%
WriteStreamDataFrameAsync()94.44%36.443693.02%
EncodeStreamFrameHeader()100%22100%
AddStream(...)83.33%6.04690%
DecodeParameters(...)75%34.52473.68%
DecodeParamValue()100%1.03170%
EncodeParameters()100%66100%
EncodeParameter()100%11100%
IsUnknownStream(...)100%1212100%
ReadFrameAsync(...)95.65%23.072394.87%
ReadCloseFrameAsync()100%1313100%
ReadPingFrameAndWritePongFrameAsync()100%11100%
ReadPongFrameAsync()66.66%6.09686.66%
ReadStreamWindowUpdateFrameAsync()100%22100%
ReadFrameBodyAsync()100%44100%
ReadFrameHeaderAsync()83.33%6.22681.81%
TryDecodeHeader()75%13.511278.12%
ReadFramesAsync()83.33%6.1686%
ReadStreamDataFrameAsync()77.77%91.043665.11%
TryClose(...)100%66100%
WriteFrame(...)100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Collections.Concurrent;
 7using System.Diagnostics;
 8using System.IO.Pipelines;
 9using System.Security.Authentication;
 10using System.Threading.Channels;
 11using ZeroC.Slice;
 12
 13namespace IceRpc.Transports.Slic.Internal;
 14
 15/// <summary>The Slic connection implements an <see cref="IMultiplexedConnection" /> on top of a <see
 16/// cref="IDuplexConnection" />.</summary>
 17internal class SlicConnection : IMultiplexedConnection
 18{
 19    /// <summary>Gets a value indicating whether or not this is the server-side of the connection.</summary>
 3359820    internal bool IsServer { get; }
 21
 22    /// <summary>Gets the minimum size of the segment requested from <see cref="Pool" />.</summary>
 1079123    internal int MinSegmentSize { get; }
 24
 25    /// <summary>Gets the peer's initial stream window size. This property is set to the <see
 26    /// cref="ParameterKey.InitialStreamWindowSize"/> value carried by the <see cref="FrameType.Initialize" />
 27    /// frame.</summary>
 654028    internal int PeerInitialStreamWindowSize { get; private set; }
 29
 30    /// <summary>Gets the maximum size of stream frames accepted by the peer. This property is set to the <see
 31    /// cref="ParameterKey.MaxStreamFrameSize"/> value carried by the <see cref="FrameType.Initialize" />
 32    /// frame.</summary>
 1938333    internal int PeerMaxStreamFrameSize { get; private set; }
 34
 35    /// <summary>Gets the <see cref="MemoryPool{T}" /> used for obtaining memory buffers.</summary>
 1079136    internal MemoryPool<byte> Pool { get; }
 37
 38    /// <summary>Gets the initial stream window size.</summary>
 1941139    internal int InitialStreamWindowSize { get; }
 40
 41    /// <summary>Gets the window update threshold. When the window size is increased and this threshold reached, a <see
 42    /// cref="FrameType.StreamWindowUpdate" /> frame is sent.</summary>
 1284243    internal int StreamWindowUpdateThreshold => InitialStreamWindowSize / StreamWindowUpdateRatio;
 44
 45    // The ratio used to compute the StreamWindowUpdateThreshold. For now, the stream window update is sent when the
 46    // window size grows over InitialStreamWindowSize / StreamWindowUpdateRatio.
 47    private const int StreamWindowUpdateRatio = 2;
 48
 49    private readonly Channel<IMultiplexedStream> _acceptStreamChannel;
 50    private int _bidirectionalStreamCount;
 51    private SemaphoreSlim? _bidirectionalStreamSemaphore;
 52    private readonly CancellationToken _closedCancellationToken;
 136953    private readonly CancellationTokenSource _closedCts = new();
 54    private string? _closedMessage;
 55    private Task<TransportConnectionInformation>? _connectTask;
 136956    private readonly CancellationTokenSource _disposedCts = new();
 57    private Task? _disposeTask;
 58    private readonly SlicDuplexConnectionDecorator _duplexConnection;
 59    private readonly DuplexConnectionReader _duplexConnectionReader;
 60    private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
 61    private bool _isClosed;
 62    private ulong? _lastRemoteBidirectionalStreamId;
 63    private ulong? _lastRemoteUnidirectionalStreamId;
 64    private readonly TimeSpan _localIdleTimeout;
 65    private readonly int _maxBidirectionalStreams;
 66    private readonly int _maxStreamFrameSize;
 67    private readonly int _maxUnidirectionalStreams;
 68    // _mutex ensure the assignment of _lastRemoteXxx members and the addition of the stream to _streams is
 69    // an atomic operation.
 136970    private readonly object _mutex = new();
 71    private ulong _nextBidirectionalId;
 72    private ulong _nextUnidirectionalId;
 73    private IceRpcError? _peerCloseError;
 136974    private TimeSpan _peerIdleTimeout = Timeout.InfiniteTimeSpan;
 75    private int _pendingPongCount;
 76    private Task? _readFramesTask;
 77
 136978    private readonly ConcurrentDictionary<ulong, SlicStream> _streams = new();
 79    private int _streamSemaphoreWaitCount;
 136980    private readonly TaskCompletionSource _streamSemaphoreWaitClosed =
 136981        new(TaskCreationOptions.RunContinuationsAsynchronously);
 82
 83    private int _unidirectionalStreamCount;
 84    private SemaphoreSlim? _unidirectionalStreamSemaphore;
 85
 86    // This is only set for server connections to ensure that _duplexConnectionWriter.Write is not called after
 87    // _duplexConnectionWriter.Shutdown. This can occur if the client-side of the connection sends the close frame
 88    // followed by the shutdown of the duplex connection and if CloseAsync is called at the same time on the server
 89    // connection.
 90    private bool _writerIsShutdown;
 91
 92    public async ValueTask<IMultiplexedStream> AcceptStreamAsync(CancellationToken cancellationToken)
 463993    {
 463994        lock (_mutex)
 463995        {
 463996            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 97
 463798            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 299            {
 2100                throw new InvalidOperationException("Cannot accept stream before connecting the Slic connection.");
 101            }
 4635102            if (_isClosed)
 23103            {
 23104                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 105            }
 4612106        }
 107
 108        try
 4612109        {
 4612110            return await _acceptStreamChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 111        }
 184112        catch (ChannelClosedException exception)
 184113        {
 184114            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 183115            Debug.Assert(exception.InnerException is not null);
 116            // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 183117            throw ExceptionUtil.Throw(exception.InnerException);
 118        }
 3968119    }
 120
 121    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 1331122    {
 1331123        lock (_mutex)
 1331124        {
 1331125            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 126
 1331127            if (_connectTask is not null)
 2128            {
 2129                throw new InvalidOperationException("Cannot connect twice a Slic connection.");
 130            }
 1329131            if (_isClosed)
 0132            {
 0133                throw new InvalidOperationException("Cannot connect a closed Slic connection.");
 134            }
 1329135            _connectTask = PerformConnectAsync();
 1329136        }
 1329137        return _connectTask;
 138
 139        async Task<TransportConnectionInformation> PerformConnectAsync()
 1329140        {
 1329141            await Task.Yield(); // Exit mutex lock
 142
 143            // Connect the duplex connection.
 144            TransportConnectionInformation transportConnectionInformation;
 1329145            TimeSpan peerIdleTimeout = TimeSpan.MaxValue;
 146
 147            try
 1329148            {
 1329149                transportConnectionInformation = await _duplexConnection.ConnectAsync(cancellationToken)
 1329150                    .ConfigureAwait(false);
 151
 152                // Initialize the Slic connection.
 1283153                if (IsServer)
 644154                {
 155                    // Read the Initialize frame.
 644156                    (ulong version, InitializeBody? initializeBody) = await ReadFrameAsync(
 644157                        DecodeInitialize,
 644158                        cancellationToken).ConfigureAwait(false);
 159
 634160                    if (initializeBody is null)
 4161                    {
 162                        // Unsupported version, try to negotiate another version by sending a Version frame with the
 163                        // Slic versions supported by this server.
 4164                        ulong[] supportedVersions = new ulong[] { SlicDefinitions.V1 };
 165
 4166                        WriteConnectionFrame(FrameType.Version, new VersionBody(supportedVersions).Encode);
 167
 4168                        (version, initializeBody) = await ReadFrameAsync(
 4169                            (frameType, buffer) =>
 4170                            {
 4171                                if (frameType is null)
 2172                                {
 4173                                    // The client shut down the connection because it doesn't support any of the
 4174                                    // server's supported Slic versions.
 2175                                    throw new IceRpcException(
 2176                                        IceRpcError.ConnectionRefused,
 2177                                        $"The connection was refused because the client Slic version {version} is not su
 4178                                }
 4179                                else
 2180                                {
 2181                                    return DecodeInitialize(frameType, buffer);
 4182                                }
 2183                            },
 4184                            cancellationToken).ConfigureAwait(false);
 2185                    }
 186
 632187                    Debug.Assert(initializeBody is not null);
 188
 632189                    DecodeParameters(initializeBody.Value.Parameters);
 190
 191                    // Write back an InitializeAck frame.
 632192                    WriteConnectionFrame(FrameType.InitializeAck, new InitializeAckBody(EncodeParameters()).Encode);
 632193                }
 194                else
 639195                {
 196                    // Write the Initialize frame.
 639197                    WriteConnectionFrame(
 639198                        FrameType.Initialize,
 639199                        (ref SliceEncoder encoder) =>
 639200                        {
 639201                            encoder.EncodeVarUInt62(SlicDefinitions.V1);
 639202                            new InitializeBody(EncodeParameters()).Encode(ref encoder);
 1278203                        });
 204
 205                    // Read and decode the InitializeAck or Version frame.
 639206                    (InitializeAckBody? initializeAckBody, VersionBody? versionBody) = await ReadFrameAsync(
 639207                        DecodeInitializeAckOrVersion,
 639208                        cancellationToken).ConfigureAwait(false);
 209
 592210                    Debug.Assert(initializeAckBody is not null || versionBody is not null);
 211
 592212                    if (initializeAckBody is not null)
 588213                    {
 588214                        DecodeParameters(initializeAckBody.Value.Parameters);
 588215                    }
 216
 592217                    if (versionBody is not null)
 4218                    {
 4219                        if (versionBody.Value.Versions.Contains(SlicDefinitions.V1))
 2220                        {
 2221                            throw new InvalidDataException(
 2222                                "The server supported versions include the version initially requested.");
 223                        }
 224                        else
 2225                        {
 226                            // We only support V1 and the peer rejected V1.
 2227                            throw new IceRpcException(
 2228                                IceRpcError.ConnectionRefused,
 2229                                $"The connection was refused because the server only supports Slic version(s) {string.Jo
 230                        }
 231                    }
 588232                }
 1220233            }
 12234            catch (InvalidDataException exception)
 12235            {
 12236                throw new IceRpcException(
 12237                    IceRpcError.IceRpcError,
 12238                    "The connection was aborted by a Slic protocol error.",
 12239                    exception);
 240            }
 49241            catch (OperationCanceledException)
 49242            {
 49243                throw;
 244            }
 8245            catch (AuthenticationException)
 8246            {
 8247                throw;
 248            }
 40249            catch (IceRpcException)
 40250            {
 40251                throw;
 252            }
 0253            catch (Exception exception)
 0254            {
 0255                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0256                throw;
 257            }
 258
 259            // Enable the idle timeout checks after the connection establishment. The Ping frames sent by the keep alive
 260            // check are not expected until the Slic connection initialization completes. The idle timeout check uses
 261            // the smallest idle timeout.
 1220262            TimeSpan idleTimeout = _peerIdleTimeout == Timeout.InfiniteTimeSpan ? _localIdleTimeout :
 1220263                (_peerIdleTimeout < _localIdleTimeout ? _peerIdleTimeout : _localIdleTimeout);
 264
 1220265            if (idleTimeout != Timeout.InfiniteTimeSpan)
 1216266            {
 1216267                _duplexConnection.Enable(idleTimeout);
 1216268            }
 269
 1220270            _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 271
 1220272            return transportConnectionInformation;
 1220273        }
 274
 275        static (ulong, InitializeBody?) DecodeInitialize(FrameType? frameType, ReadOnlySequence<byte> buffer)
 638276        {
 638277            if (frameType != FrameType.Initialize)
 0278            {
 0279                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 280            }
 281
 638282            return SliceEncoding.Slice2.DecodeBuffer<(ulong, InitializeBody?)>(
 638283                buffer,
 638284                (ref SliceDecoder decoder) =>
 638285                {
 638286                    ulong version = decoder.DecodeVarUInt62();
 636287                    if (version == SlicDefinitions.V1)
 632288                    {
 632289                        return (version, new InitializeBody(ref decoder));
 638290                    }
 638291                    else
 4292                    {
 4293                        decoder.Skip((int)(buffer.Length - decoder.Consumed));
 4294                        return (version, null);
 638295                    }
 1274296                });
 636297        }
 298
 299        static (InitializeAckBody?, VersionBody?) DecodeInitializeAckOrVersion(
 300            FrameType? frameType,
 301            ReadOnlySequence<byte> buffer) =>
 596302            frameType switch
 596303            {
 590304                FrameType.InitializeAck => (
 590305                    SliceEncoding.Slice2.DecodeBuffer(
 590306                        buffer,
 590307                        (ref SliceDecoder decoder) => new InitializeAckBody(ref decoder)),
 590308                    null),
 6309                FrameType.Version => (
 6310                    null,
 6311                    SliceEncoding.Slice2.DecodeBuffer(
 6312                        buffer,
 12313                        (ref SliceDecoder decoder) => new VersionBody(ref decoder))),
 0314                _ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
 596315            };
 316
 317        async ValueTask<T> ReadFrameAsync<T>(
 318            Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
 319            CancellationToken cancellationToken)
 1287320        {
 1287321            (FrameType FrameType, int FrameSize, ulong?)? header =
 1287322                await ReadFrameHeaderAsync(cancellationToken).ConfigureAwait(false);
 323
 324            ReadOnlySequence<byte> buffer;
 1236325            if (header is null || header.Value.FrameSize == 0)
 8326            {
 8327                buffer = ReadOnlySequence<byte>.Empty;
 8328            }
 329            else
 1228330            {
 1228331                buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 1228332                    header.Value.FrameSize,
 1228333                    cancellationToken).ConfigureAwait(false);
 1228334                if (buffer.Length > header.Value.FrameSize)
 14335                {
 14336                    buffer = buffer.Slice(0, header.Value.FrameSize);
 14337                }
 1228338            }
 339
 1236340            T decodedFrame = decodeFunc(header?.FrameType, buffer);
 1228341            _duplexConnectionReader.AdvanceTo(buffer.End);
 1228342            return decodedFrame;
 1228343        }
 1329344    }
 345
 346    public async Task CloseAsync(MultiplexedConnectionCloseError closeError, CancellationToken cancellationToken)
 202347    {
 202348        lock (_mutex)
 202349        {
 202350            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 351
 202352            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 2353            {
 2354                throw new InvalidOperationException("Cannot close a Slic connection before connecting it.");
 355            }
 200356        }
 357
 200358        bool waitForWriterShutdown = false;
 200359        if (TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was closed."))
 166360        {
 166361            lock (_mutex)
 166362            {
 363                // The duplex connection writer of a server connection might already be shutdown
 364                // (_writerIsShutdown=true) if the client-side sent the Close frame and shut down the duplex connection.
 365                // This doesn't apply to the client-side since the server-side doesn't shutdown the duplex connection
 366                // writer after sending the Close frame.
 166367                if (!IsServer || !_writerIsShutdown)
 165368                {
 165369                    WriteFrame(FrameType.Close, streamId: null, new CloseBody((ulong)closeError).Encode);
 165370                    if (IsServer)
 81371                    {
 81372                        _duplexConnectionWriter.Flush();
 81373                    }
 374                    else
 84375                    {
 376                        // The sending of the client-side Close frame is followed by the shutdown of the duplex
 377                        // connection. For TCP, it's important to always shutdown the connection on the client-side
 378                        // first to avoid TIME_WAIT states on the server-side.
 84379                        _duplexConnectionWriter.Shutdown();
 84380                        waitForWriterShutdown = true;
 84381                    }
 165382                }
 166383            }
 166384        }
 385
 200386        if (waitForWriterShutdown)
 84387        {
 84388            await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 83389        }
 390
 391        // Now, wait for the peer to close the write side of the connection, which will terminate the read frames task.
 199392        Debug.Assert(_readFramesTask is not null);
 199393        await _readFramesTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 195394    }
 395
 396    public async ValueTask<IMultiplexedStream> CreateStreamAsync(
 397        bool bidirectional,
 398        CancellationToken cancellationToken)
 4095399    {
 4095400        lock (_mutex)
 4095401        {
 4095402            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 403
 4089404            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 4405            {
 4406                throw new InvalidOperationException("Cannot create stream before connecting the Slic connection.");
 407            }
 4085408            if (_isClosed)
 14409            {
 14410                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 411            }
 412
 4071413            ++_streamSemaphoreWaitCount;
 4071414        }
 415
 416        try
 4071417        {
 4071418            using var createStreamCts = CancellationTokenSource.CreateLinkedTokenSource(
 4071419                _closedCancellationToken,
 4071420                cancellationToken);
 421
 4071422            SemaphoreSlim? streamCountSemaphore = bidirectional ?
 4071423                _bidirectionalStreamSemaphore :
 4071424                _unidirectionalStreamSemaphore;
 425
 4071426            if (streamCountSemaphore is null)
 2427            {
 428                // The stream semaphore is null if the peer's max streams configuration is 0. In this case, we let
 429                // CreateStreamAsync hang indefinitely until the connection is closed.
 2430                await Task.Delay(-1, createStreamCts.Token).ConfigureAwait(false);
 0431            }
 432            else
 4069433            {
 4069434                await streamCountSemaphore.WaitAsync(createStreamCts.Token).ConfigureAwait(false);
 4052435            }
 436
 4052437            return new SlicStream(this, bidirectional, isRemote: false);
 438        }
 19439        catch (OperationCanceledException)
 19440        {
 19441            cancellationToken.ThrowIfCancellationRequested();
 13442            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 12443            Debug.Assert(_isClosed);
 12444            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 445        }
 446        finally
 4071447        {
 4071448            lock (_mutex)
 4071449            {
 4071450                --_streamSemaphoreWaitCount;
 4071451                if (_isClosed && _streamSemaphoreWaitCount == 0)
 13452                {
 13453                    _streamSemaphoreWaitClosed.SetResult();
 13454                }
 4071455            }
 4071456        }
 4052457    }
 458
 459    public ValueTask DisposeAsync()
 1894460    {
 1894461        lock (_mutex)
 1894462        {
 1894463            _disposeTask ??= PerformDisposeAsync();
 1894464        }
 1894465        return new(_disposeTask);
 466
 467        async Task PerformDisposeAsync()
 1367468        {
 469            // Make sure we execute the code below without holding the mutex lock.
 1367470            await Task.Yield();
 1367471            TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was disposed.");
 472
 1367473            _disposedCts.Cancel();
 474
 475            try
 1367476            {
 1367477                await Task.WhenAll(
 1367478                    _connectTask ?? Task.CompletedTask,
 1367479                    _readFramesTask ?? Task.CompletedTask,
 1367480                    _streamSemaphoreWaitClosed.Task).ConfigureAwait(false);
 770481            }
 597482            catch
 597483            {
 484                // Expected if any of these tasks failed or was canceled. Each task takes care of handling unexpected
 485                // exceptions so there's no need to handle them here.
 597486            }
 487
 488            // Clean-up the streams that might still be queued on the channel.
 1419489            while (_acceptStreamChannel.Reader.TryRead(out IMultiplexedStream? stream))
 52490            {
 52491                if (stream.IsBidirectional)
 10492                {
 10493                    stream.Output.Complete();
 10494                    stream.Input.Complete();
 10495                }
 42496                else if (stream.IsRemote)
 42497                {
 42498                    stream.Input.Complete();
 42499                }
 500                else
 0501                {
 0502                    stream.Output.Complete();
 0503                }
 52504            }
 505
 506            try
 1367507            {
 508                // Prevents unobserved task exceptions.
 1367509                await _acceptStreamChannel.Reader.Completion.ConfigureAwait(false);
 0510            }
 1367511            catch
 1367512            {
 1367513            }
 514
 1367515            await _duplexConnectionWriter.DisposeAsync().ConfigureAwait(false);
 1367516            _duplexConnectionReader.Dispose();
 1367517            _duplexConnection.Dispose();
 518
 1367519            _disposedCts.Dispose();
 1367520            _bidirectionalStreamSemaphore?.Dispose();
 1367521            _unidirectionalStreamSemaphore?.Dispose();
 1367522            _closedCts.Dispose();
 1367523        }
 1894524    }
 525
 1369526    internal SlicConnection(
 1369527        IDuplexConnection duplexConnection,
 1369528        MultiplexedConnectionOptions options,
 1369529        SlicTransportOptions slicOptions,
 1369530        bool isServer)
 1369531    {
 1369532        IsServer = isServer;
 533
 1369534        Pool = options.Pool;
 1369535        MinSegmentSize = options.MinSegmentSize;
 1369536        _maxBidirectionalStreams = options.MaxBidirectionalStreams;
 1369537        _maxUnidirectionalStreams = options.MaxUnidirectionalStreams;
 538
 1369539        InitialStreamWindowSize = slicOptions.InitialStreamWindowSize;
 1369540        _localIdleTimeout = slicOptions.IdleTimeout;
 1369541        _maxStreamFrameSize = slicOptions.MaxStreamFrameSize;
 542
 1369543        _acceptStreamChannel = Channel.CreateUnbounded<IMultiplexedStream>(new UnboundedChannelOptions
 1369544        {
 1369545            SingleReader = true,
 1369546            SingleWriter = true
 1369547        });
 548
 1369549        _closedCancellationToken = _closedCts.Token;
 550
 551        // Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
 1369552        _duplexConnection = IsServer ?
 1369553            new SlicDuplexConnectionDecorator(duplexConnection) :
 1369554            new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);
 555
 1369556        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 1369557        _duplexConnectionWriter = new SlicDuplexConnectionWriter(
 1369558            _duplexConnection,
 1369559            options.Pool,
 1369560            options.MinSegmentSize);
 561
 562        // We use the same stream ID numbering scheme as Quic.
 1369563        if (IsServer)
 675564        {
 675565            _nextBidirectionalId = 1;
 675566            _nextUnidirectionalId = 3;
 675567        }
 568        else
 694569        {
 694570            _nextBidirectionalId = 0;
 694571            _nextUnidirectionalId = 2;
 694572        }
 573
 574        void SendPing(long payload)
 29575        {
 576            try
 29577            {
 29578                WriteConnectionFrame(FrameType.Ping, new PingBody(payload).Encode);
 27579            }
 2580            catch (IceRpcException)
 2581            {
 582                // Expected if the connection is closed.
 2583            }
 0584            catch (Exception exception)
 0585            {
 0586                Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}");
 0587                throw;
 588            }
 29589        }
 590
 591        void SendReadPing()
 29592        {
 593            // This local function is no-op if there is already a pending Pong.
 29594            if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) == 0)
 29595            {
 29596                SendPing(1L);
 29597            }
 29598        }
 599
 600        void SendWritePing()
 0601        {
 602            // _pendingPongCount can be <= 0 if an unexpected pong is received. If it's the case, the connection is
 603            // being torn down and there's no point in sending a ping frame.
 0604            if (Interlocked.Increment(ref _pendingPongCount) > 0)
 0605            {
 0606                SendPing(0L);
 0607            }
 0608        }
 1369609    }
 610
 611    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 612    /// <param name="bufferWriter">The destination buffer writer.</param>
 613    /// <param name="byteCount">The amount of stream data to read.</param>
 614    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 615    internal ValueTask FillBufferWriterAsync(
 616        IBufferWriter<byte> bufferWriter,
 617        int byteCount,
 618        CancellationToken cancellationToken) =>
 18882619        _duplexConnectionReader.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 620
 621    /// <summary>Releases a stream from the connection. The connection stream count is decremented and if this is a
 622    /// client allow a new stream to be started.</summary>
 623    /// <param name="stream">The released stream.</param>
 624    internal void ReleaseStream(SlicStream stream)
 8042625    {
 8042626        Debug.Assert(stream.IsStarted);
 627
 8042628        _streams.Remove(stream.Id, out SlicStream? _);
 629
 8042630        if (stream.IsRemote)
 4020631        {
 4020632            if (stream.IsBidirectional)
 1264633            {
 1264634                Interlocked.Decrement(ref _bidirectionalStreamCount);
 1264635            }
 636            else
 2756637            {
 2756638                Interlocked.Decrement(ref _unidirectionalStreamCount);
 2756639            }
 4020640        }
 4022641        else if (!_isClosed)
 3340642        {
 3340643            if (stream.IsBidirectional)
 1145644            {
 1145645                _bidirectionalStreamSemaphore!.Release();
 1145646            }
 647            else
 2195648            {
 2195649                _unidirectionalStreamSemaphore!.Release();
 2195650            }
 3340651        }
 8042652    }
 653
 654    /// <summary>Throws the connection closure exception if the connection is already closed.</summary>
 655    internal void ThrowIfClosed()
 15638656    {
 15638657        lock (_mutex)
 15638658        {
 15638659            if (_isClosed)
 16660            {
 16661                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 662            }
 15622663        }
 15622664    }
 665
 666    /// <summary>Writes a connection frame.</summary>
 667    /// <param name="frameType">The frame type.</param>
 668    /// <param name="encode">The action to encode the frame.</param>
 669    internal void WriteConnectionFrame(FrameType frameType, EncodeAction? encode)
 1331670    {
 1331671        Debug.Assert(frameType < FrameType.Stream);
 672
 1331673        lock (_mutex)
 1331674        {
 1331675            if (_isClosed)
 2676            {
 2677                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 678            }
 1329679            WriteFrame(frameType, streamId: null, encode);
 1329680            _duplexConnectionWriter.Flush();
 1329681        }
 1329682    }
 683
 684    /// <summary>Writes a stream frame.</summary>
 685    /// <param name="stream">The stream to write the frame for.</param>
 686    /// <param name="frameType">The frame type.</param>
 687    /// <param name="encode">The action to encode the frame.</param>
 688    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 689    /// frame should be written after the stream frame.</param>
 690    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 691    /// also be closed concurrently while it's in progress.</remarks>
 692    internal void WriteStreamFrame(
 693        SlicStream stream,
 694        FrameType frameType,
 695        EncodeAction? encode,
 696        bool writeReadsClosedFrame)
 6028697    {
 698        // Ensure that this method is called for any FrameType.StreamXxx frame type except FrameType.Stream.
 6028699        Debug.Assert(frameType >= FrameType.StreamLast && stream.IsStarted);
 700
 6028701        lock (_mutex)
 6028702        {
 6028703            if (_isClosed)
 5704            {
 5705                return;
 706            }
 707
 6023708            WriteFrame(frameType, stream.Id, encode);
 6023709            if (writeReadsClosedFrame)
 144710            {
 144711                WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 144712            }
 6023713            if (frameType == FrameType.StreamLast)
 1111714            {
 715                // Notify the stream that the last stream frame is considered sent at this point. This will close
 716                // writes on the stream and allow the stream to be released if reads are also closed.
 1111717                stream.WroteLastStreamFrame();
 1111718            }
 6023719            _duplexConnectionWriter.Flush();
 6023720        }
 6028721    }
 722
 723    /// <summary>Writes a stream data frame.</summary>
 724    /// <param name="stream">The stream to write the frame for.</param>
 725    /// <param name="source1">The first stream frame data source.</param>
 726    /// <param name="source2">The second stream frame data source.</param>
 727    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame and
 728    /// <see langword="false" /> to write a <see cref="FrameType.Stream" /> frame.</param>
 729    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 730    /// frame should be written after the stream frame.</param>
 731    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 732    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 733    /// also be closed concurrently while it's in progress.</remarks>
 734    internal async ValueTask<FlushResult> WriteStreamDataFrameAsync(
 735        SlicStream stream,
 736        ReadOnlySequence<byte> source1,
 737        ReadOnlySequence<byte> source2,
 738        bool endStream,
 739        bool writeReadsClosedFrame,
 740        CancellationToken cancellationToken)
 15595741    {
 15595742        Debug.Assert(!source1.IsEmpty || endStream);
 743
 15595744        if (_connectTask is null)
 0745        {
 0746            throw new InvalidOperationException("Cannot send a stream frame before calling ConnectAsync.");
 747        }
 748
 15595749        using var writeCts = CancellationTokenSource.CreateLinkedTokenSource(
 15595750            _closedCancellationToken,
 15595751            cancellationToken);
 752
 753        try
 15595754        {
 755            do
 20186756            {
 757                // Next, ensure send credit is available. If not, this will block until the receiver allows sending
 758                // additional data.
 20186759                int sendCredit = 0;
 20186760                if (!source1.IsEmpty || !source2.IsEmpty)
 20176761                {
 20176762                    sendCredit = await stream.AcquireSendCreditAsync(writeCts.Token).ConfigureAwait(false);
 18149763                    Debug.Assert(sendCredit > 0);
 18149764                }
 765
 766                // Gather data from source1 or source2 up to sendCredit bytes or the peer maximum stream frame size.
 18159767                int sendMaxSize = Math.Min(sendCredit, PeerMaxStreamFrameSize);
 768                ReadOnlySequence<byte> sendSource1;
 769                ReadOnlySequence<byte> sendSource2;
 18159770                if (!source1.IsEmpty)
 16149771                {
 16149772                    int length = Math.Min((int)source1.Length, sendMaxSize);
 16149773                    sendSource1 = source1.Slice(0, length);
 16149774                    source1 = source1.Slice(length);
 16149775                }
 776                else
 2010777                {
 2010778                    sendSource1 = ReadOnlySequence<byte>.Empty;
 2010779                }
 780
 18159781                if (source1.IsEmpty && !source2.IsEmpty)
 4073782                {
 4073783                    int length = Math.Min((int)source2.Length, sendMaxSize - (int)sendSource1.Length);
 4073784                    sendSource2 = source2.Slice(0, length);
 4073785                    source2 = source2.Slice(length);
 4073786                }
 787                else
 14086788                {
 14086789                    sendSource2 = ReadOnlySequence<byte>.Empty;
 14086790                }
 791
 792                // If there's no data left to send and endStream is true, it's the last stream frame.
 18159793                bool lastStreamFrame = endStream && source1.IsEmpty && source2.IsEmpty;
 794
 18159795                lock (_mutex)
 18159796                {
 18159797                    if (_isClosed)
 0798                    {
 0799                        throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 800                    }
 801
 18159802                    if (!stream.IsStarted)
 4022803                    {
 4022804                        if (stream.IsBidirectional)
 1262805                        {
 1262806                            AddStream(_nextBidirectionalId, stream);
 1262807                            _nextBidirectionalId += 4;
 1262808                        }
 809                        else
 2760810                        {
 2760811                            AddStream(_nextUnidirectionalId, stream);
 2760812                            _nextUnidirectionalId += 4;
 2760813                        }
 4022814                    }
 815
 816                    // Notify the stream that we're consuming sendSize credit. It's important to call this before
 817                    // sending the stream frame to avoid race conditions where the StreamWindowUpdate frame could be
 818                    // received before the send credit was updated.
 18159819                    if (sendCredit > 0)
 18149820                    {
 18149821                        stream.ConsumedSendCredit((int)(sendSource1.Length + sendSource2.Length));
 18149822                    }
 823
 18159824                    EncodeStreamFrameHeader(stream.Id, sendSource1.Length + sendSource2.Length, lastStreamFrame);
 825
 18159826                    if (lastStreamFrame)
 1468827                    {
 828                        // Notify the stream that the last stream frame is considered sent at this point. This will
 829                        // complete writes on the stream and allow the stream to be released if reads are also
 830                        // completed.
 1468831                        stream.WroteLastStreamFrame();
 1468832                    }
 833
 834                    // Write and flush the stream frame.
 18159835                    if (!sendSource1.IsEmpty)
 16149836                    {
 16149837                        _duplexConnectionWriter.Write(sendSource1);
 16149838                    }
 18159839                    if (!sendSource2.IsEmpty)
 4073840                    {
 4073841                        _duplexConnectionWriter.Write(sendSource2);
 4073842                    }
 843
 18159844                    if (writeReadsClosedFrame)
 697845                    {
 697846                        WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 697847                    }
 18159848                    _duplexConnectionWriter.Flush();
 18159849                }
 18159850            }
 18159851            while (!source1.IsEmpty || !source2.IsEmpty); // Loop until there's no data left to send.
 13568852        }
 2027853        catch (OperationCanceledException)
 2027854        {
 2027855            cancellationToken.ThrowIfCancellationRequested();
 856
 0857            Debug.Assert(_isClosed);
 0858            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 859        }
 860
 13568861        return new FlushResult(isCanceled: false, isCompleted: false);
 862
 863        void EncodeStreamFrameHeader(ulong streamId, long size, bool lastStreamFrame)
 18159864        {
 18159865            var encoder = new SliceEncoder(_duplexConnectionWriter, SliceEncoding.Slice2);
 18159866            encoder.EncodeFrameType(!lastStreamFrame ? FrameType.Stream : FrameType.StreamLast);
 18159867            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 18159868            int startPos = encoder.EncodedByteCount;
 18159869            encoder.EncodeVarUInt62(streamId);
 18159870            SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos + size), sizePlaceholder);
 18159871        }
 13568872    }
 873
 874    private void AddStream(ulong id, SlicStream stream)
 8042875    {
 8042876        lock (_mutex)
 8042877        {
 8042878            if (_isClosed)
 0879            {
 0880                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 881            }
 882
 8042883            _streams[id] = stream;
 884
 885            // Assign the stream ID within the mutex to ensure that the addition of the stream to the connection and the
 886            // stream ID assignment are atomic.
 8042887            stream.Id = id;
 888
 889            // Keep track of the last assigned stream ID. This is used to figure out if the stream is known or unknown.
 8042890            if (stream.IsRemote)
 4020891            {
 4020892                if (stream.IsBidirectional)
 1264893                {
 1264894                    _lastRemoteBidirectionalStreamId = id;
 1264895                }
 896                else
 2756897                {
 2756898                    _lastRemoteUnidirectionalStreamId = id;
 2756899                }
 4020900            }
 8042901        }
 8042902    }
 903
 904    private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
 1220905    {
 1220906        int? maxStreamFrameSize = null;
 1220907        int? peerInitialStreamWindowSize = null;
 15446908        foreach ((ParameterKey key, IList<byte> buffer) in parameters)
 5893909        {
 5893910            switch (key)
 911            {
 912                case ParameterKey.MaxBidirectionalStreams:
 1093913                {
 1093914                    int value = DecodeParamValue(buffer);
 1093915                    if (value > 0)
 1093916                    {
 1093917                        _bidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 1093918                    }
 1093919                    break;
 920                }
 921                case ParameterKey.MaxUnidirectionalStreams:
 1182922                {
 1182923                    int value = DecodeParamValue(buffer);
 1182924                    if (value > 0)
 1182925                    {
 1182926                        _unidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 1182927                    }
 1182928                    break;
 929                }
 930                case ParameterKey.IdleTimeout:
 1178931                {
 1178932                    _peerIdleTimeout = TimeSpan.FromMilliseconds(DecodeParamValue(buffer));
 1178933                    if (_peerIdleTimeout == TimeSpan.Zero)
 0934                    {
 0935                        throw new InvalidDataException(
 0936                            "The IdleTimeout Slic connection parameter is invalid, it must be greater than 0 s.");
 937                    }
 1178938                    break;
 939                }
 940                case ParameterKey.MaxStreamFrameSize:
 1220941                {
 1220942                    maxStreamFrameSize = DecodeParamValue(buffer);
 1220943                    if (maxStreamFrameSize < 1024)
 0944                    {
 0945                        throw new InvalidDataException(
 0946                            "The MaxStreamFrameSize connection parameter is invalid, it must be greater than 1KB.");
 947                    }
 1220948                    break;
 949                }
 950                case ParameterKey.InitialStreamWindowSize:
 1220951                {
 1220952                    peerInitialStreamWindowSize = DecodeParamValue(buffer);
 1220953                    if (peerInitialStreamWindowSize < 1024)
 0954                    {
 0955                        throw new InvalidDataException(
 0956                            "The InitialStreamWindowSize connection parameter is invalid, it must be greater than 1KB.")
 957                    }
 1220958                    break;
 959                }
 960                // Ignore unsupported parameter.
 961            }
 5893962        }
 963
 1220964        if (maxStreamFrameSize is null)
 0965        {
 0966            throw new InvalidDataException(
 0967                "The peer didn't send the required MaxStreamFrameSize connection parameter.");
 968        }
 969        else
 1220970        {
 1220971            PeerMaxStreamFrameSize = maxStreamFrameSize.Value;
 1220972        }
 973
 1220974        if (peerInitialStreamWindowSize is null)
 0975        {
 0976            throw new InvalidDataException(
 0977                "The peer didn't send the required InitialStreamWindowSize connection parameter.");
 978        }
 979        else
 1220980        {
 1220981            PeerInitialStreamWindowSize = peerInitialStreamWindowSize.Value;
 1220982        }
 983
 984        // all parameter values are currently integers in the range 0..Int32Max encoded as varuint62.
 985        static int DecodeParamValue(IList<byte> buffer)
 5893986        {
 987            // The IList<byte> decoded by the IceRPC + Slice integration is backed by an array
 5893988            ulong value = SliceEncoding.Slice2.DecodeBuffer(
 5893989                new ReadOnlySequence<byte>((byte[])buffer),
 11786990                (ref SliceDecoder decoder) => decoder.DecodeVarUInt62());
 991            try
 5893992            {
 5893993                return checked((int)value);
 994            }
 0995            catch (OverflowException exception)
 0996            {
 0997                throw new InvalidDataException("The value is out of the varuint32 accepted range.", exception);
 998            }
 5893999        }
 12201000    }
 1001
 1002    private Dictionary<ParameterKey, IList<byte>> EncodeParameters()
 12711003    {
 12711004        var parameters = new List<KeyValuePair<ParameterKey, IList<byte>>>
 12711005        {
 12711006            // Required parameters.
 12711007            EncodeParameter(ParameterKey.MaxStreamFrameSize, (ulong)_maxStreamFrameSize),
 12711008            EncodeParameter(ParameterKey.InitialStreamWindowSize, (ulong)InitialStreamWindowSize)
 12711009        };
 1010
 1011        // Optional parameters.
 12711012        if (_localIdleTimeout != Timeout.InfiniteTimeSpan)
 12671013        {
 12671014            parameters.Add(EncodeParameter(ParameterKey.IdleTimeout, (ulong)_localIdleTimeout.TotalMilliseconds));
 12671015        }
 12711016        if (_maxBidirectionalStreams > 0)
 11671017        {
 11671018            parameters.Add(EncodeParameter(ParameterKey.MaxBidirectionalStreams, (ulong)_maxBidirectionalStreams));
 11671019        }
 12711020        if (_maxUnidirectionalStreams > 0)
 12711021        {
 12711022            parameters.Add(EncodeParameter(ParameterKey.MaxUnidirectionalStreams, (ulong)_maxUnidirectionalStreams));
 12711023        }
 1024
 12711025        return new Dictionary<ParameterKey, IList<byte>>(parameters);
 1026
 1027        static KeyValuePair<ParameterKey, IList<byte>> EncodeParameter(ParameterKey key, ulong value)
 62471028        {
 62471029            int sizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 62471030            byte[] buffer = new byte[sizeLength];
 62471031            SliceEncoder.EncodeVarUInt62(value, buffer);
 62471032            return new(key, buffer);
 62471033        }
 12711034    }
 1035
 1036    private bool IsUnknownStream(ulong streamId)
 98111037    {
 98111038        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 98111039        bool isBidirectional = streamId % 4 < 2;
 98111040        if (isRemote)
 52601041        {
 52601042            if (isBidirectional)
 24331043            {
 24331044                return _lastRemoteBidirectionalStreamId is null || streamId > _lastRemoteBidirectionalStreamId;
 1045            }
 1046            else
 28271047            {
 28271048                return _lastRemoteUnidirectionalStreamId is null || streamId > _lastRemoteUnidirectionalStreamId;
 1049            }
 1050        }
 1051        else
 45511052        {
 45511053            if (isBidirectional)
 22701054            {
 22701055                return streamId >= _nextBidirectionalId;
 1056            }
 1057            else
 22811058            {
 22811059                return streamId >= _nextUnidirectionalId;
 1060            }
 1061        }
 98111062    }
 1063
 1064    private Task ReadFrameAsync(FrameType frameType, int size, ulong? streamId, CancellationToken cancellationToken)
 249631065    {
 249631066        if (frameType >= FrameType.Stream && streamId is null)
 01067        {
 01068            throw new InvalidDataException("Received stream frame without stream ID.");
 1069        }
 1070
 249631071        switch (frameType)
 1072        {
 1073            case FrameType.Close:
 1681074            {
 1681075                return ReadCloseFrameAsync(size, cancellationToken);
 1076            }
 1077            case FrameType.Ping:
 311078            {
 311079                return ReadPingFrameAndWritePongFrameAsync(size, cancellationToken);
 1080            }
 1081            case FrameType.Pong:
 331082            {
 331083                return ReadPongFrameAsync(size, cancellationToken);
 1084            }
 1085            case FrameType.Stream:
 1086            case FrameType.StreamLast:
 190611087            {
 190611088                return ReadStreamDataFrameAsync(frameType, size, streamId!.Value, cancellationToken);
 1089            }
 1090            case FrameType.StreamWindowUpdate:
 21481091            {
 21481092                if (IsUnknownStream(streamId!.Value))
 21093                {
 21094                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1095                }
 1096
 21461097                return ReadStreamWindowUpdateFrameAsync(size, streamId!.Value, cancellationToken);
 1098            }
 1099            case FrameType.StreamReadsClosed:
 1100            case FrameType.StreamWritesClosed:
 35161101            {
 35161102                if (size > 0)
 41103                {
 41104                    throw new InvalidDataException($"Unexpected body for {frameType} frame.");
 1105                }
 35121106                if (IsUnknownStream(streamId!.Value))
 41107                {
 41108                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1109                }
 1110
 35081111                if (_streams.TryGetValue(streamId.Value, out SlicStream? stream))
 26711112                {
 26711113                    if (frameType == FrameType.StreamWritesClosed)
 801114                    {
 801115                        stream.ReceivedWritesClosedFrame();
 801116                    }
 1117                    else
 25911118                    {
 25911119                        stream.ReceivedReadsClosedFrame();
 25911120                    }
 26711121                }
 35081122                return Task.CompletedTask;
 1123            }
 1124            default:
 61125            {
 61126                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 1127            }
 1128        }
 1129
 1130        async Task ReadCloseFrameAsync(int size, CancellationToken cancellationToken)
 1681131        {
 1681132            CloseBody closeBody = await ReadFrameBodyAsync(
 1681133                FrameType.Close,
 1681134                size,
 1661135                (ref SliceDecoder decoder) => new CloseBody(ref decoder),
 1681136                cancellationToken).ConfigureAwait(false);
 1137
 1641138            IceRpcError? peerCloseError = closeBody.ApplicationErrorCode switch
 1641139            {
 1201140                (ulong)MultiplexedConnectionCloseError.NoError => IceRpcError.ConnectionClosedByPeer,
 81141                (ulong)MultiplexedConnectionCloseError.Refused => IceRpcError.ConnectionRefused,
 161142                (ulong)MultiplexedConnectionCloseError.ServerBusy => IceRpcError.ServerBusy,
 101143                (ulong)MultiplexedConnectionCloseError.Aborted => IceRpcError.ConnectionAborted,
 101144                _ => null
 1641145            };
 1146
 1147            bool notAlreadyClosed;
 1641148            if (peerCloseError is null)
 101149            {
 101150                notAlreadyClosed = TryClose(
 101151                    new IceRpcException(IceRpcError.ConnectionAborted),
 101152                    $"The connection was closed by the peer with an unknown application error code: '{closeBody.Applicat
 101153                    IceRpcError.ConnectionAborted);
 101154            }
 1155            else
 1541156            {
 1541157                notAlreadyClosed = TryClose(
 1541158                    new IceRpcException(peerCloseError.Value),
 1541159                    "The connection was closed by the peer.",
 1541160                    peerCloseError);
 1541161            }
 1162
 1163            // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When using
 1164            // TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the server-side.
 1641165            if (notAlreadyClosed && !IsServer)
 531166            {
 1167                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 531168                lock (_mutex)
 531169                {
 531170                    _duplexConnectionWriter.Shutdown();
 531171                }
 531172                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 531173            }
 1641174        }
 1175
 1176        async Task ReadPingFrameAndWritePongFrameAsync(int size, CancellationToken cancellationToken)
 311177        {
 1178            // Read the ping frame.
 311179            PingBody pingBody = await ReadFrameBodyAsync(
 311180                FrameType.Ping,
 311181                size,
 291182                (ref SliceDecoder decoder) => new PingBody(ref decoder),
 311183                cancellationToken).ConfigureAwait(false);
 1184
 1185            // Return a pong frame with the ping payload.
 271186            WriteConnectionFrame(FrameType.Pong, new PongBody(pingBody.Payload).Encode);
 271187        }
 1188
 1189        async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
 331190        {
 331191            if (Interlocked.Decrement(ref _pendingPongCount) >= 0)
 271192            {
 1193                // Ensure the pong frame payload value is expected.
 1194
 271195                PongBody pongBody = await ReadFrameBodyAsync(
 271196                    FrameType.Pong,
 271197                    size,
 271198                    (ref SliceDecoder decoder) => new PongBody(ref decoder),
 271199                    cancellationToken).ConfigureAwait(false);
 1200
 1201                // For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
 271202                if (pongBody.Payload != 0L && pongBody.Payload != 1L)
 01203                {
 01204                    throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
 1205                }
 271206            }
 1207            else
 61208            {
 1209                // If not waiting for a pong frame, this pong frame is unexpected.
 61210                throw new InvalidDataException($"Received unexpected {nameof(FrameType.Pong)} frame.");
 1211            }
 271212        }
 1213
 1214        async Task ReadStreamWindowUpdateFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
 21461215        {
 21461216            StreamWindowUpdateBody frame = await ReadFrameBodyAsync(
 21461217                FrameType.StreamWindowUpdate,
 21461218                size,
 21461219                (ref SliceDecoder decoder) => new StreamWindowUpdateBody(ref decoder),
 21461220                cancellationToken).ConfigureAwait(false);
 21461221            if (_streams.TryGetValue(streamId, out SlicStream? stream))
 20641222            {
 20641223                stream.ReceivedWindowUpdateFrame(frame);
 20641224            }
 21461225        }
 1226
 1227        async Task<T> ReadFrameBodyAsync<T>(
 1228            FrameType frameType,
 1229            int size,
 1230            DecodeFunc<T> decodeFunc,
 1231            CancellationToken cancellationToken)
 23721232        {
 23721233            if (size <= 0)
 41234            {
 41235                throw new InvalidDataException($"Unexpected empty body for {frameType} frame.");
 1236            }
 1237
 23681238            ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(size, cancellationToken)
 23681239                .ConfigureAwait(false);
 1240
 23681241            if (buffer.Length > size)
 15131242            {
 15131243                buffer = buffer.Slice(0, size);
 15131244            }
 1245
 23681246            T decodedFrame = SliceEncoding.Slice2.DecodeBuffer(buffer, decodeFunc);
 23641247            _duplexConnectionReader.AdvanceTo(buffer.End);
 23641248            return decodedFrame;
 23641249        }
 249471250    }
 1251
 1252    private async ValueTask<(FrameType FrameType, int FrameSize, ulong? StreamId)?> ReadFrameHeaderAsync(
 1253        CancellationToken cancellationToken)
 274351254    {
 274351255        while (true)
 274351256        {
 1257            // Read data from the pipe reader.
 274351258            if (!_duplexConnectionReader.TryRead(out ReadOnlySequence<byte> buffer))
 175891259            {
 175891260                buffer = await _duplexConnectionReader.ReadAsync(cancellationToken).ConfigureAwait(false);
 166291261            }
 1262
 264751263            if (buffer.IsEmpty)
 2681264            {
 2681265                return null;
 1266            }
 1267
 262071268            if (TryDecodeHeader(
 262071269                buffer,
 262071270                out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 262071271                out int consumed))
 261971272            {
 261971273                _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed));
 261971274                return header;
 1275            }
 1276            else
 01277            {
 01278                _duplexConnectionReader.AdvanceTo(buffer.Start, buffer.End);
 01279            }
 01280        }
 1281
 1282        static bool TryDecodeHeader(
 1283            ReadOnlySequence<byte> buffer,
 1284            out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 1285            out int consumed)
 262071286        {
 262071287            header = default;
 262071288            consumed = default;
 1289
 262071290            var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 1291
 1292            // Decode the frame type and frame size.
 262071293            if (!decoder.TryDecodeUInt8(out byte frameType) || !decoder.TryDecodeVarUInt62(out ulong frameSize))
 01294            {
 01295                return false;
 1296            }
 1297
 262071298            header.FrameType = frameType.AsFrameType();
 1299            try
 262011300            {
 262011301                header.FrameSize = checked((int)frameSize);
 262011302            }
 01303            catch (OverflowException exception)
 01304            {
 01305                throw new InvalidDataException("The frame size can't be larger than int.MaxValue.", exception);
 1306            }
 1307
 1308            // If it's a stream frame, try to decode the stream ID
 262011309            if (header.FrameType >= FrameType.Stream)
 247291310            {
 247291311                if (header.FrameSize == 0)
 21312                {
 21313                    throw new InvalidDataException("Invalid stream frame size.");
 1314                }
 1315
 247271316                consumed = (int)decoder.Consumed;
 247271317                if (!decoder.TryDecodeVarUInt62(out ulong streamId))
 01318                {
 01319                    return false;
 1320                }
 247271321                header.StreamId = streamId;
 247271322                header.FrameSize -= (int)decoder.Consumed - consumed;
 1323
 247271324                if (header.FrameSize < 0)
 21325                {
 21326                    throw new InvalidDataException("Invalid stream frame size.");
 1327                }
 247251328            }
 1329
 261971330            consumed = (int)decoder.Consumed;
 261971331            return true;
 261971332        }
 264651333    }
 1334
 1335    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 12201336    {
 1337        try
 12201338        {
 261481339            while (true)
 261481340            {
 261481341                (FrameType Type, int Size, ulong? StreamId)? header = await ReadFrameHeaderAsync(cancellationToken)
 261481342                    .ConfigureAwait(false);
 1343
 252291344                if (header is null)
 2661345                {
 2661346                    lock (_mutex)
 2661347                    {
 2661348                        if (!_isClosed)
 01349                        {
 1350                            // Unexpected duplex connection shutdown.
 01351                            throw new IceRpcException(IceRpcError.ConnectionAborted);
 1352                        }
 2661353                    }
 1354                    // The peer has shut down the duplex connection.
 2661355                    break;
 1356                }
 1357
 249631358                await ReadFrameAsync(header.Value.Type, header.Value.Size, header.Value.StreamId, cancellationToken)
 249631359                    .ConfigureAwait(false);
 249281360            }
 1361
 2661362            if (IsServer)
 1351363            {
 1351364                Debug.Assert(_isClosed);
 1365
 1366                // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When
 1367                // using TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the
 1368                // server-side.
 1369
 1370                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1351371                lock (_mutex)
 1351372                {
 1351373                    _duplexConnectionWriter.Shutdown();
 1374
 1375                    // Make sure that CloseAsync doesn't call Write on the writer if it's called shortly after the peer
 1376                    // shutdown its side of the connection (which triggers ReadFrameHeaderAsync to return null).
 1351377                    _writerIsShutdown = true;
 1351378                }
 1379
 1351380                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 1331381            }
 2641382        }
 4661383        catch (OperationCanceledException)
 4661384        {
 1385            // Expected, DisposeAsync was called.
 4661386        }
 4521387        catch (IceRpcException exception)
 4521388        {
 4521389            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 4521390            throw;
 1391        }
 381392        catch (InvalidDataException exception)
 381393        {
 381394            var rpcException = new IceRpcException(
 381395                IceRpcError.IceRpcError,
 381396                "The connection was aborted by a Slic protocol error.",
 381397                exception);
 381398            TryClose(rpcException, rpcException.Message, IceRpcError.IceRpcError);
 381399            throw rpcException;
 1400        }
 01401        catch (Exception exception)
 01402        {
 01403            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01404            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 01405            throw;
 1406        }
 7301407    }
 1408
 1409    private async Task ReadStreamDataFrameAsync(
 1410        FrameType type,
 1411        int size,
 1412        ulong streamId,
 1413        CancellationToken cancellationToken)
 190611414    {
 190611415        bool endStream = type == FrameType.StreamLast;
 190611416        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 190611417        bool isBidirectional = streamId % 4 < 2;
 1418
 190611419        if (!isBidirectional && !isRemote)
 01420        {
 01421            throw new InvalidDataException(
 01422                "Received unexpected stream frame on local unidirectional stream.");
 1423        }
 190611424        else if (size == 0 && !endStream)
 21425        {
 21426            throw new InvalidDataException($"Received invalid {nameof(FrameType.Stream)} frame.");
 1427        }
 1428
 190591429        if (!_streams.TryGetValue(streamId, out SlicStream? stream) && isRemote && IsUnknownStream(streamId))
 40201430        {
 1431            // Create a new remote stream.
 1432
 40201433            if (size == 0)
 01434            {
 01435                throw new InvalidDataException("Received empty stream frame on new stream.");
 1436            }
 1437
 40201438            if (isBidirectional)
 12641439            {
 12641440                if (streamId > _lastRemoteBidirectionalStreamId + 4)
 01441                {
 01442                    throw new InvalidDataException("Invalid stream ID.");
 1443                }
 1444
 12641445                if (_bidirectionalStreamCount == _maxBidirectionalStreams)
 01446                {
 01447                    throw new IceRpcException(
 01448                        IceRpcError.IceRpcError,
 01449                        $"The maximum bidirectional stream count {_maxBidirectionalStreams} was reached.");
 1450                }
 12641451                Interlocked.Increment(ref _bidirectionalStreamCount);
 12641452            }
 1453            else
 27561454            {
 27561455                if (streamId > _lastRemoteUnidirectionalStreamId + 4)
 01456                {
 01457                    throw new InvalidDataException("Invalid stream ID.");
 1458                }
 1459
 27561460                if (_unidirectionalStreamCount == _maxUnidirectionalStreams)
 01461                {
 01462                    throw new IceRpcException(
 01463                        IceRpcError.IceRpcError,
 01464                        $"The maximum unidirectional stream count {_maxUnidirectionalStreams} was reached");
 1465                }
 27561466                Interlocked.Increment(ref _unidirectionalStreamCount);
 27561467            }
 1468
 1469            // The stream is registered with the connection and queued on the channel. The caller of AcceptStreamAsync
 1470            // is responsible for cleaning up the stream.
 40201471            stream = new SlicStream(this, isBidirectional, isRemote: true);
 1472
 1473            try
 40201474            {
 40201475                AddStream(streamId, stream);
 1476
 1477                try
 40201478                {
 40201479                    await _acceptStreamChannel.Writer.WriteAsync(
 40201480                        stream,
 40201481                        cancellationToken).ConfigureAwait(false);
 40201482                }
 01483                catch (ChannelClosedException exception)
 01484                {
 1485                    // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 01486                    Debug.Assert(exception.InnerException is not null);
 01487                    throw ExceptionUtil.Throw(exception.InnerException);
 1488                }
 40201489            }
 01490            catch (IceRpcException)
 01491            {
 1492                // The two methods above throw IceRpcException if the connection has been closed (either by CloseAsync
 1493                // or because the close frame was received). We cleanup up the stream but don't throw to not abort the
 1494                // reading. The connection graceful closure still needs to read on the connection to figure out when the
 1495                // peer shuts down the duplex connection.
 01496                Debug.Assert(_isClosed);
 01497                stream.Input.Complete();
 01498                if (isBidirectional)
 01499                {
 01500                    stream.Output.Complete();
 01501                }
 01502            }
 40201503        }
 1504
 190591505        bool isDataConsumed = false;
 190591506        if (stream is not null)
 189131507        {
 1508            // Let the stream consume the stream frame data.
 189131509            isDataConsumed = await stream.ReceivedDataFrameAsync(
 189131510                size,
 189131511                endStream,
 189131512                cancellationToken).ConfigureAwait(false);
 189131513        }
 1514
 190591515        if (!isDataConsumed)
 1771516        {
 1517            // The stream (if any) didn't consume the data. Read and ignore the data using a helper pipe.
 1771518            var pipe = new Pipe(
 1771519                new PipeOptions(
 1771520                    pool: Pool,
 1771521                    pauseWriterThreshold: 0,
 1771522                    minimumSegmentSize: MinSegmentSize,
 1771523                    useSynchronizationContext: false));
 1524
 1771525            await _duplexConnectionReader.FillBufferWriterAsync(
 1771526                    pipe.Writer,
 1771527                    size,
 1771528                    cancellationToken).ConfigureAwait(false);
 1529
 1741530            pipe.Writer.Complete();
 1741531            pipe.Reader.Complete();
 1741532        }
 190561533    }
 1534
 1535    private bool TryClose(Exception exception, string closeMessage, IceRpcError? peerCloseError = null)
 22211536    {
 22211537        lock (_mutex)
 22211538        {
 22211539            if (_isClosed)
 8541540            {
 8541541                return false;
 1542            }
 13671543            _isClosed = true;
 13671544            _closedMessage = closeMessage;
 13671545            _peerCloseError = peerCloseError;
 13671546            if (_streamSemaphoreWaitCount == 0)
 13541547            {
 13541548                _streamSemaphoreWaitClosed.SetResult();
 13541549            }
 13671550        }
 1551
 1552        // Cancel pending CreateStreamAsync, AcceptStreamAsync and WriteStreamDataFrameAsync operations.
 13671553        _closedCts.Cancel();
 13671554        _acceptStreamChannel.Writer.TryComplete(exception);
 1555
 1556        // Close streams.
 65631557        foreach (SlicStream stream in _streams.Values)
 12311558        {
 12311559            stream.Close(exception);
 12311560        }
 1561
 13671562        return true;
 22211563    }
 1564
 1565    private void WriteFrame(FrameType frameType, ulong? streamId, EncodeAction? encode)
 83581566    {
 83581567        var encoder = new SliceEncoder(_duplexConnectionWriter, SliceEncoding.Slice2);
 83581568        encoder.EncodeFrameType(frameType);
 83581569        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 83581570        int startPos = encoder.EncodedByteCount;
 83581571        if (streamId is not null)
 68641572        {
 68641573            encoder.EncodeVarUInt62(streamId.Value);
 68641574        }
 83581575        encode?.Invoke(ref encoder);
 83581576        SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos), sizePlaceholder);
 83581577    }
 1578}

Methods/Properties

get_IsServer()
get_MinSegmentSize()
get_PeerInitialStreamWindowSize()
get_PeerMaxStreamFrameSize()
get_Pool()
get_InitialStreamWindowSize()
get_StreamWindowUpdateThreshold()
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.Transports.MultiplexedConnectionOptions,IceRpc.Transports.Slic.SlicTransportOptions,System.Boolean)
AcceptStreamAsync()
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DecodeInitialize()
DecodeInitializeAckOrVersion()
ReadFrameAsync()
CloseAsync()
CreateStreamAsync()
DisposeAsync()
PerformDisposeAsync()
SendPing()
SendReadPing()
SendWritePing()
FillBufferWriterAsync(System.Buffers.IBufferWriter`1<System.Byte>,System.Int32,System.Threading.CancellationToken)
ReleaseStream(IceRpc.Transports.Slic.Internal.SlicStream)
ThrowIfClosed()
WriteConnectionFrame(IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.EncodeAction)
WriteStreamFrame(IceRpc.Transports.Slic.Internal.SlicStream,IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.EncodeAction,System.Boolean)
WriteStreamDataFrameAsync()
EncodeStreamFrameHeader()
AddStream(System.UInt64,IceRpc.Transports.Slic.Internal.SlicStream)
DecodeParameters(System.Collections.Generic.IDictionary`2<IceRpc.Transports.Slic.Internal.ParameterKey,System.Collections.Generic.IList`1<System.Byte>>)
DecodeParamValue()
EncodeParameters()
EncodeParameter()
IsUnknownStream(System.UInt64)
ReadFrameAsync(IceRpc.Transports.Slic.Internal.FrameType,System.Int32,System.Nullable`1<System.UInt64>,System.Threading.CancellationToken)
ReadCloseFrameAsync()
ReadPingFrameAndWritePongFrameAsync()
ReadPongFrameAsync()
ReadStreamWindowUpdateFrameAsync()
ReadFrameBodyAsync()
ReadFrameHeaderAsync()
TryDecodeHeader()
ReadFramesAsync()
ReadStreamDataFrameAsync()
TryClose(System.Exception,System.String,System.Nullable`1<IceRpc.IceRpcError>)
WriteFrame(IceRpc.Transports.Slic.Internal.FrameType,System.Nullable`1<System.UInt64>,ZeroC.Slice.EncodeAction)