Dataverse: Get the Best Thread and Row Counts

Finding the best settings for pushing data to Dataverse in batches is tedious work. We need to consider the client hardware (logical processors to support multithreading, and RAM capacity) and the network. The answer is also unique for each table you want to load, because of the plugins/workflows/Power Automate flows that trigger afterwards. Finally, we also need to consider the final boss, aka the Dataverse API limits, to avoid requests being rejected by the system. Microsoft has even published guidance on how to do it, but in practice it is still hard to implement, which is why I wrote code to do it. If you do not agree or have a better idea, please do let me know!

Logic

Because we are basically just sending ExecuteMultipleRequest messages to Dataverse, the idea is to increase the rows and threads gradually to avoid being rejected by the system. A new best setting is recorded only when throughput improves by at least the percentage that we set:

using CsvHelper;
using Microsoft.PowerPlatform.Dataverse.Client;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
using System.Diagnostics;

namespace DataverseBenchmarkTools.Business
{
    /// <summary>
    /// Searches for the combination of batch size (rows per <see cref="ExecuteMultipleRequest"/>) and
    /// degree of parallelism (threads) that gives the highest Dataverse write throughput for one table.
    /// </summary>
    /// <remarks>
    /// Rows start at <see cref="InputModel.Rows"/> and grow by <see cref="RowIncrement"/> after every measured run.
    /// When the row count reaches <see cref="InputModel.MaxRows"/>, or
    /// <see cref="InputModel.StoppedIfSubsequentEnhancementNotFound"/> consecutive measured runs bring no significant
    /// improvement, one thread is added and the rows go back to <see cref="InputModel.Rows"/>.
    /// </remarks>
    public class DataverseBenchmark
    {
        /// <summary>Number of rows added to the batch size after every measured run.</summary>
        private const int RowIncrement = 50;

        /// <summary>Default threshold (in percent) used by <see cref="HasSignificantImprovement"/>.</summary>
        private const double DefaultImprovementThreshold = 10.0;

        /// <summary>Which Dataverse message is sent for every CSV row.</summary>
        public enum Mode
        {
            /// <summary>Send a <see cref="CreateRequest"/> per row.</summary>
            Create,

            /// <summary>Send an <see cref="UpdateRequest"/> per row.</summary>
            Update
        }

        /// <summary>Benchmark configuration.</summary>
        public class InputModel : TInput
        {
            /// <summary>Path of the CSV file; its header row must contain attribute logical names of <see cref="Entity"/>.</summary>
            public string CsvFilePath { get; set; } = "";

            /// <summary>Column delimiter used in the CSV file.</summary>
            public char Delimiter { get; set; } = ',';

            /// <summary>Logical name of the target table (for example <c>contact</c>).</summary>
            public string Entity { get; set; } = "";

            /// <summary>
            /// When <c>true</c>, every run re-sends rows from the start of the CSV file;
            /// when <c>false</c>, every run consumes rows that have not been sent yet.
            /// </summary>
            public bool AllowDuplicates { get; set; } = false;

            /// <summary>Initial rows per batch; also the value the rows are reset to when a thread is added.</summary>
            public int Rows { get; set; } = 200;

            /// <summary>When the rows per batch reach this value after an increment, a thread is added instead.</summary>
            public int MaxRows { get; set; } = 1000;

            /// <summary>Initial number of batches sent in parallel.</summary>
            public int Threads { get; set; } = 3;

            /// <summary>When <c>true</c>, every configuration is preceded by an unmeasured run with the same settings.</summary>
            public bool WarmingUp { get; set; }

            /// <summary>Pause between runs, in minutes.</summary>
            public int SleepTime { get; set; }

            /// <summary>Whether rows are created or updated.</summary>
            public Mode Mode { get; set; }

            /// <summary>
            /// Multiplier (not a percentage) applied to <see cref="Environment.ProcessorCount"/>; the benchmark stops
            /// once the thread count would reach the resulting value.
            /// </summary>
            public double MaxProcessorPercentage { get; set; } = 1.5;

            /// <summary>Number of consecutive failed runs after which the benchmark stops.</summary>
            public int StoppedIfSubsequentErrorFound { get; set; } = 2;

            /// <summary>Number of consecutive runs without significant improvement after which a thread is added.</summary>
            public int StoppedIfSubsequentEnhancementNotFound { get; set; } = 3;

