< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocConnection
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs
Tag: 1321_24790053727
Line coverage
83%
Covered lines: 113
Uncovered lines: 23
Coverable lines: 136
Total lines: 279
Line coverage: 83%
Branch coverage
82%
Covered branches: 41
Total branches: 50
Branch coverage: 82%
Method coverage
100%
Covered methods: 8
Fully covered methods: 3
Total methods: 8
Method coverage: 100%
Full method coverage: 37.5%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
Dispose() | 100% | 1 | 1 | 100%
ReadAsync() | 90% | 21 | 20 | 88.37%
CopySegmentToMemory() | 100% | 2 | 2 | 100%
ShutdownWriteAsync(...) | 83.33% | 6 | 6 | 85.71%
WriteAsync() | 75% | 14 | 12 | 76.66%
.ctor(...) | 100% | 1 | 1 | 100%
Dispose(...) | 75% | 9 | 8 | 72.72%
FinishConnect() | 50% | 2 | 2 | 66.66%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
/// <summary>Base class of the colocated connections, which exchange data inside a single process. The bytes given to
/// <see cref="WriteAsync" /> are copied into a pipe writer, and <see cref="ReadAsync" /> copies the bytes available
/// from a pipe reader into the caller's buffer.</summary>
internal abstract class ColocConnection : IDuplexConnection
{
    // The source of the incoming bytes. While it is null, ReadAsync, WriteAsync and ShutdownWriteAsync all throw
    // InvalidOperationException.
    private protected PipeReader? _reader;

    // A bit set of State values. It is only updated through the FlagEnumExtensions operations; these operations are
    // atomic, so no mutex is needed to protect this field.
    private protected int _state;

    private readonly TransportAddress _transportAddress;
    private readonly PipeWriter _writer;

