Generic Cosmos DB SQL Client in ASP.NET Core

This post describes how you can create a generic Cosmos DB SQL client in ASP.NET Core. Azure Cosmos DB is a NoSQL database that is designed with horizontal partitioning and multi-master replication on a global scale. Azure Cosmos DB provides native support for NoSQL and OSS APIs including MongoDB, Cassandra, Gremlin and SQL.

Cosmos DB stores data in JSON documents and every document represents a model. A JSON document in Cosmos DB should have properties for id, partition key and type. The id and the partition key are usually GUIDs; the type property is used when the document should be deserialized to a model.

Options

Our generic Cosmos DB client needs options, and we have created a class for these options. We use an appsettings.json file to store values for these options.

/// <summary>
/// Settings consumed by the generic Cosmos DB client. Every property has a
/// default value, so a configuration section only needs to list the values
/// it wants to override.
/// </summary>
public class CosmosDatabaseOptions
{
    #region Connection

    /// <summary>Account endpoint, handed to the DocumentClient as a Uri. Empty by default.</summary>
    public string Uri { get; set; } = string.Empty;

    /// <summary>Account key used together with the endpoint. Empty by default.</summary>
    public string Key { get; set; } = string.Empty;

    /// <summary>Id of the database that the client works against. Empty by default.</summary>
    public string Database { get; set; } = string.Empty;

    /// <summary>Id of the collection that the client works against. Empty by default.</summary>
    public string Collection { get; set; } = string.Empty;

    #endregion

    #region Query tuning

    /// <summary>Copied to FeedOptions.MaxDegreeOfParallelism for queries. Defaults to -1.</summary>
    public int MaxDegreeOfParallelism { get; set; } = -1;

    /// <summary>Copied to FeedOptions.MaxBufferedItemCount for queries. Defaults to 100.</summary>
    public int MaxBufferedItemCount { get; set; } = 100;

    #endregion

    #region Timeouts and retries

    /// <summary>Request timeout in seconds for the connection policy. Defaults to 60.</summary>
    public int RequestTimeoutInSeconds { get; set; } = 60;

    /// <summary>Maximum number of retry attempts on throttled requests. Defaults to 9.</summary>
    public int RetryCount { get; set; } = 9;

    /// <summary>Maximum retry wait time in seconds. Defaults to 30.</summary>
    public int WaitTimeInSeconds { get; set; } = 30;

    #endregion

} // End of the class
{
  "Logging": {
    "IncludeScopes": false,
    "LogLevel": {
      "Default": "Information"
    }
  },
  "CosmosDatabaseOptions": {
    "Uri": "https://mysite.documents.azure.com:443/",
    "Key": "YOUR-KEY",
    "Database": "development",
    "Collection": "items",
    "MaxDegreeOfParallelism": -1,
    "MaxBufferedItemCount": 100
  }
}

Services

We need to register options and repositories in the ConfigureServices method in the Startup class. We register our Cosmos DB client and a static page repository that will use our client.

/// <summary>
/// Registers the MVC framework, the Cosmos DB options and the repositories
/// in the service collection.
/// </summary>
/// <param name="services">The service collection to add registrations to</param>
public void ConfigureServices(IServiceCollection services)
{
    // MVC with 2.2 compatibility behavior
    var mvcBuilder = services.AddMvc();
    mvcBuilder.SetCompatibilityVersion(CompatibilityVersion.Version_2_2);

    // Bind the "CosmosDatabaseOptions" configuration section to the options class
    var cosmosSection = configuration.GetSection("CosmosDatabaseOptions");
    services.Configure<CosmosDatabaseOptions>(cosmosSection);

    // One shared Cosmos DB client and one static page repository that uses it
    services
        .AddSingleton<ICosmosDatabaseRepository, CosmosDatabaseRepository>()
        .AddSingleton<IStaticPageRepository, StaticPageRepository>();

} // End of the ConfigureServices method

Interface

/// <summary>
/// Generic document operations against one Cosmos DB database and collection.
/// Methods report failures through their return value (a boolean or an
/// error flag on the returned model).
/// </summary>
public interface ICosmosDatabaseRepository
{
    /// <summary>Make sure that the configured database exists, creating it if it is missing.</summary>
    /// <returns>True on success, false if the failure was logged</returns>
    Task<bool> CreateDatabase();