            /// <summary>Minimum improvement (in percent) of rows/second for a run to become the new best setting.</summary>
            public double PercentImprovement { get; set; } = 10.0;
        }

        /// <summary>
        /// Best setting found. When no run succeeded, the values stay at their initial placeholders
        /// (0 rows, 0 threads, <see cref="TimeSpan.MaxValue"/>, <see cref="double.MinValue"/>).
        /// </summary>
        public class OutputModel : TOutput
        {
            public int BestMaxRow { get; set; }
            public int BestThreads { get; set; }
            public TimeSpan BestTime { get; set; }
            public double BestRowsPerSecond { get; set; }
        }

        /// <summary>Runs the benchmark described by an <see cref="InputModel"/>.</summary>
        public class Operation : OperationBase<InputModel, OutputModel>
        {
            /// <summary>Attribute types this tool cannot build from a CSV string.</summary>
            private static readonly HashSet<Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode> UnsupportedAttributeTypes = new()
            {
                Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.PartyList,
                Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Virtual,
                Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.CalendarRules,
                Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.EntityName,
                Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.ManagedProperty
            };

            public Operation(IServiceProvider serviceProvider, InputModel input) : base(serviceProvider, input)
            {
            }

            /// <summary>Retrieves entity, attribute and relationship metadata of <see cref="InputModel.Entity"/>.</summary>
            private Microsoft.Xrm.Sdk.Metadata.EntityMetadata RetrieveEntityMetadata()
            {
                var retrieveEntityRequest = new RetrieveEntityRequest
                {
                    EntityFilters = Microsoft.Xrm.Sdk.Metadata.EntityFilters.Entity |
                        Microsoft.Xrm.Sdk.Metadata.EntityFilters.Attributes |
                        Microsoft.Xrm.Sdk.Metadata.EntityFilters.Relationships,
                    LogicalName = Input.Entity,
                    RetrieveAsIfPublished = true
                };

                var retrieveEntityResponse = (RetrieveEntityResponse)OrganizationService.Execute(retrieveEntityRequest);
                return retrieveEntityResponse.EntityMetadata;
            }

            /// <summary>
            /// Opens <see cref="InputModel.CsvFilePath"/> and reads its header row.
            /// The caller owns (and must dispose) the returned reader.
            /// </summary>
            /// <exception cref="InvalidOperationException">The file is empty or has no header row.</exception>
            private CsvReader PrepareCsvReader()
            {
                var reader = new StreamReader(Input.CsvFilePath);
                var csvConfig = new CsvHelper.Configuration.CsvConfiguration(System.Globalization.CultureInfo.InvariantCulture)
                {
                    Delimiter = Input.Delimiter.ToString()
                };
                var csv = new CsvReader(reader, csvConfig);
                try
                {
                    if (!csv.Read())
                    {
                        throw new InvalidOperationException("The CSV file is empty.");
                    }

                    csv.ReadHeader();
                    if (csv.HeaderRecord == null)
                    {
                        throw new InvalidOperationException("Header Record is null");
                    }

                    return csv;
                }
                catch
                {
                    // Release the file handle when the header cannot be read; nobody else owns it yet.
                    csv.Dispose();
                    reader.Dispose();
                    throw;
                }
            }

            /// <summary>
            /// Converts CSV rows into Create/Update requests and groups them into batches of <paramref name="limit"/>.
            /// Rows that yield no attribute value are skipped.
            /// </summary>
            /// <param name="records">CSV rows as returned by CsvHelper's dynamic reader.</param>
            /// <param name="attributeMetadata">Metadata of the attributes present in the CSV header.</param>
            /// <param name="limit">Maximum number of requests per batch; must be positive.</param>
            private OrganizationRequest[][] ProcessRecords(dynamic[] records, Microsoft.Xrm.Sdk.Metadata.AttributeMetadata[] attributeMetadata, int limit)
            {
                var requests = new List<OrganizationRequest>(records.Length);
                foreach (var record in records)
                {
                    var columns = (IDictionary<string, object>)record;
                    var entity = new Entity(Input.Entity);
                    foreach (var attr in attributeMetadata)
                    {
                        var value = columns.TryGetValue(attr.LogicalName, out var raw) ? (raw?.ToString() ?? "").Trim() : "";
                        SetEntityAttribute(entity, attr, value);
                    }

                    if (entity.Attributes.Count == 0)
                    {
                        continue;
                    }

                    requests.Add(Input.Mode == Mode.Create
                        ? new CreateRequest { Target = entity }
                        : new UpdateRequest { Target = entity });
                }

                return requests.Chunk(limit).ToArray();
            }

