< Summary

Information
Class: IceRpc.Transports.Tcp.Internal.TcpConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Tcp/Internal/TcpConnection.cs
Tag: 275_13775359185
Line coverage
76%
Covered lines: 118
Uncovered lines: 37
Coverable lines: 155
Total lines: 411
Line coverage: 76.1%
Branch coverage
77%
Covered branches: 28
Total branches: 36
Branch coverage: 77.7%
Method coverage
90%
Covered methods: 9
Total methods: 10
Method coverage: 90%

Metrics

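| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 2 | 2 | 100% |
| ConnectAsync(...) | 100% | 1 | 1 | 100% |
| Dispose() | 100% | 6 | 6 | 100% |
| ReadAsync(...) | 50% | 2 | 2 | 100% |
| PerformReadAsync() | 100% | 2 | 2 | 100% |
| ShutdownWriteAsync(...) | 100% | 1 | 1 | 100% |
| PerformShutdownAsync() | 100% | 2.3 | 2 | 57.89% |
| WriteAsync(...) | 100% | 1 | 1 | 100% |
| PerformWriteAsync() | 68.18% | 30.49 | 22 | 74.02% |
| AbortAndObserveAsync() | 100% | 2 | 1 | 0% |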

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Tcp/Internal/TcpConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.Net;
 6using System.Net.Security;
 7using System.Net.Sockets;
 8using System.Runtime.InteropServices;
 9
 10namespace IceRpc.Transports.Tcp.Internal;
 11
/// <summary>Implements <see cref="IDuplexConnection" /> for tcp with or without TLS.</summary>
/// <remarks>Unlike Coloc, the Tcp transport is not a "checked" transport, which means it does not need to detect
/// violations of the duplex transport contract or report such violations. It assumes its clients are sufficiently well
/// tested to never violate this contract. As a result, this implementation does not throw
/// <see cref="InvalidOperationException" />.</remarks>
internal abstract class TcpConnection : IDuplexConnection
{
    /// <summary>Gets the socket used by this connection to send and receive data.</summary>
    internal abstract Socket Socket { get; }

    /// <summary>Gets the SSL stream used by this connection, or <see langword="null" /> when there is none. When it
    /// is not <see langword="null" />, reads, writes and the write shutdown go through this stream instead of going
    /// directly through <see cref="Socket" />.</summary>
    internal abstract SslStream? SslStream { get; }

    // Set by Dispose; checked at the start of every public operation.
    private protected volatile bool _isDisposed;

    // The MaxDataSize of the SSL implementation.
    private const int MaxSslDataSize = 16 * 1024;

    // Set once ShutdownWriteAsync completes successfully; read by Dispose to select between a graceful close and an
    // abortive close of the socket.
    private bool _isShutdown;

    // The maximum number of leading bytes WriteAsync coalesces into the write buffer (SSL only).
    private readonly int _maxSslBufferSize;

    // Reusable list of array segments for multi-segment writes performed directly on the socket.
    private readonly List<ArraySegment<byte>> _segments = new();

    // The buffer used to coalesce leading segments (SSL only); disposed by Dispose.
    private readonly IMemoryOwner<byte>? _writeBufferOwner;

    /// <summary>Establishes the connection by delegating to <see cref="ConnectAsyncCore" />.</summary>
    /// <param name="cancellationToken">A cancellation token forwarded to <see cref="ConnectAsyncCore" />.</param>
    /// <returns>The task returned by <see cref="ConnectAsyncCore" />.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);
        return ConnectAsyncCore(cancellationToken);
    }

    /// <summary>Marks this connection as disposed, then disposes the SSL stream (if any), the socket and the write
    /// buffer (if any).</summary>
    public void Dispose()
    {
        _isDisposed = true;

        if (SslStream is SslStream sslStream)
        {
            sslStream.Dispose();
        }

        // If shutdown was called, we can just dispose the connection to complete the graceful TCP closure. Otherwise,
        // we abort the TCP connection to ensure the connection doesn't end up in the TIME_WAIT state.
        if (_isShutdown)
        {
            Socket.Dispose();
        }
        else
        {
            Socket.Close(0);
        }
        _writeBufferOwner?.Dispose();
    }

    /// <summary>Reads data from the SSL stream when there is one, and from the socket otherwise.</summary>
    /// <param name="buffer">The buffer that receives the data. It cannot be empty.</param>
    /// <param name="cancellationToken">A cancellation token forwarded to the underlying read.</param>
    /// <returns>The value returned by the underlying read operation.</returns>
    /// <exception cref="ArgumentException">Thrown synchronously if <paramref name="buffer" /> is empty.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);

        return buffer.Length > 0 ? PerformReadAsync() :
            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));

        async ValueTask<int> PerformReadAsync()
        {
            try
            {
                // Read through the pattern variable, like ShutdownWriteAsync and WriteAsync, instead of evaluating
                // the SslStream property a second time.
                return SslStream is SslStream sslStream ?
                    await sslStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false) :
                    await Socket.ReceiveAsync(buffer, SocketFlags.None, cancellationToken).ConfigureAwait(false);
            }
            catch (IOException exception)
            {
                throw exception.ToIceRpcException();
            }
            catch (SocketException exception)
            {
                throw exception.ToIceRpcException();
            }
        }
    }

    /// <summary>Shuts down the SSL stream (if any) and then the send side of the socket.</summary>
    /// <param name="cancellationToken">A cancellation token that cancels the wait for the SSL shutdown. When the
    /// wait is canceled, the connection is aborted before the cancellation exception is rethrown.</param>
    /// <returns>A task that completes once the send side of the socket is shut down.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);

        return PerformShutdownAsync();

        async Task PerformShutdownAsync()
        {
            try
            {
                if (SslStream is SslStream sslStream)
                {
                    // SslStream.ShutdownAsync does not accept a cancellation token, so we wait for it with WaitAsync.
                    Task shutdownTask = sslStream.ShutdownAsync();

                    try
                    {
                        await shutdownTask.WaitAsync(cancellationToken).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        await AbortAndObserveAsync(shutdownTask).ConfigureAwait(false);
                        throw;
                    }
                }

                // Shutdown the socket send side to send a TCP FIN packet. We don't close the read side because we want
                // to be notified when the peer shuts down its side of the socket (through the ReceiveAsync call).
                Socket.Shutdown(SocketShutdown.Send);

                // If shutdown is successful mark the connection as shutdown to ensure Dispose won't reset the TCP
                // connection.
                _isShutdown = true;
            }
            catch (IOException exception)
            {
                throw exception.ToIceRpcException();
            }
            catch (SocketException exception)
            {
                throw exception.ToIceRpcException();
            }
        }
    }

    /// <summary>Writes a sequence of bytes to the SSL stream when there is one, and to the socket otherwise.
    /// </summary>
    /// <param name="buffer">The bytes to write. When writing a multi-segment sequence directly to the socket, every
    /// segment must be backed by an array.</param>
    /// <param name="cancellationToken">A cancellation token that cancels the write. When the multi-segment socket
    /// send is canceled, the connection is aborted before the cancellation exception is rethrown.</param>
    /// <returns>A value task that completes once the write completes.</returns>
    /// <exception cref="ArgumentException">Thrown if a segment of a multi-segment <paramref name="buffer" /> written
    /// directly to the socket is not backed by an array.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);
        return PerformWriteAsync();

        async ValueTask PerformWriteAsync()
        {
            try
            {
                if (SslStream is SslStream sslStream)
                {
                    if (buffer.IsSingleSegment)
                    {
                        await sslStream.WriteAsync(buffer.First, cancellationToken).ConfigureAwait(false);
                    }
                    else
                    {
                        // Coalesce leading segments up to _maxSslBufferSize. We don't coalesce trailing segments as we
                        // assume these segments are large enough.
                        int leadingSize = 0;
                        int leadingSegmentCount = 0;
                        foreach (ReadOnlyMemory<byte> memory in buffer)
                        {
                            if (leadingSize + memory.Length <= _maxSslBufferSize)
                            {
                                leadingSize += memory.Length;
                                leadingSegmentCount++;
                            }
                            else
                            {
                                break;
                            }
                        }

                        if (leadingSegmentCount > 1)
                        {
                            ReadOnlySequence<byte> leading = buffer.Slice(0, leadingSize);
                            buffer = buffer.Slice(leadingSize); // buffer can become empty

                            Debug.Assert(_writeBufferOwner is not null);
                            Memory<byte> writeBuffer = _writeBufferOwner.Memory[0..leadingSize];
                            leading.CopyTo(writeBuffer.Span);

                            // Send the "coalesced" leading segments
                            await sslStream.WriteAsync(writeBuffer, cancellationToken).ConfigureAwait(false);
                        }
                        // else no need to coalesce (copy) a single segment

                        // Send the remaining segments one by one
                        if (buffer.IsEmpty)
                        {
                            // done
                        }
                        else if (buffer.IsSingleSegment)
                        {
                            await sslStream.WriteAsync(buffer.First, cancellationToken).ConfigureAwait(false);
                        }
                        else
                        {
                            foreach (ReadOnlyMemory<byte> memory in buffer)
                            {
                                await sslStream.WriteAsync(memory, cancellationToken).ConfigureAwait(false);
                            }
                        }
                    }
                }
                else
                {
                    if (buffer.IsSingleSegment)
                    {
                        _ = await Socket.SendAsync(buffer.First, SocketFlags.None, cancellationToken)
                            .ConfigureAwait(false);
                    }
                    else
                    {
                        // Gather the array segments that back the sequence so they can be sent with a single call.
                        _segments.Clear();
                        foreach (ReadOnlyMemory<byte> memory in buffer)
                        {
                            if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
                            {
                                _segments.Add(segment);
                            }
                            else
                            {
                                throw new ArgumentException(
                                    $"The {nameof(buffer)} must be backed by arrays.",
                                    nameof(buffer));
                            }
                        }

                        // This SendAsync overload does not accept a cancellation token, so we wait for it with
                        // WaitAsync.
                        Task sendTask = Socket.SendAsync(_segments, SocketFlags.None);

                        try
                        {
                            await sendTask.WaitAsync(cancellationToken).ConfigureAwait(false);
                        }
                        catch (OperationCanceledException)
                        {
                            await AbortAndObserveAsync(sendTask).ConfigureAwait(false);
                            throw;
                        }
                    }
                }
            }
            catch (IOException exception)
            {
                throw exception.ToIceRpcException();
            }
            catch (SocketException exception)
            {
                throw exception.ToIceRpcException();
            }
        }
    }

    /// <summary>Constructs a tcp connection.</summary>
    /// <param name="memoryOwner">The buffer used by <see cref="WriteAsync" /> to coalesce leading segments when
    /// writing to the SSL stream, or <see langword="null" />. This connection disposes it in <see cref="Dispose" />.
    /// </param>
    private protected TcpConnection(IMemoryOwner<byte>? memoryOwner)
    {
        _writeBufferOwner = memoryOwner;
        // When coalescing leading buffers in WriteAsync (SSL only), the upper size limit is the lesser of the size of
        // the buffer we rented from the memory pool (typically 4K) and MaxSslDataSize (16K).
        _maxSslBufferSize = Math.Min(memoryOwner?.Memory.Length ?? 0, MaxSslDataSize);
    }

    /// <summary>Performs the connection establishment for <see cref="ConnectAsync" />.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>A task that provides the <see cref="TransportConnectionInformation" /> of the connection.</returns>
    private protected abstract Task<TransportConnectionInformation> ConnectAsyncCore(
        CancellationToken cancellationToken);

    /// <summary>Aborts the connection and then observes the exception of the provided task.</summary>
    /// <param name="task">The pending task to await; any exception it throws is swallowed.</param>
    private async Task AbortAndObserveAsync(Task task)
    {
        Socket.Close(0);
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // observe exception
        }
    }
}
 271
/// <summary>A <see cref="TcpConnection" /> that creates its own socket and connects it to a server address.
/// </summary>
internal class TcpClientConnection : TcpConnection
{
    internal override Socket Socket { get; }

    internal override SslStream? SslStream => _sslStream;

    // The endpoint to connect to: an IPEndPoint when the host is an IP address, a DnsEndPoint otherwise.
    private readonly EndPoint _addr;
    private readonly SslClientAuthenticationOptions? _authenticationOptions;

    // Created by ConnectAsyncCore when _authenticationOptions is not null.
    private SslStream? _sslStream;

    /// <summary>Constructs a client connection: creates the socket, optionally binds it to the local network address
    /// and configures it. The socket is not connected until <see cref="TcpConnection.ConnectAsync" /> is called.
    /// </summary>
    /// <param name="serverAddress">The address of the server; its host and port select the endpoint to connect to.
    /// </param>
    /// <param name="authenticationOptions">The SSL client authentication options, or <see langword="null" /> to
    /// connect without SSL.</param>
    /// <param name="pool">The memory pool from which the write buffer is rented when
    /// <paramref name="authenticationOptions" /> is not <see langword="null" />.</param>
    /// <param name="minimumSegmentSize">The minimum size of the buffer rented from <paramref name="pool" />.</param>
    /// <param name="options">The transport options used to bind and configure the socket.</param>
    internal TcpClientConnection(
        ServerAddress serverAddress,
        SslClientAuthenticationOptions? authenticationOptions,
        MemoryPool<byte> pool,
        int minimumSegmentSize,
        TcpClientTransportOptions options)
        : base(authenticationOptions is not null ? pool.Rent(minimumSegmentSize) : null)
    {
        _addr = IPAddress.TryParse(serverAddress.Host, out IPAddress? ipAddress) ?
            new IPEndPoint(ipAddress, serverAddress.Port) :
            new DnsEndPoint(serverAddress.Host, serverAddress.Port);

        _authenticationOptions = authenticationOptions;

        // When using IPv6 address family we use the socket constructor without AddressFamily parameter to ensure
        // dual-mode socket are used in platforms that support them.
        Socket = ipAddress?.AddressFamily == AddressFamily.InterNetwork ?
            new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp) :
            new Socket(SocketType.Stream, ProtocolType.Tcp);

        try
        {
            if (options.LocalNetworkAddress is IPEndPoint localNetworkAddress)
            {
                Socket.Bind(localNetworkAddress);
            }

            Socket.Configure(options);
        }
        catch (SocketException exception)
        {
            // Dispose the whole connection, not only the socket: the base class Dispose also releases the write
            // buffer rented in the base constructor call, which would otherwise never be returned to the pool.
            Dispose();
            throw exception.ToIceRpcException();
        }
        catch
        {
            // Same cleanup as above; the original exception is rethrown unchanged.
            Dispose();
            throw;
        }
    }

    /// <summary>Connects the socket to the server endpoint and, when SSL authentication options were provided,
    /// creates the SSL stream and authenticates as a client.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The local and remote network addresses of the socket, together with the remote certificate of the
    /// SSL stream (if any).</returns>
    private protected override async Task<TransportConnectionInformation> ConnectAsyncCore(
        CancellationToken cancellationToken)
    {
        try
        {
            Debug.Assert(Socket is not null);

            // Connect to the peer.
            await Socket.ConnectAsync(_addr, cancellationToken).ConfigureAwait(false);

            // Workaround: a canceled Socket.ConnectAsync call can return successfully but the Socket is closed because
            // of the cancellation. See https://github.com/dotnet/runtime/issues/75889.
            cancellationToken.ThrowIfCancellationRequested();

            if (_authenticationOptions is not null)
            {
                // The network stream does not own the socket: the socket is disposed by the connection itself.
                _sslStream = new SslStream(new NetworkStream(Socket, false), false);

                await _sslStream.AuthenticateAsClientAsync(
                    _authenticationOptions,
                    cancellationToken).ConfigureAwait(false);
            }

            return new TransportConnectionInformation(
                localNetworkAddress: Socket.LocalEndPoint!,
                remoteNetworkAddress: Socket.RemoteEndPoint!,
                _sslStream?.RemoteCertificate);
        }
        catch (IOException exception)
        {
            throw exception.ToIceRpcException();
        }
        catch (SocketException exception)
        {
            throw exception.ToIceRpcException();
        }
    }
}
 362
/// <summary>A <see cref="TcpConnection" /> that wraps a socket supplied by the caller and, when SSL authentication
/// options are provided, authenticates as a server.</summary>
internal class TcpServerConnection : TcpConnection
{
    internal override Socket Socket { get; }

    internal override SslStream? SslStream => _sslStream;

    private readonly SslServerAuthenticationOptions? _authenticationOptions;

    // Created by ConnectAsyncCore when _authenticationOptions is not null.
    private SslStream? _sslStream;

    /// <summary>Constructs a server connection over the provided socket.</summary>
    /// <param name="socket">The socket used by this connection.</param>
    /// <param name="authenticationOptions">The SSL server authentication options, or <see langword="null" /> when
    /// SSL is not used.</param>
    /// <param name="pool">The memory pool from which the write buffer is rented when
    /// <paramref name="authenticationOptions" /> is not <see langword="null" />.</param>
    /// <param name="minimumSegmentSize">The minimum size of the buffer rented from <paramref name="pool" />.</param>
    internal TcpServerConnection(
        Socket socket,
        SslServerAuthenticationOptions? authenticationOptions,
        MemoryPool<byte> pool,
        int minimumSegmentSize)
        : base(authenticationOptions is null ? null : pool.Rent(minimumSegmentSize))
    {
        _authenticationOptions = authenticationOptions;
        Socket = socket;
    }

    /// <summary>Performs the SSL server authentication when SSL authentication options were provided, and returns
    /// the connection information.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The local and remote network addresses of the socket, together with the remote certificate of the
    /// SSL stream (if any).</returns>
    private protected override async Task<TransportConnectionInformation> ConnectAsyncCore(
        CancellationToken cancellationToken)
    {
        try
        {
            if (_authenticationOptions is SslServerAuthenticationOptions serverOptions)
            {
                // This can only be created with a connected socket.
                var transportStream = new NetworkStream(Socket, ownsSocket: false);
                var secureStream = new SslStream(transportStream, leaveInnerStreamOpen: false);

                // Publish the stream before authenticating so that Dispose can see it.
                _sslStream = secureStream;

                await secureStream
                    .AuthenticateAsServerAsync(serverOptions, cancellationToken)
                    .ConfigureAwait(false);
            }

            EndPoint localEndPoint = Socket.LocalEndPoint!;
            EndPoint remoteEndPoint = Socket.RemoteEndPoint!;

            return new TransportConnectionInformation(
                localNetworkAddress: localEndPoint,
                remoteNetworkAddress: remoteEndPoint,
                _sslStream?.RemoteCertificate);
        }
        catch (SocketException exception)
        {
            throw exception.ToIceRpcException();
        }
        catch (IOException exception)
        {
            throw exception.ToIceRpcException();
        }
    }
}