    /// <summary>Make sure that the configured collection exists, creating it if it is missing.</summary>
    /// <returns>True on success, false if the failure was logged</returns>
    Task<bool> CreateCollection();

    /// <summary>Create a new document from the item.</summary>
    /// <returns>True on success, false on failure</returns>
    Task<bool> Add<T>(T item);

    /// <summary>Create the document or replace it if it already exists.</summary>
    /// <returns>True on success, false on failure</returns>
    Task<bool> Upsert<T>(T item);

    /// <summary>Replace the document with the given id.</summary>
    /// <returns>True on success, false on failure</returns>
    Task<bool> Update<T>(string id, T item);

    /// <summary>
    /// Replace the document with the given id, using the etag as an
    /// If-Match access condition.
    /// </summary>
    /// <returns>False if the etag does not match the stored document</returns>
    Task<bool> Update<T>(string id, T item, string etag);

    /// <summary>Read one document by id and partition key.</summary>
    /// <returns>A model with the item, or with the error flag set on failure</returns>
    Task<ModelItem<T>> GetById<T>(string id, string partion_key);

    /// <summary>Read one document by id and partition key and include its etag.</summary>
    /// <returns>A model with the item and etag, or with the error flag set on failure</returns>
    Task<ModelItem<T>> GetByIdWithEtag<T>(string id, string partion_key);

    /// <summary>Run a parameterized SQL query and return the first matching item.</summary>
    /// <returns>A model with the item (if any), or with the error flag set on failure</returns>
    Task<ModelItem<T>> GetByQuery<T>(string sql, SqlParameterCollection parameters);

    /// <summary>Run a parameterized SQL query and return the first matching item with its etag.</summary>
    /// <returns>A model with the item and etag (if any), or with the error flag set on failure</returns>
    Task<ModelItem<T>> GetByQueryWithEtag<T>(string sql, SqlParameterCollection parameters);

    /// <summary>
    /// Run a parameterized SQL query and return one page of results.
    /// </summary>
    /// <param name="sql">The query text</param>
    /// <param name="parameters">Values for the parameters in the query text</param>
    /// <param name="page_size">Maximum number of items in the page</param>
    /// <param name="ct">Continuation token from a previous page, used as the request continuation</param>
    /// <returns>A page with items and a continuation token, or with the error flag set on failure</returns>
    Task<ModelPage<T>> GetListByQuery<T>(string sql, SqlParameterCollection parameters, Int32 page_size, string ct);

    /// <summary>Delete a document by id and partition key.</summary>
    /// <returns>True if the document was deleted or was not found, false on other failures</returns>
    Task<bool> DeleteOnId(string id, string partion_key);

    /// <summary>Execute a stored procedure in the given partition.</summary>
    /// <returns>The string result of the procedure, or an empty string on failure</returns>
    Task<string> RunStoredProcedure(string stored_procedure_id, string partion_key, dynamic[] parameters);

    /// <summary>Dispose the underlying document client.</summary>
    void Dispose();

} // End of the interface

Client

Our generic client for Cosmos DB includes methods for Insert, Update, Upsert, Get and Delete. The class handles logging; this can be changed if you want to return errors instead. The ModelItem class is a wrapper class that includes item, etag and a boolean for errors.

/// <summary>
/// Generic Cosmos DB SQL client that works against the database and
/// collection named in <see cref="CosmosDatabaseOptions"/>. Failures from
/// the service (DocumentClientException) are logged and reported through
/// the return value; other exception types are not caught.
/// </summary>
public class CosmosDatabaseRepository : ICosmosDatabaseRepository, IDisposable
{
    #region Variables

    private readonly ILogger logger;
    private readonly CosmosDatabaseOptions options;
    private readonly DocumentClient client;
    private bool disposed;

    #endregion

    #region Constructors

