Dataverse: Bulk Performance Settings - Max Threads and Chunks

Currently, I'm still focusing on how to improve bulk processing in Dataverse. After implementing User Multiplexing, we will learn another consideration that we can apply which is the total threads and chunks of data per thread. Of course, some variables such as network speed, machine specs, Dataverse environments (the machine specs that Microsoft set for your environment), and customizations (Plugin/Workflow for that specific entity/table that you are testing), (etc.. variables that I might not know at the moment) will be different. But, the purpose of this article is to let you know how to experiment and adjust the settings for you to get the best performance. Let's learn!

Benchmark Code

I already mentioned about this in the past (I guess). Hands down, the best template code for processing bulk data you can get from this article is from the great Mark Carrington - Founder of SQL4CDS. And, based on his code, I changed a bit here and there to fit User Multiplexing and also assigned the correct chunks of data to the App User:

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Configs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.PowerPlatform.Dataverse.Client;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
using System.Net;

namespace DataverseBenchmarkProject
{
    [MemoryDiagnoser]
    [Config(typeof(Config))]
    [Orderer(BenchmarkDotNet.Order.SummaryOrderPolicy.FastestToSlowest)]
    [SimpleJob(launchCount: 1, warmupCount: 0)]
    public class MaxDegreeOfParellismBenchmark
    {
        private readonly XrmConnection _xrmConnection;

        private class Config : ManualConfig
        {
            public Config()
            {
                SummaryStyle = DefaultConfig.Instance.SummaryStyle
                    .WithTimeUnit(Perfolizer.Horology.TimeUnit.Second);
            }
        }

        public MaxDegreeOfParellismBenchmark()
        {
            #region Optimize Connection
            // Change max connections from .NET to a remote service default: 2
            ServicePointManager.DefaultConnectionLimit = 65000;
            // Bump up the min threads reserved for this app to ramp connections faster - minWorkerThreads defaults to 4, minIOCP defaults to 4
            ThreadPool.SetMinThreads(100, 100);
            // Turn off the Expect 100 to continue message - 'true' will cause the caller to wait until it round-trip confirms a connection to the server
            ServicePointManager.Expect100Continue = false;
            // Can decrease overall transmission overhead but can cause delay in data packet arrival
            ServicePointManager.UseNagleAlgorithm = false;
            #endregion Optimize Connection

            ServiceClient.MaxConnectionTimeout = TimeSpan.FromHours(2);

            var connectionStrings = Startup.GetApplicationHost().Services.GetService<ConnectionString>()!;
            _xrmConnection = new XrmConnection(connectionStrings);
        }

        private const int _totalRecordPerBatch = 30;

        [Benchmark]
        public void CreateMultipleRecommendedDegreesOfParallelism()
        {
            var groups = GetRequests("CreateMultipleRecommendedDegreesOfParallelism")
                .ToArray()
                .AsChunksCrmConnection(_totalRecordPerBatch, _xrmConnection);

            var client = _xrmConnection.GetServiceClient();
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = client.RecommendedDegreesOfParallelism };
            Parallel.ForEach(groups, parallelOptions, requestBatch =>
               {
                   var executeMultipleRequest = new ExecuteMultipleRequest
                   {
                       Requests = new OrganizationRequestCollection(),
                       Settings = new ExecuteMultipleSettings
                       {
                           ContinueOnError = true,
                           ReturnResponses = false
                       }
                   };

                   foreach (var entity in requestBatch.Data)
                   {
                       executeMultipleRequest.Requests.Add(new CreateRequest { Target = entity });
                   }

                   var result = (ExecuteMultipleResponse)requestBatch.ServiceClient.Execute(executeMultipleRequest);

                   if (result.IsFaulted)
                   {
                       throw new Exception(result.Responses.First().Response.Results["TraceText"].ToString());
                   }
               });
        }

