Skip to content

Commit

Permalink
MAT-7960: update valuesets to batch endpoint and set maxidle time
Browse files Browse the repository at this point in the history
  • Loading branch information
chubert-sb committed Jan 13, 2025
1 parent d25d042 commit ef0b8e0
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,93 +50,111 @@ public List<ManifestExpansion> getManifests(UmlsUser umlsUser) {
return manifestOptions;
}

// spin off requests based on values provided in the expansion
public List<QdmValueSet> recursivelyRequestAllValueSetsExpansionsForQDM(
public List<QdmValueSet> requestAllValueSetsExpansionsForQDM(
List<QdmValueSet> allValueSets,
ValueSetsSearchCriteria searchCriteria,
String apiKey,
ValueSetsSearchCriteria.ValueSetParams vsParam,
ValueSetsSearchCriteria valueSetsSearchCriteria,
List<CodeSystemEntry> codeSystemEntries) {
IParser parser = fhirContext.newJsonParser();
String resource =
fhirTerminologyServiceWebClient.getValueSetResource(
apiKey,
vsParam,
valueSetsSearchCriteria.getProfile(),
valueSetsSearchCriteria.getIncludeDraft(),
valueSetsSearchCriteria.getActiveOnly(),
valueSetsSearchCriteria.getManifestExpansion());

ValueSet valueSetResource = parser.parseResource(ValueSet.class, resource);
var total = valueSetResource.getExpansion().getTotal(); // total valuesets

List<QdmValueSet.Concept> concepts =
getValueSetConcepts(valueSetResource, codeSystemEntries, "QDM");
log.info(
"vs total [{}] count: [{}] offset: [{}], oid: [{}]",
total,
vsParam.getCount(),
vsParam.getOffset(),
vsParam.getOid());

// Check if the ValueSet with the same oid already exists in allValueSets
QdmValueSet existingValueSet =
allValueSets.stream()
.filter(vs -> vs.getOid().equals(vsParam.getOid()))
.findFirst()
.orElse(null);
if (existingValueSet != null) {
List<QdmValueSet.Concept> updatedConcepts = new ArrayList<>(existingValueSet.getConcepts());
updatedConcepts.addAll(concepts);
// Create a new QdmValueSet with the updated concepts
QdmValueSet updatedValueSet =
QdmValueSet.builder()
.oid(existingValueSet.getOid())
.displayName(existingValueSet.getDisplayName())
.version(existingValueSet.getVersion())
.concepts(updatedConcepts)
.build();
// Replace the existing QdmValueSet in the list
allValueSets.set(allValueSets.indexOf(existingValueSet), updatedValueSet);
} else {
allValueSets.add(
QdmValueSet.builder()
.oid(valueSetResource.getIdPart())
.displayName(valueSetResource.getName())
.version(valueSetResource.getVersion())
.concepts(concepts)
.build());
}
// if the total results in the searchSet are still greater than our current offset + the count
// of our last request, then we request again
if (vsParam.getOffset() + vsParam.getCount() <= total) {
vsParam.setOffset(vsParam.getOffset() + 1000);
return recursivelyRequestAllValueSetsExpansionsForQDM(
allValueSets, apiKey, vsParam, valueSetsSearchCriteria, codeSystemEntries);
}
String resource = fhirTerminologyServiceWebClient.getValueSetResources(apiKey, searchCriteria);

Bundle bundleResource = parser.parseResource(Bundle.class, resource);

bundleResource
.getEntry()
.forEach(
bundleEntryComponent -> {
ValueSet valueSetResource = (ValueSet) bundleEntryComponent.getResource();

var total = valueSetResource.getExpansion().getTotal(); // total valuesets

List<QdmValueSet.Concept> concepts =
getValueSetConcepts(valueSetResource, codeSystemEntries, "QDM");
log.info(
"vs total [{}] count: [{}] offset: [{}], oid: [{}]",
total,
valueSetResource.getExpansion().getContains().size(),
valueSetResource.getExpansion().getOffset(),
valueSetResource.getId());

// Check if the ValueSet with the same oid already exists in allValueSets
QdmValueSet existingValueSet =
allValueSets.stream()
.filter(vs -> vs.getOid().equals(valueSetResource.getId()))
.findFirst()
.orElse(null);
if (existingValueSet != null) {
List<QdmValueSet.Concept> updatedConcepts =
new ArrayList<>(existingValueSet.getConcepts());
updatedConcepts.addAll(concepts);
// Create a new QdmValueSet with the updated concepts
QdmValueSet updatedValueSet =
QdmValueSet.builder()
.oid(existingValueSet.getOid())
.displayName(existingValueSet.getDisplayName())
.version(existingValueSet.getVersion())
.concepts(updatedConcepts)
.build();
// Replace the existing QdmValueSet in the list
allValueSets.set(allValueSets.indexOf(existingValueSet), updatedValueSet);
} else {
allValueSets.add(
QdmValueSet.builder()
.oid(valueSetResource.getIdPart())
.displayName(valueSetResource.getName())
.version(valueSetResource.getVersion())
.concepts(concepts)
.build());
}

// if the total results in the searchSet are still greater than our current offset +
// the count
// of our last request, then we request again
if (valueSetResource.getExpansion().getOffset()
+ valueSetResource.getExpansion().getContains().size()
< total) {
ValueSetsSearchCriteria newSearch =
ValueSetsSearchCriteria.builder()
.includeDraft(searchCriteria.getIncludeDraft())
.manifestExpansion(searchCriteria.getManifestExpansion())
.activeOnly(searchCriteria.getActiveOnly())
.build();

ValueSetsSearchCriteria.ValueSetParams newParams =
ValueSetsSearchCriteria.ValueSetParams.builder()
.count(1000)
.offset(valueSetResource.getExpansion().getOffset() + 1000)
.oid(
valueSetResource
.getIdentifier()
.get(0)
.getValue()
.replace("urn:oid:", ""))
.build();
newSearch.setValueSetParams(List.of(newParams));

requestAllValueSetsExpansionsForQDM(
allValueSets, newSearch, apiKey, codeSystemEntries);
}
});

return allValueSets;
}

public List<QdmValueSet> getValueSetsExpansionsForQdm(
ValueSetsSearchCriteria valueSetsSearchCriteria, UmlsUser umlsUser) {
List<CodeSystemEntry> codeSystemEntries = mappingService.getCodeSystemEntries();
return valueSetsSearchCriteria.getValueSetParams().stream()
.map(
vsParam -> {
vsParam.setCount(1000);
vsParam.setOffset(0);
return vsParam;
})
.flatMap(
vsParam ->
recursivelyRequestAllValueSetsExpansionsForQDM(
new ArrayList<>(),
umlsUser.getApiKey(),
vsParam,
valueSetsSearchCriteria,
codeSystemEntries)
.stream())
.collect(Collectors.toList());
valueSetsSearchCriteria.setValueSetParams(
valueSetsSearchCriteria.getValueSetParams().stream()
.map(
vsParam -> {
vsParam.setCount(1000);
vsParam.setOffset(0);
return vsParam;
})
.collect(Collectors.toList()));
return requestAllValueSetsExpansionsForQDM(
new ArrayList<>(), valueSetsSearchCriteria, umlsUser.getApiKey(), codeSystemEntries);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
package gov.cms.madie.terminology.webclient;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import gov.cms.madie.terminology.exceptions.VsacValueSetExpansionException;
import gov.cms.madie.terminology.exceptions.VsacResourceNotFoundException;
import gov.cms.madie.terminology.models.CodeSystem;
import gov.cms.madie.terminology.util.TerminologyServiceUtil;
import gov.cms.madie.models.measure.ManifestExpansion;
import gov.cms.madie.terminology.dto.ValueSetsSearchCriteria;
import io.netty.channel.ChannelOption;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.r4.model.Bundle;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

Expand All @@ -35,23 +44,31 @@ public class FhirTerminologyServiceWebClient {
private final String codeLookupsUrl;
private final String defaultProfile;
private final String searchValueSetEndpoint;
private final FhirContext fhirContext;

public FhirTerminologyServiceWebClient(
@Value("${client.fhir-terminology-service.base-url}") String fhirTerminologyServiceBaseUrl,
@Value("${client.fhir-terminology-service.manifests-urn}") String manifestUrn,
@Value("${client.fhir-terminology-service.code-system-urn}") String codeSystemUrn,
@Value("${client.fhir-terminology-service.code-lookups}") String codeLookupsUrl,
@Value("${client.default_profile}") String defaultProfile,
@Value("${client.search_value_set_endpoint}") String searchValueSetEndpoint) {
@Value("${client.search_value_set_endpoint}") String searchValueSetEndpoint,
FhirContext fhirContext) {
this.fhirContext = fhirContext;
DefaultUriBuilderFactory uriBuilderFactory =
new DefaultUriBuilderFactory(fhirTerminologyServiceBaseUrl);
uriBuilderFactory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.NONE);
ConnectionProvider provider =
ConnectionProvider.builder("custom").maxIdleTime(Duration.ofSeconds(300)).build();
HttpClient client =
HttpClient.create(provider).followRedirect(true).option(ChannelOption.SO_KEEPALIVE, true);
fhirTerminologyWebClient =
WebClient.builder()
.uriBuilderFactory(uriBuilderFactory)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.codecs(
clientCodecConfigurer -> clientCodecConfigurer.defaultCodecs().maxInMemorySize(-1))
.clientConnector(new ReactorClientHttpConnector(client))
.build();
this.manifestPath = manifestUrn;
this.codeSystemPath = codeSystemUrn;
Expand Down Expand Up @@ -118,6 +135,27 @@ public String getValueSetResource(
return fetchResourceFromVsac(uri.toString(), apiKey, "ValueSet");
}

public String getValueSetResources(String apiKey, ValueSetsSearchCriteria searchCriteria) {
String profile =
StringUtils.isNotBlank(searchCriteria.getProfile())
? defaultProfile
: searchCriteria.getProfile();
List<String> uriList =
searchCriteria.getValueSetParams().stream()
.map(
valueSetParam ->
TerminologyServiceUtil.buildValueSetResourceUri(
valueSetParam,
profile,
searchCriteria.getIncludeDraft(),
searchCriteria.getActiveOnly(),
searchCriteria.getManifestExpansion())
.toString())
.collect(Collectors.toList());

return fetchBatchResourcesFromVsac(uriList, apiKey, "ValueSet");
}

public String getCodeResource(String code, CodeSystem codeSystem, String apiKey) {
Map<String, String> params =
Map.of(
Expand Down Expand Up @@ -169,4 +207,68 @@ public String fetchResourceFromVsac(String uri, String apiKey, String resourceTy
})
.block();
}

public String fetchBatchResourcesFromVsac(List<String> uri, String apiKey, String resourceType) {
String result =
fhirTerminologyWebClient
.post()
.headers(headers -> headers.setBasicAuth("apikey", apiKey))
.bodyValue(buildBatchBundle(uri))
.header("Content-Type", "application/fhir+json")
.accept(new MediaType("application", "fhir+json", Charset.defaultCharset()))
.exchangeToMono(
clientResponse -> {
if (clientResponse.statusCode().equals(HttpStatus.OK)) {
return clientResponse.bodyToMono(String.class);
} else if (clientResponse.statusCode().equals(HttpStatus.NOT_FOUND)) {
log.debug("Received NOT_FOUND response while retrieving {}", resourceType);
return clientResponse
.createException()
.flatMap(
ex ->
Mono.error(
new VsacResourceNotFoundException(
"",
ex.getStatusCode(),
ex.getStatusText(),
ex.getResponseBodyAsString(),
"")));

} else {
log.debug("Received NON-OK response while retrieving {}", resourceType);
return clientResponse
.createException()
.flatMap(
ex ->
Mono.error(
new VsacValueSetExpansionException(
"",
ex.getStatusCode(),
ex.getStatusText(),
ex.getResponseBodyAsString(),
uri.contains("manifest") ? "Manifest" : "Latest",
"")));
}
})
.block();
return result;
}

private String buildBatchBundle(List<String> uri) {

Bundle bundle = new Bundle();
bundle.setType(Bundle.BundleType.BATCH);
uri.forEach(
value -> {
Bundle.BundleEntryComponent compo = new Bundle.BundleEntryComponent();
Bundle.BundleEntryRequestComponent request = new Bundle.BundleEntryRequestComponent();
request.setMethod(Bundle.HTTPVerb.GET);
request.setUrl(value);
compo.setRequest(request);
bundle.addEntry(compo);
});

IParser parser = fhirContext.newJsonParser();
return parser.encodeToString(bundle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ void getValueSetsExpansionsForQdm_When_ManifestExpansionIsProvided() {
.id("ecqm-update-2022-05-05")
.build())
.build();
when(fhirTerminologyServiceWebClient.getValueSetResource(
anyString(), any(), anyString(), anyString(), anyString(), any()))
when(fhirTerminologyServiceWebClient.getValueSetResources(anyString(), any()))
.thenReturn(mockValueSetResourceWithCodes);
when(fhirContext.newJsonParser()).thenReturn(FhirContext.forR4().newJsonParser());
when(mappingService.getCodeSystemEntries()).thenReturn(codeSystemEntries);
Expand Down Expand Up @@ -218,13 +217,8 @@ void getsValueSetsExpansionsForQdm_withNoCodes_When_ManifestExpansionIsProvided(
.build())
.build();
when(fhirContext.newJsonParser()).thenReturn(FhirContext.forR4().newJsonParser());
when(fhirTerminologyServiceWebClient.getValueSetResource(
anyString(),
any(ValueSetsSearchCriteria.ValueSetParams.class),
anyString(),
anyString(),
anyString(),
any(ManifestExpansion.class)))
when(fhirTerminologyServiceWebClient.getValueSetResources(
anyString(), any(ValueSetsSearchCriteria.class)))
.thenReturn(mockValueSetResourceWithNoCodes);
when(mappingService.getCodeSystemEntries()).thenReturn(codeSystemEntries);
List<QdmValueSet> result =
Expand Down
Loading

0 comments on commit ef0b8e0

Please sign in to comment.