    /// <summary>
    /// Create a new repository and start opening the connection to the account.
    /// </summary>
    /// <param name="logger">Logger used for service failures</param>
    /// <param name="options">Connection, retry and query options</param>
    /// <exception cref="ArgumentNullException">If options or its value is null</exception>
    public CosmosDatabaseRepository(ILogger<ICosmosDatabaseRepository> logger, IOptions<CosmosDatabaseOptions> options)
    {
        // Set values for instance variables
        this.logger = logger;
        this.options = options?.Value ?? throw new ArgumentNullException(nameof(options));

        // Create a connection policy
        ConnectionPolicy connectionPolicy = new ConnectionPolicy
        {
            RequestTimeout = TimeSpan.FromSeconds(this.options.RequestTimeoutInSeconds),
            ConnectionMode = ConnectionMode.Direct,
            ConnectionProtocol = Protocol.Tcp
        };
        connectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = this.options.RetryCount;
        connectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = this.options.WaitTimeInSeconds;

        // Create a document client
        this.client = new DocumentClient(new Uri(this.options.Uri), this.options.Key, connectionPolicy);

        // Call OpenAsync to avoid startup latency on first request. A constructor
        // can not await, so the warm-up runs in the background; a continuation
        // observes the task so that a failure is logged instead of being lost.
        this.client.OpenAsync().ContinueWith(
            task => this.logger.LogError(task.Exception, "OpenAsync"),
            TaskContinuationOptions.OnlyOnFaulted);

    } // End of the constructor

    #endregion

    #region Helpers

    // Uri of the configured collection
    private Uri CollectionUri => UriFactory.CreateDocumentCollectionUri(this.options.Database, this.options.Collection);