            /// <summary>
            /// Sends every batch as one <see cref="ExecuteMultipleRequest"/>, keeping at most
            /// <paramref name="currentThreadCount"/> requests in flight, and measures the elapsed time.
            /// </summary>
            /// <param name="batchedEntityRequests">Batches produced by <see cref="ProcessRecords"/>.</param>
            /// <param name="limit">Rows per batch (only recorded in the result).</param>
            /// <param name="currentThreadCount">Maximum number of concurrent ExecuteMultiple calls.</param>
            /// <param name="service">Dataverse service used for every call of this run.</param>
            /// <returns>Timing and throughput of the run; <see cref="BenchmarkResult.IsError"/> is set when any call faulted or threw.</returns>
            private async Task<BenchmarkResult> RunBenchmarkIteration(OrganizationRequest[][] batchedEntityRequests, int limit, int currentThreadCount, IOrganizationServiceAsync2 service)
            {
                var startTime = DateTime.Now;
                var stopWatch = Stopwatch.StartNew();

                using var semaphore = new SemaphoreSlim(currentThreadCount);
                var tasks = new List<Task>(batchedEntityRequests.Length);
                var isError = false;

                foreach (var batch in batchedEntityRequests)
                {
                    await semaphore.WaitAsync();
                    tasks.Add(Task.Run(async () =>
                    {
                        try
                        {
                            var executeMultipleRequest = new ExecuteMultipleRequest
                            {
                                Settings = new ExecuteMultipleSettings
                                {
                                    // Keep processing the batch after a failed row; faults are inspected below.
                                    ContinueOnError = true,
                                    ReturnResponses = false
                                },
                                Requests = []
                            };
                            executeMultipleRequest.Requests.AddRange(batch);

                            var threadResult = await service.ExecuteAsync(executeMultipleRequest) as ExecuteMultipleResponse;
                            if (threadResult == null)
                            {
                                Console.WriteLine("The ExecuteMultipleResponse is null.");
                                return;
                            }

                            var firstFault = threadResult.Responses.FirstOrDefault(r => r.Fault != null)?.Fault;
                            if (firstFault != null)
                            {
                                isError = true;
                                Console.WriteLine("An error was found: " + firstFault.Message);
                            }
                        }
                        catch (Exception ex)
                        {
                            isError = true;
                            Console.WriteLine("An error was found: " + ex.Message);
                        }
                        finally
                        {
                            semaphore.Release();
                        }
                    }));
                }

                await Task.WhenAll(tasks);

                stopWatch.Stop();
                var endTime = DateTime.Now;

                var totalRows = batchedEntityRequests.Sum(b => b.Length);
                var elapsedSeconds = stopWatch.Elapsed.TotalSeconds;
                // Avoid NaN/Infinity when nothing was sent or the run took no measurable time.
                var rowsPerSecond = elapsedSeconds > 0 ? totalRows / elapsedSeconds : 0;

                return new BenchmarkResult
                {
                    CreatedDate = startTime,
                    EndDate = endTime,
                    TimeSpan = stopWatch.Elapsed,
                    MaxRowSetting = limit,
                    MaxThreads = currentThreadCount,
                    IsError = isError,
                    RowsPerSecond = rowsPerSecond
                };
            }

            /// <summary>
            /// Promotes <paramref name="benchmarkResult"/> to the best setting when it succeeded and beats the current best
            /// by at least <see cref="InputModel.PercentImprovement"/> percent; otherwise counts a non-improvement.
            /// </summary>
            private void UpdateBestSettings(BenchmarkResult benchmarkResult, ref BenchmarkResult bestSettings, ref int consecutiveNonImprovements)
            {
                var improved = !benchmarkResult.IsError &&
                    HasSignificantImprovement(bestSettings.RowsPerSecond, benchmarkResult.RowsPerSecond, Input.PercentImprovement);

                if (!improved)
                {
                    consecutiveNonImprovements++;
                    return;
                }

                consecutiveNonImprovements = 0;
                bestSettings = new BenchmarkResult
                {
                    CreatedDate = benchmarkResult.CreatedDate,
                    EndDate = benchmarkResult.EndDate,
                    TimeSpan = benchmarkResult.TimeSpan,
                    MaxRowSetting = benchmarkResult.MaxRowSetting,
                    MaxThreads = benchmarkResult.MaxThreads,
                    RowsPerSecond = benchmarkResult.RowsPerSecond
                };
            }