    /// <summary>Establishes the connection.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The information describing the established connection.</returns>
    public abstract Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken);

    /// <summary>Disposes this connection by calling <see cref="Dispose(bool)" />.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>Reads bytes from the pipe reader into <paramref name="buffer" />.</summary>
    /// <param name="buffer">The destination of the copy. It cannot be empty.</param>
    /// <param name="cancellationToken">The token forwarded to the pipe reader's read operation.</param>
    /// <returns>The number of bytes copied into <paramref name="buffer" />, or 0 when the read result is completed
    /// and carries no data.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    /// <exception cref="ArgumentException">Thrown if <paramref name="buffer" /> is empty.</exception>
    /// <exception cref="InvalidOperationException">Thrown if the reader is not set yet, or if another read is in
    /// progress.</exception>
    /// <exception cref="IceRpcException">Thrown with <see cref="IceRpcError.OperationAborted" /> if the pending read
    /// is canceled.</exception>
    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);

        if (buffer.IsEmpty)
        {
            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
        }

        PipeReader reader = _reader ??
            throw new InvalidOperationException("Reading is not allowed before connection is connected.");

        // The Reading flag rejects overlapping reads and tells Dispose that a read is pending.
        if (!_state.TrySetFlag(State.Reading))
        {
            throw new InvalidOperationException("Reading is already in progress.");
        }

        try
        {
            ReadResult result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
            if (result.IsCanceled)
            {
                // Dispose cancels the pending read with CancelPendingRead.
                throw new IceRpcException(IceRpcError.OperationAborted);
            }

            ReadOnlySequence<byte> available = result.Buffer;
            if (result.IsCompleted && available.IsEmpty)
            {
                return 0;
            }

            // Copy as many of the available bytes as the caller's buffer can hold.
            int copied = 0;
            if (available.IsSingleSegment)
            {
                copied = CopySegmentToMemory(available.First, buffer);
            }
            else
            {
                foreach (ReadOnlyMemory<byte> segment in available)
                {
                    copied += CopySegmentToMemory(segment, buffer[copied..]);
                    if (copied == buffer.Length)
                    {
                        // The caller's buffer is full.
                        break;
                    }
                }
            }

            // Only the copied bytes are consumed; the remainder stays available for the next read.
            reader.AdvanceTo(available.GetPosition(copied));
            return copied;
        }
        finally
        {
            // When Dispose finds the Reading flag set, it only cancels the pending read; the reader is then completed
            // here.
            if (_state.HasFlag(State.Disposed))
            {
                reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
            }
            _state.ClearFlag(State.Reading);
        }

        // Copies the start of source into destination and returns the number of bytes copied, which is the smaller
        // of the two lengths.
        static int CopySegmentToMemory(ReadOnlyMemory<byte> source, Memory<byte> destination)
        {
            int count = Math.Min(source.Length, destination.Length);
            source[..count].CopyTo(destination);
            return count;
        }
    }

    /// <summary>Shuts down the write side of this connection by completing the pipe writer.</summary>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>A completed task.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    /// <exception cref="InvalidOperationException">Thrown if the reader is not set yet, if a write is in progress, or
    /// if this method was already called.</exception>
    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);

        if (_reader is null)
        {
            throw new InvalidOperationException("Shutdown is not allowed before the connection is connected.");
        }

        if (_state.HasFlag(State.Writing))
        {
            throw new InvalidOperationException("Shutdown or writing is in progress");
        }

        // The ShuttingDown flag is never cleared, so only the first call gets past this point.
        if (!_state.TrySetFlag(State.ShuttingDown))
        {
            throw new InvalidOperationException("Shutdown has already been called.");
        }

        _writer.Complete();
        return Task.CompletedTask;
    }

    /// <summary>Writes <paramref name="buffer" /> to the pipe writer and flushes it.</summary>
    /// <param name="buffer">The bytes to write. It cannot be empty.</param>
    /// <param name="cancellationToken">The token forwarded to the pipe writer's flush operation.</param>
    /// <returns>A value task that completes once the flush completes.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    /// <exception cref="ArgumentException">Thrown if <paramref name="buffer" /> is empty.</exception>
    /// <exception cref="InvalidOperationException">Thrown if the reader is not set yet, if the write side was shut
    /// down, or if another write is in progress.</exception>
    /// <exception cref="IceRpcException">Thrown with <see cref="IceRpcError.OperationAborted" /> if the pending flush
    /// is canceled.</exception>
    public async ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);

        if (buffer.IsEmpty)
        {
            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
        }

        if (_reader is null)
        {
            throw new InvalidOperationException("Writing is not allowed before the connection is connected.");
        }

        if (_state.HasFlag(State.ShuttingDown))
        {
            throw new InvalidOperationException("Writing is not allowed after the connection is shutdown.");
        }

        // The Writing flag rejects overlapping writes and tells Dispose that a flush may be pending.
        if (!_state.TrySetFlag(State.Writing))
        {
            throw new InvalidOperationException("Writing is already in progress.");
        }

        try
        {
            _writer.Write(buffer);
            FlushResult flushed = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
            if (flushed.IsCanceled)
            {
                // Dispose cancels the pending flush with CancelPendingFlush.
                throw new IceRpcException(IceRpcError.OperationAborted);
            }
        }
        finally
        {
            Debug.Assert(!_state.HasFlag(State.ShuttingDown));

            // When Dispose finds the Writing flag set, it only cancels the pending flush; the writer is then
            // completed here.
            if (_state.HasFlag(State.Disposed))
            {
                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
            }
            _state.ClearFlag(State.Writing);
        }
    }

    /// <summary>Constructs a colocated connection.</summary>
    /// <param name="transportAddress">The transport address used to create the endpoints returned by
    /// <see cref="FinishConnect" />.</param>
    /// <param name="writer">The pipe writer that receives the bytes given to <see cref="WriteAsync" />.</param>
    public ColocConnection(TransportAddress transportAddress, PipeWriter writer)
    {
        (_transportAddress, _writer) = (transportAddress, writer);
    }

    /// <summary>Releases the pipe reader and the pipe writer. Only the call that sets the
    /// <see cref="State.Disposed" /> flag does any work.</summary>
    /// <param name="disposing">Not used by this implementation.</param>
    private protected virtual void Dispose(bool disposing)
    {
        if (!_state.TrySetFlag(State.Disposed))
        {
            // Already disposed.
            return;
        }

        // _reader can be null if connection establishment failed or didn't run.
        if (_reader is PipeReader reader)
        {
            if (_state.HasFlag(State.Reading))
            {
                // A read is in progress: cancel it and let ReadAsync complete the reader.
                reader.CancelPendingRead();
            }
            else
            {
                reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
            }
        }

        if (_state.HasFlag(State.Writing))
        {
            // A write is in progress: cancel its flush and let WriteAsync complete the writer.
            _writer.CancelPendingFlush();
        }
        else
        {
            _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
        }
    }

    /// <summary>Performs the last step of connection establishment. The reader must be set before calling this
    /// method.</summary>
    /// <returns>A connection information whose local and remote network addresses are the same
    /// <see cref="ColocEndPoint" />, created from the transport address, and whose third argument is null.</returns>
    /// <exception cref="ObjectDisposedException">Thrown, after completing the reader, if this connection is
    /// disposed.</exception>
    private protected TransportConnectionInformation FinishConnect()
    {
        Debug.Assert(_reader is not null);

        if (_state.HasFlag(State.Disposed))
        {
            _reader.Complete();
            throw new ObjectDisposedException($"{typeof(ColocConnection)}");
        }

        var endPoint = new ColocEndPoint(_transportAddress);
        return new TransportConnectionInformation(endPoint, endPoint, null);
    }

    /// <summary>The flags stored in <see cref="_state" />.</summary>
    private protected enum State : int
    {
        /// <summary>Set by <see cref="Dispose(bool)" /> and never cleared.</summary>
        Disposed = 1,

        /// <summary>Set while a <see cref="ReadAsync" /> call is in progress.</summary>
        Reading = 2,

        /// <summary>Set by <see cref="ShutdownWriteAsync" /> and never cleared.</summary>
        ShuttingDown = 4,

        /// <summary>Set while a <see cref="WriteAsync" /> call is in progress.</summary>
        Writing = 8,
    }
}
 228