    // Uri of a document in the configured collection
    private Uri GetDocumentUri(string id) => UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id);

    // Cross-partition feed options based on the configured query tuning values
    private FeedOptions CreateFeedOptions(Int32 maxItemCount, string continuation)
    {
        return new FeedOptions
        {
            EnableCrossPartitionQuery = true,
            MaxDegreeOfParallelism = this.options.MaxDegreeOfParallelism,
            MaxBufferedItemCount = this.options.MaxBufferedItemCount,
            MaxItemCount = maxItemCount,
            RequestContinuation = continuation
        };

    } // End of the CreateFeedOptions method

    #endregion

    #region Create methods

    /// <summary>
    /// Create the configured database if it does not exist.
    /// </summary>
    /// <returns>True on success, false if the service reported an error (logged)</returns>
    public async Task<bool> CreateDatabase()
    {
        try
        {
            // A single call that reads or creates the database, this also covers
            // the case where the database is created concurrently by someone else
            await this.client.CreateDatabaseIfNotExistsAsync(new Database { Id = this.options.Database });
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "Create database: {Database}", this.options.Database);
            return false;
        }

        // Return success
        return true;

    } // End of the CreateDatabase method

    /// <summary>
    /// Create the configured collection if it does not exist. A new collection
    /// is partitioned on "/id" and gets an offer throughput of 1000.
    /// </summary>
    /// <returns>True on success, false if the service reported an error (logged)</returns>
    public async Task<bool> CreateCollection()
    {
        try
        {
            // The partition key path is part of the collection definition,
            // RequestOptions.PartitionKey is a per-request partition key value
            DocumentCollection collection = new DocumentCollection { Id = this.options.Collection };
            collection.PartitionKey.Paths.Add("/id");

            await this.client.CreateDocumentCollectionIfNotExistsAsync(
                UriFactory.CreateDatabaseUri(this.options.Database),
                collection,
                new RequestOptions { OfferThroughput = 1000 });
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "Create collection: {Collection}", this.options.Collection);
            return false;
        }

        // Return success
        return true;

    } // End of the CreateCollection method

    /// <summary>
    /// Create a new document from the item.
    /// </summary>
    /// <returns>True on success, false if the service reported an error (logged)</returns>
    public async Task<bool> Add<T>(T item)
    {
        try
        {
            // Create the document
            await this.client.CreateDocumentAsync(this.CollectionUri, item);
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "Add");
            return false;
        }

        // Return success
        return true;

    } // End of the Add method

    #endregion

    #region Update methods

    /// <summary>
    /// Create the document or replace it if it already exists.
    /// </summary>
    /// <returns>True on success, false if the service reported an error (logged)</returns>
    public async Task<bool> Upsert<T>(T item)
    {
        try
        {
            // Upsert the document
            await this.client.UpsertDocumentAsync(this.CollectionUri, item);
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "Upsert");
            return false;
        }

        // Return success
        return true;

    } // End of the Upsert method

    /// <summary>
    /// Replace the document with the given id.
    /// </summary>
    /// <returns>True on success, false if the service reported an error (logged)</returns>
    public async Task<bool> Update<T>(string id, T item)
    {
        try
        {
            // Replace the document
            await this.client.ReplaceDocumentAsync(this.GetDocumentUri(id), item);
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "Update, id: {Id}", id);
            return false;
        }

        // Return success
        return true;

    } // End of the Update method

    /// <summary>
    /// Replace the document with the given id, but only if its current etag
    /// matches the supplied etag (optimistic concurrency).
    /// </summary>
    /// <returns>
    /// True on success. False if the etag did not match (not logged, an
    /// expected outcome) or if the service reported another error (logged).
    /// </returns>
    public async Task<bool> Update<T>(string id, T item, string etag)
    {
        try
        {
            // Create an access condition
            AccessCondition ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };

            // Update the document
            await this.client.ReplaceDocumentAsync(this.GetDocumentUri(id), item, new RequestOptions { AccessCondition = ac });
        }
        catch (DocumentClientException de) when (de.StatusCode == HttpStatusCode.PreconditionFailed)
        {
            // Someone else changed the document, the caller has to re-read and retry
            return false;
        }
        catch (DocumentClientException de)
        {
            // Log the exception, the document was not updated
            this.logger.LogError(de, "Update, id: {Id}, etag: {Etag}", id, etag);
            return false;
        }

        // Return a success response
        return true;

    } // End of the Update method

    #endregion

    #region Get methods

    /// <summary>
    /// Read one document by id and partition key.
    /// </summary>
    /// <returns>A model with the item, or with error set to true if the read failed (logged at debug level)</returns>
    public async Task<ModelItem<T>> GetById<T>(string id, string partion_key)
    {
        // Create variables to return
        ModelItem<T> model = new ModelItem<T>();

        try
        {
            // Get the response
            DocumentResponse<T> response = await this.client.ReadDocumentAsync<T>(this.GetDocumentUri(id),
                new RequestOptions { PartitionKey = new PartitionKey(partion_key) });

            // Get the post
            model.item = response.Document;
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogDebug(de, "GetById");
            model.error = true;
        }

        // Return the model
        return model;

    } // End of the GetById method

    /// <summary>
    /// Read one document by id and partition key and include its etag.
    /// </summary>
    /// <returns>A model with the item and etag, or with error set to true if the read failed (logged at debug level)</returns>
    public async Task<ModelItem<T>> GetByIdWithEtag<T>(string id, string partion_key)
    {
        // Create variables to return
        ModelItem<T> model = new ModelItem<T>();

        try
        {
            // Get the response
            ResourceResponse<Document> response = await this.client.ReadDocumentAsync(this.GetDocumentUri(id),
                new RequestOptions { PartitionKey = new PartitionKey(partion_key) });

            // Get the document
            Document document = response.Resource;

            // Get the etag
            model.etag = document.ETag;

            // Get the post, the dynamic cast converts the document to the model type
            model.item = (T)(dynamic)document;
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogDebug(de, "GetByIdWithEtag");
            model.error = true;
        }

        // Return the model
        return model;

    } // End of the GetByIdWithEtag method

    /// <summary>
    /// Run a parameterized cross-partition query and return the first item.
    /// </summary>
    /// <returns>A model with the first item (if any), or with error set to true if the query failed (logged)</returns>
    public async Task<ModelItem<T>> GetByQuery<T>(string sql, SqlParameterCollection parameters)
    {
        // Create variables to return
        ModelItem<T> model = new ModelItem<T>();

        // Set query options
        FeedOptions queryOptions = this.CreateFeedOptions(1, null);

        try
        {
            // Create a query
            using (IDocumentQuery<T> query = this.client.CreateDocumentQuery<T>(this.CollectionUri,
                new SqlQuerySpec { QueryText = sql, Parameters = parameters }, queryOptions).AsDocumentQuery())
            {
                // A page from a cross-partition query can be empty even though
                // there are more results, keep reading until an item is found
                bool found = false;
                while (found == false && query.HasMoreResults)
                {
                    // Get the response
                    FeedResponse<T> response = await query.ExecuteNextAsync<T>();

                    // Get the post
                    foreach (T item in response)
                    {
                        model.item = item;
                        found = true;
                        break;
                    }
                }
            }
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "GetByQuery");
            model.error = true;
        }

        // Return the model
        return model;

    } // End of the GetByQuery method

    /// <summary>
    /// Run a parameterized cross-partition query and return the first item
    /// together with its etag.
    /// </summary>
    /// <returns>A model with the first item and etag (if any), or with error set to true if the query failed (logged)</returns>
    public async Task<ModelItem<T>> GetByQueryWithEtag<T>(string sql, SqlParameterCollection parameters)
    {
        // Create variables to return
        ModelItem<T> model = new ModelItem<T>();

        // Set query options
        FeedOptions queryOptions = this.CreateFeedOptions(1, null);

        try
        {
            // Create a query
            using (IDocumentQuery<Document> query = this.client.CreateDocumentQuery<Document>(this.CollectionUri,
                new SqlQuerySpec { QueryText = sql, Parameters = parameters }, queryOptions).AsDocumentQuery())
            {
                // A page from a cross-partition query can be empty even though
                // there are more results, keep reading until an item is found
                bool found = false;
                while (found == false && query.HasMoreResults)
                {
                    // Get the response
                    FeedResponse<Document> response = await query.ExecuteNextAsync<Document>();

                    // Get the post
                    foreach (Document item in response)
                    {
                        model.item = (T)(dynamic)item;
                        model.etag = item.ETag;
                        found = true;
                        break;
                    }
                }
            }
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "GetByQueryWithEtag");
            model.error = true;
        }

        // Return the model
        return model;

    } // End of the GetByQueryWithEtag method

    /// <summary>
    /// Run a parameterized cross-partition query and return one page of results.
    /// </summary>
    /// <param name="sql">The query text</param>
    /// <param name="parameters">Values for the parameters in the query text</param>
    /// <param name="page_size">Maximum number of items in the page</param>
    /// <param name="ct">Continuation token from the previous page, null or empty for the first page</param>
    /// <returns>A page with items and the next continuation token, or with error set to true if the query failed (logged)</returns>
    public async Task<ModelPage<T>> GetListByQuery<T>(string sql, SqlParameterCollection parameters, Int32 page_size, string ct)
    {
        // Create variables to return
        ModelPage<T> page = new ModelPage<T>();

        // Set query options
        FeedOptions queryOptions = this.CreateFeedOptions(page_size, ct);

        try
        {
            // Create the query
            using (IDocumentQuery<T> query = this.client.CreateDocumentQuery<T>(this.CollectionUri,
                new SqlQuerySpec { QueryText = sql, Parameters = parameters }, queryOptions).AsDocumentQuery())
            {
                // Get the response
                FeedResponse<T> response = await query.ExecuteNextAsync<T>();

                // Get the continuation token
                page.ct = response.ResponseContinuation;

                // Get posts
                page.items = response.ToList<T>();
            }
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "GetListByQuery");
            page.error = true;
        }

        // Return the page
        return page;

    } // End of the GetListByQuery method

    #endregion

    #region Delete methods

    /// <summary>
    /// Delete a document by id and partition key.
    /// </summary>
    /// <returns>True if the document was deleted or did not exist, false if the service reported another error (logged)</returns>
    public async Task<bool> DeleteOnId(string id, string partion_key)
    {
        try
        {
            // Delete a document
            await this.client.DeleteDocumentAsync(this.GetDocumentUri(id), new RequestOptions { PartitionKey = new PartitionKey(partion_key) });
        }
        catch (DocumentClientException de) when (de.StatusCode == HttpStatusCode.NotFound)
        {
            // The post was not found, the wanted end state is already reached
            return true;
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "DeleteOnId");
            return false;
        }

        // The delete operation was successful
        return true;

    } // End of the DeleteOnId method

    #endregion

    #region Stored procedures

    /// <summary>
    /// Execute a stored procedure in the partition given by the partition key.
    /// </summary>
    /// <returns>The string result of the procedure, or an empty string if the service reported an error (logged)</returns>
    public async Task<string> RunStoredProcedure(string stored_procedure_id, string partion_key, dynamic[] parameters)
    {
        // Create a string to return
        string result = "";

        try
        {
            // Run the stored procedure
            result = await this.client.ExecuteStoredProcedureAsync<string>(
                UriFactory.CreateStoredProcedureUri(this.options.Database, this.options.Collection, stored_procedure_id),
                new RequestOptions { PartitionKey = new PartitionKey(partion_key) },
                parameters);
        }
        catch (DocumentClientException de)
        {
            // Log the exception
            this.logger.LogError(de, "RunStoredProcedure");
        }

        // Return the string
        return result;

    } // End of the RunStoredProcedure method

    #endregion

    #region Dispose methods

    /// <summary>
    /// Dispose the document client. Calling the method more than once is safe.
    /// </summary>
    public void Dispose()
    {
        // The method can be called both explicitly and by the service container
        if (this.disposed == true)
        {
            return;
        }

        this.client.Dispose();
        this.disposed = true;

    } // End of the Dispose method

    #endregion

} // End of the class

