Safe update in Cosmos DB with etag, ASP.NET Core

This post explains how to update documents in Cosmos DB safely with optimistic concurrency control by using the etag of a document. Cosmos DB implements optimistic concurrency, so you cannot lock a document while reading from it or writing to it.

Optimistic concurrency means that if two concurrent operations attempt to update the same document within a logical partition, one operation wins and the other operation fails, provided that the updates are conditional on the etag. Without such a check, the last write silently overwrites the earlier one and the document can end up with incorrect data, which is a problem when the data in the document must be correct.

Optimistic Concurrency Control (OCC) allows you to prevent lost updates and to keep your data correct. OCC can be implemented by using the etag of a document. The value of the etag is automatically generated and updated by the server every time a document is updated. By checking if the etag has changed between a read and an update, you can ensure that you are updating the latest version of the document.
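The etag check can be illustrated without a database. The class below is a minimal sketch, not Cosmos DB code: InMemoryEtagStore and its methods are hypothetical names that I made up for this illustration, and the store only mimics the behavior of creating a new etag on every write and rejecting a replace that carries a stale etag.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store that mimics etag checks, not part of any Cosmos DB SDK
public class InMemoryEtagStore
{
    private readonly Dictionary<string, (string Value, string Etag)> documents =
        new Dictionary<string, (string Value, string Etag)>();
    private readonly object gate = new object();

    // Insert or overwrite a document unconditionally and return the new etag
    public string Put(string id, string value)
    {
        lock (this.gate)
        {
            string etag = Guid.NewGuid().ToString();
            this.documents[id] = (value, etag);
            return etag;
        }
    }

    // Read a document together with its current etag (throws if the id is unknown)
    public (string Value, string Etag) Get(string id)
    {
        lock (this.gate)
        {
            return this.documents[id];
        }
    }

    // Replace the document only if the etag from the caller still matches
    public bool TryReplace(string id, string value, string expected_etag)
    {
        lock (this.gate)
        {
            if (this.documents.TryGetValue(id, out var current) == false || current.Etag != expected_etag)
            {
                // The document is missing or has changed since the caller read it
                return false;
            }

            // Every successful write gets a new etag
            this.documents[id] = (value, Guid.NewGuid().ToString());
            return true;
        }
    }
}
```

If two callers read the same document and both try to replace it, the first TryReplace succeeds and changes the etag, so the second TryReplace returns false and that caller has to read the document again before it retries.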

Cosmos DB Client

I have already written a post about a generic Cosmos DB client that includes the methods that we need. We need to get a document together with its etag and to update a document only if the etag still matches. I also include code for the ModelItem type and the ModelPage type.

public async Task<ModelItem<T>> GetByIdWithEtag<T>(string id, string partion_key)
{
    // Create variables to return
    ModelItem<T> model = new ModelItem<T>();

    try
    {
        // Get the response
        ResourceResponse<Document> response = await this.client.ReadDocumentAsync(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id), 
            new RequestOptions { PartitionKey = new PartitionKey(partion_key) });

        // Get the document
        Document document = response.Resource;

        // Get the etag
        model.etag = document.ETag;

        // Get the post
        model.item = (T)(dynamic)document;
    }
    catch (DocumentClientException de)
    {
        // Log the exception
        this.logger.LogDebug(de, $"GetByIdWithEtag", null);
        model.error = true;
    }

    // Return the model
    return model;

} // End of the GetByIdWithEtag method

public async Task<bool> Update<T>(string id, T item, string etag)
{
    try
    {
        // Create an access condition
        AccessCondition ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };

        // Update the document
        await this.client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id), item, new RequestOptions { AccessCondition = ac });
    }
    catch (DocumentClientException de)
    {
        // Check for exceptions
        if (de.StatusCode == HttpStatusCode.PreconditionFailed)
        {
            return false;
        }
        else
        {
            // Log the exception and rethrow it, so that a failed update is not reported as a success
            this.logger.LogError(de, $"Update, id: {id}, etag: {etag}", null);
            throw;
        }
    }

    // Return a success response
    return true;

} // End of the Update method

public class ModelItem<T>
{
    #region Variables

    public T item { get; set; }
    public string etag { get; set; }
    public bool error { get; set; }

    #endregion

    #region Constructors

    public ModelItem()
    {
        // Set values for instance variables
        this.item = default(T);
        this.etag = "";
        this.error = false;

    } // End of the constructor

    public ModelItem(T item, string etag, bool error)
    {
        // Set values for instance variables
        this.item = item;
        this.etag = etag;
        this.error = error;

    } // End of the constructor

    #endregion

} // End of the class

public class ModelPage<T>
{
    #region Variables

    public IList<T> items { get; set; }
    public string ct { get; set; }
    public bool error { get; set; }

    #endregion

    #region Constructors

    public ModelPage()
    {
        // Set values for instance variables
        this.items = new List<T>();
        this.ct = "";
        this.error = false;

    } // End of the constructor

    #endregion

} // End of the class

Update with etag

The method below gets a list of locations and saves the continuation token in a document so that we can continue to get more locations at a later point in time. We need to implement optimistic concurrency control when the continuation token is saved. We first get the document that holds the continuation token, together with its etag, and then we loop until we have successfully updated the JobLocationsPage document.

public async Task<ModelPage<LocationDocument>> GetLocations(Int32 page_size)
{
    // Create the locations page to return
    ModelPage<LocationDocument> page = new ModelPage<LocationDocument>();

    // Get the job locations page 
    ModelItem<JobLocationsPage> job_tuple = await this.cosmos_database_repository.GetByIdWithEtag<JobLocationsPage>("xx481cd9-7961-4c6e-960e-7cb6e5cde5e8", "xx481cd9-7961-4c6e-960e-7cb6e5cde5e8");

    // Loop until a successful update
    while (true)
    {
        // Get locations
        page = await this.location_repository.GetChunk("page_name", "ASC", page_size, job_tuple.item.continuation_token);

        // Update the job locations page
        job_tuple.item.continuation_token = page.ct;
        bool success = await this.cosmos_database_repository.Update<JobLocationsPage>(job_tuple.item.id, job_tuple.item, job_tuple.etag);

        // Check if the update failed
        if (success == false)
        {
            // Get the tuple again
            job_tuple = await this.cosmos_database_repository.GetByIdWithEtag<JobLocationsPage>("xx481cd9-7961-4c6e-960e-7cb6e5cde5e8", "xx481cd9-7961-4c6e-960e-7cb6e5cde5e8");

            // Continue the loop
            continue;
        }

        // Break out from the loop
        break;

    } // End of the while(true) loop

    // Return the page with locations
    return page;

} // End of the GetLocations method
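The loop above retries until the update succeeds. If you want to limit the number of attempts, the read, update and retry steps can be wrapped in a small helper. The code below is only a sketch under my own assumptions: OptimisticRetry, Run, read, try_write and max_attempts are hypothetical names that do not exist in the Cosmos DB SDK or in the client above, and the delay values are arbitrary.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of the Cosmos DB SDK or the repository shown above
public static class OptimisticRetry
{
    // read: loads the current item together with its etag
    // try_write: attempts a conditional write and returns false on an etag mismatch
    // Returns true if a write succeeded within max_attempts, otherwise false
    public static async Task<bool> Run<T>(
        Func<Task<(T Item, string Etag)>> read,
        Func<T, string, Task<bool>> try_write,
        int max_attempts)
    {
        for (int attempt = 1; attempt <= max_attempts; attempt++)
        {
            // Always start from a fresh read so that the etag is current
            var (item, etag) = await read();

            // Try to write with the etag that was just read
            if (await try_write(item, etag))
            {
                return true;
            }

            // Wait a little longer after each failed attempt (illustrative values)
            if (attempt < max_attempts)
            {
                await Task.Delay(TimeSpan.FromMilliseconds(50 * attempt));
            }
        }

        // Give up, the caller decides how to handle this
        return false;
    }
}
```

In GetLocations, read could wrap GetByIdWithEtag and try_write could get the chunk, set the continuation token and call Update. A false return value then means that another process kept changing the document during all attempts.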
