< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Tag: 278_19370051549
Line coverage
89%
Covered lines: 916
Uncovered lines: 102
Coverable lines: 1018
Total lines: 1578
Line coverage: 89.9%
Branch coverage
90%
Covered branches: 297
Total branches: 330
Branch coverage: 90%
Method coverage
97%
Covered methods: 45
Total methods: 46
Method coverage: 97.8%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_IsServer()100%11100%
get_MinSegmentSize()100%11100%
get_PeerInitialStreamWindowSize()100%11100%
get_PeerMaxStreamFrameSize()100%11100%
get_Pool()100%11100%
get_InitialStreamWindowSize()100%11100%
get_StreamWindowUpdateThreshold()100%11100%
.ctor(...)100%44100%
AcceptStreamAsync()100%66100%
ConnectAsync(...)75%4.05485.71%
PerformConnectAsync()100%24.052495.5%
DecodeInitialize()75%4.02490%
DecodeInitializeAckOrVersion()66.66%6.01692.85%
ReadFrameAsync()100%88100%
CloseAsync()100%1414100%
CreateStreamAsync()100%141497.72%
DisposeAsync()100%22100%
PerformDisposeAsync()93.75%16.221690.47%
SendPing()100%1.04166.66%
SendReadPing()100%22100%
SendWritePing()0%620%
FillBufferWriterAsync(...)100%11100%
ReleaseStream(...)100%88100%
ThrowIfClosed()100%22100%
WriteConnectionFrame(...)100%22100%
WriteStreamFrame(...)87.5%88100%
WriteStreamDataFrameAsync()94.44%36.443693.02%
EncodeStreamFrameHeader()100%22100%
AddStream(...)83.33%6.04690%
DecodeParameters(...)75%34.52473.68%
DecodeParamValue()100%1.03170%
EncodeParameters()100%66100%
EncodeParameter()100%11100%
IsUnknownStream(...)100%1212100%
ReadFrameAsync(...)95.65%23.072394.87%
ReadCloseFrameAsync()100%1313100%
ReadPingFrameAndWritePongFrameAsync()100%11100%
ReadPongFrameAsync()66.66%6.09686.66%
ReadStreamWindowUpdateFrameAsync()100%22100%
ReadFrameBodyAsync()100%44100%
ReadFrameHeaderAsync()83.33%6.22681.81%
TryDecodeHeader()75%13.511278.12%
ReadFramesAsync()83.33%6.1686%
ReadStreamDataFrameAsync()77.77%91.043665.11%
TryClose(...)100%66100%
WriteFrame(...)100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Collections.Concurrent;
 7using System.Diagnostics;
 8using System.IO.Pipelines;
 9using System.Security.Authentication;
 10using System.Threading.Channels;
 11using ZeroC.Slice;
 12
 13namespace IceRpc.Transports.Slic.Internal;
 14
 15/// <summary>The Slic connection implements an <see cref="IMultiplexedConnection" /> on top of a <see
 16/// cref="IDuplexConnection" />.</summary>
 17internal class SlicConnection : IMultiplexedConnection
 18{
 19    /// <summary>Gets a value indicating whether or not this is the server-side of the connection.</summary>
 3375620    internal bool IsServer { get; }
 21
 22    /// <summary>Gets the minimum size of the segment requested from <see cref="Pool" />.</summary>
 1090123    internal int MinSegmentSize { get; }
 24
 25    /// <summary>Gets the peer's initial stream window size. This property is set to the <see
 26    /// cref="ParameterKey.InitialStreamWindowSize"/> value carried by the <see cref="FrameType.Initialize" />
 27    /// frame.</summary>
 654028    internal int PeerInitialStreamWindowSize { get; private set; }
 29
 30    /// <summary>Gets the maximum size of stream frames accepted by the peer. This property is set to the <see
 31    /// cref="ParameterKey.MaxStreamFrameSize"/> value carried by the <see cref="FrameType.Initialize" />
 32    /// frame.</summary>
 1939733    internal int PeerMaxStreamFrameSize { get; private set; }
 34
 35    /// <summary>Gets the <see cref="MemoryPool{T}" /> used for obtaining memory buffers.</summary>
 1090136    internal MemoryPool<byte> Pool { get; }
 37
 38    /// <summary>Gets the initial stream window size.</summary>
 1915439    internal int InitialStreamWindowSize { get; }
 40
 41    /// <summary>Gets the window update threshold. When the window size is increased and this threshold reached, a <see
 42    /// cref="FrameType.StreamWindowUpdate" /> frame is sent.</summary>
 1258843    internal int StreamWindowUpdateThreshold => InitialStreamWindowSize / StreamWindowUpdateRatio;
 44
 45    // The ratio used to compute the StreamWindowUpdateThreshold. For now, the stream window update is sent when the
 46    // window size grows over InitialStreamWindowSize / StreamWindowUpdateRatio.
 47    private const int StreamWindowUpdateRatio = 2;
 48
 49    private readonly Channel<IMultiplexedStream> _acceptStreamChannel;
 50    private int _bidirectionalStreamCount;
 51    private SemaphoreSlim? _bidirectionalStreamSemaphore;
 52    private readonly CancellationToken _closedCancellationToken;
 137253    private readonly CancellationTokenSource _closedCts = new();
 54    private string? _closedMessage;
 55    private Task<TransportConnectionInformation>? _connectTask;
 137256    private readonly CancellationTokenSource _disposedCts = new();
 57    private Task? _disposeTask;
 58    private readonly SlicDuplexConnectionDecorator _duplexConnection;
 59    private readonly DuplexConnectionReader _duplexConnectionReader;
 60    private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
 61    private bool _isClosed;
 62    private ulong? _lastRemoteBidirectionalStreamId;
 63    private ulong? _lastRemoteUnidirectionalStreamId;
 64    private readonly TimeSpan _localIdleTimeout;
 65    private readonly int _maxBidirectionalStreams;
 66    private readonly int _maxStreamFrameSize;
 67    private readonly int _maxUnidirectionalStreams;
 68    // _mutex ensure the assignment of _lastRemoteXxx members and the addition of the stream to _streams is
 69    // an atomic operation.
 137270    private readonly object _mutex = new();
 71    private ulong _nextBidirectionalId;
 72    private ulong _nextUnidirectionalId;
 73    private IceRpcError? _peerCloseError;
 137274    private TimeSpan _peerIdleTimeout = Timeout.InfiniteTimeSpan;
 75    private int _pendingPongCount;
 76    private Task? _readFramesTask;
 77
 137278    private readonly ConcurrentDictionary<ulong, SlicStream> _streams = new();
 79    private int _streamSemaphoreWaitCount;
 137280    private readonly TaskCompletionSource _streamSemaphoreWaitClosed =
 137281        new(TaskCreationOptions.RunContinuationsAsynchronously);
 82
 83    private int _unidirectionalStreamCount;
 84    private SemaphoreSlim? _unidirectionalStreamSemaphore;
 85
 86    // This is only set for server connections to ensure that _duplexConnectionWriter.Write is not called after
 87    // _duplexConnectionWriter.Shutdown. This can occur if the client-side of the connection sends the close frame
 88    // followed by the shutdown of the duplex connection and if CloseAsync is called at the same time on the server
 89    // connection.
 90    private bool _writerIsShutdown;
 91
 92    public async ValueTask<IMultiplexedStream> AcceptStreamAsync(CancellationToken cancellationToken)
 463893    {
 463894        lock (_mutex)
 463895        {
 463896            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 97
 463698            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 299            {
 2100                throw new InvalidOperationException("Cannot accept stream before connecting the Slic connection.");
 101            }
 4634102            if (_isClosed)
 19103            {
 19104                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 105            }
 4615106        }
 107
 108        try
 4615109        {
 4615110            return await _acceptStreamChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 111        }
 204112        catch (ChannelClosedException exception)
 204113        {
 204114            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 203115            Debug.Assert(exception.InnerException is not null);
 116            // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 203117            throw ExceptionUtil.Throw(exception.InnerException);
 118        }
 3965119    }
 120
 121    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 1334122    {
 1334123        lock (_mutex)
 1334124        {
 1334125            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 126
 1334127            if (_connectTask is not null)
 2128            {
 2129                throw new InvalidOperationException("Cannot connect twice a Slic connection.");
 130            }
 1332131            if (_isClosed)
 0132            {
 0133                throw new InvalidOperationException("Cannot connect a closed Slic connection.");
 134            }
 1332135            _connectTask = PerformConnectAsync();
 1332136        }
 1332137        return _connectTask;
 138
 139        async Task<TransportConnectionInformation> PerformConnectAsync()
 1332140        {
 1332141            await Task.Yield(); // Exit mutex lock
 142
 143            // Connect the duplex connection.
 144            TransportConnectionInformation transportConnectionInformation;
 1332145            TimeSpan peerIdleTimeout = TimeSpan.MaxValue;
 146
 147            try
 1332148            {
 1332149                transportConnectionInformation = await _duplexConnection.ConnectAsync(cancellationToken)
 1332150                    .ConfigureAwait(false);
 151
 152                // Initialize the Slic connection.
 1287153                if (IsServer)
 647154                {
 155                    // Read the Initialize frame.
 647156                    (ulong version, InitializeBody? initializeBody) = await ReadFrameAsync(
 647157                        DecodeInitialize,
 647158                        cancellationToken).ConfigureAwait(false);
 159
 635160                    if (initializeBody is null)
 4161                    {
 162                        // Unsupported version, try to negotiate another version by sending a Version frame with the
 163                        // Slic versions supported by this server.
 4164                        ulong[] supportedVersions = new ulong[] { SlicDefinitions.V1 };
 165
 4166                        WriteConnectionFrame(FrameType.Version, new VersionBody(supportedVersions).Encode);
 167
 4168                        (version, initializeBody) = await ReadFrameAsync(
 4169                            (frameType, buffer) =>
 4170                            {
 4171                                if (frameType is null)
 2172                                {
 4173                                    // The client shut down the connection because it doesn't support any of the
 4174                                    // server's supported Slic versions.
 2175                                    throw new IceRpcException(
 2176                                        IceRpcError.ConnectionRefused,
 2177                                        $"The connection was refused because the client Slic version {version} is not su
 4178                                }
 4179                                else
 2180                                {
 2181                                    return DecodeInitialize(frameType, buffer);
 4182                                }
 2183                            },
 4184                            cancellationToken).ConfigureAwait(false);
 2185                    }
 186
 633187                    Debug.Assert(initializeBody is not null);
 188
 633189                    DecodeParameters(initializeBody.Value.Parameters);
 190
 191                    // Write back an InitializeAck frame.
 633192                    WriteConnectionFrame(FrameType.InitializeAck, new InitializeAckBody(EncodeParameters()).Encode);
 633193                }
 194                else
 640195                {
 196                    // Write the Initialize frame.
 640197                    WriteConnectionFrame(
 640198                        FrameType.Initialize,
 640199                        (ref SliceEncoder encoder) =>
 640200                        {
 640201                            encoder.EncodeVarUInt62(SlicDefinitions.V1);
 640202                            new InitializeBody(EncodeParameters()).Encode(ref encoder);
 1280203                        });
 204
 205                    // Read and decode the InitializeAck or Version frame.
 640206                    (InitializeAckBody? initializeAckBody, VersionBody? versionBody) = await ReadFrameAsync(
 640207                        DecodeInitializeAckOrVersion,
 640208                        cancellationToken).ConfigureAwait(false);
 209
 593210                    Debug.Assert(initializeAckBody is not null || versionBody is not null);
 211
 593212                    if (initializeAckBody is not null)
 589213                    {
 589214                        DecodeParameters(initializeAckBody.Value.Parameters);
 589215                    }
 216
 593217                    if (versionBody is not null)
 4218                    {
 4219                        if (versionBody.Value.Versions.Contains(SlicDefinitions.V1))
 2220                        {
 2221                            throw new InvalidDataException(
 2222                                "The server supported versions include the version initially requested.");
 223                        }
 224                        else
 2225                        {
 226                            // We only support V1 and the peer rejected V1.
 2227                            throw new IceRpcException(
 2228                                IceRpcError.ConnectionRefused,
 2229                                $"The connection was refused because the server only supports Slic version(s) {string.Jo
 230                        }
 231                    }
 589232                }
 1222233            }
 12234            catch (InvalidDataException exception)
 12235            {
 12236                throw new IceRpcException(
 12237                    IceRpcError.IceRpcError,
 12238                    "The connection was aborted by a Slic protocol error.",
 12239                    exception);
 240            }
 50241            catch (OperationCanceledException)
 50242            {
 50243                throw;
 244            }
 8245            catch (AuthenticationException)
 8246            {
 8247                throw;
 248            }
 40249            catch (IceRpcException)
 40250            {
 40251                throw;
 252            }
 0253            catch (Exception exception)
 0254            {
 0255                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0256                throw;
 257            }
 258
 259            // Enable the idle timeout checks after the connection establishment. The Ping frames sent by the keep alive
 260            // check are not expected until the Slic connection initialization completes. The idle timeout check uses
 261            // the smallest idle timeout.
 1222262            TimeSpan idleTimeout = _peerIdleTimeout == Timeout.InfiniteTimeSpan ? _localIdleTimeout :
 1222263                (_peerIdleTimeout < _localIdleTimeout ? _peerIdleTimeout : _localIdleTimeout);
 264
 1222265            if (idleTimeout != Timeout.InfiniteTimeSpan)
 1218266            {
 1218267                _duplexConnection.Enable(idleTimeout);
 1218268            }
 269
 1222270            _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 271
 1222272            return transportConnectionInformation;
 1222273        }
 274
 275        static (ulong, InitializeBody?) DecodeInitialize(FrameType? frameType, ReadOnlySequence<byte> buffer)
 639276        {
 639277            if (frameType != FrameType.Initialize)
 0278            {
 0279                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 280            }
 281
 639282            return SliceEncoding.Slice2.DecodeBuffer<(ulong, InitializeBody?)>(
 639283                buffer,
 639284                (ref SliceDecoder decoder) =>
 639285                {
 639286                    ulong version = decoder.DecodeVarUInt62();
 637287                    if (version == SlicDefinitions.V1)
 633288                    {
 633289                        return (version, new InitializeBody(ref decoder));
 639290                    }
 639291                    else
 4292                    {
 4293                        decoder.Skip((int)(buffer.Length - decoder.Consumed));
 4294                        return (version, null);
 639295                    }
 1276296                });
 637297        }
 298
 299        static (InitializeAckBody?, VersionBody?) DecodeInitializeAckOrVersion(
 300            FrameType? frameType,
 301            ReadOnlySequence<byte> buffer) =>
 597302            frameType switch
 597303            {
 591304                FrameType.InitializeAck => (
 591305                    SliceEncoding.Slice2.DecodeBuffer(
 591306                        buffer,
 591307                        (ref SliceDecoder decoder) => new InitializeAckBody(ref decoder)),
 591308                    null),
 6309                FrameType.Version => (
 6310                    null,
 6311                    SliceEncoding.Slice2.DecodeBuffer(
 6312                        buffer,
 12313                        (ref SliceDecoder decoder) => new VersionBody(ref decoder))),
 0314                _ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
 597315            };
 316
 317        async ValueTask<T> ReadFrameAsync<T>(
 318            Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
 319            CancellationToken cancellationToken)
 1291320        {
 1291321            (FrameType FrameType, int FrameSize, ulong?)? header =
 1291322                await ReadFrameHeaderAsync(cancellationToken).ConfigureAwait(false);
 323
 324            ReadOnlySequence<byte> buffer;
 1238325            if (header is null || header.Value.FrameSize == 0)
 8326            {
 8327                buffer = ReadOnlySequence<byte>.Empty;
 8328            }
 329            else
 1230330            {
 1230331                buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 1230332                    header.Value.FrameSize,
 1230333                    cancellationToken).ConfigureAwait(false);
 1230334                if (buffer.Length > header.Value.FrameSize)
 9335                {
 9336                    buffer = buffer.Slice(0, header.Value.FrameSize);
 9337                }
 1230338            }
 339
 1238340            T decodedFrame = decodeFunc(header?.FrameType, buffer);
 1230341            _duplexConnectionReader.AdvanceTo(buffer.End);
 1230342            return decodedFrame;
 1230343        }
 1332344    }
 345
 346    public async Task CloseAsync(MultiplexedConnectionCloseError closeError, CancellationToken cancellationToken)
 201347    {
 201348        lock (_mutex)
 201349        {
 201350            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 351
 201352            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 2353            {
 2354                throw new InvalidOperationException("Cannot close a Slic connection before connecting it.");
 355            }
 199356        }
 357
 199358        bool waitForWriterShutdown = false;
 199359        if (TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was closed."))
 162360        {
 162361            lock (_mutex)
 162362            {
 363                // The duplex connection writer of a server connection might already be shutdown
 364                // (_writerIsShutdown=true) if the client-side sent the Close frame and shut down the duplex connection.
 365                // This doesn't apply to the client-side since the server-side doesn't shutdown the duplex connection
 366                // writer after sending the Close frame.
 162367                if (!IsServer || !_writerIsShutdown)
 161368                {
 161369                    WriteFrame(FrameType.Close, streamId: null, new CloseBody((ulong)closeError).Encode);
 161370                    if (IsServer)
 80371                    {
 80372                        _duplexConnectionWriter.Flush();
 80373                    }
 374                    else
 81375                    {
 376                        // The sending of the client-side Close frame is followed by the shutdown of the duplex
 377                        // connection. For TCP, it's important to always shutdown the connection on the client-side
 378                        // first to avoid TIME_WAIT states on the server-side.
 81379                        _duplexConnectionWriter.Shutdown();
 81380                        waitForWriterShutdown = true;
 81381                    }
 161382                }
 162383            }
 162384        }
 385
 199386        if (waitForWriterShutdown)
 81387        {
 81388            await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 81389        }
 390
 391        // Now, wait for the peer to close the write side of the connection, which will terminate the read frames task.
 199392        Debug.Assert(_readFramesTask is not null);
 199393        await _readFramesTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 193394    }
 395
 396    public async ValueTask<IMultiplexedStream> CreateStreamAsync(
 397        bool bidirectional,
 398        CancellationToken cancellationToken)
 4095399    {
 4095400        lock (_mutex)
 4095401        {
 4095402            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 403
 4089404            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 4405            {
 4406                throw new InvalidOperationException("Cannot create stream before connecting the Slic connection.");
 407            }
 4085408            if (_isClosed)
 15409            {
 15410                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 411            }
 412
 4070413            ++_streamSemaphoreWaitCount;
 4070414        }
 415
 416        try
 4070417        {
 4070418            using var createStreamCts = CancellationTokenSource.CreateLinkedTokenSource(
 4070419                _closedCancellationToken,
 4070420                cancellationToken);
 421
 4070422            SemaphoreSlim? streamCountSemaphore = bidirectional ?
 4070423                _bidirectionalStreamSemaphore :
 4070424                _unidirectionalStreamSemaphore;
 425
 4070426            if (streamCountSemaphore is null)
 2427            {
 428                // The stream semaphore is null if the peer's max streams configuration is 0. In this case, we let
 429                // CreateStreamAsync hang indefinitely until the connection is closed.
 2430                await Task.Delay(-1, createStreamCts.Token).ConfigureAwait(false);
 0431            }
 432            else
 4068433            {
 4068434                await streamCountSemaphore.WaitAsync(createStreamCts.Token).ConfigureAwait(false);
 4051435            }
 436
 4051437            return new SlicStream(this, bidirectional, isRemote: false);
 438        }
 19439        catch (OperationCanceledException)
 19440        {
 19441            cancellationToken.ThrowIfCancellationRequested();
 13442            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 12443            Debug.Assert(_isClosed);
 12444            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 445        }
 446        finally
 4070447        {
 4070448            lock (_mutex)
 4070449            {
 4070450                --_streamSemaphoreWaitCount;
 4070451                if (_isClosed && _streamSemaphoreWaitCount == 0)
 13452                {
 13453                    _streamSemaphoreWaitClosed.SetResult();
 13454                }
 4070455            }
 4070456        }
 4051457    }
 458
 459    public ValueTask DisposeAsync()
 1897460    {
 1897461        lock (_mutex)
 1897462        {
 1897463            _disposeTask ??= PerformDisposeAsync();
 1897464        }
 1897465        return new(_disposeTask);
 466
 467        async Task PerformDisposeAsync()
 1370468        {
 469            // Make sure we execute the code below without holding the mutex lock.
 1370470            await Task.Yield();
 1370471            TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was disposed.");
 472
 1370473            _disposedCts.Cancel();
 474
 475            try
 1370476            {
 1370477                await Task.WhenAll(
 1370478                    _connectTask ?? Task.CompletedTask,
 1370479                    _readFramesTask ?? Task.CompletedTask,
 1370480                    _streamSemaphoreWaitClosed.Task).ConfigureAwait(false);
 772481            }
 598482            catch
 598483            {
 484                // Expected if any of these tasks failed or was canceled. Each task takes care of handling unexpected
 485                // exceptions so there's no need to handle them here.
 598486            }
 487
 488            // Clean-up the streams that might still be queued on the channel.
 1422489            while (_acceptStreamChannel.Reader.TryRead(out IMultiplexedStream? stream))
 52490            {
 52491                if (stream.IsBidirectional)
 11492                {
 11493                    stream.Output.Complete();
 11494                    stream.Input.Complete();
 11495                }
 41496                else if (stream.IsRemote)
 41497                {
 41498                    stream.Input.Complete();
 41499                }
 500                else
 0501                {
 0502                    stream.Output.Complete();
 0503                }
 52504            }
 505
 506            try
 1370507            {
 508                // Prevents unobserved task exceptions.
 1370509                await _acceptStreamChannel.Reader.Completion.ConfigureAwait(false);
 0510            }
 1370511            catch
 1370512            {
 1370513            }
 514
 1370515            await _duplexConnectionWriter.DisposeAsync().ConfigureAwait(false);
 1370516            _duplexConnectionReader.Dispose();
 1370517            _duplexConnection.Dispose();
 518
 1370519            _disposedCts.Dispose();
 1370520            _bidirectionalStreamSemaphore?.Dispose();
 1370521            _unidirectionalStreamSemaphore?.Dispose();
 1370522            _closedCts.Dispose();
 1370523        }
 1897524    }
 525
 1372526    internal SlicConnection(
 1372527        IDuplexConnection duplexConnection,
 1372528        MultiplexedConnectionOptions options,
 1372529        SlicTransportOptions slicOptions,
 1372530        bool isServer)
 1372531    {
 1372532        IsServer = isServer;
 533
 1372534        Pool = options.Pool;
 1372535        MinSegmentSize = options.MinSegmentSize;
 1372536        _maxBidirectionalStreams = options.MaxBidirectionalStreams;
 1372537        _maxUnidirectionalStreams = options.MaxUnidirectionalStreams;
 538
 1372539        InitialStreamWindowSize = slicOptions.InitialStreamWindowSize;
 1372540        _localIdleTimeout = slicOptions.IdleTimeout;
 1372541        _maxStreamFrameSize = slicOptions.MaxStreamFrameSize;
 542
 1372543        _acceptStreamChannel = Channel.CreateUnbounded<IMultiplexedStream>(new UnboundedChannelOptions
 1372544        {
 1372545            SingleReader = true,
 1372546            SingleWriter = true
 1372547        });
 548
 1372549        _closedCancellationToken = _closedCts.Token;
 550
 551        // Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
 1372552        _duplexConnection = IsServer ?
 1372553            new SlicDuplexConnectionDecorator(duplexConnection) :
 1372554            new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);
 555
 1372556        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 1372557        _duplexConnectionWriter = new SlicDuplexConnectionWriter(
 1372558            _duplexConnection,
 1372559            options.Pool,
 1372560            options.MinSegmentSize);
 561
 562        // We use the same stream ID numbering scheme as Quic.
 1372563        if (IsServer)
 677564        {
 677565            _nextBidirectionalId = 1;
 677566            _nextUnidirectionalId = 3;
 677567        }
 568        else
 695569        {
 695570            _nextBidirectionalId = 0;
 695571            _nextUnidirectionalId = 2;
 695572        }
 573
 574        void SendPing(long payload)
 27575        {
 576            try
 27577            {
 27578                WriteConnectionFrame(FrameType.Ping, new PingBody(payload).Encode);
 26579            }
 1580            catch (IceRpcException)
 1581            {
 582                // Expected if the connection is closed.
 1583            }
 0584            catch (Exception exception)
 0585            {
 0586                Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}");
 0587                throw;
 588            }
 27589        }
 590
 591        void SendReadPing()
 27592        {
 593            // This local function is no-op if there is already a pending Pong.
 27594            if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) == 0)
 27595            {
 27596                SendPing(1L);
 27597            }
 27598        }
 599
 600        void SendWritePing()
 0601        {
 602            // _pendingPongCount can be <= 0 if an unexpected pong is received. If it's the case, the connection is
 603            // being torn down and there's no point in sending a ping frame.
 0604            if (Interlocked.Increment(ref _pendingPongCount) > 0)
 0605            {
 0606                SendPing(0L);
 0607            }
 0608        }
 1372609    }
 610
 611    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 612    /// <param name="bufferWriter">The destination buffer writer.</param>
 613    /// <param name="byteCount">The amount of stream data to read.</param>
 614    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 615    internal ValueTask FillBufferWriterAsync(
 616        IBufferWriter<byte> bufferWriter,
 617        int byteCount,
 618        CancellationToken cancellationToken) =>
 18883619        _duplexConnectionReader.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 620
 621    /// <summary>Releases a stream from the connection. The connection stream count is decremented and if this is a
 622    /// client allow a new stream to be started.</summary>
 623    /// <param name="stream">The released stream.</param>
 624    internal void ReleaseStream(SlicStream stream)
 8037625    {
 8037626        Debug.Assert(stream.IsStarted);
 627
 8037628        _streams.Remove(stream.Id, out SlicStream? _);
 629
 8037630        if (stream.IsRemote)
 4017631        {
 4017632            if (stream.IsBidirectional)
 1263633            {
 1263634                Interlocked.Decrement(ref _bidirectionalStreamCount);
 1263635            }
 636            else
 2754637            {
 2754638                Interlocked.Decrement(ref _unidirectionalStreamCount);
 2754639            }
 4017640        }
 4020641        else if (!_isClosed)
 3333642        {
 3333643            if (stream.IsBidirectional)
 1141644            {
 1141645                _bidirectionalStreamSemaphore!.Release();
 1141646            }
 647            else
 2192648            {
 2192649                _unidirectionalStreamSemaphore!.Release();
 2192650            }
 3333651        }
 8037652    }
 653
 654    /// <summary>Throws the connection closure exception if the connection is already closed.</summary>
 655    internal void ThrowIfClosed()
 15638656    {
 15638657        lock (_mutex)
 15638658        {
 15638659            if (_isClosed)
 17660            {
 17661                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 662            }
 15621663        }
 15621664    }
 665
 666    /// <summary>Writes a connection frame.</summary>
 667    /// <param name="frameType">The frame type.</param>
 668    /// <param name="encode">The action to encode the frame.</param>
 669    internal void WriteConnectionFrame(FrameType frameType, EncodeAction? encode)
 1330670    {
 1330671        Debug.Assert(frameType < FrameType.Stream);
 672
 1330673        lock (_mutex)
 1330674        {
 1330675            if (_isClosed)
 1676            {
 1677                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 678            }
 1329679            WriteFrame(frameType, streamId: null, encode);
 1329680            _duplexConnectionWriter.Flush();
 1329681        }
 1329682    }
 683
 684    /// <summary>Writes a stream frame.</summary>
 685    /// <param name="stream">The stream to write the frame for.</param>
 686    /// <param name="frameType">The frame type.</param>
 687    /// <param name="encode">The action to encode the frame.</param>
 688    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 689    /// frame should be written after the stream frame.</param>
 690    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 691    /// also be closed concurrently while it's in progress.</remarks>
 692    internal void WriteStreamFrame(
 693        SlicStream stream,
 694        FrameType frameType,
 695        EncodeAction? encode,
 696        bool writeReadsClosedFrame)
 5930697    {
 698        // Ensure that this method is called for any FrameType.StreamXxx frame type except FrameType.Stream.
 5930699        Debug.Assert(frameType >= FrameType.StreamLast && stream.IsStarted);
 700
 5930701        lock (_mutex)
 5930702        {
 5930703            if (_isClosed)
 2704            {
 2705                return;
 706            }
 707
 5928708            WriteFrame(frameType, stream.Id, encode);
 5928709            if (writeReadsClosedFrame)
 156710            {
 156711                WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 156712            }
 5928713            if (frameType == FrameType.StreamLast)
 1112714            {
 715                // Notify the stream that the last stream frame is considered sent at this point. This will close
 716                // writes on the stream and allow the stream to be released if reads are also closed.
 1112717                stream.WroteLastStreamFrame();
 1112718            }
 5928719            _duplexConnectionWriter.Flush();
 5928720        }
 5930721    }
 722
 723    /// <summary>Writes a stream data frame.</summary>
 724    /// <param name="stream">The stream to write the frame for.</param>
 725    /// <param name="source1">The first stream frame data source.</param>
 726    /// <param name="source2">The second stream frame data source.</param>
 727    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame and
 728    /// <see langword="false" /> to write a <see cref="FrameType.Stream" /> frame.</param>
 729    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 730    /// frame should be written after the stream frame.</param>
 731    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 732    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 733    /// also be closed concurrently while it's in progress.</remarks>
 734    internal async ValueTask<FlushResult> WriteStreamDataFrameAsync(
 735        SlicStream stream,
 736        ReadOnlySequence<byte> source1,
 737        ReadOnlySequence<byte> source2,
 738        bool endStream,
 739        bool writeReadsClosedFrame,
 740        CancellationToken cancellationToken)
 15595741    {
 15595742        Debug.Assert(!source1.IsEmpty || endStream);
 743
 15595744        if (_connectTask is null)
 0745        {
 0746            throw new InvalidOperationException("Cannot send a stream frame before calling ConnectAsync.");
 747        }
 748
 15595749        using var writeCts = CancellationTokenSource.CreateLinkedTokenSource(
 15595750            _closedCancellationToken,
 15595751            cancellationToken);
 752
 753        try
 15595754        {
 755            do
 20199756            {
 757                // Next, ensure send credit is available. If not, this will block until the receiver allows sending
 758                // additional data.
 20199759                int sendCredit = 0;
 20199760                if (!source1.IsEmpty || !source2.IsEmpty)
 20189761                {
 20189762                    sendCredit = await stream.AcquireSendCreditAsync(writeCts.Token).ConfigureAwait(false);
 18161763                    Debug.Assert(sendCredit > 0);
 18161764                }
 765
 766                // Gather data from source1 or source2 up to sendCredit bytes or the peer maximum stream frame size.
 18171767                int sendMaxSize = Math.Min(sendCredit, PeerMaxStreamFrameSize);
 768                ReadOnlySequence<byte> sendSource1;
 769                ReadOnlySequence<byte> sendSource2;
 18171770                if (!source1.IsEmpty)
 16162771                {
 16162772                    int length = Math.Min((int)source1.Length, sendMaxSize);
 16162773                    sendSource1 = source1.Slice(0, length);
 16162774                    source1 = source1.Slice(length);
 16162775                }
 776                else
 2009777                {
 2009778                    sendSource1 = ReadOnlySequence<byte>.Empty;
 2009779                }
 780
 18171781                if (source1.IsEmpty && !source2.IsEmpty)
 4072782                {
 4072783                    int length = Math.Min((int)source2.Length, sendMaxSize - (int)sendSource1.Length);
 4072784                    sendSource2 = source2.Slice(0, length);
 4072785                    source2 = source2.Slice(length);
 4072786                }
 787                else
 14099788                {
 14099789                    sendSource2 = ReadOnlySequence<byte>.Empty;
 14099790                }
 791
 792                // If there's no data left to send and endStream is true, it's the last stream frame.
 18171793                bool lastStreamFrame = endStream && source1.IsEmpty && source2.IsEmpty;
 794
 18171795                lock (_mutex)
 18171796                {
 18171797                    if (_isClosed)
 0798                    {
 0799                        throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 800                    }
 801
 18171802                    if (!stream.IsStarted)
 4020803                    {
 4020804                        if (stream.IsBidirectional)
 1260805                        {
 1260806                            AddStream(_nextBidirectionalId, stream);
 1260807                            _nextBidirectionalId += 4;
 1260808                        }
 809                        else
 2760810                        {
 2760811                            AddStream(_nextUnidirectionalId, stream);
 2760812                            _nextUnidirectionalId += 4;
 2760813                        }
 4020814                    }
 815
 816                    // Notify the stream that we're consuming sendSize credit. It's important to call this before
 817                    // sending the stream frame to avoid race conditions where the StreamWindowUpdate frame could be
 818                    // received before the send credit was updated.
 18171819                    if (sendCredit > 0)
 18161820                    {
 18161821                        stream.ConsumedSendCredit((int)(sendSource1.Length + sendSource2.Length));
 18161822                    }
 823
 18171824                    EncodeStreamFrameHeader(stream.Id, sendSource1.Length + sendSource2.Length, lastStreamFrame);
 825
 18171826                    if (lastStreamFrame)
 1465827                    {
 828                        // Notify the stream that the last stream frame is considered sent at this point. This will
 829                        // complete writes on the stream and allow the stream to be released if reads are also
 830                        // completed.
 1465831                        stream.WroteLastStreamFrame();
 1465832                    }
 833
 834                    // Write and flush the stream frame.
 18171835                    if (!sendSource1.IsEmpty)
 16162836                    {
 16162837                        _duplexConnectionWriter.Write(sendSource1);
 16162838                    }
 18171839                    if (!sendSource2.IsEmpty)
 4072840                    {
 4072841                        _duplexConnectionWriter.Write(sendSource2);
 4072842                    }
 843
 18171844                    if (writeReadsClosedFrame)
 696845                    {
 696846                        WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 696847                    }
 18171848                    _duplexConnectionWriter.Flush();
 18171849                }
 18171850            }
 18171851            while (!source1.IsEmpty || !source2.IsEmpty); // Loop until there's no data left to send.
 13567852        }
 2028853        catch (OperationCanceledException)
 2028854        {
 2028855            cancellationToken.ThrowIfCancellationRequested();
 856
 0857            Debug.Assert(_isClosed);
 0858            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 859        }
 860
 13567861        return new FlushResult(isCanceled: false, isCompleted: false);
 862
 863        void EncodeStreamFrameHeader(ulong streamId, long size, bool lastStreamFrame)
 18171864        {
 18171865            var encoder = new SliceEncoder(_duplexConnectionWriter, SliceEncoding.Slice2);
 18171866            encoder.EncodeFrameType(!lastStreamFrame ? FrameType.Stream : FrameType.StreamLast);
 18171867            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 18171868            int startPos = encoder.EncodedByteCount;
 18171869            encoder.EncodeVarUInt62(streamId);
 18171870            SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos + size), sizePlaceholder);
 18171871        }
 13567872    }
 873
 874    private void AddStream(ulong id, SlicStream stream)
 8037875    {
 8037876        lock (_mutex)
 8037877        {
 8037878            if (_isClosed)
 0879            {
 0880                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 881            }
 882
 8037883            _streams[id] = stream;
 884
 885            // Assign the stream ID within the mutex to ensure that the addition of the stream to the connection and the
 886            // stream ID assignment are atomic.
 8037887            stream.Id = id;
 888
 889            // Keep track of the last assigned stream ID. This is used to figure out if the stream is known or unknown.
 8037890            if (stream.IsRemote)
 4017891            {
 4017892                if (stream.IsBidirectional)
 1263893                {
 1263894                    _lastRemoteBidirectionalStreamId = id;
 1263895                }
 896                else
 2754897                {
 2754898                    _lastRemoteUnidirectionalStreamId = id;
 2754899                }
 4017900            }
 8037901        }
 8037902    }
 903
 904    private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
 1222905    {
 1222906        int? maxStreamFrameSize = null;
 1222907        int? peerInitialStreamWindowSize = null;
 15470908        foreach ((ParameterKey key, IList<byte> buffer) in parameters)
 5902909        {
 5902910            switch (key)
 911            {
 912                case ParameterKey.MaxBidirectionalStreams:
 1094913                {
 1094914                    int value = DecodeParamValue(buffer);
 1094915                    if (value > 0)
 1094916                    {
 1094917                        _bidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 1094918                    }
 1094919                    break;
 920                }
 921                case ParameterKey.MaxUnidirectionalStreams:
 1184922                {
 1184923                    int value = DecodeParamValue(buffer);
 1184924                    if (value > 0)
 1184925                    {
 1184926                        _unidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 1184927                    }
 1184928                    break;
 929                }
 930                case ParameterKey.IdleTimeout:
 1180931                {
 1180932                    _peerIdleTimeout = TimeSpan.FromMilliseconds(DecodeParamValue(buffer));
 1180933                    if (_peerIdleTimeout == TimeSpan.Zero)
 0934                    {
 0935                        throw new InvalidDataException(
 0936                            "The IdleTimeout Slic connection parameter is invalid, it must be greater than 0 s.");
 937                    }
 1180938                    break;
 939                }
 940                case ParameterKey.MaxStreamFrameSize:
 1222941                {
 1222942                    maxStreamFrameSize = DecodeParamValue(buffer);
 1222943                    if (maxStreamFrameSize < 1024)
 0944                    {
 0945                        throw new InvalidDataException(
 0946                            "The MaxStreamFrameSize connection parameter is invalid, it must be greater than 1KB.");
 947                    }
 1222948                    break;
 949                }
 950                case ParameterKey.InitialStreamWindowSize:
 1222951                {
 1222952                    peerInitialStreamWindowSize = DecodeParamValue(buffer);
 1222953                    if (peerInitialStreamWindowSize < 1024)
 0954                    {
 0955                        throw new InvalidDataException(
 0956                            "The InitialStreamWindowSize connection parameter is invalid, it must be greater than 1KB.")
 957                    }
 1222958                    break;
 959                }
 960                // Ignore unsupported parameter.
 961            }
 5902962        }
 963
 1222964        if (maxStreamFrameSize is null)
 0965        {
 0966            throw new InvalidDataException(
 0967                "The peer didn't send the required MaxStreamFrameSize connection parameter.");
 968        }
 969        else
 1222970        {
 1222971            PeerMaxStreamFrameSize = maxStreamFrameSize.Value;
 1222972        }
 973
 1222974        if (peerInitialStreamWindowSize is null)
 0975        {
 0976            throw new InvalidDataException(
 0977                "The peer didn't send the required InitialStreamWindowSize connection parameter.");
 978        }
 979        else
 1222980        {
 1222981            PeerInitialStreamWindowSize = peerInitialStreamWindowSize.Value;
 1222982        }
 983
 984        // all parameter values are currently integers in the range 0..Int32Max encoded as varuint62.
 985        static int DecodeParamValue(IList<byte> buffer)
 5902986        {
 987            // The IList<byte> decoded by the IceRPC + Slice integration is backed by an array
 5902988            ulong value = SliceEncoding.Slice2.DecodeBuffer(
 5902989                new ReadOnlySequence<byte>((byte[])buffer),
 11804990                (ref SliceDecoder decoder) => decoder.DecodeVarUInt62());
 991            try
 5902992            {
 5902993                return checked((int)value);
 994            }
 0995            catch (OverflowException exception)
 0996            {
 0997                throw new InvalidDataException("The value is out of the varuint32 accepted range.", exception);
 998            }
 5902999        }
 12221000    }
 1001
 1002    private Dictionary<ParameterKey, IList<byte>> EncodeParameters()
 12731003    {
 12731004        var parameters = new List<KeyValuePair<ParameterKey, IList<byte>>>
 12731005        {
 12731006            // Required parameters.
 12731007            EncodeParameter(ParameterKey.MaxStreamFrameSize, (ulong)_maxStreamFrameSize),
 12731008            EncodeParameter(ParameterKey.InitialStreamWindowSize, (ulong)InitialStreamWindowSize)
 12731009        };
 1010
 1011        // Optional parameters.
 12731012        if (_localIdleTimeout != Timeout.InfiniteTimeSpan)
 12691013        {
 12691014            parameters.Add(EncodeParameter(ParameterKey.IdleTimeout, (ulong)_localIdleTimeout.TotalMilliseconds));
 12691015        }
 12731016        if (_maxBidirectionalStreams > 0)
 11671017        {
 11671018            parameters.Add(EncodeParameter(ParameterKey.MaxBidirectionalStreams, (ulong)_maxBidirectionalStreams));
 11671019        }
 12731020        if (_maxUnidirectionalStreams > 0)
 12731021        {
 12731022            parameters.Add(EncodeParameter(ParameterKey.MaxUnidirectionalStreams, (ulong)_maxUnidirectionalStreams));
 12731023        }
 1024
 12731025        return new Dictionary<ParameterKey, IList<byte>>(parameters);
 1026
 1027        static KeyValuePair<ParameterKey, IList<byte>> EncodeParameter(ParameterKey key, ulong value)
 62551028        {
 62551029            int sizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 62551030            byte[] buffer = new byte[sizeLength];
 62551031            SliceEncoder.EncodeVarUInt62(value, buffer);
 62551032            return new(key, buffer);
 62551033        }
 12731034    }
 1035
 1036    private bool IsUnknownStream(ulong streamId)
 98491037    {
 98491038        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 98491039        bool isBidirectional = streamId % 4 < 2;
 98491040        if (isRemote)
 53361041        {
 53361042            if (isBidirectional)
 25111043            {
 25111044                return _lastRemoteBidirectionalStreamId is null || streamId > _lastRemoteBidirectionalStreamId;
 1045            }
 1046            else
 28251047            {
 28251048                return _lastRemoteUnidirectionalStreamId is null || streamId > _lastRemoteUnidirectionalStreamId;
 1049            }
 1050        }
 1051        else
 45131052        {
 45131053            if (isBidirectional)
 22371054            {
 22371055                return streamId >= _nextBidirectionalId;
 1056            }
 1057            else
 22761058            {
 22761059                return streamId >= _nextUnidirectionalId;
 1060            }
 1061        }
 98491062    }
 1063
 1064    private Task ReadFrameAsync(FrameType frameType, int size, ulong? streamId, CancellationToken cancellationToken)
 249861065    {
 249861066        if (frameType >= FrameType.Stream && streamId is null)
 01067        {
 01068            throw new InvalidDataException("Received stream frame without stream ID.");
 1069        }
 1070
 249861071        switch (frameType)
 1072        {
 1073            case FrameType.Close:
 1651074            {
 1651075                return ReadCloseFrameAsync(size, cancellationToken);
 1076            }
 1077            case FrameType.Ping:
 301078            {
 301079                return ReadPingFrameAndWritePongFrameAsync(size, cancellationToken);
 1080            }
 1081            case FrameType.Pong:
 321082            {
 321083                return ReadPongFrameAsync(size, cancellationToken);
 1084            }
 1085            case FrameType.Stream:
 1086            case FrameType.StreamLast:
 191791087            {
 191791088                return ReadStreamDataFrameAsync(frameType, size, streamId!.Value, cancellationToken);
 1089            }
 1090            case FrameType.StreamWindowUpdate:
 20671091            {
 20671092                if (IsUnknownStream(streamId!.Value))
 21093                {
 21094                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1095                }
 1096
 20651097                return ReadStreamWindowUpdateFrameAsync(size, streamId!.Value, cancellationToken);
 1098            }
 1099            case FrameType.StreamReadsClosed:
 1100            case FrameType.StreamWritesClosed:
 35071101            {
 35071102                if (size > 0)
 41103                {
 41104                    throw new InvalidDataException($"Unexpected body for {frameType} frame.");
 1105                }
 35031106                if (IsUnknownStream(streamId!.Value))
 41107                {
 41108                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1109                }
 1110
 34991111                if (_streams.TryGetValue(streamId.Value, out SlicStream? stream))
 26571112                {
 26571113                    if (frameType == FrameType.StreamWritesClosed)
 801114                    {
 801115                        stream.ReceivedWritesClosedFrame();
 801116                    }
 1117                    else
 25771118                    {
 25771119                        stream.ReceivedReadsClosedFrame();
 25771120                    }
 26571121                }
 34991122                return Task.CompletedTask;
 1123            }
 1124            default:
 61125            {
 61126                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 1127            }
 1128        }
 1129
 1130        async Task ReadCloseFrameAsync(int size, CancellationToken cancellationToken)
 1651131        {
 1651132            CloseBody closeBody = await ReadFrameBodyAsync(
 1651133                FrameType.Close,
 1651134                size,
 1631135                (ref SliceDecoder decoder) => new CloseBody(ref decoder),
 1651136                cancellationToken).ConfigureAwait(false);
 1137
 1611138            IceRpcError? peerCloseError = closeBody.ApplicationErrorCode switch
 1611139            {
 1171140                (ulong)MultiplexedConnectionCloseError.NoError => IceRpcError.ConnectionClosedByPeer,
 81141                (ulong)MultiplexedConnectionCloseError.Refused => IceRpcError.ConnectionRefused,
 161142                (ulong)MultiplexedConnectionCloseError.ServerBusy => IceRpcError.ServerBusy,
 101143                (ulong)MultiplexedConnectionCloseError.Aborted => IceRpcError.ConnectionAborted,
 101144                _ => null
 1611145            };
 1146
 1147            bool notAlreadyClosed;
 1611148            if (peerCloseError is null)
 101149            {
 101150                notAlreadyClosed = TryClose(
 101151                    new IceRpcException(IceRpcError.ConnectionAborted),
 101152                    $"The connection was closed by the peer with an unknown application error code: '{closeBody.Applicat
 101153                    IceRpcError.ConnectionAborted);
 101154            }
 1155            else
 1511156            {
 1511157                notAlreadyClosed = TryClose(
 1511158                    new IceRpcException(peerCloseError.Value),
 1511159                    "The connection was closed by the peer.",
 1511160                    peerCloseError);
 1511161            }
 1162
 1163            // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When using
 1164            // TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the server-side.
 1611165            if (notAlreadyClosed && !IsServer)
 561166            {
 1167                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 561168                lock (_mutex)
 561169                {
 561170                    _duplexConnectionWriter.Shutdown();
 561171                }
 561172                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 541173            }
 1591174        }
 1175
 1176        async Task ReadPingFrameAndWritePongFrameAsync(int size, CancellationToken cancellationToken)
 301177        {
 1178            // Read the ping frame.
 301179            PingBody pingBody = await ReadFrameBodyAsync(
 301180                FrameType.Ping,
 301181                size,
 281182                (ref SliceDecoder decoder) => new PingBody(ref decoder),
 301183                cancellationToken).ConfigureAwait(false);
 1184
 1185            // Return a pong frame with the ping payload.
 261186            WriteConnectionFrame(FrameType.Pong, new PongBody(pingBody.Payload).Encode);
 261187        }
 1188
 1189        async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
 321190        {
 321191            if (Interlocked.Decrement(ref _pendingPongCount) >= 0)
 261192            {
 1193                // Ensure the pong frame payload value is expected.
 1194
 261195                PongBody pongBody = await ReadFrameBodyAsync(
 261196                    FrameType.Pong,
 261197                    size,
 261198                    (ref SliceDecoder decoder) => new PongBody(ref decoder),
 261199                    cancellationToken).ConfigureAwait(false);
 1200
 1201                // For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
 261202                if (pongBody.Payload != 0L && pongBody.Payload != 1L)
 01203                {
 01204                    throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
 1205                }
 261206            }
 1207            else
 61208            {
 1209                // If not waiting for a pong frame, this pong frame is unexpected.
 61210                throw new InvalidDataException($"Received unexpected {nameof(FrameType.Pong)} frame.");
 1211            }
 261212        }
 1213
 1214        async Task ReadStreamWindowUpdateFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
 20651215        {
 20651216            StreamWindowUpdateBody frame = await ReadFrameBodyAsync(
 20651217                FrameType.StreamWindowUpdate,
 20651218                size,
 20651219                (ref SliceDecoder decoder) => new StreamWindowUpdateBody(ref decoder),
 20651220                cancellationToken).ConfigureAwait(false);
 20651221            if (_streams.TryGetValue(streamId, out SlicStream? stream))
 19911222            {
 19911223                stream.ReceivedWindowUpdateFrame(frame);
 19911224            }
 20651225        }
 1226
 1227        async Task<T> ReadFrameBodyAsync<T>(
 1228            FrameType frameType,
 1229            int size,
 1230            DecodeFunc<T> decodeFunc,
 1231            CancellationToken cancellationToken)
 22861232        {
 22861233            if (size <= 0)
 41234            {
 41235                throw new InvalidDataException($"Unexpected empty body for {frameType} frame.");
 1236            }
 1237
 22821238            ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(size, cancellationToken)
 22821239                .ConfigureAwait(false);
 1240
 22821241            if (buffer.Length > size)
 14081242            {
 14081243                buffer = buffer.Slice(0, size);
 14081244            }
 1245
 22821246            T decodedFrame = SliceEncoding.Slice2.DecodeBuffer(buffer, decodeFunc);
 22781247            _duplexConnectionReader.AdvanceTo(buffer.End);
 22781248            return decodedFrame;
 22781249        }
 249701250    }
 1251
 1252    private async ValueTask<(FrameType FrameType, int FrameSize, ulong? StreamId)?> ReadFrameHeaderAsync(
 1253        CancellationToken cancellationToken)
 274631254    {
 274631255        while (true)
 274631256        {
 1257            // Read data from the pipe reader.
 274631258            if (!_duplexConnectionReader.TryRead(out ReadOnlySequence<byte> buffer))
 178791259            {
 178791260                buffer = await _duplexConnectionReader.ReadAsync(cancellationToken).ConfigureAwait(false);
 169121261            }
 1262
 264961263            if (buffer.IsEmpty)
 2641264            {
 2641265                return null;
 1266            }
 1267
 262321268            if (TryDecodeHeader(
 262321269                buffer,
 262321270                out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 262321271                out int consumed))
 262221272            {
 262221273                _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed));
 262221274                return header;
 1275            }
 1276            else
 01277            {
 01278                _duplexConnectionReader.AdvanceTo(buffer.Start, buffer.End);
 01279            }
 01280        }
 1281
 1282        static bool TryDecodeHeader(
 1283            ReadOnlySequence<byte> buffer,
 1284            out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 1285            out int consumed)
 262321286        {
 262321287            header = default;
 262321288            consumed = default;
 1289
 262321290            var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 1291
 1292            // Decode the frame type and frame size.
 262321293            if (!decoder.TryDecodeUInt8(out byte frameType) || !decoder.TryDecodeVarUInt62(out ulong frameSize))
 01294            {
 01295                return false;
 1296            }
 1297
 262321298            header.FrameType = frameType.AsFrameType();
 1299            try
 262261300            {
 262261301                header.FrameSize = checked((int)frameSize);
 262261302            }
 01303            catch (OverflowException exception)
 01304            {
 01305                throw new InvalidDataException("The frame size can't be larger than int.MaxValue.", exception);
 1306            }
 1307
 1308            // If it's a stream frame, try to decode the stream ID
 262261309            if (header.FrameType >= FrameType.Stream)
 247571310            {
 247571311                if (header.FrameSize == 0)
 21312                {
 21313                    throw new InvalidDataException("Invalid stream frame size.");
 1314                }
 1315
 247551316                consumed = (int)decoder.Consumed;
 247551317                if (!decoder.TryDecodeVarUInt62(out ulong streamId))
 01318                {
 01319                    return false;
 1320                }
 247551321                header.StreamId = streamId;
 247551322                header.FrameSize -= (int)decoder.Consumed - consumed;
 1323
 247551324                if (header.FrameSize < 0)
 21325                {
 21326                    throw new InvalidDataException("Invalid stream frame size.");
 1327                }
 247531328            }
 1329
 262221330            consumed = (int)decoder.Consumed;
 262221331            return true;
 262221332        }
 264861333    }
 1334
 1335    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 12221336    {
 1337        try
 12221338        {
 261721339            while (true)
 261721340            {
 261721341                (FrameType Type, int Size, ulong? StreamId)? header = await ReadFrameHeaderAsync(cancellationToken)
 261721342                    .ConfigureAwait(false);
 1343
 252481344                if (header is null)
 2621345                {
 2621346                    lock (_mutex)
 2621347                    {
 2621348                        if (!_isClosed)
 01349                        {
 1350                            // Unexpected duplex connection shutdown.
 01351                            throw new IceRpcException(IceRpcError.ConnectionAborted);
 1352                        }
 2621353                    }
 1354                    // The peer has shut down the duplex connection.
 2621355                    break;
 1356                }
 1357
 249861358                await ReadFrameAsync(header.Value.Type, header.Value.Size, header.Value.StreamId, cancellationToken)
 249861359                    .ConfigureAwait(false);
 249501360            }
 1361
 2621362            if (IsServer)
 1351363            {
 1351364                Debug.Assert(_isClosed);
 1365
 1366                // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When
 1367                // using TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the
 1368                // server-side.
 1369
 1370                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1351371                lock (_mutex)
 1351372                {
 1351373                    _duplexConnectionWriter.Shutdown();
 1374
 1375                    // Make sure that CloseAsync doesn't call Write on the writer if it's called shortly after the peer
 1376                    // shutdown its side of the connection (which triggers ReadFrameHeaderAsync to return null).
 1351377                    _writerIsShutdown = true;
 1351378                }
 1379
 1351380                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 1331381            }
 2601382        }
 4721383        catch (OperationCanceledException)
 4721384        {
 1385            // Expected, DisposeAsync was called.
 4721386        }
 4521387        catch (IceRpcException exception)
 4521388        {
 4521389            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 4521390            throw;
 1391        }
 381392        catch (InvalidDataException exception)
 381393        {
 381394            var rpcException = new IceRpcException(
 381395                IceRpcError.IceRpcError,
 381396                "The connection was aborted by a Slic protocol error.",
 381397                exception);
 381398            TryClose(rpcException, rpcException.Message, IceRpcError.IceRpcError);
 381399            throw rpcException;
 1400        }
 01401        catch (Exception exception)
 01402        {
 01403            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01404            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 01405            throw;
 1406        }
 7321407    }
 1408
 1409    private async Task ReadStreamDataFrameAsync(
 1410        FrameType type,
 1411        int size,
 1412        ulong streamId,
 1413        CancellationToken cancellationToken)
 191791414    {
 191791415        bool endStream = type == FrameType.StreamLast;
 191791416        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 191791417        bool isBidirectional = streamId % 4 < 2;
 1418
 191791419        if (!isBidirectional && !isRemote)
 01420        {
 01421            throw new InvalidDataException(
 01422                "Received unexpected stream frame on local unidirectional stream.");
 1423        }
 191791424        else if (size == 0 && !endStream)
 21425        {
 21426            throw new InvalidDataException($"Received invalid {nameof(FrameType.Stream)} frame.");
 1427        }
 1428
 191771429        if (!_streams.TryGetValue(streamId, out SlicStream? stream) && isRemote && IsUnknownStream(streamId))
 40171430        {
 1431            // Create a new remote stream.
 1432
 40171433            if (size == 0)
 01434            {
 01435                throw new InvalidDataException("Received empty stream frame on new stream.");
 1436            }
 1437
 40171438            if (isBidirectional)
 12631439            {
 12631440                if (streamId > _lastRemoteBidirectionalStreamId + 4)
 01441                {
 01442                    throw new InvalidDataException("Invalid stream ID.");
 1443                }
 1444
 12631445                if (_bidirectionalStreamCount == _maxBidirectionalStreams)
 01446                {
 01447                    throw new IceRpcException(
 01448                        IceRpcError.IceRpcError,
 01449                        $"The maximum bidirectional stream count {_maxBidirectionalStreams} was reached.");
 1450                }
 12631451                Interlocked.Increment(ref _bidirectionalStreamCount);
 12631452            }
 1453            else
 27541454            {
 27541455                if (streamId > _lastRemoteUnidirectionalStreamId + 4)
 01456                {
 01457                    throw new InvalidDataException("Invalid stream ID.");
 1458                }
 1459
 27541460                if (_unidirectionalStreamCount == _maxUnidirectionalStreams)
 01461                {
 01462                    throw new IceRpcException(
 01463                        IceRpcError.IceRpcError,
 01464                        $"The maximum unidirectional stream count {_maxUnidirectionalStreams} was reached");
 1465                }
 27541466                Interlocked.Increment(ref _unidirectionalStreamCount);
 27541467            }
 1468
 1469            // The stream is registered with the connection and queued on the channel. The caller of AcceptStreamAsync
 1470            // is responsible for cleaning up the stream.
 40171471            stream = new SlicStream(this, isBidirectional, isRemote: true);
 1472
 1473            try
 40171474            {
 40171475                AddStream(streamId, stream);
 1476
 1477                try
 40171478                {
 40171479                    await _acceptStreamChannel.Writer.WriteAsync(
 40171480                        stream,
 40171481                        cancellationToken).ConfigureAwait(false);
 40171482                }
 01483                catch (ChannelClosedException exception)
 01484                {
 1485                    // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 01486                    Debug.Assert(exception.InnerException is not null);
 01487                    throw ExceptionUtil.Throw(exception.InnerException);
 1488                }
 40171489            }
 01490            catch (IceRpcException)
 01491            {
 1492                // The two methods above throw IceRpcException if the connection has been closed (either by CloseAsync
 1493                // or because the close frame was received). We cleanup up the stream but don't throw to not abort the
 1494                // reading. The connection graceful closure still needs to read on the connection to figure out when the
 1495                // peer shuts down the duplex connection.
 01496                Debug.Assert(_isClosed);
 01497                stream.Input.Complete();
 01498                if (isBidirectional)
 01499                {
 01500                    stream.Output.Complete();
 01501                }
 01502            }
 40171503        }
 1504
 191771505        bool isDataConsumed = false;
 191771506        if (stream is not null)
 189001507        {
 1508            // Let the stream consume the stream frame data.
 189001509            isDataConsumed = await stream.ReceivedDataFrameAsync(
 189001510                size,
 189001511                endStream,
 189001512                cancellationToken).ConfigureAwait(false);
 189001513        }
 1514
 191771515        if (!isDataConsumed)
 2941516        {
 1517            // The stream (if any) didn't consume the data. Read and ignore the data using a helper pipe.
 2941518            var pipe = new Pipe(
 2941519                new PipeOptions(
 2941520                    pool: Pool,
 2941521                    pauseWriterThreshold: 0,
 2941522                    minimumSegmentSize: MinSegmentSize,
 2941523                    useSynchronizationContext: false));
 1524
 2941525            await _duplexConnectionReader.FillBufferWriterAsync(
 2941526                    pipe.Writer,
 2941527                    size,
 2941528                    cancellationToken).ConfigureAwait(false);
 1529
 2921530            pipe.Writer.Complete();
 2921531            pipe.Reader.Complete();
 2921532        }
 191751533    }
 1534
 1535    private bool TryClose(Exception exception, string closeMessage, IceRpcError? peerCloseError = null)
 22201536    {
 22201537        lock (_mutex)
 22201538        {
 22201539            if (_isClosed)
 8501540            {
 8501541                return false;
 1542            }
 13701543            _isClosed = true;
 13701544            _closedMessage = closeMessage;
 13701545            _peerCloseError = peerCloseError;
 13701546            if (_streamSemaphoreWaitCount == 0)
 13571547            {
 13571548                _streamSemaphoreWaitClosed.SetResult();
 13571549            }
 13701550        }
 1551
 1552        // Cancel pending CreateStreamAsync, AcceptStreamAsync and WriteStreamDataFrameAsync operations.
 13701553        _closedCts.Cancel();
 13701554        _acceptStreamChannel.Writer.TryComplete(exception);
 1555
 1556        // Close streams.
 66021557        foreach (SlicStream stream in _streams.Values)
 12461558        {
 12461559            stream.Close(exception);
 12461560        }
 1561
 13701562        return true;
 22201563    }
 1564
 1565    private void WriteFrame(FrameType frameType, ulong? streamId, EncodeAction? encode)
 82701566    {
 82701567        var encoder = new SliceEncoder(_duplexConnectionWriter, SliceEncoding.Slice2);
 82701568        encoder.EncodeFrameType(frameType);
 82701569        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 82701570        int startPos = encoder.EncodedByteCount;
 82701571        if (streamId is not null)
 67801572        {
 67801573            encoder.EncodeVarUInt62(streamId.Value);
 67801574        }
 82701575        encode?.Invoke(ref encoder);
 82701576        SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos), sizePlaceholder);
 82701577    }
 1578}