            /// <summary>Prints the result of a measured run together with the best setting so far.</summary>
            private void LogBenchmarkResult(BenchmarkResult benchmarkResult, BenchmarkResult bestSettings)
            {
                Console.WriteLine($"Run completed for {benchmarkResult.MaxRowSetting} rows and {benchmarkResult.MaxThreads} threads with {benchmarkResult.RowsPerSecond} rows/second. Best settings: {bestSettings.MaxRowSetting} rows, {bestSettings.MaxThreads} threads, with {bestSettings.RowsPerSecond} rows/second.");
            }

            /// <summary>
            /// Moves to the next configuration. A warm-up run is followed by a measured run with the same settings;
            /// after a measured run the rows grow by <see cref="RowIncrement"/>, or a thread is added and the rows reset
            /// to <see cref="InputModel.Rows"/> when <see cref="InputModel.MaxRows"/> is reached or too many runs in a
            /// row brought no improvement.
            /// </summary>
            /// <returns><c>true</c> when a thread was just added and the thread count reached <paramref name="maxAllowedThreads"/>.</returns>
            private bool UpdateLoopVariables(ref int limit, ref int currentThreadCount, ref bool isWarmingUp,
                ref int consecutiveNonImprovements, int maxAllowedThreads)
            {
                if (isWarmingUp)
                {
                    isWarmingUp = false;
                    return false;
                }

                var threadLimitReached = false;
                limit += RowIncrement;

                if (limit >= Input.MaxRows || consecutiveNonImprovements >= Input.StoppedIfSubsequentEnhancementNotFound)
                {
                    // Passed by ref so the new thread count really starts with a clean slate.
                    consecutiveNonImprovements = 0;
                    currentThreadCount++;
                    limit = Input.Rows;

                    if (currentThreadCount >= maxAllowedThreads)
                    {
                        Console.WriteLine("Maximum threads reached.");
                        threadLimitReached = true;
                    }
                }

                // The next configuration gets its own warm-up run first (when enabled).
                isWarmingUp = Input.WarmingUp;
                return threadLimitReached;
            }

            /// <summary>Decides whether another run should be started.</summary>
            private static bool ShouldContinueBenchmark(int consecutiveErrors, int maxConsecutiveErrors, bool threadLimitReached)
            {
                if (consecutiveErrors >= maxConsecutiveErrors)
                {
                    Console.WriteLine("Maximum errors reached.");
                    return false;
                }

                return !threadLimitReached;
            }

            /// <summary>Timing and throughput of one benchmark run.</summary>
            public class BenchmarkResult
            {
                public TimeSpan TimeSpan { get; set; }
                public bool IsError { get; set; }
                public DateTime CreatedDate { get; set; }
                public DateTime EndDate { get; set; }
                public int MaxRowSetting { get; set; }
                public int MaxThreads { get; set; }
                public double RowsPerSecond { get; set; }
            }

