fix: Deduplicate delta #401

Closed
wants to merge 33 commits into from

ralphrass
Contributor

@ralphrass ralphrass commented Feb 11, 2025

Why? 📖

We want Delta operations to be handled by a regular writer, like any other feature store writer.

What? 🔧

  • Delta config
  • Delta Feature Store Writer
  • Delta Writer

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update
  • Release

How was everything tested? 📏

Unit tests

hmeretti and others added 21 commits February 12, 2021 14:48
* [MLOP-634] Butterfree dev workflow, set triggers for branches staging and master (#280)

* Change github actions pipelines.

* Change pipeline logic.

* [BUG] Fix Staging GithubActions Pipeline (#283)

* New step on pipelie.

* Some adjusts.

* Apply only wheel. (#285)

* [BUG] Change version on setup.py to PyPI (#286)

* Add new make command to change version.

* Change command order.

* Change desc and variable name.

* Change command name.

* Keep milliseconds when using 'from_ms' argument in timestamp feature (#284)

* changed timestamp resolution

* fix import

* simple refactor

Co-authored-by: Henrique Camargo <[email protected]>

* Change trigger for pipeline staging (#287)

* Change trigger to publish dev pipeline.

* Some fix.

* Create a dev package. (#288)

* [MLOP-633] Butterfree dev workflow, update documentation (#281)

* Update workflow doc.

* Update README

* Add pre-release.

* Fix typo.

* [MLOP-632] Butterfree dev workflow, automate release description (#279)

* release 1.1.4

* update changelog

Co-authored-by: Mayara Moromisato <[email protected]>
Co-authored-by: Henrique Camargo <[email protected]>
Co-authored-by: AlvaroMarquesAndrade <[email protected]>
* [MLOP-636] Create migration classes (#282)

* [MLOP-635] Rebase Incremental Job/Interval Run branch for test on selected feature sets (#278)

* Add interval branch modifications.

* Add interval_runs notebook.

* Add tests.

* Apply style (black, flack8 and mypy).

* Fix tests.

* Change version to create package dev.

* Allow slide selection (#293)

* Fix Slide Duration Typo (#295)

* [MLOP-637] Implement diff method (#292)

* [MLOP-640] Create CLI with migrate command (#298)

* [MLOP-645] Implement query method, cassandra (#291)

* [MLOP-671] Implement get_schema on Spark client (#301)

* [MLOP-648] Implement query method, metastore (#294)

* Fix Validation Step (#302)

* [MLOP-647] [MLOP-646] Apply migrations (#300)

* add apply migration method

* add test apply migration

* add migrate actor with tests

* mypy compliant

* fix test interaction with mocked object

* Rebase and some adjusts.

Co-authored-by: Mayara Moromisato <[email protected]>

* [BUG] Apply create_partitions to historical validate (#303)

* Apply create_partitions to historical validate.

* Remove comments and adjusts.

* [BUG] Fix key path for validate read (#304)

* Fix key path

* bump version

Co-authored-by: AlvaroMarquesAndrade <1a789766b1c4c8b679e80f11fa6d63d42fa4bcdf>

* [FIX] Add Partition types for Metastore (#305)

* [MLOP-639] Track logs in S3 (#306)

* Apply tracking logs and logging config.

* Adjusts in CLI and logging.conf.

* Some adjusts.

* Change version to generate new dev package

* Fix version.

* Apply style.

* Add new assert in the migrate unit test.

* [BUG] Change logging config (#307)

* Change logging config.

* Some adjusts.

* Remove a code smell.

* Change solution for tracking logs (#308)

* Change tracking logs method.

* Change version to generate dev package.

* Change path name in S3

* Read and write consistency level options (#309)

* modify cassandra client to be region aware

* add option for the user to set read and write consistency levels on cassandra config

* add tests

* use env vars instead

* Update butterfree/configs/db/cassandra_config.py

Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>

* Update butterfree/configs/db/cassandra_config.py

Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>

Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>

* Fix kafka reader. (#310)

* Fix path validate. (#311)

* Add local dc property (#312)

* add local dc property

* update version

* Remove metastore migrate (#313)

* Remove metastore migrate.

* Change version to create a dev package.

* Fix link in our docs. (#315)

* [BUG] Fix Cassandra Connect Session (#316)

* Fix Cassandra Connect Session.

* Apply style.

* Fix migration query. (#318)

* Fix migration query add type key. (#319)

* Fix db-config condition (#321)

* Fix db-config condition.

* Apply style.

* MLOP-642 Document migration in Butterfree (#320)

* update docs

* add more information and reference new cli.md file

* [MLOP-702] Debug mode for Automate Migration (#322)

* Create flag debug-mode.

* Fix tests.

* Fix migrate test.

* [MLOP-727] Improve logging messages (#325)

* Fix logging message for local file

* Remove json import

* [MLOP-728] Improve logging messages (#324)

* Improve logs.

* Revert debug-mode condition.

* Fix method to generate agg feature name. (#326)

* [MLOP-691]  Include step to add partition to SparkMetastore during writing of Butterfree (#327)

* Change writer type for interval mode.

* Some adjusts.

* Release 1.2.0

Co-authored-by: AlvaroMarquesAndrade <[email protected]>
Co-authored-by: Igor Gustavo Hoelscher <[email protected]>
Co-authored-by: Felipe Victorino Caputo <[email protected]>
Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>
Co-authored-by: Gabriel Brandão <[email protected]>
* Add the missing link for H3 geohash (#330)

* Add the missing link for H3 geohash

* Update the H3 geohash link.

* Update the same link 

Update the same link in in spark_function_and_window.ipynb example

* Update README.md (#331)

* Update Github Actions Workflow runner (#332)

* Update Workflow runner version

* bump flake8-bandit

* chore: bypass false positive for S105

Co-authored-by: Lucas Cardozo <[email protected]>

* Delete sphinx version. (#334)

* Update files to staging (#336)

Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>

* Update butterfree/configs/db/cassandra_config.py

Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>

Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>

* Fix kafka reader. (#310)

* Fix path validate. (#311)

* Add local dc property (#312)

* release 1.2.1

Co-authored-by: Jay Vala <[email protected]>
Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>
Co-authored-by: Lucas Fonseca <[email protected]>
Co-authored-by: Lucas Cardozo <[email protected]>
Co-authored-by: Felipe Victorino Caputo <[email protected]>
* Less strict requirements (#333)

* bump a few requirements; increase lower bound for h3 version range; adding pyarrow dev dependency

* fix type repr for spark types; fix: broken tests (pyspark 3.4)

---------

Co-authored-by: Ralph Rassweiler <[email protected]>

* feat: optional row count validation (#340)

* fix: parameter, libs (#341)

---------
* feat(MLOP-1985): optional params (#347)

---------
* feat(MLOP-2145): add feature set creation script (#351)

* feat: add feature set creation script

* feat(mlop-2145): updating auto fs creation (#352)

* feat(updating-auto-fs-creation): adding methods to the class as private and add Table dataclass

* feat(updating-auto-fs-creation): using dataclass and adding typing

* feat(updating-auto-fs-creation): finish using all type hints and apply format

* feat(updating-auto-fs-creation): add docstring and auto-infer by df

* fix(updating-auto-fs-creation): remove unused format

* feat(updating-auto-fs-creation): creating flake8 ignore list

* feat(updating-auto-fs-creation): apply fmt

* feat(updating-auto-fs-creation): init file

* feat(updating-auto-fs-creation): making more readable

* feat(updating-auto-fs-creation): remove wrong file

* feat(updating-auto-fs-creation): apply fmt

* feat(updating-auto-fs-creation): ignoring mypy

* feat(updating-auto-fs-creation): add unit test

* feat(updating-auto-fs-creation): using Dataframe from pyspark

---------
* feat(mlop-2269): bump versions (#355)

* fix: bump versions adjust tests

* add checklist

* chore: bump python

* bump pyspark

* chore: java version all steps modified

* fix: sphinx version (#356)
* feat(MLOP-2236): add NTZ (#360)

* feat: NTZ and new tests
* fix: package
* fix: to lower case
* pin numpy
* fix: Cassandra config keys (#366)
* fix: new type (#368)
* Add Delta support (#370)
* fix: performance improvements (#374)
* fix: version, format (#376)
* fix: performance improvements (#374)
* fix StorageLevel
* chore(mlops-2456): rolling back test

* fix: rollback repartition (#386)
## [1.4.6](https://github.com/quintoandar/butterfree/releases/tag/1.4.6)

### Fixed
* [MLOP-2519] avoid configuring logger at lib level
([#393](#393))
* Rollback to latest stable release
([#391](#391))

[MLOP-2519]:
https://quintoandar.atlassian.net/browse/MLOP-2519?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: Mayara Moromisato <[email protected]>
Co-authored-by: hmeretti <[email protected]>
Co-authored-by: Henrique Camargo <[email protected]>
Co-authored-by: AlvaroMarquesAndrade <[email protected]>
Co-authored-by: Igor Gustavo Hoelscher <[email protected]>
Co-authored-by: Mayara Moromisato <[email protected]>
Co-authored-by: Felipe Victorino Caputo <[email protected]>
Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>
Co-authored-by: Gabriel Brandão <[email protected]>
Co-authored-by: Jay Vala <[email protected]>
Co-authored-by: Lucas Fonseca <[email protected]>
Co-authored-by: Lucas Cardozo <[email protected]>
Co-authored-by: Ralph Rassweiler <[email protected]>
Co-authored-by: João Albuquerque <[email protected]>
Co-authored-by: Fernando Barbosa <[email protected]>
Co-authored-by: João Albuquerque <[email protected]>
Co-authored-by: Lucas Cardozo <[email protected]>
## [1.4.7](https://github.com/quintoandar/butterfree/releases/tag/1.4.7)
* Build latest release on PyPI.

### Removed
* chore: delete butterfree.configs.logger module

---------

Co-authored-by: Mayara Moromisato <[email protected]>
Co-authored-by: hmeretti <[email protected]>
Co-authored-by: Henrique Camargo <[email protected]>
Co-authored-by: AlvaroMarquesAndrade <[email protected]>
Co-authored-by: Igor Gustavo Hoelscher <[email protected]>
Co-authored-by: Mayara Moromisato <[email protected]>
Co-authored-by: Felipe Victorino Caputo <[email protected]>
Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>
Co-authored-by: Gabriel Brandão <[email protected]>
Co-authored-by: Jay Vala <[email protected]>
Co-authored-by: Lucas Fonseca <[email protected]>
Co-authored-by: Lucas Cardozo <[email protected]>
Co-authored-by: Ralph Rassweiler <[email protected]>
Co-authored-by: João Albuquerque <[email protected]>
Co-authored-by: Fernando Barbosa <[email protected]>
Co-authored-by: João Albuquerque <[email protected]>
Co-authored-by: Lucas Cardozo <[email protected]>
@ralphrass ralphrass self-assigned this Feb 11, 2025
@ralphrass ralphrass marked this pull request as ready for review February 11, 2025 20:20
@ralphrass ralphrass requested a review from a team as a code owner February 11, 2025 20:20
Collaborator

@lecardozo lecardozo left a comment

Nice fix! I think we can look for a better placement for this feature though.

extra: the commit tree seems weird. Maybe you checked out from a previous commit? Also, let's wait for the release-please improvements before merging this 😃

@@ -137,11 +139,13 @@ def __init__(
        feature_set: FeatureSet,
        sink: Sink,
        spark_client: Optional[SparkClient] = None,
        delta_config: Optional[DeltaConfig] = None,

This seems like a weird place for this config 🤔 Since it is directly related to the writing part of the ETL, it would be better placed close to the Sink (probably as a custom HistoricalFeatureStoreWriter?)
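To make the suggestion concrete, here is a minimal sketch of the shape it implies, assuming simplified stand-in classes. `DeltaOptions`, `DeltaHistoricalWriter`, `Pipeline`, and the `merge_on` field are hypothetical names for illustration only, not Butterfree's actual API. The point is just that the Delta settings travel with the writer inside the sink, so the pipeline-level constructor needs no `delta_config` parameter.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class DeltaOptions:
    """Hypothetical stand-in for the Delta settings discussed above."""

    merge_on: List[str]
    when_matched_update: Optional[str] = None
    when_matched_delete: Optional[str] = None


@dataclass
class DeltaHistoricalWriter:
    """Hypothetical writer that owns its own Delta settings."""

    options: DeltaOptions

    def describe(self) -> str:
        # Only reports what a MERGE would key on; no real I/O happens here.
        return "MERGE ON " + ", ".join(self.options.merge_on)


@dataclass
class Sink:
    """Simplified sink: just a collection of writers."""

    writers: list = field(default_factory=list)


@dataclass
class Pipeline:
    """Simplified pipeline: note the absence of a delta_config parameter."""

    sink: Sink

    def plan(self) -> List[str]:
        return [writer.describe() for writer in self.sink.writers]


writer = DeltaHistoricalWriter(DeltaOptions(merge_on=["id", "timestamp"]))
pipeline = Pipeline(sink=Sink(writers=[writer]))
print(pipeline.plan())
```

With this layout, swapping the Delta writer for another writer only changes what goes into the sink; the pipeline signature stays the same.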

@ralphrass
Contributor Author

Nice fix! I think we can look for a better placement for this feature though.

extra: the commit tree seems weird. Maybe you checked out from a previous commit? Also, let's wait for the release-please improvements before merging this 😃

Yes, I pulled from Master instead of Staging, sorry about that.

@ralphrass ralphrass requested a review from lecardozo February 12, 2025 12:51
    raise ValueError(
        "feature_set must be provided when deduplicate=True"
    )
df_to_merge = feature_set._filter_duplicated_rows(source_df)

I'm not super comfortable with the idea of having this deduplication happen at this level, for two reasons:

  1. Deduplication is essentially another transformation, so maybe we should leave this responsibility at the feature set level?
  2. _filter_duplicated_rows is a private feature set method that we should avoid using outside of the feature set context, to prevent unnecessary coupling.

Should we just assume that, whenever this Delta writer is used with its MERGE operations, the dedup is handled at the feature set level?
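A rough sketch of the split described above, using plain Python rows instead of Spark DataFrames so it runs on its own. Both functions are hypothetical, and "keep the latest row per key" is an assumed deduplication rule, not necessarily what `_filter_duplicated_rows` does. Deduplication is a transformation step of its own, and the merge step only checks that its input is already unique per key, without reaching into any private method.

```python
from typing import Any, Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, Any]


def deduplicate(rows: Iterable[Row], keys: Sequence[str], order_by: str) -> List[Row]:
    """Transformation step: keep one row per key, the one with the largest order_by."""
    latest: Dict[Tuple, Row] = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        if key not in latest or row[order_by] > latest[key][order_by]:
            latest[key] = row
    return list(latest.values())


def merge(target: List[Row], source: Iterable[Row], keys: Sequence[str]) -> List[Row]:
    """Writer step: upsert source into target, assuming source is unique per key."""
    merged = {tuple(row[k] for k in keys): row for row in target}
    seen = set()
    for row in source:
        key = tuple(row[k] for k in keys)
        if key in seen:
            # The writer refuses ambiguous input instead of deduplicating itself.
            raise ValueError(f"duplicated key in source: {key}")
        seen.add(key)
        merged[key] = row
    return list(merged.values())


target = [{"id": 1, "ts": 1, "value": "old"}]
source = [
    {"id": 1, "ts": 2, "value": "a"},
    {"id": 1, "ts": 3, "value": "b"},
    {"id": 2, "ts": 1, "value": "c"},
]

clean = deduplicate(source, keys=["id"], order_by="ts")
result = merge(target, clean, keys=["id"])
print(result)
```

Passing `source` to `merge` directly raises `ValueError`, which mirrors the assumption that the writer receives data already deduplicated upstream.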

    when_matched_update: Optional[str] = None,
    when_matched_delete: Optional[str] = None,
):
    self.config = DeltaConfig(

I was wondering if we should keep the standard of having writer configs as concrete classes of the AbstractWriterConfig. I understand that we wouldn't reuse many of the enforced methods there. Any thoughts on that?
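For reference, the pattern in question could look roughly like the sketch below. `BaseWriterConfig` is a hypothetical stand-in defined here for illustration; it is not Butterfree's actual base class, and its two abstract members, along with `DeltaWriterConfig` and `merge_on`, are assumptions. Only `when_matched_update` and `when_matched_delete` come from the diff above.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class BaseWriterConfig(ABC):
    """Hypothetical abstract config; a stand-in, not the library's base class."""

    @property
    @abstractmethod
    def format_(self) -> str:
        """Storage format name used by the writer."""

    @abstractmethod
    def get_options(self, table: str) -> Dict[str, Any]:
        """Options the writer needs for a given table."""


class DeltaWriterConfig(BaseWriterConfig):
    """Hypothetical concrete config carrying the MERGE settings."""

    def __init__(
        self,
        merge_on: List[str],
        when_matched_update: Optional[str] = None,
        when_matched_delete: Optional[str] = None,
    ):
        self.merge_on = merge_on
        self.when_matched_update = when_matched_update
        self.when_matched_delete = when_matched_delete

    @property
    def format_(self) -> str:
        return "delta"

    def get_options(self, table: str) -> Dict[str, Any]:
        return {
            "table": table,
            "merge_on": self.merge_on,
            "when_matched_update": self.when_matched_update,
            "when_matched_delete": self.when_matched_delete,
        }


config = DeltaWriterConfig(merge_on=["id"], when_matched_update="source.ts > target.ts")
print(config.format_, config.get_options("house_listings"))
```

Even if few of the enforced methods end up being reused, the shared base keeps every writer config discoverable through one interface.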

@ralphrass ralphrass requested a review from lecardozo February 20, 2025 13:02
@ralphrass ralphrass closed this Feb 20, 2025
5 participants