From 0266fb05515c542050ec936cc1a3d222e7fcef97 Mon Sep 17 00:00:00 2001 From: Jyoti Wadhwani Date: Mon, 8 Nov 2021 13:32:08 -0800 Subject: [PATCH] feat: support for soft deleted aspects (#130) * support for soft deleted aspects * return optional value for add method * remove resource level changes * keep ListResultMetadata the same * add a test for createAndGet * use correct Nonnull annotation * keep the old add method the same * add test to soft delete when no metadata exists * minor refactor * address comments * add note for not supporting PUH in delete * add javadocs --- .../linkedin/metadata/dao/BaseLocalDAO.java | 121 +++++++++---- .../metadata/dao/BaseLocalDAOTest.java | 19 ++- dao-impl/ebean-dao/gma-create-all.sql | 2 +- .../linkedin/metadata/dao/EbeanLocalDAO.java | 112 +++++++----- .../metadata/dao/EbeanMetadataAspect.java | 3 +- .../metadata/dao/ImmutableLocalDAO.java | 12 +- .../src/main/resources/gma-create-all.sql | 2 +- .../metadata/dao/EbeanLocalDAOTest.java | 160 +++++++++++++++++- 8 files changed, 348 insertions(+), 83 deletions(-) diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 47522c822..6a9f71ca2 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -271,6 +271,46 @@ public boolean isLocalSecondaryIndexEnabled() { return _enableLocalSecondaryIndex; } + /** + * Logic common to both {@link #add(Urn, Class, Function, AuditStamp)} and {@link #delete(Urn, Class, AuditStamp, int)} methods. + * + * @param urn urn the URN for the entity the aspect is attached to + * @param latest {@link AspectEntry} that corresponds to the latest metadata stored + * @param newValue new metadata that needs to be added/stored + * @param aspectClass aspectClass of the aspect being saved + * @param auditStamp audit stamp for the operation + * @param equalityTester {@link EqualityTester} that is an interface for testing equality between two objects of the same type + * @param must be a supported aspect type in {@code ASPECT_UNION} + * @return {@link AddResult} corresponding to the old and new value of metadata + */ + private AddResult addCommon(@Nonnull URN urn, + @Nullable AspectEntry latest, @Nullable ASPECT newValue, @Nonnull Class aspectClass, + @Nonnull AuditStamp auditStamp, @Nonnull EqualityTester equalityTester) { + + final ASPECT oldValue = latest == null ? null : latest.getAspect(); + // Skip saving if there's no actual change + if ((oldValue == null && newValue == null) || oldValue != null && newValue != null && equalityTester.equals( + oldValue, newValue)) { + return new AddResult<>(oldValue, oldValue); + } + + // Save the newValue as the latest version + long largestVersion = + saveLatest(urn, aspectClass, oldValue, latest == null ? null : latest.getExtraInfo().getAudit(), newValue, + auditStamp); + + // Apply retention policy + applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion); + + // Save to local secondary index + // TODO: add support for soft deleted aspects in local secondary index + if (_enableLocalSecondaryIndex && newValue != null) { + updateLocalIndex(urn, newValue, largestVersion); + } + + return new AddResult<>(oldValue, newValue); + } + /** * Adds a new version of aspect for an entity. * @@ -293,58 +333,43 @@ public ASPECT add(@Nonnull URN urn, @Nonnull Cla final EqualityTester equalityTester = getEqualityTester(aspectClass); final AddResult result = runInTransactionWithRetry(() -> { - // 1. Compute newValue based on oldValue - AspectEntry latest = getLatest(urn, aspectClass); + // Compute newValue based on oldValue + final AspectEntry latest = getLatest(urn, aspectClass); final ASPECT oldValue = latest == null ? null : latest.getAspect(); final ASPECT newValue = updateLambda.apply(Optional.ofNullable(oldValue)); + if (newValue == null) { + throw new UnsupportedOperationException("Do not support adding null metadata in add method"); + } checkValidAspect(newValue.getClass()); if (_modelValidationOnWrite) { validateAgainstSchema(newValue); } - // 2. Invoke pre-update hooks, if any + // Invoke pre-update hooks, if any if (_aspectPreUpdateHooksMap.containsKey(aspectClass)) { _aspectPreUpdateHooksMap.get(aspectClass).forEach(hook -> hook.accept(urn, newValue)); } - // 3. Skip saving if there's no actual change - if (oldValue != null && equalityTester.equals(oldValue, newValue)) { - return new AddResult<>(oldValue, oldValue); - } - - // 4. Save the newValue as the latest version - long largestVersion = - saveLatest(urn, aspectClass, oldValue, latest == null ? null : latest.getExtraInfo().getAudit(), newValue, - auditStamp); - - // 5. Apply retention policy - applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion); - - // 6. Save to local secondary index - if (_enableLocalSecondaryIndex) { - updateLocalIndex(urn, newValue, largestVersion); - } - - return new AddResult<>(oldValue, newValue); + return addCommon(urn, latest, newValue, aspectClass, auditStamp, equalityTester); }, maxTransactionRetry); final ASPECT oldValue = result.getOldValue(); final ASPECT newValue = result.getNewValue(); - // 7. Produce MAE after a successful update + // Produce MAE after a successful update if (_alwaysEmitAuditEvent || oldValue != newValue) { _producer.produceMetadataAuditEvent(urn, oldValue, newValue); } - // TODO: Replace step 7 with step 7.1 after pipeline is fully migrated to aspect specific events. - // 7.1 Produce aspect specific MAE after a successful update + // TODO: Replace the previous step with the step below, after pipeline is fully migrated to aspect specific events. + // Produce aspect specific MAE after a successful update if (_emitAspectSpecificAuditEvent) { if (_alwaysEmitAspectSpecificAuditEvent || oldValue != newValue) { _producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue); } } - // 8. Invoke post-update hooks if there's any + // Invoke post-update hooks if there's any if (_aspectPostUpdateHooksMap.containsKey(aspectClass)) { _aspectPostUpdateHooksMap.get(aspectClass).forEach(hook -> hook.accept(urn, newValue)); } @@ -352,6 +377,34 @@ public ASPECT add(@Nonnull URN urn, @Nonnull Cla return newValue; } + /** + * Deletes the latest version of aspect for an entity. + * + *

The new aspect will have an automatically assigned version number, which is guaranteed to be positive and + * monotonically increasing. Older versions of aspect will be purged automatically based on the retention setting. + * + *

Note that we do not support Post-update hooks while soft deleting an aspect + * + * @param urn urn the URN for the entity the aspect is attached to + * @param aspectClass aspectClass of the aspect being saved + * @param auditStamp the audit stamp of the previous latest aspect, null if new value is the first version + * @param maxTransactionRetry maximum number of transaction retries before throwing an exception + * @param must be a supported aspect type in {@code ASPECT_UNION} + */ + public void delete(@Nonnull URN urn, @Nonnull Class aspectClass, + @Nonnull AuditStamp auditStamp, int maxTransactionRetry) { + + checkValidAspect(aspectClass); + + runInTransactionWithRetry(() -> { + final AspectEntry latest = getLatest(urn, aspectClass); + + return addCommon(urn, latest, null, aspectClass, auditStamp, new DefaultEqualityTester<>()); + }, maxTransactionRetry); + + // TODO: add support for sending MAE for soft deleted aspects + } + /** * Similar to {@link #add(Urn, Class, Function, AuditStamp, int)} but uses the default maximum transaction retry. */ @@ -370,6 +423,15 @@ public ASPECT add(@Nonnull URN urn, @Nonnull ASP return add(urn, (Class) newValue.getClass(), ignored -> newValue, auditStamp); } + /** + * Similar to {@link #delete(Urn, Class, AuditStamp, int)} but uses the default maximum transaction retry. + */ + @Nonnull + public void delete(@Nonnull URN urn, @Nonnull Class aspectClass, + @Nonnull AuditStamp auditStamp) { + delete(urn, aspectClass, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY); + } + private void applyRetention(@Nonnull URN urn, @Nonnull Class aspectClass, @Nonnull Retention retention, long largestVersion) { if (retention instanceof IndefiniteRetention) { @@ -400,7 +462,7 @@ private void applyRetention(@Nonnull URN urn, @N */ protected abstract long saveLatest(@Nonnull URN urn, @Nonnull Class aspectClass, @Nullable ASPECT oldEntry, @Nullable AuditStamp oldAuditStamp, - @Nonnull ASPECT newEntry, @Nonnull AuditStamp newAuditStamp); + @Nullable ASPECT newEntry, @Nonnull AuditStamp newAuditStamp); /** * Saves the new value of an aspect to local secondary index. @@ -587,12 +649,13 @@ protected abstract long getNextVersion(@Nonnull * * @param urn {@link Urn} for the entity * @param value the aspect to save + * @param aspectClass the type of aspect to save * @param auditStamp the {@link AuditStamp} for the aspect * @param version the version for the aspect * @param insert use insert, instead of update, operation to save */ - protected abstract void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull AuditStamp auditStamp, - long version, boolean insert); + protected abstract void save(@Nonnull URN urn, @Nullable RecordTemplate value, + @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, long version, boolean insert); /** * Returns a boolean representing if an Urn has any Aspects associated with it (i.e. if it exists in the DB). diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index e6a090ee6..3dc9f3cfc 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -73,7 +73,8 @@ protected long getNextVersion(FooUrn urn, Class< } @Override - protected void save(FooUrn urn, RecordTemplate value, AuditStamp auditStamp, long version, boolean insert) { + protected void save(FooUrn urn, RecordTemplate value, Class aspectClass, + AuditStamp auditStamp, long version, boolean insert) { } @@ -233,6 +234,22 @@ public void testMAEEmissionNoValueChange() throws URISyntaxException { verifyNoMoreInteractions(_mockEventProducer); } + @Test + public void testMAEWithNullValue() throws URISyntaxException { + FooUrn urn = new FooUrn(1); + AspectFoo foo = new AspectFoo().setValue("foo"); + _dummyLocalDAO.setAlwaysEmitAuditEvent(true); + expectGetLatest(urn, AspectFoo.class, Arrays.asList(null, makeAspectEntry(foo, _dummyAuditStamp))); + + _dummyLocalDAO.add(urn, foo, _dummyAuditStamp); + _dummyLocalDAO.delete(urn, AspectFoo.class, _dummyAuditStamp); + + verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo); + // TODO: ensure MAE is produced with newValue set as null for soft deleted aspect + // verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, null); + verifyNoMoreInteractions(_mockEventProducer); + } + @Test public void testAddSamePreUpdateHookTwice() { BiConsumer hook = (urn, foo) -> { diff --git a/dao-impl/ebean-dao/gma-create-all.sql b/dao-impl/ebean-dao/gma-create-all.sql index e1cf9bab6..b56dbbc08 100644 --- a/dao-impl/ebean-dao/gma-create-all.sql +++ b/dao-impl/ebean-dao/gma-create-all.sql @@ -8,7 +8,7 @@ create table metadata_aspect ( urn varchar(500) not null, aspect varchar(200) not null, version bigint not null, - metadata clob not null, + metadata clob, createdon timestamp not null, createdby varchar(255) not null, createdfor varchar(255), diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 3ce09f1e6..cc2e8d601 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -288,23 +288,23 @@ protected T runInTransactionWithRetry(@Nonnull Supplier block, int maxTra @Override protected long saveLatest(@Nonnull URN urn, @Nonnull Class aspectClass, - @Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nonnull ASPECT newValue, + @Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nullable ASPECT newValue, @Nonnull AuditStamp newAuditStamp) { // Save oldValue as the largest version + 1 long largestVersion = 0; if (oldValue != null && oldAuditStamp != null) { largestVersion = getNextVersion(urn, aspectClass); - save(urn, oldValue, oldAuditStamp, largestVersion, true); + save(urn, oldValue, aspectClass, oldAuditStamp, largestVersion, true); // update latest version if (_useOptimisticLocking) { - saveWithOptimisticLocking(urn, newValue, newAuditStamp, LATEST_VERSION, false, + saveWithOptimisticLocking(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, false, new Timestamp(oldAuditStamp.getTime())); } else { - save(urn, newValue, newAuditStamp, LATEST_VERSION, false); + save(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, false); } } else { - save(urn, newValue, newAuditStamp, LATEST_VERSION, true); + save(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, true); } return largestVersion; @@ -335,18 +335,26 @@ protected AspectEntry getLatest(@Nonnull return null; } - return new AspectEntry<>(RecordUtils.toRecordTemplate(aspectClass, latest.getMetadata()), toExtraInfo(latest)); + final Optional optionalExtraInfo = toExtraInfo(latest); + return optionalExtraInfo.map( + extraInfo -> new AspectEntry<>(RecordUtils.toRecordTemplate(aspectClass, latest.getMetadata()), extraInfo)) + .orElse(null); } @Nonnull - private EbeanMetadataAspect buildMetadataAspectBean(@Nonnull URN urn, @Nonnull RecordTemplate value, - @Nonnull AuditStamp auditStamp, long version) { + private EbeanMetadataAspect buildMetadataAspectBean(@Nonnull URN urn, + @Nullable RecordTemplate value, @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, + long version) { - final String aspectName = ModelUtils.getAspectName(value.getClass()); + final String aspectName = ModelUtils.getAspectName(aspectClass); final EbeanMetadataAspect aspect = new EbeanMetadataAspect(); aspect.setKey(new PrimaryKey(urn.toString(), aspectName, version)); - aspect.setMetadata(RecordUtils.toJsonString(value)); + if (value != null) { + aspect.setMetadata(RecordUtils.toJsonString(value)); + } else { + aspect.setMetadata(null); + } aspect.setCreatedOn(new Timestamp(auditStamp.getTime())); aspect.setCreatedBy(auditStamp.getActor().toString()); @@ -359,10 +367,11 @@ private EbeanMetadataAspect buildMetadataAspectBean(@Nonnull URN urn, @Nonnull R } // visible for testing - protected void saveWithOptimisticLocking(@Nonnull URN urn, @Nonnull RecordTemplate value, - @Nonnull AuditStamp newAuditStamp, long version, boolean insert, @Nonnull Object oldTimestamp) { + protected void saveWithOptimisticLocking(@Nonnull URN urn, + @Nullable RecordTemplate value, @Nonnull Class aspectClass, @Nonnull AuditStamp newAuditStamp, + long version, boolean insert, @Nonnull Object oldTimestamp) { - final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, newAuditStamp, version); + final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, newAuditStamp, version); if (insert) { _server.insert(aspect); @@ -399,10 +408,10 @@ protected void saveWithOptimisticLocking(@Nonnull URN urn, @Nonnull RecordTempla } @Override - protected void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull AuditStamp auditStamp, long version, - boolean insert) { + protected void save(@Nonnull URN urn, @Nullable RecordTemplate value, + @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, long version, boolean insert) { - final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, auditStamp, version); + final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, auditStamp, version); if (insert) { _server.insert(aspect); @@ -534,11 +543,12 @@ protected void applyTimeBasedRetention(@Nonnull } // TODO: Improve this O(n^2) search + return keys.stream() .collect(Collectors.toMap(Function.identity(), key -> records.stream() .filter(record -> matchKeys(key, record.getKey())) .findFirst() - .map(record -> toRecordTemplate(key.getAspectClass(), record)))); + .flatMap(record -> toRecordTemplate(key.getAspectClass(), record)))); } @Override @@ -556,7 +566,13 @@ protected void applyTimeBasedRetention(@Nonnull keys.forEach(key -> records.stream() .filter(record -> matchKeys(key, record.getKey())) .findFirst() - .map(record -> result.put(key, toRecordTemplateWithExtraInfo(key.getAspectClass(), record)))); + .map(record -> { + final Class aspectClass = (Class) key.getAspectClass(); + final Optional> aspectWithExtraInfo = toRecordTemplateWithExtraInfo(aspectClass, record); + aspectWithExtraInfo.ifPresent( + recordTemplateAspectWithExtraInfo -> result.put(key, recordTemplateAspectWithExtraInfo)); + return null; + })); return result; } @@ -739,6 +755,24 @@ public ListResult listUrns(@Nonnull Class ListResult getListResult(@Nonnull Class aspectClass, + @Nonnull PagedList pagedList, int start) { + final List aspects = new ArrayList<>(); + pagedList.getList().forEach(a -> { + final Optional record = toRecordTemplate(aspectClass, a); + record.ifPresent(aspects::add); + }); + + final List extraInfos = new ArrayList<>(); + pagedList.getList().forEach(record -> { + final Optional extraInfo = EbeanLocalDAO.toExtraInfo(record); + extraInfo.ifPresent(extraInfos::add); + }); + final ListResultMetadata listResultMetadata = makeListResultMetadata(extraInfos); + return toListResult(aspects, listResultMetadata, pagedList, start); + } + @Override @Nonnull public ListResult list(@Nonnull Class aspectClass, @Nonnull URN urn, @@ -757,11 +791,7 @@ public ListResult list(@Nonnull Class aspects = - pagedList.getList().stream().map(a -> toRecordTemplate(aspectClass, a)).collect(Collectors.toList()); - final ListResultMetadata listResultMetadata = makeListResultMetadata( - pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList())); - return toListResult(aspects, listResultMetadata, pagedList, start); + return getListResult(aspectClass, pagedList, start); } @Override @@ -782,11 +812,7 @@ public ListResult list(@Nonnull Class aspects = - pagedList.getList().stream().map(a -> toRecordTemplate(aspectClass, a)).collect(Collectors.toList()); - final ListResultMetadata listResultMetadata = makeListResultMetadata( - pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList())); - return toListResult(aspects, listResultMetadata, pagedList, start); + return getListResult(aspectClass, pagedList, start); } @Override @@ -807,16 +833,24 @@ URN getUrn(@Nonnull String urn) { } @Nonnull - private static ASPECT toRecordTemplate(@Nonnull Class aspectClass, + private static Optional toRecordTemplate(@Nonnull Class aspectClass, @Nonnull EbeanMetadataAspect aspect) { - return RecordUtils.toRecordTemplate(aspectClass, aspect.getMetadata()); + if (aspect.getMetadata() == null) { + return Optional.empty(); + } + return Optional.of(RecordUtils.toRecordTemplate(aspectClass, aspect.getMetadata())); } @Nonnull - private static AspectWithExtraInfo toRecordTemplateWithExtraInfo( + private static Optional> toRecordTemplateWithExtraInfo( @Nonnull Class aspectClass, @Nonnull EbeanMetadataAspect aspect) { - return new AspectWithExtraInfo<>(RecordUtils.toRecordTemplate(aspectClass, aspect.getMetadata()), - toExtraInfo(aspect)); + if (aspect.getMetadata() == null) { + return Optional.empty(); + } + final Optional optionalExtraInfo = toExtraInfo(aspect); + return optionalExtraInfo.map( + extraInfo -> new AspectWithExtraInfo<>(RecordUtils.toRecordTemplate(aspectClass, aspect.getMetadata()), + extraInfo)); } @Nonnull @@ -837,7 +871,10 @@ private ListResult toListResult(@Nonnull List values, @Nullable ListRe } @Nonnull - private static ExtraInfo toExtraInfo(@Nonnull EbeanMetadataAspect aspect) { + private static Optional toExtraInfo(@Nonnull EbeanMetadataAspect aspect) { + if (aspect.getMetadata() == null) { + return Optional.empty(); + } final ExtraInfo extraInfo = new ExtraInfo(); extraInfo.setVersion(aspect.getKey().getVersion()); extraInfo.setAudit(makeAuditStamp(aspect)); @@ -847,7 +884,7 @@ private static ExtraInfo toExtraInfo(@Nonnull EbeanMetadataAspect aspect) { throw new ModelConversionException(e.getMessage()); } - return extraInfo; + return Optional.of(extraInfo); } @Nonnull @@ -986,8 +1023,7 @@ private static void validateConditionAndValue(@Nonnull IndexCriterion criterion) } @Nonnull - static String getFieldColumn(@Nonnull String path, - @Nonnull String aspectName) { + static String getFieldColumn(@Nonnull String path, @Nonnull String aspectName) { final String[] pathSpecArray = RecordUtils.getPathSpecAsArray(path); // get nested field @@ -1242,7 +1278,7 @@ private static String constructCountAggregateSQLQuery(@Nonnull IndexCriterionArr * * @param indexCriterionArray {@link IndexCriterionArray} whose values will be used to set parameters in metadata * index query based on its position - * @param indexSortCriterion {@link IndexGroupByCriterion} whose values will be used to set parameters in query + * @param indexGroupByCriterion {@link IndexGroupByCriterion} whose values will be used to set parameters in query * @param indexQuery {@link Query} whose ordered parameters need to be set, based on it's position */ @Nonnull diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataAspect.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataAspect.java index 8e13c5deb..b59e0307a 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataAspect.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataAspect.java @@ -70,9 +70,8 @@ public static class PrimaryKey { @Index protected PrimaryKey key; - @NonNull @Lob - @Column(name = METADATA_COLUMN, nullable = false) + @Column(name = METADATA_COLUMN, nullable = true) protected String metadata; @NonNull diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/ImmutableLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/ImmutableLocalDAO.java index 95dc1bc99..b4b3095d0 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/ImmutableLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/ImmutableLocalDAO.java @@ -47,7 +47,11 @@ public ImmutableLocalDAO(@Nonnull Class aspectUnionClass, super(aspectUnionClass, new DummyMetadataEventProducer<>(), createProductionH2ServerConfig(aspectUnionClass.getCanonicalName()), urnClass); _server.execute(Ebean.createSqlUpdate(readSQLfromFile(GMA_CREATE_ALL_SQL))); - urnAspectMap.forEach((key, value) -> super.save(key, value, DUMMY_AUDIT_STAMP, LATEST_VERSION, true)); + urnAspectMap.forEach((key, value) -> { + if (value != null) { + super.save(key, value, value.getClass(), DUMMY_AUDIT_STAMP, LATEST_VERSION, true); + } + }); } // For testing purpose @@ -55,7 +59,11 @@ public ImmutableLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull Map urnAspectMap, boolean ddlGenerate, @Nonnull Class urnClass) { super(aspectUnionClass, new DummyMetadataEventProducer<>(), createTestingH2ServerConfig(), urnClass); - urnAspectMap.forEach((key, value) -> super.save(key, value, DUMMY_AUDIT_STAMP, LATEST_VERSION, true)); + urnAspectMap.forEach((key, value) -> { + if (value != null) { + super.save(key, value, value.getClass(), DUMMY_AUDIT_STAMP, LATEST_VERSION, true); + } + }); } /** diff --git a/dao-impl/ebean-dao/src/main/resources/gma-create-all.sql b/dao-impl/ebean-dao/src/main/resources/gma-create-all.sql index a36f3c604..459fd7124 100644 --- a/dao-impl/ebean-dao/src/main/resources/gma-create-all.sql +++ b/dao-impl/ebean-dao/src/main/resources/gma-create-all.sql @@ -2,7 +2,7 @@ create table metadata_aspect ( urn varchar(500) not null, aspect varchar(200) not null, version bigint not null, - metadata clob not null, + metadata clob, createdon timestamp not null, createdby varchar(255) not null, createdfor varchar(255), diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index c1c0a8a87..a85cd3305 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -46,6 +46,7 @@ import com.linkedin.testing.urn.FooUrn; import io.ebean.EbeanServer; import io.ebean.EbeanServerFactory; +import io.ebean.PagedList; import io.ebean.Transaction; import java.sql.Timestamp; import java.time.Clock; @@ -62,6 +63,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.persistence.OptimisticLockException; import javax.persistence.RollbackException; import org.mockito.InOrder; @@ -212,6 +214,54 @@ public void testDefaultEqualityTester() { verifyNoMoreInteractions(_mockProducer); } + @Test + public void testSoftDeletedAspect() { + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrn urn = makeFooUrn(1); + String aspectName = ModelUtils.getAspectName(AspectFoo.class); + AspectFoo v1 = new AspectFoo().setValue("foo"); + AspectFoo v0 = new AspectFoo().setValue("bar"); + + dao.add(urn, v1, _dummyAuditStamp); + dao.add(urn, v0, _dummyAuditStamp); + dao.delete(urn, AspectFoo.class, _dummyAuditStamp); + + // latest version of metadata should be null + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 0); + assertNull(aspect.getMetadata()); + + aspect = getMetadata(urn, aspectName, 1); + AspectFoo actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); + assertEquals(actual, v1); + + aspect = getMetadata(urn, aspectName, 2); + actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); + assertEquals(actual, v0); + + InOrder inOrder = inOrder(_mockProducer); + inOrder.verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, v1); + inOrder.verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, v1, v0); + // TODO: verify that MAE was produced with newValue set as null for soft deleted aspect + verifyNoMoreInteractions(_mockProducer); + } + + @Test + public void testSoftDeletedAspectWithNoExistingMetadata() { + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrn urn = makeFooUrn(1); + String aspectName = ModelUtils.getAspectName(AspectFoo.class); + + // no metadata already exists + dao.delete(urn, AspectFoo.class, _dummyAuditStamp); + + // since old and new value are the same i.e. null, no metadata will be saved + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 0); + assertNull(aspect); + + // no MAE will be produced + verifyNoMoreInteractions(_mockProducer); + } + @Test public void testAlwaysFalseEqualityTester() { EbeanLocalDAO dao = createDao(FooUrn.class); @@ -419,6 +469,31 @@ public void testGetMultipleAspectsForMultipleUrns() { assertFalse(result.get(urn3).get(AspectBar.class).isPresent()); } + @Test + public void testGetListResult() { + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrn urn = makeFooUrn(1); + AspectFoo v1 = new AspectFoo().setValue("val1"); + AspectFoo v2 = new AspectFoo().setValue("val2"); + AspectFoo v4 = new AspectFoo().setValue("val4"); + // set v0 metadata as null + EbeanMetadataAspect a0 = getMetadata(urn, AspectFoo.class.getCanonicalName(), 0, null); + EbeanMetadataAspect a1 = getMetadata(urn, AspectFoo.class.getCanonicalName(), 1, v1); + EbeanMetadataAspect a2 = getMetadata(urn, AspectFoo.class.getCanonicalName(), 2, v2); + EbeanMetadataAspect a3 = getMetadata(urn, AspectFoo.class.getCanonicalName(), 3, null); + EbeanMetadataAspect a4 = getMetadata(urn, AspectFoo.class.getCanonicalName(), 4, v4); + List listAspects = Arrays.asList(a0, a1, a2, a3, a4); + + PagedList pagedList = mock(PagedList.class); + when(pagedList.getList()).thenReturn(listAspects); + + ListResult metadata = dao.getListResult(AspectFoo.class, pagedList, 0); + List nonNullVersions = metadata.getMetadata().getExtraInfos().stream().map(ExtraInfo::getVersion).collect(Collectors.toList()); + + assertEquals(metadata.getValues(), Arrays.asList(v1, v2, v4)); + assertEquals(nonNullVersions, Arrays.asList(1L, 2L, 4L)); + } + @Test public void testBackfill() { EbeanLocalDAO dao = createDao(FooUrn.class); @@ -1259,7 +1334,7 @@ public void testList() { assertNotNull(results.getMetadata()); List expectedVersions = Arrays.asList(0L, 1L, 2L, 3L, 4L); - List expectedUrns = Arrays.asList(makeFooUrn(0), makeFooUrn(1), makeFooUrn(2), makeFooUrn(3), makeFooUrn(4)); + List expectedUrns = Collections.singletonList(urn0); assertVersionMetadata(results.getMetadata(), expectedVersions, expectedUrns, 1234L, Urns.createFromTypeSpecificString("test", "foo"), Urns.createFromTypeSpecificString("test", "bar")); @@ -1275,6 +1350,45 @@ public void testList() { assertNotNull(results.getMetadata()); } + @Test + public void testListWithNullMetadata() { + EbeanLocalDAO dao = createDao(FooUrn.class); + List foos = new LinkedList<>(); + for (int i = 0; i < 3; i++) { + FooUrn urn = makeFooUrn(i); + + for (int j = 0; j < 3; j++) { + AspectFoo foo = new AspectFoo().setValue("foo" + j); + addMetadata(urn, AspectFoo.class.getCanonicalName(), j, foo); + if (i == 0) { + foos.add(foo); + } + } + + for (int j = 3; j < 6; j++) { + addMetadata(urn, AspectFoo.class.getCanonicalName(), j, null); + } + } + + FooUrn urn0 = makeFooUrn(0); + + ListResult results = dao.list(AspectFoo.class, urn0, 0, 5); + + assertTrue(results.isHavingMore()); + assertEquals(results.getNextStart(), 5); + assertEquals(results.getTotalCount(), 6); + assertEquals(results.getPageSize(), 5); + assertEquals(results.getTotalPageCount(), 2); + assertEquals(results.getValues().size(), 3); + assertEquals(results.getValues(), foos); + + assertNotNull(results.getMetadata()); + List expectedNonNullVersions = Arrays.asList(0L, 1L, 2L); + List expectedUrns = Collections.singletonList(urn0); + assertVersionMetadata(results.getMetadata(), expectedNonNullVersions, expectedUrns, 1234L, + Urns.createFromTypeSpecificString("test", "foo"), Urns.createFromTypeSpecificString("test", "bar")); + } + private static LocalDAOStorageConfig makeLocalDAOStorageConfig(Class aspectClass, List pegasusPaths) { Map, LocalDAOStorageConfig.AspectStorageConfig> aspectStorageConfigMap = @@ -1342,7 +1456,7 @@ public void testListAspectsForAllUrns() { assertEquals(results.getTotalPageCount(), 2); assertNotNull(results.getMetadata()); - assertVersionMetadata(results.getMetadata(), Arrays.asList(0L), Arrays.asList(makeFooUrn(0)), 1234L, + assertVersionMetadata(results.getMetadata(), Collections.singletonList(0L), Arrays.asList(makeFooUrn(0), makeFooUrn(1)), 1234L, Urns.createFromTypeSpecificString("test", "foo"), Urns.createFromTypeSpecificString("test", "bar")); // Test list latest aspects @@ -1369,7 +1483,8 @@ public void testListAspectsForAllUrns() { assertEquals(results.getTotalPageCount(), 1); assertNotNull(results.getMetadata()); - assertVersionMetadata(results.getMetadata(), Arrays.asList(1L), Arrays.asList(makeUrn(2)), 1234L, + assertVersionMetadata(results.getMetadata(), Collections.singletonList(1L), + Arrays.asList(makeFooUrn(0), makeFooUrn(1), makeFooUrn(2)), 1234L, Urns.createFromTypeSpecificString("test", "foo"), Urns.createFromTypeSpecificString("test", "bar")); } @@ -2017,6 +2132,25 @@ public void testGetWithExtraInfoSpecificVersion() { new ExtraInfo().setAudit(makeAuditStamp(creator2, impersonator2, 456)).setVersion(1).setUrn(urn))); } + @Test + public void testGetForSoftDeletedAspect() { + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrn urn = makeFooUrn(1); + AspectFoo v0 = new AspectFoo().setValue("foo"); + Urn creator1 = Urns.createFromTypeSpecificString("test", "testCreator1"); + Urn impersonator1 = Urns.createFromTypeSpecificString("test", "testImpersonator1"); + Urn creator2 = Urns.createFromTypeSpecificString("test", "testCreator2"); + Urn impersonator2 = Urns.createFromTypeSpecificString("test", "testImpersonator2"); + addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 0, v0, 123, creator1.toString(), + impersonator1.toString()); + addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 1, null, 456, creator2.toString(), + impersonator2.toString()); + + Optional> foo = dao.getWithExtraInfo(AspectFoo.class, urn, 1); + + assertFalse(foo.isPresent()); + } + @Test public void testGetWithExtraInfoMultipleKeys() { EbeanLocalDAO dao = createDao(FooUrn.class); @@ -2321,24 +2455,31 @@ public void testOptimisticLockException() { // call save method with timestamp 123 but timestamp is already changed to 456 // expect OptimisticLockException if optimistic locking is enabled - dao.saveWithOptimisticLocking(fooUrn, fooAspect, makeAuditStamp("fooActor", 789), 0, false, new Timestamp(123)); + dao.saveWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", 789), 0, false, + new Timestamp(123)); } - private void addMetadata(Urn urn, String aspectName, long version, RecordTemplate metadata) { + @Nonnull + private EbeanMetadataAspect getMetadata(Urn urn, String aspectName, long version, @Nullable RecordTemplate metadata) { EbeanMetadataAspect aspect = new EbeanMetadataAspect(); aspect.setKey(new EbeanMetadataAspect.PrimaryKey(urn.toString(), aspectName, version)); - aspect.setMetadata(RecordUtils.toJsonString(metadata)); + if (metadata != null) { + aspect.setMetadata(RecordUtils.toJsonString(metadata)); + } aspect.setCreatedOn(new Timestamp(1234)); aspect.setCreatedBy("urn:li:test:foo"); aspect.setCreatedFor("urn:li:test:bar"); + return aspect; + } + + private void addMetadata(Urn urn, String aspectName, long version, @Nullable RecordTemplate metadata) { + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, version, metadata); _server.save(aspect); } private void addMetadataWithAuditStamp(Urn urn, String aspectName, long version, RecordTemplate metadata, long timeStamp, String creator, String impersonator) { - EbeanMetadataAspect aspect = new EbeanMetadataAspect(); - aspect.setKey(new EbeanMetadataAspect.PrimaryKey(urn.toString(), aspectName, version)); - aspect.setMetadata(RecordUtils.toJsonString(metadata)); + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, version, metadata); aspect.setCreatedOn(new Timestamp(timeStamp)); aspect.setCreatedBy(creator); aspect.setCreatedFor(impersonator); @@ -2383,6 +2524,7 @@ private void assertVersionMetadata(ListResultMetadata listResultMetadata, List extraInfos = listResultMetadata.getExtraInfos(); assertEquals(extraInfos.stream().map(ExtraInfo::getVersion).collect(Collectors.toSet()), versions); + assertEquals(extraInfos.stream().map(ExtraInfo::getUrn).collect(Collectors.toSet()), urns); extraInfos.forEach(v -> { assertEquals(v.getAudit().getTime(), time);