Methods/Properties

get_IsServer()
get_MinSegmentSize()
get_PeerInitialStreamWindowSize()
get_PeerMaxStreamFrameSize()
get_Pool()
get_InitialStreamWindowSize()
get_StreamWindowUpdateThreshold()
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.Transports.MultiplexedConnectionOptions,IceRpc.Transports.Slic.SlicTransportOptions,System.Boolean)
AcceptStreamAsync()
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DecodeInitialize()
DecodeInitializeAckOrVersion()
ReadFrameAsync()
CloseAsync()
CreateStreamAsync()
DisposeAsync()
PerformDisposeAsync()
SendPing()
SendReadPing()
SendWritePing()
FillBufferWriterAsync(System.Buffers.IBufferWriter`1<System.Byte>,System.Int32,System.Threading.CancellationToken)
ReleaseStream(IceRpc.Transports.Slic.Internal.SlicStream)
ThrowIfClosed()
WriteConnectionFrame(IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.EncodeAction)
WriteStreamFrame(IceRpc.Transports.Slic.Internal.SlicStream,IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.EncodeAction,System.Boolean)
WriteStreamDataFrameAsync()
EncodeStreamFrameHeader()
AddStream(System.UInt64,IceRpc.Transports.Slic.Internal.SlicStream)
DecodeParameters(System.Collections.Generic.IDictionary`2<IceRpc.Transports.Slic.Internal.ParameterKey,System.Collections.Generic.IList`1<System.Byte>>)
DecodeParamValue()
EncodeParameters()
EncodeParameter()
IsUnknownStream(System.UInt64)
ReadFrameAsync(IceRpc.Transports.Slic.Internal.FrameType,System.Int32,System.Nullable`1<System.UInt64>,System.Threading.CancellationToken)
ReadCloseFrameAsync()
ReadPingFrameAndWritePongFrameAsync()
ReadPongFrameAsync()
ReadStreamWindowUpdateFrameAsync()
ReadFrameBodyAsync()
ReadFrameHeaderAsync()
TryDecodeHeader()
ReadFramesAsync()
ReadStreamDataFrameAsync()
TryClose(System.Exception,System.String,System.Nullable`1<IceRpc.IceRpcError>)
WriteFrame(IceRpc.Transports.Slic.Internal.FrameType,System.Nullable`1<System.UInt64>,ZeroC.Slice.EncodeAction)