How to use the client?

We have a repository that handles static pages and this repository uses our Cosmos DB client.

/// <summary>
/// Repository for static pages, built on top of the generic Cosmos DB client.
/// Sort fields, sort orders and language codes end up in the SQL text itself
/// (they can not be sent as parameters), so all three are validated before a
/// query is created.
/// </summary>
public class StaticPageRepository : IStaticPageRepository
{
    #region Variables

    private readonly ICosmosDatabaseRepository cosmos_database_repository;

    #endregion

    #region Constructors

    /// <summary>
    /// Create a new repository
    /// </summary>
    /// <param name="cosmos_database_repository">The generic Cosmos DB client to use</param>
    public StaticPageRepository(ICosmosDatabaseRepository cosmos_database_repository)
    {
        this.cosmos_database_repository = cosmos_database_repository;

    } // End of the constructor

    #endregion

    #region Add methods

    /// <summary>Create a new static page document.</summary>
    public async Task<bool> Add(StaticPageDocument item)
    {
        // Create a document
        return await this.cosmos_database_repository.Add<StaticPageDocument>(item);

    } // End of the Add method

    #endregion

    #region Update methods

    /// <summary>Create the static page document or replace it if it exists.</summary>
    public async Task<bool> Upsert(StaticPageDocument item)
    {
        // Upsert a document
        return await this.cosmos_database_repository.Upsert<StaticPageDocument>(item);

    } // End of the Upsert method

