Commit 66d3de8

update for each example
1 parent fdb1bf6 commit 66d3de8

8 files changed

Lines changed: 78 additions & 70 deletions

knowledge_base/pydabs_job_with_for_each/README.md

Lines changed: 3 additions & 2 deletions
@@ -3,7 +3,8 @@
 This example demonstrates a simple Databricks job that uses a foreach task.
 
 * `src/`: Python source code for this project.
-  * `src/pydabs_job_with_for_each/`: Shared Python code that can be used by jobs and pipelines.
+  * `foreach/generate_items.ipynb`: A notebook which returns a list of items to be used for task generation.
+  * `foreach/process_item.ipynb`: A notebook which will process an item.
 * `resources/`: Resource configurations (jobs, pipelines, etc.)
 
 
@@ -45,7 +46,7 @@ with this project. It's also possible to interact with it directly using the CLI
 
 This deploys everything that's defined for this project.
 For example, the default template would deploy a job called
-`[dev yourname] pydabs_airflow_job` to your workspace.
+`[dev yourname] for_each_task_example` to your workspace.
 You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**.
 
 3. Similarly, to deploy a production copy, type:
Lines changed: 26 additions & 29 deletions
@@ -1,38 +1,35 @@
-from databricks.bundles.jobs import Job, Task, NotebookTask, ForEachTask, TaskDependency
-
-extract = Task(
-    task_key="extract",
-    notebook_task=NotebookTask(notebook_path="src/notebook_extract.py"),
+from databricks.bundles.jobs import (
+    Job,
+    Task,
+    NotebookTask,
+    ForEachTask,
+    TaskDependency,
+    JobEnvironment,
+    Environment,
 )
-process_item_iteration = Task(
-    task_key="process_item_iteration",
-    notebook_task=NotebookTask(
-        notebook_path="src/notebook_process_item.py",
-        base_parameters={
-            "index": "{{input}}",
-        },
-    ),
+
+generate_items = Task(
+    task_key="generate_items",
+    notebook_task=NotebookTask(notebook_path="src/foreach/generate_items.ipynb"),
 )
+
 process_item = Task(
     task_key="process_item",
-    depends_on=[TaskDependency(task_key="extract")],
     for_each_task=ForEachTask(
-        inputs="{{tasks.extract.values.indexes}}",
-        task=process_item_iteration,
+        inputs="{{tasks.generate_items.values.items}}",
+        task=Task(
+            task_key="process_item_iteration",
+            notebook_task=NotebookTask(
+                notebook_path="src/foreach/process_item.ipynb",
+                base_parameters={"item": "{{input}}"},
+            ),
+        ),
         concurrency=10,
     ),
+    depends_on=[TaskDependency(task_key="generate_items")],
 )
 
-for_each_example = Job(
-    name="for_each_example",
-    tasks=[
-        extract,
-        process_item,
-    ],
-    parameters=[
-        {
-            "name": "lookup_file_name",
-            "default": "/Volumes/main/for_each_example/hotchpotch/my_file.json",
-        },
-    ],
-)
+for_each_task_example = Job(
+    name="for_each_task_example",
+    tasks=[generate_items, process_item],
+)
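
Review note: the new job definition wires `generate_items` to `process_item` through the `{{tasks.generate_items.values.items}}` reference, and each iteration receives one element as `{{input}}`. The data flow can be approximated locally with the sketch below. It is only a simulation under stated assumptions: `task_values`, `run_for_each`, and the thread pool are hypothetical stand-ins, not Databricks APIs, and the sketch says nothing about how the platform actually schedules iterations.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the task value published by generate_items.
task_values = {"generate_items": {"items": [1, 2, 3]}}


def process_item_iteration(item: str) -> str:
    """Stand-in for process_item.ipynb; receives one input as a string."""
    return f"Processing item: {item}"


def run_for_each(inputs, task, concurrency):
    """Call `task` once per input, with at most `concurrency` calls in flight."""
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # Notebook parameters arrive as strings, so each input is stringified.
        return list(pool.map(task, (str(i) for i in inputs)))


# Mirrors inputs="{{tasks.generate_items.values.items}}" and concurrency=10.
results = run_for_each(
    task_values["generate_items"]["items"], process_item_iteration, 10
)
print(results)
```

With three items and `concurrency=10`, all iterations may run at once; the concurrency setting only matters once the list is longer than the limit.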
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e4c15846",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from databricks.sdk.runtime import dbutils\n",
+    "\n",
+    "items = [1, 2, 3]\n",
+    "dbutils.jobs.taskValues.set(key=\"items\", value=items)"
+   ]
+  }
+ ],
+ "metadata": {
+  "language_info": {
+   "name": "python"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
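
Review note: the notebook above publishes a hard-coded list through `dbutils.jobs.taskValues.set`. If the list is ever built dynamically, it may be worth checking that it is a JSON-serializable list of modest size before publishing it. The following is a minimal sketch, not part of this commit: `check_items` is a hypothetical helper, and the 48 KiB ceiling is an assumption about the task value size limit that should be confirmed against the Databricks documentation.

```python
import json

# Assumed limit on the JSON size of a task value; verify for your workspace.
MAX_TASK_VALUE_BYTES = 48 * 1024


def check_items(items):
    """Return the JSON encoding of `items`, or raise if it is unsuitable."""
    if not isinstance(items, list):
        raise TypeError("for-each inputs are expected to be a list")
    # json.dumps raises TypeError for values that are not JSON-serializable.
    encoded = json.dumps(items)
    if len(encoded.encode("utf-8")) > MAX_TASK_VALUE_BYTES:
        raise ValueError("items are too large to pass as a task value")
    return encoded


print(check_items([1, 2, 3]))
```

In the notebook, a call such as `check_items(items)` would sit immediately before `dbutils.jobs.taskValues.set(key="items", value=items)`.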
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Runs once per item in the for-each. Do not call dbutils.jobs.taskValues.set() here.\n",
+    "from databricks.sdk.runtime import dbutils\n",
+    "\n",
+    "# Current iteration value passed from the for-each task (base_parameters: item = {{input}})\n",
+    "current_item = dbutils.widgets.get(\"item\")\n",
+    "print(f\"Processing item: {current_item}\")"
+   ]
+  }
+ ],
+ "metadata": {
+  "language_info": {
+   "name": "python"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
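
Review note: `dbutils.widgets.get` returns a string, so `current_item` in the notebook above is `"1"` rather than the integer `1` that `generate_items` published. Printing it works as is, but any arithmetic or lookup on the item would need a conversion first. One possible approach, sketched here with a hypothetical `parse_item` helper that is not part of this commit, is to decode the value as JSON and fall back to the raw string:

```python
import json


def parse_item(raw: str):
    """Decode a widget string as JSON, falling back to the raw string."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Not valid JSON (for example a bare word), so keep it as text.
        return raw


print(parse_item("1") + 1)
print(parse_item("apple"))
```

In the notebook this would wrap the widget read, for example `current_item = parse_item(dbutils.widgets.get("item"))`.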

knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py

Lines changed: 0 additions & 18 deletions
This file was deleted.

knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py

Lines changed: 0 additions & 15 deletions
This file was deleted.

knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/__init__.py

Whitespace-only changes.

knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py

Lines changed: 0 additions & 6 deletions
This file was deleted.
