Skip to content

Commit

Permalink
feat: support for soft deleted aspects (#130)
Browse files Browse the repository at this point in the history
* support for soft deleted aspects

* return optional value for add method

* remove resource level changes

* keep ListResultMetadata the same

* add a test for createAndGet

* use correct Nonnull annotation

* keep the old add method the same

* add test to soft delete when no metadata exists

* minor refactor

* address comments

* add note for not supporting PUH in delete

* add javadocs
  • Loading branch information
jywadhwani authored Nov 8, 2021
1 parent 1d6f311 commit 0266fb0
Show file tree
Hide file tree
Showing 8 changed files with 348 additions and 83 deletions.
121 changes: 92 additions & 29 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,46 @@ public boolean isLocalSecondaryIndexEnabled() {
return _enableLocalSecondaryIndex;
}

/**
* Logic common to both {@link #add(Urn, Class, Function, AuditStamp)} and {@link #delete(Urn, Class, AuditStamp, int)} methods.
*
* @param urn urn the URN for the entity the aspect is attached to
* @param latest {@link AspectEntry} that corresponds to the latest metadata stored
* @param newValue new metadata that needs to be added/stored
* @param aspectClass aspectClass of the aspect being saved
* @param auditStamp audit stamp for the operation
* @param equalityTester {@link EqualityTester} that is an interface for testing equality between two objects of the same type
* @param <ASPECT> must be a supported aspect type in {@code ASPECT_UNION}
* @return {@link AddResult} corresponding to the old and new value of metadata
*/
private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN urn,
@Nullable AspectEntry<ASPECT> latest, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, @Nonnull EqualityTester<ASPECT> equalityTester) {

final ASPECT oldValue = latest == null ? null : latest.getAspect();
// Skip saving if there's no actual change
if ((oldValue == null && newValue == null) || oldValue != null && newValue != null && equalityTester.equals(
oldValue, newValue)) {
return new AddResult<>(oldValue, oldValue);
}

// Save the newValue as the latest version
long largestVersion =
saveLatest(urn, aspectClass, oldValue, latest == null ? null : latest.getExtraInfo().getAudit(), newValue,
auditStamp);

// Apply retention policy
applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion);

// Save to local secondary index
// TODO: add support for soft deleted aspects in local secondary index
if (_enableLocalSecondaryIndex && newValue != null) {
updateLocalIndex(urn, newValue, largestVersion);
}

return new AddResult<>(oldValue, newValue);
}

/**
* Adds a new version of aspect for an entity.
*
Expand All @@ -293,65 +333,78 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
final EqualityTester<ASPECT> equalityTester = getEqualityTester(aspectClass);

final AddResult<ASPECT> result = runInTransactionWithRetry(() -> {
// 1. Compute newValue based on oldValue
AspectEntry<ASPECT> latest = getLatest(urn, aspectClass);
// Compute newValue based on oldValue
final AspectEntry<ASPECT> latest = getLatest(urn, aspectClass);
final ASPECT oldValue = latest == null ? null : latest.getAspect();
final ASPECT newValue = updateLambda.apply(Optional.ofNullable(oldValue));
if (newValue == null) {
throw new UnsupportedOperationException("Do not support adding null metadata in add method");
}
checkValidAspect(newValue.getClass());

if (_modelValidationOnWrite) {
validateAgainstSchema(newValue);
}

// 2. Invoke pre-update hooks, if any
// Invoke pre-update hooks, if any
if (_aspectPreUpdateHooksMap.containsKey(aspectClass)) {
_aspectPreUpdateHooksMap.get(aspectClass).forEach(hook -> hook.accept(urn, newValue));
}

// 3. Skip saving if there's no actual change
if (oldValue != null && equalityTester.equals(oldValue, newValue)) {
return new AddResult<>(oldValue, oldValue);
}

// 4. Save the newValue as the latest version
long largestVersion =
saveLatest(urn, aspectClass, oldValue, latest == null ? null : latest.getExtraInfo().getAudit(), newValue,
auditStamp);

// 5. Apply retention policy
applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion);

// 6. Save to local secondary index
if (_enableLocalSecondaryIndex) {
updateLocalIndex(urn, newValue, largestVersion);
}

return new AddResult<>(oldValue, newValue);
return addCommon(urn, latest, newValue, aspectClass, auditStamp, equalityTester);
}, maxTransactionRetry);

final ASPECT oldValue = result.getOldValue();
final ASPECT newValue = result.getNewValue();

// 7. Produce MAE after a successful update
// Produce MAE after a successful update
if (_alwaysEmitAuditEvent || oldValue != newValue) {
_producer.produceMetadataAuditEvent(urn, oldValue, newValue);
}

// TODO: Replace step 7 with step 7.1 after pipeline is fully migrated to aspect specific events.
// 7.1 Produce aspect specific MAE after a successful update
// TODO: Replace the previous step with the step below, after pipeline is fully migrated to aspect specific events.
// Produce aspect specific MAE after a successful update
if (_emitAspectSpecificAuditEvent) {
if (_alwaysEmitAspectSpecificAuditEvent || oldValue != newValue) {
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue);
}
}
// 8. Invoke post-update hooks if there's any
// Invoke post-update hooks if there's any
if (_aspectPostUpdateHooksMap.containsKey(aspectClass)) {
_aspectPostUpdateHooksMap.get(aspectClass).forEach(hook -> hook.accept(urn, newValue));
}

return newValue;
}

/**
* Deletes the latest version of aspect for an entity.
*
* <p>The new aspect will have an automatically assigned version number, which is guaranteed to be positive and
* monotonically increasing. Older versions of aspect will be purged automatically based on the retention setting.
*
* <p>Note that we do not support Post-update hooks while soft deleting an aspect
*
* @param urn urn the URN for the entity the aspect is attached to
* @param aspectClass aspectClass of the aspect being saved
* @param auditStamp the audit stamp of the previous latest aspect, null if new value is the first version
* @param maxTransactionRetry maximum number of transaction retries before throwing an exception
* @param <ASPECT> must be a supported aspect type in {@code ASPECT_UNION}
*/
public <ASPECT extends RecordTemplate> void delete(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, int maxTransactionRetry) {

checkValidAspect(aspectClass);

runInTransactionWithRetry(() -> {
final AspectEntry<ASPECT> latest = getLatest(urn, aspectClass);

return addCommon(urn, latest, null, aspectClass, auditStamp, new DefaultEqualityTester<>());
}, maxTransactionRetry);

// TODO: add support for sending MAE for soft deleted aspects
}