        [Benchmark]
        public void CreateMultipleEnvironmentProcessor()
        {
            var groups = GetRequests("CreateMultipleEnvironmentProcessor")
                .ToArray()
                .AsChunksCrmConnection(_totalRecordPerBatch, _xrmConnection);

            var client = _xrmConnection.GetServiceClient();
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
            Parallel.ForEach(groups, parallelOptions, requestBatch =>
               {
                   var executeMultipleRequest = new ExecuteMultipleRequest
                   {
                       Requests = new OrganizationRequestCollection(),
                       Settings = new ExecuteMultipleSettings
                       {
                           ContinueOnError = true,
                           ReturnResponses = false
                       }
                   };

                   foreach (var entity in requestBatch.Data)
                   {
                       executeMultipleRequest.Requests.Add(new CreateRequest { Target = entity });
                   }

                   var result = (ExecuteMultipleResponse)requestBatch.ServiceClient.Execute(executeMultipleRequest);

                   if (result.IsFaulted)
                   {
                       throw new Exception(result.Responses.First().Response.Results["TraceText"].ToString());
                   }
               });
        }

        [Benchmark]
        public void CreateMultiple100Threads()
        {
            var groups = GetRequests("CreateMultiple100Threads")
                .ToArray()
                .AsChunksCrmConnection(_totalRecordPerBatch, _xrmConnection);

            var client = _xrmConnection.GetServiceClient();
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 100 };
            Parallel.ForEach(groups, parallelOptions, requestBatch =>
            {
                var executeMultipleRequest = new ExecuteMultipleRequest
                {
                    Requests = new OrganizationRequestCollection(),
                    Settings = new ExecuteMultipleSettings
                    {
                        ContinueOnError = true,
                        ReturnResponses = false
                    }
                };

                foreach (var entity in requestBatch.Data)
                {
                    executeMultipleRequest.Requests.Add(new CreateRequest { Target = entity });
                }

                var result = (ExecuteMultipleResponse)requestBatch.ServiceClient.Execute(executeMultipleRequest);

                if (result.IsFaulted)
                {
                    throw new Exception(result.Responses.First().Response.Results["TraceText"].ToString());
                }
            });
        }

        private IEnumerable<Entity> GetRequests(string methodName)
        {
            for (int i = 0; i < 500; i++)
            {
                var entity = new Entity("contact")
                {
                    ["firstname"] = methodName,
                    ["lastname"] = Guid.NewGuid().ToString(),
                };

                yield return entity;
            }
        }
    }

    public static class ArrayExtensions
    {
        public static IEnumerable<T[]> AsChunks<T>(this T[] source, int chunkMaxSize)
        {
            var chunks = source.Length / chunkMaxSize;
            var leftOver = source.Length % chunkMaxSize;
            var result = new List<T[]>(chunks + 1);
            var offset = 0;

            for (var i = 0; i < chunks; i++)
            {
                result.Add(new ArraySegment<T>(source,
                                               offset,
                                               chunkMaxSize).ToArray());
                offset += chunkMaxSize;
            }

            if (leftOver > 0)
            {
                result.Add(new ArraySegment<T>(source,
                                               offset,
                                               leftOver).ToArray());
            }

            return result;
        }

        public static IEnumerable<ThreadingModel<T>> AsChunksCrmConnection<T>(this T[] source, int chunkMaxSize, XrmConnection connection)
        {
            var result = source.AsChunks(chunkMaxSize);

            foreach (var group in result)
            {
                var crmConnection = connection.GetServiceClient();
                yield return new ThreadingModel<T> { ServiceClient = crmConnection, Data = group };
            }
        }

        public class ThreadingModel<T>
        {
            public ServiceClient ServiceClient { get; set; }
            public T[] Data { get; set; }
        }
    }

    public record ConnectionString(string[] ConnectionStrings);

    public class XrmConnection
    {
        private static readonly object _lock = new();
        public XrmConnection(ConnectionString connectionString)
        {
            Connections = connectionString.ConnectionStrings.Select(e => new Connection { ConnectionString = e }).ToArray();
        }
        public static Connection[] Connections { get; private set; }

        public string GetConnectionString()
        {
            lock (_lock)
            {
                var minValue = Connections.Min(e => e.Counter);
                var index = -1;
                for (int i = 0; i < Connections.Length; i++)
                {
                    if (Connections[i].Counter != minValue) continue;
                    index = i;
                    break;
                }
                Connections[index].Counter++;

                return Connections[index].ConnectionString;
            }
        }

        public ServiceClient GetServiceClient()
        {
            lock (_lock)
            {
                var minValue = Connections.Min(e => e.Counter);
                var index = -1;
                for (int i = 0; i < Connections.Length; i++)
                {
                    if (Connections[i].Counter != minValue) continue;
                    index = i;
                    break;
                }

                return Connections[index].GetServiceClient();
            }
        }
    }

    public class Connection
    {
        public Connection()
        {
            Counter = 0;
            ExpiredOn = DateTime.MinValue;
        }

        public string ConnectionString { get; set; }
        public int Counter { get; set; }
        public DateTime? ExpiredOn { get; set; }

        private static ServiceClient? _serviceClient = null;
        public ServiceClient GetServiceClient()
        {
            Counter += 1;
            if (_serviceClient == null)
            {
                _serviceClient = new ServiceClient(ConnectionString);
                ExpiredOn = DateTime.UtcNow.AddMinutes(45);
                Console.WriteLine($"New ServiceClient created. Counter: {Counter}. ExpiredOn: {ExpiredOn}");
            }
            else if (ExpiredOn.GetValueOrDefault() <= DateTime.UtcNow)
            {
                _serviceClient = _serviceClient.Clone();
                // https://learn.microsoft.com/en-us/entra/identity-platform/access-tokens
                ExpiredOn = DateTime.UtcNow.AddMinutes(45);
            }
            return _serviceClient;
        }
    }
}

