Skip to content

Commit

Permalink
Properly dispose the scope. (#559)
Browse files Browse the repository at this point in the history
The IDocumentClient scope is not properly disposed.
  • Loading branch information
jackliums authored Jun 28, 2019
1 parent 81ba1e7 commit cdf4be1
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.Fhir.CosmosDb.Features.Search.Queries;
Expand Down Expand Up @@ -71,34 +69,20 @@ private async Task<SearchResult> ExecuteSearchAsync(

if (searchOptions.CountOnly)
{
IDocumentQuery<int> documentCountQuery = _fhirDataStore.CreateDocumentQuery<int>(sqlQuerySpec, feedOptions);

using (documentCountQuery)
{
return new SearchResult(
(await documentCountQuery.ExecuteNextAsync<int>(cancellationToken)).Single(),
searchOptions.UnsupportedSearchParams);
}
return new SearchResult(
(await _fhirDataStore.ExecuteDocumentQueryAsync<int>(sqlQuerySpec, feedOptions, cancellationToken)).Single(),
searchOptions.UnsupportedSearchParams);
}

IDocumentQuery<Document> documentQuery = _fhirDataStore.CreateDocumentQuery<Document>(
sqlQuerySpec,
feedOptions);

using (documentQuery)
{
Debug.Assert(documentQuery != null, $"The {nameof(documentQuery)} should not be null.");
FeedResponse<Document> fetchedResults = await _fhirDataStore.ExecuteDocumentQueryAsync<Document>(sqlQuerySpec, feedOptions, cancellationToken);

FeedResponse<Document> fetchedResults = await documentQuery.ExecuteNextAsync<Document>(cancellationToken);
FhirCosmosResourceWrapper[] wrappers = fetchedResults
.Select(r => r.GetPropertyValue<FhirCosmosResourceWrapper>(SearchValueConstants.RootAliasName)).ToArray();

FhirCosmosResourceWrapper[] wrappers = fetchedResults
.Select(r => r.GetPropertyValue<FhirCosmosResourceWrapper>(SearchValueConstants.RootAliasName)).ToArray();

return new SearchResult(
wrappers,
searchOptions.UnsupportedSearchParams,
fetchedResults.ResponseContinuation);
}
return new SearchResult(
wrappers,
searchOptions.UnsupportedSearchParams,
fetchedResults.ResponseContinuation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ public CosmosFhirDataStore(
_hardDelete = new HardDelete();
}

private IDocumentClient DocumentClient => _documentClientFactory().Value;

private string DatabaseId { get; }

private string CollectionId { get; }
Expand All @@ -110,18 +108,21 @@ public async Task<UpsertOutcome> UpsertAsync(
{
_logger.LogDebug($"Upserting {resource.ResourceTypeName}/{resource.ResourceId}, ETag: \"{weakETag?.VersionId}\", AllowCreate: {allowCreate}, KeepHistory: {keepHistory}");

UpsertWithHistoryModel response = await _retryExceptionPolicyFactory.CreateRetryPolicy().ExecuteAsync(
async ct => await _upsertWithHistoryProc.Execute(
DocumentClient,
CollectionUri,
cosmosWrapper,
weakETag?.VersionId,
allowCreate,
keepHistory,
ct),
cancellationToken);

return new UpsertOutcome(response.Wrapper, response.OutcomeType);
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
UpsertWithHistoryModel response = await _retryExceptionPolicyFactory.CreateRetryPolicy().ExecuteAsync(
async ct => await _upsertWithHistoryProc.Execute(
documentClient.Value,
CollectionUri,
cosmosWrapper,
weakETag?.VersionId,
allowCreate,
keepHistory,
ct),
cancellationToken);

return new UpsertOutcome(response.Wrapper, response.OutcomeType);
}
}
catch (DocumentClientException dce)
{
Expand Down Expand Up @@ -172,21 +173,26 @@ public async Task<ResourceWrapper> GetAsync(ResourceKey key, CancellationToken c

var sqlQuerySpec = new SqlQuerySpec("select * from root r where r.resourceId = @resourceId and r.version = @version", sqlParameterCollection);

var executor = CreateDocumentQuery<FhirCosmosResourceWrapper>(
sqlQuerySpec,
new FeedOptions { PartitionKey = new PartitionKey(key.ToPartitionKey()) });

var result = await executor.ExecuteNextAsync<FhirCosmosResourceWrapper>(cancellationToken);
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
FeedResponse<FhirCosmosResourceWrapper> result = await ExecuteDocumentQueryAsync<FhirCosmosResourceWrapper>(
sqlQuerySpec,
new FeedOptions { PartitionKey = new PartitionKey(key.ToPartitionKey()) },
cancellationToken);

return result.FirstOrDefault();
return result.FirstOrDefault();
}
}

try
{
return await DocumentClient.ReadDocumentAsync<FhirCosmosResourceWrapper>(
UriFactory.CreateDocumentUri(DatabaseId, CollectionId, key.Id),
new RequestOptions { PartitionKey = new PartitionKey(key.ToPartitionKey()) },
cancellationToken);
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
return await documentClient.Value.ReadDocumentAsync<FhirCosmosResourceWrapper>(
UriFactory.CreateDocumentUri(DatabaseId, CollectionId, key.Id),
new RequestOptions { PartitionKey = new PartitionKey(key.ToPartitionKey()) },
cancellationToken);
}
}
catch (DocumentClientException e) when (e.StatusCode == HttpStatusCode.NotFound)
{
Expand All @@ -202,15 +208,18 @@ public async Task HardDeleteAsync(ResourceKey key, CancellationToken cancellatio
{
_logger.LogDebug($"Obliterating {key.ResourceType}/{key.Id}");

StoredProcedureResponse<IList<string>> response = await _retryExceptionPolicyFactory.CreateRetryPolicy().ExecuteAsync(
async ct => await _hardDelete.Execute(
DocumentClient,
CollectionUri,
key,
ct),
cancellationToken);

_logger.LogDebug($"Hard-deleted {response.Response.Count} documents, which consumed {response.RequestCharge} RUs. The list of hard-deleted documents: {string.Join(", ", response.Response)}.");
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
StoredProcedureResponse<IList<string>> response = await _retryExceptionPolicyFactory.CreateRetryPolicy().ExecuteAsync(
async ct => await _hardDelete.Execute(
documentClient.Value,
CollectionUri,
key,
ct),
cancellationToken);

_logger.LogDebug($"Hard-deleted {response.Response.Count} documents, which consumed {response.RequestCharge} RUs. The list of hard-deleted documents: {string.Join(", ", response.Response)}.");
}
}
catch (DocumentClientException dce)
{
Expand All @@ -225,20 +234,21 @@ public async Task HardDeleteAsync(ResourceKey key, CancellationToken cancellatio
}
}

internal IDocumentQuery<T> CreateDocumentQuery<T>(
SqlQuerySpec sqlQuerySpec,
FeedOptions feedOptions = null)
internal async Task<FeedResponse<T>> ExecuteDocumentQueryAsync<T>(SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions, CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(sqlQuerySpec, nameof(sqlQuerySpec));

CosmosQueryContext context = new CosmosQueryContext(CollectionUri, sqlQuerySpec, feedOptions);
var context = new CosmosQueryContext(CollectionUri, sqlQuerySpec, feedOptions);

return _cosmosDocumentQueryFactory.Create<T>(DocumentClient, context);
}
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
IDocumentQuery<T> documentQuery = _cosmosDocumentQueryFactory.Create<T>(documentClient.Value, context);

private static string GetValue(HttpStatusCode type)
{
return ((int)type).ToString();
using (documentQuery)
{
return await documentQuery.ExecuteNextAsync<T>(cancellationToken);
}
}
}

public void Build(IListedCapabilityStatement statement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ public CosmosFhirOperationDataStore(
CollectionUri = cosmosDataStoreConfiguration.GetRelativeCollectionUri(collectionConfiguration.CollectionId);
}

private IDocumentClient DocumentClient => _documentClientFactory().Value;

private string DatabaseId { get; }

private string CollectionId { get; }
Expand All @@ -88,14 +86,17 @@ public async Task<ExportJobOutcome> CreateExportJobAsync(ExportJobRecord jobReco

try
{
ResourceResponse<Document> result = await DocumentClient.CreateDocumentAsync(
CollectionUri,
cosmosExportJob,
new RequestOptions() { PartitionKey = new PartitionKey(CosmosDbExportConstants.ExportJobPartitionKey) },
disableAutomaticIdGeneration: true,
cancellationToken: cancellationToken);
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
ResourceResponse<Document> result = await documentClient.Value.CreateDocumentAsync(
CollectionUri,
cosmosExportJob,
new RequestOptions() { PartitionKey = new PartitionKey(CosmosDbExportConstants.ExportJobPartitionKey) },
disableAutomaticIdGeneration: true,
cancellationToken: cancellationToken);

return new ExportJobOutcome(jobRecord, WeakETag.FromVersionId(result.Resource.ETag));
return new ExportJobOutcome(jobRecord, WeakETag.FromVersionId(result.Resource.ETag));
}
}
catch (DocumentClientException dce)
{
Expand All @@ -115,14 +116,17 @@ public async Task<ExportJobOutcome> GetExportJobByIdAsync(string id, Cancellatio

try
{
DocumentResponse<CosmosExportJobRecordWrapper> cosmosExportJobRecord = await DocumentClient.ReadDocumentAsync<CosmosExportJobRecordWrapper>(
UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id),
new RequestOptions { PartitionKey = new PartitionKey(CosmosDbExportConstants.ExportJobPartitionKey) },
cancellationToken);
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
DocumentResponse<CosmosExportJobRecordWrapper> cosmosExportJobRecord = await documentClient.Value.ReadDocumentAsync<CosmosExportJobRecordWrapper>(
UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id),
new RequestOptions { PartitionKey = new PartitionKey(CosmosDbExportConstants.ExportJobPartitionKey) },
cancellationToken);

var outcome = new ExportJobOutcome(cosmosExportJobRecord.Document.JobRecord, WeakETag.FromVersionId(cosmosExportJobRecord.Document.ETag));
var outcome = new ExportJobOutcome(cosmosExportJobRecord.Document.JobRecord, WeakETag.FromVersionId(cosmosExportJobRecord.Document.ETag));

return outcome;
return outcome;
}
}
catch (DocumentClientException dce)
{
Expand All @@ -146,7 +150,9 @@ public async Task<ExportJobOutcome> GetExportJobByHashAsync(string hash, Cancell

try
{
IDocumentQuery<CosmosExportJobRecordWrapper> query = DocumentClient.CreateDocumentQuery<CosmosExportJobRecordWrapper>(
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
IDocumentQuery<CosmosExportJobRecordWrapper> query = documentClient.Value.CreateDocumentQuery<CosmosExportJobRecordWrapper>(
CollectionUri,
new SqlQuerySpec(
GetJobByHashQuery,
Expand All @@ -157,17 +163,18 @@ public async Task<ExportJobOutcome> GetExportJobByHashAsync(string hash, Cancell
new FeedOptions { PartitionKey = new PartitionKey(CosmosDbExportConstants.ExportJobPartitionKey) })
.AsDocumentQuery();

FeedResponse<CosmosExportJobRecordWrapper> result = await query.ExecuteNextAsync<CosmosExportJobRecordWrapper>();
FeedResponse<CosmosExportJobRecordWrapper> result = await query.ExecuteNextAsync<CosmosExportJobRecordWrapper>();

if (result.Count == 1)
{
// We found an existing job that matches the hash.
CosmosExportJobRecordWrapper wrapper = result.First();
if (result.Count == 1)
{
// We found an existing job that matches the hash.
CosmosExportJobRecordWrapper wrapper = result.First();

return new ExportJobOutcome(wrapper.JobRecord, WeakETag.FromVersionId(wrapper.ETag));
}
return new ExportJobOutcome(wrapper.JobRecord, WeakETag.FromVersionId(wrapper.ETag));
}

return null;
return null;
}
}
catch (DocumentClientException dce)
{
Expand Down Expand Up @@ -204,14 +211,17 @@ public async Task<ExportJobOutcome> UpdateExportJobAsync(ExportJobRecord jobReco

try
{
ResourceResponse<Document> replaceResult = await DocumentClient.ReplaceDocumentAsync(
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
ResourceResponse<Document> replaceResult = await documentClient.Value.ReplaceDocumentAsync(
UriFactory.CreateDocumentUri(DatabaseId, CollectionId, jobRecord.Id),
cosmosExportJob,
requestOptions,
cancellationToken: cancellationToken);

var latestETag = replaceResult.Resource.ETag;
return new ExportJobOutcome(jobRecord, WeakETag.FromVersionId(latestETag));
var latestETag = replaceResult.Resource.ETag;
return new ExportJobOutcome(jobRecord, WeakETag.FromVersionId(latestETag));
}
}
catch (DocumentClientException dce)
{
Expand Down Expand Up @@ -240,16 +250,19 @@ public async Task<IReadOnlyCollection<ExportJobOutcome>> AcquireExportJobsAsync(
{
try
{
StoredProcedureResponse<IReadOnlyCollection<CosmosExportJobRecordWrapper>> response = await _retryExceptionPolicyFactory.CreateRetryPolicy().ExecuteAsync(
using (IScoped<IDocumentClient> documentClient = _documentClientFactory())
{
StoredProcedureResponse<IReadOnlyCollection<CosmosExportJobRecordWrapper>> response = await _retryExceptionPolicyFactory.CreateRetryPolicy().ExecuteAsync(
async ct => await _acquireExportJobs.ExecuteAsync(
DocumentClient,
documentClient.Value,
CollectionUri,
maximumNumberOfConcurrentJobsAllowed,
(ushort)jobHeartbeatTimeoutThreshold.TotalSeconds,
ct),
cancellationToken);

return response.Response.Select(wrapper => new ExportJobOutcome(wrapper.JobRecord, WeakETag.FromVersionId(wrapper.ETag))).ToList();
return response.Response.Select(wrapper => new ExportJobOutcome(wrapper.JobRecord, WeakETag.FromVersionId(wrapper.ETag))).ToList();
}
}
catch (DocumentClientException dce)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
{
[Collection(FhirOperationTestConstants.FhirOperationTests)]
[FhirStorageTestsFixtureArgumentSets(DataStore.CosmosDb)]
public class FhirOperationDataStoreTests : IClassFixture<FhirStorageTestsFixture>, IAsyncLifetime
public class CosmosFhirOperationDataStoreTests : IClassFixture<FhirStorageTestsFixture>, IAsyncLifetime
{
private IFhirOperationDataStore _operationDataStore;
private IFhirStorageTestHelper _testHelper;

private readonly CreateExportRequest _exportRequest = new CreateExportRequest(new Uri("http://localhost/ExportJob"), "destinationType", "destinationConnection");

public FhirOperationDataStoreTests(FhirStorageTestsFixture fixture)
public CosmosFhirOperationDataStoreTests(FhirStorageTestsFixture fixture)
{
_operationDataStore = fixture.OperationDataStore;
_testHelper = fixture.TestHelper;
Expand Down

0 comments on commit cdf4be1

Please sign in to comment.