Dataverse: Updated ServiceClient Strategy For Better Performance

If you remember, I wrote this blog post on how to create a class that wraps ServiceClient. I had to do it because instantiating a ServiceClient is expensive (around 2 seconds). Hence, I needed a way to "store" it in memory and ensure it is always ready when I need to call it.

Fast-forward to today: I wanted to test how well Anthropic Claude 3.7 Sonnet gives recommendations about that code. The prompt I gave asked it to make the code "high performance" and "fast." Here is the result (the code below targets net8.0)!

Connection Classes

For your reference, here is the ConnectionString record:

namespace DataverseBenchmarkProject.Connections
{
    public record ConnectionString(string[] ConnectionStrings);
}

Here is the updated Connection class:

using Microsoft.PowerPlatform.Dataverse.Client;

namespace DataverseBenchmarkProject.Connections
{
    /// <summary>
    /// Represents a connection to Dataverse.
    /// </summary>
    public class Connection : IDisposable
    {
        // Constants for connection expiration
        private const int _connectionLifetimeMinutes = 55;
        private const int _preemptiveRenewalThresholdMinutes = 5;

        // Connection properties
        public required string ConnectionString { get; set; }
        private int _counter = 0;
        public int Counter => _counter;

        // Connection state
        private ServiceClient? _currentClient;
        private DateTime _expirationTime = DateTime.MinValue;
        private Task<ServiceClient>? _renewalTask;
        private readonly SemaphoreSlim _connectionLock = new(1, 1);
        private bool _disposed;

        /// <summary>
        /// Gets a ServiceClient instance, creating or renewing it if necessary.
        /// </summary>
        /// <returns>A ServiceClient instance.</returns>
        public ServiceClient GetServiceClient()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(Connection));
            }

            // Fast path - if we have a valid client, increment counter and return it
            if (_currentClient != null && DateTime.UtcNow < _expirationTime)
            {
                // Check if we need to start preemptive renewal
                if (_renewalTask == null && ShouldStartRenewal())
                {
                    StartRenewalTask();
                }

                Interlocked.Increment(ref _counter);
                return _currentClient;
            }

            // Slow path - we need to create or renew the client
            _connectionLock.Wait();
            try
            {
                // Check again after acquiring the lock
                if (_currentClient != null && DateTime.UtcNow < _expirationTime)
                {
                    Interlocked.Increment(ref _counter);
                    return _currentClient;
                }

                // Take ownership of any pending renewal and always clear the field,
                // so a faulted or stale task cannot block later renewals
                var renewal = _renewalTask;
                _renewalTask = null;

                // If the renewal task completed successfully, use its result
                if (renewal is { IsCompletedSuccessfully: true } && renewal.Result.IsReady)
                {
                    SetNewClient(renewal.Result);
                }
                // Otherwise, create a new client synchronously
                else
                {
                    var newClient = CreateServiceClient();
                    if (!newClient.IsReady)
                    {
                        throw new InvalidOperationException(
                            "Failed to create a ready ServiceClient."
                        );
                    }
                    SetNewClient(newClient);
                }

                Interlocked.Increment(ref _counter);
                return _currentClient!;
            }
            finally
            {
                _connectionLock.Release();
            }
        }

        /// <summary>
        /// Determines if we should start a renewal task.
        /// </summary>
        /// <returns>True if renewal should start, false otherwise.</returns>
        private bool ShouldStartRenewal()
        {
            var timeUntilExpiration = _expirationTime - DateTime.UtcNow;
            return timeUntilExpiration.TotalMinutes <= _preemptiveRenewalThresholdMinutes;
        }

        /// <summary>
        /// Starts a task to renew the connection.
        /// </summary>
        private void StartRenewalTask()
        {
            _renewalTask = Task.Run(() => CreateServiceClient());
        }

        /// <summary>
        /// Creates a new ServiceClient instance.
        /// </summary>
        /// <returns>A new ServiceClient instance.</returns>
        private ServiceClient CreateServiceClient()
        {
            return new ServiceClient(ConnectionString) { EnableAffinityCookie = false };
        }

        /// <summary>
        /// Sets a new client as the current client.
        /// </summary>
        /// <param name="newClient">The new ServiceClient instance.</param>
        private void SetNewClient(ServiceClient newClient)
        {
            var oldClient = _currentClient;
            _currentClient = newClient;
            _expirationTime = DateTime.UtcNow.AddMinutes(_connectionLifetimeMinutes);
            Interlocked.Exchange(ref _counter, 0);

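            // Dispose the old client in the background.
            // Caveat: a caller that obtained the old client just before the swap
            // may still be using it when it is disposed.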
            if (oldClient != null)
            {
                Task.Run(() => oldClient.Dispose());
            }
        }

        /// <summary>
        /// Disposes the Connection instance.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Disposes the Connection instance.
        /// </summary>
        /// <param name="disposing">Whether the method is called from Dispose() or the finalizer.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (!_disposed)
            {
                if (disposing)
                {
                    // Dispose managed resources
                    _connectionLock.Dispose();
                    _currentClient?.Dispose();
                }

                _disposed = true;
            }
        }
    }
}
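
To make the timing in the Connection class easier to follow, here is a minimal sketch of the same window logic as a pure function. RenewalWindow, State, and Classify are hypothetical names that are not part of the original code; only the 55-minute lifetime and the 5-minute renewal threshold come from the class above.

```csharp
using System;

// Hypothetical helper for illustration only; it mirrors the checks in
// Connection.GetServiceClient and ShouldStartRenewal without touching Dataverse.
public static class RenewalWindow
{
    private const int ConnectionLifetimeMinutes = 55;
    private const int PreemptiveRenewalThresholdMinutes = 5;

    public enum State
    {
        Fresh,             // fast path, nothing to do
        RenewInBackground, // fast path, but a renewal task should be started
        Expired,           // slow path, a new client is needed before returning
    }

    public static State Classify(DateTime createdUtc, DateTime nowUtc)
    {
        var expiration = createdUtc.AddMinutes(ConnectionLifetimeMinutes);
        if (nowUtc >= expiration)
        {
            return State.Expired;
        }

        var remaining = expiration - nowUtc;
        return remaining.TotalMinutes <= PreemptiveRenewalThresholdMinutes
            ? State.RenewInBackground
            : State.Fresh;
    }
}
```

With these constants, a client created at 00:00 is Fresh at minute 10, RenewInBackground from minute 50, and Expired from minute 55 onwards.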

Last, here is the XrmConnection class:

using Microsoft.PowerPlatform.Dataverse.Client;

namespace DataverseBenchmarkProject.Connections
{
    public class XrmConnection : IDisposable
    {
        // Connections established during initialization
        private readonly List<Connection> _connections = [];

        // Use a reader-writer lock for better concurrency
        private readonly ReaderWriterLockSlim _rwLock = new();

        // Track if the object has been disposed
        private bool _disposed = false;

        /// <summary>
        /// Initializes a new instance of the XrmConnection class.
        /// </summary>
        /// <param name="connectionString">The connection strings to use.</param>
        public XrmConnection(ConnectionString connectionString)
        {
            InitializeConnections(connectionString.ConnectionStrings).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Asynchronously initializes connections.
        /// </summary>
        /// <param name="connectionStrings">Array of connection strings.</param>
        /// <returns>A task representing the asynchronous operation.</returns>
        private async Task InitializeConnections(string[] connectionStrings)
        {
            // Create a list to hold the tasks
            var connectionTasks = new List<Task<Connection?>>();

            // Create a task for each connection string
            foreach (var connectionStr in connectionStrings)
            {
                connectionTasks.Add(CreateConnectionAsync(connectionStr));
            }

            // Wait for all tasks to complete
            var connections = await Task.WhenAll(connectionTasks);

            // Add all valid connections to the list
            _rwLock.EnterWriteLock();
            try
            {
                foreach (var connection in connections)
                {
                    if (connection != null)
                    {
                        _connections.Add(connection);
                    }
                }
            }
            finally
            {
                _rwLock.ExitWriteLock();
            }

            // Ensure we have at least one valid connection
            if (_connections.Count == 0)
            {
                throw new InvalidOperationException("No valid connections could be established.");
            }
        }

        /// <summary>
        /// Creates a connection asynchronously.
        /// </summary>
        /// <param name="connectionString">The connection string.</param>
        /// <returns>A task that returns a Connection object or null if the connection failed.</returns>
        private async Task<Connection?> CreateConnectionAsync(string connectionString)
        {
            var conn = new Connection { ConnectionString = connectionString };
            try
            {
                // GetServiceClient blocks while connecting, so run it on the thread pool
                var client = await Task.Run(() => conn.GetServiceClient());
                if (client.IsReady)
                {
                    return conn;
                }
            }
            catch (Exception)
            {
                // Log the exception if needed
            }

            // Release the failed connection instead of leaking it
            conn.Dispose();
            return null;
        }

        /// <summary>
        /// Gets a service client with the lowest usage count.
        /// </summary>
        /// <returns>A ServiceClient instance.</returns>
        public ServiceClient GetServiceClient()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(XrmConnection));
            }

            // Use a read lock for better concurrency
            _rwLock.EnterReadLock();
            try
            {
                // Find the connection with the minimum counter
                // This is more efficient than using OrderBy().First()
                Connection? minConnection = null;
                int minCounter = int.MaxValue;

                foreach (var connection in _connections)
                {
                    if (connection.Counter < minCounter)
                    {
                        minCounter = connection.Counter;
                        minConnection = connection;
                    }
                }

                if (minConnection == null)
                {
                    throw new InvalidOperationException("No valid connections available.");
                }

                return minConnection.GetServiceClient();
            }
            finally
            {
                _rwLock.ExitReadLock();
            }
        }

        /// <summary>
        /// Disposes the XrmConnection instance.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Disposes the XrmConnection instance.
        /// </summary>
        /// <param name="disposing">Whether the method is called from Dispose() or the finalizer.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (!_disposed)
            {
                if (disposing)
                {
                    // Dispose managed resources
                    _rwLock.Dispose();

                    // Dispose all connections
                    foreach (var connection in _connections)
                    {
                        connection.Dispose();
                    }

                    // Clear the connections list
                    _connections.Clear();
                }

                _disposed = true;
            }
        }
    }
}
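
As a rough illustration of how the lowest-counter selection in GetServiceClient spreads calls across connections, here is a small sketch with a stand-in type. FakeConnection and LeastUsedSelector are hypothetical names; the real Connection class also resets its counter whenever the client is renewed, which this sketch leaves out.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for Connection: only the usage counter matters here.
public sealed class FakeConnection
{
    private int _counter;

    public FakeConnection(string name) => Name = name;

    public string Name { get; }
    public int Counter => Volatile.Read(ref _counter);

    // Simulates handing out a client: bumps the counter and returns the name.
    public string Use()
    {
        Interlocked.Increment(ref _counter);
        return Name;
    }
}

public static class LeastUsedSelector
{
    // Single pass over the list; on ties the earliest connection wins,
    // the same as the loop in XrmConnection.GetServiceClient.
    public static FakeConnection Pick(IReadOnlyList<FakeConnection> connections)
    {
        FakeConnection? min = null;
        int minCounter = int.MaxValue;

        foreach (var connection in connections)
        {
            if (connection.Counter < minCounter)
            {
                minCounter = connection.Counter;
                min = connection;
            }
        }

        return min ?? throw new InvalidOperationException("No connections available.");
    }
}
```

With three connections named A, B, and C and sequential calls to Pick(...).Use(), the order is A, B, C, A, B, C, so under these assumptions the selection behaves like round-robin as long as the counters stay in step.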

The appSettings.json looks like this:

{
  "DataverseConnectionString1": "yourconnectionstring1",
  "DataverseConnectionString2": "yourconnectionstring2",
  "DataverseConnectionString3": "yourconnectionstring3"
}

If you use dependency injection in Startup.cs, the code looks like this:

using DataverseBenchmarkProject.Connections;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public static class Startup
{
    public static IHost GetApplicationHost()
    {
        var hostBuilder = new HostBuilder()
            .ConfigureAppConfiguration(builder => builder.SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appSettings.json", optional: false, reloadOnChange: true))
            .ConfigureServices((context, services) =>
            {
                // Collect every non-empty setting whose key contains "DataverseConnectionString"
                var connectionStrings = context.Configuration
                    .AsEnumerable()
                    .Where(e => e.Key.Contains("DataverseConnectionString") && !string.IsNullOrWhiteSpace(e.Value))
                    .Select(e => e.Value!)
                    .ToArray();
                var connectionString = new ConnectionString(connectionStrings);
                services.AddSingleton(new XrmConnection(connectionString));
            });

        return hostBuilder.Build();
    }
}

Testing

For the testing itself, I again asked for recommendations on how to create "high-performance" bulk requests for data processing. I took the base benchmark class from this blog post and added the method that the LLM gave:

using BenchmarkDotNet.Attributes;
using DataverseBenchmarkProject.Connections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;

namespace DataverseBenchmarkProject;

[MemoryDiagnoser]
[Config(typeof(Config))]
[Orderer(BenchmarkDotNet.Order.SummaryOrderPolicy.FastestToSlowest)]
[SimpleJob(launchCount: 1, warmupCount: 0)]
public class CreateEntitiesBenchmark
{
    public CreateEntitiesBenchmark()
    {
        // Startup registers XrmConnection (not ConnectionString) as a singleton,
        // so resolve the ready-made instance from the container
        _xrmConnection = Startup
            .GetApplicationHost()
            .Services.GetRequiredService<XrmConnection>();
    }

    private readonly XrmConnection _xrmConnection;
    private readonly int _maxRequestPerBatch = 15;
    private readonly int _workerCount = 25;

    private readonly int _totalData = 400;

    public CreateRequest GenerateRequest(string info)
    {
        return new CreateRequest
        {
            Target = new Entity("contact")
            {
                ["firstname"] = info,
                ["lastname"] = Guid.NewGuid().ToString(),
            },
        };
    }

    [Benchmark(Baseline = true)]
    public void MarkCarringtonExecuteMultipleRequest()
    {
        var requests = new List<CreateRequest>();
        for (int i = 0; i < _totalData; i++)
        {
            requests.Add(GenerateRequest("MarkCarrington"));
        }

        Parallel.ForEach(
            requests,
            new ParallelOptions { MaxDegreeOfParallelism = _workerCount },
            () =>
                new
                {
                    Service = _xrmConnection.GetServiceClient(),
                    EMR = new ExecuteMultipleRequest
                    {
                        Requests = [],
                        Settings = new ExecuteMultipleSettings
                        {
                            ContinueOnError = false,
                            ReturnResponses = true,
                        },
                    },
                },
            (req, loopState, index, threadLocalState) =>
            {
                threadLocalState.EMR.Requests.Add(req);
                if (threadLocalState.EMR.Requests.Count == _maxRequestPerBatch)
                {
                    var result = (ExecuteMultipleResponse)
                        threadLocalState.Service.Execute(threadLocalState.EMR);
                    Console.WriteLine(
                        $"Created MarkCarringtonExecuteMultipleRequest {result.Responses.Count}"
                    );
                    threadLocalState.EMR.Requests.Clear();
                }
                return threadLocalState;
            },
            (threadLocalState) =>
            {
                if (threadLocalState.EMR.Requests.Count > 0)
                {
                    var result = (ExecuteMultipleResponse)
                        threadLocalState.Service.Execute(threadLocalState.EMR);
                    Console.WriteLine(
                        $"Created MarkCarringtonExecuteMultipleRequest {result.Responses.Count}"
                    );
                }
            }
        );
    }

    [Benchmark]
    public void ParallelForEachAsync()
    {
        var requests = new List<CreateRequest>();
        for (int i = 0; i < _totalData; i++)
        {
            requests.Add(GenerateRequest("ParallelForEachAsync"));
        }
        var groupData = requests.Chunk(_maxRequestPerBatch).ToArray();
        var parent = Parallel.ForEachAsync(
            groupData,
            new ParallelOptions { MaxDegreeOfParallelism = _workerCount },
            async (reqs, cancellationToken) =>
            {
                var service = _xrmConnection.GetServiceClient();
                var emr = new ExecuteMultipleRequest
                {
                    Requests = [],
                    Settings = new ExecuteMultipleSettings
                    {
                        ContinueOnError = false,
                        ReturnResponses = true,
                    },
                };
                emr.Requests.AddRange(reqs);
                var result = (ExecuteMultipleResponse)
                    await service.ExecuteAsync(emr, cancellationToken);
                Console.WriteLine($"Created ParallelForEachAsync {result.Responses.Count}");
            }
        );

        parent.Wait();
    }

    [Benchmark]
    public void HighPerformanceParallelProcessing()
    {
        // Pre-generate all requests to avoid allocation during processing
        var requests = new CreateRequest[_totalData];
        for (int i = 0; i < _totalData; i++)
        {
            requests[i] = GenerateRequest("HighPerformanceParallelProcessing");
        }

        // Reuse the configured batch size and worker count
        // (processorCount is read for reference but not used in the calculation)
        int processorCount = Environment.ProcessorCount;
        int optimalBatchSize = _maxRequestPerBatch;
        int optimalThreads = _workerCount;

        // Use array for best performance (no resizing overhead)
        var results = new int[optimalThreads];
        var countdown = new CountdownEvent(optimalThreads);
        var chunks = _totalData / optimalThreads;

        // Start worker threads
        for (int threadId = 0; threadId < optimalThreads; threadId++)
        {
            int localThreadId = threadId;
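            // Note: the async lambda compiles to async void here, so the caller cannot
            // catch an exception thrown inside it; the finally block below still signals
            _ = ThreadPool.UnsafeQueueUserWorkItem(async _ =>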
            {
                try
                {
                    // Each thread gets its own service client to avoid contention
                    var service = _xrmConnection.GetServiceClient();
                    int processedCount = 0;

                    // Calculate this thread's work range
                    int start = localThreadId * chunks;
                    int end = (localThreadId == optimalThreads - 1)
                        ? _totalData
                        : (localThreadId + 1) * chunks;

                    // Pre-allocate request collection for reuse
                    var emr = new ExecuteMultipleRequest
                    {
                        Settings = new ExecuteMultipleSettings
                        {
                            ContinueOnError = false,
                            ReturnResponses = true
                        }
                    };

                    var requestCollection = new OrganizationRequestCollection();
                    emr.Requests = requestCollection;

                    // Process assigned range
                    for (int i = start; i < end; i++)
                    {
                        requestCollection.Add(requests[i]);

                        // When batch is full, process it
                        if (requestCollection.Count == optimalBatchSize || i == end - 1)
                        {
                            if (requestCollection.Count > 0)
                            {
                                var response = (ExecuteMultipleResponse)await service.ExecuteAsync(emr);
                                Console.WriteLine($"Created HighPerformanceParallelProcessing {response.Responses.Count}");
                                processedCount += response.Responses.Count;
                                requestCollection.Clear();
                            }
                        }
                    }

                    // Store results
                    results[localThreadId] = processedCount;
                }
                finally
                {
                    // Signal completion
                    countdown.Signal();
                }
            }, new object(), preferLocal: true);
        }

        // Wait for all threads to complete
        countdown.Wait();

        // Aggregate results if needed
        int totalProcessed = 0;
        for (int i = 0; i < results.Length; i++)
        {
            totalProcessed += results[i];
        }

        Console.WriteLine($"Total processed: {totalProcessed}");
    }
}
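
One difference between the last two methods is how the data is split into batches. The sketch below only counts batch sizes and does not call Dataverse; BatchMath and its method names are hypothetical, while the splitting rules are copied from ParallelForEachAsync (Chunk over the whole list) and HighPerformanceParallelProcessing (one contiguous range per worker, flushed at the batch size or at the end of the range).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only.
public static class BatchMath
{
    // Batch sizes when the whole data set is chunked up front.
    public static int[] ChunkedBatchSizes(int totalData, int batchSize) =>
        Enumerable.Range(0, totalData)
            .Chunk(batchSize)
            .Select(chunk => chunk.Length)
            .ToArray();

    // Batch sizes when each worker takes a contiguous range first and then
    // flushes every batchSize items or when it reaches the end of its range.
    public static int[] RangePartitionedBatchSizes(int totalData, int workers, int batchSize)
    {
        var sizes = new List<int>();
        int perWorker = totalData / workers;

        for (int w = 0; w < workers; w++)
        {
            int start = w * perWorker;
            int end = (w == workers - 1) ? totalData : (w + 1) * perWorker;
            int pending = 0;

            for (int i = start; i < end; i++)
            {
                pending++;
                if (pending == batchSize || i == end - 1)
                {
                    sizes.Add(pending);
                    pending = 0;
                }
            }
        }

        return sizes.ToArray();
    }
}
```

With the benchmark's values (400 records, batches of 15, 25 workers), ChunkedBatchSizes(400, 15) yields 27 batches, the last one holding 10 records, while RangePartitionedBatchSizes(400, 25, 15) yields 50 batches because each worker's 16 records are sent as one batch of 15 followed by one batch of 1.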

Here is the result of the benchmark:

BenchmarkDotNet v0.14.0, Windows 11 (10.0.26100.3194)
AMD Ryzen 5 5600G with Radeon Graphics, 1 CPU, 12 logical and 6 physical cores
.NET SDK 9.0.103
  [Host]     : .NET 8.0.13 (8.0.1325.6609), X64 RyuJIT AVX2
  Job-DJBYJI : .NET 8.0.13 (8.0.1325.6609), X64 RyuJIT AVX2

LaunchCount=1  WarmupCount=0  
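| Method                               | Mean        | Error    | StdDev     | Median      | Ratio | RatioSD | Allocated | Alloc Ratio |
|------------------------------------- |------------:|---------:|-----------:|------------:|------:|--------:|----------:|------------:|
| HighPerformanceParallelProcessing    |  4,000.6 ms | 209.4 ms |   580.2 ms |  3,828.1 ms |  0.18 |    0.03 |   6.93 MB |        1.41 |
| ParallelForEachAsync                 |  5,189.7 ms | 152.3 ms |   421.9 ms |  5,172.2 ms |  0.23 |    0.03 |   4.99 MB |        1.01 |
| MarkCarringtonExecuteMultipleRequest | 22,782.3 ms | 685.1 ms | 1,998.4 ms | 22,479.1 ms |  1.01 |    0.12 |   4.92 MB |        1.00 |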

The mean differs slightly from the previous run: ParallelForEachAsync took around 4 seconds before and about 5 seconds now, but it is what it is based on the result 🥲. In this run, HighPerformanceParallelProcessing has the lowest mean (about 4.0 seconds), although it also allocates the most memory of the three methods (6.93 MB). The main improvement is in allocated memory, which dropped from 16 MB in the previous run to at most 6.93 MB with the new method, so the recommendations really work!

Happy CRM-ing! 🚀
