< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocConnection
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs
Tag: 592_20856082467
Line coverage
82%
Covered lines: 110
Uncovered lines: 23
Coverable lines: 133
Total lines: 274
Line coverage: 82.7%
Branch coverage
81%
Covered branches: 39
Total branches: 48
Branch coverage: 81.2%
Method coverage
100%
Covered methods: 8
Total methods: 8
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
Dispose()100%11100%
ReadAsync()88.88%18.631887.5%
CopySegmentToMemory()100%22100%
ShutdownWriteAsync(...)83.33%6.11685.71%
WriteAsync()75%13.831276.66%
.ctor(...)100%11100%
Dispose(...)75%9.3872.72%
FinishConnect()50%2.15266.66%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The colocated connection class to exchange data within the same process. The implementation copies the send
 12/// buffer into the receive buffer.</summary>
 13internal abstract class ColocConnection : IDuplexConnection
 14{
 15    private protected PipeReader? _reader;
 16    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 17    // locking.
 18    private protected int _state;
 19
 20    private readonly ServerAddress _serverAddress;
 21    private readonly PipeWriter _writer;
 22
 23    public abstract Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken);
 24
 25    public void Dispose()
 114426    {
 114427        Dispose(true);
 114428        GC.SuppressFinalize(this);
 114429    }
 30
 31    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
 6563632    {
 6563633        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 34
 6563535        if (_reader is null)
 136        {
 137            throw new InvalidOperationException("Reading is not allowed before connection is connected.");
 38        }
 6563439        if (!_state.TrySetFlag(State.Reading))
 140        {
 141            throw new InvalidOperationException("Reading is already in progress.");
 42        }
 43
 44        try
 6563345        {
 6563346            ReadResult readResult = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 6501947            if (readResult.IsCanceled)
 048            {
 49                // Dispose canceled ReadAsync.
 050                throw new IceRpcException(IceRpcError.OperationAborted);
 51            }
 6501952            else if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
 13653            {
 13654                return 0;
 55            }
 56
 57            // We could eventually add a CopyTo(this ReadOnlySequence<byte> src, Memory<byte> dest) extension method
 58            // if we need this in other places.
 59            int read;
 6488360            if (readResult.Buffer.IsSingleSegment)
 544461            {
 544462                read = CopySegmentToMemory(readResult.Buffer.First, buffer);
 544463            }
 64            else
 5943965            {
 5943966                read = 0;
 32304267                foreach (ReadOnlyMemory<byte> segment in readResult.Buffer)
 10208268                {
 10208269                    read += CopySegmentToMemory(segment, buffer[read..]);
 10208270                    if (read == buffer.Length)
 5943971                    {
 5943972                        break;
 73                    }
 4264374                }
 5943975            }
 6488376            _reader.AdvanceTo(readResult.Buffer.GetPosition(read));
 6488377            return read;
 78        }
 79        finally
 6563380        {
 6563381            if (_state.HasFlag(State.Disposed))
 082            {
 083                _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 084            }
 6563385            _state.ClearFlag(State.Reading);
 6563386        }
 87
 88        static int CopySegmentToMemory(ReadOnlyMemory<byte> source, Memory<byte> destination)
 10752689        {
 10752690            if (source.Length > destination.Length)
 5708391            {
 5708392                source[0..destination.Length].CopyTo(destination);
 5708393                return destination.Length;
 94            }
 95            else
 5044396            {
 5044397                source.CopyTo(destination);
 5044398                return source.Length;
 99            }
 107526100        }
 65019101    }
 102
 103    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
 146104    {
 146105        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 106
 146107        if (_reader is null)
 1108        {
 1109            throw new InvalidOperationException("Shutdown is not allowed before the connection is connected.");
 110        }
 145111        if (_state.HasFlag(State.Writing))
 1112        {
 1113            throw new InvalidOperationException("Shutdown or writing is in progress");
 114        }
 144115        if (!_state.TrySetFlag(State.ShuttingDown))
 0116        {
 0117            throw new InvalidOperationException("Shutdown has already been called.");
 118        }
 119
 144120        _writer.Complete();
 144121        return Task.CompletedTask;
 144122    }
 123
 124    public async ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
 7059125    {
 7059126        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 127
 7058128        if (buffer.IsEmpty)
 0129        {
 0130            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 131        }
 132
 7058133        if (_reader is null)
 1134        {
 1135            throw new InvalidOperationException("Writing is not allowed before the connection is connected.");
 136        }
 7057137        if (_state.HasFlag(State.ShuttingDown))
 2138        {
 2139            throw new InvalidOperationException("Writing is not allowed after the connection is shutdown.");
 140        }
 7055141        if (!_state.TrySetFlag(State.Writing))
 1142        {
 1143            throw new InvalidOperationException("Writing is already in progress.");
 144        }
 145
 146        try
 7054147        {
 7054148            _writer.Write(buffer);
 7054149            FlushResult flushResult = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 7042150            if (flushResult.IsCanceled)
 0151            {
 152                // Dispose canceled ReadAsync.
 0153                throw new IceRpcException(IceRpcError.OperationAborted);
 154            }
 7042155        }
 156        finally
 7054157        {
 7054158            Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 7054159            if (_state.HasFlag(State.Disposed))
 0160            {
 0161                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 0162            }
 7054163            _state.ClearFlag(State.Writing);
 7054164        }
 7042165    }
 166
 966167    public ColocConnection(ServerAddress serverAddress, PipeWriter writer)
 966168    {
 966169        _serverAddress = serverAddress;
 966170        _writer = writer;
 966171    }
 172
 173    private protected virtual void Dispose(bool disposing)
 1144174    {
 1144175        if (_state.TrySetFlag(State.Disposed))
 965176        {
 177            // _reader can be null if connection establishment failed or didn't run.
 965178            if (_reader is not null)
 908179            {
 908180                if (_state.HasFlag(State.Reading))
 0181                {
 0182                    _reader.CancelPendingRead();
 0183                }
 184                else
 908185                {
 908186                    _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 908187                }
 908188            }
 189
 965190            if (_state.HasFlag(State.Writing))
 0191            {
 0192                _writer.CancelPendingFlush();
 0193            }
 194            else
 965195            {
 965196                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 965197            }
 965198        }
 1144199    }
 200
 201    private protected TransportConnectionInformation FinishConnect()
 883202    {
 883203        Debug.Assert(_reader is not null);
 204
 883205        if (_state.HasFlag(State.Disposed))
 0206        {
 0207            _reader.Complete();
 0208            throw new ObjectDisposedException($"{typeof(ColocConnection)}");
 209        }
 210
 883211        var colocEndPoint = new ColocEndPoint(_serverAddress);
 883212        return new TransportConnectionInformation(colocEndPoint, colocEndPoint, null);
 883213    }
 214
 215    private protected enum State : int
 216    {
 217        Disposed = 1,
 218        Reading = 2,
 219        ShuttingDown = 4,
 220        Writing = 8,
 221    }
 222}
 223
 224/// <summary>The colocated client connection class.</summary>
 225internal class ClientColocConnection : ColocConnection
 226{
 227    private readonly Func<PipeReader, CancellationToken, Task<PipeReader>> _connectAsync;
 228    private PipeReader? _localPipeReader;
 229
 230    public override async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 231    {
 232        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 233
 234        if (_reader is not null)
 235        {
 236            throw new InvalidOperationException("Connection establishment cannot be called twice.");
 237        }
 238
 239        Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 240
 241        if (_localPipeReader is not null)
 242        {
 243            _reader = await _connectAsync(_localPipeReader, cancellationToken).ConfigureAwait(false);
 244            _localPipeReader = null; // The server-side connection is now responsible for completing the pipe reader.
 245        }
 246        return FinishConnect();
 247    }
 248
 249    internal ClientColocConnection(
 250        ServerAddress serverAddress,
 251        Pipe localPipe,
 252        Func<PipeReader, CancellationToken, Task<PipeReader>> connectAsync)
 253        : base(serverAddress, localPipe.Writer)
 254    {
 255        _connectAsync = connectAsync;
 256        _localPipeReader = localPipe.Reader;
 257    }
 258
 259    private protected override void Dispose(bool disposing)
 260    {
 261        base.Dispose(disposing);
 262        _localPipeReader?.Complete();
 263    }
 264}
 265
 266/// <summary>The colocated server connection class.</summary>
 267internal class ServerColocConnection : ColocConnection
 268{
 269    public override Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
 270        Task.FromResult(FinishConnect());
 271
 272    public ServerColocConnection(ServerAddress serverAddress, PipeWriter writer, PipeReader reader)
 273       : base(serverAddress, writer) => _reader = reader;
 274}