< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocConnection
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs
Tag: 275_13775359185
Line coverage
82%
Covered lines: 110
Uncovered lines: 23
Coverable lines: 133
Total lines: 274
Line coverage: 82.7%
Branch coverage
81%
Covered branches: 39
Total branches: 48
Branch coverage: 81.2%
Method coverage
100%
Covered methods: 8
Total methods: 8
Method coverage: 100%

Metrics

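| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| Dispose() | 100% | 1 | 1 | 100% |
| ReadAsync() | 88.88% | 18.63 | 18 | 87.5% |
| CopySegmentToMemory() | 100% | 2 | 2 | 100% |
| ShutdownWriteAsync(...) | 83.33% | 6.11 | 6 | 85.71% |
| WriteAsync() | 75% | 13.83 | 12 | 76.66% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Dispose(...) | 75% | 9.3 | 8 | 72.72% |
| FinishConnect() | 50% | 2.15 | 2 | 66.66% |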

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs

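# (hit count) | Line | Line coverage (source text) — in the listing below the hit count, when present, is fused directly in front of the line number.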
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The colocated connection class to exchange data within the same process. The implementation copies the send
 12/// buffer into the receive buffer.</summary>
 13internal abstract class ColocConnection : IDuplexConnection
 14{
 15    private protected PipeReader? _reader; // Null until a subclass assigns it; read, write and shutdown throw while null.
 16    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 17    // locking.
 18    private protected int _state;
 19
 20    private readonly ServerAddress _serverAddress;
 21    private readonly PipeWriter _writer;
 22
 23    public abstract Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken);
 24    /// <summary>Disposes the connection by delegating to <see cref="Dispose(bool)"/>.</summary>
 25    public void Dispose()
 225126    {
 225127        Dispose(true);
 225128        GC.SuppressFinalize(this);
 225129    }
 30    /// <summary>Copies available bytes from the pipe reader into buffer; returns the count, or 0 once the reader is completed and empty.</summary>
 31    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
 13497932    {
 13497933        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 34
 13497735        if (_reader is null)
 236        {
 237            throw new InvalidOperationException("Reading is not allowed before connection is connected.");
 38        }
 13497539        if (!_state.TrySetFlag(State.Reading)) // Only one read may be in progress at a time.
 240        {
 241            throw new InvalidOperationException("Reading is already in progress.");
 42        }
 43
 44        try
 13497345        {
 13497346            ReadResult readResult = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 13375847            if (readResult.IsCanceled)
 048            {
 49                // Dispose canceled this pending read (it calls CancelPendingRead while Reading is set).
 050                throw new IceRpcException(IceRpcError.OperationAborted);
 51            }
 13375852            else if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
 26053            {
 26054                return 0;
 55            }
 56
 57            // We could eventually add a CopyTo(this ReadOnlySequence<byte> src, Memory<byte> dest) extension method
 58            // if we need this in other places.
 59            int read;
 13349860            if (readResult.Buffer.IsSingleSegment)
 1031061            {
 1031062                read = CopySegmentToMemory(readResult.Buffer.First, buffer);
 1031063            }
 64            else
 12318865            {
 12318866                read = 0;
 66371867                foreach (ReadOnlyMemory<byte> segment in readResult.Buffer)
 20867168                {
 20867169                    read += CopySegmentToMemory(segment, buffer[read..]);
 20867170                    if (read == buffer.Length) // Destination is full; stop copying.
 12318871                    {
 12318872                        break;
 73                    }
 8548374                }
 12318875            }
 13349876            _reader.AdvanceTo(readResult.Buffer.GetPosition(read)); // Mark only the copied bytes as consumed.
 13349877            return read;
 78        }
 79        finally
 13497380        {
 13497381            if (_state.HasFlag(State.Disposed)) // Dispose leaves the reader uncompleted while Reading is set; complete it here.
 082            {
 083                _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 084            }
 13497385            _state.ClearFlag(State.Reading);
 13497386        }
 87        // Copies as many bytes of source as fit into destination; returns the number of bytes copied.
 88        static int CopySegmentToMemory(ReadOnlyMemory<byte> source, Memory<byte> destination)
 21898189        {
 21898190            if (source.Length > destination.Length)
 11865091            {
 11865092                source[0..destination.Length].CopyTo(destination);
 11865093                return destination.Length;
 94            }
 95            else
 10033196            {
 10033197                source.CopyTo(destination);
 10033198                return source.Length;
 99            }
 218981100        }
 133758101    }
 102    /// <summary>Completes the pipe writer; later WriteAsync calls throw InvalidOperationException.</summary>
 103    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
 280104    {
 280105        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 106
 280107        if (_reader is null)
 2108        {
 2109            throw new InvalidOperationException("Shutdown is not allowed before the connection is connected.");
 110        }
 278111        if (_state.HasFlag(State.Writing))
 2112        {
 2113            throw new InvalidOperationException("Shutdown or writing is in progress");
 114        }
 276115        if (!_state.TrySetFlag(State.ShuttingDown))
 0116        {
 0117            throw new InvalidOperationException("Shutdown has already been called.");
 118        }
 119
 276120        _writer.Complete(); // Completed without an exception, unlike the ConnectionAborted completion in Dispose.
 276121        return Task.CompletedTask;
 276122    }
 123    /// <summary>Writes buffer to the pipe writer and flushes it; buffer must not be empty.</summary>
 124    public async ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
 13297125    {
 13297126        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 127
 13295128        if (buffer.IsEmpty)
 0129        {
 0130            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 131        }
 132
 13295133        if (_reader is null)
 2134        {
 2135            throw new InvalidOperationException("Writing is not allowed before the connection is connected.");
 136        }
 13293137        if (_state.HasFlag(State.ShuttingDown))
 4138        {
 4139            throw new InvalidOperationException("Writing is not allowed after the connection is shutdown.");
 140        }
 13289141        if (!_state.TrySetFlag(State.Writing)) // Only one write may be in progress at a time.
 2142        {
 2143            throw new InvalidOperationException("Writing is already in progress.");
 144        }
 145
 146        try
 13287147        {
 13287148            _writer.Write(buffer);
 13287149            FlushResult flushResult = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 13266150            if (flushResult.IsCanceled)
 0151            {
 152                // Dispose canceled this pending FlushAsync (it calls CancelPendingFlush while Writing is set).
 0153                throw new IceRpcException(IceRpcError.OperationAborted);
 154            }
 13266155        }
 156        finally
 13287157        {
 13287158            Debug.Assert(!_state.HasFlag(State.ShuttingDown)); // ShutdownWriteAsync throws while Writing is set.
 13287159            if (_state.HasFlag(State.Disposed)) // Dispose leaves the writer uncompleted while Writing is set; complete it here.
 0160            {
 0161                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 0162            }
 13287163            _state.ClearFlag(State.Writing);
 13287164        }
 13266165    }
 166    /// <summary>Constructs a connection that stores serverAddress and writes to writer.</summary>
 1899167    public ColocConnection(ServerAddress serverAddress, PipeWriter writer)
 1899168    {
 1899169        _serverAddress = serverAddress;
 1899170        _writer = writer;
 1899171    }
 172    /// <summary>Sets the Disposed flag; on the first call, cancels an in-progress read/flush or completes the idle reader/writer with ConnectionAborted.</summary>
 173    private protected virtual void Dispose(bool disposing)
 2251174    {
 2251175        if (_state.TrySetFlag(State.Disposed)) // Skipped when the flag was already set, so the cleanup below runs once.
 1897176        {
 177            // _reader can be null if connection establishment failed or didn't run.
 1897178            if (_reader is not null)
 1784179            {
 1784180                if (_state.HasFlag(State.Reading))
 0181                {
 0182                    _reader.CancelPendingRead(); // ReadAsync completes the reader in its finally block.
 0183                }
 184                else
 1784185                {
 1784186                    _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 1784187                }
 1784188            }
 189
 1897190            if (_state.HasFlag(State.Writing))
 0191            {
 0192                _writer.CancelPendingFlush(); // WriteAsync completes the writer in its finally block.
 0193            }
 194            else
 1897195            {
 1897196                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 1897197            }
 1897198        }
 2251199    }
 200    /// <summary>Returns the connection information built from the server address; throws ObjectDisposedException if already disposed.</summary>
 201    private protected TransportConnectionInformation FinishConnect()
 1733202    {
 1733203        Debug.Assert(_reader is not null);
 204
 1733205        if (_state.HasFlag(State.Disposed))
 0206        {
 0207            _reader.Complete();
 0208            throw new ObjectDisposedException($"{typeof(ColocConnection)}");
 209        }
 210
 1733211        var colocEndPoint = new ColocEndPoint(_serverAddress); // The same endpoint is passed as both endpoint arguments below.
 1733212        return new TransportConnectionInformation(colocEndPoint, colocEndPoint, null);
 1733213    }
 214    /// <summary>Bit flags stored in the _state field.</summary>
 215    private protected enum State : int
 216    {
 217        Disposed = 1, // Set by Dispose(bool).
 218        Reading = 2, // Set while ReadAsync is in progress.
 219        ShuttingDown = 4, // Set by ShutdownWriteAsync.
 220        Writing = 8, // Set while WriteAsync is in progress.
 221    }
 222}
 223
 224/// <summary>The colocated client connection class.</summary>
 225internal class ClientColocConnection : ColocConnection
 226{
 227    private readonly Func<PipeReader, CancellationToken, Task<PipeReader>> _connectAsync;
 228    private PipeReader? _localPipeReader; // Set by the constructor; cleared once ConnectAsync has handed it to _connectAsync.
 229    /// <summary>Passes the local pipe reader to the connect function and reads from the pipe reader it returns.</summary>
 230    public override async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 231    {
 232        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 233
 234        if (_reader is not null)
 235        {
 236            throw new InvalidOperationException("Connection establishment cannot be called twice.");
 237        }
 238
 239        Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 240
 241        if (_localPipeReader is not null)
 242        {
 243            _reader = await _connectAsync(_localPipeReader, cancellationToken).ConfigureAwait(false);
 244            _localPipeReader = null; // The server-side connection is now responsible for completing the pipe reader.
 245        }
 246        return FinishConnect();
 247    }
 248    /// <summary>Constructs a client connection that writes to localPipe.Writer and keeps localPipe.Reader for ConnectAsync.</summary>
 249    internal ClientColocConnection(
 250        ServerAddress serverAddress,
 251        Pipe localPipe,
 252        Func<PipeReader, CancellationToken, Task<PipeReader>> connectAsync)
 253        : base(serverAddress, localPipe.Writer)
 254    {
 255        _connectAsync = connectAsync;
 256        _localPipeReader = localPipe.Reader;
 257    }
 258    /// <summary>Disposes the base connection, then completes the local pipe reader if ConnectAsync has not handed it over.</summary>
 259    private protected override void Dispose(bool disposing)
 260    {
 261        base.Dispose(disposing);
 262        _localPipeReader?.Complete(); // Non-null only while this connection still holds the local pipe reader.
 263    }
 264}
 265
 266/// <summary>The colocated server connection class.</summary>
 267internal class ServerColocConnection : ColocConnection
 268{
 269    public override Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
 270        Task.FromResult(FinishConnect()); // The reader is assigned by the constructor, so this completes synchronously.
 271    /// <summary>Constructs a server connection that writes to writer and reads from reader.</summary>
 272    public ServerColocConnection(ServerAddress serverAddress, PipeWriter writer, PipeReader reader)
 273       : base(serverAddress, writer) => _reader = reader;
 274}