From 3c10b71586494796dc31f3ef976cb91bf97561e2 Mon Sep 17 00:00:00 2001 From: Chris Lee Date: Sat, 31 Aug 2024 08:00:04 -0700 Subject: [PATCH] feat: invoke test mode to dual write from ingestion in DAOs. (#404) * invoke test mode to dual write from ingestion in DAOs. * refactor utils and add tests --- .../linkedin/metadata/dao/BaseLocalDAO.java | 43 +++-- .../dao/internal/BaseGraphWriterDAO.java | 19 +- .../metadata/dao/BaseLocalDAOTest.java | 12 +- .../metadata/dao/EbeanLocalAccess.java | 21 ++- .../linkedin/metadata/dao/EbeanLocalDAO.java | 46 ++--- .../dao/EbeanLocalRelationshipWriterDAO.java | 29 +-- .../metadata/dao/IEbeanLocalAccess.java | 12 +- .../metadata/dao/ImmutableLocalDAO.java | 4 +- .../metadata/dao/utils/SQLSchemaUtils.java | 43 +++++ .../metadata/dao/utils/SQLStatementUtils.java | 14 +- .../metadata/dao/EbeanLocalAccessTest.java | 28 +-- .../metadata/dao/EbeanLocalDAOTest.java | 84 ++++++++- .../EbeanLocalRelationshipQueryDAOTest.java | 166 +++++++++--------- .../EbeanLocalRelationshipWriterDAOTest.java | 48 ++++- .../dao/utils/SQLStatementUtilsTest.java | 10 +- ...l-with-non-dollar-virtual-column-names.sql | 14 ++ .../resources/ebean-local-dao-create-all.sql | 14 ++ ...bean-local-relationship-dao-create-all.sql | 14 ++ .../dao/internal/Neo4jGraphWriterDAO.java | 2 +- .../metadata/dao/Neo4jQueryDAOTest.java | 38 ++-- .../dao/internal/Neo4jGraphWriterDAOTest.java | 32 ++-- 21 files changed, 457 insertions(+), 236 deletions(-) diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 683a56c7d..e29ca5a4e 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -500,7 +500,9 @@ private AddResult addCommon(@Nonnull URN } // Save the newValue as the latest version - long largestVersion = saveLatest(urn, aspectClass, oldValue, oldAuditStamp, newValue, auditStamp, latest.isSoftDeleted, trackingContext); + long largestVersion = + saveLatest(urn, aspectClass, oldValue, oldAuditStamp, newValue, auditStamp, latest.isSoftDeleted, + trackingContext, ingestionParams.isTestMode()); // Apply retention policy applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion); @@ -588,7 +590,7 @@ public List addMany(@Nonnull URN urn, @Nonnull List AddResult aspectUpdateHelper(URN urn, AspectUpdateLambda updateTuple, @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) { - AspectEntry latest = getLatest(urn, updateTuple.getAspectClass()); + AspectEntry latest = getLatest(urn, updateTuple.getAspectClass(), updateTuple.getIngestionParams().isTestMode()); // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards if (log.isDebugEnabled()) { @@ -743,11 +745,16 @@ public ASPECT add(@Nonnull URN urn, AspectUpdate @Nonnull public ASPECT add(@Nonnull URN urn, AspectUpdateLambda updateLambda, @Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) { - checkValidAspect(updateLambda.getAspectClass()); - - final AddResult result = runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext), - maxTransactionRetry); - + final Class aspectClass = updateLambda.getAspectClass(); + checkValidAspect(aspectClass); + // dual-write to test table while test mode is enabled. + if (updateLambda.getIngestionParams().isTestMode()) { + runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext), + maxTransactionRetry); + } + final AddResult result = runInTransactionWithRetry(() -> aspectUpdateHelper(urn, + new AspectUpdateLambda<>(aspectClass, updateLambda.getUpdateLambda(), + updateLambda.getIngestionParams().setTestMode(false)), auditStamp, trackingContext), maxTransactionRetry); return unwrapAddResult(urn, result, auditStamp, trackingContext); } @@ -779,7 +786,7 @@ public void delete(@Nonnull URN urn, @Nonnull Cl checkValidAspect(aspectClass); runInTransactionWithRetry(() -> { - final AspectEntry latest = getLatest(urn, aspectClass); + final AspectEntry latest = getLatest(urn, aspectClass, false); final IngestionParams ingestionParams = new IngestionParams().setIngestionMode(IngestionMode.LIVE); return addCommon(urn, latest, null, aspectClass, auditStamp, new DefaultEqualityTester<>(), trackingContext, ingestionParams); }, maxTransactionRetry); @@ -821,10 +828,11 @@ public ASPECT add(@Nonnull URN urn, @Nonnull ASP */ @Nonnull public ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue, - @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, @Nullable IngestionParams ingestionParams) { + @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, + @Nullable IngestionParams ingestionParams) { final IngestionParams nonNullIngestionParams = - ingestionParams == null ? new IngestionParams().setIngestionMode(IngestionMode.LIVE).setTestMode(false) - : ingestionParams; + ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode( + IngestionMode.LIVE).setTestMode(false) : ingestionParams; return add(urn, (Class) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams); } @@ -873,12 +881,13 @@ private void applyRetention(@Nonnull URN urn, @N * @param newEntry {@link RecordTemplate} of the new latest value of aspect * @param newAuditStamp the audit stamp for the operation * @param isSoftDeleted flag to indicate if the previous latest value of aspect was soft deleted + * @param isTestMode whether the test mode is enabled or not * @return the largest version */ protected abstract long saveLatest(@Nonnull URN urn, @Nonnull Class aspectClass, @Nullable ASPECT oldEntry, @Nullable AuditStamp oldAuditStamp, @Nullable ASPECT newEntry, @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted, - @Nullable IngestionTrackingContext trackingContext); + @Nullable IngestionTrackingContext trackingContext, boolean isTestMode); /** * Saves the new value of an aspect to entity tables. This is used when backfilling metadata from the old schema to @@ -1059,11 +1068,12 @@ public ListResult> getAspects(@Nonnull Set AspectEntry getLatest(@Nonnull URN urn, - @Nonnull Class aspectClass); + @Nonnull Class aspectClass, boolean isTestMode); /** * Gets the next version to use for an entity's specific aspect type. @@ -1083,10 +1093,11 @@ protected abstract long getNextVersion(@Nonnull * @param aspectClass the type of aspect to insert * @param auditStamp the {@link AuditStamp} for the aspect * @param version the version for the aspect + * @param isTestMode whether the test mode is enabled or not */ protected abstract void insert(@Nonnull URN urn, @Nullable RecordTemplate value, @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, long version, - @Nullable IngestionTrackingContext trackingContext); + @Nullable IngestionTrackingContext trackingContext, boolean isTestMode); /** * Update an aspect for an entity with specific version and {@link AuditStamp} with optimistic locking. @@ -1097,10 +1108,12 @@ protected abstract void insert(@Nonnull URN urn, * @param newAuditStamp the {@link AuditStamp} for the new aspect * @param version the version for the aspect * @param oldTimestamp the timestamp for the old aspect + * @param isTestMode whether the test mode is enabled or not */ protected abstract void updateWithOptimisticLocking(@Nonnull URN urn, @Nullable RecordTemplate value, @Nonnull Class aspectClass, @Nonnull AuditStamp newAuditStamp, - long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext); + long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext, + boolean isTestMode); /** * Returns a boolean representing if an Urn has any Aspects associated with it (i.e. if it exists in the DB). diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/internal/BaseGraphWriterDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/internal/BaseGraphWriterDAO.java index c2ab40108..e0dc9155d 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/internal/BaseGraphWriterDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/internal/BaseGraphWriterDAO.java @@ -63,11 +63,10 @@ public void removeEntity(@Nonnull URN urn) throws Exception { * @param relationship the relationship to be persisted * @param relationship type. Must be a type defined in com.linkedin.metadata.relationship. */ - public void addRelationship(@Nonnull RELATIONSHIP relationship) + public void addRelationship(@Nonnull RELATIONSHIP relationship, boolean isTestMode) throws Exception { - addRelationship(relationship, RemovalOption.REMOVE_NONE); + addRelationship(relationship, RemovalOption.REMOVE_NONE, isTestMode); } - /** * Adds a relationship in the graph, with removal operations before adding. * @@ -76,8 +75,8 @@ public void addRelationship(@Nonnull RELAT * @param relationship type. Must be a type defined in com.linkedin.metadata.relationship. */ public void addRelationship(@Nonnull RELATIONSHIP relationship, - @Nonnull RemovalOption removalOption) throws Exception { - addRelationships(Collections.singletonList(relationship), removalOption); + @Nonnull RemovalOption removalOption, boolean isTestMode) throws Exception { + addRelationships(Collections.singletonList(relationship), removalOption, isTestMode); } /** @@ -86,9 +85,9 @@ public void addRelationship(@Nonnull RELAT * @param relationships the list of relationships to be persisted * @param relationship type. Must be a type defined in com.linkedin.metadata.relationship. */ - public void addRelationships(@Nonnull List relationships) - throws Exception { - addRelationships(relationships, RemovalOption.REMOVE_NONE); + public void addRelationships(@Nonnull List relationships, + boolean isTestMode) throws Exception { + addRelationships(relationships, RemovalOption.REMOVE_NONE, isTestMode); } /** @@ -99,7 +98,7 @@ public void addRelationships(@Nonnull List * @param relationship type. Must be a type defined in com.linkedin.metadata.relationship. */ public abstract void addRelationships(@Nonnull List relationships, - @Nonnull RemovalOption removalOption) throws Exception; + @Nonnull RemovalOption removalOption, boolean isTestMode) throws Exception; /** * Deletes an relationship in the graph. @@ -120,4 +119,4 @@ public void removeRelationship(@Nonnull RE */ public abstract void removeRelationships( @Nonnull List relationships) throws Exception; -} +} \ No newline at end of file diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index d537929a0..1b0dd99e6 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -79,7 +79,7 @@ public DummyLocalDAO(Class aspectClass, BiFunction long saveLatest(FooUrn urn, Class aspectClass, ASPECT oldEntry, AuditStamp oldAuditStamp, ASPECT newEntry, AuditStamp newAuditStamp, boolean isSoftDeleted, - @Nullable IngestionTrackingContext trackingContext) { + @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) { return 0; } @@ -101,7 +101,8 @@ protected T runInTransactionWithRetry(Supplier block, int maxTransactionR } @Override - protected AspectEntry getLatest(FooUrn urn, Class aspectClass) { + protected AspectEntry getLatest(FooUrn urn, Class aspectClass, + boolean isTestMode) { return _getLatestFunction.apply(urn, aspectClass); } @@ -112,14 +113,15 @@ protected long getNextVersion(FooUrn urn, Class< @Override protected void insert(FooUrn urn, RecordTemplate value, Class aspectClass, - AuditStamp auditStamp, long version, @Nullable IngestionTrackingContext trackingContext) { + AuditStamp auditStamp, long version, @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) { } @Override protected void updateWithOptimisticLocking(@Nonnull FooUrn urn, @Nullable RecordTemplate value, @Nonnull Class aspectClass, @Nonnull AuditStamp newAuditStamp, - long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext) { + long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext, + boolean isTestMode) { } @@ -356,7 +358,7 @@ public void testMAEv5WithOverride() throws URISyntaxException { _dummyLocalDAO._transactionRunner); // pretend there is already foo in the database - when(dummyLocalDAO.getLatest(urn, AspectFoo.class)) + when(dummyLocalDAO.getLatest(urn, AspectFoo.class, false)) .thenReturn(new BaseLocalDAO.AspectEntry<>(foo, null, false)); // try to add foo again but with the OVERRIDE write mode diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index ea6b1ab2b..6137c5dbd 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -94,8 +94,8 @@ public void ensureSchemaUpToDate() { @Override @Transactional public int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class aspectClass, - @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext) { - return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext); + @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode) { + return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext, isTestMode); } @Override @@ -105,7 +105,8 @@ public int addWithOptimisticLocking( @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, - @Nullable IngestionTrackingContext ingestionTrackingContext) { + @Nullable IngestionTrackingContext ingestionTrackingContext, + boolean isTestMode) { final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis(); final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR; @@ -115,10 +116,10 @@ public int addWithOptimisticLocking( final SqlUpdate sqlUpdate; if (oldTimestamp != null) { sqlUpdate = _server.createSqlUpdate( - SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(urn, aspectClass, urnExtraction)); + SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(urn, aspectClass, urnExtraction, isTestMode)); sqlUpdate.setParameter("oldTimestamp", oldTimestamp.toString()); } else { - sqlUpdate = _server.createSqlUpdate(SQLStatementUtils.createAspectUpsertSql(urn, aspectClass, urnExtraction)); + sqlUpdate = _server.createSqlUpdate(SQLStatementUtils.createAspectUpsertSql(urn, aspectClass, urnExtraction, isTestMode)); } sqlUpdate.setParameter("urn", urn.toString()) .setParameter("lastmodifiedon", new Timestamp(timestamp).toString()) @@ -161,18 +162,20 @@ public int addWithOptimisticLocking( * @param keysCount number of keys to query * @param position position of the key to start from * @param includeSoftDeleted whether to include soft deleted aspect in the query + * @param isTestMode whether the operation is in test mode or not */ @Override public List batchGetUnion( @Nonnull List> aspectKeys, int keysCount, int position, - boolean includeSoftDeleted) { + boolean includeSoftDeleted, boolean isTestMode) { final int end = Math.min(aspectKeys.size(), position + keysCount); final Map, Set> keysToQueryMap = new HashMap<>(); for (int index = position; index < end; index++) { final Urn entityUrn = aspectKeys.get(index).getUrn(); final Class aspectClass = (Class) aspectKeys.get(index).getAspectClass(); - if (checkColumnExists(getTableName(entityUrn), getAspectColumnName(aspectClass))) { + if (checkColumnExists(isTestMode ? getTestTableName(entityUrn) : getTableName(entityUrn), + getAspectColumnName(aspectClass))) { keysToQueryMap.computeIfAbsent(aspectClass, unused -> new HashSet<>()).add(entityUrn); } } @@ -181,8 +184,8 @@ public List batchGetUnion( Map> selectStatements = keysToQueryMap.entrySet() .stream() .collect(Collectors.toMap( - entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue(), includeSoftDeleted), - entry -> entry.getKey())); + entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue(), includeSoftDeleted, + isTestMode), entry -> entry.getKey())); // consolidate/join the results final Map> sqlRows = new LinkedHashMap<>(); diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index a16d91648..c26974f3f 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -595,7 +595,8 @@ protected T runInTransactionWithRetry(@Nonnull Supplier block, int maxTra @Override protected long saveLatest(@Nonnull URN urn, @Nonnull Class aspectClass, @Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nullable ASPECT newValue, - @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted, @Nullable IngestionTrackingContext trackingContext) { + @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted, @Nullable IngestionTrackingContext trackingContext, + boolean isTestMode) { // First, check that if the aspect is going to be soft-deleted that it does not have any relationships derived from it. // We currently don't support soft-deleting aspects from which local relationships are derived from. if (newValue == null) { @@ -634,11 +635,11 @@ protected long saveLatest(@Nonnull URN urn, @Non } // Move latest version to historical version by insert a new record only if we are not overwriting the latest version. if (!_overwriteLatestVersionEnabled) { - insert(urn, oldValue, aspectClass, oldAuditStamp, largestVersion, trackingContext); + insert(urn, oldValue, aspectClass, oldAuditStamp, largestVersion, trackingContext, isTestMode); } // update latest version updateWithOptimisticLocking(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, - new Timestamp(oldAuditStamp.getTime()), trackingContext); + new Timestamp(oldAuditStamp.getTime()), trackingContext, isTestMode); } else { // When for fresh ingestion or with changeLog disabled @@ -649,11 +650,11 @@ protected long saveLatest(@Nonnull URN urn, @Non } } - insert(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, trackingContext); + insert(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, trackingContext, isTestMode); } // Add any local relationships that are derived from the aspect. - addRelationshipsIfAny(urn, newValue, aspectClass); + addRelationshipsIfAny(urn, newValue, aspectClass, isTestMode); return largestVersion; } @@ -671,7 +672,7 @@ public void updateEntityTables(@Nonnull URN urn, return null; // unused } AuditStamp auditStamp = makeAuditStamp(result); - _localAccess.add(urn, toRecordTemplate(aspectClass, result).orElse(null), aspectClass, auditStamp, null); + _localAccess.add(urn, toRecordTemplate(aspectClass, result).orElse(null), aspectClass, auditStamp, null, false); return null; // unused }, 1); } @@ -686,7 +687,7 @@ public List backfillLo } Optional aspect = toRecordTemplate(aspectClass, results.get(0)); if (aspect.isPresent()) { - return addRelationshipsIfAny(urn, aspect.get(), aspectClass); + return addRelationshipsIfAny(urn, aspect.get(), aspectClass, false); } return Collections.emptyList(); }, 1); @@ -700,7 +701,7 @@ public List backfillLo * @return metadata aspect ebean model {@link EbeanMetadataAspect} */ private @Nullable EbeanMetadataAspect queryLatest(@Nonnull URN urn, - @Nonnull Class aspectClass) { + @Nonnull Class aspectClass, boolean isTestMode) { EbeanMetadataAspect result; if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) { @@ -720,8 +721,9 @@ public List backfillLo } } else { // for new schema or dual-schema, get latest data from new schema. (Resolving the read de-coupling issue) - final List results = _localAccess.batchGetUnion( - Collections.singletonList(new AspectKey<>(aspectClass, urn, LATEST_VERSION)), 1, 0, true); + final List results = + _localAccess.batchGetUnion(Collections.singletonList(new AspectKey<>(aspectClass, urn, LATEST_VERSION)), 1, 0, + true, isTestMode); result = results.isEmpty() ? null : results.get(0); } return result; @@ -730,8 +732,8 @@ public List backfillLo @Override @Nonnull protected AspectEntry getLatest(@Nonnull URN urn, - @Nonnull Class aspectClass) { - EbeanMetadataAspect latest = queryLatest(urn, aspectClass); + @Nonnull Class aspectClass, boolean isTestMode) { + EbeanMetadataAspect latest = queryLatest(urn, aspectClass, isTestMode); if (latest == null) { return new AspectEntry<>(null, null); } @@ -809,7 +811,8 @@ private SqlUpdate assembleOldSchemaSqlUpdate(@Nonnull EbeanMetadataAspect aspect @Override protected void updateWithOptimisticLocking(@Nonnull URN urn, @Nullable RecordTemplate value, @Nonnull Class aspectClass, @Nonnull AuditStamp newAuditStamp, - long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext) { + long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext, + boolean isTestMode) { final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, newAuditStamp, version); @@ -832,7 +835,7 @@ protected void updateWithOptimisticLocking(@Nonn // Note: when cold-archive is enabled, this method: updateWithOptimisticLocking will not be called. _server.execute(oldSchemaSqlUpdate); return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp, - trackingContext); + trackingContext, isTestMode); }, 1); } else { // In OLD_SCHEMA mode since aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table @@ -849,14 +852,15 @@ protected void updateWithOptimisticLocking(@Nonn @Override protected void insert(@Nonnull URN urn, @Nullable RecordTemplate value, - @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, long version, @Nullable IngestionTrackingContext trackingContext) { + @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, long version, + @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) { final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, auditStamp, version); if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) { // insert() could be called when updating log table (moving current versions into new history version) // the metadata entity tables shouldn't been updated. - _localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, trackingContext); + _localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, trackingContext, isTestMode); } if (_changeLogEnabled) { @@ -871,10 +875,11 @@ protected void insert(@Nonnull URN urn, @Nullabl * @param urn Urn of the metadata update * @param aspect Aspect of the metadata update * @param aspectClass Aspect class of the metadata update + * @param isTestMode Whether the test mode is enabled or not * @return List of LocalRelationshipUpdates that were executed */ public List addRelationshipsIfAny(@Nonnull URN urn, @Nullable ASPECT aspect, - @Nonnull Class aspectClass) { + @Nonnull Class aspectClass, boolean isTestMode) { if (_relationshipSource == RelationshipSource.ASPECT_METADATA) { // TODO: not yet implemented throw new UnsupportedOperationException("This method has not been implemented yet to support the " @@ -883,7 +888,7 @@ public List addRelatio if (_localRelationshipBuilderRegistry != null && _localRelationshipBuilderRegistry.isRegistered(aspectClass)) { List localRelationshipUpdates = _localRelationshipBuilderRegistry.getLocalRelationshipBuilder(aspect).buildRelationships(urn, aspect); - _localRelationshipWriterDAO.processLocalRelationshipUpdates(urn, localRelationshipUpdates); + _localRelationshipWriterDAO.processLocalRelationshipUpdates(urn, localRelationshipUpdates, isTestMode); return localRelationshipUpdates; } } else { @@ -1131,13 +1136,14 @@ List batchGetHelper(@Nonnull List resultsOldSchema = batchGetUnion(keys, keysCount, position); - final List resultsNewSchema = _localAccess.batchGetUnion(keys, keysCount, position, false); + final List resultsNewSchema = + _localAccess.batchGetUnion(keys, keysCount, position, false, false); EBeanDAOUtils.compareResults(resultsOldSchema, resultsNewSchema, "batchGet"); return resultsOldSchema; } diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalRelationshipWriterDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalRelationshipWriterDAO.java index 7aae38b36..e98cb8c40 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalRelationshipWriterDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalRelationshipWriterDAO.java @@ -47,14 +47,14 @@ public EbeanLocalRelationshipWriterDAO(EbeanServer server) { * @param relationshipUpdates Updates to local relationship tables. */ @Transactional - public void processLocalRelationshipUpdates(@Nonnull Urn urn, - @Nonnull List relationshipUpdates) { + public void processLocalRelationshipUpdates(@Nonnull Urn urn, + @Nonnull List relationshipUpdates, boolean isTestMode) { for (LocalRelationshipUpdates relationshipUpdate : relationshipUpdates) { if (relationshipUpdate.getRelationships().isEmpty()) { clearRelationshipsByEntity(urn, relationshipUpdate.getRelationshipClass(), - relationshipUpdate.getRemovalOption()); + relationshipUpdate.getRemovalOption(), isTestMode); } else { - addRelationships(relationshipUpdate.getRelationships(), relationshipUpdate.getRemovalOption()); + addRelationships(relationshipUpdate.getRelationships(), relationshipUpdate.getRemovalOption(), isTestMode); } } } @@ -65,7 +65,7 @@ public void processLocalRelationshipUpdates(@Non * @param relationshipClass relationship that needs to be cleared */ public void clearRelationshipsByEntity(@Nonnull Urn urn, - @Nonnull Class relationshipClass, @Nonnull RemovalOption removalOption) { + @Nonnull Class relationshipClass, @Nonnull RemovalOption removalOption, boolean isTestMode) { if (removalOption == RemovalOption.REMOVE_NONE || removalOption == RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE_TO_DESTINATION) { // this method is to handle the case of adding empty relationship list to clear relationships of an entity urn @@ -73,9 +73,9 @@ public void clearRelationshipsByEntity(@Nonnull Urn urn, return; } RelationshipValidator.validateRelationshipSchema(relationshipClass); - SqlUpdate deletionSQL = _server.createSqlUpdate( - SQLStatementUtils.deleteLocaRelationshipSQL(SQLSchemaUtils.getRelationshipTableName(relationshipClass), - removalOption)); + SqlUpdate deletionSQL = _server.createSqlUpdate(SQLStatementUtils.deleteLocaRelationshipSQL( + isTestMode ? SQLSchemaUtils.getTestRelationshipTableName(relationshipClass) + : SQLSchemaUtils.getRelationshipTableName(relationshipClass), removalOption)); if (removalOption == RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE) { deletionSQL.setParameter(CommonColumnName.SOURCE, urn.toString()); } else if (removalOption == RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION) { @@ -86,7 +86,7 @@ public void clearRelationshipsByEntity(@Nonnull Urn urn, @Override public void addRelationships(@Nonnull List relationships, - @Nonnull RemovalOption removalOption) { + @Nonnull RemovalOption removalOption, boolean isTestMode) { // split relationships by relationship type Map> relationshipGroupMap = relationships.stream() .collect(Collectors.groupingBy(relationship -> relationship.getClass().getCanonicalName())); @@ -96,7 +96,7 @@ public void addRelationships(@Nonnull List -> GraphUtils.checkSameUrn(relationshipGroup, removalOption, CommonColumnName.SOURCE, CommonColumnName.DESTINATION)); relationshipGroupMap.values().forEach(relationshipGroup -> { - addRelationshipGroup(relationshipGroup, removalOption); + addRelationshipGroup(relationshipGroup, removalOption, isTestMode); }); } @@ -116,7 +116,7 @@ public void removeEntities(@Nonnull List urns) { } private void addRelationshipGroup(@Nonnull final List relationshipGroup, - @Nonnull RemovalOption removalOption) { + @Nonnull RemovalOption removalOption, boolean isTestMode) { if (relationshipGroup.size() == 0) { return; } @@ -125,7 +125,8 @@ private void addRelationshipGroup(@Nonnull RelationshipValidator.validateRelationshipSchema(firstRelationship.getClass()); // Process remove option to delete some local relationships if needed before adding new relationships. - processRemovalOption(SQLSchemaUtils.getRelationshipTableName(firstRelationship), firstRelationship, removalOption); + processRemovalOption(isTestMode ? SQLSchemaUtils.getTestRelationshipTableName(firstRelationship) + : SQLSchemaUtils.getRelationshipTableName(firstRelationship), firstRelationship, removalOption); long now = Instant.now().toEpochMilli(); @@ -133,7 +134,9 @@ private void addRelationshipGroup(@Nonnull Urn source = getSourceUrnFromRelationship(relationship); Urn destination = getDestinationUrnFromRelationship(relationship); - _server.createSqlUpdate(SQLStatementUtils.insertLocalRelationshipSQL(SQLSchemaUtils.getRelationshipTableName(relationship))) + _server.createSqlUpdate(SQLStatementUtils.insertLocalRelationshipSQL( + isTestMode ? SQLSchemaUtils.getTestRelationshipTableName(relationship) + : SQLSchemaUtils.getRelationshipTableName(relationship))) .setParameter(CommonColumnName.METADATA, RecordUtils.toJsonString(relationship)) .setParameter(CommonColumnName.SOURCE_TYPE, source.getEntityType()) .setParameter(CommonColumnName.DESTINATION_TYPE, destination.getEntityType()) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 801f53366..c9102188d 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -30,10 +30,11 @@ public interface IEbeanLocalAccess { * @param aspectClass class of the aspect * @param auditStamp audit timestamp * @param ingestionTrackingContext the ingestionTrackingContext of the MCE responsible for this update + * @param isTestMode whether the test mode is enabled or not * @return number of rows inserted or updated */ int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class aspectClass, - @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext); + @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode); /** * Update aspect on entity table with optimistic locking. (compare-and-update on oldTimestamp). @@ -45,10 +46,12 @@ int add(@Nonnull URN urn, @Nullable ASPECT newVa * @param auditStamp audit timestamp * @param oldTimestamp old time stamp for optimistic lock checking * @param ingestionTrackingContext the ingestionTrackingContext of the MCE responsible for calling this update + * @param isTestMode whether the test mode is enabled or not * @return number of rows inserted or updated */ - int addWithOptimisticLocking(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class aspectClass, - @Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, @Nullable IngestionTrackingContext ingestionTrackingContext); + int addWithOptimisticLocking(@Nonnull URN urn, @Nullable ASPECT newValue, + @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, + @Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode); /** * Get read aspects from entity table. This a new schema implementation for batchGetUnion() in {@link EbeanLocalDAO} @@ -56,12 +59,13 @@ int addWithOptimisticLocking(@Nonnull URN urn, @ * @param keysCount pagination key count limit * @param position starting position of pagination * @param includeSoftDeleted include soft deleted aspects, default false + * @param isTestMode whether the operation is in test mode or not * @param metadata aspect value * @return a list of {@link EbeanMetadataAspect} as get response */ @Nonnull List batchGetUnion(@Nonnull List> keys, - int keysCount, int position, boolean includeSoftDeleted); + int keysCount, int position, boolean includeSoftDeleted, boolean isTestMode); /** * Returns list of urns that satisfy the given filter conditions. diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/ImmutableLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/ImmutableLocalDAO.java index 5773e98e0..9dd5c64b9 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/ImmutableLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/ImmutableLocalDAO.java @@ -54,7 +54,7 @@ public ImmutableLocalDAO(@Nonnull Class aspectUnionClass, _server.execute(Ebean.createSqlUpdate(readSQLfromFile(GMA_CREATE_ALL_SQL))); urnAspectMap.forEach((key, value) -> { if (value != null) { - super.insert(key, value, value.getClass(), DUMMY_AUDIT_STAMP, LATEST_VERSION, null); + super.insert(key, value, value.getClass(), DUMMY_AUDIT_STAMP, LATEST_VERSION, null, false); } }); } @@ -66,7 +66,7 @@ public ImmutableLocalDAO(@Nonnull Class aspectUnionClass, super(aspectUnionClass, new DummyMetadataEventProducer<>(), createTestingH2ServerConfig(), urnClass); urnAspectMap.forEach((key, value) -> { if (value != null) { - super.insert(key, value, value.getClass(), DUMMY_AUDIT_STAMP, LATEST_VERSION, null); + super.insert(key, value, value.getClass(), DUMMY_AUDIT_STAMP, LATEST_VERSION, null, false); } }); } diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLSchemaUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLSchemaUtils.java index 8db56ad42..bdc2da3be 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLSchemaUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLSchemaUtils.java @@ -18,6 +18,7 @@ public class SQLSchemaUtils { private static final String GMA = "gma"; public static final String ENTITY_TABLE_PREFIX = "metadata_entity_"; public static final String RELATIONSHIP_TABLE_PREFIX = "metadata_relationship_"; + public static final String TEST_TABLE_SUFFIX = "_test"; public static final String ASPECT_PREFIX = "a_"; public static final String INDEX_PREFIX = "i_"; @@ -36,6 +37,16 @@ public static String getTableName(@Nonnull Urn urn) { return getTableName(urn.getEntityType()); } + /** + * Get MySQL test table name from entity urn, e.g. urn:li:dataset to metadata_entity_dataset_test. + * @param urn {@link Urn} of the entity + * @return entity table name + */ + @Nonnull + public static String getTestTableName(@Nonnull Urn urn) { + return getTableName(urn) + TEST_TABLE_SUFFIX; + } + /** * Get MySQL table name from entity type string. * @param entityType entity type as string, such as "dataset", "chart" ..etc @@ -46,6 +57,16 @@ public static String getTableName(@Nonnull String entityType) { return ENTITY_TABLE_PREFIX + entityType.toLowerCase(); } + /** + * Get MySQL test table name from entity type string. + * @param entityType entity type as string, such as "dataset", "chart" ..etc + * @return entity table name + */ + @Nonnull + public static String getTestTableName(@Nonnull String entityType) { + return getTableName(entityType) + TEST_TABLE_SUFFIX; + } + /** * Derive the local relationship table name from RELATIONSHIP record. * @param relationship The RELATIONSHIP record. @@ -66,6 +87,28 @@ public static String getRelationshipTableN return RELATIONSHIP_TABLE_PREFIX + relationship.getSimpleName().toLowerCase(); } + /** + * Derive the test local relationship table name from RELATIONSHIP record. + * @param relationship The RELATIONSHIP record. + * @return Local relationship table name as a string. + */ + @Nonnull + public static String getTestRelationshipTableName( + @Nonnull final RELATIONSHIP relationship) { + return getRelationshipTableName(relationship) + TEST_TABLE_SUFFIX; + } + + /** + * Derive the test local relationship table name from RELATIONSHIP record class. + * @param relationship The RELATIONSHIP record. + * @return Local relationship table name as a string. + */ + @Nonnull + public static String getTestRelationshipTableName( + @Nonnull final Class relationship) { + return getRelationshipTableName(relationship) + TEST_TABLE_SUFFIX; + } + /** * Get column name from aspect class canonical name. */ diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java index 761e203f2..74410111f 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java @@ -156,14 +156,14 @@ public static String createExistSql(@Nonnull Urn urn) { * @return aspect read sql statement for a single aspect (across multiple tables and urns) */ public static String createAspectReadSql(@Nonnull Class aspectClass, - @Nonnull Set urns, boolean includeSoftDeleted) { + @Nonnull Set urns, boolean includeSoftDeleted, boolean isTestMode) { if (urns.size() == 0) { throw new IllegalArgumentException("Need at least 1 urn to query."); } final String columnName = getAspectColumnName(aspectClass); StringBuilder stringBuilder = new StringBuilder(); List selectStatements = urns.stream().map(urn -> { - final String tableName = getTableName(urn); + final String tableName = isTestMode ? getTestTableName(urn) : getTableName(urn); final String sqlTemplate = includeSoftDeleted ? SQL_READ_ASPECT_WITH_SOFT_DELETED_TEMPLATE : SQL_READ_ASPECT_TEMPLATE; return String.format(sqlTemplate, columnName, tableName, escapeReservedCharInUrn(urn.toString()), columnName); @@ -220,11 +220,12 @@ public static String createListAspectWithPaginat * @param urn entity urn * @param aspect type * @param aspectClass aspect class + * @param isTestMode whether the test mode is enabled or not * @return aspect upsert sql */ public static String createAspectUpsertSql(@Nonnull Urn urn, - @Nonnull Class aspectClass, boolean urnExtraction) { - final String tableName = getTableName(urn); + @Nonnull Class aspectClass, boolean urnExtraction, boolean isTestMode) { + final String tableName = isTestMode ? getTestTableName(urn) : getTableName(urn); final String columnName = getAspectColumnName(aspectClass); return String.format(urnExtraction ? SQL_UPSERT_ASPECT_WITH_URN_TEMPLATE : SQL_UPSERT_ASPECT_TEMPLATE, tableName, columnName, columnName); } @@ -235,11 +236,12 @@ public static String createAspectUpsertSql(@Nonn * @param urn entity urn * @param aspect type * @param aspectClass aspect class + * @param isTestMode whether the test mode is enabled or not * @return aspect upsert sql */ public static String createAspectUpdateWithOptimisticLockSql(@Nonnull Urn urn, - @Nonnull Class aspectClass, boolean urnExtraction) { - final String tableName = getTableName(urn); + @Nonnull Class aspectClass, boolean urnExtraction, boolean isTestMode) { + final String tableName = isTestMode ? getTestTableName(urn) : getTableName(urn); final String columnName = getAspectColumnName(aspectClass); return String.format(urnExtraction ? SQL_UPDATE_ASPECT_WITH_URN_TEMPLATE : SQL_UPDATE_ASPECT_TEMPLATE, tableName, columnName, columnName, columnName); diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 70ae5f058..532ac7bf9 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -90,7 +90,7 @@ public void setupTest() throws IOException { AspectFoo aspectFoo = new AspectFoo(); aspectFoo.setValue(String.valueOf(i)); AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis()); - _ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, null); + _ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, null, false); } } @@ -104,7 +104,7 @@ public void testGetAspect() { // When get AspectFoo from urn:li:foo:0 List ebeanMetadataAspectList = - _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, false); + _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, false, false); assertEquals(1, ebeanMetadataAspectList.size()); EbeanMetadataAspect ebeanMetadataAspect = ebeanMetadataAspectList.get(0); @@ -121,7 +121,7 @@ public void testGetAspect() { // When get AspectFoo from urn:li:foo:9999 (does not exist) FooUrn nonExistFooUrn = makeFooUrn(9999); AspectKey nonExistKey = new AspectKey(AspectFoo.class, nonExistFooUrn, 0L); - ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(nonExistKey), 1000, 0, false); + ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(nonExistKey), 1000, 0, false, false); // Expect: get AspectFoo from urn:li:foo:9999 returns empty result assertTrue(ebeanMetadataAspectList.isEmpty()); @@ -291,7 +291,7 @@ public void testCountAggregate() { AspectFoo aspectFoo = new AspectFoo(); aspectFoo.setValue(String.valueOf(25)); AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis()); - _ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, null); + _ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, null, false); countMap = _ebeanLocalAccessFoo.countAggregate(indexFilter, indexGroupByCriterion); // Expect: there are 2 counts for value 25 @@ -305,28 +305,28 @@ public void testEscapeSpecialCharInUrn() { // Single quote is a special char in SQL. BurgerUrn johnsBurgerUrn1 = makeBurgerUrn("urn:li:burger:John's burger"); - _ebeanLocalAccessBurger.add(johnsBurgerUrn1, aspectFoo, AspectFoo.class, auditStamp, null); + _ebeanLocalAccessBurger.add(johnsBurgerUrn1, aspectFoo, AspectFoo.class, auditStamp, null, false); AspectKey aspectKey1 = new AspectKey(AspectFoo.class, johnsBurgerUrn1, 0L); - List ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey1), 1, 0, false); + List ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey1), 1, 0, false, false); assertEquals(1, ebeanMetadataAspectList.size()); assertEquals(ebeanMetadataAspectList.get(0).getKey().getUrn(), johnsBurgerUrn1.toString()); // Double quote is a special char in SQL. BurgerUrn johnsBurgerUrn2 = makeBurgerUrn("urn:li:burger:John\"s burger"); - _ebeanLocalAccessBurger.add(johnsBurgerUrn2, aspectFoo, AspectFoo.class, auditStamp, null); + _ebeanLocalAccessBurger.add(johnsBurgerUrn2, aspectFoo, AspectFoo.class, auditStamp, null, false); AspectKey aspectKey2 = new AspectKey(AspectFoo.class, johnsBurgerUrn2, 0L); - ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey2), 1, 0, false); + ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey2), 1, 0, false, false); assertEquals(1, ebeanMetadataAspectList.size()); assertEquals(ebeanMetadataAspectList.get(0).getKey().getUrn(), johnsBurgerUrn2.toString()); // Backslash is a special char in SQL. BurgerUrn johnsBurgerUrn3 = makeBurgerUrn("urn:li:burger:John\\s burger"); - _ebeanLocalAccessBurger.add(johnsBurgerUrn3, aspectFoo, AspectFoo.class, auditStamp, null); + _ebeanLocalAccessBurger.add(johnsBurgerUrn3, aspectFoo, AspectFoo.class, auditStamp, null, false); AspectKey aspectKey3 = new AspectKey(AspectFoo.class, johnsBurgerUrn3, 0L); - ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey3), 1, 0, false); + ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey3), 1, 0, false, false); assertEquals(1, ebeanMetadataAspectList.size()); assertEquals(ebeanMetadataAspectList.get(0).getKey().getUrn(), johnsBurgerUrn3.toString()); } @@ -335,7 +335,7 @@ public void testEscapeSpecialCharInUrn() { public void testUrnExtraction() { FooUrn urn1 = makeFooUrn(1); AspectFoo foo1 = new AspectFoo().setValue("foo"); - _ebeanLocalAccessFoo.add(urn1, foo1, AspectFoo.class, makeAuditStamp("actor", _now), null); + _ebeanLocalAccessFoo.add(urn1, foo1, AspectFoo.class, makeAuditStamp("actor", _now), null, false); List results; // get content of virtual column @@ -388,14 +388,14 @@ public void testFindLatestMetadataAspect() throws URISyntaxException { @Test public void testGetAspectNoSoftDeleteCheck() { FooUrn fooUrn = makeFooUrn(0); - _ebeanLocalAccessFoo.add(fooUrn, null, AspectFoo.class, makeAuditStamp("foo", System.currentTimeMillis()), null); + _ebeanLocalAccessFoo.add(fooUrn, null, AspectFoo.class, makeAuditStamp("foo", System.currentTimeMillis()), null, false); AspectKey aspectKey = new AspectKey(AspectFoo.class, fooUrn, 0L); List ebeanMetadataAspectList = - _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, false); + _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, false, false); assertEquals(0, ebeanMetadataAspectList.size()); ebeanMetadataAspectList = - _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, true); + _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, true, false); assertFalse(ebeanMetadataAspectList.isEmpty()); assertEquals(fooUrn.toString(), ebeanMetadataAspectList.get(0).getKey().getUrn()); } diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index 1d3295648..f21a4b695 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -336,6 +336,49 @@ public void testAddOne() { verifyNoMoreInteractions(_mockProducer); } + @Test + public void testAddOneInTestMode() { + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY && !_enableChangeLog) { + Clock mockClock = mock(Clock.class); + when(mockClock.millis()).thenReturn(_now); + EbeanLocalDAO dao = createDao(FooUrn.class); + dao.setClock(mockClock); + FooUrn urn = makeFooUrn(1); + String aspectName = ModelUtils.getAspectName(AspectFoo.class); + AspectFoo expected = new AspectFoo().setValue("foo"); + Urn actor = Urns.createFromTypeSpecificString("test", "actor"); + Urn impersonator = Urns.createFromTypeSpecificString("test", "impersonator"); + + dao.add(urn, expected, makeAuditStamp(actor, impersonator, _now), null, new IngestionParams().setTestMode(true)); + + EbeanMetadataAspect aspectTest = getTestMetadata(urn, aspectName, 0); + + assertNotNull(aspectTest); + assertEquals(aspectTest.getKey().getUrn(), urn.toString()); + assertEquals(aspectTest.getKey().getAspect(), aspectName); + assertEquals(aspectTest.getKey().getVersion(), 0); + assertEquals(aspectTest.getCreatedOn(), new Timestamp(_now)); + assertEquals(aspectTest.getCreatedBy(), "urn:li:test:actor"); + + AspectFoo actualTest = RecordUtils.toRecordTemplate(AspectFoo.class, aspectTest.getMetadata()); + assertEquals(actualTest, expected); + + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 0); + + assertNotNull(aspect); + assertEquals(aspect.getKey().getUrn(), urn.toString()); + assertEquals(aspect.getKey().getAspect(), aspectName); + assertEquals(aspect.getKey().getVersion(), 0); + assertEquals(aspect.getCreatedOn(), new Timestamp(_now)); + assertEquals(aspect.getCreatedBy(), "urn:li:test:actor"); + + AspectFoo actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); + assertEquals(actual, expected); + verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, expected); + verifyNoMoreInteractions(_mockProducer); + } + } + @Test public void testAddTwo() { EbeanLocalDAO dao = createDao(FooUrn.class); @@ -2802,7 +2845,7 @@ public void testOptimisticLockException() { // call save method with timestamp (_now - 100) but timestamp is already changed to _now // expect OptimisticLockException if optimistic locking is enabled dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 100), - 0, new Timestamp(_now - 100), null); + 0, new Timestamp(_now - 100), null, false); } else if (_enableChangeLog) { // either NEW or DUAL schema, whereas entity table is the SOT and aspect table is the log table @@ -2812,7 +2855,7 @@ public void testOptimisticLockException() { // 2. (foo:1, lastmodified(_now + 1), version=0) in aspect table (discrepancy) // 3. (foo:1, lastmodified(_now)) in entity table - dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now), 0, null); + dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now), 0, null, false); // make inconsistent timestamp on aspect table aspect.setCreatedOn(new Timestamp(_now + 1)); _server.update(aspect); @@ -2821,25 +2864,25 @@ public void testOptimisticLockException() { try { fooAspect.setValue("bar"); dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 200), 0, - new Timestamp(_now), null); + new Timestamp(_now), null, false); } catch (OptimisticLockException e) { fail("Expect the update pass since the old timestamp matches the lastmodified in entity table"); } // Expect: update succeed and the values are updated - assertEquals(dao.getLatest(fooUrn, AspectFoo.class).getAspect().getValue(), "bar"); - assertEquals(dao.getLatest(fooUrn, AspectFoo.class).getExtraInfo().getAudit().getTime(), Long.valueOf(_now + 200L)); + assertEquals(dao.getLatest(fooUrn, AspectFoo.class, false).getAspect().getValue(), "bar"); + assertEquals(dao.getLatest(fooUrn, AspectFoo.class, false).getExtraInfo().getAudit().getTime(), Long.valueOf(_now + 200L)); // When: update with old timestamp does not match the lastmodified in the entity table // Expect: OptimisticLockException. dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 400), 0, - new Timestamp(_now + 100), null); + new Timestamp(_now + 100), null, false); } else { // Given: changeLog is disabled assertFalse(_enableChangeLog); // When: updateWithOptimisticLocking is called try { dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 400), 0, - new Timestamp(_now + 100), null); + new Timestamp(_now + 100), null, false); fail("UnsupportedOperationException should be thrown"); } catch (UnsupportedOperationException uoe) { // Expect: UnsupportedOperationException is thrown @@ -2981,7 +3024,7 @@ public void testAddRelationships() throws URISyntaxException { fooDao.setLocalRelationshipBuilderRegistry(new SampleLocalRelationshipRegistryImpl()); // Add only the local relationships - fooDao.addRelationshipsIfAny(fooUrn, aspectFooBar, AspectFooBar.class); + fooDao.addRelationshipsIfAny(fooUrn, aspectFooBar, AspectFooBar.class, false); // Verify that the local relationships were added relationships = ebeanLocalRelationshipQueryDAO.findRelationships( @@ -3351,6 +3394,31 @@ private EbeanMetadataAspect getMetadata(Urn urn, String aspectName, long version new EbeanMetadataAspect.PrimaryKey(urn.toString(), aspectName, version)); } + private EbeanMetadataAspect getTestMetadata(Urn urn, String aspectName, long version) { + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY && version == 0) { + String aspectColumn = getAspectColumnName(aspectName); + String template = "select urn, lastmodifiedon, lastmodifiedby, createdfor, %s from metadata_entity_%s_test"; + String query = String.format(template, aspectColumn, urn.getEntityType()); + SqlRow result = _server.createSqlQuery(query).findOne(); + if (result != null) { + EbeanMetadataAspect ema = new EbeanMetadataAspect(); + String metadata = extractAspectJsonString(result.getString(aspectColumn)); + if (metadata == null) { + metadata = DELETED_VALUE; + } + ema.setMetadata(metadata); + ema.setKey(new PrimaryKey(urn.toString(), aspectName, version)); + ema.setCreatedOn(result.getTimestamp("lastmodifiedon")); + ema.setCreatedBy(result.getString("lastmodifiedby")); + ema.setCreatedFor(result.getString("creatdfor")); + return ema; + } + return null; + } + return _server.find(EbeanMetadataAspect.class, + new EbeanMetadataAspect.PrimaryKey(urn.toString(), aspectName, version)); + } + private void assertVersionMetadata(ListResultMetadata listResultMetadata, List versions, List urns, Long time, Urn actor, Urn impersonator) { List extraInfos = listResultMetadata.getExtraInfos(); diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java index 5c56776f0..e34b6fc73 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java @@ -119,7 +119,7 @@ public void recreateTables() throws IOException { @Test public void testFindOneEntity() throws URISyntaxException, OperationNotSupportedException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null, false); // Prepare filter LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("foo"), @@ -137,8 +137,8 @@ public void testFindOneEntity() throws URISyntaxException, OperationNotSupported @Test public void testFindOneEntityTwoAspects() throws URISyntaxException, OperationNotSupportedException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectBar().setValue("bar"), AspectBar.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectBar().setValue("bar"), AspectBar.class, new AuditStamp(), null, false); // Prepare filter LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("foo"), @@ -177,18 +177,18 @@ public void testFindOneRelationship(EbeanLocalDAO.SchemaConfig schemaConfig) thr // Add Alice, Bob and Jack into entity tables. if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) { - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null, false); } // Add Bob reports-to ALice relationship ReportsTo bobReportsToAlice = new ReportsTo().setSource(bob).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(bobReportsToAlice); + _localRelationshipWriterDAO.addRelationship(bobReportsToAlice, false); // Add Jack reports-to ALice relationship ReportsTo jackReportsToAlice = new ReportsTo().setSource(jack).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(jackReportsToAlice); + _localRelationshipWriterDAO.addRelationship(jackReportsToAlice, false); // Find all reports-to relationship for Alice. LocalRelationshipFilter filter; @@ -250,28 +250,28 @@ public void testFindOneRelationshipWithFilter(EbeanLocalDAO.SchemaConfig schemaC // Add Kafka_Topic, HDFS_Dataset and Restli_Service into entity tables. if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) { _fooUrnEBeanLocalAccess.add(kafka, new AspectFoo().setValue("Kafka_Topic"), AspectFoo.class, new AuditStamp(), - null); + null, false); _fooUrnEBeanLocalAccess.add(hdfs, new AspectFoo().setValue("HDFS_Dataset"), AspectFoo.class, new AuditStamp(), - null); + null, false); _fooUrnEBeanLocalAccess.add(restli, new AspectFoo().setValue("Restli_Service"), AspectFoo.class, new AuditStamp(), - null); + null, false); // Add Spark and Samza into entity tables. - _barUrnEBeanLocalAccess.add(spark, new AspectFoo().setValue("Spark"), AspectFoo.class, new AuditStamp(), null); - _barUrnEBeanLocalAccess.add(samza, new AspectFoo().setValue("Samza"), AspectFoo.class, new AuditStamp(), null); + _barUrnEBeanLocalAccess.add(spark, new AspectFoo().setValue("Spark"), AspectFoo.class, new AuditStamp(), null, false); + _barUrnEBeanLocalAccess.add(samza, new AspectFoo().setValue("Samza"), AspectFoo.class, new AuditStamp(), null, false); } // Add Spark consume-from hdfs relationship ConsumeFrom sparkConsumeFromHdfs = new ConsumeFrom().setSource(spark).setDestination(hdfs).setEnvironment(EnvorinmentType.OFFLINE); - _localRelationshipWriterDAO.addRelationship(sparkConsumeFromHdfs); + _localRelationshipWriterDAO.addRelationship(sparkConsumeFromHdfs, false); // Add Samza consume-from kafka relationship ConsumeFrom samzaConsumeFromKafka = new ConsumeFrom().setSource(samza).setDestination(kafka).setEnvironment(EnvorinmentType.NEARLINE); - _localRelationshipWriterDAO.addRelationship(samzaConsumeFromKafka); + _localRelationshipWriterDAO.addRelationship(samzaConsumeFromKafka, false); // Add Samza consume-from restli relationship ConsumeFrom samzaConsumeFromRestli = new ConsumeFrom().setSource(samza).setDestination(restli).setEnvironment(EnvorinmentType.ONLINE); - _localRelationshipWriterDAO.addRelationship(samzaConsumeFromRestli); + _localRelationshipWriterDAO.addRelationship(samzaConsumeFromRestli, false); // Find all consume-from relationship for Samza. LocalRelationshipCriterion filterUrnCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion( @@ -325,18 +325,18 @@ public void testFindOneRelationshipWithEntityUrn(EbeanLocalDAO.SchemaConfig sche // Add Alice, Bob and Jack into entity tables. if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) { - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null, false); } // Add Bob reports-to ALice relationship ReportsTo bobReportsToAlice = new ReportsTo().setSource(bob).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(bobReportsToAlice); + _localRelationshipWriterDAO.addRelationship(bobReportsToAlice, false); // Add Jack reports-to ALice relationship ReportsTo jackReportsToAlice = new ReportsTo().setSource(jack).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(jackReportsToAlice); + _localRelationshipWriterDAO.addRelationship(jackReportsToAlice, false); // Find all reports-to relationship for Alice. LocalRelationshipFilter destFilter; @@ -399,28 +399,28 @@ public void testFindOneRelationshipWithFilterWithEntityUrn(EbeanLocalDAO.SchemaC // Add Kafka_Topic, HDFS_Dataset and Restli_Service into entity tables. if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) { _fooUrnEBeanLocalAccess.add(kafka, new AspectFoo().setValue("Kafka_Topic"), AspectFoo.class, new AuditStamp(), - null); + null, false); _fooUrnEBeanLocalAccess.add(hdfs, new AspectFoo().setValue("HDFS_Dataset"), AspectFoo.class, new AuditStamp(), - null); + null, false); _fooUrnEBeanLocalAccess.add(restli, new AspectFoo().setValue("Restli_Service"), AspectFoo.class, new AuditStamp(), - null); + null, false); // Add Spark and Samza into entity tables. - _barUrnEBeanLocalAccess.add(spark, new AspectFoo().setValue("Spark"), AspectFoo.class, new AuditStamp(), null); - _barUrnEBeanLocalAccess.add(samza, new AspectFoo().setValue("Samza"), AspectFoo.class, new AuditStamp(), null); + _barUrnEBeanLocalAccess.add(spark, new AspectFoo().setValue("Spark"), AspectFoo.class, new AuditStamp(), null, false); + _barUrnEBeanLocalAccess.add(samza, new AspectFoo().setValue("Samza"), AspectFoo.class, new AuditStamp(), null, false); } // Add Spark consume-from hdfs relationship ConsumeFrom sparkConsumeFromHdfs = new ConsumeFrom().setSource(spark).setDestination(hdfs).setEnvironment(EnvorinmentType.OFFLINE); - _localRelationshipWriterDAO.addRelationship(sparkConsumeFromHdfs); + _localRelationshipWriterDAO.addRelationship(sparkConsumeFromHdfs, false); // Add Samza consume-from kafka relationship ConsumeFrom samzaConsumeFromKafka = new ConsumeFrom().setSource(samza).setDestination(kafka).setEnvironment(EnvorinmentType.NEARLINE); - _localRelationshipWriterDAO.addRelationship(samzaConsumeFromKafka); + _localRelationshipWriterDAO.addRelationship(samzaConsumeFromKafka, false); // Add Samza consume-from restli relationship ConsumeFrom samzaConsumeFromRestli = new ConsumeFrom().setSource(samza).setDestination(restli).setEnvironment(EnvorinmentType.ONLINE); - _localRelationshipWriterDAO.addRelationship(samzaConsumeFromRestli); + _localRelationshipWriterDAO.addRelationship(samzaConsumeFromRestli, false); // Find all consume-from relationship for Samza. LocalRelationshipCriterion filterUrnCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion( @@ -467,15 +467,15 @@ public void testFindOneRelationshipForCrewUsage(EbeanLocalDAO.SchemaConfig schem // Add Kafka_Topic, HDFS_Dataset and Restli_Service into entity tables. if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) { _fooUrnEBeanLocalAccess.add(kafka, new AspectFoo().setValue("Kafka_Topic"), AspectFoo.class, new AuditStamp(), - null); + null, false); _fooUrnEBeanLocalAccess.add(hdfs, new AspectFoo().setValue("HDFS_Dataset"), AspectFoo.class, new AuditStamp(), - null); + null, false); _fooUrnEBeanLocalAccess.add(restli, new AspectFoo().setValue("Restli_Service"), AspectFoo.class, new AuditStamp(), - null); + null, false); // Add Spark and Samza into entity tables. - _barUrnEBeanLocalAccess.add(spark, new AspectFoo().setValue("Spark"), AspectFoo.class, new AuditStamp(), null); - _barUrnEBeanLocalAccess.add(samza, new AspectFoo().setValue("Samza"), AspectFoo.class, new AuditStamp(), null); + _barUrnEBeanLocalAccess.add(spark, new AspectFoo().setValue("Spark"), AspectFoo.class, new AuditStamp(), null, false); + _barUrnEBeanLocalAccess.add(samza, new AspectFoo().setValue("Samza"), AspectFoo.class, new AuditStamp(), null, false); } // crew1 is a non-mg entity @@ -484,23 +484,23 @@ public void testFindOneRelationshipForCrewUsage(EbeanLocalDAO.SchemaConfig schem // add kafka owned by crew1 OwnedBy kafkaOwnedByCrew1 = new OwnedBy().setSource(kafka).setDestination(crew1); - _localRelationshipWriterDAO.addRelationship(kafkaOwnedByCrew1); + _localRelationshipWriterDAO.addRelationship(kafkaOwnedByCrew1, false); // add hdfs owned by crew1 OwnedBy hdfsOwnedByCrew1 = new OwnedBy().setSource(hdfs).setDestination(crew1); - _localRelationshipWriterDAO.addRelationship(hdfsOwnedByCrew1); + _localRelationshipWriterDAO.addRelationship(hdfsOwnedByCrew1, false); // add restli owned by crew1 OwnedBy restliOwnedByCrew1 = new OwnedBy().setSource(restli).setDestination(crew1); - _localRelationshipWriterDAO.addRelationship(restliOwnedByCrew1); + _localRelationshipWriterDAO.addRelationship(restliOwnedByCrew1, false); // add spark owned by crew2 OwnedBy sparkOwnedByCrew2 = new OwnedBy().setSource(spark).setDestination(crew2); - _localRelationshipWriterDAO.addRelationship(sparkOwnedByCrew2); + _localRelationshipWriterDAO.addRelationship(sparkOwnedByCrew2, false); // add samza owned by crew2 OwnedBy samzaOwnedByCrew2 = new OwnedBy().setSource(samza).setDestination(crew2); - _localRelationshipWriterDAO.addRelationship(samzaOwnedByCrew2); + _localRelationshipWriterDAO.addRelationship(samzaOwnedByCrew2, false); // Find all owned-by relationship for crew1. LocalRelationshipCriterion filterUrnCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion( @@ -542,11 +542,11 @@ public void testFindOneRelationshipWithFilterOnSourceEntityForCrewUsage(EbeanLoc // Add Kafka_Topic, HDFS_Dataset and Restli_Service into entity tables. if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) { _fooUrnEBeanLocalAccess.add(kafka, new AspectFoo().setValue("Kafka_Topic"), AspectFoo.class, new AuditStamp(), - null); + null, false); _fooUrnEBeanLocalAccess.add(hdfs, new AspectFoo().setValue("HDFS_Dataset"), AspectFoo.class, new AuditStamp(), - null); + null, false); _fooUrnEBeanLocalAccess.add(restli, new AspectFoo().setValue("Restli_Service"), AspectFoo.class, new AuditStamp(), - null); + null, false); } // crew is a non-mg entity @@ -554,15 +554,15 @@ public void testFindOneRelationshipWithFilterOnSourceEntityForCrewUsage(EbeanLoc // add kafka owned by crew OwnedBy kafkaOwnedByCrew = new OwnedBy().setSource(kafka).setDestination(crew); - _localRelationshipWriterDAO.addRelationship(kafkaOwnedByCrew); + _localRelationshipWriterDAO.addRelationship(kafkaOwnedByCrew, false); // add hdfs owned by crew OwnedBy hdfsOwnedByCrew = new OwnedBy().setSource(hdfs).setDestination(crew); - _localRelationshipWriterDAO.addRelationship(hdfsOwnedByCrew); + _localRelationshipWriterDAO.addRelationship(hdfsOwnedByCrew, false); // add restli owned by crew OwnedBy restliOwnedByCrew = new OwnedBy().setSource(restli).setDestination(crew); - _localRelationshipWriterDAO.addRelationship(restliOwnedByCrew); + _localRelationshipWriterDAO.addRelationship(restliOwnedByCrew, false); // Find all owned-by relationship for crew. LocalRelationshipCriterion filterUrnCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion( @@ -623,33 +623,33 @@ void testFindRelationshipsWithEntityUrnOffsetAndCount(EbeanLocalDAO.SchemaConfig // Add Alice, Bob, Jack, Lisa, Rose, and Jenny into entity tables. if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) { - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(lisa, new AspectFoo().setValue("Lisa"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(rose, new AspectFoo().setValue("Rose"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(jenny, new AspectFoo().setValue("Jenny"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(lisa, new AspectFoo().setValue("Lisa"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(rose, new AspectFoo().setValue("Rose"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(jenny, new AspectFoo().setValue("Jenny"), AspectFoo.class, new AuditStamp(), null, false); } // Add Bob reports-to ALice relationship ReportsTo bobReportsToAlice = new ReportsTo().setSource(bob).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(bobReportsToAlice); + _localRelationshipWriterDAO.addRelationship(bobReportsToAlice, false); // Add Jack reports-to ALice relationship ReportsTo jackReportsToAlice = new ReportsTo().setSource(jack).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(jackReportsToAlice); + _localRelationshipWriterDAO.addRelationship(jackReportsToAlice, false); // Add Lisa reports-to ALice relationship ReportsTo lisaReportsToAlice = new ReportsTo().setSource(lisa).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(lisaReportsToAlice); + _localRelationshipWriterDAO.addRelationship(lisaReportsToAlice, false); // Add Rose reports-to ALice relationship ReportsTo roseReportsToAlice = new ReportsTo().setSource(rose).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(roseReportsToAlice); + _localRelationshipWriterDAO.addRelationship(roseReportsToAlice, false); // Add Jenny reports-to ALice relationship ReportsTo jennyReportsToAlice = new ReportsTo().setSource(jenny).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(jennyReportsToAlice); + _localRelationshipWriterDAO.addRelationship(jennyReportsToAlice, false); // Find all reports-to relationship for Alice. LocalRelationshipFilter filter; @@ -703,7 +703,7 @@ ReportsTo.class, new LocalRelationshipFilter().setCriteria(new LocalRelationship public void testIsMgEntityUrn() throws Exception { // add foo to EBeanLocalAccess to create table FooUrn fooUrn = new FooUrn(1); - _fooUrnEBeanLocalAccess.add(fooUrn, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(fooUrn, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false); // EbeanLocalRelationshipQueryDAOTest does not have the same package as EbeanLocalRelationshipQueryDAO (cant access protected method directly). Method isMgEntityUrnMethod = EbeanLocalRelationshipQueryDAO.class.getDeclaredMethod("isMgEntityUrn", Urn.class); @@ -723,17 +723,17 @@ public void testFindEntitiesOneHopAwayIncomingDirection() throws Exception { FooUrn jack = new FooUrn(3); // Add Alice, Bob and Jack into entity tables. - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null, false); // Add Bob reports-to ALice relationship ReportsTo bobReportsToAlice = new ReportsTo().setSource(bob).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(bobReportsToAlice); + _localRelationshipWriterDAO.addRelationship(bobReportsToAlice, false); // Add Jack reports-to ALice relationship ReportsTo jackReportsToAlice = new ReportsTo().setSource(jack).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(jackReportsToAlice); + _localRelationshipWriterDAO.addRelationship(jackReportsToAlice, false); // Find all Alice's direct reports. LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("Alice"), @@ -771,22 +771,22 @@ public void testFindEntitiesOneHopAwayOutgoingDirection() throws Exception { BarUrn mit = new BarUrn(2); // Add Alice and Bob into entity tables. - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null, false); // Add Stanford and MIT into entity tables. - _barUrnEBeanLocalAccess.add(stanford, new AspectFoo().setValue("Stanford"), AspectFoo.class, new AuditStamp(), null); - _barUrnEBeanLocalAccess.add(mit, new AspectFoo().setValue("MIT"), AspectFoo.class, new AuditStamp(), null); + _barUrnEBeanLocalAccess.add(stanford, new AspectFoo().setValue("Stanford"), AspectFoo.class, new AuditStamp(), null, false); + _barUrnEBeanLocalAccess.add(mit, new AspectFoo().setValue("MIT"), AspectFoo.class, new AuditStamp(), null, false); // Add Alice belongs to MIT and Stanford. BelongsTo aliceBelongsToMit = new BelongsTo().setSource(alice).setDestination(mit); BelongsTo aliceBelongsToStanford = new BelongsTo().setSource(alice).setDestination(stanford); - _localRelationshipWriterDAO.addRelationship(aliceBelongsToMit); - _localRelationshipWriterDAO.addRelationship(aliceBelongsToStanford); + _localRelationshipWriterDAO.addRelationship(aliceBelongsToMit, false); + _localRelationshipWriterDAO.addRelationship(aliceBelongsToStanford, false); // Add Bob belongs to Stanford. BelongsTo bobBelongsToStandford = new BelongsTo().setSource(bob).setDestination(stanford); - _localRelationshipWriterDAO.addRelationship(bobBelongsToStandford); + _localRelationshipWriterDAO.addRelationship(bobBelongsToStandford, false); // Alice filter LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("Alice"), @@ -826,30 +826,30 @@ public void testFindEntitiesOneHopAwayUndirected() throws Exception { FooUrn john = new FooUrn(4); // Add Alice, Bob, Jack and John into entity tables. - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(john, new AspectFoo().setValue("John"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null, false); + _fooUrnEBeanLocalAccess.add(john, new AspectFoo().setValue("John"), AspectFoo.class, new AuditStamp(), null, false); - _fooUrnEBeanLocalAccess.add(alice, new AspectBar().setValue("32"), AspectBar.class, new AuditStamp(), null); // Alice 32 years old + _fooUrnEBeanLocalAccess.add(alice, new AspectBar().setValue("32"), AspectBar.class, new AuditStamp(), null, false); // Alice 32 years old - _fooUrnEBeanLocalAccess.add(bob, new AspectBar().setValue("52"), AspectBar.class, new AuditStamp(), null); // Bob 52 years old + _fooUrnEBeanLocalAccess.add(bob, new AspectBar().setValue("52"), AspectBar.class, new AuditStamp(), null, false); // Bob 52 years old - _fooUrnEBeanLocalAccess.add(jack, new AspectBar().setValue("16"), AspectBar.class, new AuditStamp(), null); // Jack 16 years old + _fooUrnEBeanLocalAccess.add(jack, new AspectBar().setValue("16"), AspectBar.class, new AuditStamp(), null, false); // Jack 16 years old - _fooUrnEBeanLocalAccess.add(john, new AspectBar().setValue("42"), AspectBar.class, new AuditStamp(), null); // John 42 years old + _fooUrnEBeanLocalAccess.add(john, new AspectBar().setValue("42"), AspectBar.class, new AuditStamp(), null, false); // John 42 years old // Add Alice pair-with Jack relationships. Alice --> Jack. PairsWith alicePairsWithJack = new PairsWith().setSource(alice).setDestination(jack); - _localRelationshipWriterDAO.addRelationship(alicePairsWithJack); + _localRelationshipWriterDAO.addRelationship(alicePairsWithJack, false); // Add Bob pair-with Alice relationships. Bob --> Alice. PairsWith bobPairsWithAlice = new PairsWith().setSource(bob).setDestination(alice); - _localRelationshipWriterDAO.addRelationship(bobPairsWithAlice); + _localRelationshipWriterDAO.addRelationship(bobPairsWithAlice, false); // Add Alice pair-with John relationships. Alice --> John. PairsWith alicePairsWithJohn = new PairsWith().setSource(alice).setDestination(john); - _localRelationshipWriterDAO.addRelationship(alicePairsWithJohn); + _localRelationshipWriterDAO.addRelationship(alicePairsWithJohn, false); // Alice filter LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("Alice"), @@ -892,7 +892,7 @@ public void testFindEntitiesOneHopAwayUndirected() throws Exception { @Test public void testFindOneEntityWithInCondition() throws URISyntaxException, OperationNotSupportedException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null, false); // Prepare filter LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create(new StringArray("foo")), @@ -910,7 +910,7 @@ public void testFindOneEntityWithInCondition() throws URISyntaxException, Operat @Test public void testFindNoEntityWithInCondition() throws URISyntaxException, OperationNotSupportedException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null, false); // Prepare filter LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create(new StringArray("bar")), @@ -926,7 +926,7 @@ public void testFindNoEntityWithInCondition() throws URISyntaxException, Operati @Test public void testFindEntitiesWithEmptyRelationshipFilter() throws URISyntaxException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null, false); // Create empty filter LocalRelationshipFilter emptyFilter = new LocalRelationshipFilter(); diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipWriterDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipWriterDAOTest.java index e0e6e7b29..d8f32bbe6 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipWriterDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipWriterDAOTest.java @@ -56,7 +56,7 @@ public void testAddRelationshipWithRemoveAllEdgesToDestination() throws URISynta List before = _server.createSqlQuery("select * from metadata_relationship_belongsto where source='urn:li:bar:000'").findList(); assertEquals(before.size(), 1); - _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates); + _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates, false); // After processing verification List all = _server.createSqlQuery("select * from metadata_relationship_belongsto").findList(); @@ -91,7 +91,7 @@ public void testAddRelationshipWithRemoveNone() throws URISyntaxException { List before = _server.createSqlQuery("select * from metadata_relationship_reportsto where source='urn:li:bar:000'").findList(); assertEquals(before.size(), 1); - _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates); + _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates, false); // After processing verification List after = _server.createSqlQuery("select * from metadata_relationship_reportsto where destination='urn:li:foo:123'").findList(); @@ -103,6 +103,42 @@ public void testAddRelationshipWithRemoveNone() throws URISyntaxException { _server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_reportsto")); } + @Test + public void testAddRelationshipWithRemoveNoneInTestMode() throws URISyntaxException { + _server.execute(Ebean.createSqlUpdate( + insertRelationships("metadata_relationship_reportsto_test", "urn:li:bar:000", "bar", "urn:li:foo:123", "foo"))); + + AspectFooBar aspectFooBar = new AspectFooBar().setBars( + new BarUrnArray(BarUrn.createFromString("urn:li:bar:123"), BarUrn.createFromString("urn:li:bar:456"), + BarUrn.createFromString("urn:li:bar:789"))); + + List updates = + new ReportsToLocalRelationshipBuilder(AspectFooBar.class).buildRelationships( + FooUrn.createFromString("urn:li:foo:123"), aspectFooBar); + + // Before processing + List beforeTest = + _server.createSqlQuery("select * from metadata_relationship_reportsto_test where source='urn:li:bar:000'") + .findList(); + assertEquals(beforeTest.size(), 1); + + _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates, + true); + + // After processing verification + List afterTest = + _server.createSqlQuery("select * from metadata_relationship_reportsto_test where destination='urn:li:foo:123'") + .findList(); + assertEquals(afterTest.size(), 4); + List edgesTest = + _server.createSqlQuery("select * from metadata_relationship_reportsto_test where source='urn:li:bar:000'") + .findList(); + assertEquals(edgesTest.size(), 1); + + // Clean up + _server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_reportsto_test")); + } + @Test public void testAddRelationshipWithRemoveAllEdgesFromSourceToDestination() throws URISyntaxException { _server.execute(Ebean.createSqlUpdate(insertRelationships("metadata_relationship_pairswith", "urn:li:bar:123", @@ -123,7 +159,7 @@ public void testAddRelationshipWithRemoveAllEdgesFromSourceToDestination() throw List before = _server.createSqlQuery("select * from metadata_relationship_pairswith").findList(); assertEquals(before.size(), 3); - _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates); + _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates, false); // After processing verification List all = _server.createSqlQuery("select * from metadata_relationship_pairswith").findList(); @@ -160,7 +196,7 @@ public void testAddRelationshipWithRemoveAllEdgesFromSource() throws URISyntaxEx List before = _server.createSqlQuery("select * from metadata_relationship_versionof").findList(); assertEquals(before.size(), 3); - _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates); + _localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates, false); // After processing verification List all = _server.createSqlQuery("select * from metadata_relationship_versionof").findList(); @@ -198,7 +234,7 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException { assertEquals(before.size(), 2); _localRelationshipWriterDAO.clearRelationshipsByEntity(barUrn, PairsWith.class, - BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE); + BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE, false); // After processing verification List all = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList(); @@ -212,7 +248,7 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException { "bar", "urn:li:foo:456", "foo"))); _localRelationshipWriterDAO.clearRelationshipsByEntity(fooUrn, PairsWith.class, - BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION); + BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION, false); // After processing verification all = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList(); diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/SQLStatementUtilsTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/SQLStatementUtilsTest.java index d666d43d9..68b1e10a3 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/SQLStatementUtilsTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/SQLStatementUtilsTest.java @@ -39,13 +39,13 @@ public void testCreateUpsertAspectSql() { "INSERT INTO metadata_entity_foo (urn, a_urn, a_aspectfoo, lastmodifiedon, lastmodifiedby) VALUE (:urn, " + ":a_urn, :metadata, :lastmodifiedon, :lastmodifiedby) ON DUPLICATE KEY UPDATE a_aspectfoo = :metadata," + " lastmodifiedon = :lastmodifiedon;"; - assertEquals(SQLStatementUtils.createAspectUpsertSql(fooUrn, AspectFoo.class, true), expectedSql); + assertEquals(SQLStatementUtils.createAspectUpsertSql(fooUrn, AspectFoo.class, true, false), expectedSql); expectedSql = "INSERT INTO metadata_entity_foo (urn, a_aspectfoo, lastmodifiedon, lastmodifiedby) VALUE (:urn, " + ":metadata, :lastmodifiedon, :lastmodifiedby) ON DUPLICATE KEY UPDATE a_aspectfoo = :metadata," + " lastmodifiedon = :lastmodifiedon;"; - assertEquals(SQLStatementUtils.createAspectUpsertSql(fooUrn, AspectFoo.class, false), expectedSql); + assertEquals(SQLStatementUtils.createAspectUpsertSql(fooUrn, AspectFoo.class, false, false), expectedSql); } @Test @@ -59,7 +59,7 @@ public void testCreateAspectReadSql() { "SELECT urn, a_aspectfoo, lastmodifiedon, lastmodifiedby FROM metadata_entity_foo WHERE urn = 'urn:li:foo:1' " + "AND JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NULL UNION ALL SELECT urn, a_aspectfoo, lastmodifiedon, lastmodifiedby " + "FROM metadata_entity_foo WHERE urn = 'urn:li:foo:2' AND JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NULL"; - assertEquals(SQLStatementUtils.createAspectReadSql(AspectFoo.class, set, false), expectedSql); + assertEquals(SQLStatementUtils.createAspectReadSql(AspectFoo.class, set, false, false), expectedSql); } @Test @@ -369,7 +369,7 @@ public void testUpdateAspectWithOptimisticLockSql() { "UPDATE metadata_entity_foo SET a_aspectfoo = :metadata, a_urn = :a_urn, lastmodifiedon = :lastmodifiedon, " + "lastmodifiedby = :lastmodifiedby WHERE urn = :urn and (JSON_EXTRACT(a_aspectfoo, '$.lastmodifiedon') = " + ":oldTimestamp OR JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NOT NULL);"; - assertEquals(SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(fooUrn, AspectFoo.class, true), + assertEquals(SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(fooUrn, AspectFoo.class, true, false), expectedSql); expectedSql = @@ -377,7 +377,7 @@ public void testUpdateAspectWithOptimisticLockSql() { + ":lastmodifiedby WHERE urn = :urn and (JSON_EXTRACT(a_aspectfoo, '$.lastmodifiedon') = :oldTimestamp " + "OR JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NOT NULL);"; assertEquals( - SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(fooUrn, AspectFoo.class, false), + SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(fooUrn, AspectFoo.class, false, false), expectedSql); } } \ No newline at end of file diff --git a/dao-impl/ebean-dao/src/test/resources/ebean-local-dao-create-all-with-non-dollar-virtual-column-names.sql b/dao-impl/ebean-dao/src/test/resources/ebean-local-dao-create-all-with-non-dollar-virtual-column-names.sql index 1a2827e4c..dd77d0305 100644 --- a/dao-impl/ebean-dao/src/test/resources/ebean-local-dao-create-all-with-non-dollar-virtual-column-names.sql +++ b/dao-impl/ebean-dao/src/test/resources/ebean-local-dao-create-all-with-non-dollar-virtual-column-names.sql @@ -1,4 +1,5 @@ DROP TABLE IF EXISTS metadata_entity_foo; +DROP TABLE IF EXISTS metadata_entity_foo_test; DROP TABLE IF EXISTS metadata_entity_bar; DROP TABLE IF EXISTS metadata_entity_burger; DROP TABLE IF EXISTS metadata_aspect; @@ -15,6 +16,15 @@ CREATE TABLE IF NOT EXISTS metadata_entity_foo ( CONSTRAINT pk_metadata_entity_foo PRIMARY KEY (urn) ); +-- initialize foo entity test table +CREATE TABLE IF NOT EXISTS metadata_entity_foo_test ( + urn VARCHAR(100) NOT NULL, + lastmodifiedon TIMESTAMP NOT NULL, + lastmodifiedby VARCHAR(255) NOT NULL, + createdfor VARCHAR(255), + CONSTRAINT pk_metadata_entity_foo_test PRIMARY KEY (urn) + ); + -- initialize bar entity table CREATE TABLE IF NOT EXISTS metadata_entity_bar ( urn VARCHAR(100) NOT NULL, @@ -64,11 +74,15 @@ CREATE TABLE metadata_aspect ( ); ALTER TABLE metadata_entity_foo ADD a_urn JSON; +ALTER TABLE metadata_entity_foo_test ADD a_urn JSON; ALTER TABLE metadata_entity_bar ADD a_urn JSON; -- add foo aspect to foo entity ALTER TABLE metadata_entity_foo ADD a_aspectfoo JSON; +-- add foo aspect to foo test entity +ALTER TABLE metadata_entity_foo_test ADD a_aspectfoo JSON; + -- add bar aspect to foo entity ALTER TABLE metadata_entity_foo ADD a_aspectbar JSON; diff --git a/dao-impl/ebean-dao/src/test/resources/ebean-local-dao-create-all.sql b/dao-impl/ebean-dao/src/test/resources/ebean-local-dao-create-all.sql index 5692fd3af..a2f4d9b05 100644 --- a/dao-impl/ebean-dao/src/test/resources/ebean-local-dao-create-all.sql +++ b/dao-impl/ebean-dao/src/test/resources/ebean-local-dao-create-all.sql @@ -1,4 +1,5 @@ DROP TABLE IF EXISTS metadata_entity_foo; +DROP TABLE IF EXISTS metadata_entity_foo_test; DROP TABLE IF EXISTS metadata_entity_bar; DROP TABLE IF EXISTS metadata_entity_burger; DROP TABLE IF EXISTS metadata_aspect; @@ -15,6 +16,15 @@ CREATE TABLE IF NOT EXISTS metadata_entity_foo ( CONSTRAINT pk_metadata_entity_foo PRIMARY KEY (urn) ); +-- initialize foo entity test table +CREATE TABLE IF NOT EXISTS metadata_entity_foo_test ( + urn VARCHAR(100) NOT NULL, + lastmodifiedon TIMESTAMP NOT NULL, + lastmodifiedby VARCHAR(255) NOT NULL, + createdfor VARCHAR(255), + CONSTRAINT pk_metadata_entity_foo_test PRIMARY KEY (urn) +); + -- initialize bar entity table CREATE TABLE IF NOT EXISTS metadata_entity_bar ( urn VARCHAR(100) NOT NULL, @@ -64,11 +74,15 @@ CREATE TABLE metadata_aspect ( ); ALTER TABLE metadata_entity_foo ADD a_urn JSON; +ALTER TABLE metadata_entity_foo_test ADD a_urn JSON; ALTER TABLE metadata_entity_bar ADD a_urn JSON; -- add foo aspect to foo entity ALTER TABLE metadata_entity_foo ADD a_aspectfoo JSON; +-- add foo aspect to foo test entity +ALTER TABLE metadata_entity_foo_test ADD a_aspectfoo JSON; + -- add bar aspect to foo entity ALTER TABLE metadata_entity_foo ADD a_aspectbar JSON; diff --git a/dao-impl/ebean-dao/src/test/resources/ebean-local-relationship-dao-create-all.sql b/dao-impl/ebean-dao/src/test/resources/ebean-local-relationship-dao-create-all.sql index c42ef347e..d80c95e81 100644 --- a/dao-impl/ebean-dao/src/test/resources/ebean-local-relationship-dao-create-all.sql +++ b/dao-impl/ebean-dao/src/test/resources/ebean-local-relationship-dao-create-all.sql @@ -33,6 +33,20 @@ CREATE TABLE IF NOT EXISTS metadata_relationship_reportsto ( PRIMARY KEY (id) ); + +CREATE TABLE IF NOT EXISTS metadata_relationship_reportsto_test ( + id BIGINT NOT NULL AUTO_INCREMENT, + metadata JSON NOT NULL, + source VARCHAR(1000) NOT NULL, + source_type VARCHAR(100) NOT NULL, + destination VARCHAR(1000) NOT NULL, + destination_type VARCHAR(100) NOT NULL, + lastmodifiedon TIMESTAMP NOT NULL, + lastmodifiedby VARCHAR(255) NOT NULL, + deleted_ts DATETIME(6) DEFAULT NULL, + PRIMARY KEY (id) + ); + CREATE TABLE IF NOT EXISTS metadata_relationship_ownedby ( id BIGINT NOT NULL AUTO_INCREMENT, metadata JSON NOT NULL, diff --git a/dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/internal/Neo4jGraphWriterDAO.java b/dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/internal/Neo4jGraphWriterDAO.java index a6154bb1c..aea4c5eee 100644 --- a/dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/internal/Neo4jGraphWriterDAO.java +++ b/dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/internal/Neo4jGraphWriterDAO.java @@ -169,7 +169,7 @@ public void removeEntities(@Nonnull List urns) { @Override public void addRelationships(@Nonnull List relationships, - @Nonnull RemovalOption removalOption) { + @Nonnull RemovalOption removalOption, boolean isTestMode) { if (relationships.isEmpty()) { return; } diff --git a/dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jQueryDAOTest.java b/dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jQueryDAOTest.java index 6ea5a10d4..36f4114d7 100644 --- a/dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jQueryDAOTest.java +++ b/dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jQueryDAOTest.java @@ -141,16 +141,16 @@ public void testFindEntityWithOneRelationship() throws Exception { // create relationship urn1 -(r:Foo)-> urn2 -(r:Foo)-> urn3 (example use case: ReportTo list) // also relationship urn1 -(r:Foo)-> urn4, and urn1 -(r:Bar)-> urn5 RelationshipFoo relationshipFoo1To2 = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _writer.addRelationship(relationshipFoo1To2); + _writer.addRelationship(relationshipFoo1To2, false); RelationshipFoo relationshipFoo2o3 = new RelationshipFoo().setSource(urn2).setDestination(urn3).setType("apa"); - _writer.addRelationship(relationshipFoo2o3); + _writer.addRelationship(relationshipFoo2o3, false); RelationshipFoo relationshipFoo1o4 = new RelationshipFoo().setSource(urn1).setDestination(urn4); - _writer.addRelationship(relationshipFoo1o4); + _writer.addRelationship(relationshipFoo1o4, false); RelationshipBar relationshipBar1o5 = new RelationshipBar().setSource(urn1).setDestination(urn5); - _writer.addRelationship(relationshipBar1o5); + _writer.addRelationship(relationshipBar1o5, false); // test source filter with urn Filter sourceFilter = newFilter("urn", urn2.toString()); @@ -227,10 +227,10 @@ public void testFindEntitiesMultiHops() throws Exception { // create relationship urn1 -> urn2 -> urn3 (use case: ReportTo list) RelationshipFoo relationshipFoo1To2 = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _writer.addRelationship(relationshipFoo1To2); + _writer.addRelationship(relationshipFoo1To2, false); RelationshipFoo relationshipFoo2o3 = new RelationshipFoo().setSource(urn2).setDestination(urn3); - _writer.addRelationship(relationshipFoo2o3); + _writer.addRelationship(relationshipFoo2o3, false); // From urn1, get result with one hop, such as direct manager Filter sourceFilter = newFilter("urn", urn1.toString()); @@ -329,11 +329,11 @@ public void testFindEntitiesViaTraversePathes() throws Exception { // create relationship urn1 -> urn2 with RelationshipFoo (ex: CorpGroup1 HasMember CorpUser2) RelationshipFoo relationshipFoo1To2 = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _writer.addRelationship(relationshipFoo1To2); + _writer.addRelationship(relationshipFoo1To2, false); // create relationship urn2 -> urn3 with RelationshipBar (ex: Dataset3 is OwnedBy CorpUser2) RelationshipBar relationshipFoo2o3 = new RelationshipBar().setSource(urn3).setDestination(urn2); - _writer.addRelationship(relationshipFoo2o3); + _writer.addRelationship(relationshipFoo2o3, false); // test source filter with urn Filter sourceFilter = newFilter("urn", urn1.toString()); @@ -372,11 +372,11 @@ public void testFindEntitiesViaTraversePathes() throws Exception { // create relationship urn4 -> urn5 with RelationshipBar RelationshipBar relationshipFoo4o5 = new RelationshipBar().setSource(urn5).setDestination(urn4); - _writer.addRelationship(relationshipFoo4o5); + _writer.addRelationship(relationshipFoo4o5, false); // create relationship urn1 -> urn4 with RelationshipFoo RelationshipFoo relationshipFoo1To4 = new RelationshipFoo().setSource(urn1).setDestination(urn4); - _writer.addRelationship(relationshipFoo1To4); + _writer.addRelationship(relationshipFoo1To4, false); List paths3 = new ArrayList(); paths3.add( @@ -428,7 +428,7 @@ public void testFindRelationship() throws Exception { // create relationship urn1 -> urn2 RelationshipFoo relationship = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _writer.addRelationship(relationship); + _writer.addRelationship(relationship, false); // find by source Filter filter1 = newFilter("urn", urn1.toString()); @@ -458,7 +458,7 @@ public void testFindRelationshipByQuery() throws Exception { _writer.addEntity(entity2); RelationshipFoo relationship = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _writer.addRelationship(relationship); + _writer.addRelationship(relationship, false); // with type Map params = new HashMap<>(); @@ -501,11 +501,11 @@ public void testFindNodesInPath() throws Exception { _writer.addEntity(entity6); // Create relationships - simulate reportsto use case - _writer.addRelationship(new RelationshipFoo().setSource(urn6).setDestination(urn3)); - _writer.addRelationship(new RelationshipFoo().setSource(urn5).setDestination(urn3)); - _writer.addRelationship(new RelationshipFoo().setSource(urn4).setDestination(urn2)); - _writer.addRelationship(new RelationshipFoo().setSource(urn3).setDestination(urn1)); - _writer.addRelationship(new RelationshipFoo().setSource(urn2).setDestination(urn1)); + _writer.addRelationship(new RelationshipFoo().setSource(urn6).setDestination(urn3), false); + _writer.addRelationship(new RelationshipFoo().setSource(urn5).setDestination(urn3), false); + _writer.addRelationship(new RelationshipFoo().setSource(urn4).setDestination(urn2), false); + _writer.addRelationship(new RelationshipFoo().setSource(urn3).setDestination(urn1), false); + _writer.addRelationship(new RelationshipFoo().setSource(urn2).setDestination(urn1), false); // Get reports roll-up - 2 levels Filter sourceFilter = newFilter("urn", urn1.toString()); @@ -614,10 +614,10 @@ public void testRunFreeFormQuery() throws Exception { } private void createFooRelationship(FooUrn f1, FooUrn f2) throws Exception { - _writer.addRelationship(new RelationshipFoo().setSource(f1).setDestination(f2)); + _writer.addRelationship(new RelationshipFoo().setSource(f1).setDestination(f2), false); } private void createBarRelationship(BarUrn d1, FooUrn f1) throws Exception { - _writer.addRelationship(new RelationshipBar().setSource(d1).setDestination(f1)); + _writer.addRelationship(new RelationshipBar().setSource(d1).setDestination(f1), false); } } diff --git a/dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/internal/Neo4jGraphWriterDAOTest.java b/dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/internal/Neo4jGraphWriterDAOTest.java index a0eabaf58..adf6c58e4 100644 --- a/dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/internal/Neo4jGraphWriterDAOTest.java +++ b/dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/internal/Neo4jGraphWriterDAOTest.java @@ -161,7 +161,7 @@ public void testAddRelationshipNodeNonExist() throws Exception { BarUrn urn2 = makeBarUrn(2); RelationshipFoo relationship = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _dao.addRelationship(relationship, REMOVE_NONE); + _dao.addRelationship(relationship, REMOVE_NONE, false); assertRelationshipFoo(_helper.getEdges(relationship), 1); assertEntityFoo(_helper.getNode(urn1).get(), new EntityFoo().setUrn(urn1)); @@ -176,7 +176,7 @@ public void testPartialUpdateEntityCreatedByRelationship() throws Exception { FooUrn urn2 = makeFooUrn(2); RelationshipFoo relationship = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _dao.addRelationship(relationship, REMOVE_NONE); + _dao.addRelationship(relationship, REMOVE_NONE, false); // Check if adding an entity with same urn and with label creates a new node _dao.addEntity(new EntityFoo().setUrn(urn1)); @@ -199,17 +199,17 @@ public void testAddRemoveRelationships() throws Exception { // add relationship1 (urn1 -> urn2) RelationshipFoo relationship1 = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _dao.addRelationship(relationship1, REMOVE_NONE); + _dao.addRelationship(relationship1, REMOVE_NONE, false); assertRelationshipFoo(_helper.getEdges(relationship1), 1); // add relationship1 again - _dao.addRelationship(relationship1); + _dao.addRelationship(relationship1, false); assertRelationshipFoo(_helper.getEdges(relationship1), 1); // add relationship2 (urn1 -> urn3) Urn urn3 = makeUrn(3); RelationshipFoo relationship2 = new RelationshipFoo().setSource(urn1).setDestination(urn3); - _dao.addRelationship(relationship2); + _dao.addRelationship(relationship2, false); assertRelationshipFoo(_helper.getEdgesFromSource(urn1, RelationshipFoo.class), 2); // remove relationship1 @@ -244,28 +244,28 @@ public void testAddRelationshipRemoveAll() throws Exception { // add relationship1 (urn1 -> urn2) RelationshipFoo relationship1 = new RelationshipFoo().setSource(urn1).setDestination(urn2); - _dao.addRelationship(relationship1, REMOVE_NONE); + _dao.addRelationship(relationship1, REMOVE_NONE, false); assertRelationshipFoo(_helper.getEdges(relationship1), 1); // add relationship2 (urn1 -> urn3), removeAll from source Urn urn3 = makeUrn(3); RelationshipFoo relationship2 = new RelationshipFoo().setSource(urn1).setDestination(urn3); - _dao.addRelationship(relationship2, REMOVE_ALL_EDGES_FROM_SOURCE); + _dao.addRelationship(relationship2, REMOVE_ALL_EDGES_FROM_SOURCE, false); assertRelationshipFoo(_helper.getEdgesFromSource(urn1, RelationshipFoo.class), 1); // add relationship3 (urn4 -> urn3), removeAll from destination Urn urn4 = makeUrn(4); RelationshipFoo relationship3 = new RelationshipFoo().setSource(urn4).setDestination(urn3); - _dao.addRelationship(relationship3, REMOVE_ALL_EDGES_TO_DESTINATION); + _dao.addRelationship(relationship3, REMOVE_ALL_EDGES_TO_DESTINATION, false); assertRelationshipFoo(_helper.getEdgesFromSource(urn1, RelationshipFoo.class), 0); assertRelationshipFoo(_helper.getEdgesFromSource(urn4, RelationshipFoo.class), 1); // add relationship3 again without removal - _dao.addRelationship(relationship3); + _dao.addRelationship(relationship3, false); assertRelationshipFoo(_helper.getEdgesFromSource(urn4, RelationshipFoo.class), 1); // add relationship3 again, removeAll from source & destination - _dao.addRelationship(relationship3, REMOVE_ALL_EDGES_FROM_SOURCE_TO_DESTINATION); + _dao.addRelationship(relationship3, REMOVE_ALL_EDGES_FROM_SOURCE_TO_DESTINATION, false); assertRelationshipFoo(_helper.getEdgesFromSource(urn1, RelationshipFoo.class), 0); assertRelationshipFoo(_helper.getEdgesFromSource(urn4, RelationshipFoo.class), 1); } @@ -297,10 +297,10 @@ public void upsertEdgeAddNewProperty() throws Exception { new RelationshipFoo().setSource(foo.getUrn()).setDestination(bar.getUrn()); final RelationshipFoo updatedRelationship = new RelationshipFoo().setSource(foo.getUrn()).setDestination(bar.getUrn()).setType("test"); - _dao.addRelationship(initialRelationship); + _dao.addRelationship(initialRelationship, false); // when - _dao.addRelationship(updatedRelationship); + _dao.addRelationship(updatedRelationship, false); // then assertEquals(_queryDao.findRelationships(EntityFoo.class, @@ -338,10 +338,10 @@ public void upsertEdgeChangeProperty() throws Exception { new RelationshipFoo().setSource(foo.getUrn()).setDestination(bar.getUrn()).setType("before"); final RelationshipFoo updatedRelationship = new RelationshipFoo().setSource(foo.getUrn()).setDestination(bar.getUrn()).setType("after"); - _dao.addRelationship(initialRelationship); + _dao.addRelationship(initialRelationship, false); // when - _dao.addRelationship(updatedRelationship); + _dao.addRelationship(updatedRelationship, false); // then assertEquals(_queryDao.findRelationships(EntityFoo.class, @@ -380,10 +380,10 @@ public void upsertEdgeRemoveProperty() throws Exception { new RelationshipFoo().setSource(foo.getUrn()).setDestination(bar.getUrn()).setType("before"); final RelationshipFoo updatedRelationship = new RelationshipFoo().setSource(foo.getUrn()).setDestination(bar.getUrn()); - _dao.addRelationship(initialRelationship); + _dao.addRelationship(initialRelationship, false); // when - _dao.addRelationship(updatedRelationship); + _dao.addRelationship(updatedRelationship, false); // then assertEquals(_queryDao.findRelationships(EntityFoo.class,