    /// <summary>Replace the static page document that has the same id as the item.</summary>
    public async Task<bool> Update(StaticPageDocument item)
    {
        // Replace a document
        return await this.cosmos_database_repository.Update<StaticPageDocument>(item.id, item);

    } // End of the Update method

    #endregion

    #region Get methods

    /// <summary>Get a static page document by id.</summary>
    public async Task<ModelItem<StaticPageDocument>> GetById(string id)
    {
        // Return the post, the id is passed as the partition key as well
        return await this.cosmos_database_repository.GetById<StaticPageDocument>(id, id);

    } // End of the GetById method

    /// <summary>
    /// Get one translated static page by page name.
    /// </summary>
    /// <returns>The post, or a model with error set to true if the language code is invalid or the query failed</returns>
    public async Task<ModelItem<StaticPagePost>> GetByPageName(string page_name, string language_code)
    {
        // The language code becomes a part of the sql string, refuse unsafe values
        if (IsValidLanguageCode(language_code) == false)
        {
            return new ModelItem<StaticPagePost> { error = true };
        }

        // Create the sql string
        string sql = GetSelectClause(language_code, true)
            + "FROM s WHERE s.model_type = @model_type AND s.page_name = @page_name";

        // Create parameters
        SqlParameterCollection parameters = new SqlParameterCollection()
        {
            new SqlParameter("@model_type", "static_page"),
            new SqlParameter("@page_name", page_name)
        };

        // Return the post
        return await this.cosmos_database_repository.GetByQuery<StaticPagePost>(sql, parameters);

    } // End of the GetByPageName method

