Summary

Information
Class: IceRpc.Transports.Tcp.Internal.TcpConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Tcp/Internal/TcpConnection.cs
Tag: 1321_24790053727
Line coverage
73%
Covered lines: 122
Uncovered lines: 44
Coverable lines: 166
Total lines: 436
Line coverage: 73.4%
Branch coverage
79%
Covered branches: 31
Total branches: 39
Branch coverage: 79.4%
Method coverage
90%
Covered methods: 9
Fully covered methods: 4
Total methods: 10
Method coverage: 90%
Full method coverage: 40%

Metrics

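| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 2 | 2 | 100% |
| ConnectAsync(...) | 100% | 1 | 1 | 100% |
| Dispose() | 100% | 6 | 6 | 100% |
| ReadAsync(...) | 100% | 2 | 2 | 100% |
| PerformReadAsync() | 100% | 2 | 2 | 100% |
| ShutdownWriteAsync(...) | 100% | 1 | 1 | 100% |
| PerformShutdownAsync() | 100% | 2 | 2 | 55.55% |
| WriteAsync(...) | 100% | 1 | 1 | 100% |
| PerformWriteAsync() | 68% | 42 | 25 | 69.66% |
| AbortAndObserveAsync() | 100% | 2 | 1 | 0% |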

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Tcp/Internal/TcpConnection.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.Net;
 6using System.Net.Security;
 7using System.Net.Sockets;
 8using System.Runtime.InteropServices;
 9
 10namespace IceRpc.Transports.Tcp.Internal;
 11
 12/// <summary>Implements <see cref="IDuplexConnection" /> for tcp with or without TLS.</summary>
 13/// <remarks>Unlike Coloc, the Tcp transport is not a "checked" transport, which means it does not need to detect
 14/// violations of the duplex transport contract or report such violations. It assumes its clients are sufficiently well
 15/// tested to never violate this contract. As a result, this implementation does not throw
 16/// <see cref="InvalidOperationException" />.</remarks>
 17internal abstract class TcpConnection : IDuplexConnection
 18{
 19    internal abstract Socket Socket { get; }
 20
 21    internal abstract SslStream? SslStream { get; }
 22
 23    private protected volatile bool _isDisposed;
 24
 25    // The MaxDataSize of the SSL implementation.
 26    private const int MaxSslDataSize = 16 * 1024;
 27
 28    private bool _isShutdown;
 29    private readonly int _maxSslBufferSize;
 22730    private readonly List<ArraySegment<byte>> _segments = new();
 31    private readonly IMemoryOwner<byte>? _writeBufferOwner;
 32
 33    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 21434    {
 21435        ObjectDisposedException.ThrowIf(_isDisposed, this);
 21436        return ConnectAsyncCore(cancellationToken);
 21437    }
 38
 39    public void Dispose()
 24840    {
 24841        _isDisposed = true;
 42
 24843        if (SslStream is SslStream sslStream)
 6444        {
 6445            sslStream.Dispose();
 6446        }
 47
 48        // If shutdown was called, we can just dispose the connection to complete the graceful TCP closure. Otherwise,
 49        // we abort the TCP connection to ensure the connection doesn't end up in the TIME_WAIT state.
 24850        if (_isShutdown)
 2751        {
 2752            Socket.Dispose();
 2753        }
 54        else
 22155        {
 22156            Socket.Close(0);
 22157        }
 24858        _writeBufferOwner?.Dispose();
 24859    }
 60
 61    public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
 935262    {
 935263        ObjectDisposedException.ThrowIf(_isDisposed, this);
 64
 935265        return buffer.Length > 0 ? PerformReadAsync() :
 935266            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 67
 68        async ValueTask<int> PerformReadAsync()
 934969        {
 70            try
 934971            {
 934972                return SslStream is SslStream sslStream ?
 934973                    await SslStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false) :
 934974                    await Socket.ReceiveAsync(buffer, SocketFlags.None, cancellationToken).ConfigureAwait(false);
 75            }
 676            catch (IOException exception)
 677            {
 678                throw exception.ToIceRpcException();
 79            }
 1980            catch (SocketException exception)
 1981            {
 1982                throw exception.ToIceRpcException();
 83            }
 930784        }
 934985    }
 86
 87    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
 2788    {
 2789        ObjectDisposedException.ThrowIf(_isDisposed, this);
 90
 2791        return PerformShutdownAsync();
 92
 93        async Task PerformShutdownAsync()
 2794        {
 95            try
 2796            {
 2797                if (SslStream is SslStream sslStream)
 998                {
 999                    Task shutdownTask = sslStream.ShutdownAsync();
 100
 101                    try
 9102                    {
 9103                        await shutdownTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 9104                    }
 0105                    catch (OperationCanceledException)
 0106                    {
 0107                        await AbortAndObserveAsync(shutdownTask).ConfigureAwait(false);
 0108                        throw;
 109                    }
 110                }
 111
 112                // Shutdown the socket send side to send a TCP FIN packet. We don't close the read side because we want
 113                // to be notified when the peer shuts down it's side of the socket (through the ReceiveAsync call).
 27114                Socket.Shutdown(SocketShutdown.Send);
 115
 116                // If shutdown is successful mark the connection as shutdown to ensure Dispose won't reset the TCP
 117                // connection.
 27118                _isShutdown = true;
 119            }
 0120            catch (IOException exception)
 121            {
 0122                throw exception.ToIceRpcException();
 123            }
 0124            catch (SocketException exception)
 125            {
 0126                throw exception.ToIceRpcException();
 127            }
 128        }
 54129    }
 130
 131    public ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
 106132    {
 106133        ObjectDisposedException.ThrowIf(_isDisposed, this);
 106134        return PerformWriteAsync();
 135
 136        async ValueTask PerformWriteAsync()
 106137        {
 138            try
 106139            {
 106140                if (SslStream is SslStream sslStream)
 21141                {
 21142                    if (buffer.IsSingleSegment)
 19143                    {
 19144                        await sslStream.WriteAsync(buffer.First, cancellationToken).ConfigureAwait(false);
 15145                    }
 146                    else
 2147                    {
 148                        // Coalesce leading segments up to _maxSslBufferSize. We don't coalesce trailing segments as we
 149                        // assume these segments are large enough.
 2150                        int leadingSize = 0;
 2151                        int leadingSegmentCount = 0;
 304152                        foreach (ReadOnlyMemory<byte> memory in buffer)
 149153                        {
 149154                            if (leadingSize + memory.Length <= _maxSslBufferSize)
 149155                            {
 149156                                leadingSize += memory.Length;
 149157                                leadingSegmentCount++;
 149158                            }
 159                            else
 0160                            {
 0161                                break;
 162                            }
 149163                        }
 164
 2165                        if (leadingSegmentCount > 1)
 2166                        {
 2167                            ReadOnlySequence<byte> leading = buffer.Slice(0, leadingSize);
 2168                            buffer = buffer.Slice(leadingSize); // buffer can become empty
 169
 2170                            Debug.Assert(_writeBufferOwner is not null);
 2171                            Memory<byte> writeBuffer = _writeBufferOwner.Memory[0..leadingSize];
 2172                            leading.CopyTo(writeBuffer.Span);
 173
 174                            // Send the "coalesced" leading segments
 2175                            await sslStream.WriteAsync(writeBuffer, cancellationToken).ConfigureAwait(false);
 2176                        }
 177                        // else no need to coalesce (copy) a single segment
 178
 179                        // Send the remaining segments one by one
 2180                        if (buffer.IsEmpty)
 2181                        {
 182                            // done
 2183                        }
 0184                        else if (buffer.IsSingleSegment)
 0185                        {
 0186                            await sslStream.WriteAsync(buffer.First, cancellationToken).ConfigureAwait(false);
 0187                        }
 188                        else
 0189                        {
 0190                            foreach (ReadOnlyMemory<byte> memory in buffer)
 0191                            {
 0192                                await sslStream.WriteAsync(memory, cancellationToken).ConfigureAwait(false);
 0193                            }
 0194                        }
 2195                    }
 17196                }
 197                else
 85198                {
 85199                    if (buffer.IsSingleSegment)
 81200                    {
 81201                        int bytesSent = await Socket.SendAsync(
 81202                            buffer.First,
 81203                            SocketFlags.None,
 81204                            cancellationToken).ConfigureAwait(false);
 205
 73206                        if (bytesSent != buffer.First.Length)
 0207                        {
 208                            // This should never happen.
 0209                            throw new IceRpcException(
 0210                                IceRpcError.IceRpcError,
 0211                                $"Short write on TCP socket: expected {buffer.First.Length} bytes but sent {bytesSent}."
 212                        }
 73213                    }
 214                    else
 4215                    {
 4216                        _segments.Clear();
 4217                        long totalBytes = buffer.Length;
 608218                        foreach (ReadOnlyMemory<byte> memory in buffer)
 298219                        {
 298220                            if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
 298221                            {
 298222                                _segments.Add(segment);
 298223                            }
 224                            else
 0225                            {
 0226                                throw new ArgumentException(
 0227                                    $"The {nameof(buffer)} must be backed by arrays.",
 0228                                    nameof(buffer));
 229                            }
 298230                        }
 231
 4232                        Task<int> sendTask = Socket.SendAsync(_segments, SocketFlags.None);
 233
 234                        int bytesSent;
 235                        try
 4236                        {
 4237                            bytesSent = await sendTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 4238                        }
 0239                        catch (OperationCanceledException)
 0240                        {
 0241                            await AbortAndObserveAsync(sendTask).ConfigureAwait(false);
 0242                            throw;
 243                        }
 244
 4245                        if (bytesSent != totalBytes)
 246                        {
 247                            // This should never happen.
 0248                            throw new IceRpcException(
 0249                                IceRpcError.IceRpcError,
 0250                                $"Short write on TCP socket: expected {totalBytes} bytes but sent {bytesSent}.");
 251                        }
 4252                    }
 253                }
 254            }
 1255            catch (IOException exception)
 256            {
 1257                throw exception.ToIceRpcException();
 258            }
 4259            catch (SocketException exception)
 260            {
 4261                throw exception.ToIceRpcException();
 262            }
 263        }
 200264    }
 265
 227266    private protected TcpConnection(IMemoryOwner<byte>? memoryOwner)
 227267    {
 227268        _writeBufferOwner = memoryOwner;
 269        // When coalescing leading buffers in WriteAsync (SSL only), the upper size limit is the lesser of the size of
 270        // the buffer we rented from the memory pool (typically 4K) and MaxSslDataSize (16K).
 227271        _maxSslBufferSize = Math.Min(memoryOwner?.Memory.Length ?? 0, MaxSslDataSize);
 227272    }
 273
 274    private protected abstract Task<TransportConnectionInformation> ConnectAsyncCore(
 275        CancellationToken cancellationToken);
 276
 277    /// <summary>Aborts the connection and then observes the exception of the provided task.</summary>
 278    private async Task AbortAndObserveAsync(Task task)
 0279    {
 0280        Socket.Close(0);
 281        try
 0282        {
 0283            await task.ConfigureAwait(false);
 0284        }
 0285        catch
 0286        {
 287            // observe exception
 0288        }
 0289    }
 290}
 291
 292internal class TcpClientConnection : TcpConnection
 293{
 294    internal override Socket Socket { get; }
 295
 296    internal override SslStream? SslStream => _sslStream;
 297
 298    private readonly EndPoint _address;
 299    private readonly SslClientAuthenticationOptions? _authenticationOptions;
 300
 301    private SslStream? _sslStream;
 302
 303    internal TcpClientConnection(
 304        TransportAddress transportAddress,
 305        SslClientAuthenticationOptions? authenticationOptions,
 306        MemoryPool<byte> pool,
 307        int minimumSegmentSize,
 308        TcpClientTransportOptions options)
 309        : base(authenticationOptions is not null ? pool.Rent(minimumSegmentSize) : null)
 310    {
 311        _address = IPAddress.TryParse(transportAddress.Host, out IPAddress? ipAddress) ?
 312            new IPEndPoint(ipAddress, transportAddress.Port) :
 313            new DnsEndPoint(transportAddress.Host, transportAddress.Port);
 314
 315        _authenticationOptions = authenticationOptions;
 316
 317        // When using IPv6 address family we use the socket constructor without AddressFamily parameter to ensure
 318        // dual-mode socket are used in platforms that support them.
 319        Socket = ipAddress?.AddressFamily == AddressFamily.InterNetwork ?
 320            new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp) :
 321            new Socket(SocketType.Stream, ProtocolType.Tcp);
 322
 323        try
 324        {
 325            if (options.LocalNetworkAddress is IPEndPoint localNetworkAddress)
 326            {
 327                Socket.Bind(localNetworkAddress);
 328            }
 329
 330            Socket.Configure(options);
 331        }
 332        catch (SocketException exception)
 333        {
 334            Socket.Dispose();
 335            throw exception.ToIceRpcException();
 336        }
 337        catch
 338        {
 339            Socket.Dispose();
 340            throw;
 341        }
 342    }
 343
 344    private protected override async Task<TransportConnectionInformation> ConnectAsyncCore(
 345        CancellationToken cancellationToken)
 346    {
 347        bool isConnected = false;
 348        try
 349        {
 350            Debug.Assert(Socket is not null);
 351
 352            // Connect to the peer.
 353            await Socket.ConnectAsync(_address, cancellationToken).ConfigureAwait(false);
 354            isConnected = true;
 355
 356            if (_authenticationOptions is not null)
 357            {
 358                _sslStream = new SslStream(new NetworkStream(Socket, false), false);
 359
 360                await _sslStream.AuthenticateAsClientAsync(
 361                    _authenticationOptions,
 362                    cancellationToken).ConfigureAwait(false);
 363            }
 364
 365            return new TransportConnectionInformation(
 366                localNetworkAddress: Socket.LocalEndPoint!,
 367                remoteNetworkAddress: Socket.RemoteEndPoint!,
 368                _sslStream?.RemoteCertificate);
 369        }
 370        catch (IOException exception)
 371        {
 372            throw exception.ToIceRpcException();
 373        }
 374        catch (SocketException exception) when (isConnected)
 375        {
 376            // This can happen if the peer closes the connection immediately after accepting it, which can cause the
 377            // endpoint information to be unavailable. Any SocketException at this point means the connection is no
 378            // longer usable.
 379            throw new IceRpcException(IceRpcError.ConnectionAborted, exception);
 380        }
 381        catch (SocketException exception)
 382        {
 383            throw exception.ToIceRpcException();
 384        }
 385    }
 386}
 387
 388internal class TcpServerConnection : TcpConnection
 389{
 390    internal override Socket Socket { get; }
 391
 392    internal override SslStream? SslStream => _sslStream;
 393
 394    private readonly SslServerAuthenticationOptions? _authenticationOptions;
 395    private SslStream? _sslStream;
 396
 397    internal TcpServerConnection(
 398        Socket socket,
 399        SslServerAuthenticationOptions? authenticationOptions,
 400        MemoryPool<byte> pool,
 401        int minimumSegmentSize)
 402        : base(authenticationOptions is not null ? pool.Rent(minimumSegmentSize) : null)
 403    {
 404        Socket = socket;
 405        _authenticationOptions = authenticationOptions;
 406    }
 407
 408    private protected override async Task<TransportConnectionInformation> ConnectAsyncCore(
 409        CancellationToken cancellationToken)
 410    {
 411        try
 412        {
 413            if (_authenticationOptions is not null)
 414            {
 415                // This can only be created with a connected socket.
 416                _sslStream = new SslStream(new NetworkStream(Socket, false), false);
 417                await _sslStream.AuthenticateAsServerAsync(
 418                    _authenticationOptions,
 419                    cancellationToken).ConfigureAwait(false);
 420            }
 421
 422            return new TransportConnectionInformation(
 423                localNetworkAddress: Socket.LocalEndPoint!,
 424                remoteNetworkAddress: Socket.RemoteEndPoint!,
 425                _sslStream?.RemoteCertificate);
 426        }
 427        catch (IOException exception)
 428        {
 429            throw exception.ToIceRpcException();
 430        }
 431        catch (SocketException exception)
 432        {
 433            throw exception.ToIceRpcException();
 434        }
 435    }
 436}