/// <summary>The client side of a colocated connection. It obtains its pipe reader by handing the reader of its own
/// local pipe to a connect function.</summary>
internal class ClientColocConnection : ColocConnection
{
    // The function called by ConnectAsync: it receives the reader of the local pipe and returns the reader this
    // connection reads from.
    private readonly Func<PipeReader, CancellationToken, Task<PipeReader>> _connectAsync;

    // The reader of the local pipe. It is reset to null once _connectAsync has accepted it.
    private PipeReader? _localPipeReader;

    /// <summary>Connects this connection by calling the connect function with the reader of the local pipe.</summary>
    /// <param name="cancellationToken">The token forwarded to the connect function.</param>
    /// <returns>The connection information created by <see cref="ColocConnection.FinishConnect" />.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    /// <exception cref="InvalidOperationException">Thrown if the reader is already set.</exception>
    public override async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);

        if (_reader is not null)
        {
            throw new InvalidOperationException("Connection establishment cannot be called twice.");
        }

        Debug.Assert(!_state.HasFlag(State.ShuttingDown));

        if (_localPipeReader is PipeReader localReader)
        {
            _reader = await _connectAsync(localReader, cancellationToken).ConfigureAwait(false);

            // The server-side connection is now responsible for completing the pipe reader, so Dispose must no
            // longer complete it.
            _localPipeReader = null;
        }

        return FinishConnect();
    }

    /// <summary>Constructs a client colocated connection.</summary>
    /// <param name="transportAddress">The transport address given to the base class.</param>
    /// <param name="localPipe">The local pipe: its writer is given to the base class and its reader is passed to
    /// <paramref name="connectAsync" /> by <see cref="ConnectAsync" />.</param>
    /// <param name="connectAsync">The connect function called by <see cref="ConnectAsync" />.</param>
    internal ClientColocConnection(
        TransportAddress transportAddress,
        Pipe localPipe,
        Func<PipeReader, CancellationToken, Task<PipeReader>> connectAsync)
        : base(transportAddress, localPipe.Writer)
    {
        _localPipeReader = localPipe.Reader;
        _connectAsync = connectAsync;
    }

    /// <summary>Disposes the base class, then completes the reader of the local pipe if this connection still holds
    /// it.</summary>
    /// <param name="disposing">The value forwarded to the base class.</param>
    private protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        // Non-null only when the connect function did not accept the local pipe reader.
        _localPipeReader?.Complete();
    }
}
 270
/// <summary>The server side of a colocated connection. Its pipe reader is supplied at construction time, so connecting
/// only consists of the final connection establishment step.</summary>
internal class ServerColocConnection : ColocConnection
{
    /// <summary>Connects this connection by calling <see cref="ColocConnection.FinishConnect" /> synchronously.
    /// </summary>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>A completed task that holds the connection information.</returns>
    public override Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
    {
        TransportConnectionInformation information = FinishConnect();
        return Task.FromResult(information);
    }

    /// <summary>Constructs a server colocated connection.</summary>
    /// <param name="transportAddress">The transport address given to the base class.</param>
    /// <param name="writer">The pipe writer given to the base class.</param>
    /// <param name="reader">The pipe reader this connection reads from.</param>
    public ServerColocConnection(TransportAddress transportAddress, PipeWriter writer, PipeReader reader)
        : base(transportAddress, writer)
    {
        _reader = reader;
    }
}