Skip to content

Commit 79304c3

Browse files
committed
update conditional execution example
1 parent 545e1dd commit 79304c3

4 files changed

Lines changed: 42 additions & 35 deletions

File tree

knowledge_base/pydabs_job_conditional_execution/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ The Lakeflow Job consists of following tasks:
66
1. Checks data quality and calculates bad records
77
2. Evaluates if bad records exceed a threshold (100 records)
88
3. Routes to different processing paths based on the condition:
9-
- If bad records > 100: runs `handle_bad_data` task
10-
- If bad records ≤ 100: runs `continue_pipeline` task
9+
- If bad records > 100: runs `fix_path` task
10+
- If bad records ≤ 100: runs `skip_path` task
1111

1212
* `src/`: Notebook source code for this project.
1313
* `src/check_quality.ipynb`: Checks data quality and outputs bad record count
14-
* `src/process_bad_data.ipynb`: Handles cases with high bad record count
15-
* `src/process_good_data.ipynb`: Continues normal pipeline for good data
14+
* `src/fix_path.ipynb`: Handles cases with high bad record count
15+
* `src/skip_path.ipynb`: Continues normal pipeline for good data
1616
* `resources/`: Resource configurations (jobs, pipelines, etc.)
1717
* `resources/conditional_execution.py`: PyDABs job definition with conditional tasks
1818

@@ -55,10 +55,10 @@ with this project. It's also possible to interact with it directly using the CLI
5555
5656
This deploys everything that's defined for this project.
5757
For example, this project will deploy a job called
58-
`[dev yourname] pydabs_job_conditional_execution` to your workspace.
58+
`[dev yourname] conditional_execution_example` to your workspace.
5959
You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**.
6060
6161
3. To run the job, use the "run" command:
6262
```
63-
$ databricks bundle run pydabs_job_conditional_execution
63+
$ databricks bundle run conditional_execution_example
6464
```
Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,43 @@
11
from databricks.bundles.jobs import (
2-
ConditionTask,
32
Job,
4-
NotebookTask,
53
Task,
4+
NotebookTask,
5+
ConditionTask,
6+
ConditionTaskOp,
67
TaskDependency,
78
)
89

9-
pydabs_job_conditional_execution = Job(
10-
name="pydabs_job_conditional_execution",
11-
tasks=[
12-
Task(
13-
task_key="check_data_quality",
14-
notebook_task=NotebookTask(notebook_path="src/check_quality.ipynb"),
15-
),
16-
Task(
17-
task_key="evaluate_quality",
18-
condition_task=ConditionTask(
19-
left="{{tasks.check_data_quality.values.bad_records}}",
20-
op="GREATER_THAN",
21-
right="100",
22-
),
23-
depends_on=[TaskDependency(task_key="check_data_quality")],
24-
),
25-
Task(
26-
task_key="handle_bad_data",
27-
notebook_task=NotebookTask(notebook_path="src/process_bad_data.ipynb"),
28-
depends_on=[TaskDependency(task_key="evaluate_quality", outcome="true")],
29-
),
30-
Task(
31-
task_key="continue_pipeline",
32-
notebook_task=NotebookTask(notebook_path="src/process_good_data.ipynb"),
33-
depends_on=[TaskDependency(task_key="evaluate_quality", outcome="false")],
34-
),
35-
],
10+
# 1) Producer task: runs a notebook and emits a task value
11+
check_quality = Task(
12+
task_key="check_quality",
13+
notebook_task=NotebookTask(notebook_path="src/branch/check_quality.ipynb"),
14+
)
15+
16+
# 2) Branch task: evaluates an expression using an upstream task value
17+
branch = Task(
18+
task_key="branch",
19+
condition_task=ConditionTask(
20+
left="{{tasks.check_quality.values.bad_records}}",
21+
op=ConditionTaskOp.GREATER_THAN,
22+
right="100",
23+
),
24+
depends_on=[TaskDependency(task_key="check_quality")],
25+
)
26+
27+
# 3) Downstream tasks: gated on the condition outcome
28+
fix_path = Task(
29+
task_key="fix_path",
30+
notebook_task=NotebookTask(notebook_path="src/branch/fix_path.ipynb"),
31+
depends_on=[TaskDependency(task_key="branch", outcome="true")],
32+
)
33+
34+
skip_path = Task(
35+
task_key="skip_path",
36+
notebook_task=NotebookTask(notebook_path="src/branch/skip_path.ipynb"),
37+
depends_on=[TaskDependency(task_key="branch", outcome="false")],
38+
)
39+
40+
job = Job(
41+
name="conditional_execution_example",
42+
tasks=[check_quality, branch, fix_path, skip_path],
3643
)

knowledge_base/pydabs_job_conditional_execution/src/process_bad_data.ipynb renamed to knowledge_base/pydabs_job_conditional_execution/src/fix_path.ipynb

File renamed without changes.

knowledge_base/pydabs_job_conditional_execution/src/process_good_data.ipynb renamed to knowledge_base/pydabs_job_conditional_execution/src/skip_path.ipynb

File renamed without changes.

0 commit comments

Comments
 (0)