< Summary

Information
Class: IceRpc.Internal.IceProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceProtocolConnection.cs
Tag: 275_13775359185
Line coverage
89%
Covered lines: 845
Uncovered lines: 100
Coverable lines: 945
Total lines: 1565
Line coverage: 89.4%
Branch coverage
83%
Covered branches: 192
Total branches: 230
Branch coverage: 83.4%
Method coverage
100%
Covered methods: 37
Total methods: 37
Method coverage: 100%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
--- | --- | --- | --- | ---
.cctor() | 100% | 1 | 1 | 100%
get_IsServer() | 100% | 1 | 1 | 100%
.ctor(...) | 87.5% | 8 | 8 | 100%
ConnectAsync(...) | 50% | 2.02 | 2 | 83.33%
PerformConnectAsync() | 83.33% | 20.71 | 18 | 79.71%
EncodeValidateConnectionFrame() | 100% | 1 | 1 | 100%
DecodeValidateConnectionFrame() | 100% | 1 | 1 | 100%
DisposeAsync() | 100% | 6 | 6 | 100%
PerformDisposeAsync() | 87.5% | 8 | 8 | 100%
InvokeAsync(...) | 75% | 8.09 | 8 | 88.88%
PerformInvokeAsync() | 79.16% | 24.04 | 24 | 95.78%
ShutdownAsync(...) | 87.5% | 8.07 | 8 | 89.47%
PerformShutdownAsync() | 100% | 8.07 | 8 | 89.47%
EncodeCloseConnectionFrame() | 100% | 1 | 1 | 100%
SendHeartbeat() | 75% | 4 | 4 | 100%
SendValidateConnectionFrameAsync() | 100% | 2.98 | 2 | 37.5%
EncodeValidateConnectionFrame() | 100% | 1 | 1 | 100%
DecodeRequestIdAndHeader(...) | 75% | 13.04 | 12 | 80.64%
DecodeResponseHeader(...) | 68.75% | 16.86 | 16 | 85%
EncodeRequestHeader(...) | 83.33% | 6.02 | 6 | 92.1%
EncodeResponseHeader(...) | 100% | 12 | 12 | 100%
ReadFullPayloadAsync() | 50% | 4.25 | 4 | 75%
AcquireWriteLockAsync() | 100% | 2 | 2 | 100%
CreateFrameReaderAsync() | 100% | 1.02 | 1 | 73.33%
DecrementDispatchInvocationCount() | 100% | 6 | 6 | 100%
DispatchRequestAsync() | 93.75% | 16.01 | 16 | 96.2%
IncrementDispatchInvocationCount() | 100% | 2 | 2 | 100%
ScheduleInactivityCheck() | 100% | 1 | 1 | 100%
ReadFramesAsync() | 72.22% | 23.06 | 18 | 75%
AbortTwowayInvocations() | 100% | 2 | 2 | 100%
ReadFailed() | 100% | 1 | 1 | 100%
ReadReplyAsync() | 80% | 10.02 | 10 | 93.93%
ReadRequestAsync() | 75% | 20.57 | 20 | 88.73%
<ReadRequestAsync() | 100% | 1.02 | 1 | 70.83%
RefuseNewInvocations(...) | 100% | 2 | 2 | 100%
SendControlFrameAsync() | 100% | 1 | 1 | 100%
WriteFailed(...) | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceProtocolConnection.cs

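# | Line | Line coverage
(In the rows below the first two columns are run together: each row starts with the hit count, when the line is coverable, immediately followed by the source line number, then the source text. For example, `37079` means 370 hits on source line 79, and a bare `22` is source line 22 with no hit count.)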
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Collections.Immutable;
 7using System.Diagnostics;
 8using System.IO.Pipelines;
 9using System.Security.Authentication;
 10using ZeroC.Slice;
 11
 12namespace IceRpc.Internal;
 13
 14/// <summary>Implements <see cref="IProtocolConnection" /> for the ice protocol.</summary>
 15internal sealed class IceProtocolConnection : IProtocolConnection
 16{
 117    private static readonly IDictionary<RequestFieldKey, ReadOnlySequence<byte>> _idempotentFields =
 118        new Dictionary<RequestFieldKey, ReadOnlySequence<byte>>
 119        {
 120            [RequestFieldKey.Idempotent] = default
 121        }.ToImmutableDictionary();
 22
 35023    private bool IsServer => _transportConnectionInformation is not null;
 24
 25    private IConnectionContext? _connectionContext; // non-null once the connection is established
 26    private Task? _connectTask;
 27    private readonly IDispatcher _dispatcher;
 28
 29    // The number of outstanding dispatches and invocations.
 30    private int _dispatchInvocationCount;
 31
 32    // We don't want the continuation to run from the dispatch or invocation thread.
 37233    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 37234        new(TaskCreationOptions.RunContinuationsAsynchronously);
 35
 36    private readonly SemaphoreSlim? _dispatchSemaphore;
 37
 38    // This cancellation token source is canceled when the connection is disposed.
 37239    private readonly CancellationTokenSource _disposedCts = new();
 40
 41    private Task? _disposeTask;
 42    private readonly IDuplexConnection _duplexConnection;
 43    private readonly DuplexConnectionReader _duplexConnectionReader;
 44    private readonly IceDuplexConnectionWriter _duplexConnectionWriter;
 37245    private bool _heartbeatEnabled = true;
 37246    private Task _heartbeatTask = Task.CompletedTask;
 47    private readonly TimeSpan _inactivityTimeout;
 48    private readonly Timer _inactivityTimeoutTimer;
 49    private string? _invocationRefusedMessage;
 50    private int _lastRequestId;
 51    private readonly int _maxFrameSize;
 37252    private readonly object _mutex = new();
 53    private readonly PipeOptions _pipeOptions;
 54    private Task? _readFramesTask;
 55
 56    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 57    private bool _refuseInvocations;
 58
 59    // Does ShutdownAsync send a close connection frame?
 37260    private bool _sendCloseConnectionFrame = true;
 61
 62    private Task? _shutdownTask;
 63
 64    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 65    // holding a lock on _mutex.
 37266    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 67
 68    // Only set for server connections.
 69    private readonly TransportConnectionInformation? _transportConnectionInformation;
 70
 71    private readonly CancellationTokenSource _twowayDispatchesCts;
 37272    private readonly Dictionary<int, TaskCompletionSource<PipeReader>> _twowayInvocations = new();
 73
 74    private Exception? _writeException; // protected by _writeSemaphore
 37275    private readonly SemaphoreSlim _writeSemaphore = new(1, 1);
 76
 77    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 78        CancellationToken cancellationToken)
 37079    {
 80        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 37081        lock (_mutex)
 37082        {
 37083            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 84
 36685            if (_connectTask is not null)
 086            {
 087                throw new InvalidOperationException("Cannot call connect more than once.");
 88            }
 89
 36690            result = PerformConnectAsync();
 36691            _connectTask = result;
 36692        }
 36693        return result;
 94
 95        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 36696        {
 97            // Make sure we execute the function without holding the connection mutex lock.
 36698            await Task.Yield();
 99
 100            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 366101            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 366102                cancellationToken,
 366103                _disposedCts.Token);
 104
 105            TransportConnectionInformation transportConnectionInformation;
 106
 107            try
 366108            {
 109                // If the transport connection information is null, we need to connect the transport connection. It's
 110                // null for client connections. The transport connection of a server connection is established by
 111                // Server.
 366112                transportConnectionInformation = _transportConnectionInformation ??
 366113                    await _duplexConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 114
 350115                if (IsServer)
 169116                {
 117                    // Send ValidateConnection frame.
 169118                    await SendControlFrameAsync(EncodeValidateConnectionFrame, connectCts.Token).ConfigureAwait(false);
 119
 120                    // The SendControlFrameAsync is a "write" that schedules a heartbeat when the idle timeout is not
 121                    // infinite. So no need to call ScheduleHeartbeat.
 165122                }
 123                else
 181124                {
 181125                    ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 181126                        IceDefinitions.PrologueSize,
 181127                        connectCts.Token).ConfigureAwait(false);
 128
 161129                    (IcePrologue validateConnectionFrame, long consumed) = DecodeValidateConnectionFrame(buffer);
 161130                    _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed), buffer.End);
 131
 161132                    IceDefinitions.CheckPrologue(validateConnectionFrame);
 159133                    if (validateConnectionFrame.FrameSize != IceDefinitions.PrologueSize)
 0134                    {
 0135                        throw new InvalidDataException(
 0136                            $"Received ice frame with only '{validateConnectionFrame.FrameSize}' bytes.");
 137                    }
 159138                    if (validateConnectionFrame.FrameType != IceFrameType.ValidateConnection)
 0139                    {
 0140                        throw new InvalidDataException(
 0141                            $"Expected '{nameof(IceFrameType.ValidateConnection)}' frame but received frame type '{valid
 142                    }
 143
 144                    // The client connection is now connected, so we schedule the first heartbeat.
 159145                    if (_duplexConnection is IceDuplexConnectionDecorator decorator)
 159146                    {
 159147                        decorator.ScheduleHeartbeat();
 159148                    }
 159149                }
 324150            }
 22151            catch (OperationCanceledException)
 22152            {
 22153                cancellationToken.ThrowIfCancellationRequested();
 154
 10155                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 10156                throw new IceRpcException(
 10157                    IceRpcError.OperationAborted,
 10158                    "The connection establishment was aborted because the connection was disposed.");
 159            }
 2160            catch (InvalidDataException exception)
 2161            {
 2162                throw new IceRpcException(
 2163                    IceRpcError.ConnectionAborted,
 2164                    "The connection was aborted by an ice protocol error.",
 2165                    exception);
 166            }
 2167            catch (AuthenticationException)
 2168            {
 2169                throw;
 170            }
 16171            catch (IceRpcException)
 16172            {
 16173                throw;
 174            }
 0175            catch (Exception exception)
 0176            {
 0177                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0178                throw;
 179            }
 180
 181            // We assign _readFramesTask with _mutex locked to make sure this assignment occurs before the start of
 182            // DisposeAsync. Once _disposeTask is not null, _readFramesTask is immutable.
 324183            lock (_mutex)
 324184            {
 324185                if (_disposeTask is not null)
 0186                {
 0187                    throw new IceRpcException(
 0188                        IceRpcError.OperationAborted,
 0189                        "The connection establishment was aborted because the connection was disposed.");
 190                }
 191
 192                // This needs to be set before starting the read frames task below.
 324193                _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 194
 324195                _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 324196            }
 197
 198            // The _readFramesTask waits for this PerformConnectAsync completion before reading anything. As soon as
 199            // it receives a request, it will cancel this inactivity check.
 324200            ScheduleInactivityCheck();
 201
 324202            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 203
 204            static void EncodeValidateConnectionFrame(IBufferWriter<byte> writer)
 169205            {
 169206                var encoder = new SliceEncoder(writer, SliceEncoding.Slice1);
 169207                IceDefinitions.ValidateConnectionFrame.Encode(ref encoder);
 169208            }
 209
 210            static (IcePrologue, long) DecodeValidateConnectionFrame(ReadOnlySequence<byte> buffer)
 161211            {
 161212                var decoder = new SliceDecoder(buffer, SliceEncoding.Slice1);
 161213                return (new IcePrologue(ref decoder), decoder.Consumed);
 161214            }
 324215        }
 366216    }
 217
 218    public ValueTask DisposeAsync()
 412219    {
 412220        lock (_mutex)
 412221        {
 412222            if (_disposeTask is null)
 372223            {
 372224                RefuseNewInvocations("The connection was disposed.");
 225
 372226                _shutdownTask ??= Task.CompletedTask;
 372227                if (_dispatchInvocationCount == 0)
 353228                {
 353229                    _dispatchesAndInvocationsCompleted.TrySetResult();
 353230                }
 231
 372232                _heartbeatEnabled = false; // makes _heartbeatTask immutable
 233
 372234                _disposeTask = PerformDisposeAsync();
 372235            }
 412236        }
 412237        return new(_disposeTask);
 238
 239        async Task PerformDisposeAsync()
 372240        {
 241            // Make sure we execute the code below without holding the mutex lock.
 372242            await Task.Yield();
 243
 372244            _disposedCts.Cancel();
 245
 246            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 247
 372248            if (_connectTask is not null)
 366249            {
 250                // Wait for all writes to complete. This can't take forever since all writes are canceled by
 251                // _disposedCts.Token.
 366252                await _writeSemaphore.WaitAsync().ConfigureAwait(false);
 253
 254                try
 366255                {
 366256                    await Task.WhenAll(
 366257                        _connectTask,
 366258                        _readFramesTask ?? Task.CompletedTask,
 366259                        _heartbeatTask,
 366260                        _dispatchesAndInvocationsCompleted.Task,
 366261                        _shutdownTask).ConfigureAwait(false);
 262262                }
 104263                catch
 104264                {
 265                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 266                    // unexpected exceptions so there's no need to handle them here.
 104267                }
 366268            }
 269
 372270            _duplexConnection.Dispose();
 271
 272            // It's safe to dispose the reader/writer since no more threads are sending/receiving data.
 372273            _duplexConnectionReader.Dispose();
 372274            _duplexConnectionWriter.Dispose();
 275
 372276            _disposedCts.Dispose();
 372277            _twowayDispatchesCts.Dispose();
 278
 372279            _dispatchSemaphore?.Dispose();
 372280            _writeSemaphore.Dispose();
 372281            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 372282        }
 412283    }
 284
 285    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 2723286    {
 2723287        if (request.Protocol != Protocol.Ice)
 2288        {
 2289            throw new InvalidOperationException(
 2290                $"Cannot send {request.Protocol} request on {Protocol.Ice} connection.");
 291        }
 292
 2721293        lock (_mutex)
 2721294        {
 2721295            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 296
 2719297            if (_refuseInvocations)
 2298            {
 2299                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 300            }
 2717301            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 0302            {
 0303                throw new InvalidOperationException("Cannot invoke on a connection that is not fully established.");
 304            }
 305
 2717306            IncrementDispatchInvocationCount();
 2717307        }
 308
 2717309        return PerformInvokeAsync();
 310
 311        async Task<IncomingResponse> PerformInvokeAsync()
 2717312        {
 313            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 2717314            using var invocationCts =
 2717315                CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, cancellationToken);
 316
 2717317            PipeReader? frameReader = null;
 2717318            TaskCompletionSource<PipeReader>? responseCompletionSource = null;
 2717319            int requestId = 0;
 320
 321            try
 2717322            {
 323                // Read the full payload. This can take some time so this needs to be done before acquiring the write
 324                // semaphore.
 2717325                ReadOnlySequence<byte> payloadBuffer = await ReadFullPayloadAsync(request.Payload, invocationCts.Token)
 2717326                    .ConfigureAwait(false);
 327
 328                try
 2717329                {
 330                    // Wait for the writing of other frames to complete.
 2717331                    using SemaphoreLock _ = await AcquireWriteLockAsync(invocationCts.Token).ConfigureAwait(false);
 332
 333                    // Assign the request ID for two-way invocations and keep track of the invocation for receiving the
 334                    // response. The request ID is only assigned once the write semaphore is acquired. We don't want a
 335                    // canceled request to allocate a request ID that won't be used.
 2717336                    lock (_mutex)
 2717337                    {
 2717338                        if (_refuseInvocations)
 0339                        {
 340                            // It's InvocationCanceled and not InvocationRefused because we've read the payload.
 0341                            throw new IceRpcException(IceRpcError.InvocationCanceled, _invocationRefusedMessage);
 342                        }
 343
 2717344                        if (!request.IsOneway)
 701345                        {
 346                            // wrap around back to 1 if we reach int.MaxValue. 0 means one-way.
 701347                            _lastRequestId = _lastRequestId == int.MaxValue ? 1 : _lastRequestId + 1;
 701348                            requestId = _lastRequestId;
 349
 350                            // RunContinuationsAsynchronously because we don't want the "read frames loop" to run the
 351                            // continuation.
 701352                            responseCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
 701353                            _twowayInvocations[requestId] = responseCompletionSource;
 701354                        }
 2717355                    }
 356
 2717357                    int payloadSize = checked((int)payloadBuffer.Length);
 358
 359                    try
 2717360                    {
 2717361                        EncodeRequestHeader(_duplexConnectionWriter, request, requestId, payloadSize);
 362
 363                        // We write to the duplex connection with _disposedCts.Token instead of invocationCts.Token.
 364                        // Canceling this write operation is fatal to the connection.
 2717365                        await _duplexConnectionWriter.WriteAsync(payloadBuffer, _disposedCts.Token)
 2717366                            .ConfigureAwait(false);
 2715367                    }
 2368                    catch (Exception exception)
 2369                    {
 2370                        WriteFailed(exception);
 2371                        throw;
 372                    }
 2715373                }
 2374                catch (IceRpcException exception) when (exception.IceRpcError != IceRpcError.InvocationCanceled)
 2375                {
 376                    // Since we could not send the request, the server cannot dispatch it and it's safe to retry.
 377                    // This includes the situation where await AcquireWriteLockAsync throws because a previous write
 378                    // failed.
 2379                    throw new IceRpcException(
 2380                        IceRpcError.InvocationCanceled,
 2381                        "Failed to send ice request.",
 2382                        exception);
 383                }
 384                finally
 2717385                {
 386                    // We've read the payload (see ReadFullPayloadAsync) and we are now done with it.
 2717387                    request.Payload.Complete();
 2717388                }
 389
 2715390                if (request.IsOneway)
 2016391                {
 392                    // We're done, there's no response for one-way requests.
 2016393                    return new IncomingResponse(request, _connectionContext!);
 394                }
 395
 396                // Wait to receive the response.
 699397                Debug.Assert(responseCompletionSource is not null);
 699398                frameReader = await responseCompletionSource.Task.WaitAsync(invocationCts.Token).ConfigureAwait(false);
 399
 661400                if (!frameReader.TryRead(out ReadResult readResult))
 0401                {
 0402                    throw new InvalidDataException($"Received empty response frame for request with id '{requestId}'.");
 403                }
 404
 661405                Debug.Assert(readResult.IsCompleted);
 406
 661407                (StatusCode statusCode, string? errorMessage, SequencePosition consumed) =
 661408                    DecodeResponseHeader(readResult.Buffer, requestId);
 409
 661410                frameReader.AdvanceTo(consumed);
 411
 661412                var response = new IncomingResponse(
 661413                    request,
 661414                    _connectionContext!,
 661415                    statusCode,
 661416                    errorMessage)
 661417                {
 661418                    Payload = frameReader
 661419                };
 420
 661421                frameReader = null; // response now owns frameReader
 661422                return response;
 423            }
 18424            catch (OperationCanceledException)
 18425            {
 18426                cancellationToken.ThrowIfCancellationRequested();
 427
 6428                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 6429                throw new IceRpcException(
 6430                    IceRpcError.OperationAborted,
 6431                    "The invocation was aborted because the connection was disposed.");
 432            }
 433            finally
 2717434            {
 435                // If responseCompletionSource is not completed, we want to complete it to prevent another method from
 436                // setting an unobservable exception in it. And if it's already completed with an exception, we observe
 437                // this exception.
 2717438                if (responseCompletionSource is not null &&
 2717439                    !responseCompletionSource.TrySetResult(InvalidPipeReader.Instance))
 681440                {
 441                    try
 681442                    {
 681443                        _ = await responseCompletionSource.Task.ConfigureAwait(false);
 661444                    }
 20445                    catch
 20446                    {
 447                        // observe exception, if any
 20448                    }
 681449                }
 450
 2717451                lock (_mutex)
 2717452                {
 453                    // Unregister the two-way invocation if registered.
 2717454                    if (requestId > 0 && !_refuseInvocations)
 665455                    {
 665456                        _twowayInvocations.Remove(requestId);
 665457                    }
 458
 2717459                    DecrementDispatchInvocationCount();
 2717460                }
 461
 2717462                frameReader?.Complete();
 2717463            }
 2677464        }
 2717465    }
 466
 467    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 111468    {
 111469        lock (_mutex)
 111470        {
 111471            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 472
 109473            if (_shutdownTask is not null)
 0474            {
 0475                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 476            }
 109477            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 6478            {
 6479                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 480            }
 481
 103482            RefuseNewInvocations("The connection was shut down.");
 483
 103484            if (_dispatchInvocationCount == 0)
 76485            {
 76486                _dispatchesAndInvocationsCompleted.TrySetResult();
 76487            }
 103488            _shutdownTask = PerformShutdownAsync(_sendCloseConnectionFrame);
 103489        }
 490
 103491        return _shutdownTask;
 492
 493        async Task PerformShutdownAsync(bool sendCloseConnectionFrame)
 103494        {
 103495            await Task.Yield(); // exit mutex lock
 496
 497            try
 103498            {
 103499                Debug.Assert(_readFramesTask is not null);
 500
 501                // Since DisposeAsync waits for the _shutdownTask completion, _disposedCts is not disposed at this
 502                // point.
 103503                using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(
 103504                    cancellationToken,
 103505                    _disposedCts.Token);
 506
 507                // Wait for dispatches and invocations to complete.
 103508                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 509
 510                // Stops sending heartbeats. We can't do earlier: while we're waiting for dispatches and invocations to
 511                // complete, we need to keep sending heartbeats otherwise the peer could see the connection as idle and
 512                // abort it.
 95513                lock (_mutex)
 95514                {
 95515                    _heartbeatEnabled = false; // makes _heartbeatTask immutable
 95516                }
 517
 518                // Wait for the last send heartbeat to complete before sending the CloseConnection frame or disposing
 519                // the duplex connection. _heartbeatTask is immutable once _shutdownTask set. _heartbeatTask can be
 520                // canceled by DisposeAsync.
 95521                await _heartbeatTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 522
 95523                if (sendCloseConnectionFrame)
 42524                {
 525                    // Send CloseConnection frame.
 42526                    await SendControlFrameAsync(EncodeCloseConnectionFrame, shutdownCts.Token).ConfigureAwait(false);
 527
 528                    // Wait for the peer to abort the connection as an acknowledgment for this CloseConnection frame.
 529                    // The peer can also send us a CloseConnection frame if it started shutting down at the same time.
 530                    // We can't just return and dispose the duplex connection since the peer can still be reading frames
 531                    // (including the CloseConnection frame) and we don't want to abort this reading.
 38532                    await _readFramesTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 37533                }
 534                else
 53535                {
 536                    // _readFramesTask should be already completed or nearly completed.
 53537                    await _readFramesTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 538
 539                    // _readFramesTask succeeded means the peer is waiting for us to abort the duplex connection;
 540                    // we oblige.
 34541                    _duplexConnection.Dispose();
 34542                }
 71543            }
 11544            catch (OperationCanceledException)
 11545            {
 11546                cancellationToken.ThrowIfCancellationRequested();
 547
 5548                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 5549                throw new IceRpcException(
 5550                    IceRpcError.OperationAborted,
 5551                    "The connection shutdown was aborted because the connection was disposed.");
 552            }
 21553            catch (IceRpcException)
 21554            {
 21555                throw;
 556            }
 0557            catch (Exception exception)
 0558            {
 0559                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0560                throw;
 561            }
 562
 563            static void EncodeCloseConnectionFrame(IBufferWriter<byte> writer)
 40564            {
 40565                var encoder = new SliceEncoder(writer, SliceEncoding.Slice1);
 40566                IceDefinitions.CloseConnectionFrame.Encode(ref encoder);
 40567            }
 71568        }
 103569    }
 570
 372571    internal IceProtocolConnection(
 372572        IDuplexConnection duplexConnection,
 372573        TransportConnectionInformation? transportConnectionInformation,
 372574        ConnectionOptions options)
 372575    {
 372576        _twowayDispatchesCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 577
 578        // With ice, we always listen for incoming frames (responses) so we need a dispatcher for incoming requests even
 579        // if we don't expect any. This dispatcher throws an ice ObjectNotExistException back to the client, which makes
 580        // more sense than throwing an UnknownException.
 372581        _dispatcher = options.Dispatcher ?? NotFoundDispatcher.Instance;
 582
 372583        _maxFrameSize = options.MaxIceFrameSize;
 372584        _transportConnectionInformation = transportConnectionInformation;
 585
 372586        if (options.MaxDispatches > 0)
 372587        {
 372588            _dispatchSemaphore = new SemaphoreSlim(
 372589                initialCount: options.MaxDispatches,
 372590                maxCount: options.MaxDispatches);
 372591        }
 592
 372593        _inactivityTimeout = options.InactivityTimeout;
 594
 595        // The readerScheduler doesn't matter (we don't call pipe.Reader.ReadAsync on the resulting pipe), and the
 596        // writerScheduler doesn't matter (pipe.Writer.FlushAsync never blocks).
 372597        _pipeOptions = new PipeOptions(
 372598            pool: options.Pool,
 372599            minimumSegmentSize: options.MinSegmentSize,
 372600            pauseWriterThreshold: 0,
 372601            useSynchronizationContext: false);
 602
 372603        if (options.IceIdleTimeout != Timeout.InfiniteTimeSpan)
 372604        {
 372605            duplexConnection = new IceDuplexConnectionDecorator(
 372606                duplexConnection,
 372607                readIdleTimeout: options.EnableIceIdleCheck ? options.IceIdleTimeout : Timeout.InfiniteTimeSpan,
 372608                writeIdleTimeout: options.IceIdleTimeout,
 372609                SendHeartbeat);
 372610        }
 611
 372612        _duplexConnection = duplexConnection;
 372613        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 372614        _duplexConnectionWriter =
 372615            new IceDuplexConnectionWriter(_duplexConnection, options.Pool, options.MinSegmentSize);
 616
 372617        _inactivityTimeoutTimer = new Timer(_ =>
 10618        {
 10619            bool requestShutdown = false;
 372620
 10621            lock (_mutex)
 10622            {
 10623                if (_dispatchInvocationCount == 0 && _shutdownTask is null)
 10624                {
 10625                    requestShutdown = true;
 10626                    RefuseNewInvocations(
 10627                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
 10628                }
 10629            }
 372630
 10631            if (requestShutdown)
 10632            {
 372633                // TrySetResult must be called outside the mutex lock.
 10634                _shutdownRequestedTcs.TrySetResult();
 10635            }
 382636        });
 637
 638        void SendHeartbeat()
 23639        {
 23640            lock (_mutex)
 23641            {
 23642                if (_heartbeatTask.IsCompletedSuccessfully && _heartbeatEnabled)
 23643                {
 23644                    _heartbeatTask = SendValidateConnectionFrameAsync(_disposedCts.Token);
 23645                }
 23646            }
 647
 648            async Task SendValidateConnectionFrameAsync(CancellationToken cancellationToken)
 23649            {
 650                // Make sure we execute the function without holding the connection mutex lock.
 23651                await Task.Yield();
 652
 653                try
 23654                {
 23655                    await SendControlFrameAsync(EncodeValidateConnectionFrame, cancellationToken).ConfigureAwait(false);
 23656                }
 0657                catch (OperationCanceledException)
 0658                {
 659                    // Canceled by DisposeAsync
 0660                    throw;
 661                }
 0662                catch (IceRpcException)
 0663                {
 664                    // Expected, typically the peer aborted the connection.
 0665                    throw;
 666                }
 0667                catch (Exception exception)
 0668                {
 0669                    Debug.Fail($"The heartbeat task completed due to an unhandled exception: {exception}");
 0670                    throw;
 671                }
 672
 673                static void EncodeValidateConnectionFrame(IBufferWriter<byte> writer)
 23674                {
 23675                    var encoder = new SliceEncoder(writer, SliceEncoding.Slice1);
 23676                    IceDefinitions.ValidateConnectionFrame.Encode(ref encoder);
 23677                }
 23678            }
 23679        }
 372680    }
 681
 /// <summary>Decodes, in this order, the request ID, the request header, the request context and the encapsulation
 /// header found at the start of <paramref name="buffer" />.</summary>
 /// <param name="buffer">The buffer to decode; decoding starts with the request ID at the beginning of this buffer.
 /// </param>
 /// <returns>The request ID, the request header, a pipe reader over the encoded context (null when the context has no
 /// entry) and the number of bytes decoded from <paramref name="buffer" />.</returns>
 /// <exception cref="InvalidDataException">Thrown when the payload encoding is not 1.1, or when the payload size
 /// computed from the encapsulation header differs from the number of bytes left in the buffer.</exception>
 682    private static (int RequestId, IceRequestHeader Header, PipeReader? ContextReader, int Consumed) DecodeRequestIdAndH
 683        ReadOnlySequence<byte> buffer)
 2713684    {
 2713685        var decoder = new SliceDecoder(buffer, SliceEncoding.Slice1);
 686
 2713687        int requestId = decoder.DecodeInt32();
 688
 2713689        var requestHeader = new IceRequestHeader(ref decoder);
 690
 2713691        Pipe? contextPipe = null;
                // Remember where the context starts: its encoded bytes (leading size included) are copied below.
 2713692        long pos = decoder.Consumed;
 2713693        int count = decoder.DecodeSize();
 2713694        if (count > 0)
 8695        {
 32696            for (int i = 0; i < count; ++i)
 8697            {
 8698                decoder.Skip(decoder.DecodeSize()); // Skip the key
 8699                decoder.Skip(decoder.DecodeSize()); // Skip the value
 8700            }
             // Copy the still-encoded context into its own pipe and complete the writer right away.
 8701            contextPipe = new Pipe();
 8702            contextPipe.Writer.Write(buffer.Slice(pos, decoder.Consumed - pos));
 8703            contextPipe.Writer.Complete();
 8704        }
 705
 2713706        var encapsulationHeader = new EncapsulationHeader(ref decoder);
 707
 2713708        if (encapsulationHeader.PayloadEncodingMajor != 1 ||
 2713709            encapsulationHeader.PayloadEncodingMinor != 1)
 0710        {
 0711            throw new InvalidDataException(
 0712                $"Unsupported payload encoding '{encapsulationHeader.PayloadEncodingMajor}.{encapsulationHeader.PayloadE
 713        }
 714
                // The encapsulation size counts the encapsulation header itself; 6 is subtracted to get the payload size.
 2713715        int payloadSize = encapsulationHeader.EncapsulationSize - 6;
 2713716        if (payloadSize != (buffer.Length - decoder.Consumed))
 0717        {
 0718            throw new InvalidDataException(
 0719                $"Request payload size mismatch: expected {payloadSize} bytes, read {buffer.Length - decoder.Consumed} b
 720        }
 721
 2713722        return (requestId, requestHeader, contextPipe?.Reader, (int)decoder.Consumed);
 2713723    }
 724
 /// <summary>Decodes the header of a reply and maps its reply status to a status code and an error message.</summary>
 /// <param name="buffer">The buffer to decode; its first byte is the reply status.</param>
 /// <param name="requestId">The ID of the corresponding request; only used in an exception message.</param>
 /// <returns>The status code, the error message (null when the status code is Ok) and the position in
 /// <paramref name="buffer" /> up to which the header was decoded. For an ice system exception, the whole buffer is
 /// decoded and this position is the end of the buffer.</returns>
 /// <exception cref="InvalidDataException">Thrown when the buffer is too short to hold the reply status and
 /// encapsulation header, or when the payload size does not match the number of bytes that follow this header.
 /// </exception>
 725    private static (StatusCode StatusCode, string? ErrorMessage, SequencePosition Consumed) DecodeResponseHeader(
 726        ReadOnlySequence<byte> buffer,
 727        int requestId)
 661728    {
               // NOTE(review): indexing FirstSpan assumes the buffer holds at least one byte - confirm the caller
               // guarantees it.
 661729        ReplyStatus replyStatus = ((int)buffer.FirstSpan[0]).AsReplyStatus();
 730
 661731        if (replyStatus <= ReplyStatus.UserException)
 619732        {
 733            const int headerSize = 7; // reply status byte + encapsulation header
 734
 735            // read and check encapsulation header (6 bytes long)
 736
 619737            if (buffer.Length < headerSize)
 0738            {
 0739                throw new InvalidDataException($"Received invalid frame header for request with id '{requestId}'.");
 740            }
 741
 619742            EncapsulationHeader encapsulationHeader = SliceEncoding.Slice1.DecodeBuffer(
 619743                buffer.Slice(1, 6),
 1238744                (ref SliceDecoder decoder) => new EncapsulationHeader(ref decoder));
 745
 746            // Sanity check: the payload size must match the number of bytes that follow the header.
 619747            int payloadSize = encapsulationHeader.EncapsulationSize - 6;
 619748            if (payloadSize != buffer.Length - headerSize)
 0749            {
 0750                throw new InvalidDataException(
 0751                    $"Response payload size/frame size mismatch: payload size is {payloadSize} bytes but frame has {buff
 752            }
 753
 619754            SequencePosition consumed = buffer.GetPosition(headerSize);
 755
 619756            return replyStatus == ReplyStatus.Ok ? (StatusCode.Ok, null, consumed) :
 619757                // Set the error message to the empty string, because null is not allowed for status code > Ok.
 619758                (StatusCode.ApplicationError, "", consumed);
 759        }
 760        else
 42761        {
 762            // An ice system exception.
 763
 42764            StatusCode statusCode = replyStatus switch
 42765            {
 20766                ReplyStatus.ObjectNotExistException => StatusCode.NotFound,
 0767                ReplyStatus.FacetNotExistException => StatusCode.NotFound,
 2768                ReplyStatus.OperationNotExistException => StatusCode.NotImplemented,
 20769                _ => StatusCode.InternalError
 42770            };
 771
 42772            var decoder = new SliceDecoder(buffer.Slice(1), SliceEncoding.Slice1);
 773
 774            string message;
 42775            switch (replyStatus)
 776            {
 777                case ReplyStatus.FacetNotExistException:
 778                case ReplyStatus.ObjectNotExistException:
 779                case ReplyStatus.OperationNotExistException:
 780
 22781                    var requestFailed = new RequestFailedExceptionData(ref decoder);
 782
 22783                    string target = requestFailed.Fragment.Length > 0 ?
 22784                        $"{requestFailed.Identity.ToPath()}#{requestFailed.Fragment}" : requestFailed.Identity.ToPath();
 785
 22786                    message =
 22787                        $"The dispatch failed with status code {statusCode} while dispatching '{requestFailed.Operation}
 22788                    break;
 789                default:
 20790                    message = decoder.DecodeString();
 20791                    break;
 792            }
              // The system exception data must account for all the remaining bytes.
 42793            decoder.CheckEndOfBuffer();
 42794            return (statusCode, message, buffer.End);
 795        }
 661796    }
 797
 798    private static void EncodeRequestHeader(
 799        IceDuplexConnectionWriter output,
 800        OutgoingRequest request,
 801        int requestId,
 802        int payloadSize)
 2717803    {
 2717804        var encoder = new SliceEncoder(output, SliceEncoding.Slice1);
 805
 806        // Write the request header.
 2717807        encoder.WriteByteSpan(IceDefinitions.FramePrologue);
 2717808        encoder.EncodeIceFrameType(IceFrameType.Request);
 2717809        encoder.EncodeUInt8(0); // compression status
 810
 2717811        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 812
 2717813        encoder.EncodeInt32(requestId);
 814
 2717815        byte encodingMajor = 1;
 2717816        byte encodingMinor = 1;
 817
 818        // Request header.
 2717819        var requestHeader = new IceRequestHeader(
 2717820            Identity.Parse(request.ServiceAddress.Path),
 2717821            request.ServiceAddress.Fragment,
 2717822            request.Operation,
 2717823            request.Fields.ContainsKey(RequestFieldKey.Idempotent) ? OperationMode.Idempotent : OperationMode.Normal);
 2717824        requestHeader.Encode(ref encoder);
 2717825        int directWriteSize = 0;
 2717826        if (request.Fields.TryGetValue(RequestFieldKey.Context, out OutgoingFieldValue requestField))
 8827        {
 8828            if (requestField.WriteAction is Action<IBufferWriter<byte>> writeAction)
 8829            {
 830                // This writes directly to the underlying output; we measure how many bytes are written.
 8831                long start = output.UnflushedBytes;
 8832                writeAction(output);
 8833                directWriteSize = (int)(output.UnflushedBytes - start);
 8834            }
 835            else
 0836            {
 0837                encoder.WriteByteSequence(requestField.ByteSequence);
 0838            }
 8839        }
 840        else
 2709841        {
 2709842            encoder.EncodeSize(0);
 2709843        }
 844
 845        // We ignore all other fields. They can't be sent over ice.
 846
 2717847        new EncapsulationHeader(
 2717848            encapsulationSize: payloadSize + 6,
 2717849            encodingMajor,
 2717850            encodingMinor).Encode(ref encoder);
 851
 2717852        int frameSize = checked(encoder.EncodedByteCount + directWriteSize + payloadSize);
 2717853        SliceEncoder.EncodeInt32(frameSize, sizePlaceholder);
 2717854    }
 855
 856    private static void EncodeResponseHeader(
 857        IBufferWriter<byte> writer,
 858        OutgoingResponse response,
 859        IncomingRequest request,
 860        int requestId,
 861        int payloadSize)
 2687862    {
 2687863        var encoder = new SliceEncoder(writer, SliceEncoding.Slice1);
 864
 865        // Write the response header.
 866
 2687867        encoder.WriteByteSpan(IceDefinitions.FramePrologue);
 2687868        encoder.EncodeIceFrameType(IceFrameType.Reply);
 2687869        encoder.EncodeUInt8(0); // compression status
 2687870        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 871
 2687872        encoder.EncodeInt32(requestId);
 873
 2687874        if (response.StatusCode > StatusCode.ApplicationError ||
 2687875            (response.StatusCode == StatusCode.ApplicationError && payloadSize == 0))
 46876        {
 877            // system exception
 46878            switch (response.StatusCode)
 879            {
 880                case StatusCode.NotFound:
 881                case StatusCode.NotImplemented:
 26882                    encoder.EncodeReplyStatus(response.StatusCode == StatusCode.NotFound ?
 26883                        ReplyStatus.ObjectNotExistException : ReplyStatus.OperationNotExistException);
 884
 26885                    new RequestFailedExceptionData(Identity.Parse(request.Path), request.Fragment, request.Operation)
 26886                        .Encode(ref encoder);
 26887                    break;
 888                case StatusCode.InternalError:
 14889                    encoder.EncodeReplyStatus(ReplyStatus.UnknownException);
 14890                    encoder.EncodeString(response.ErrorMessage!);
 14891                    break;
 892                default:
 6893                    encoder.EncodeReplyStatus(ReplyStatus.UnknownException);
 6894                    encoder.EncodeString(
 6895                        $"{response.ErrorMessage} {{ Original StatusCode = {response.StatusCode} }}");
 6896                    break;
 897            }
 46898        }
 899        else
 2641900        {
 2641901            encoder.EncodeReplyStatus((ReplyStatus)response.StatusCode);
 902
 903            // When IceRPC receives a response, it ignores the response encoding. So this "1.1" is only relevant to
 904            // a ZeroC Ice client that decodes the response. The only Slice encoding such a client can possibly use
 905            // to decode the response payload is 1.1 or 1.0, and we don't care about interop with 1.0.
 2641906            var encapsulationHeader = new EncapsulationHeader(
 2641907                encapsulationSize: payloadSize + 6,
 2641908                payloadEncodingMajor: 1,
 2641909                payloadEncodingMinor: 1);
 2641910            encapsulationHeader.Encode(ref encoder);
 2641911        }
 912
 2687913        int frameSize = encoder.EncodedByteCount + payloadSize;
 2687914        SliceEncoder.EncodeInt32(frameSize, sizePlaceholder);
 2687915    }
 916
 917    /// <summary>Reads the full Ice payload from the given pipe reader.</summary>
 918    private static async ValueTask<ReadOnlySequence<byte>> ReadFullPayloadAsync(
 919        PipeReader payload,
 920        CancellationToken cancellationToken)
 5366921    {
 922        // We use ReadAtLeastAsync instead of ReadAsync to bypass the PauseWriterThreshold when the payload is
 923        // backed by a Pipe.
 5366924        ReadResult readResult = await payload.ReadAtLeastAsync(int.MaxValue, cancellationToken).ConfigureAwait(false);
 925
 5360926        if (readResult.IsCanceled)
 0927        {
 0928            throw new InvalidOperationException("Unexpected call to CancelPendingRead on ice payload.");
 929        }
 930
 5360931        return readResult.IsCompleted ? readResult.Buffer :
 5360932            throw new ArgumentException("The payload size is greater than int.MaxValue.", nameof(payload));
 5360933    }
 934
 935    /// <summary>Acquires exclusive access to _duplexConnectionWriter.</summary>
 936    /// <returns>A <see cref="SemaphoreLock" /> that releases the acquired semaphore in its Dispose method.</returns>
 937    private async ValueTask<SemaphoreLock> AcquireWriteLockAsync(CancellationToken cancellationToken)
 5638938    {
 5638939        SemaphoreLock semaphoreLock = await _writeSemaphore.AcquireAsync(cancellationToken).ConfigureAwait(false);
 940
 941        // _writeException is protected by _writeSemaphore
 5638942        if (_writeException is not null)
 2943        {
 2944            semaphoreLock.Dispose();
 945
 2946            throw new IceRpcException(
 2947                IceRpcError.ConnectionAborted,
 2948                "The connection was aborted because a previous write operation failed.",
 2949                _writeException);
 950        }
 951
 5636952        return semaphoreLock;
 5636953    }
 954
 955    /// <summary>Creates a pipe reader to simplify the reading of a request or response frame. The frame is read fully
 956    /// and buffered into an internal pipe.</summary>
 957    private async ValueTask<PipeReader> CreateFrameReaderAsync(int size, CancellationToken cancellationToken)
 5390958    {
 5390959        var pipe = new Pipe(_pipeOptions);
 960
 961        try
 5390962        {
 5390963            await _duplexConnectionReader.FillBufferWriterAsync(pipe.Writer, size, cancellationToken)
 5390964                .ConfigureAwait(false);
 5390965        }
 0966        catch
 0967        {
 0968            pipe.Reader.Complete();
 0969            throw;
 970        }
 971        finally
 5390972        {
 5390973            pipe.Writer.Complete();
 5390974        }
 975
 5390976        return pipe.Reader;
 5390977    }
 978
 979    private void DecrementDispatchInvocationCount()
 5426980    {
 5426981        lock (_mutex)
 5426982        {
 5426983            if (--_dispatchInvocationCount == 0)
 2266984            {
 2266985                if (_shutdownTask is not null)
 42986                {
 42987                    _dispatchesAndInvocationsCompleted.TrySetResult();
 42988                }
 989                // We enable the inactivity check in order to complete ShutdownRequested when inactive for too long.
 990                // _refuseInvocations is true when the connection is either about to be "shutdown requested", or shut
 991                // down / disposed. We don't need to complete ShutdownRequested in any of these situations.
 2224992                else if (!_refuseInvocations)
 2204993                {
 2204994                    ScheduleInactivityCheck();
 2204995                }
 2266996            }
 5426997        }
 5426998    }
 999
 1000    /// <summary>Dispatches an incoming request. This method executes in a task spawned from the read frames loop.
 1001    /// </summary>
         /// <param name="request">The incoming request to dispatch. Its payload is completed and its fields are reset
         /// once the dispatch completes.</param>
         /// <param name="requestId">The request ID encoded in the header of the response.</param>
         /// <param name="contextReader">The pipe reader that holds the encoded context of the request, or null. It is
         /// completed once the dispatch completes.</param>
 1002    private async Task DispatchRequestAsync(IncomingRequest request, int requestId, PipeReader? contextReader)
 27091003    {
 27091004        CancellationToken cancellationToken = request.IsOneway ? _disposedCts.Token : _twowayDispatchesCts.Token;
 1005
 1006        OutgoingResponse? response;
 1007        try
 27091008        {
 1009            // The dispatcher can complete the incoming request payload to release its memory as soon as possible.
 1010            try
 27091011            {
 1012                // _dispatcher.DispatchAsync may very well ignore the cancellation token and we don't want to keep
 1013                // dispatching when the cancellation token is canceled.
 27091014                cancellationToken.ThrowIfCancellationRequested();
 1015
 27091016                response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 26791017            }
 1018            finally
 27091019            {
                         // Release the dispatch semaphore (if any) as soon as the dispatcher is done, before the
                         // response is written.
 27091020                _dispatchSemaphore?.Release();
 27091021            }
 1022
 26791023            if (response != request.Response)
 21024            {
 21025                throw new InvalidOperationException(
 21026                    "The dispatcher did not return the last response created for this request.");
 1027            }
 26771028        }
 321029        catch when (request.IsOneway)
 01030        {
 1031            // ignored since we're not returning anything
 01032            response = null;
 01033        }
 201034        catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 181035        {
 1036            // expected when the connection is disposed or the request is canceled by the peer's shutdown
 181037            response = null;
 181038        }
 141039        catch (Exception exception)
 141040        {
                 // Any exception other than a DispatchException is reported to the peer as an InternalError.
 141041            if (exception is not DispatchException dispatchException)
 101042            {
 101043                dispatchException = new DispatchException(StatusCode.InternalError, innerException: exception);
 101044            }
 141045            response = dispatchException.ToOutgoingResponse(request);
 141046        }
 1047        finally
 27091048        {
 27091049            request.Payload.Complete();
 27091050            contextReader?.Complete();
 1051
 1052            // The field values are now invalid - they point to potentially recycled and reused memory. We
 1053            // replace Fields by an empty dictionary to prevent accidental access to this reused memory.
 27091054            request.Fields = ImmutableDictionary<RequestFieldKey, ReadOnlySequence<byte>>.Empty;
 27091055        }
 1056
 1057        try
 27091058        {
                     // NOTE(review): the response is set to null only when the dispatch throws or is canceled, so a
                     // oneway request that dispatches successfully also reaches the code below - confirm this is
                     // intended.
 27091059            if (response is not null)
 26911060            {
 1061                // Read the full response payload. This can take some time so this needs to be done before acquiring
 1062                // the write semaphore.
 26911063                ReadOnlySequence<byte> payload = ReadOnlySequence<byte>.Empty;
 1064
 26911065                if (response.StatusCode <= StatusCode.ApplicationError)
 26491066                {
 1067                    try
 26491068                    {
 26491069                        payload = await ReadFullPayloadAsync(response.Payload, cancellationToken)
 26491070                            .ConfigureAwait(false);
 26431071                    }
 41072                    catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 41073                    {
 41074                        throw;
 1075                    }
 21076                    catch (Exception exception)
 21077                    {
 1078                        // We "encode" the exception in the error message.
 1079
 21080                        response = new OutgoingResponse(
 21081                            request,
 21082                            StatusCode.InternalError,
 21083                            "The dispatch failed to read the response payload.",
 21084                            exception);
 21085                    }
 26451086                }
 1087                // else payload remains empty because the payload of a dispatch exception (if any) cannot be sent
 1088                // over ice.
 1089
 26871090                int payloadSize = checked((int)payload.Length);
 1091
 1092                // Wait for writing of other frames to complete.
 26871093                using SemaphoreLock _ = await AcquireWriteLockAsync(cancellationToken).ConfigureAwait(false);
 1094                try
 26871095                {
 26871096                    EncodeResponseHeader(_duplexConnectionWriter, response, request, requestId, payloadSize);
 1097
 1098                    // We write to the duplex connection with _disposedCts.Token instead of cancellationToken.
 1099                    // Canceling this write operation is fatal to the connection.
 26871100                    await _duplexConnectionWriter.WriteAsync(payload, _disposedCts.Token).ConfigureAwait(false);
 26851101                }
 21102                catch (Exception exception)
 21103                {
 21104                    WriteFailed(exception);
 21105                    throw;
 1106                }
 26851107            }
 27031108        }
 61109        catch (OperationCanceledException exception) when (
 61110            exception.CancellationToken == _disposedCts.Token ||
 61111            exception.CancellationToken == cancellationToken)
 61112        {
 1113            // expected when the connection is disposed or the request is canceled by the peer's shutdown
 61114        }
 1115        finally
 27091116        {
                     // Always executed, whether or not a response was written.
 27091117            DecrementDispatchInvocationCount();
 27091118        }
 27091119    }
 1120
 1121    /// <summary>Increments the dispatch-invocation count.</summary>
 1122    /// <remarks>This method must be called with _mutex locked.</remarks>
 1123    private void IncrementDispatchInvocationCount()
 54261124    {
 54261125        if (_dispatchInvocationCount++ == 0)
 22661126        {
 1127            // Cancel inactivity check.
 22661128            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 22661129        }
 54261130    }
 1131
 1132    private void ScheduleInactivityCheck() =>
 25281133        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1134
 1135    /// <summary>Reads incoming frames and returns successfully when a CloseConnection frame is received or when the
 1136    /// connection is aborted during ShutdownAsync or canceled by DisposeAsync.</summary>
 1137    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 3241138    {
 3241139        await Task.Yield(); // exit mutex lock
 1140
 1141        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 1142        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 1143        // _connectTask has completed successfully. The creation of the _readFramesTask is the last action taken by
 1144        // _connectTask and as a result this await can't fail.
 3241145        await _connectTask!.ConfigureAwait(false);
 1146
 1147        try
 3241148        {
 57371149            while (!cancellationToken.IsCancellationRequested)
 57331150            {
 57331151                ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 57331152                    IceDefinitions.PrologueSize,
 57331153                    cancellationToken).ConfigureAwait(false);
 1154
 1155                // First decode and check the prologue.
 1156
 54551157                ReadOnlySequence<byte> prologueBuffer = buffer.Slice(0, IceDefinitions.PrologueSize);
 1158
 54551159                IcePrologue prologue = SliceEncoding.Slice1.DecodeBuffer(
 54551160                    prologueBuffer,
 109101161                    (ref SliceDecoder decoder) => new IcePrologue(ref decoder));
 1162
 54551163                _duplexConnectionReader.AdvanceTo(prologueBuffer.End);
 1164
 54551165                IceDefinitions.CheckPrologue(prologue);
 54531166                if (prologue.FrameSize > _maxFrameSize)
 21167                {
 21168                    throw new InvalidDataException(
 21169                        $"Received frame with size ({prologue.FrameSize}) greater than max frame size.");
 1170                }
 1171
 54511172                if (prologue.CompressionStatus == 2)
 01173                {
 1174                    // The exception handler calls ReadFailed.
 01175                    throw new IceRpcException(
 01176                        IceRpcError.ConnectionAborted,
 01177                        "The connection was aborted because it received a compressed ice frame, and IceRPC does not supp
 1178                }
 1179
 1180                // Then process the frame based on its type.
 54511181                switch (prologue.FrameType)
 1182                {
 1183                    case IceFrameType.CloseConnection:
 381184                    {
 381185                        if (prologue.FrameSize != IceDefinitions.PrologueSize)
 01186                        {
 01187                            throw new InvalidDataException(
 01188                                $"Received {nameof(IceFrameType.CloseConnection)} frame with unexpected data.");
 1189                        }
 1190
 381191                        lock (_mutex)
 381192                        {
 381193                            RefuseNewInvocations(
 381194                                "The connection was shut down because it received a CloseConnection frame from the peer.
 1195
 1196                            // By exiting the "read frames loop" below, we are refusing new dispatches as well.
 1197
 1198                            // Only one side sends the CloseConnection frame.
 381199                            _sendCloseConnectionFrame = false;
 381200                        }
 1201
 1202                        // Even though we're in the "read frames loop", it's ok to cancel CTS and a "synchronous" TCS
 1203                        // below. We won't be reading anything else so it's ok to run continuations synchronously.
 1204
 1205                        // Abort two-way invocations that are waiting for a response (it will never come).
 381206                        AbortTwowayInvocations(
 381207                            IceRpcError.InvocationCanceled,
 381208                            "The invocation was canceled by the shutdown of the peer.");
 1209
 1210                        // Cancel two-way dispatches since the peer is not interested in the responses. This does not
 1211                        // cancel ongoing writes to _duplexConnection: we don't send incomplete/invalid data.
 381212                        _twowayDispatchesCts.Cancel();
 1213
 1214                        // We keep sending heartbeats. If the shutdown request / shutdown is not fulfilled quickly, they
 1215                        // tell the peer we're still alive and maybe stuck waiting for invocations and dispatches to
 1216                        // complete.
 1217
 1218                        // We request a shutdown that will dispose _duplexConnection once all invocations and dispatches
 1219                        // have completed.
 381220                        _shutdownRequestedTcs.TrySetResult();
 381221                        return;
 1222                    }
 1223
 1224                    case IceFrameType.Request:
 27131225                        await ReadRequestAsync(prologue.FrameSize, cancellationToken).ConfigureAwait(false);
 27131226                        break;
 1227
 1228                    case IceFrameType.RequestBatch:
 1229                        // The exception handler calls ReadFailed.
 01230                        throw new IceRpcException(
 01231                            IceRpcError.ConnectionAborted,
 01232                            "The connection was aborted because it received a batch request, and IceRPC does not support
 1233
 1234                    case IceFrameType.Reply:
 26771235                        await ReadReplyAsync(prologue.FrameSize, cancellationToken).ConfigureAwait(false);
 26771236                        break;
 1237
 1238                    case IceFrameType.ValidateConnection:
 231239                    {
 231240                        if (prologue.FrameSize != IceDefinitions.PrologueSize)
 01241                        {
 01242                            throw new InvalidDataException(
 01243                                $"Received {nameof(IceFrameType.ValidateConnection)} frame with unexpected data.");
 1244                        }
 231245                        break;
 1246                    }
 1247
 1248                    default:
 01249                    {
 01250                        throw new InvalidDataException(
 01251                            $"Received Ice frame with unknown frame type '{prologue.FrameType}'.");
 1252                    }
 1253                }
 54131254            } // while
 41255        }
 1151256        catch (OperationCanceledException)
 1151257        {
 1258            // canceled by DisposeAsync, no need to throw anything
 1151259        }
 1631260        catch (IceRpcException exception) when (
 1631261            exception.IceRpcError == IceRpcError.ConnectionAborted &&
 1631262            _dispatchesAndInvocationsCompleted.Task.IsCompleted)
 1181263        {
 1264            // The peer acknowledged receipt of the CloseConnection frame by aborting the duplex connection. Return.
 1265            // See ShutdownAsync.
 1181266        }
 451267        catch (IceRpcException exception)
 451268        {
 451269            ReadFailed(exception);
 451270            throw;
 1271        }
 41272        catch (InvalidDataException exception)
 41273        {
 41274            ReadFailed(exception);
 41275            throw new IceRpcException(
 41276                IceRpcError.ConnectionAborted,
 41277                "The connection was aborted by an ice protocol error.",
 41278                exception);
 1279        }
 01280        catch (Exception exception)
 01281        {
 01282            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01283            ReadFailed(exception);
 01284            throw;
 1285        }
 1286
 1287        // Aborts all pending two-way invocations. Must be called outside the mutex lock after setting
 1288        // _refuseInvocations to true.
             // Every completion source in _twowayInvocations is offered a new IceRpcException built from error, message
             // and exception; the exception argument is optional and defaults to null.
 1289        void AbortTwowayInvocations(IceRpcError error, string message, Exception? exception = null)
 871290        {
                 // Checks the precondition stated above.
 871291            Debug.Assert(_refuseInvocations);
 1292
 1293            // _twowayInvocations is immutable once _refuseInvocations is true.
 3091294            foreach (TaskCompletionSource<PipeReader> responseCompletionSource in _twowayInvocations.Values)
 241295            {
 1296                // _twowayInvocations can hold completed completion sources.
                     // That is why TrySetException is used and its result discarded: an already completed source is
                     // simply left as is.
 241297                _ = responseCompletionSource.TrySetException(new IceRpcException(error, message, exception));
 241298            }
 871299        }
 1300
 1301        // Takes appropriate action after a read failure.
             // In order: refuses new invocations, aborts the pending two-way invocations with ConnectionAborted (passing
             // the given exception along), cancels the two-way dispatches, disables the sending of the CloseConnection
             // frame and finally completes _shutdownRequestedTcs.
 1302        void ReadFailed(Exception exception)
 491303        {
 1304            // We also prevent new one-way invocations even though they don't need to read the connection.
 491305            RefuseNewInvocations("The connection was lost because a read operation failed.");
 1306
 1307            // It's ok to cancel CTS and a "synchronous" TCS below. We won't be reading anything else so it's ok to run
 1308            // continuations synchronously.
 1309
 491310            AbortTwowayInvocations(
 491311                IceRpcError.ConnectionAborted,
 491312                "The invocation was aborted because the connection was lost.",
 491313                exception);
 1314
 1315            // ReadFailed is called when the connection is dead or the peer sent us a non-supported frame (e.g. a
 1316            // batch request). We don't need to allow outstanding two-way dispatches to complete in these situations, so
 1317            // we cancel them to speed up the shutdown.
 491318            _twowayDispatchesCts.Cancel();
 1319
 491320            lock (_mutex)
 491321            {
 1322                // Don't send a close connection frame since we can't wait for the peer's acknowledgment.
 491323                _sendCloseConnectionFrame = false;
 491324            }
 1325
                 // TrySetResult (result discarded): the TCS may already have been completed by another code path.
 491326            _ = _shutdownRequestedTcs.TrySetResult();
 491327        }
 2751328    }
 1329
 1330    /// <summary>Reads a reply (incoming response) and completes the invocation response completion source with this
 1331    /// response. This method executes "synchronously" in the read frames loop.</summary>
         /// <param name="replyFrameSize">The size of the reply frame. The prologue size is subtracted from it to obtain
         /// the number of bytes read into the frame reader.</param>
         /// <param name="cancellationToken">The cancellation token given to the creation of the frame reader.</param>
         /// <exception cref="InvalidDataException">Thrown when the frame reader cannot supply the 4 bytes of the request
         /// ID.</exception>
 1332    private async Task ReadReplyAsync(int replyFrameSize, CancellationToken cancellationToken)
 26771333    {
 1334        // Read the remainder of the frame immediately into replyFrameReader.
 26771335        PipeReader replyFrameReader = await CreateFrameReaderAsync(
 26771336            replyFrameSize - IceDefinitions.PrologueSize,
 26771337            cancellationToken).ConfigureAwait(false);
 1338
             // Tracks who owns replyFrameReader: it stays true (this method completes the reader in the finally block)
             // unless the reader is successfully handed over to a pending invocation below.
 26771339        bool completeFrameReader = true;
 1340
 1341        try
 26771342        {
 1343            // Read and decode request ID
 26771344            if (!replyFrameReader.TryRead(out ReadResult readResult) || readResult.Buffer.Length < 4)
 01345            {
 01346                throw new InvalidDataException("Received a response with an invalid request ID.");
 1347            }
 1348
                 // The request ID is the first 4 bytes of the buffer, decoded as a Slice1 int32. Only these 4 bytes are
                 // consumed; the rest of the frame remains in the reader.
 26771349            ReadOnlySequence<byte> requestIdBuffer = readResult.Buffer.Slice(0, 4);
 26771350            int requestId = SliceEncoding.Slice1.DecodeBuffer(
 26771351                requestIdBuffer,
 53541352                (ref SliceDecoder decoder) => decoder.DecodeInt32());
 26771353            replyFrameReader.AdvanceTo(requestIdBuffer.End);
 1354
 26771355            lock (_mutex)
 26771356            {
 26771357                if (_twowayInvocations.TryGetValue(
 26771358                    requestId,
 26771359                    out TaskCompletionSource<PipeReader>? responseCompletionSource))
 6611360                {
 1361                    // continuation runs asynchronously
 6611362                    if (responseCompletionSource.TrySetResult(replyFrameReader))
 6611363                    {
                             // The invocation now owns the reader and is responsible for completing it.
 6611364                        completeFrameReader = false;
 6611365                    }
 1366                    // else this invocation just completed and is about to remove itself from _twowayInvocations,
 1367                    // or _twowayInvocations is immutable and contains entries for completed invocations.
 6611368                }
 1369                // else the request ID carried by the response is bogus or corresponds to a request that was previously
 1370                // discarded (for example, because its deadline expired).
 26771371            }
 26771372        }
 1373        finally
 26771374        {
                 // Also reached when an exception is thrown above: the reader is then still owned by this method.
 26771375            if (completeFrameReader)
 20161376            {
 20161377                replyFrameReader.Complete();
 20161378            }
 26771379        }
 26771380    }
 1381
 1382    /// <summary>Reads and then dispatches an incoming request in a separate dispatch task. This method executes
 1383    /// "synchronously" in the read frames loop.</summary>
         /// <param name="requestFrameSize">The size of the request frame. The prologue size is subtracted from it to
         /// obtain the number of bytes read into the frame reader.</param>
         /// <param name="cancellationToken">The cancellation token given to the creation of the frame reader and to the
         /// wait on the dispatch semaphore.</param>
         /// <remarks>When no dispatch task is created (an exception is thrown, or the request is discarded because
         /// <c>_shutdownTask</c> is set), this method completes the request frame reader and the context reader itself.
         /// Otherwise the dispatch task owns them.</remarks>
 1384    private async Task ReadRequestAsync(int requestFrameSize, CancellationToken cancellationToken)
 27131385    {
 1386        // Read the request frame.
 27131387        PipeReader requestFrameReader = await CreateFrameReaderAsync(
 27131388            requestFrameSize - IceDefinitions.PrologueSize,
 27131389            cancellationToken).ConfigureAwait(false);
 1390
 1391        // Decode its header.
 1392        int requestId;
 1393        IceRequestHeader requestHeader;
 27131394        PipeReader? contextReader = null;
 1395        IDictionary<RequestFieldKey, ReadOnlySequence<byte>>? fields;
             // Remains null until the request is handed over to Task.Run; the finally block uses it to decide whether
             // this method still owns the readers.
 27131396        Task? dispatchTask = null;
 1397
 1398        try
 27131399        {
 27131400            if (!requestFrameReader.TryRead(out ReadResult readResult))
 01401            {
 01402                throw new InvalidDataException("Received an invalid request frame.");
 1403            }
 1404
 27131405            Debug.Assert(readResult.IsCompleted);
 1406
                 // Only the bytes consumed by the request ID and header are advanced over; what remains in
                 // requestFrameReader becomes the payload of the incoming request below.
 27131407            (requestId, requestHeader, contextReader, int consumed) = DecodeRequestIdAndHeader(readResult.Buffer);
 27131408            requestFrameReader.AdvanceTo(readResult.Buffer.GetPosition(consumed));
 1409
 27131410            if (contextReader is null)
 27051411            {
                     // No context: use a shared dictionary instead of allocating one per request.
                     // NOTE(review): _idempotentFields is defined outside this view; presumably it holds only the
                     // Idempotent field - confirm.
 27051412                fields = requestHeader.OperationMode == OperationMode.Normal ?
 27051413                    ImmutableDictionary<RequestFieldKey, ReadOnlySequence<byte>>.Empty : _idempotentFields;
 27051414            }
 1415            else
 81416            {
                     // The return value of TryRead is not checked; the assertion below documents the expectation that
                     // the context reader holds a non-empty, completed buffer.
 81417                contextReader.TryRead(out ReadResult result);
 81418                Debug.Assert(result.Buffer.Length > 0 && result.IsCompleted);
 81419                fields = new Dictionary<RequestFieldKey, ReadOnlySequence<byte>>()
 81420                {
 81421                    [RequestFieldKey.Context] = result.Buffer
 81422                };
 1423
 81424                if (requestHeader.OperationMode != OperationMode.Normal)
 01425                {
 1426                    // OperationMode can be Idempotent or Nonmutating.
 01427                    fields[RequestFieldKey.Idempotent] = default;
 01428                }
 81429            }
 1430
                 // Set to true only once the semaphore has actually been acquired, so that the early return below
                 // releases it only in that case.
 27131431            bool releaseDispatchSemaphore = false;
 27131432            if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 27131433            {
 1434                // This prevents us from receiving any new frames if we're already dispatching the maximum number
 1435                // of requests. We need to do this in the "accept from network loop" to apply back pressure to the
 1436                // caller.
 1437                try
 27131438                {
 27131439                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 27131440                    releaseDispatchSemaphore = true;
 27131441                }
 01442                catch (OperationCanceledException)
 01443                {
 1444                    // and return below
                         // NOTE(review): this relies on _shutdownTask being non-null whenever cancellationToken is
                         // canceled; otherwise the request would be dispatched without holding the semaphore - confirm.
 01445                }
 27131446            }
 1447
 27131448            lock (_mutex)
 27131449            {
 27131450                if (_shutdownTask is not null)
 41451                {
 1452                    // The connection is (being) disposed or the connection is shutting down and received a request.
 1453                    // We simply discard it. For a graceful shutdown, the two-way invocation in the peer will throw
 1454                    // IceRpcException(InvocationCanceled). We also discard one-way requests: if we accepted them, they
 1455                    // could delay our shutdown and make it time out.
 41456                    if (releaseDispatchSemaphore)
 41457                    {
 41458                        _dispatchSemaphore!.Release();
 41459                    }
                         // dispatchTask is still null here, so the finally block completes the readers.
 41460                    return;
 1461                }
 1462
 27091463                IncrementDispatchInvocationCount();
 27091464            }
 1465
 1466            // The scheduling of the task can't be canceled since we want to make sure DispatchRequestAsync will
 1467            // cleanup (decrement _dispatchCount etc.) if DisposeAsync is called. dispatchTask takes ownership of the
 1468            // requestFrameReader and contextReader.
 27091469            dispatchTask = Task.Run(
 27091470                async () =>
 27091471                {
 27091472                    using var request = new IncomingRequest(Protocol.Ice, _connectionContext!)
 27091473                    {
 27091474                        Fields = fields,
 27091475                        Fragment = requestHeader.Fragment,
 27091476                        IsOneway = requestId == 0,
 27091477                        Operation = requestHeader.Operation,
 27091478                        Path = requestHeader.Identity.ToPath(),
 27091479                        Payload = requestFrameReader,
 27091480                    };
 27091481
 27091482                    try
 27091483                    {
 27091484                        await DispatchRequestAsync(
 27091485                            request,
 27091486                            requestId,
 27091487                            contextReader).ConfigureAwait(false);
 27091488                    }
 01489                    catch (IceRpcException)
 01490                    {
 27091491                        // expected when the peer aborts the connection.
 01492                    }
 01493                    catch (Exception exception)
 01494                    {
 27091495                        // With ice, a dispatch cannot throw an exception that comes from the application code:
 27091496                        // any exception thrown when reading the response payload is converted into a DispatchException
 27091497                        // response, and the response header has no fields to encode.
 01498                        Debug.Fail($"ice dispatch {request} failed with an unexpected exception: {exception}");
 01499                        throw;
 27091500                    }
 27091501                },
 27091502                CancellationToken.None);
 27091503        }
 1504        finally
 27131505        {
                 // dispatchTask is null when an exception was thrown above or when the request was discarded: the
                 // readers were never handed over to a dispatch task and must be completed here.
 27131506            if (dispatchTask is null)
 41507            {
 41508                requestFrameReader.Complete();
 41509                contextReader?.Complete();
 41510            }
 27131511        }
 27131512    }
 1513
         /// <summary>Marks the connection as refusing new invocations. The message is recorded only when no message was
         /// recorded before, so the message given by the first call is the one that is kept.</summary>
         /// <param name="message">The message stored in <c>_invocationRefusedMessage</c> when it is still null.</param>
 1514    private void RefuseNewInvocations(string message)
 5821515    {
             // Both fields are updated together while holding _mutex.
 5821516        lock (_mutex)
 5821517        {
 5821518            _refuseInvocations = true;
 5821519            _invocationRefusedMessage ??= message;
 5821520        }
 5821521    }
 1522
 1523    /// <summary>Sends a control frame. It takes care of acquiring and releasing the write lock and calls
 1524    /// <see cref="WriteFailed" /> if a failure occurs while writing to _duplexConnectionWriter.</summary>
 1525    /// <param name="encode">Encodes the control frame.</param>
 1526    /// <param name="cancellationToken">The cancellation token.</param>
 1527    /// <remarks>If the cancellation token is canceled while writing to the duplex connection, the connection is
 1528    /// aborted.</remarks>
 1529    private async ValueTask SendControlFrameAsync(
 1530        Action<IBufferWriter<byte>> encode,
 1531        CancellationToken cancellationToken)
 2341532    {
             // The using declaration disposes the lock returned by AcquireWriteLockAsync when this method exits. The
             // acquisition is outside the try block below, so a failure to acquire does not call WriteFailed.
 2341533        using SemaphoreLock _ = await AcquireWriteLockAsync(cancellationToken).ConfigureAwait(false);
 1534
 1535        try
 2321536        {
                 // Encode the frame into the writer, then flush it.
 2321537            encode(_duplexConnectionWriter);
 2321538            await _duplexConnectionWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
 2261539        }
 61540        catch (Exception exception)
 61541        {
                 // Any exception thrown by the encoding or the flush is recorded through WriteFailed, then rethrown
                 // unchanged to the caller.
 61542            WriteFailed(exception);
 61543            throw;
 1544        }
 2261545    }
 1546
 1547    /// <summary>Takes appropriate action after a write failure.</summary>
         /// <param name="exception">The exception that caused the write failure; it is stored in
         /// <c>_writeException</c>.</param>
 1548    /// <remarks>Must be called outside the mutex lock but after acquiring _writeSemaphore.</remarks>
 1549    private void WriteFailed(Exception exception)
 101550    {
             // Expects to be called at most once: no write exception may have been recorded yet.
 101551        Debug.Assert(_writeException is null);
 101552        _writeException = exception; // protected by _writeSemaphore
 1553
 1554        // We can't send new invocations without writing to the connection.
 101555        RefuseNewInvocations("The connection was lost because a write operation failed.");
 1556
 1557        // We can't send responses so these dispatches can be canceled.
 101558        _twowayDispatchesCts.Cancel();
 1559
 1560        // We don't change _sendCloseConnectionFrame. If the _readFrameTask is still running, we want ShutdownAsync
 1561        // to send CloseConnection - and fail.
 1562
             // TrySetResult (result discarded): the TCS may already have been completed by another code path.
 101563        _ = _shutdownRequestedTcs.TrySetResult();
 101564    }
 1565}