As you can see in the above code, the benchmark will be comparing MaxDegreeOfParallelism between CrmServiceClient.RecommendedDegreesOfParallelism, Environment.ProcessorCount, and static configuration of 100 threads. The other variable that we also can change is the total chunks of the data per thread that we can set.

Here is the result of _totalRecordPerBatch of 20:

BenchmarkDotNet v0.14.0, Windows 11 (10.0.26100.2033)
AMD Ryzen 5 5600G with Radeon Graphics, 1 CPU, 12 logical and 6 physical cores
.NET SDK 8.0.300
  [Host]     : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2
  Job-KFCBZV : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2

LaunchCount=1  WarmupCount=0  
Method Mean Error StdDev Allocated
CreateMultiple100Threads 49.33 s 1.413 s 4.144 s 4.7 MB
CreateMultipleRecommendedDegreesOfParallelism 49.37 s 1.743 s 5.139 s 4.35 MB
CreateMultipleEnvironmentProcessor 50.28 s 1.452 s 4.212 s 4.15 MB

And here is the result of _totalRecordPerBatch of 30:

BenchmarkDotNet v0.14.0, Windows 11 (10.0.26100.2033)
AMD Ryzen 5 5600G with Radeon Graphics, 1 CPU, 12 logical and 6 physical cores
.NET SDK 8.0.300
  [Host]     : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2
  Job-TYFLDE : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2

LaunchCount=1  WarmupCount=0  
Method Mean Error StdDev Allocated
CreateMultiple100Threads 48.98 s 1.601 s 4.721 s 4.07 MB
CreateMultipleRecommendedDegreesOfParallelism 49.31 s 1.563 s 4.511 s 3.75 MB
CreateMultipleEnvironmentProcessor 50.65 s 1.783 s 5.256 s 3.81 MB

Summary

As you can see for the machine that I'm using, bumping the _totalRecordPerBatch from 20 - 30 does not significantly change the duration. The best setting from those 3 methods is the CreateMultiple100Threads which means that we need to assess the best threads our machine (and Dataverse) can process. I might create another utility tool to find the best settings that we can apply. But for now, this is the learning that I can share with you!

Happy CRM-ing! 🚀

Leave a comment

Your comment is sent privately to the author and isn't published on the site.