-
Notifications
You must be signed in to change notification settings - Fork 211
Add workflow results #1275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add workflow results #1275
Changes from 24 commits
b8d3226
36ff69b
f73ca9c
6768fe0
f6d0d53
8a95a94
efe5f55
8e61481
b2d3013
3c998a8
af0787c
3c24f9a
ee06c5d
57fd481
b17b484
24daddb
7646545
e39c01a
8a140fd
ef63a96
1075bd3
20e717f
1b63a97
2384628
0459053
c94492a
90c98cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| # Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from abc import ABC, abstractmethod | ||
| from dataclasses import dataclass, field | ||
| from typing import Any | ||
|
|
||
| from nemo_curator.tasks import Task | ||
|
|
||
|
|
||
| @dataclass | ||
| class WorkflowRunResult: | ||
| """Container returned by high-level workflows to expose pipeline outputs. | ||
| Attributes: | ||
| workflow_name: Human readable workflow identifier (e.g., "fuzzy_dedup"). | ||
| pipeline_tasks: Mapping of pipeline names to the ``Task`` objects they produced. | ||
| metadata: Free-form dictionary for workflow specific timing or counters. | ||
| """ | ||
|
|
||
| workflow_name: str | ||
| pipeline_tasks: dict[str, list[Task]] = field(default_factory=dict) | ||
| metadata: dict[str, Any] = field(default_factory=dict) | ||
|
|
||
| def add_pipeline_tasks(self, pipeline_name: str, tasks: list[Task] | None) -> None: | ||
| """Record the tasks emitted by a pipeline run (empty list if None).""" | ||
| self.pipeline_tasks[pipeline_name] = list(tasks or []) | ||
|
|
||
| def extend_metadata(self, updates: dict[str, Any] | None = None) -> None: | ||
| """Update metadata dictionary in-place.""" | ||
| if updates: | ||
| self.metadata.update(updates) | ||
|
|
||
| def add_metadata(self, key: str, value: Any) -> None: # noqa: ANN401 | ||
| """Add a metadata key-value pair.""" | ||
| self.metadata[key] = value | ||
|
|
||
| def get_metadata(self, key: str) -> Any: # noqa: ANN401 | ||
| """Get a metadata value.""" | ||
| return self.metadata.get(key) | ||
|
|
||
|
|
||
| class WorkflowBase(ABC): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious about the need for this class. Is it to add more stuff while expanding in the future?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yup basically |
||
| @abstractmethod | ||
| def run(self, *args, **kwargs) -> WorkflowRunResult: ... | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |||||||||||||||||||||||||||||||||||||
| from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor | ||||||||||||||||||||||||||||||||||||||
| from nemo_curator.backends.utils import merge_executor_configs, warn_on_env_var_override | ||||||||||||||||||||||||||||||||||||||
| from nemo_curator.pipeline import Pipeline | ||||||||||||||||||||||||||||||||||||||
| from nemo_curator.pipeline.workflow import WorkflowBase, WorkflowRunResult | ||||||||||||||||||||||||||||||||||||||
| from nemo_curator.stages.deduplication.exact.identification import ExactDuplicateIdentification | ||||||||||||||||||||||||||||||||||||||
| from nemo_curator.stages.deduplication.id_generator import ( | ||||||||||||||||||||||||||||||||||||||
| create_id_generator_actor, | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -33,7 +34,7 @@ | |||||||||||||||||||||||||||||||||||||
| ID_GENERATOR_OUTPUT_FILENAME = "exact_id_generator.json" | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| class ExactDeduplicationWorkflow: | ||||||||||||||||||||||||||||||||||||||
| class ExactDeduplicationWorkflow(WorkflowBase): | ||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||
| A pipeline that performs exact deduplication of a dataset. | ||||||||||||||||||||||||||||||||||||||
| It consists of the following stages: | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -165,17 +166,27 @@ def _validate_initial_tasks(self, initial_tasks: list[FileGroupTask] | None) -> | |||||||||||||||||||||||||||||||||||||
| msg = "input_path to the dataset must be provided if initial_tasks are not provided manually." | ||||||||||||||||||||||||||||||||||||||
| raise ValueError(msg) | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| def run( | ||||||||||||||||||||||||||||||||||||||
| def run( # noqa: PLR0915 | ||||||||||||||||||||||||||||||||||||||
| self, initial_tasks: list[FileGroupTask] | None = None, executor: RayActorPoolExecutor | None = None | ||||||||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||||||||
| ) -> WorkflowRunResult: | ||||||||||||||||||||||||||||||||||||||
| """Run the deduplication pipeline. | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||
| initial_tasks: | ||||||||||||||||||||||||||||||||||||||
| Set of FileGroupTasks generated by a previous stage pointing to the dataset to be deduplicated. | ||||||||||||||||||||||||||||||||||||||
| If not provided, the pipeline will generate the input tasks based on the input_dir and input_file_extensions. | ||||||||||||||||||||||||||||||||||||||
| executor: RayActorPoolExecutor | None | ||||||||||||||||||||||||||||||||||||||
| Executor to use for the pipeline. | ||||||||||||||||||||||||||||||||||||||
| If not provided, the default RayActorPoolExecutor will be used. | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||||||
| WorkflowRunResult object containing the results and timing information | ||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||
| self._validate_initial_tasks(initial_tasks) | ||||||||||||||||||||||||||||||||||||||
| workflow_result = WorkflowRunResult(workflow_name="exact_deduplication") | ||||||||||||||||||||||||||||||||||||||
| input_filegroups_time = 0.0 | ||||||||||||||||||||||||||||||||||||||
| identification_time = 0.0 | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if executor is None: | ||||||||||||||||||||||||||||||||||||||
| executor = RayActorPoolExecutor(config=self.executor_config) | ||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -185,6 +196,8 @@ def run( | |||||||||||||||||||||||||||||||||||||
| previous_config = executor.config | ||||||||||||||||||||||||||||||||||||||
| executor.config = merge_executor_configs(executor.config, self.executor_config) | ||||||||||||||||||||||||||||||||||||||
| warn_on_env_var_override(previous_config, executor.config) | ||||||||||||||||||||||||||||||||||||||
| total_start_time = time.time() | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if self.assign_id: | ||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||
| create_id_generator_actor() | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -196,26 +209,31 @@ def run( | |||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||
| raise RuntimeError(err_msg) from None | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| id_generator_path = None | ||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||
| start_time = time.time() | ||||||||||||||||||||||||||||||||||||||
| if initial_tasks is None: | ||||||||||||||||||||||||||||||||||||||
| input_filegroups_pipeline = self._create_input_filegroups() | ||||||||||||||||||||||||||||||||||||||
| initial_tasks = input_filegroups_pipeline.run(executor=executor, initial_tasks=initial_tasks) | ||||||||||||||||||||||||||||||||||||||
| initial_filegroups_end_time = time.time() | ||||||||||||||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||||||||||||||
| f"Created input tasks from {self.input_path} in {(initial_filegroups_end_time - start_time):.2f} seconds" | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| input_start_time = time.time() | ||||||||||||||||||||||||||||||||||||||
| initial_tasks = input_filegroups_pipeline.run(executor=executor, initial_tasks=None) | ||||||||||||||||||||||||||||||||||||||
| input_filegroups_time = time.time() - input_start_time | ||||||||||||||||||||||||||||||||||||||
| workflow_result.add_metadata("input_filegroups_time", input_filegroups_time) | ||||||||||||||||||||||||||||||||||||||
| workflow_result.add_pipeline_tasks("input_filegroups", initial_tasks) | ||||||||||||||||||||||||||||||||||||||
| logger.info(f"Created input tasks from {self.input_path} in {input_filegroups_time:.2f} seconds") | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| initial_tasks = initial_tasks or [] | ||||||||||||||||||||||||||||||||||||||
| identification_pipeline = self._create_identification_pipeline(num_input_tasks=len(initial_tasks)) | ||||||||||||||||||||||||||||||||||||||
| identification_start_time = time.time() | ||||||||||||||||||||||||||||||||||||||
| removal_id_tasks = identification_pipeline.run(executor=executor, initial_tasks=initial_tasks) | ||||||||||||||||||||||||||||||||||||||
| identification_end_time = time.time() | ||||||||||||||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||||||||||||||
| f"Exact duplicate identification pipeline completed in {(identification_end_time - identification_start_time):.2f} seconds" | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| identification_time = identification_end_time - identification_start_time | ||||||||||||||||||||||||||||||||||||||
| workflow_result.add_metadata("identification_time", identification_time) | ||||||||||||||||||||||||||||||||||||||
| workflow_result.add_pipeline_tasks("identification", removal_id_tasks) | ||||||||||||||||||||||||||||||||||||||
| logger.info(f"Exact duplicate identification pipeline completed in {identification_time:.2f} seconds") | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| num_duplicates = sum(task._metadata.get("num_removal_ids", 0) for task in removal_id_tasks) | ||||||||||||||||||||||||||||||||||||||
| if num_duplicates == 0: | ||||||||||||||||||||||||||||||||||||||
| num_duplicates_identified = sum( | ||||||||||||||||||||||||||||||||||||||
| task._metadata.get("num_removal_ids", 0) for task in removal_id_tasks or [] | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| if num_duplicates_identified == 0: | ||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+233
to
+236
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The variable Fix: Initialize
Suggested change
|
||||||||||||||||||||||||||||||||||||||
| logger.info("No exact duplicates found in the dataset.") | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if self.assign_id: | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -227,8 +245,18 @@ def run( | |||||||||||||||||||||||||||||||||||||
| else None, | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| logger.info(f"Id generator written to {id_generator_path}") | ||||||||||||||||||||||||||||||||||||||
| end_time = time.time() | ||||||||||||||||||||||||||||||||||||||
| logger.info(f"Exact deduplication pipeline completed in {(end_time - start_time):.2f} seconds") | ||||||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||||||
| if self.assign_id: | ||||||||||||||||||||||||||||||||||||||
| kill_id_generator_actor() | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| total_end_time = time.time() | ||||||||||||||||||||||||||||||||||||||
| total_time = total_end_time - total_start_time | ||||||||||||||||||||||||||||||||||||||
| workflow_summary = { | ||||||||||||||||||||||||||||||||||||||
| "total_time": total_time, | ||||||||||||||||||||||||||||||||||||||
| "num_duplicates": num_duplicates_identified, | ||||||||||||||||||||||||||||||||||||||
| # paths | ||||||||||||||||||||||||||||||||||||||
| "id_generator_path": id_generator_path, | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| workflow_result.extend_metadata(workflow_summary) | ||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+254
to
+260
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The variable To fix this, initialize
Suggested change
Then update the workflow_summary to conditionally include it only when it's not None. |
||||||||||||||||||||||||||||||||||||||
| logger.info(f"Exact deduplication pipeline completed in {total_time:.2f} seconds") | ||||||||||||||||||||||||||||||||||||||
| return workflow_result | ||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor | ||
| from nemo_curator.backends.utils import merge_executor_configs, warn_on_env_var_override | ||
| from nemo_curator.pipeline import Pipeline | ||
| from nemo_curator.pipeline.workflow import WorkflowBase, WorkflowRunResult | ||
| from nemo_curator.stages.deduplication.fuzzy.buckets_to_edges import BucketsToEdgesStage | ||
| from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage | ||
| from nemo_curator.stages.deduplication.fuzzy.identify_duplicates import IdentifyDuplicatesStage | ||
|
|
@@ -37,7 +38,7 @@ | |
| ID_GENERATOR_OUTPUT_FILENAME = "fuzzy_id_generator.json" | ||
|
|
||
|
|
||
| class FuzzyDeduplicationWorkflow: | ||
| class FuzzyDeduplicationWorkflow(WorkflowBase): | ||
| """ | ||
| A pipeline that performs fuzzy deduplication of a dataset. | ||
| It consists of the following stages: | ||
|
|
@@ -267,9 +268,9 @@ def _validate_initial_tasks(self, initial_tasks: list[FileGroupTask] | None) -> | |
| msg = "input_path to the dataset must be provided if initial_tasks are not provided manually." | ||
| raise ValueError(msg) | ||
|
|
||
| def run( | ||
| def run( # noqa: PLR0915 | ||
| self, initial_tasks: list[FileGroupTask] | None = None, executor: RayActorPoolExecutor | None = None | ||
| ) -> None: | ||
| ) -> WorkflowRunResult: | ||
| """Run the deduplication pipeline. | ||
|
|
||
| Args: | ||
|
|
@@ -280,6 +281,11 @@ def run( | |
|
|
||
| """ | ||
| self._validate_initial_tasks(initial_tasks) | ||
| workflow_result = WorkflowRunResult(workflow_name="fuzzy_deduplication") | ||
| minhash_time = 0.0 | ||
| lsh_time = 0.0 | ||
| connected_components_time = 0.0 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can we call this cc_pipeline_time or connected_components_pipeline time? The nit because the pipeline does much more than connected components (buckets to edges, connected components, shuffle on component etc). |
||
|
|
||
| if executor is None: | ||
| executor = RayActorPoolExecutor(config=self.executor_config) | ||
| else: | ||
|
|
@@ -290,6 +296,8 @@ def run( | |
| executor.config = merge_executor_configs(executor.config, self.executor_config) | ||
| warn_on_env_var_override(previous_config, executor.config) | ||
|
|
||
| total_start_time = time.time() | ||
|
|
||
| try: | ||
| create_id_generator_actor() | ||
| except ValueError: | ||
|
|
@@ -300,50 +308,66 @@ def run( | |
| """ | ||
| raise RuntimeError(err_msg) from None | ||
|
|
||
| id_generator_path = None | ||
| try: | ||
| # Step 1: Minhash | ||
| minhash_pipeline = self._create_minhash_pipeline(generate_input_filegroups=initial_tasks is None) | ||
| start_time = time.time() | ||
| minhash_pipeline.run(executor=executor, initial_tasks=initial_tasks) | ||
| minhash_start_time = time.time() | ||
| minhash_tasks = minhash_pipeline.run(executor=executor, initial_tasks=initial_tasks) | ||
| minhash_end_time = time.time() | ||
| logger.info(f"Minhash pipeline completed in {(minhash_end_time - start_time):.2f} seconds") | ||
| minhash_time = minhash_end_time - minhash_start_time | ||
| workflow_result.add_pipeline_tasks("minhash", minhash_tasks) | ||
| workflow_result.add_metadata("minhash_time", minhash_time) | ||
| logger.info(f"Minhash pipeline completed in {minhash_time:.2f} seconds") | ||
| output_fs = get_fs( | ||
| self.output_path, | ||
| self.write_kwargs.get("storage_options") if self.write_kwargs is not None else None, | ||
| ) | ||
| id_generator_path = output_fs.sep.join([self.output_path, ID_GENERATOR_OUTPUT_FILENAME]) | ||
| write_id_generator_to_disk( | ||
| id_generator_path, | ||
| storage_options=self.write_kwargs.get("storage_options") if self.write_kwargs is not None else None, | ||
| ) | ||
| logger.info(f"Id generator written to {id_generator_path}") | ||
| workflow_result.add_metadata("id_generator_path", id_generator_path) | ||
|
|
||
| # Step 2: LSH | ||
| lsh_pipeline = self._create_lsh_pipeline() | ||
| lsh_start_time = time.time() | ||
| # LSH stage generates it's own input tasks from the minhash directory | ||
| lsh_tasks = lsh_pipeline.run(executor=executor, initial_tasks=None) | ||
| lsh_end_time = time.time() | ||
| logger.info(f"LSH pipeline completed in {(lsh_end_time - lsh_start_time):.2f} seconds") | ||
| lsh_time = lsh_end_time - lsh_start_time | ||
| workflow_result.add_pipeline_tasks("lsh", lsh_tasks) | ||
| workflow_result.add_metadata("lsh_time", lsh_time) | ||
| logger.info(f"LSH pipeline completed in {lsh_time:.2f} seconds") | ||
|
|
||
| valid_lsh_tasks = [task for task in lsh_tasks if task._metadata.get("num_docs", 0) > 0] | ||
| valid_lsh_tasks = [task for task in lsh_tasks or [] if task._metadata.get("num_docs", 0) > 0] | ||
| if len(valid_lsh_tasks) == 0: | ||
| logger.info("No potential duplicates found in the dataset. Skipping connected components pipeline.") | ||
| workflow_result.add_metadata("num_duplicates", 0) | ||
| else: | ||
| # Step 3: Connected components | ||
| connected_components_pipeline = self._create_connected_components_pipeline() | ||
| connected_components_start_time = time.time() | ||
| connected_components_tasks = connected_components_pipeline.run( | ||
| executor=executor, initial_tasks=valid_lsh_tasks | ||
| ) | ||
| connected_components_end_time = time.time() | ||
| logger.info( | ||
| f"Connected components pipeline completed in {(connected_components_end_time - connected_components_start_time):.2f} seconds" | ||
| ) | ||
| num_removed_documents = sum( | ||
| task._metadata.get("num_removal_ids", 0) for task in connected_components_tasks | ||
| connected_components_time = connected_components_end_time - connected_components_start_time | ||
| workflow_result.add_pipeline_tasks("connected_components", connected_components_tasks) | ||
| workflow_result.add_metadata("connected_components_time", connected_components_time) | ||
| logger.info(f"Connected components pipeline completed in {connected_components_time:.2f} seconds") | ||
| num_duplicates_identified = sum( | ||
| task._metadata.get("num_removal_ids", 0) for task in (connected_components_tasks or []) | ||
| ) | ||
| logger.info(f"Number of documents removed: {num_removed_documents}") | ||
| output_fs = get_fs( | ||
| self.output_path, | ||
| self.write_kwargs.get("storage_options") if self.write_kwargs is not None else None, | ||
| ) | ||
| id_generator_path = output_fs.sep.join([self.output_path, ID_GENERATOR_OUTPUT_FILENAME]) | ||
| write_id_generator_to_disk( | ||
| id_generator_path, | ||
| storage_options=self.write_kwargs.get("storage_options") | ||
| if self.write_kwargs is not None | ||
| else None, | ||
| ) | ||
| logger.info(f"Id generator written to {id_generator_path}") | ||
| end_time = time.time() | ||
| logger.info(f"Fuzzy deduplication pipeline completed in {(end_time - start_time):.2f} seconds") | ||
| workflow_result.add_metadata("num_duplicates", num_duplicates_identified) | ||
| logger.info(f"Number of documents removed: {num_duplicates_identified}") | ||
| finally: | ||
| kill_id_generator_actor() | ||
|
|
||
| total_end_time = time.time() | ||
| total_time = total_end_time - total_start_time | ||
| workflow_result.add_metadata("total_time", total_time) | ||
| logger.info(f"Fuzzy deduplication pipeline completed in {total_time:.2f} seconds") | ||
| return workflow_result | ||
Uh oh!
There was an error while loading. Please reload this page.