diff --git a/.github/actions/docker-custom-build-and-push/action.yml b/.github/actions/docker-custom-build-and-push/action.yml index ccaff510c120aa..cc2c2bd86416d7 100644 --- a/.github/actions/docker-custom-build-and-push/action.yml +++ b/.github/actions/docker-custom-build-and-push/action.yml @@ -97,10 +97,11 @@ runs: cache-to: | type=inline - name: Upload image locally for testing (if not publishing) - uses: ishworkh/docker-image-artifact-upload@v1 + uses: ishworkh/container-image-artifact-upload@v2.0.0 if: ${{ inputs.publish != 'true' }} with: image: ${{ steps.single_tag.outputs.SINGLE_TAG }} + retention_days: "2" # Code for building multi-platform images and pushing to Docker Hub. - name: Set up QEMU diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 1b10fe6e74372b..98071b536a336a 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -110,7 +110,7 @@ jobs: run: | ./gradlew :datahub-frontend:build :datahub-web-react:build --parallel env: - NODE_OPTIONS: "--max-old-space-size=3072" + NODE_OPTIONS: "--max-old-space-size=4096" - name: Gradle compile (jdk8) for legacy Spark if: ${{ matrix.command == 'except_metadata_ingestion' && needs.setup.outputs.backend_change == 'true' }} run: | diff --git a/.github/workflows/docker-postgres-setup.yml b/.github/workflows/docker-postgres-setup.yml index 956f3f7b1c3903..c028bfb55d48d5 100644 --- a/.github/workflows/docker-postgres-setup.yml +++ b/.github/workflows/docker-postgres-setup.yml @@ -52,7 +52,7 @@ jobs: with: images: | acryldata/datahub-postgres-setup - tags: ${{ needs.setup.outputs.tag }} + image_tag: ${{ needs.setup.outputs.tag }} username: ${{ secrets.ACRYL_DOCKER_USERNAME }} password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} publish: ${{ needs.setup.outputs.publish == 'true' }} diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index 49dd26e1cd27e3..03a9b3afc3bc58 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -186,7 +186,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: Download image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' }} with: image: ${{ env.DATAHUB_GMS_IMAGE }}:${{ needs.setup.outputs.unique_tag }} @@ -257,7 +257,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: Download image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' }} with: image: ${{ env.DATAHUB_MAE_CONSUMER_IMAGE }}:${{ needs.setup.outputs.unique_tag }} @@ -328,7 +328,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: Download image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' }} with: image: ${{ env.DATAHUB_MCE_CONSUMER_IMAGE }}:${{ needs.setup.outputs.unique_tag }} @@ -399,7 +399,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: 
Download image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' }} with: image: ${{ env.DATAHUB_UPGRADE_IMAGE }}:${{ needs.setup.outputs.unique_tag }} @@ -446,7 +446,7 @@ jobs: ./gradlew :datahub-frontend:dist -x test -x yarnTest -x yarnLint --parallel mv ./datahub-frontend/build/distributions/datahub-frontend-*.zip datahub-frontend.zip env: - NODE_OPTIONS: "--max-old-space-size=3072" + NODE_OPTIONS: "--max-old-space-size=4096" - name: Build and push uses: ./.github/actions/docker-custom-build-and-push with: @@ -472,7 +472,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: actions/checkout@v4 - name: Download image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' }} with: image: ${{ env.DATAHUB_FRONTEND_IMAGE }}:${{ needs.setup.outputs.unique_tag }} @@ -533,7 +533,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: Download image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' }} with: image: ${{ env.DATAHUB_KAFKA_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }} @@ -594,7 +594,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: Download image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' }} with: image: ${{ env.DATAHUB_MYSQL_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }} @@ -655,7 +655,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: Download image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' }} with: image: ${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }} @@ -727,7 +727,7 @@ jobs: - name: Check out the repo uses: acryldata/sane-checkout-action@v3 - name: Download Base Image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && needs.setup.outputs.ingestion_base_change == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }} @@ -775,7 +775,7 @@ jobs: - name: Check out the repo uses: acryldata/sane-checkout-action@v3 - name: Download Base Image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && needs.setup.outputs.ingestion_base_change == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }} @@ -836,7 +836,7 @@ jobs: if: ${{ 
needs.setup.outputs.ingestion_change == 'true' || needs.setup.outputs.publish == 'true' || needs.setup.outputs.pr-publish =='true' }} run: ./gradlew :metadata-ingestion:codegen - name: Download Base Image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && needs.setup.outputs.ingestion_base_change == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }} @@ -883,7 +883,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: Download image Slim Image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.datahub_ingestion_slim_build.outputs.needs_artifact_download == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }} @@ -937,7 +937,7 @@ jobs: if: ${{ needs.setup.outputs.ingestion_change == 'true' || needs.setup.outputs.publish == 'true' || needs.setup.outputs.pr-publish == 'true' }} run: ./gradlew :metadata-ingestion:codegen - name: Download Base Image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && needs.setup.outputs.ingestion_base_change == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }} @@ -982,7 +982,7 @@ jobs: - name: Checkout # adding checkout step just to make trivy upload happy uses: acryldata/sane-checkout-action@v3 - name: Download image Full Image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.datahub_ingestion_full_build.outputs.needs_artifact_download == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.tag }} @@ -1079,47 +1079,47 @@ jobs: - name: Disk Check run: df -h . 
&& docker images - name: Download GMS image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) && needs.gms_build.result == 'success' }} with: image: ${{ env.DATAHUB_GMS_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download Frontend image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) && needs.frontend_build.result == 'success' }} with: image: ${{ env.DATAHUB_FRONTEND_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download Kafka Setup image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) && needs.kafka_setup_build.result == 'success' }} with: image: ${{ env.DATAHUB_KAFKA_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download Mysql Setup image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) && needs.mysql_setup_build.result == 'success' }} with: image: ${{ env.DATAHUB_MYSQL_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download Elastic Setup image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) && needs.elasticsearch_setup_build.result == 'success' }} with: image: ${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download MCE Consumer image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) && needs.mce_consumer_build.result == 'success' }} with: image: ${{ env.DATAHUB_MCE_CONSUMER_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download MAE Consumer image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) && needs.mae_consumer_build.result == 'success' }} with: image: ${{ env.DATAHUB_MAE_CONSUMER_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download upgrade image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) && needs.datahub_upgrade_build.result == 'success' }} with: image: ${{ env.DATAHUB_UPGRADE_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download datahub-ingestion-slim image - uses: ishworkh/docker-image-artifact-download@v1 + uses: ishworkh/container-image-artifact-download@v2.0.0 if: ${{ needs.datahub_ingestion_slim_build.outputs.needs_artifact_download == 'true' && needs.datahub_ingestion_slim_build.result == 'success' }} with: image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }} diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/analytics/resolver/GetChartsResolver.java 
b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/analytics/resolver/GetChartsResolver.java index 197ac87c1e22d8..d9b8008d46286a 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/analytics/resolver/GetChartsResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/analytics/resolver/GetChartsResolver.java @@ -125,7 +125,7 @@ private AnalyticsChart getTopUsersChart(OperationContext opContext) { final DateRange trailingMonthDateRange = dateUtil.getTrailingMonthDateRange(); final List columns = ImmutableList.of("Name", "Title", "Email"); - final String topUsersTitle = "Top Users"; + final String topUsersTitle = "Top Users (Last 30 Days)"; final List topUserRows = _analyticsService.getTopNTableChart( _analyticsService.getUsageIndexName(), @@ -198,7 +198,7 @@ private Row buildNewUsersRow(@Nonnull final SearchEntity entity) { private AnalyticsChart getNewUsersChart(OperationContext opContext) { try { final List columns = ImmutableList.of("Name", "Title", "Email"); - final String newUsersTitle = "New Users"; + final String newUsersTitle = "Active Users (Last 30 Days)"; final SearchResult result = searchForNewUsers(opContext); final List newUserRows = new ArrayList<>(); for (SearchEntity entity : result.getEntities()) { diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index 926cd256a5c5a4..e086273068ee53 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -3838,6 +3838,11 @@ enum CorpUserStatus { A User that has been provisioned and logged in """ ACTIVE + + """ + A user that has been suspended + """ + SUSPENDED } union ResolvedActor = CorpUser | CorpGroup diff --git a/datahub-web-react/build.gradle b/datahub-web-react/build.gradle index b9fffce173c5c4..bf1aa401e3f560 100644 --- a/datahub-web-react/build.gradle +++ b/datahub-web-react/build.gradle @@ -16,7 +16,7 @@ node { } // Version of node to use. - version = '21.2.0' + version = '22.12.0' // Version of Yarn to use. yarnVersion = '1.22.22' @@ -93,7 +93,6 @@ task yarnLintFix(type: YarnTask, dependsOn: [yarnInstall, yarnGenerate]) { } task yarnBuild(type: YarnTask, dependsOn: [yarnInstall, yarnGenerate]) { - environment = [NODE_OPTIONS: "--max-old-space-size=3072 --openssl-legacy-provider"] args = ['run', 'build'] outputs.cacheIf { true } diff --git a/datahub-web-react/package.json b/datahub-web-react/package.json index 31c10804482f0c..2d1d667a89f14a 100644 --- a/datahub-web-react/package.json +++ b/datahub-web-react/package.json @@ -90,7 +90,7 @@ "analyze": "source-map-explorer 'dist/assets/*.js'", "start": "yarn run generate && vite", "ec2-dev": "yarn run generate && CI=true;export CI;vite", - "build": "yarn run generate && NODE_OPTIONS='--max-old-space-size=3072 --openssl-legacy-provider' CI=false vite build", + "build": "yarn run generate && NODE_OPTIONS='--max-old-space-size=4096 --openssl-legacy-provider' CI=false vite build", "test": "vitest", "generate": "graphql-codegen --config codegen.yml", "lint": "eslint . 
--ext .ts,.tsx --quiet && yarn format-check && yarn type-check", diff --git a/docker/build.gradle b/docker/build.gradle index 25e3dc12036ef9..7b36c0d9acdcf0 100644 --- a/docker/build.gradle +++ b/docker/build.gradle @@ -18,24 +18,131 @@ ext { ':datahub-upgrade', ':metadata-service:war', ] - quickstart_modules = backend_profile_modules + [ - ':metadata-jobs:mce-consumer-job', - ':metadata-jobs:mae-consumer-job', - ':datahub-frontend' + + python_services_modules = [] + + // Common configuration for all tasks + common_config = [ + captureContainersOutput: true, + captureContainersOutputToFiles: project.file('build/container-logs') ] - debug_modules = quickstart_modules - [':metadata-jobs:mce-consumer-job', - ':metadata-jobs:mae-consumer-job'] - compose_args = ['-f', compose_base] - debug_reloadable = [ - 'datahub-gms-debug', - 'system-update-debug', - 'frontend-debug' + // declarative task configuration + quickstart_configs = [ + 'quickstart': [ + profile: 'quickstart-consumers', + modules: python_services_modules + backend_profile_modules + [ + ':datahub-frontend', + ':metadata-jobs:mce-consumer-job', + ':metadata-jobs:mae-consumer-job', + ] + ], + 'quickstartDebug': [ + profile: 'debug', + modules: python_services_modules + backend_profile_modules + [':datahub-frontend'], + isDebug: true + ], + 'quickstartPg': [ + profile: 'quickstart-postgres', + modules: (backend_profile_modules - [':docker:mysql-setup']) + [ + ':docker:postgres-setup', + ':datahub-frontend' + ] + ], + 'quickstartPgDebug': [ + profile: 'debug-postgres', + modules: python_services_modules + (backend_profile_modules - [':docker:mysql-setup']) + [ + ':docker:postgres-setup', + ':datahub-frontend' + ], + isDebug: true + ], + 'quickstartSlim': [ + profile: 'quickstart-backend', + modules: backend_profile_modules + [':docker:datahub-ingestion'], + additionalEnv: [ + 'DATAHUB_ACTIONS_IMAGE': 'acryldata/datahub-ingestion', + 'ACTIONS_VERSION': "v${version}-slim", + 'ACTIONS_EXTRA_PACKAGES': 'acryl-datahub-actions[executor] acryl-datahub-actions', + 'ACTIONS_CONFIG': 'https://raw.githubusercontent.com/acryldata/datahub-actions/main/docker/config/executor.yaml', + 'DATAHUB_LOCAL_COMMON_ENV': "${rootProject.project(':metadata-integration:java:spark-lineage-legacy').projectDir}/spark-smoke-test/smoke-gms.env" + ] + ], + 'quickstartStorage': [ + profile: 'quickstart-storage', + preserveVolumes: true + ] ] - // Postgres - pg_quickstart_modules = quickstart_modules - [':docker:mysql-setup'] + [':docker:postgres-setup'] +} + +// Register all quickstart tasks +quickstart_configs.each { taskName, config -> + tasks.register(taskName) +} + +// Dynamically create all quickstart tasks and configurations +dockerCompose { + // Configure default settings that apply to all configurations + useComposeFiles = [compose_base] + projectName = project_name + projectNamePrefix = '' + buildBeforeUp = false + buildBeforePull = false + stopContainers = false + removeVolumes = false + + quickstart_configs.each { taskName, config -> + "${taskName}" { + isRequiredBy(tasks.named(taskName)) + if (config.profile) { + composeAdditionalArgs = ['--profile', config.profile] + } + + // Common environment variables + environment.put 'DATAHUB_VERSION', config.isDebug ? 
+ System.getenv("DATAHUB_VERSION") ?: "v${version}" : + "v${version}" + environment.put 'DATAHUB_TELEMETRY_ENABLED', 'false' + environment.put "METADATA_TESTS_ENABLED", "true" + environment.put "DATAHUB_REPO", "${docker_registry}" + + // Additional environment variables if specified + if (config.additionalEnv) { + config.additionalEnv.each { key, value -> + environment.put key, value + } + } + + useComposeFiles = [compose_base] + projectName = project_name + projectNamePrefix = '' + buildBeforeUp = false + buildBeforePull = false + stopContainers = false + removeVolumes = false + + // Apply common configuration + common_config.each { key, value -> + delegate."${key}" = value + } + + // Apply additional task-specific configuration if specified + if (config.additionalConfig) { + config.additionalConfig.each { key, value -> + delegate."${key}" = value + } + } + } + } +} - revision = 1 // increment to trigger rebuild +// Configure dependencies for ComposeUp tasks +quickstart_configs.each { taskName, config -> + if (config.modules) { + tasks.getByName("${taskName}ComposeUp").dependsOn( + config.modules.collect { it + ":${config.isDebug ? 'dockerTagDebug' : 'dockerTag'}" } + ) + } } tasks.register('minDockerCompose2.20', Exec) { @@ -43,18 +150,11 @@ tasks.register('minDockerCompose2.20', Exec) { args '-c', 'echo -e "$(docker compose version --short)\n2.20"|sort --version-sort --check=quiet --reverse' } -tasks.register('quickstart') {} -tasks.register('quickstartSlim') {} -tasks.register('quickstartDebug') {} -tasks.register('quickstartPg') {} -tasks.register('quickstartStorage') {} - tasks.register('quickstartNuke') { doFirst { - dockerCompose.quickstart.removeVolumes = true - dockerCompose.quickstartPg.removeVolumes = true - dockerCompose.quickstartSlim.removeVolumes = true - dockerCompose.quickstartDebug.removeVolumes = true + quickstart_configs.each { taskName, config -> + dockerCompose."${taskName}".removeVolumes = !config.preserveVolumes + } } finalizedBy(tasks.withType(ComposeDownForced)) } @@ -63,117 +163,17 @@ tasks.register('quickstartDown') { finalizedBy(tasks.withType(ComposeDownForced)) } -dockerCompose { - quickstart { - isRequiredBy(tasks.named('quickstart')) - composeAdditionalArgs = ['--profile', 'quickstart-consumers'] - - environment.put 'DATAHUB_VERSION', "v${version}" - environment.put 'DATAHUB_TELEMETRY_ENABLED', 'false' // disabled when built locally - - useComposeFiles = [compose_base] - projectName = project_name - projectNamePrefix = '' - buildBeforeUp = false - buildBeforePull = false - stopContainers = false - removeVolumes = false - captureContainersOutput = true - captureContainersOutputToFiles = project.file('build/container-logs') - } - - quickstartPg { - isRequiredBy(tasks.named('quickstartPg')) - composeAdditionalArgs = ['--profile', 'quickstart-postgres'] - - environment.put 'DATAHUB_VERSION', "v${version}" - environment.put 'DATAHUB_TELEMETRY_ENABLED', 'false' // disabled when built locally - - useComposeFiles = [compose_base] - projectName = project_name - projectNamePrefix = '' - buildBeforeUp = false - buildBeforePull = false - stopContainers = false - removeVolumes = false - } - - /** - * The smallest disk footprint required for Spark integration tests - * - * No frontend, mae, mce, or other services - */ - quickstartSlim { - isRequiredBy(tasks.named('quickstartSlim')) - composeAdditionalArgs = ['--profile', 'quickstart-backend'] - - environment.put 'DATAHUB_VERSION', "v${version}" - environment.put "DATAHUB_ACTIONS_IMAGE", "acryldata/datahub-ingestion" - 
environment.put "ACTIONS_VERSION", "v${version}-slim" - environment.put "ACTIONS_EXTRA_PACKAGES", 'acryl-datahub-actions[executor] acryl-datahub-actions' - environment.put "ACTIONS_CONFIG", 'https://raw.githubusercontent.com/acryldata/datahub-actions/main/docker/config/executor.yaml' - environment.put 'DATAHUB_TELEMETRY_ENABLED', 'false' // disabled when built locally - // disabled for spark-lineage smoke-test - environment.put 'DATAHUB_LOCAL_COMMON_ENV', "${rootProject.project(':metadata-integration:java:spark-lineage-legacy').projectDir}/spark-smoke-test/smoke-gms.env" - - useComposeFiles = [compose_base] - projectName = project_name - projectNamePrefix = '' - buildBeforeUp = false - buildBeforePull = false - stopContainers = false - removeVolumes = false - captureContainersOutput = true - captureContainersOutputToFiles = project.file('build/container-logs') - } - - quickstartDebug { - isRequiredBy(tasks.named('quickstartDebug')) - composeAdditionalArgs = ['--profile', 'debug'] - - if (System.getenv().containsKey("DATAHUB_VERSION")) { - environment.put 'DATAHUB_VERSION', System.getenv("DATAHUB_VERSION") - } - environment.put 'DATAHUB_TELEMETRY_ENABLED', 'false' // disabled when built locally - - useComposeFiles = [compose_base] - projectName = project_name - projectNamePrefix = '' - buildBeforeUp = false - buildBeforePull = false - stopContainers = false - removeVolumes = false - } - - quickstartStorage { - isRequiredBy(tasks.named('quickstartStorage')) - composeAdditionalArgs = ['--profile', 'quickstart-storage'] - - useComposeFiles = [compose_base] - projectName = project_name - projectNamePrefix = '' - buildBeforeUp = false - buildBeforePull = false - stopContainers = false - removeVolumes = false - } -} -tasks.getByName('quickstartComposeUp').dependsOn( - quickstart_modules.collect { it + ':dockerTag' }) -tasks.getByName('quickstartPgComposeUp').dependsOn( - pg_quickstart_modules.collect { it + ':dockerTag' }) -tasks.getByName('quickstartSlimComposeUp').dependsOn( - ([':docker:datahub-ingestion'] + backend_profile_modules) - .collect { it + ':dockerTag' }) -tasks.getByName('quickstartDebugComposeUp').dependsOn( - debug_modules.collect { it + ':dockerTagDebug' } -) tasks.withType(ComposeUp).configureEach { shouldRunAfter('quickstartNuke') dependsOn tasks.named("minDockerCompose2.20") } task debugReload(type: Exec) { - def cmd = ['docker compose -p datahub --profile debug'] + compose_args + ['restart'] + debug_reloadable + def cmd = ['docker compose -p datahub --profile debug'] + ['-f', compose_base] + [ + 'restart', + 'datahub-gms-debug', + 'system-update-debug', + 'frontend-debug' + ] commandLine 'bash', '-c', cmd.join(" ") -} +} \ No newline at end of file diff --git a/docker/datahub-gms/Dockerfile b/docker/datahub-gms/Dockerfile index b15bf3c6f9f17b..47b10535f8deea 100644 --- a/docker/datahub-gms/Dockerfile +++ b/docker/datahub-gms/Dockerfile @@ -6,12 +6,12 @@ ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine ARG GITHUB_REPO_URL=https://github.com ARG MAVEN_CENTRAL_REPO_URL=https://repo1.maven.org/maven2 -FROM golang:1-alpine3.20 AS binary +FROM golang:1-alpine3.21 AS binary # Re-declaring arg from above to make it available in this stage (will inherit default value) ARG ALPINE_REPO_URL -ENV DOCKERIZE_VERSION=v0.6.1 +ENV DOCKERIZE_VERSION=v0.9.1 WORKDIR /go/src/github.com/jwilder # Optionally set corporate mirror for apk diff --git a/docker/datahub-mae-consumer/Dockerfile b/docker/datahub-mae-consumer/Dockerfile index 6edaa29ee1a8bb..74375072761d89 100644 --- 
a/docker/datahub-mae-consumer/Dockerfile +++ b/docker/datahub-mae-consumer/Dockerfile @@ -6,12 +6,12 @@ ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine ARG GITHUB_REPO_URL=https://github.com ARG MAVEN_CENTRAL_REPO_URL=https://repo1.maven.org/maven2 -FROM golang:1-alpine3.20 AS binary +FROM golang:1-alpine3.21 AS binary # Re-declaring arg from above to make it available in this stage (will inherit default value) ARG ALPINE_REPO_URL -ENV DOCKERIZE_VERSION=v0.6.1 +ENV DOCKERIZE_VERSION=v0.9.1 WORKDIR /go/src/github.com/jwilder # Optionally set corporate mirror for apk diff --git a/docker/datahub-mce-consumer/Dockerfile b/docker/datahub-mce-consumer/Dockerfile index 1eb56633c561e6..3adef53cd06068 100644 --- a/docker/datahub-mce-consumer/Dockerfile +++ b/docker/datahub-mce-consumer/Dockerfile @@ -6,12 +6,12 @@ ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine ARG GITHUB_REPO_URL=https://github.com ARG MAVEN_CENTRAL_REPO_URL=https://repo1.maven.org/maven2 -FROM golang:1-alpine3.20 AS binary +FROM golang:1-alpine3.21 AS binary # Re-declaring arg from above to make it available in this stage (will inherit default value) ARG ALPINE_REPO_URL -ENV DOCKERIZE_VERSION=v0.6.1 +ENV DOCKERIZE_VERSION=v0.9.1 WORKDIR /go/src/github.com/jwilder # Optionally set corporate mirror for apk diff --git a/docker/datahub-upgrade/Dockerfile b/docker/datahub-upgrade/Dockerfile index 3d59a903414b1a..a8ef4e8034fdd5 100644 --- a/docker/datahub-upgrade/Dockerfile +++ b/docker/datahub-upgrade/Dockerfile @@ -6,12 +6,12 @@ ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine ARG GITHUB_REPO_URL=https://github.com ARG MAVEN_CENTRAL_REPO_URL=https://repo1.maven.org/maven2 -FROM golang:1-alpine3.20 AS binary +FROM golang:1-alpine3.21 AS binary # Re-declaring arg from above to make it available in this stage (will inherit default value) ARG ALPINE_REPO_URL -ENV DOCKERIZE_VERSION=v0.6.1 +ENV DOCKERIZE_VERSION=v0.9.1 WORKDIR /go/src/github.com/jwilder # Optionally set corporate mirror for apk diff --git a/docker/elasticsearch-setup/Dockerfile b/docker/elasticsearch-setup/Dockerfile index 4e64dcbc1e452c..1a6fe5bee6c840 100644 --- a/docker/elasticsearch-setup/Dockerfile +++ b/docker/elasticsearch-setup/Dockerfile @@ -6,11 +6,11 @@ ARG APP_ENV=prod # Defining custom repo urls for use in enterprise environments. Re-used between stages below. ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine -FROM golang:1-alpine3.20 AS binary +FROM golang:1-alpine3.21 AS binary ARG ALPINE_REPO_URL -ENV DOCKERIZE_VERSION=v0.6.1 +ENV DOCKERIZE_VERSION=v0.9.1 WORKDIR /go/src/github.com/jwilder # Optionally set corporate mirror for apk diff --git a/docker/mysql-setup/Dockerfile b/docker/mysql-setup/Dockerfile index b0ca45ad8f6f24..8a2d42bc233180 100644 --- a/docker/mysql-setup/Dockerfile +++ b/docker/mysql-setup/Dockerfile @@ -1,11 +1,11 @@ # Defining custom repo urls for use in enterprise environments. Re-used between stages below. ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine -FROM golang:1-alpine3.20 AS binary +FROM golang:1-alpine3.21 AS binary ARG ALPINE_REPO_URL -ENV DOCKERIZE_VERSION=v0.6.1 +ENV DOCKERIZE_VERSION=v0.9.1 WORKDIR /go/src/github.com/jwilder # Optionally set corporate mirror for apk diff --git a/docker/postgres-setup/Dockerfile b/docker/postgres-setup/Dockerfile index e145456e807d4d..31e9687cea15e8 100644 --- a/docker/postgres-setup/Dockerfile +++ b/docker/postgres-setup/Dockerfile @@ -1,11 +1,11 @@ # Defining custom repo urls for use in enterprise environments. Re-used between stages below. 
ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine -FROM golang:1-alpine3.20 AS binary +FROM golang:1-alpine3.21 AS binary ARG ALPINE_REPO_URL -ENV DOCKERIZE_VERSION=v0.6.1 +ENV DOCKERIZE_VERSION=v0.9.1 WORKDIR /go/src/github.com/jwilder # Optionally set corporate mirror for apk diff --git a/docs-website/build.gradle b/docs-website/build.gradle index 797863d2019fbd..1be790695e87e6 100644 --- a/docs-website/build.gradle +++ b/docs-website/build.gradle @@ -14,7 +14,7 @@ node { } // Version of node to use. - version = '21.2.0' + version = '22.12.0' // Version of Yarn to use. yarnVersion = '1.22.22' diff --git a/docs/api/tutorials/structured-properties.md b/docs/api/tutorials/structured-properties.md index 95c89424e9ca7a..2caa015e206595 100644 --- a/docs/api/tutorials/structured-properties.md +++ b/docs/api/tutorials/structured-properties.md @@ -73,7 +73,7 @@ mutation createStructuredProperty { {numberValue: 365, description:"Use this for non-sensitive data that can be retained for longer"} ], cardinality: SINGLE, - entityTypes: ["urn:li:entityType:dataset", "urn:li:entityType:dataFlow"], + entityTypes: ["urn:li:entityType:datahub.dataset", "urn:li:entityType:datahub.dataFlow"], } ) { urn diff --git a/docs/managed-datahub/release-notes/v_0_3_7.md b/docs/managed-datahub/release-notes/v_0_3_7.md index 75f5ac21224c27..31302403ea9305 100644 --- a/docs/managed-datahub/release-notes/v_0_3_7.md +++ b/docs/managed-datahub/release-notes/v_0_3_7.md @@ -13,6 +13,12 @@ If you are using an older CLI/SDK version, then please upgrade it. This applies ## Known Issues +### v0.3.7.8 + * Notes Feature + * Adding a Note to an entity will result in that note showing up in the Settings > Home Page list of announcements as well as the profile page of the entity. + * If more than 30 Notes are added to entities, there's a risk that home page announcements will not show up on the home page properly. + * Notes are only supported for Dataset and Column entities in this release. + ### v0.3.7.7 * Postgres regression, non-functional when using postgres @@ -24,7 +30,9 @@ If you are using an older CLI/SDK version, then please upgrade it. This applies ### v0.3.7.8 +- Helm Chart Requirement: 1.4.157+ - [Postgres] Fix regression from MySQL fix in v0.3.7.7 +- [UI] Fix editing post on entity profile page becomes announcement ### v0.3.7.7 diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py index 2fdd0a41edf6cb..a87f490f2d947e 100644 --- a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py +++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py @@ -522,7 +522,7 @@ def generate_datajob( # Also, add datahub inputs/outputs if present in input/output metatdata. 
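In the dagster_generator.py hunk that follows, input_def_snap._asdict() and output_def_snap._asdict() are replaced with .__dict__, presumably because the newer Dagster snap objects are no longer NamedTuples. A toy sketch of the difference, using hypothetical classes rather than Dagster's real snap types:

    # Hypothetical classes, not Dagster's: a NamedTuple exposes _asdict(), while a plain
    # (data)class instance exposes its fields via __dict__ instead.
    from dataclasses import dataclass
    from typing import NamedTuple

    class OldSnap(NamedTuple):
        name: str
        description: str

    @dataclass
    class NewSnap:
        name: str
        description: str

    print(str(OldSnap("in1", "an input")._asdict()))   # {'name': 'in1', 'description': 'an input'}
    print(str(NewSnap("in1", "an input").__dict__))    # {'name': 'in1', 'description': 'an input'}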
for input_def_snap in op_def_snap.input_def_snaps: job_property_bag[f"input.{input_def_snap.name}"] = str( - input_def_snap._asdict() + input_def_snap.__dict__ ) if Constant.DATAHUB_INPUTS in input_def_snap.metadata: datajob.inlets.extend( @@ -533,7 +533,7 @@ def generate_datajob( for output_def_snap in op_def_snap.output_def_snaps: job_property_bag[f"output_{output_def_snap.name}"] = str( - output_def_snap._asdict() + output_def_snap.__dict__ ) if ( Constant.DATAHUB_OUTPUTS in output_def_snap.metadata diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index e2bc14925ad383..675717b5ec4829 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -291,6 +291,7 @@ def emit_mcps( mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]], async_flag: Optional[bool] = None, ) -> int: + logger.debug("Attempting to emit batch mcps") url = f"{self._gms_server}/aspects?action=ingestProposalBatch" for mcp in mcps: ensure_has_system_metadata(mcp) @@ -303,15 +304,22 @@ def emit_mcps( current_chunk_size = INGEST_MAX_PAYLOAD_BYTES for mcp_obj in mcp_objs: mcp_obj_size = len(json.dumps(mcp_obj)) + logger.debug( + f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}" + ) if ( mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH ): + logger.debug("Decided to create new chunk") mcp_obj_chunks.append([]) current_chunk_size = 0 mcp_obj_chunks[-1].append(mcp_obj) current_chunk_size += mcp_obj_size + logger.debug( + f"Decided to send {len(mcps)} mcps in {len(mcp_obj_chunks)} chunks" + ) for mcp_obj_chunk in mcp_obj_chunks: # TODO: We're calling json.dumps on each MCP object twice, once to estimate @@ -338,8 +346,15 @@ def emit_usage(self, usageStats: UsageAggregation) -> None: def _emit_generic(self, url: str, payload: str) -> None: curl_command = make_curl_command(self._session, "POST", url, payload) + payload_size = len(payload) + if payload_size > INGEST_MAX_PAYLOAD_BYTES: + # since we know total payload size here, we could simply avoid sending such payload at all and report a warning, with current approach we are going to cause whole ingestion to fail + logger.warning( + f"Apparent payload size exceeded {INGEST_MAX_PAYLOAD_BYTES}, might fail with an exception due to the size" + ) logger.debug( - "Attempting to emit to DataHub GMS; using curl equivalent to:\n%s", + "Attempting to emit aspect (size: %s) to DataHub GMS; using curl equivalent to:\n%s", + payload_size, curl_command, ) try: diff --git a/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py b/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py new file mode 100644 index 00000000000000..559f0b77f59dfa --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py @@ -0,0 +1,96 @@ +import json +import logging +from typing import Iterable, List + +from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES +from datahub.emitter.serialization_helper import pre_json_transform +from datahub.ingestion.api.source import SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.schema_classes import ( + DatasetProfileClass, + SchemaFieldClass, + SchemaMetadataClass, +) + +logger = logging.getLogger(__name__) + + +class 
EnsureAspectSizeProcessor: + def __init__( + self, report: SourceReport, payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES + ): + self.report = report + self.payload_constraint = payload_constraint + + def ensure_dataset_profile_size( + self, dataset_urn: str, profile: DatasetProfileClass + ) -> None: + """ + This is quite arbitrary approach to ensuring dataset profile aspect does not exceed allowed size, might be adjusted + in the future + """ + sample_fields_size = 0 + if profile.fieldProfiles: + logger.debug(f"Length of field profiles: {len(profile.fieldProfiles)}") + for field in profile.fieldProfiles: + if field.sampleValues: + values_len = 0 + for value in field.sampleValues: + if value: + values_len += len(value) + logger.debug( + f"Field {field.fieldPath} has {len(field.sampleValues)} sample values, taking total bytes {values_len}" + ) + if sample_fields_size + values_len > self.payload_constraint: + field.sampleValues = [] + self.report.warning( + title="Dataset profile truncated due to size constraint", + message="Dataset profile contained too much data and would have caused ingestion to fail", + context=f"Sample values for field {field.fieldPath} were removed from dataset profile for {dataset_urn} due to aspect size constraints", + ) + else: + sample_fields_size += values_len + else: + logger.debug(f"Field {field.fieldPath} has no sample values") + + def ensure_schema_metadata_size( + self, dataset_urn: str, schema: SchemaMetadataClass + ) -> None: + """ + This is quite arbitrary approach to ensuring schema metadata aspect does not exceed allowed size, might be adjusted + in the future + """ + total_fields_size = 0 + logger.debug(f"Amount of schema fields: {len(schema.fields)}") + accepted_fields: List[SchemaFieldClass] = [] + for field in schema.fields: + field_size = len(json.dumps(pre_json_transform(field.to_obj()))) + logger.debug(f"Field {field.fieldPath} takes total {field_size}") + if total_fields_size + field_size < self.payload_constraint: + accepted_fields.append(field) + total_fields_size += field_size + else: + self.report.warning( + title="Schema truncated due to size constraint", + message="Dataset schema contained too much data and would have caused ingestion to fail", + context=f"Field {field.fieldPath} was removed from schema for {dataset_urn} due to aspect size constraints", + ) + + schema.fields = accepted_fields + + def ensure_aspect_size( + self, + stream: Iterable[MetadataWorkUnit], + ) -> Iterable[MetadataWorkUnit]: + """ + We have hard limitation of aspect size being 16 MB. Some aspects can exceed that value causing an exception + on GMS side and failure of the entire ingestion. This processor will attempt to trim suspected aspects. 
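The emit_mcps changes above split the serialized MCPs into chunks bounded by both INGEST_MAX_PAYLOAD_BYTES and BATCH_INGEST_MAX_PAYLOAD_LENGTH, with each chunk becoming one call to /aspects?action=ingestProposalBatch. A minimal standalone sketch of that greedy chunking, using stand-in limits rather than the emitter's real constants:

    import json
    from typing import Any, Dict, List

    def chunk_mcp_objs(
        mcp_objs: List[Dict[str, Any]],
        max_bytes: int = 15 * 1024 * 1024,  # stand-in for INGEST_MAX_PAYLOAD_BYTES
        max_items: int = 200,               # stand-in for BATCH_INGEST_MAX_PAYLOAD_LENGTH
    ) -> List[List[Dict[str, Any]]]:
        chunks: List[List[Dict[str, Any]]] = [[]]
        current_size = 0
        for obj in mcp_objs:
            obj_size = len(json.dumps(obj))
            # Open a new chunk when either the byte budget or the item budget would overflow.
            if current_size + obj_size > max_bytes or len(chunks[-1]) >= max_items:
                chunks.append([])
                current_size = 0
            chunks[-1].append(obj)
            current_size += obj_size
        return chunks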
+ """ + for wu in stream: + logger.debug(f"Ensuring size of workunit: {wu.id}") + + if schema := wu.get_aspect_of_type(SchemaMetadataClass): + self.ensure_schema_metadata_size(wu.get_urn(), schema) + elif profile := wu.get_aspect_of_type(DatasetProfileClass): + self.ensure_dataset_profile_size(wu.get_urn(), profile) + yield wu diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py index 161aed5bb59881..b76eb95def1ede 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py @@ -1,7 +1,12 @@ +import logging +import os from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union +from enum import Enum +from http import HTTPStatus +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Union import boto3 +import requests from boto3.session import Session from botocore.config import DEFAULT_TIMEOUT, Config from botocore.utils import fix_s3_host @@ -14,6 +19,8 @@ ) from datahub.configuration.source_common import EnvConfigMixin +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from mypy_boto3_dynamodb import DynamoDBClient from mypy_boto3_glue import GlueClient @@ -22,6 +29,26 @@ from mypy_boto3_sts import STSClient +class AwsEnvironment(Enum): + EC2 = "EC2" + ECS = "ECS" + EKS = "EKS" + LAMBDA = "LAMBDA" + APP_RUNNER = "APP_RUNNER" + BEANSTALK = "ELASTIC_BEANSTALK" + CLOUD_FORMATION = "CLOUD_FORMATION" + UNKNOWN = "UNKNOWN" + + +class AwsServicePrincipal(Enum): + LAMBDA = "lambda.amazonaws.com" + EKS = "eks.amazonaws.com" + APP_RUNNER = "apprunner.amazonaws.com" + ECS = "ecs.amazonaws.com" + ELASTIC_BEANSTALK = "elasticbeanstalk.amazonaws.com" + EC2 = "ec2.amazonaws.com" + + class AwsAssumeRoleConfig(PermissiveConfigModel): # Using the PermissiveConfigModel to allow the user to pass additional arguments. @@ -34,6 +61,163 @@ class AwsAssumeRoleConfig(PermissiveConfigModel): ) +def get_instance_metadata_token() -> Optional[str]: + """Get IMDSv2 token""" + try: + response = requests.put( + "http://169.254.169.254/latest/api/token", + headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}, + timeout=1, + ) + if response.status_code == HTTPStatus.OK: + return response.text + except requests.exceptions.RequestException: + logger.debug("Failed to get IMDSv2 token") + return None + + +def is_running_on_ec2() -> bool: + """Check if code is running on EC2 using IMDSv2""" + token = get_instance_metadata_token() + if not token: + return False + + try: + response = requests.get( + "http://169.254.169.254/latest/meta-data/instance-id", + headers={"X-aws-ec2-metadata-token": token}, + timeout=1, + ) + return response.status_code == HTTPStatus.OK + except requests.exceptions.RequestException: + return False + + +def detect_aws_environment() -> AwsEnvironment: + """ + Detect the AWS environment we're running in. + Order matters as some environments may have multiple indicators. 
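The new EnsureAspectSizeProcessor above is meant to be applied as a pass over a source's workunit stream before emission. A small usage sketch; the wrapper function and the workunits iterable are hypothetical, while the processor API is the one defined in the new file:

    from typing import Iterable

    from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
        EnsureAspectSizeProcessor,
    )
    from datahub.ingestion.api.source import SourceReport
    from datahub.ingestion.api.workunit import MetadataWorkUnit

    def bounded_stream(
        workunits: Iterable[MetadataWorkUnit], report: SourceReport
    ) -> Iterable[MetadataWorkUnit]:
        # Oversized schemaMetadata / datasetProfile aspects are trimmed in place and
        # surfaced as warnings on the report; all other workunits pass through unchanged.
        processor = EnsureAspectSizeProcessor(report)
        return processor.ensure_aspect_size(workunits)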
+ """ + # Check Lambda first as it's most specific + if os.getenv("AWS_LAMBDA_FUNCTION_NAME"): + if os.getenv("AWS_EXECUTION_ENV", "").startswith("CloudFormation"): + return AwsEnvironment.CLOUD_FORMATION + return AwsEnvironment.LAMBDA + + # Check EKS (IRSA) + if os.getenv("AWS_WEB_IDENTITY_TOKEN_FILE") and os.getenv("AWS_ROLE_ARN"): + return AwsEnvironment.EKS + + # Check App Runner + if os.getenv("AWS_APP_RUNNER_SERVICE_ID"): + return AwsEnvironment.APP_RUNNER + + # Check ECS + if os.getenv("ECS_CONTAINER_METADATA_URI_V4") or os.getenv( + "ECS_CONTAINER_METADATA_URI" + ): + return AwsEnvironment.ECS + + # Check Elastic Beanstalk + if os.getenv("ELASTIC_BEANSTALK_ENVIRONMENT_NAME"): + return AwsEnvironment.BEANSTALK + + if is_running_on_ec2(): + return AwsEnvironment.EC2 + + return AwsEnvironment.UNKNOWN + + +def get_instance_role_arn() -> Optional[str]: + """Get role ARN from EC2 instance metadata using IMDSv2""" + token = get_instance_metadata_token() + if not token: + return None + + try: + response = requests.get( + "http://169.254.169.254/latest/meta-data/iam/security-credentials/", + headers={"X-aws-ec2-metadata-token": token}, + timeout=1, + ) + if response.status_code == 200: + role_name = response.text.strip() + if role_name: + sts = boto3.client("sts") + identity = sts.get_caller_identity() + return identity.get("Arn") + except Exception as e: + logger.debug(f"Failed to get instance role ARN: {e}") + return None + + +def get_lambda_role_arn() -> Optional[str]: + """Get the Lambda function's role ARN""" + try: + function_name = os.getenv("AWS_LAMBDA_FUNCTION_NAME") + if not function_name: + return None + + lambda_client = boto3.client("lambda") + function_config = lambda_client.get_function_configuration( + FunctionName=function_name + ) + return function_config.get("Role") + except Exception as e: + logger.debug(f"Failed to get Lambda role ARN: {e}") + return None + + +def get_current_identity() -> Tuple[Optional[str], Optional[str]]: + """ + Get the current role ARN and source type based on the runtime environment. 
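detect_aws_environment above resolves the runtime purely from environment variables plus the IMDSv2 probe, checking Lambda before EKS, App Runner, ECS, Beanstalk and EC2. An illustrative check, assuming none of the other indicator variables are set in the calling environment (the values below are made up):

    import os

    from datahub.ingestion.source.aws.aws_common import (
        AwsEnvironment,
        detect_aws_environment,
    )

    # Lambda wins because it is checked first.
    os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "demo-fn"
    assert detect_aws_environment() == AwsEnvironment.LAMBDA

    # With the Lambda indicator gone, the ECS metadata URI is picked up instead.
    del os.environ["AWS_LAMBDA_FUNCTION_NAME"]
    os.environ["ECS_CONTAINER_METADATA_URI_V4"] = "http://169.254.170.2/v4"
    assert detect_aws_environment() == AwsEnvironment.ECS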
+ Returns (role_arn, credential_source) + """ + env = detect_aws_environment() + + if env == AwsEnvironment.LAMBDA: + role_arn = get_lambda_role_arn() + return role_arn, AwsServicePrincipal.LAMBDA.value + + elif env == AwsEnvironment.EKS: + role_arn = os.getenv("AWS_ROLE_ARN") + return role_arn, AwsServicePrincipal.EKS.value + + elif env == AwsEnvironment.APP_RUNNER: + try: + sts = boto3.client("sts") + identity = sts.get_caller_identity() + return identity.get("Arn"), AwsServicePrincipal.APP_RUNNER.value + except Exception as e: + logger.debug(f"Failed to get App Runner role: {e}") + + elif env == AwsEnvironment.ECS: + try: + metadata_uri = os.getenv("ECS_CONTAINER_METADATA_URI_V4") or os.getenv( + "ECS_CONTAINER_METADATA_URI" + ) + if metadata_uri: + response = requests.get(f"{metadata_uri}/task", timeout=1) + if response.status_code == HTTPStatus.OK: + task_metadata = response.json() + if "TaskARN" in task_metadata: + return ( + task_metadata.get("TaskARN"), + AwsServicePrincipal.ECS.value, + ) + except Exception as e: + logger.debug(f"Failed to get ECS task role: {e}") + + elif env == AwsEnvironment.BEANSTALK: + # Beanstalk uses EC2 instance metadata + return get_instance_role_arn(), AwsServicePrincipal.ELASTIC_BEANSTALK.value + + elif env == AwsEnvironment.EC2: + return get_instance_role_arn(), AwsServicePrincipal.EC2.value + + return None, None + + def assume_role( role: AwsAssumeRoleConfig, aws_region: Optional[str], @@ -95,7 +279,7 @@ class AwsConnectionConfig(ConfigModel): ) aws_profile: Optional[str] = Field( default=None, - description="Named AWS profile to use. Only used if access key / secret are unset. If not set the default will be used", + description="The [named profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config.", ) aws_region: Optional[str] = Field(None, description="AWS region code.") @@ -145,6 +329,7 @@ def _normalized_aws_roles(self) -> List[AwsAssumeRoleConfig]: def get_session(self) -> Session: if self.aws_access_key_id and self.aws_secret_access_key: + # Explicit credentials take precedence session = Session( aws_access_key_id=self.aws_access_key_id, aws_secret_access_key=self.aws_secret_access_key, @@ -152,38 +337,57 @@ def get_session(self) -> Session: region_name=self.aws_region, ) elif self.aws_profile: + # Named profile is second priority session = Session( region_name=self.aws_region, profile_name=self.aws_profile ) else: - # Use boto3's credential autodetection. + # Use boto3's credential autodetection session = Session(region_name=self.aws_region) - if self._normalized_aws_roles(): - # Use existing session credentials to start the chain of role assumption. - current_credentials = session.get_credentials() - credentials = { - "AccessKeyId": current_credentials.access_key, - "SecretAccessKey": current_credentials.secret_key, - "SessionToken": current_credentials.token, - } - - for role in self._normalized_aws_roles(): - if self._should_refresh_credentials(): - credentials = assume_role( - role, - self.aws_region, - credentials=credentials, + target_roles = self._normalized_aws_roles() + if target_roles: + current_role_arn, credential_source = get_current_identity() + + # Only assume role if: + # 1. We're not in a known AWS environment with a role, or + # 2. 
We need to assume a different role than our current one + should_assume_role = current_role_arn is None or any( + role.RoleArn != current_role_arn for role in target_roles + ) + + if should_assume_role: + env = detect_aws_environment() + logger.debug(f"Assuming role(s) from {env.value} environment") + + current_credentials = session.get_credentials() + if current_credentials is None: + raise ValueError("No credentials available for role assumption") + + credentials = { + "AccessKeyId": current_credentials.access_key, + "SecretAccessKey": current_credentials.secret_key, + "SessionToken": current_credentials.token, + } + + for role in target_roles: + if self._should_refresh_credentials(): + credentials = assume_role( + role=role, + aws_region=self.aws_region, + credentials=credentials, + ) + if isinstance(credentials["Expiration"], datetime): + self._credentials_expiration = credentials["Expiration"] + + session = Session( + aws_access_key_id=credentials["AccessKeyId"], + aws_secret_access_key=credentials["SecretAccessKey"], + aws_session_token=credentials["SessionToken"], + region_name=self.aws_region, ) - if isinstance(credentials["Expiration"], datetime): - self._credentials_expiration = credentials["Expiration"] - - session = Session( - aws_access_key_id=credentials["AccessKeyId"], - aws_secret_access_key=credentials["SecretAccessKey"], - aws_session_token=credentials["SessionToken"], - region_name=self.aws_region, - ) + else: + logger.debug(f"Using existing role from {credential_source}") return session diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py index a3304334cb1ebc..cd3c2146e6d848 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py @@ -14,6 +14,17 @@ DEFAULT_DATABASE_TABLE_NAME = "metadata_aspect_v2" DEFAULT_KAFKA_TOPIC_NAME = "MetadataChangeLog_Timeseries_v1" DEFAULT_DATABASE_BATCH_SIZE = 10_000 +DEFAULT_EXCLUDE_ASPECTS = { + "dataHubIngestionSourceKey", + "dataHubIngestionSourceInfo", + "datahubIngestionRunSummary", + "datahubIngestionCheckpoint", + "dataHubSecretKey", + "dataHubSecretValue", + "globalSettingsKey", + "globalSettingsInfo", + "testResults", +} class DataHubSourceConfig(StatefulIngestionConfigBase): @@ -44,7 +55,7 @@ class DataHubSourceConfig(StatefulIngestionConfigBase): ) exclude_aspects: Set[str] = Field( - default_factory=set, + default=DEFAULT_EXCLUDE_ASPECTS, description="Set of aspect names to exclude from ingestion", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule index 51a0dff288558f..f237e2503317f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule @@ -21,6 +21,11 @@ // | empty_string // | empty_string "," argument_list // - Added sql_string in any_literal +// - Added WS_INLINE? in field expression +// Added to ignore any comments +// %ignore WS // Ignore whitespace +// %ignore CPP_COMMENT // Ignore single-line comments +// %ignore C_COMMENT // Ignore multi-line comments lexical_unit: lexical_elements? @@ -245,6 +250,8 @@ operator_or_punctuator: "," | "=>" | ".." | "..." 
+ | "{{" + | "}}" document: section_document | expression_document @@ -275,6 +282,7 @@ expression: logical_or_expression | if_expression | error_raising_expression | error_handling_expression + | outer_expression logical_or_expression: logical_and_expression @@ -376,6 +384,8 @@ sql_content: /(?:[^\"\\]|\\[\"]|\"\"|\#\(lf\))+/ sql_string: "\"" sql_content "\"" +outer_expression: "{{" expression "}}" + argument_list: WS_INLINE? expression | WS_INLINE? expression WS_INLINE? "," WS_INLINE? argument_list | WS_INLINE? sql_string @@ -409,7 +419,7 @@ record_expression: "[" field_list? "]" field_list: field | field "," field_list -field: field_name WS_INLINE? "=" WS_INLINE? expression +field: WS_INLINE? field_name WS_INLINE? "=" WS_INLINE? expression field_name: generalized_identifier | quoted_identifier @@ -621,4 +631,8 @@ any_literal: record_literal %import common.DIGIT %import common.LF %import common.CR -%import common.ESCAPED_STRING \ No newline at end of file +%import common.ESCAPED_STRING + +%ignore WS // Ignore whitespace +%ignore CPP_COMMENT // Ignore single-line comments +%ignore C_COMMENT // Ignore multi-line comments \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py index c769c6705ac3f6..69f28a0e6e595a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py @@ -265,64 +265,17 @@ def _populate_external_upstreams(self, discovered_tables: List[str]) -> None: with PerfTimer() as timer: self.report.num_external_table_edges_scanned = 0 - for ( - known_lineage_mapping - ) in self._populate_external_lineage_from_copy_history(discovered_tables): - self.sql_aggregator.add(known_lineage_mapping) - logger.info( - "Done populating external lineage from copy history. " - f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far." - ) - - for ( - known_lineage_mapping - ) in self._populate_external_lineage_from_show_query(discovered_tables): - self.sql_aggregator.add(known_lineage_mapping) - - logger.info( - "Done populating external lineage from show external tables. " - f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far." - ) + for entry in self._get_copy_history_lineage(discovered_tables): + self.sql_aggregator.add(entry) + logger.info("Done populating external lineage from copy history. ") self.report.external_lineage_queries_secs = timer.elapsed_seconds() - # Handles the case for explicitly created external tables. - # NOTE: Snowflake does not log this information to the access_history table. 
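The additions to powerbi-lexical-grammar.rule above (the "{{ ... }}" outer_expression rule, the optional leading WS_INLINE in field, and the trailing %ignore directives) use Lark-style grammar directives. A toy, self-contained Lark sketch, unrelated to the actual DataHub grammar, showing what %ignore does:

    from lark import Lark

    # Whitespace and // comments are dropped by the lexer before parsing, which is the
    # effect of the %ignore lines appended to the grammar file.
    toy_grammar = r"""
    start: NAME "=" NAME
    NAME: /[A-Za-z_]+/

    %import common.WS
    %import common.CPP_COMMENT
    %ignore WS
    %ignore CPP_COMMENT
    """

    parser = Lark(toy_grammar, parser="lalr")
    print(parser.parse("source = table  // trailing comment").pretty())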
- def _populate_external_lineage_from_show_query( - self, discovered_tables: List[str] - ) -> Iterable[KnownLineageMapping]: - external_tables_query: str = SnowflakeQuery.show_external_tables() - try: - for db_row in self.connection.query(external_tables_query): - key = self.identifiers.get_dataset_identifier( - db_row["name"], db_row["schema_name"], db_row["database_name"] - ) - - if key not in discovered_tables: - continue - if db_row["location"].startswith("s3://"): - yield KnownLineageMapping( - upstream_urn=make_s3_urn_for_lineage( - db_row["location"], self.config.env - ), - downstream_urn=self.identifiers.gen_dataset_urn(key), - ) - self.report.num_external_table_edges_scanned += 1 - - self.report.num_external_table_edges_scanned += 1 - except Exception as e: - logger.debug(e, exc_info=e) - self.structured_reporter.warning( - "Error populating external table lineage from Snowflake", - exc=e, - ) - self.report_status(EXTERNAL_LINEAGE, False) - # Handles the case where a table is populated from an external stage/s3 location via copy. # Eg: copy into category_english from @external_s3_stage; # Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv'; # NOTE: Snowflake does not log this information to the access_history table. - def _populate_external_lineage_from_copy_history( + def _get_copy_history_lineage( self, discovered_tables: List[str] ) -> Iterable[KnownLineageMapping]: query: str = SnowflakeQuery.copy_lineage_history( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py index 2d2bdc50467c64..174aad0bddd4a8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py @@ -247,9 +247,6 @@ def get_workunits_internal( for entry in self.fetch_copy_history(): queries.append(entry) - # TODO: Add "show external tables" lineage to the main schema extractor. - # Because it's not a time-based thing, it doesn't really make sense in the snowflake-queries extractor. 
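Both the copy-history path (_get_copy_history_lineage) and the external-table DDL path being moved into the schema extractor reduce to KnownLineageMapping entries fed into the shared SQL aggregator. A sketch of one such edge; the S3 location and dataset URN are made-up examples:

    from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
    from datahub.sql_parsing.sql_parsing_aggregator import KnownLineageMapping

    # One external lineage edge: an S3 stage location upstream of a Snowflake table.
    mapping = KnownLineageMapping(
        upstream_urn=make_s3_urn_for_lineage(
            "s3://example-bucket/olist_raw_data/category_english", "PROD"
        ),
        downstream_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,db1.schema1.category_english,PROD)",
    )
    # The extractor then simply hands it to the aggregator: self.sql_aggregator.add(mapping)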
- with self.report.query_log_fetch_timer: for entry in self.fetch_query_log(): queries.append(entry) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index bc64693b6a1084..4b72b09fafe2dd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -16,6 +16,7 @@ ClassificationHandler, classification_workunit_processor, ) +from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage from datahub.ingestion.source.common.subtypes import ( DatasetContainerSubTypes, DatasetSubTypes, @@ -35,6 +36,7 @@ ) from datahub.ingestion.source.snowflake.snowflake_data_reader import SnowflakeDataReader from datahub.ingestion.source.snowflake.snowflake_profiler import SnowflakeProfiler +from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report from datahub.ingestion.source.snowflake.snowflake_schema import ( SCHEMA_PARALLELISM, @@ -65,6 +67,7 @@ get_domain_wu, ) from datahub.ingestion.source_report.ingestion_stage import ( + EXTERNAL_TABLE_DDL_LINEAGE, METADATA_EXTRACTION, PROFILING, ) @@ -96,7 +99,10 @@ TimeType, ) from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties -from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator +from datahub.sql_parsing.sql_parsing_aggregator import ( + KnownLineageMapping, + SqlParsingAggregator, +) from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor @@ -180,7 +186,8 @@ def __init__( # These are populated as side-effects of get_workunits_internal. self.databases: List[SnowflakeDatabase] = [] - self.aggregator: Optional[SqlParsingAggregator] = aggregator + + self.aggregator = aggregator def get_connection(self) -> SnowflakeConnection: return self.connection @@ -212,6 +219,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION) yield from self._process_database(snowflake_db) + self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE) + discovered_tables: List[str] = [ + self.identifiers.get_dataset_identifier( + table_name, schema.name, db.name + ) + for db in self.databases + for schema in db.schemas + for table_name in schema.tables + ] + if self.aggregator: + for entry in self._external_tables_ddl_lineage(discovered_tables): + self.aggregator.add(entry) + except SnowflakePermissionError as e: self.structured_reporter.failure( GENERIC_PERMISSION_ERROR_KEY, @@ -1082,3 +1102,33 @@ def get_fk_constraints_for_table( # Access to table but none of its constraints - is this possible ? return constraints.get(table_name, []) + + # Handles the case for explicitly created external tables. + # NOTE: Snowflake does not log this information to the access_history table. 
+ def _external_tables_ddl_lineage( + self, discovered_tables: List[str] + ) -> Iterable[KnownLineageMapping]: + external_tables_query: str = SnowflakeQuery.show_external_tables() + try: + for db_row in self.connection.query(external_tables_query): + key = self.identifiers.get_dataset_identifier( + db_row["name"], db_row["schema_name"], db_row["database_name"] + ) + + if key not in discovered_tables: + continue + if db_row["location"].startswith("s3://"): + yield KnownLineageMapping( + upstream_urn=make_s3_urn_for_lineage( + db_row["location"], self.config.env + ), + downstream_urn=self.identifiers.gen_dataset_urn(key), + ) + self.report.num_external_table_edges_scanned += 1 + + self.report.num_external_table_edges_scanned += 1 + except Exception as e: + self.structured_reporter.warning( + "External table DDL lineage extraction failed", + exc=e, + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index e5883dd0349a3a..884e6c49f5b62a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -161,35 +161,32 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config): # For database, schema, tables, views, etc self.data_dictionary = SnowflakeDataDictionary(connection=self.connection) self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None - self.aggregator: Optional[SqlParsingAggregator] = None - - if self.config.use_queries_v2 or self.config.include_table_lineage: - self.aggregator = self._exit_stack.enter_context( - SqlParsingAggregator( - platform=self.identifiers.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - graph=self.ctx.graph, - eager_graph_load=( - # If we're ingestion schema metadata for tables/views, then we will populate - # schemas into the resolver as we go. We only need to do a bulk fetch - # if we're not ingesting schema metadata as part of ingestion. - not ( - self.config.include_technical_schema - and self.config.include_tables - and self.config.include_views - ) - and not self.config.lazy_schema_resolver - ), - generate_usage_statistics=False, - generate_operations=False, - format_queries=self.config.format_sql_queries, - ) + + self.aggregator: SqlParsingAggregator = self._exit_stack.enter_context( + SqlParsingAggregator( + platform=self.identifiers.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + graph=self.ctx.graph, + eager_graph_load=( + # If we're ingesting schema metadata for tables/views, then we will populate + # schemas into the resolver as we go. We only need to do a bulk fetch + # if we're not ingesting schema metadata as part of ingestion.
+ not ( + self.config.include_technical_schema + and self.config.include_tables + and self.config.include_views + ) + and not self.config.lazy_schema_resolver + ), + generate_usage_statistics=False, + generate_operations=False, + format_queries=self.config.format_sql_queries, ) - self.report.sql_aggregator = self.aggregator.report + ) + self.report.sql_aggregator = self.aggregator.report if self.config.include_table_lineage: - assert self.aggregator is not None redundant_lineage_run_skip_handler: Optional[ RedundantLineageRunSkipHandler ] = None @@ -487,8 +484,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: databases = schema_extractor.databases - # TODO: The checkpoint state for stale entity detection can be committed here. - if self.config.shares: yield from SnowflakeSharesHandler( self.config, self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py index 59f301baf40165..fad54fda453786 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py @@ -1,7 +1,10 @@ import json import logging import re -from typing import Any, Dict, Iterable, List, Optional, Union +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +from urllib.parse import urlparse from pydantic.class_validators import validator from pydantic.fields import Field @@ -11,7 +14,12 @@ from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveDialect, HiveTimestamp from sqlalchemy.engine.reflection import Inspector -from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance +from datahub.emitter.mce_builder import ( + make_data_platform_urn, + make_dataplatform_instance_urn, + make_dataset_urn_with_platform_instance, + make_schema_field_urn, +) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.decorators import ( SourceCapability, @@ -29,14 +37,24 @@ TwoTierSQLAlchemyConfig, TwoTierSQLAlchemySource, ) -from datahub.metadata.com.linkedin.pegasus2avro.schema import ( +from datahub.metadata.schema_classes import ( + DataPlatformInstanceClass, + DatasetLineageTypeClass, + DatasetPropertiesClass, DateTypeClass, + FineGrainedLineageClass, + FineGrainedLineageDownstreamTypeClass, + FineGrainedLineageUpstreamTypeClass, NullTypeClass, NumberTypeClass, - SchemaField, + OtherSchemaClass, + SchemaFieldClass, + SchemaMetadataClass, TimeTypeClass, + UpstreamClass, + UpstreamLineageClass, + ViewPropertiesClass, ) -from datahub.metadata.schema_classes import ViewPropertiesClass from datahub.utilities import config_clean from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column @@ -46,6 +64,511 @@ register_custom_type(HiveTimestamp, TimeTypeClass) register_custom_type(HiveDecimal, NumberTypeClass) + +class StoragePlatform(Enum): + """Enumeration of storage platforms supported for lineage""" + + S3 = "s3" + AZURE = "abs" + GCS = "gcs" + DBFS = "dbfs" + LOCAL = "file" + HDFS = "hdfs" + + +# Mapping of URL schemes to storage platforms +STORAGE_SCHEME_MAPPING = { + # S3 and derivatives + "s3": StoragePlatform.S3, + "s3a": StoragePlatform.S3, + "s3n": StoragePlatform.S3, + # Azure and derivatives + "abfs": StoragePlatform.AZURE, + "abfss": StoragePlatform.AZURE, + "adl": StoragePlatform.AZURE, + "adls": StoragePlatform.AZURE, + "wasb": StoragePlatform.AZURE, + "wasbs": StoragePlatform.AZURE, + # GCS and 
derivatives + "gs": StoragePlatform.GCS, + "gcs": StoragePlatform.GCS, + # DBFS + "dbfs": StoragePlatform.DBFS, + # Local filesystem + "file": StoragePlatform.LOCAL, + # HDFS + "hdfs": StoragePlatform.HDFS, +} + + +class StoragePathParser: + """Parser for storage paths with platform-specific logic""" + + @staticmethod + def parse_storage_location(location: str) -> Optional[Tuple[StoragePlatform, str]]: + """ + Parse a storage location into platform and normalized path. + + Args: + location: Storage location URI (e.g., s3://bucket/path, abfss://container@account.dfs.core.windows.net/path) + + Returns: + Tuple of (StoragePlatform, normalized_path) if valid, None if invalid + """ + + try: + # Handle special case for local files with no scheme + if location.startswith("/"): + return StoragePlatform.LOCAL, location + + # Parse the URI + parsed = urlparse(location) + scheme = parsed.scheme.lower() + + if not scheme: + return None + + # Look up the platform + platform = STORAGE_SCHEME_MAPPING.get(scheme) + if not platform: + return None + + # Get normalized path based on platform + if platform == StoragePlatform.S3: + # For S3, combine bucket and path + path = f"{parsed.netloc}/{parsed.path.lstrip('/')}" + + elif platform == StoragePlatform.AZURE: + if scheme in ("abfs", "abfss"): + # Format: abfss://container@account.dfs.core.windows.net/path + container = parsed.netloc.split("@")[0] + path = f"{container}/{parsed.path.lstrip('/')}" + else: + # Handle other Azure schemes + path = f"{parsed.netloc}/{parsed.path.lstrip('/')}" + + elif platform == StoragePlatform.GCS: + # For GCS, combine bucket and path + path = f"{parsed.netloc}/{parsed.path.lstrip('/')}" + + elif platform == StoragePlatform.DBFS: + # For DBFS, use path as-is + path = parsed.path.lstrip("/") + + elif platform == StoragePlatform.LOCAL: + # For local files, use full path + path = f"{parsed.netloc}/{parsed.path.lstrip('/')}" + + elif platform == StoragePlatform.HDFS: + # For HDFS, use full path + path = f"{parsed.netloc}/{parsed.path.lstrip('/')}" + + else: + return None + + # Clean up the path + path = path.rstrip("/") # Remove trailing slashes + path = re.sub(r"/+", "/", path) # Normalize multiple slashes + path = f"/{path}" + + return platform, path + + except Exception as exp: + logger.warning(f"Failed to parse storage location {location}: {exp}") + return None + + @staticmethod + def get_platform_name(platform: StoragePlatform) -> str: + """Get the platform name to use in URNs""" + + platform_names = { + StoragePlatform.S3: "s3", + StoragePlatform.AZURE: "adls", + StoragePlatform.GCS: "gcs", + StoragePlatform.DBFS: "dbfs", + StoragePlatform.LOCAL: "file", + StoragePlatform.HDFS: "hdfs", + } + return platform_names[platform] + + +class HiveStorageLineageConfig: + """Configuration for Hive storage lineage.""" + + def __init__( + self, + emit_storage_lineage: bool, + hive_storage_lineage_direction: str, + include_column_lineage: bool, + storage_platform_instance: Optional[str], + ): + if hive_storage_lineage_direction.lower() not in ["upstream", "downstream"]: + raise ValueError( + "hive_storage_lineage_direction must be either upstream or downstream" + ) + + self.emit_storage_lineage = emit_storage_lineage + self.hive_storage_lineage_direction = hive_storage_lineage_direction.lower() + self.include_column_lineage = include_column_lineage + self.storage_platform_instance = storage_platform_instance + + +@dataclass +class HiveStorageSourceReport: + """Report for tracking storage lineage statistics""" + + storage_locations_scanned: 
int = 0 + filtered_locations: List[str] = Field(default_factory=list) + failed_locations: List[str] = Field(default_factory=list) + + def report_location_scanned(self) -> None: + self.storage_locations_scanned += 1 + + def report_location_filtered(self, location: str) -> None: + self.filtered_locations.append(location) + + def report_location_failed(self, location: str) -> None: + self.failed_locations.append(location) + + +class HiveStorageLineage: + """Handles storage lineage for Hive tables""" + + def __init__( + self, + config: HiveStorageLineageConfig, + env: str, + convert_urns_to_lowercase: bool = False, + ): + self.config = config + self.env = env + self.convert_urns_to_lowercase = convert_urns_to_lowercase + self.report = HiveStorageSourceReport() + + def _make_dataset_platform_instance( + self, + platform: str, + instance: Optional[str], + ) -> DataPlatformInstanceClass: + """Create DataPlatformInstance aspect""" + + return DataPlatformInstanceClass( + platform=make_data_platform_urn(platform), + instance=make_dataplatform_instance_urn(platform, instance) + if instance + else None, + ) + + def _make_storage_dataset_urn( + self, + storage_location: str, + ) -> Optional[Tuple[str, str]]: + """ + Create storage dataset URN from location. + Returns tuple of (urn, platform) if successful, None otherwise. + """ + + platform_instance = None + storage_info = StoragePathParser.parse_storage_location(storage_location) + if not storage_info: + logger.debug(f"Could not parse storage location: {storage_location}") + return None + + platform, path = storage_info + platform_name = StoragePathParser.get_platform_name(platform) + + if self.convert_urns_to_lowercase: + platform_name = platform_name.lower() + path = path.lower() + if self.config.storage_platform_instance: + platform_instance = self.config.storage_platform_instance.lower() + + try: + storage_urn = make_dataset_urn_with_platform_instance( + platform=platform_name, + name=path, + env=self.env, + platform_instance=platform_instance, + ) + return storage_urn, platform_name + except Exception as exp: + logger.error(f"Failed to create URN for {platform_name}:{path}: {exp}") + return None + + def _get_fine_grained_lineages( + self, + dataset_urn: str, + storage_urn: str, + dataset_schema: SchemaMetadataClass, + storage_schema: SchemaMetadataClass, + ) -> Iterable[FineGrainedLineageClass]: + """Generate column-level lineage between dataset and storage""" + + if not self.config.include_column_lineage: + return + + for dataset_field in dataset_schema.fields: + dataset_path = dataset_field.fieldPath + + # Find matching field in storage schema + matching_field = next( + (f for f in storage_schema.fields if f.fieldPath == dataset_path), + None, + ) + + if matching_field: + if self.config.hive_storage_lineage_direction == "upstream": + yield FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + upstreams=[ + make_schema_field_urn( + parent_urn=storage_urn, + field_path=matching_field.fieldPath, + ) + ], + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + downstreams=[ + make_schema_field_urn( + parent_urn=dataset_urn, + field_path=dataset_path, + ) + ], + ) + else: + yield FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + upstreams=[ + make_schema_field_urn( + parent_urn=dataset_urn, + field_path=dataset_path, + ) + ], + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + downstreams=[ + make_schema_field_urn( + parent_urn=storage_urn, + 
field_path=matching_field.fieldPath, + ) + ], + ) + + def _create_lineage_mcp( + self, + source_urn: str, + target_urn: str, + fine_grained_lineages: Optional[Iterable[FineGrainedLineageClass]] = None, + ) -> Iterable[MetadataWorkUnit]: + """Create lineage MCP between source and target datasets""" + + lineages_list = ( + list(fine_grained_lineages) if fine_grained_lineages is not None else None + ) + + upstream_lineage = UpstreamLineageClass( + upstreams=[ + UpstreamClass(dataset=source_urn, type=DatasetLineageTypeClass.COPY) + ], + fineGrainedLineages=lineages_list, + ) + + yield MetadataWorkUnit( + id=f"{source_urn}-{target_urn}-lineage", + mcp=MetadataChangeProposalWrapper( + entityUrn=target_urn, aspect=upstream_lineage + ), + ) + + def get_storage_dataset_mcp( + self, + storage_location: str, + platform_instance: Optional[str] = None, + schema_metadata: Optional[SchemaMetadataClass] = None, + ) -> Iterable[MetadataWorkUnit]: + """ + Generate MCPs for storage dataset if needed. + This creates the storage dataset entity in DataHub. + """ + + storage_info = StoragePathParser.parse_storage_location( + storage_location, + ) + if not storage_info: + return + + platform, path = storage_info + platform_name = StoragePathParser.get_platform_name(platform) + + if self.convert_urns_to_lowercase: + platform_name = platform_name.lower() + path = path.lower() + if self.config.storage_platform_instance: + platform_instance = self.config.storage_platform_instance.lower() + + try: + storage_urn = make_dataset_urn_with_platform_instance( + platform=platform_name, + name=path, + env=self.env, + platform_instance=platform_instance, + ) + + # Dataset properties + props = DatasetPropertiesClass(name=path) + yield MetadataWorkUnit( + id=f"storage-{storage_urn}-props", + mcp=MetadataChangeProposalWrapper( + entityUrn=storage_urn, + aspect=props, + ), + ) + + # Platform instance + platform_instance_aspect = self._make_dataset_platform_instance( + platform=platform_name, + instance=platform_instance, + ) + yield MetadataWorkUnit( + id=f"storage-{storage_urn}-platform", + mcp=MetadataChangeProposalWrapper( + entityUrn=storage_urn, aspect=platform_instance_aspect + ), + ) + + # Schema if available + if schema_metadata: + storage_schema = SchemaMetadataClass( + schemaName=f"{platform.value}_schema", + platform=f"urn:li:dataPlatform:{platform.value}", + version=0, + fields=schema_metadata.fields, + hash="", + platformSchema=OtherSchemaClass(rawSchema=""), + ) + yield MetadataWorkUnit( + id=f"storage-{storage_urn}-schema", + mcp=MetadataChangeProposalWrapper( + entityUrn=storage_urn, aspect=storage_schema + ), + ) + + except Exception as e: + logger.error( + f"Failed to create storage dataset MCPs for {storage_location}: {e}" + ) + return + + def get_lineage_mcp( + self, + dataset_urn: str, + table: Dict[str, Any], + dataset_schema: Optional[SchemaMetadataClass] = None, + ) -> Iterable[MetadataWorkUnit]: + """ + Generate lineage MCP for a Hive table to its storage location. 
+ + Args: + dataset_urn: URN of the Hive dataset + table: Hive table dictionary containing metadata + dataset_schema: Optional schema metadata for the Hive dataset + + Returns: + MetadataWorkUnit containing the lineage MCP if successful + """ + + platform_instance = None + + if not self.config.emit_storage_lineage: + return + + # Get storage location from table + storage_location = table.get("StorageDescriptor", {}).get("Location") + if not storage_location: + return + + # Create storage dataset URN + storage_info = self._make_storage_dataset_urn(storage_location) + if not storage_info: + self.report.report_location_failed(storage_location) + return + + storage_urn, storage_platform = storage_info + self.report.report_location_scanned() + + if self.config.storage_platform_instance: + platform_instance = self.config.storage_platform_instance.lower() + + # Create storage dataset entity + yield from self.get_storage_dataset_mcp( + storage_location=storage_location, + platform_instance=platform_instance, + schema_metadata=dataset_schema, + ) + + # Get storage schema if available (implement based on storage system) + storage_schema = ( + self._get_storage_schema(storage_location, dataset_schema) + if dataset_schema + else None + ) + + # Generate fine-grained lineage if schemas available + fine_grained_lineages = ( + None + if not (dataset_schema and storage_schema) + else self._get_fine_grained_lineages( + dataset_urn, storage_urn, dataset_schema, storage_schema + ) + ) + + # Create lineage MCP + if self.config.hive_storage_lineage_direction == "upstream": + yield from self._create_lineage_mcp( + source_urn=storage_urn, + target_urn=dataset_urn, + fine_grained_lineages=fine_grained_lineages, + ) + else: + yield from self._create_lineage_mcp( + source_urn=dataset_urn, + target_urn=storage_urn, + fine_grained_lineages=fine_grained_lineages, + ) + + def _get_storage_schema( + self, + storage_location: str, + table_schema: Optional[SchemaMetadataClass] = None, + ) -> Optional[SchemaMetadataClass]: + """ + Get schema metadata for storage location. 
+ Currently supports: + - Delta tables + - Parquet files + - Spark tables + + Returns: + SchemaMetadataClass if schema can be inferred, None otherwise + """ + + if not table_schema: + return None + + storage_info = StoragePathParser.parse_storage_location(storage_location) + if not storage_info: + return None + + platform, _ = storage_info + + return SchemaMetadataClass( + schemaName=f"{platform.value}_schema", + platform=f"urn:li:dataPlatform:{platform.value}", + version=0, + fields=table_schema.fields, + hash="", + platformSchema=OtherSchemaClass(rawSchema=""), + ) + + try: from databricks_dbapi.sqlalchemy_dialects.hive import DatabricksPyhiveDialect from pyhive.sqlalchemy_hive import _type_map @@ -94,8 +617,8 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw): DatabricksPyhiveDialect.get_columns = dbapi_get_columns_patched except ModuleNotFoundError: pass -except Exception as e: - logger.warning(f"Failed to patch method due to {e}") +except Exception as exp: + logger.warning(f"Failed to patch method due to {exp}") @reflection.cache # type: ignore @@ -126,10 +649,48 @@ class HiveConfig(TwoTierSQLAlchemyConfig): # defaults scheme: str = Field(default="hive", hidden_from_docs=True) + # Overriding as table location lineage is richer implementation here than with include_table_location_lineage + include_table_location_lineage: bool = Field(default=False, hidden_from_docs=True) + + emit_storage_lineage: bool = Field( + default=False, + description="Whether to emit storage-to-Hive lineage", + ) + hive_storage_lineage_direction: str = Field( + default="upstream", + description="If 'upstream', storage is upstream to Hive. If 'downstream' storage is downstream to Hive", + ) + include_column_lineage: bool = Field( + default=True, + description="When enabled, column-level lineage will be extracted from storage", + ) + storage_platform_instance: Optional[str] = Field( + default=None, + description="Platform instance for the storage system", + ) + @validator("host_port") def clean_host_port(cls, v): return config_clean.remove_protocol(v) + @validator("hive_storage_lineage_direction") + def _validate_direction(cls, v: str) -> str: + """Validate the lineage direction.""" + if v.lower() not in ["upstream", "downstream"]: + raise ValueError( + "storage_lineage_direction must be either upstream or downstream" + ) + return v.lower() + + def get_storage_lineage_config(self) -> HiveStorageLineageConfig: + """Convert base config parameters to HiveStorageLineageConfig""" + return HiveStorageLineageConfig( + emit_storage_lineage=self.emit_storage_lineage, + hive_storage_lineage_direction=self.hive_storage_lineage_direction, + include_column_lineage=self.include_column_lineage, + storage_platform_instance=self.storage_platform_instance, + ) + @platform_name("Hive") @config_class(HiveConfig) @@ -151,12 +712,49 @@ class HiveSource(TwoTierSQLAlchemySource): def __init__(self, config, ctx): super().__init__(config, ctx, "hive") + self.storage_lineage = HiveStorageLineage( + config=config.get_storage_lineage_config(), + env=config.env, + convert_urns_to_lowercase=config.convert_urns_to_lowercase, + ) @classmethod def create(cls, config_dict, ctx): config = HiveConfig.parse_obj(config_dict) return cls(config, ctx) + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + """Generate workunits for tables and their storage lineage.""" + for wu in super().get_workunits_internal(): + yield wu + + if not isinstance(wu, MetadataWorkUnit): + continue + + # Get dataset URN and required 
aspects using workunit methods + try: + dataset_urn = wu.get_urn() + dataset_props = wu.get_aspect_of_type(DatasetPropertiesClass) + schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass) + except Exception as exp: + logger.warning(f"Failed to process workunit {wu.id}: {exp}") + continue + + # Only proceed if we have the necessary properties + if dataset_props and dataset_props.customProperties: + table = { + "StorageDescriptor": { + "Location": dataset_props.customProperties.get("Location") + } + } + + if table.get("StorageDescriptor", {}).get("Location"): + yield from self.storage_lineage.get_lineage_mcp( + dataset_urn=dataset_urn, + table=table, + dataset_schema=schema_metadata, + ) + def get_schema_names(self, inspector): assert isinstance(self.config, HiveConfig) # This condition restricts the ingestion to the specified database. @@ -173,7 +771,7 @@ def get_schema_fields_for_column( pk_constraints: Optional[Dict[Any, Any]] = None, partition_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, - ) -> List[SchemaField]: + ) -> List[SchemaFieldClass]: fields = super().get_schema_fields_for_column( dataset_name, column, diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index fadcb8ff8f3966..984cf9357199d6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -49,6 +49,7 @@ DatasetSourceConfigMixin, ) from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated +from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import ( ContainerKey, @@ -380,11 +381,6 @@ class TableauConfig( description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.", ) - fetch_size: int = Field( - default=250, - description="Specifies the number of records to retrieve in each batch during a query execution.", - ) - # We've found that even with a small workbook page size (e.g. 10), the Tableau API often # returns warnings like this: # { @@ -499,6 +495,10 @@ class TableauConfig( "This can only be used with ingest_tags enabled as it will overwrite tags entered from the UI.", ) + _fetch_size = pydantic_removed_field( + "fetch_size", + ) + # pre = True because we want to take some decision before pydantic initialize the configuration to default values @root_validator(pre=True) def projects_backward_compatibility(cls, values: Dict) -> Dict: @@ -1147,7 +1147,7 @@ def get_connection_object_page( connection_type: str, query_filter: str, current_cursor: Optional[str], - fetch_size: int = 250, + fetch_size: int, retry_on_auth_error: bool = True, retries_remaining: Optional[int] = None, ) -> Tuple[dict, Optional[str], int]: @@ -1344,7 +1344,11 @@ def get_connection_objects( connection_type=connection_type, query_filter=filter_, current_cursor=current_cursor, - fetch_size=self.config.fetch_size, + # `filter_page` contains metadata object IDs (e.g., Project IDs, Field IDs, Sheet IDs, etc.). + # The number of IDs is always less than or equal to page_size. + # If the IDs are primary keys, the number of metadata objects to load matches the number of records to return. + # In our case, mostly, the IDs are primary key, therefore, fetch_size is set equal to page_size. 
+ fetch_size=page_size, ) yield from connection_objects.get(c.NODES) or [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 9d9a746580f939..7bfa7fdb28aaf8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -26,6 +26,9 @@ gen_containers, ) from datahub.emitter.sql_parsing_builder import SqlParsingBuilder +from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import ( + EnsureAspectSizeProcessor, +) from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SupportStatus, @@ -260,6 +263,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: StaleEntityRemovalHandler.create( self, self.config, self.ctx ).workunit_processor, + EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size, ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index 4308b405e46e37..92407eaae6e901 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -14,6 +14,7 @@ USAGE_EXTRACTION_INGESTION = "Usage Extraction Ingestion" USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats" USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation" +EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage" QUERIES_EXTRACTION = "Queries Extraction" PROFILING = "Profiling" diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py index 63821f9038a88c..832d00d9c54702 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py +++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py @@ -1171,3 +1171,39 @@ def test_m_query_timeout(mock_get_lark_parser): assert ( is_entry_present ), 'Warning message "M-Query Parsing Timeout" should be present in reporter' + + +def test_comments_in_m_query(): + q: str = 'let\n Source = Snowflake.Databases("xaa48144.snowflakecomputing.com", "COMPUTE_WH", [Role="ACCOUNTADMIN"]),\n SNOWFLAKE_SAMPLE_DATA_Database = Source{[Name="SNOWFLAKE_SAMPLE_DATA", Kind="Database"]}[Data],\n TPCDS_SF100TCL_Schema = SNOWFLAKE_SAMPLE_DATA_Database{[Name="TPCDS_SF100TCL", Kind="Schema"]}[Data],\n ITEM_Table = TPCDS_SF100TCL_Schema{[Name="ITEM", Kind="Table"]}[Data],\n \n // Group by I_BRAND and calculate the count\n BrandCountsTable = Table.Group(ITEM_Table, {"I_BRAND"}, {{"BrandCount", each Table.RowCount(_), Int64.Type}})\nin\n BrandCountsTable' + + table: powerbi_data_classes.Table = powerbi_data_classes.Table( + columns=[], + measures=[], + expression=q, + name="pet_price_index", + full_name="datalake.sandbox_pet.pet_price_index", + ) + + reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + parameters={ + "hostname": "xyz.databricks.com", + "http_path": "/sql/1.0/warehouses/abc", + "catalog": "cat", + "schema": "public", + }, + )[0].upstreams + + assert 
len(data_platform_tables) == 1 + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpcds_sf100tcl.item,PROD)" + ) diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index 4b2ac96931b950..c3a8880bf20a09 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -27,6 +27,7 @@ from datahub.ingestion.source.tableau import tableau_constant as c from datahub.ingestion.source.tableau.tableau import ( TableauConfig, + TableauProject, TableauSiteSource, TableauSource, TableauSourceReport, @@ -1324,6 +1325,7 @@ def test_permission_warning(pytestconfig, tmp_path, mock_datahub_graph): query_filter=mock.MagicMock(), current_cursor=None, retries_remaining=1, + fetch_size=10, ) warnings = list(reporter.warnings) @@ -1341,6 +1343,82 @@ def test_permission_warning(pytestconfig, tmp_path, mock_datahub_graph): @freeze_time(FROZEN_TIME) +@pytest.mark.parametrize( + "extract_project_hierarchy, allowed_projects", + [ + (True, ["project1", "project4", "project3"]), + (False, ["project1", "project4"]), + ], +) +def test_extract_project_hierarchy(extract_project_hierarchy, allowed_projects): + context = PipelineContext(run_id="0", pipeline_name="test_tableau") + + config_dict = config_source_default.copy() + + del config_dict["stateful_ingestion"] + del config_dict["projects"] + + config_dict["project_pattern"] = { + "allow": ["project1", "project4"], + "deny": ["project2"], + } + + config_dict["extract_project_hierarchy"] = extract_project_hierarchy + + config = TableauConfig.parse_obj(config_dict) + + site_source = TableauSiteSource( + config=config, + ctx=context, + platform="tableau", + site=SiteItem(name="Site 1", content_url="site1"), + site_id="site1", + report=TableauSourceReport(), + server=Server("https://test-tableau-server.com"), + ) + + all_project_map: Dict[str, TableauProject] = { + "p1": TableauProject( + id="1", + name="project1", + path=[], + parent_id=None, + parent_name=None, + description=None, + ), + "p2": TableauProject( + id="2", + name="project2", + path=[], + parent_id="1", + parent_name="project1", + description=None, + ), + "p3": TableauProject( + id="3", + name="project3", + path=[], + parent_id="1", + parent_name="project1", + description=None, + ), + "p4": TableauProject( + id="4", + name="project4", + path=[], + parent_id=None, + parent_name=None, + description=None, + ), + } + + site_source._init_tableau_project_registry(all_project_map) + + assert allowed_projects == [ + project.name for project in site_source.tableau_project_registry.values() + ] + + @pytest.mark.integration def test_connection_report_test(requests_mock): server_info_response = """ diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_ensure_aspect_size.py b/metadata-ingestion/tests/unit/api/source_helpers/test_ensure_aspect_size.py new file mode 100644 index 00000000000000..bdf1e0a2e0e860 --- /dev/null +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_ensure_aspect_size.py @@ -0,0 +1,346 @@ +import json +import time +from unittest.mock import patch + +import pytest +from freezegun.api import freeze_time + +from datahub.emitter.aspect import JSON_CONTENT_TYPE +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES +from 
datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import ( + EnsureAspectSizeProcessor, +) +from datahub.ingestion.api.source import SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent +from datahub.metadata.schema_classes import ( + ChangeTypeClass, + DatasetFieldProfileClass, + DatasetProfileClass, + DatasetSnapshotClass, + GenericAspectClass, + MetadataChangeProposalClass, + NumberTypeClass, + OtherSchemaClass, + SchemaFieldClass, + SchemaFieldDataTypeClass, + SchemaMetadataClass, + StatusClass, + StringTypeClass, + SubTypesClass, +) + + +@pytest.fixture +def processor(): + return EnsureAspectSizeProcessor(SourceReport()) + + +def too_big_schema_metadata() -> SchemaMetadataClass: + fields = [ + SchemaFieldClass( + "aaaa", + nativeDataType="int", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + ), + SchemaFieldClass( + "bbbb", + nativeDataType="string", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + ), + SchemaFieldClass( + "cccc", + nativeDataType="int", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + ), + ] + # simple int type field takes ~160 bytes in JSON representation, below is to assure we exceed the threshold + for f_no in range(1000): + fields.append( + SchemaFieldClass( + fieldPath=f"t{f_no}", + nativeDataType="int", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + description=20000 * "a", + ) + ) + + # adding small field to check whether it will still be present in the output + fields.append( + SchemaFieldClass( + "dddd", + nativeDataType="int", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + ) + ) + return SchemaMetadataClass( + schemaName="abcdef", + version=1, + platform="s3", + hash="ABCDE1234567890", + platformSchema=OtherSchemaClass(rawSchema="aaa"), + fields=fields, + ) + + +def proper_schema_metadata() -> SchemaMetadataClass: + fields = [ + SchemaFieldClass( + "aaaa", + nativeDataType="int", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + ), + SchemaFieldClass( + "bbbb", + nativeDataType="string", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + ), + SchemaFieldClass( + "cccc", + nativeDataType="int", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + ), + ] + return SchemaMetadataClass( + schemaName="abcdef", + version=1, + platform="s3", + hash="ABCDE1234567890", + platformSchema=OtherSchemaClass(rawSchema="aaa"), + fields=fields, + ) + + +def proper_dataset_profile() -> DatasetProfileClass: + sample_values = [ + "23483295", + "234234", + "324234", + "12123", + "3150314", + "19231", + "211", + "93498", + "12837", + "73847", + "12434", + "33466", + "98785", + "4546", + "4547", + "342", + "11", + "34", + "444", + "38576", + ] + field_profiles = [ + DatasetFieldProfileClass(fieldPath="a", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="b", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="c", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="d", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="e", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="f", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="g", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="h", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="i", sampleValues=sample_values), + DatasetFieldProfileClass(fieldPath="j", sampleValues=sample_values), + ] + return DatasetProfileClass( 
+ timestampMillis=int(time.time()) * 1000, fieldProfiles=field_profiles + ) + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_proper_dataset_profile(processor): + profile = proper_dataset_profile() + orig_repr = json.dumps(profile.to_obj()) + processor.ensure_dataset_profile_size( + "urn:li:dataset:(s3, dummy_dataset, DEV)", profile + ) + assert orig_repr == json.dumps( + profile.to_obj() + ), "Aspect was modified in case where workunit processor should have been no-op" + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_too_big_schema_metadata(processor): + schema = too_big_schema_metadata() + assert len(schema.fields) == 1004 + + processor.ensure_schema_metadata_size( + "urn:li:dataset:(s3, dummy_dataset, DEV)", schema + ) + assert len(schema.fields) < 1004, "Schema has not been properly truncated" + assert schema.fields[-1].fieldPath == "dddd", "Small field was not added at the end" + # +100kb is completely arbitrary, but we are truncating the aspect based on schema fields size only, not total taken + # by other parameters of the aspect - it is reasonable approach though - schema fields is the only field in schema + # metadata which can be expected to grow out of control + assert ( + len(json.dumps(schema.to_obj())) < INGEST_MAX_PAYLOAD_BYTES + 100000 + ), "Aspect exceeded acceptable size" + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_proper_schema_metadata(processor): + schema = proper_schema_metadata() + orig_repr = json.dumps(schema.to_obj()) + processor.ensure_schema_metadata_size( + "urn:li:dataset:(s3, dummy_dataset, DEV)", schema + ) + assert orig_repr == json.dumps( + schema.to_obj() + ), "Aspect was modified in case where workunit processor should have been no-op" + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_too_big_dataset_profile(processor): + profile = proper_dataset_profile() + big_field = DatasetFieldProfileClass( + fieldPath="big", + sampleValues=20 * [(int(INGEST_MAX_PAYLOAD_BYTES / 20) - 10) * "a"], + ) + assert profile.fieldProfiles + profile.fieldProfiles.insert(4, big_field) + processor.ensure_dataset_profile_size( + "urn:li:dataset:(s3, dummy_dataset, DEV)", profile + ) + + expected_profile = proper_dataset_profile() + reduced_field = DatasetFieldProfileClass( + fieldPath="big", + sampleValues=[], + ) + assert expected_profile.fieldProfiles + expected_profile.fieldProfiles.insert(4, reduced_field) + assert json.dumps(profile.to_obj()) == json.dumps( + expected_profile.to_obj() + ), "Field 'big' was not properly removed from aspect due to its size" + + +@freeze_time("2023-01-02 00:00:00") +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_schema_metadata_size" +) +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_dataset_profile_size" +) +def test_wu_processor_triggered_by_data_profile_aspect( + ensure_dataset_profile_size_mock, ensure_schema_metadata_size_mock, processor +): + ret = [ # noqa: F841 + *processor.ensure_aspect_size( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:s3, dummy_name, DEV)", + aspect=proper_dataset_profile(), + ).as_workunit() + ] + ) + ] + ensure_dataset_profile_size_mock.assert_called_once() + ensure_schema_metadata_size_mock.assert_not_called() + + +@freeze_time("2023-01-02 00:00:00") +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_schema_metadata_size" +) 
+@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_dataset_profile_size" +) +def test_wu_processor_triggered_by_data_profile_aspect_mcpc( + ensure_dataset_profile_size_mock, ensure_schema_metadata_size_mock, processor +): + profile_aspect = proper_dataset_profile() + mcpc = MetadataWorkUnit( + id="test", + mcp_raw=MetadataChangeProposalClass( + entityType="dataset", + changeType=ChangeTypeClass.UPSERT, + entityUrn="urn:li:dataset:(urn:li:dataPlatform:s3, dummy_name, DEV)", + aspectName=DatasetProfileClass.ASPECT_NAME, + aspect=GenericAspectClass( + value=json.dumps(profile_aspect.to_obj()).encode(), + contentType=JSON_CONTENT_TYPE, + ), + ), + ) + ret = [*processor.ensure_aspect_size([mcpc])] # noqa: F841 + ensure_dataset_profile_size_mock.assert_called_once() + ensure_schema_metadata_size_mock.assert_not_called() + + +@freeze_time("2023-01-02 00:00:00") +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_schema_metadata_size" +) +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_dataset_profile_size" +) +def test_wu_processor_triggered_by_data_profile_aspect_mce( + ensure_dataset_profile_size_mock, ensure_schema_metadata_size_mock, processor +): + snapshot = DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:s3, dummy_name, DEV)", + aspects=[proper_schema_metadata()], + ) + mce = MetadataWorkUnit( + id="test", mce=MetadataChangeEvent(proposedSnapshot=snapshot) + ) + ret = [*processor.ensure_aspect_size([mce])] # noqa: F841 + ensure_schema_metadata_size_mock.assert_called_once() + ensure_dataset_profile_size_mock.assert_not_called() + + +@freeze_time("2023-01-02 00:00:00") +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_schema_metadata_size" +) +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_dataset_profile_size" +) +def test_wu_processor_triggered_by_schema_metadata_aspect( + ensure_dataset_profile_size_mock, ensure_schema_metadata_size_mock, processor +): + ret = [ # noqa: F841 + *processor.ensure_aspect_size( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:s3, dummy_name, DEV)", + aspect=proper_schema_metadata(), + ).as_workunit() + ] + ) + ] + ensure_schema_metadata_size_mock.assert_called_once() + ensure_dataset_profile_size_mock.assert_not_called() + + +@freeze_time("2023-01-02 00:00:00") +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_schema_metadata_size" +) +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_dataset_profile_size" +) +def test_wu_processor_not_triggered_by_unhandled_aspects( + ensure_dataset_profile_size_mock, ensure_schema_metadata_size_mock, processor +): + ret = [ # noqa: F841 + *processor.ensure_aspect_size( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:s3, dummy_name, DEV)", + aspect=StatusClass(removed=False), + ).as_workunit(), + MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:s3, dummy_name, DEV)", + aspect=SubTypesClass(typeNames=["table"]), + ).as_workunit(), + ] + ) + ] + ensure_schema_metadata_size_mock.assert_not_called() + ensure_dataset_profile_size_mock.assert_not_called() diff --git a/metadata-ingestion/tests/unit/test_aws_common.py 
b/metadata-ingestion/tests/unit/test_aws_common.py new file mode 100644 index 00000000000000..9291fb91134b1c --- /dev/null +++ b/metadata-ingestion/tests/unit/test_aws_common.py @@ -0,0 +1,328 @@ +import json +import os +from unittest.mock import MagicMock, patch + +import boto3 +import pytest +from moto import mock_iam, mock_lambda, mock_sts + +from datahub.ingestion.source.aws.aws_common import ( + AwsConnectionConfig, + AwsEnvironment, + detect_aws_environment, + get_current_identity, + get_instance_metadata_token, + get_instance_role_arn, + is_running_on_ec2, +) + + +@pytest.fixture +def mock_aws_config(): + return AwsConnectionConfig( + aws_access_key_id="test-key", + aws_secret_access_key="test-secret", + aws_region="us-east-1", + ) + + +class TestAwsCommon: + def test_environment_detection_no_environment(self): + """Test environment detection when no AWS environment is present""" + with patch.dict(os.environ, {}, clear=True): + assert detect_aws_environment() == AwsEnvironment.UNKNOWN + + def test_environment_detection_lambda(self): + """Test Lambda environment detection""" + with patch.dict(os.environ, {"AWS_LAMBDA_FUNCTION_NAME": "test-function"}): + assert detect_aws_environment() == AwsEnvironment.LAMBDA + + def test_environment_detection_lambda_cloudformation(self): + """Test CloudFormation Lambda environment detection""" + with patch.dict( + os.environ, + { + "AWS_LAMBDA_FUNCTION_NAME": "test-function", + "AWS_EXECUTION_ENV": "CloudFormation.xxx", + }, + ): + assert detect_aws_environment() == AwsEnvironment.CLOUD_FORMATION + + def test_environment_detection_eks(self): + """Test EKS environment detection""" + with patch.dict( + os.environ, + { + "AWS_WEB_IDENTITY_TOKEN_FILE": "/var/run/secrets/token", + "AWS_ROLE_ARN": "arn:aws:iam::123456789012:role/test-role", + }, + ): + assert detect_aws_environment() == AwsEnvironment.EKS + + def test_environment_detection_app_runner(self): + """Test App Runner environment detection""" + with patch.dict(os.environ, {"AWS_APP_RUNNER_SERVICE_ID": "service-id"}): + assert detect_aws_environment() == AwsEnvironment.APP_RUNNER + + def test_environment_detection_ecs(self): + """Test ECS environment detection""" + with patch.dict( + os.environ, {"ECS_CONTAINER_METADATA_URI_V4": "http://169.254.170.2/v4"} + ): + assert detect_aws_environment() == AwsEnvironment.ECS + + def test_environment_detection_beanstalk(self): + """Test Elastic Beanstalk environment detection""" + with patch.dict(os.environ, {"ELASTIC_BEANSTALK_ENVIRONMENT_NAME": "my-env"}): + assert detect_aws_environment() == AwsEnvironment.BEANSTALK + + @patch("requests.put") + def test_ec2_metadata_token(self, mock_put): + """Test EC2 metadata token retrieval""" + mock_put.return_value.status_code = 200 + mock_put.return_value.text = "token123" + + token = get_instance_metadata_token() + assert token == "token123" + + mock_put.assert_called_once_with( + "http://169.254.169.254/latest/api/token", + headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}, + timeout=1, + ) + + @patch("requests.put") + def test_ec2_metadata_token_failure(self, mock_put): + """Test EC2 metadata token failure case""" + mock_put.return_value.status_code = 404 + + token = get_instance_metadata_token() + assert token is None + + @patch("requests.get") + @patch("requests.put") + def test_is_running_on_ec2(self, mock_put, mock_get): + """Test EC2 instance detection with IMDSv2""" + mock_put.return_value.status_code = 200 + mock_put.return_value.text = "token123" + mock_get.return_value.status_code = 200 + + 
assert is_running_on_ec2() is True + + mock_put.assert_called_once_with( + "http://169.254.169.254/latest/api/token", + headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}, + timeout=1, + ) + mock_get.assert_called_once_with( + "http://169.254.169.254/latest/meta-data/instance-id", + headers={"X-aws-ec2-metadata-token": "token123"}, + timeout=1, + ) + + @patch("requests.get") + @patch("requests.put") + def test_is_running_on_ec2_failure(self, mock_put, mock_get): + """Test EC2 instance detection failure""" + mock_put.return_value.status_code = 404 + assert is_running_on_ec2() is False + + mock_put.return_value.status_code = 200 + mock_put.return_value.text = "token123" + mock_get.return_value.status_code = 404 + assert is_running_on_ec2() is False + + @mock_sts + @mock_lambda + @mock_iam + def test_get_current_identity_lambda(self): + """Test getting identity in Lambda environment""" + with patch.dict( + os.environ, + { + "AWS_LAMBDA_FUNCTION_NAME": "test-function", + "AWS_DEFAULT_REGION": "us-east-1", + }, + ): + # Create IAM role first with proper trust policy + iam_client = boto3.client("iam", region_name="us-east-1") + trust_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + iam_client.create_role( + RoleName="test-role", AssumeRolePolicyDocument=json.dumps(trust_policy) + ) + + lambda_client = boto3.client("lambda", region_name="us-east-1") + lambda_client.create_function( + FunctionName="test-function", + Runtime="python3.8", + Role="arn:aws:iam::123456789012:role/test-role", + Handler="index.handler", + Code={"ZipFile": b"def handler(event, context): pass"}, + ) + + role_arn, source = get_current_identity() + assert source == "lambda.amazonaws.com" + assert role_arn == "arn:aws:iam::123456789012:role/test-role" + + @patch("requests.get") + @patch("requests.put") + @mock_sts + def test_get_instance_role_arn_success(self, mock_put, mock_get): + """Test getting EC2 instance role ARN""" + mock_put.return_value.status_code = 200 + mock_put.return_value.text = "token123" + mock_get.return_value.status_code = 200 + mock_get.return_value.text = "test-role" + + with patch("boto3.client") as mock_boto: + mock_sts = MagicMock() + mock_sts.get_caller_identity.return_value = { + "Arn": "arn:aws:sts::123456789012:assumed-role/test-role/instance" + } + mock_boto.return_value = mock_sts + + role_arn = get_instance_role_arn() + assert ( + role_arn == "arn:aws:sts::123456789012:assumed-role/test-role/instance" + ) + + @mock_sts + def test_aws_connection_config_basic(self, mock_aws_config): + """Test basic AWS connection configuration""" + session = mock_aws_config.get_session() + creds = session.get_credentials() + assert creds.access_key == "test-key" + assert creds.secret_key == "test-secret" + + @mock_sts + def test_aws_connection_config_with_session_token(self): + """Test AWS connection with session token""" + config = AwsConnectionConfig( + aws_access_key_id="test-key", + aws_secret_access_key="test-secret", + aws_session_token="test-token", + aws_region="us-east-1", + ) + + session = config.get_session() + creds = session.get_credentials() + assert creds.token == "test-token" + + @mock_sts + def test_aws_connection_config_role_assumption(self): + """Test AWS connection with role assumption""" + config = AwsConnectionConfig( + aws_access_key_id="test-key", + aws_secret_access_key="test-secret", + aws_region="us-east-1", + 
aws_role="arn:aws:iam::123456789012:role/test-role", + ) + + with patch( + "datahub.ingestion.source.aws.aws_common.get_current_identity" + ) as mock_identity: + mock_identity.return_value = (None, None) + session = config.get_session() + creds = session.get_credentials() + assert creds is not None + + @mock_sts + def test_aws_connection_config_skip_role_assumption(self): + """Test AWS connection skipping role assumption when already in role""" + config = AwsConnectionConfig( + aws_region="us-east-1", + aws_role="arn:aws:iam::123456789012:role/current-role", + ) + + with patch( + "datahub.ingestion.source.aws.aws_common.get_current_identity" + ) as mock_identity: + mock_identity.return_value = ( + "arn:aws:iam::123456789012:role/current-role", + "ec2.amazonaws.com", + ) + session = config.get_session() + assert session is not None + + @mock_sts + def test_aws_connection_config_multiple_roles(self): + """Test AWS connection with multiple role assumption""" + config = AwsConnectionConfig( + aws_access_key_id="test-key", + aws_secret_access_key="test-secret", + aws_region="us-east-1", + aws_role=[ + "arn:aws:iam::123456789012:role/role1", + "arn:aws:iam::123456789012:role/role2", + ], + ) + + with patch( + "datahub.ingestion.source.aws.aws_common.get_current_identity" + ) as mock_identity: + mock_identity.return_value = (None, None) + session = config.get_session() + assert session is not None + + def test_aws_connection_config_validation_error(self): + """Test AWS connection validation""" + with patch.dict( + "os.environ", + { + "AWS_ACCESS_KEY_ID": "test-key", + # Deliberately missing AWS_SECRET_ACCESS_KEY + "AWS_DEFAULT_REGION": "us-east-1", + }, + clear=True, + ): + config = AwsConnectionConfig() # Let it pick up from environment + session = config.get_session() + with pytest.raises( + Exception, + match="Partial credentials found in env, missing: AWS_SECRET_ACCESS_KEY", + ): + session.get_credentials() + + @pytest.mark.parametrize( + "env_vars,expected_environment", + [ + ({}, AwsEnvironment.UNKNOWN), + ({"AWS_LAMBDA_FUNCTION_NAME": "test"}, AwsEnvironment.LAMBDA), + ( + { + "AWS_LAMBDA_FUNCTION_NAME": "test", + "AWS_EXECUTION_ENV": "CloudFormation", + }, + AwsEnvironment.CLOUD_FORMATION, + ), + ( + { + "AWS_WEB_IDENTITY_TOKEN_FILE": "/token", + "AWS_ROLE_ARN": "arn:aws:iam::123:role/test", + }, + AwsEnvironment.EKS, + ), + ({"AWS_APP_RUNNER_SERVICE_ID": "service-123"}, AwsEnvironment.APP_RUNNER), + ( + {"ECS_CONTAINER_METADATA_URI_V4": "http://169.254.170.2"}, + AwsEnvironment.ECS, + ), + ( + {"ELASTIC_BEANSTALK_ENVIRONMENT_NAME": "my-env"}, + AwsEnvironment.BEANSTALK, + ), + ], + ) + def test_environment_detection_parametrized(self, env_vars, expected_environment): + """Parametrized test for environment detection with different configurations""" + with patch.dict(os.environ, env_vars, clear=True): + assert detect_aws_environment() == expected_environment diff --git a/smoke-test/build.gradle b/smoke-test/build.gradle index def3e814b2ba0a..73ecdcb08ea149 100644 --- a/smoke-test/build.gradle +++ b/smoke-test/build.gradle @@ -16,7 +16,7 @@ node { } // Version of node to use. - version = '21.2.0' + version = '22.12.0' // Version of Yarn to use. yarnVersion = '1.22.22'