/**
* Similar to {@link #add(Urn, Class, Function, AuditStamp, int)} but uses the default maximum transaction retry.
*/
Expand All @@ -370,6 +423,15 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp);
}

/**
* Similar to {@link #delete(Urn, Class, AuditStamp, int)} but uses the default maximum transaction retry.
*/
@Nonnull
public <ASPECT extends RecordTemplate> void delete(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp) {
delete(urn, aspectClass, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY);
}

private <ASPECT extends RecordTemplate> void applyRetention(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Retention retention, long largestVersion) {
if (retention instanceof IndefiniteRetention) {
Expand Down Expand Up @@ -400,7 +462,7 @@ private <ASPECT extends RecordTemplate> void applyRetention(@Nonnull URN urn, @N
*/
protected abstract <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn,
@Nonnull Class<ASPECT> aspectClass, @Nullable ASPECT oldEntry, @Nullable AuditStamp oldAuditStamp,
@Nonnull ASPECT newEntry, @Nonnull AuditStamp newAuditStamp);
@Nullable ASPECT newEntry, @Nonnull AuditStamp newAuditStamp);

/**
* Saves the new value of an aspect to local secondary index.
Expand Down Expand Up @@ -587,12 +649,13 @@ protected abstract <ASPECT extends RecordTemplate> long getNextVersion(@Nonnull
*
* @param urn {@link Urn} for the entity
* @param value the aspect to save
* @param aspectClass the type of aspect to save
* @param auditStamp the {@link AuditStamp} for the aspect
* @param version the version for the aspect
* @param insert use insert, instead of update, operation to save
*/
protected abstract void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull AuditStamp auditStamp,
long version, boolean insert);
protected abstract <ASPECT extends RecordTemplate> void save(@Nonnull URN urn, @Nullable RecordTemplate value,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, long version, boolean insert);

/**
* Returns a boolean representing if an Urn has any Aspects associated with it (i.e. if it exists in the DB).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ protected <ASPECT extends RecordTemplate> long getNextVersion(FooUrn urn, Class<
}

@Override
protected void save(FooUrn urn, RecordTemplate value, AuditStamp auditStamp, long version, boolean insert) {
protected <ASPECT extends RecordTemplate> void save(FooUrn urn, RecordTemplate value, Class<ASPECT> aspectClass,
AuditStamp auditStamp, long version, boolean insert) {

}

Expand Down Expand Up @@ -233,6 +234,22 @@ public void testMAEEmissionNoValueChange() throws URISyntaxException {
verifyNoMoreInteractions(_mockEventProducer);
}

@Test
public void testMAEWithNullValue() throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
_dummyLocalDAO.setAlwaysEmitAuditEvent(true);
expectGetLatest(urn, AspectFoo.class, Arrays.asList(null, makeAspectEntry(foo, _dummyAuditStamp)));

_dummyLocalDAO.add(urn, foo, _dummyAuditStamp);
_dummyLocalDAO.delete(urn, AspectFoo.class, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo);
// TODO: ensure MAE is produced with newValue set as null for soft deleted aspect
// verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, null);
verifyNoMoreInteractions(_mockEventProducer);
}

@Test
public void testAddSamePreUpdateHookTwice() {
BiConsumer<FooUrn, AspectFoo> hook = (urn, foo) -> {
Expand Down
2 changes: 1 addition & 1 deletion dao-impl/ebean-dao/gma-create-all.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ create table metadata_aspect (
urn varchar(500) not null,
aspect varchar(200) not null,
version bigint not null,
metadata clob not null,
metadata clob,
createdon timestamp not null,
createdby varchar(255) not null,
createdfor varchar(255),
Expand Down
Loading

0 comments on commit 0266fb0

Please sign in to comment.