    /// <summary>
    /// Get a page of translated static pages for a connection id.
    /// </summary>
    /// <returns>The page, or a page with error set to true if the language code is invalid or the query failed</returns>
    public async Task<ModelPage<StaticPagePost>> GetByConnectionId(Int32 connection_id, string language_code, string sort_field, string sort_order, Int32 page_size, string ct)
    {
        // The language code becomes a part of the sql string, refuse unsafe values
        if (IsValidLanguageCode(language_code) == false)
        {
            return new ModelPage<StaticPagePost> { error = true };
        }

        // Make sure that sort variables are valid
        sort_field = GetValidSortField(sort_field);
        sort_order = GetValidSortOrder(sort_order);

        // Create the sql string
        string sql = GetSelectClause(language_code, true)
            + $"FROM s WHERE s.model_type = @model_type AND s.connection_id = @connection_id ORDER BY s.{sort_field} {sort_order}";

        // Create parameters
        SqlParameterCollection parameters = new SqlParameterCollection()
        {
            new SqlParameter("@model_type", "static_page"),
            new SqlParameter("@connection_id", connection_id)
        };

        // Return posts
        return await this.cosmos_database_repository.GetListByQuery<StaticPagePost>(sql, parameters, page_size, ct);

    } // End of the GetByConnectionId method

    /// <summary>
    /// Get a page of static page meta data (no html text) for a connection id.
    /// </summary>
    /// <returns>The page, or a page with error set to true if the language code is invalid or the query failed</returns>
    public async Task<ModelPage<StaticPageMeta>> GetMetaByConnectionId(Int32 connection_id, string language_code, string sort_field, string sort_order, Int32 page_size, string ct)
    {
        // The language code becomes a part of the sql string, refuse unsafe values
        if (IsValidLanguageCode(language_code) == false)
        {
            return new ModelPage<StaticPageMeta> { error = true };
        }

        // Make sure that sort variables are valid
        sort_field = GetValidSortField(sort_field);
        sort_order = GetValidSortOrder(sort_order);

        // Create the sql string
        string sql = GetSelectClause(language_code, false)
            + $"FROM s WHERE s.model_type = @model_type AND s.connection_id = @connection_id ORDER BY s.{sort_field} {sort_order}";

        // Create parameters
        SqlParameterCollection parameters = new SqlParameterCollection()
        {
            new SqlParameter("@model_type", "static_page"),
            new SqlParameter("@connection_id", connection_id)
        };

        // Return posts
        return await this.cosmos_database_repository.GetListByQuery<StaticPageMeta>(sql, parameters, page_size, ct);

    } // End of the GetMetaByConnectionId method

    /// <summary>
    /// Get a page of static page meta data, optionally filtered on a keyword.
    /// The keyword is lowercased before it is compared with stored keywords.
    /// </summary>
    /// <returns>The page, or a page with error set to true if the language code is invalid or the query failed</returns>
    public async Task<ModelPage<StaticPageMeta>> GetBySearch(string keywords, string language_code, string sort_field, string sort_order, Int32 page_size, string ct)
    {
        // The language code becomes a part of the sql string, refuse unsafe values
        if (IsValidLanguageCode(language_code) == false)
        {
            return new ModelPage<StaticPageMeta> { error = true };
        }

        // Make sure that sort variables are valid
        sort_field = GetValidSortField(sort_field);
        sort_order = GetValidSortOrder(sort_order);

        // Check if there is keywords
        bool keywords_exists = string.IsNullOrEmpty(keywords) == false;

        // Create the sql string
        string sql = GetSelectClause(language_code, false) + "FROM s ";
        if (keywords_exists == true)
        {
            sql += "JOIN keywords IN s.keywords ";
        }
        sql += "WHERE s.model_type = @model_type ";
        if (keywords_exists == true)
        {
            sql += "AND keywords = @keywords ";
        }
        sql += $"ORDER BY s.{sort_field} {sort_order}";

        // Create parameters
        SqlParameterCollection parameters = new SqlParameterCollection();
        parameters.Add(new SqlParameter("@model_type", "static_page"));
        if (keywords_exists == true)
        {
            // Invariant lowercase, the result must not depend on the server culture
            parameters.Add(new SqlParameter("@keywords", keywords.ToLowerInvariant()));
        }

        // Return the list
        return await this.cosmos_database_repository.GetListByQuery<StaticPageMeta>(sql, parameters, page_size, ct);

    } // End of the GetBySearch method

