< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocConnection
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs
Tag: 1856_27024993493
Line coverage
83%
Covered lines: 113
Uncovered lines: 23
Coverable lines: 136
Total lines: 279
Line coverage: 83%
Branch coverage
82%
Covered branches: 41
Total branches: 50
Branch coverage: 82%
Method coverage
100%
Covered methods: 8
Fully covered methods: 3
Total methods: 8
Method coverage: 100%
Full method coverage: 37.5%

Metrics

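| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| Dispose() | 100% | 1 | 1 | 100% |
| ReadAsync() | 90% | 21 | 20 | 88.37% |
| CopySegmentToMemory() | 100% | 2 | 2 | 100% |
| ShutdownWriteAsync(...) | 83.33% | 6 | 6 | 85.71% |
| WriteAsync() | 75% | 14 | 12 | 76.66% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Dispose(...) | 75% | 9 | 8 | 72.72% |
| FinishConnect() | 50% | 2 | 2 | 66.66% |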

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs

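# (hits) | Line | Line coverage
Note: in the rows below the hit count, when present, is fused directly in front of the source line number (for example, `125326` means 1253 hits on line 26); rows showing only a line number are not coverable.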
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The colocated connection class to exchange data within the same process. The implementation copies the send
 12/// buffer into the receive buffer.</summary>
 13internal abstract class ColocConnection : IDuplexConnection
 14{
 15    private protected PipeReader? _reader;
 16    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 17    // locking.
 18    private protected int _state;
 19
 20    private readonly TransportAddress _transportAddress;
 21    private readonly PipeWriter _writer;
 22
 23    public abstract Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken);
 24
 25    public void Dispose()
 125326    {
 125327        Dispose(true);
 125328        GC.SuppressFinalize(this);
 125329    }
 30
 31    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
 4996332    {
 4996333        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 34
 4996235        if (buffer.Length == 0)
 136        {
 137            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 38        }
 39
 4996140        if (_reader is null)
 141        {
 142            throw new InvalidOperationException("Reading is not allowed before connection is connected.");
 43        }
 4996044        if (!_state.TrySetFlag(State.Reading))
 145        {
 146            throw new InvalidOperationException("Reading is already in progress.");
 47        }
 48
 49        try
 4995950        {
 4995951            ReadResult readResult = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 4927952            if (readResult.IsCanceled)
 053            {
 54                // Dispose canceled ReadAsync.
 055                throw new IceRpcException(IceRpcError.OperationAborted);
 56            }
 4927957            else if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
 13658            {
 13659                return 0;
 60            }
 61
 62            // We could eventually add a CopyTo(this ReadOnlySequence<byte> src, Memory<byte> dest) extension method
 63            // if we need this in other places.
 64            int read;
 4914365            if (readResult.Buffer.IsSingleSegment)
 613966            {
 613967                read = CopySegmentToMemory(readResult.Buffer.First, buffer);
 613968            }
 69            else
 4300470            {
 4300471                read = 0;
 24123072                foreach (ReadOnlyMemory<byte> segment in readResult.Buffer)
 7761073                {
 7761074                    read += CopySegmentToMemory(segment, buffer[read..]);
 7761075                    if (read == buffer.Length)
 4300276                    {
 4300277                        break;
 78                    }
 3460879                }
 4300480            }
 4914381            _reader.AdvanceTo(readResult.Buffer.GetPosition(read));
 4914382            return read;
 83        }
 84        finally
 4995985        {
 4995986            if (_state.HasFlag(State.Disposed))
 087            {
 088                _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 089            }
 4995990            _state.ClearFlag(State.Reading);
 4995991        }
 92
 93        static int CopySegmentToMemory(ReadOnlyMemory<byte> source, Memory<byte> destination)
 8374994        {
 8374995            if (source.Length > destination.Length)
 3919496            {
 3919497                source[0..destination.Length].CopyTo(destination);
 3919498                return destination.Length;
 99            }
 100            else
 44555101            {
 44555102                source.CopyTo(destination);
 44555103                return source.Length;
 104            }
 83749105        }
 49279106    }
 107
 108    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
 146109    {
 146110        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 111
 146112        if (_reader is null)
 1113        {
 1114            throw new InvalidOperationException("Shutdown is not allowed before the connection is connected.");
 115        }
 145116        if (_state.HasFlag(State.Writing))
 1117        {
 1118            throw new InvalidOperationException("Shutdown or writing is in progress");
 119        }
 144120        if (!_state.TrySetFlag(State.ShuttingDown))
 0121        {
 0122            throw new InvalidOperationException("Shutdown has already been called.");
 123        }
 124
 144125        _writer.Complete();
 144126        return Task.CompletedTask;
 144127    }
 128
 129    public async ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
 9390130    {
 9390131        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 132
 9389133        if (buffer.IsEmpty)
 0134        {
 0135            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 136        }
 137
 9389138        if (_reader is null)
 1139        {
 1140            throw new InvalidOperationException("Writing is not allowed before the connection is connected.");
 141        }
 9388142        if (_state.HasFlag(State.ShuttingDown))
 2143        {
 2144            throw new InvalidOperationException("Writing is not allowed after the connection is shutdown.");
 145        }
 9386146        if (!_state.TrySetFlag(State.Writing))
 1147        {
 1148            throw new InvalidOperationException("Writing is already in progress.");
 149        }
 150
 151        try
 9385152        {
 9385153            _writer.Write(buffer);
 9385154            FlushResult flushResult = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 9382155            if (flushResult.IsCanceled)
 0156            {
 157                // Dispose canceled FlushAsync.
 0158                throw new IceRpcException(IceRpcError.OperationAborted);
 159            }
 9382160        }
 161        finally
 9385162        {
 9385163            Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 9385164            if (_state.HasFlag(State.Disposed))
 0165            {
 0166                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 0167            }
 9385168            _state.ClearFlag(State.Writing);
 9385169        }
 9382170    }
 171
 1049172    public ColocConnection(TransportAddress transportAddress, PipeWriter writer)
 1049173    {
 1049174        _transportAddress = transportAddress;
 1049175        _writer = writer;
 1049176    }
 177
 178    private protected virtual void Dispose(bool disposing)
 1253179    {
 1253180        if (_state.TrySetFlag(State.Disposed))
 1048181        {
 182            // _reader can be null if connection establishment failed or didn't run.
 1048183            if (_reader is not null)
 992184            {
 992185                if (_state.HasFlag(State.Reading))
 0186                {
 0187                    _reader.CancelPendingRead();
 0188                }
 189                else
 992190                {
 992191                    _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 992192                }
 992193            }
 194
 1048195            if (_state.HasFlag(State.Writing))
 0196            {
 0197                _writer.CancelPendingFlush();
 0198            }
 199            else
 1048200            {
 1048201                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 1048202            }
 1048203        }
 1253204    }
 205
 206    private protected TransportConnectionInformation FinishConnect()
 966207    {
 966208        Debug.Assert(_reader is not null);
 209
 966210        if (_state.HasFlag(State.Disposed))
 0211        {
 0212            _reader.Complete();
 0213            throw new ObjectDisposedException($"{typeof(ColocConnection)}");
 214        }
 215
 966216        var colocEndPoint = new ColocEndPoint(_transportAddress);
 966217        return new TransportConnectionInformation(colocEndPoint, colocEndPoint, null);
 966218    }
 219
 220    private protected enum State : int
 221    {
 222        Disposed = 1,
 223        Reading = 2,
 224        ShuttingDown = 4,
 225        Writing = 8,
 226    }
 227}
 228
 229/// <summary>The colocated client connection class.</summary>
 230internal class ClientColocConnection : ColocConnection
 231{
 232    private readonly Func<PipeReader, CancellationToken, Task<PipeReader>> _connectAsync;
 233    private PipeReader? _localPipeReader;
 234
 235    public override async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 236    {
 237        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 238
 239        if (_reader is not null)
 240        {
 241            throw new InvalidOperationException("Connection establishment cannot be called twice.");
 242        }
 243
 244        Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 245
 246        if (_localPipeReader is not null)
 247        {
 248            _reader = await _connectAsync(_localPipeReader, cancellationToken).ConfigureAwait(false);
 249            _localPipeReader = null; // The server-side connection is now responsible for completing the pipe reader.
 250        }
 251        return FinishConnect();
 252    }
 253
 254    internal ClientColocConnection(
 255        TransportAddress transportAddress,
 256        Pipe localPipe,
 257        Func<PipeReader, CancellationToken, Task<PipeReader>> connectAsync)
 258        : base(transportAddress, localPipe.Writer)
 259    {
 260        _connectAsync = connectAsync;
 261        _localPipeReader = localPipe.Reader;
 262    }
 263
 264    private protected override void Dispose(bool disposing)
 265    {
 266        base.Dispose(disposing);
 267        _localPipeReader?.Complete();
 268    }
 269}
 270
 271/// <summary>The colocated server connection class.</summary>
 272internal class ServerColocConnection : ColocConnection
 273{
 274    public override Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
 275        Task.FromResult(FinishConnect());
 276
 277    public ServerColocConnection(TransportAddress transportAddress, PipeWriter writer, PipeReader reader)
 278       : base(transportAddress, writer) => _reader = reader;
 279}