            /// <summary>
            /// Parses <paramref name="value"/> according to the attribute type and stores it on <paramref name="entity"/>.
            /// Blank or unparsable values are left unset.
            /// </summary>
            /// <exception cref="NotSupportedException">The attribute type cannot be built from a CSV string, or a lookup's target table cannot be determined.</exception>
            private void SetEntityAttribute(Entity entity, Microsoft.Xrm.Sdk.Metadata.AttributeMetadata attr, string value)
            {
                if (string.IsNullOrWhiteSpace(value)) return;

                if (attr.AttributeType is not { } attributeType || UnsupportedAttributeTypes.Contains(attributeType))
                {
                    throw new NotSupportedException("The tool does not support this attribute type at the moment.");
                }

                switch (attributeType)
                {
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Picklist:
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.State:
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Status:
                        if (int.TryParse(value, out var statusValue))
                        {
                            entity[attr.LogicalName] = new OptionSetValue(statusValue);
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Boolean:
                        if (bool.TryParse(value, out var boolValue))
                        {
                            entity[attr.LogicalName] = boolValue;
                        }
                        else if (value == "1")
                        {
                            entity[attr.LogicalName] = true;
                        }
                        else if (value == "0")
                        {
                            entity[attr.LogicalName] = false;
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.DateTime:
                        if (DateTime.TryParse(value, out var dateValue))
                        {
                            entity[attr.LogicalName] = dateValue;
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Integer:
                        if (int.TryParse(value, out var intValue))
                        {
                            entity[attr.LogicalName] = intValue;
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.BigInt:
                        if (long.TryParse(value, out var longValue))
                        {
                            entity[attr.LogicalName] = longValue;
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Decimal:
                        if (decimal.TryParse(value, out var decValue))
                        {
                            entity[attr.LogicalName] = decValue;
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Double:
                        if (double.TryParse(value, out var doubValue))
                        {
                            entity[attr.LogicalName] = doubValue;
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Money:
                        if (decimal.TryParse(value, out var moneyValue))
                        {
                            entity[attr.LogicalName] = new Money(moneyValue);
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Lookup:
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Owner:
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Customer:
                        {
                            var reference = CreateEntityReference(attr, value);
                            if (reference != null)
                            {
                                entity[attr.LogicalName] = reference;
                            }
                        }
                        break;
                    case Microsoft.Xrm.Sdk.Metadata.AttributeTypeCode.Uniqueidentifier:
                        if (Guid.TryParse(value, out var idValue))
                        {
                            entity[attr.LogicalName] = idValue;
                        }
                        break;
                    default:
                        entity[attr.LogicalName] = value;
                        break;
                }
            }

            /// <summary>
            /// Builds the reference for a lookup/owner/customer column. Accepts either <c>&lt;guid&gt;</c> (when the
            /// lookup targets exactly one table) or <c>&lt;table logical name&gt;:&lt;guid&gt;</c> (for any lookup).
            /// </summary>
            /// <returns>The reference, or <c>null</c> when no GUID can be parsed.</returns>
            /// <exception cref="NotSupportedException">No table prefix was given and the lookup does not have exactly one target table.</exception>
            private static EntityReference? CreateEntityReference(Microsoft.Xrm.Sdk.Metadata.AttributeMetadata attr, string value)
            {
                string? targetTable = null;
                var idText = value;
                var separatorIndex = value.IndexOf(':');
                if (separatorIndex > 0)
                {
                    targetTable = value[..separatorIndex].Trim();
                    idText = value[(separatorIndex + 1)..].Trim();
                }

                if (!Guid.TryParse(idText, out var id))
                {
                    return null;
                }

                if (targetTable == null)
                {
                    // attr.EntityLogicalName is the table that owns the column, not the table it points to,
                    // so the target has to come from the lookup metadata.
                    var targets = (attr as Microsoft.Xrm.Sdk.Metadata.LookupAttributeMetadata)?.Targets;
                    if (targets == null || targets.Length != 1)
                    {
                        throw new NotSupportedException(
                            $"Cannot determine the target table of column '{attr.LogicalName}'; use the '<table>:<guid>' format in the CSV.");
                    }

                    targetTable = targets[0];
                }

                return new EntityReference(targetTable, id);
            }

            /// <summary>
            /// Runs the benchmark until the thread limit is reached, too many consecutive runs fail,
            /// or the CSV file runs out of rows, and returns the best setting found.
            /// </summary>
            /// <exception cref="ArgumentOutOfRangeException"><see cref="InputModel.Rows"/> or <see cref="InputModel.Threads"/> is not positive.</exception>
            public override async Task<OutputModel> HandleExecuteAsync()
            {
                if (Input.Rows <= 0)
                {
                    throw new ArgumentOutOfRangeException(nameof(Input.Rows), Input.Rows, "Rows must be greater than zero.");
                }

                if (Input.Threads <= 0)
                {
                    // SemaphoreSlim(0) would make the first run wait forever.
                    throw new ArgumentOutOfRangeException(nameof(Input.Threads), Input.Threads, "Threads must be greater than zero.");
                }

                var maxAllowedThreads = (int)(Environment.ProcessorCount * Input.MaxProcessorPercentage);

                var entityMetadata = RetrieveEntityMetadata();
                using var csv = PrepareCsvReader();

                // The header never changes, so match it against the metadata once.
                var headerColumns = new HashSet<string>(csv.HeaderRecord!);
                var attributeMetadata = entityMetadata.Attributes
                    .Where(attr => headerColumns.Contains(attr.LogicalName))
                    .ToArray();

                // CsvReader is forward-only: share a single enumerator across runs instead of re-skipping rows.
                using var recordStream = csv.GetRecords<dynamic>().GetEnumerator();
                var reusableRecords = new List<dynamic>();

                // Returns the next `count` rows (fewer when the file runs out). With AllowDuplicates, every call
                // starts again from the first row of the file.
                dynamic[] ReadRecords(int count)
                {
                    if (Input.AllowDuplicates)
                    {
                        while (reusableRecords.Count < count && recordStream.MoveNext())
                        {
                            reusableRecords.Add(recordStream.Current);
                        }

                        return reusableRecords.Take(count).ToArray();
                    }

                    var batch = new List<dynamic>(count);
                    while (batch.Count < count && recordStream.MoveNext())
                    {
                        batch.Add(recordStream.Current);
                    }

                    return batch.ToArray();
                }

                var bestSettings = new BenchmarkResult { TimeSpan = TimeSpan.MaxValue, RowsPerSecond = double.MinValue };
                var runs = new List<BenchmarkResult>();

                var limit = Input.Rows;
                var isWarmingUp = Input.WarmingUp;
                var currentThreadCount = Input.Threads;
                var consecutiveErrors = 0;
                var consecutiveNonImprovements = 0;

                var service = OrganizationService;

                while (true)
                {
                    var requestedRows = limit * currentThreadCount;
                    var records = ReadRecords(requestedRows);
                    if (records.Length == 0)
                    {
                        Console.WriteLine("No more records in the CSV file. Stopping the benchmark.");
                        break;
                    }

                    if (records.Length < requestedRows)
                    {
                        Console.WriteLine($"Warning: only {records.Length} of {requestedRows} requested rows are available; this run may not be comparable.");
                    }

                    var batchedEntityRequests = ProcessRecords(records, attributeMetadata, limit);

                    var benchmarkResult = await RunBenchmarkIteration(batchedEntityRequests, limit, currentThreadCount, service);

                    // Only *consecutive* failures stop the benchmark.
                    consecutiveErrors = benchmarkResult.IsError ? consecutiveErrors + 1 : 0;

                    if (!isWarmingUp)
                    {
                        runs.Add(benchmarkResult);
                        UpdateBestSettings(benchmarkResult, ref bestSettings, ref consecutiveNonImprovements);
                        LogBenchmarkResult(benchmarkResult, bestSettings);
                    }

                    var threadLimitReached = UpdateLoopVariables(ref limit, ref currentThreadCount, ref isWarmingUp,
                        ref consecutiveNonImprovements, maxAllowedThreads);

                    // Renew the service to avoid any caching/token expired issues. With warm-up enabled, renew only
                    // before the warm-up run so the measured run reuses the warmed-up client.
                    if (isWarmingUp || !Input.WarmingUp)
                    {
                        service = OrganizationService;
                    }

                    if (!ShouldContinueBenchmark(consecutiveErrors, Input.StoppedIfSubsequentErrorFound, threadLimitReached))
                        break;

                    Console.WriteLine($"Sleep for {Input.SleepTime} minute(s)..");
                    await Task.Delay(TimeSpan.FromMinutes(Input.SleepTime));
                }

                return new OutputModel
                {
                    BestMaxRow = bestSettings.MaxRowSetting,
                    BestThreads = bestSettings.MaxThreads,
                    BestTime = bestSettings.TimeSpan,
                    BestRowsPerSecond = bestSettings.RowsPerSecond
                };
            }
        }

        /// <summary>
        /// Returns <c>true</c> when <paramref name="newValue"/> is at least <paramref name="thresholdPercent"/> percent
        /// higher than <paramref name="oldValue"/>. Any value counts as an improvement over a non-positive baseline.
        /// </summary>
        public static bool HasSignificantImprovement(double oldValue, double newValue, double thresholdPercent = DefaultImprovementThreshold)
        {
            if (oldValue <= 0) return true;
            return newValue >= oldValue * (1.0 + thresholdPercent / 100.0);
        }
    }
}

As you can see in the code snippet above, if we do not get any improvement for x consecutive runs (or the row count reaches MaxRows), we directly add a new thread and reset the rows to 200, on the assumption that increasing the rows further is not worth running.

Lastly, here is how I call the business logic above:

using DataverseBenchmarkTools.Business;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.PowerPlatform.Dataverse.Client;

ServiceClient.MaxConnectionTimeout = TimeSpan.FromMinutes(30);

// Bump up the min threads reserved for this app to ramp connections faster
// (by default the minimum worker/IOCP thread counts equal the number of processors).
ThreadPool.SetMinThreads(100, 100);

// NOTE(review): the ServicePointManager settings below come from the .NET Framework guidance. On .NET Core /
// .NET 5+ they apply to the obsolete HttpWebRequest stack (SYSLIB0014) and are likely ignored by
// HttpClient-based clients — confirm before relying on them.
// Change max connections from .NET to a remote service (default: 2)
System.Net.ServicePointManager.DefaultConnectionLimit = 65000;
// Turn off the Expect 100 to continue message - 'true' will cause the caller to wait until it round-trip confirms a connection to the server
System.Net.ServicePointManager.Expect100Continue = false;
// Can decrease overall transmission overhead but can cause delay in data packet arrival
System.Net.ServicePointManager.UseNagleAlgorithm = false;

var builder = Host.CreateApplicationBuilder(args);

// Keep the client secret out of source code: read the connection string from configuration,
// e.g. appsettings.json or the ConnectionStrings__Dataverse environment variable.
var connectionString = builder.Configuration["ConnectionStrings:Dataverse"]
    ?? throw new InvalidOperationException(
        "Missing 'ConnectionStrings:Dataverse' configuration, e.g. AuthType=ClientSecret;Url=https://your-crm-url.crm.dynamics.com;ClientId=your-client-id;ClientSecret=your-client-secret");

// A new ServiceClient is created (and connected) every time one is resolved.
builder.Services.AddTransient(_ =>
{
    var serviceClient = new ServiceClient(connectionString)
    {
        // Presumably disabled so throttling errors surface as failed runs instead of being retried.
        UseExponentialRetryDelayForConcurrencyThrottle = false,
        MaxRetryCount = 0,
        UseWebApi = true,
        // NOTE(review): Microsoft's parallel-request guidance suggests disabling the affinity cookie
        // for maximum throughput — consider benchmarking with false as well.
        EnableAffinityCookie = true
    };

    if (!serviceClient.IsReady)
    {
        // LastException can be null, and rethrowing a stored exception loses context: wrap it instead.
        var lastError = serviceClient.LastError;
        var lastException = serviceClient.LastException;
        serviceClient.Dispose();
        throw new InvalidOperationException($"Failed to connect to Dataverse: {lastError}", lastException);
    }

    return serviceClient;
});

// Disposing the host also disposes the transient ServiceClient instances it created.
using var host = builder.Build();

var result = await new DataverseBenchmark.Operation(host.Services,
    new DataverseBenchmark.InputModel
    {
        CsvFilePath = "C:\\Code\\performance_benchmark.csv",
        AllowDuplicates = false,
        Delimiter = ';',
        Entity = "contact",
        MaxRows = 1000,
        MaxProcessorPercentage = 1.5,
        Rows = 200,
        Threads = 2,
        StoppedIfSubsequentEnhancementNotFound = 3,
        StoppedIfSubsequentErrorFound = 2,
        Mode = DataverseBenchmark.Mode.Create,
        SleepTime = 1,
        WarmingUp = false,
        PercentImprovement = 5
    }).Execute();

Console.WriteLine($"Best setting found! Row: {result.BestMaxRow}. Threads: {result.BestThreads}. Rows/second: {result.BestRowsPerSecond}..");
Console.ReadKey();

Then we need to prepare a CSV file whose header row lists the attribute logical names of the entity that we want to test. For example, for the Contact entity/table:

firstname;lastname
test1;last1
test2;last2
test3;last3

Sample performance_benchmark.csv

As you can see, the tool tries to find the best rows/second. It starts with the lowest thread count (the Threads input) and 200 rows. After every run, it sleeps to give Dataverse some breathing room, then resumes with the next run until it finds the best thread and row counts:

Demo run

What do you think?

Happy CRM-ing! 🚀

Leave a comment

Your comment is sent privately to the author and isn't published on the site.