diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index d0b5faf7b6ed..3428d0109c73 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -146,11 +146,19 @@ private void deleteDataQualityDataIndex() { private void createDataAssetsDataStream() { DataInsightsSearchInterface searchInterface = getSearchInterface(); + ElasticSearchConfiguration config = searchRepository.getElasticSearchConfiguration(); + String language = + config != null && config.getSearchIndexMappingLanguage() != null + ? config.getSearchIndexMappingLanguage().value() + : "en"; + try { for (String dataAssetType : dataAssetTypes) { + IndexMapping dataAssetIndex = searchRepository.getIndexMapping(dataAssetType); String dataStreamName = getDataStreamName(dataAssetType); if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) { - searchInterface.createDataAssetsDataStream(dataStreamName); + searchInterface.createDataAssetsDataStream( + dataStreamName, dataAssetType, dataAssetIndex, language); } } } catch (IOException ex) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java index 0d06b3e03be2..774a5151d5fc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/DataInsightsSearchInterface.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.io.InputStream; import 
org.openmetadata.service.exception.UnhandledServerException; +import org.openmetadata.service.search.models.IndexMapping; public interface DataInsightsSearchInterface { @@ -23,7 +24,9 @@ default String readResource(String resourceFile) { } } - void createDataAssetsDataStream(String name) throws IOException; + void createDataAssetsDataStream( + String name, String entityType, IndexMapping entityIndexMapping, String language) + throws IOException; void deleteDataAssetDataStream(String name) throws IOException; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java index 150fc48c33cc..9b6955dfa7d7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/elasticsearch/ElasticSearchDataInsightsClient.java @@ -4,7 +4,11 @@ import es.org.elasticsearch.client.Response; import es.org.elasticsearch.client.RestClient; import java.io.IOException; +import java.util.List; +import java.util.Map; import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface; +import org.openmetadata.service.search.models.IndexMapping; +import org.openmetadata.service.util.JsonUtils; public class ElasticSearchDataInsightsClient implements DataInsightsSearchInterface { private final RestClient client; @@ -51,8 +55,53 @@ public Boolean dataAssetDataStreamExists(String name) throws IOException { return response.getStatusLine().getStatusCode() == 200; } + /** Builds the JSON body of the data-assets mapping component template: starts from indexMappingTemplateStr, copies settings.analysis from the entity's index mapping file for the given language, and adds the property mapping of every field listed under mappingFields.common and mappingFields.[entityType] in /dataInsights/config.json that the template does not already define and the entity mapping provides. The language is lower-cased with Locale.ROOT so the resource path does not depend on the JVM default locale. */ private String buildMapping( + String entityType, + IndexMapping entityIndexMapping, + String language, + String indexMappingTemplateStr) { + Map>>> indexMappingTemplate = +
JsonUtils.readOrConvertValue(indexMappingTemplateStr, Map.class); + Map>> entityConfig = + JsonUtils.readOrConvertValue(readResource("/dataInsights/config.json"), Map.class); + Map>> entityIndexMap = + JsonUtils.readOrConvertValue( + readResource( + String.format(entityIndexMapping.getIndexMappingFile(), language.toLowerCase(java.util.Locale.ROOT))), + Map.class); + + List entityAttributes = entityConfig.get("mappingFields").get("common"); + entityAttributes.addAll(entityConfig.get("mappingFields").get(entityType)); + + indexMappingTemplate + .get("template") + .get("settings") + .put("analysis", entityIndexMap.get("settings").get("analysis")); + + for (String attribute : entityAttributes) { + if (!indexMappingTemplate + .get("template") + .get("mappings") + .get("properties") + .containsKey(attribute)) { + Object value = entityIndexMap.get("mappings").get("properties").get(attribute); + if (value != null) { + indexMappingTemplate + .get("template") + .get("mappings") + .get("properties") + .put(attribute, value); + } + } + } + + return JsonUtils.pojoToJson(indexMappingTemplate); + } + @Override - public void createDataAssetsDataStream(String name) throws IOException { + public void createDataAssetsDataStream( + String name, String entityType, IndexMapping entityIndexMapping, String language) + throws IOException { String resourcePath = "/dataInsights/elasticsearch"; createLifecyclePolicy( "di-data-assets-lifecycle", @@ -62,7 +111,11 @@ public void createDataAssetsDataStream(String name) throws IOException { readResource(String.format("%s/indexSettingsTemplate.json", resourcePath))); createComponentTemplate( "di-data-assets-mapping", - readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))); + buildMapping( + entityType, + entityIndexMapping, + language, + readResource(String.format("%s/indexMappingsTemplate.json", resourcePath)))); createIndexTemplate( "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); createDataStream(name);
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java index da94394a3a5f..5d4b73f2294f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/search/opensearch/OpenSearchDataInsightsClient.java @@ -1,7 +1,11 @@ package org.openmetadata.service.apps.bundles.insights.search.opensearch; import java.io.IOException; +import java.util.List; +import java.util.Map; import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface; +import org.openmetadata.service.search.models.IndexMapping; +import org.openmetadata.service.util.JsonUtils; import os.org.opensearch.client.Request; import os.org.opensearch.client.Response; import os.org.opensearch.client.ResponseException; @@ -62,15 +66,64 @@ public Boolean dataAssetDataStreamExists(String name) throws IOException { return response.getStatusLine().getStatusCode() == 200; } + /** Builds the JSON body of the data-assets mapping component template: starts from indexMappingTemplateStr, copies settings.analysis from the entity's index mapping file for the given language, and adds the property mapping of every field listed under mappingFields.common and mappingFields.[entityType] in /dataInsights/config.json that the template does not already define and the entity mapping provides. The language is lower-cased with Locale.ROOT so the resource path does not depend on the JVM default locale. */ private String buildMapping( + String entityType, + IndexMapping entityIndexMapping, + String language, + String indexMappingTemplateStr) { + Map>>> indexMappingTemplate = + JsonUtils.readOrConvertValue(indexMappingTemplateStr, Map.class); + Map>> entityConfig = + JsonUtils.readOrConvertValue(readResource("/dataInsights/config.json"), Map.class); + Map>> entityIndexMap = + JsonUtils.readOrConvertValue( + readResource( + String.format(entityIndexMapping.getIndexMappingFile(), language.toLowerCase(java.util.Locale.ROOT))), + Map.class); + + List entityAttributes = entityConfig.get("mappingFields").get("common"); + entityAttributes.addAll(entityConfig.get("mappingFields").get(entityType)); + + indexMappingTemplate + .get("template") + .get("settings") +
.put("analysis", entityIndexMap.get("settings").get("analysis")); + + for (String attribute : entityAttributes) { + if (!indexMappingTemplate + .get("template") + .get("mappings") + .get("properties") + .containsKey(attribute)) { + Object value = entityIndexMap.get("mappings").get("properties").get(attribute); + if (value != null) { + indexMappingTemplate + .get("template") + .get("mappings") + .get("properties") + .put(attribute, value); + } + } + } + + return JsonUtils.pojoToJson(indexMappingTemplate); + } + @Override - public void createDataAssetsDataStream(String name) throws IOException { + public void createDataAssetsDataStream( + String name, String entityType, IndexMapping entityIndexMapping, String language) + throws IOException { String resourcePath = "/dataInsights/opensearch"; createLifecyclePolicy( "di-data-assets-lifecycle", readResource(String.format("%s/indexLifecyclePolicy.json", resourcePath))); createComponentTemplate( "di-data-assets-mapping", - readResource(String.format("%s/indexMappingsTemplate.json", resourcePath))); + buildMapping( + entityType, + entityIndexMapping, + language, + readResource(String.format("%s/indexMappingsTemplate.json", resourcePath)))); createIndexTemplate( "di-data-assets", readResource(String.format("%s/indexTemplate.json", resourcePath))); createDataStream(name); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java index 2e30832ea5d8..b7e5da81ded8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java @@ -7,6 +7,7 @@ import static
org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getInitialStatsForEntities; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -29,11 +30,13 @@ import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsEntityEnricherProcessor; import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsOpenSearchProcessor; import org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.exception.UnhandledServerException; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.search.elasticsearch.ElasticSearchIndexSink; import org.openmetadata.service.search.opensearch.OpenSearchIndexSink; +import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.workflows.interfaces.Processor; import org.openmetadata.service.workflows.interfaces.Sink; @@ -43,6 +46,7 @@ @Slf4j public class DataAssetsWorkflow { public static final String DATA_STREAM_KEY = "DataStreamKey"; + public static final String ENTITY_TYPE_FIELDS_KEY = "EnityTypeFields"; private final int retentionDays = 30; private final Long startTimestamp; private final Long endTimestamp; @@ -51,6 +55,7 @@ public class DataAssetsWorkflow { private final CollectionDAO collectionDAO; private final List sources = new ArrayList<>(); private final Set entityTypes; + private final Map>> entityTypeFields; private DataInsightsEntityEnricherProcessor entityEnricher; private Processor entityProcessor; @@ -91,10 +96,20 @@ public DataAssetsWorkflow( TimestampUtils.getStartOfDayTimestamp(TimestampUtils.subtractDays(timestamp, 1)); } + Map>> entityTypeFields = null; + + 
try (InputStream in = getClass().getResourceAsStream("/dataInsights/config.json")) { + assert in != null; + entityTypeFields = JsonUtils.readOrConvertValue(new String(in.readAllBytes()), Map.class); + } catch (Exception e) { + throw new UnhandledServerException("Failed to load DataInsight Search Configurations."); + } + this.batchSize = batchSize; this.searchRepository = searchRepository; this.collectionDAO = collectionDAO; this.entityTypes = entityTypes; + this.entityTypeFields = entityTypeFields; } private void initialize() { @@ -146,6 +161,7 @@ public void process() throws SearchIndexException { deleteDataBeforeInserting(getDataStreamName(source.getEntityType())); contextData.put(DATA_STREAM_KEY, getDataStreamName(source.getEntityType())); contextData.put(ENTITY_TYPE_KEY, source.getEntityType()); + contextData.put(ENTITY_TYPE_FIELDS_KEY, getEntityTypeFields(source.getEntityType())); while (!source.isDone().get()) { try { @@ -163,6 +179,12 @@ public void process() throws SearchIndexException { } } + /** Returns the fields to keep for entityType: mappingFields.common followed by mappingFields.[entityType] from the loaded config. Works on a copy because the common list lives in the shared entityTypeFields map; appending to it directly would carry one entity type's fields into every later call. */ private List getEntityTypeFields(String entityType) { + List fields = new ArrayList<>(entityTypeFields.get("mappingFields").get("common")); + fields.addAll(entityTypeFields.get("mappingFields").get(entityType)); + return fields; + } + private void processEntity( ResultList resultList, Map contextData, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java index a1714dc07eb3..c54ada72491c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java @@ -3,6
+3,7 @@ import static org.openmetadata.schema.EntityInterface.ENTITY_TYPE_TO_CLASS_MAP; import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.END_TIMESTAMP_KEY; import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.START_TIMESTAMP_KEY; +import static org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow.ENTITY_TYPE_FIELDS_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TIMESTAMP_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; @@ -141,6 +142,8 @@ private Map enrichEntity( Long endTimestamp = (Long) entityVersionMap.get("endTimestamp"); Map entityMap = JsonUtils.getMap(entity); + entityMap.keySet().retainAll((List) contextData.get(ENTITY_TYPE_FIELDS_KEY)); + String entityType = (String) contextData.get(ENTITY_TYPE_KEY); List> interfaces = List.of(entity.getClass().getInterfaces()); diff --git a/openmetadata-service/src/main/resources/dataInsights/config.json b/openmetadata-service/src/main/resources/dataInsights/config.json new file mode 100644 index 000000000000..2172dbad9637 --- /dev/null +++ b/openmetadata-service/src/main/resources/dataInsights/config.json @@ -0,0 +1,121 @@ +{ + "mappingFields": { + "common": [ + "id", + "description", + "displayName", + "name", + "deleted", + "version", + "owners", + "tags", + "followers", + "extension", + "votes", + "fullyQualifiedName", + "domain", + "dataProducts", + "certification" + ], + "table": [ + "tableType", + "columns", + "databaseSchema", + "database", + "service", + "serviceType" + ], + "storedProcedure": [ + "storedProcedureType", + "databaseSchema", + "database", + "service", + "serviceType" + ], + "databaseSchema": [ + "database", + "service", + "serviceType" + ], + "database": [ + "service", + "serviceType" + ], + "chart": [ + "service", + 
"serviceType", + "chartType" + ], + "dashboard": [ + "service", + "serviceType", + "dashboardType" + ], + "dashboardDataModel": [ + "service", + "serviceType", + "dataModelType", + "project", + "columns" + ], + "pipeline": [ + "service", + "serviceType", + "pipelineStatus" + ], + "topic": [ + "service", + "serviceType" + ], + "container": [ + "service", + "serviceType", + "numberOfObjects", + "size", + "fileFormats", + "parent", + "children", + "prefix" + ], + "searchIndex": [ + "service", + "serviceType", + "indexType", + "fields" + ], + "mlmodel": [ + "service", + "serviceType", + "mlStore", + "algorithm", + "mlFeatures", + "mlHyperParameters", + "target", + "dashboard", + "server" + ], + "dataProduct": [ + "experts", + "domain", + "assets" + ], + "glossaryTerm": [ + "synonyms", + "glossary", + "parent", + "children", + "relatedTerms", + "references", + "reviewers", + "status", + "usageCount", + "childrenCount" + ], + "tag": [ + "classification", + "parent", + "children", + "usageCount" + ] + } +} \ No newline at end of file diff --git a/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexMappingsTemplate.json b/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexMappingsTemplate.json index f8fdd93ff83c..19da7fc9120d 100644 --- a/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexMappingsTemplate.json +++ b/openmetadata-service/src/main/resources/dataInsights/elasticsearch/indexMappingsTemplate.json @@ -1,138 +1,16 @@ { "template": { - "settings": { - "analysis": { - "normalizer": { - "lowercase_normalizer": { - "type": "custom", - "char_filter": [], - "filter": [ - "lowercase" - ] - } - }, - "analyzer": { - "om_analyzer": { - "tokenizer": "letter", - "filter": [ - "lowercase", - "om_stemmer" - ] - }, - "om_ngram": { - "tokenizer": "ngram", - "min_gram": 3, - "max_gram": 10, - "filter": [ - "lowercase" - ] - } - }, - "filter": { - "om_stemmer": { - "type": "stemmer", - "name": "english" - } - } - } - }, 
+ "settings": {}, "mappings": { "properties": { "@timestamp": { "type": "date" }, - "owners": { - "properties": { - "id": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 36 - } - } - }, - "type": { - "type": "keyword" - }, - "name": { - "type": "keyword", - "normalizer": "lowercase_normalizer", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "displayName": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "normalizer": "lowercase_normalizer", - "ignore_above": 256 - } - } - }, - "fullyQualifiedName": { - "type": "text" - }, - "description": { - "type": "text" - }, - "deleted": { - "type": "text" - }, - "href": { - "type": "text" - } - } - }, - "domain": { - "properties": { - "id": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 36 - } - } - }, - "type": { - "type": "keyword" - }, - "name": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "displayName": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "normalizer": "lowercase_normalizer", - "ignore_above": 256 - } - } - }, - "fullyQualifiedName": { + "id": { + "type": "text", + "fields": { + "keyword": { "type": "keyword" - }, - "description": { - "type": "text" - }, - "deleted": { - "type": "text" - }, - "href": { - "type": "text" } } } diff --git a/openmetadata-service/src/main/resources/dataInsights/opensearch/indexMappingsTemplate.json b/openmetadata-service/src/main/resources/dataInsights/opensearch/indexMappingsTemplate.json index f8fdd93ff83c..19da7fc9120d 100644 --- a/openmetadata-service/src/main/resources/dataInsights/opensearch/indexMappingsTemplate.json +++ b/openmetadata-service/src/main/resources/dataInsights/opensearch/indexMappingsTemplate.json @@ -1,138 +1,16 @@ { "template": { - "settings": { - "analysis": { - "normalizer": { - "lowercase_normalizer": { - "type": 
"custom", - "char_filter": [], - "filter": [ - "lowercase" - ] - } - }, - "analyzer": { - "om_analyzer": { - "tokenizer": "letter", - "filter": [ - "lowercase", - "om_stemmer" - ] - }, - "om_ngram": { - "tokenizer": "ngram", - "min_gram": 3, - "max_gram": 10, - "filter": [ - "lowercase" - ] - } - }, - "filter": { - "om_stemmer": { - "type": "stemmer", - "name": "english" - } - } - } - }, + "settings": {}, "mappings": { "properties": { "@timestamp": { "type": "date" }, - "owners": { - "properties": { - "id": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 36 - } - } - }, - "type": { - "type": "keyword" - }, - "name": { - "type": "keyword", - "normalizer": "lowercase_normalizer", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "displayName": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "normalizer": "lowercase_normalizer", - "ignore_above": 256 - } - } - }, - "fullyQualifiedName": { - "type": "text" - }, - "description": { - "type": "text" - }, - "deleted": { - "type": "text" - }, - "href": { - "type": "text" - } - } - }, - "domain": { - "properties": { - "id": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 36 - } - } - }, - "type": { - "type": "keyword" - }, - "name": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "displayName": { - "type": "keyword", - "fields": { - "keyword": { - "type": "keyword", - "normalizer": "lowercase_normalizer", - "ignore_above": 256 - } - } - }, - "fullyQualifiedName": { + "id": { + "type": "text", + "fields": { + "keyword": { "type": "keyword" - }, - "description": { - "type": "text" - }, - "deleted": { - "type": "text" - }, - "href": { - "type": "text" } } } diff --git a/openmetadata-service/src/main/resources/elasticsearch/en/pipeline_index_mapping.json 
b/openmetadata-service/src/main/resources/elasticsearch/en/pipeline_index_mapping.json index c79e23422519..3c0cd989c72c 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/en/pipeline_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/en/pipeline_index_mapping.json @@ -204,6 +204,9 @@ } } }, + "startDate": { + "type": "text" + }, "tasks": { "properties": { "name": { @@ -234,6 +237,12 @@ }, "taskType": { "type": "text" + }, + "startDate": { + "type": "text" + }, + "endDate": { + "type": "text" } } }, diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/kpi/KpiResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/kpi/KpiResourceTest.java index 32af712a0293..838e6e19b372 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/kpi/KpiResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/kpi/KpiResourceTest.java @@ -82,7 +82,8 @@ private void createDataAssetsDataStream() { String dataStreamName = String.format("%s-%s", "di-data-assets", dataAssetType).toLowerCase(); if (!searchInterface.dataAssetDataStreamExists(dataStreamName)) { - searchInterface.createDataAssetsDataStream(dataStreamName); + searchInterface.createDataAssetsDataStream( + dataStreamName, dataAssetType, getSearchRepository().getIndexMapping(dataAssetType), "en"); /* 4th argument is the mapping language; "en" is the default DataInsightsApp falls back to */ } } } catch (IOException ex) {