    /// <summary>
    /// Get a page of meta data for all static pages.
    /// </summary>
    /// <returns>The page, or a page with error set to true if the language code is invalid or the query failed</returns>
    public async Task<ModelPage<StaticPageMeta>> GetAll(string language_code, string sort_field, string sort_order, Int32 page_size, string ct)
    {
        // The language code becomes a part of the sql string, refuse unsafe values
        if (IsValidLanguageCode(language_code) == false)
        {
            return new ModelPage<StaticPageMeta> { error = true };
        }

        // Make sure that sort variables are valid
        sort_field = GetValidSortField(sort_field);
        sort_order = GetValidSortOrder(sort_order);

        // Create the sql string
        string sql = GetSelectClause(language_code, false)
            + $"FROM s WHERE s.model_type = @model_type ORDER BY s.{sort_field} {sort_order}";

        // Create parameters, only parameters that the sql string references
        SqlParameterCollection parameters = new SqlParameterCollection()
        {
            new SqlParameter("@model_type", "static_page")
        };

        // Return posts
        return await this.cosmos_database_repository.GetListByQuery<StaticPageMeta>(sql, parameters, page_size, ct);

    } // End of the GetAll method

    #endregion

    #region Delete methods

    /// <summary>Delete a static page document by id.</summary>
    public async Task<bool> DeleteOnId(string id)
    {
        // Delete a document, the id is passed as the partition key as well
        return await this.cosmos_database_repository.DeleteOnId(id, id);

    } // End of the DeleteOnId method

    #endregion

    #region Validation

    /// <summary>
    /// Get a sort field that is safe to insert in a sql string.
    /// </summary>
    /// <returns>"date_updated" or "page_name", any other input gives "page_name"</returns>
    public string GetValidSortField(string sort_field)
    {
        // Make sure that the sort field is valid
        if (sort_field != "date_updated" && sort_field != "page_name")
        {
            sort_field = "page_name";
        }

        // Return the string
        return sort_field;

    } // End of the GetValidSortField method

    /// <summary>
    /// Get a sort order that is safe to insert in a sql string.
    /// </summary>
    /// <returns>"ASC" or "DESC", any other input gives "ASC"</returns>
    public string GetValidSortOrder(string sort_order)
    {
        // Make sure that the sort order is valid
        if (sort_order != "ASC" && sort_order != "DESC")
        {
            sort_order = "ASC";
        }

        // Return the string
        return sort_order;

    } // End of the GetValidSortOrder method

    // Check that a language code is a plain identifier (ASCII letters, digits
    // and underscores, not starting with a digit) so that it can be used in a
    // property path like s.translations.en without changing the query.
    private static bool IsValidLanguageCode(string language_code)
    {
        if (string.IsNullOrEmpty(language_code) == true)
        {
            return false;
        }

        for (int i = 0; i < language_code.Length; i++)
        {
            char c = language_code[i];
            bool is_letter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_';
            bool is_digit = c >= '0' && c <= '9';

            if (is_letter == false && (is_digit == false || i == 0))
            {
                return false;
            }
        }

        return true;

    } // End of the IsValidLanguageCode method

    // Build the shared select list (with a trailing space) for a validated
    // language code, the html text is only included for full posts.
    private static string GetSelectClause(string language_code, bool include_text_html)
    {
        string key = $"s.translations.{language_code}";
        string text_html = include_text_html == true ? $"{key}.text_html, " : "";

        return $"SELECT s.id, s.meta_robots, s.show_as_page, s.page_name, s.main_image, {key}.link_name, {key}.title, {text_html}{key}.meta_description, {key}.meta_keywords ";

    } // End of the GetSelectClause method

    #endregion

} // End of the class

Leave a Reply

Your email address will not be published. Required fields are marked *