
Commit 54a2dfa

update file arrival trigger example

1 parent 236c688

4 files changed: 18 additions & 66 deletions

File tree

knowledge_base/pydabs_job_file_arrival/README.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -10,7 +10,7 @@ The Lakeflow Job is configured with:
 - **Automatic processing**: When files are detected, the job automatically runs and processes them
 
 * `src/`: Notebook source code for this project.
-* `src/process_files.ipynb`: Processes newly arrived files from the volume path.
+* `src/process_files.py`: Processes newly arrived files from the volume path.
 * `resources/`: Resource configurations (jobs, pipelines, etc.)
 * `resources/file_arrival.py`: PyDABs job with file arrival trigger configuration.
 
```

Lines changed: 12 additions & 25 deletions
```diff
@@ -1,29 +1,16 @@
-from databricks.bundles.jobs import (
-    FileArrivalTriggerConfiguration,
-    Job,
-    NotebookTask,
-    Task,
-    TriggerSettings,
+from databricks.bundles.jobs import FileArrivalTriggerConfiguration, Job, Task, NotebookTask
+
+process_files = Task(
+    task_key="process_files",
+    notebook_task=NotebookTask(notebook_path="src/files/process_files.py"),
 )
 
-pydabs_job_file_arrival = Job(
-    name="pydabs_job_file_arrival",
-    tasks=[
-        Task(
-            task_key="process_new_files",
-            notebook_task=NotebookTask(
-                notebook_path="src/process_files.ipynb",
-                base_parameters={
-                    "file_arrival_location": "{{job.trigger.file_arrival.location}}"
-                },
-            ),
-        )
-    ],
-    trigger=TriggerSettings(
-        file_arrival=FileArrivalTriggerConfiguration(
-            url="/Volumes/your_catalog/your_schema/your_volume/",
-            min_time_between_triggers_seconds=60,
-            wait_after_last_change_seconds=90,
-        ),
+job = Job(
+    name="file_arrival_example",
+    trigger=FileArrivalTriggerConfiguration(
+        url="Volumes/main.raw.incoming",  # UC volume or external location
+        min_time_between_triggers_seconds=60,
+        wait_after_last_change_seconds=90,
     ),
+    tasks=[process_files],
 )
```
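Note that the `+` side above passes `FileArrivalTriggerConfiguration` directly to `trigger=`, and uses a dotted name as the `url`. In the typed `databricks-bundles` API the trigger field is normally a `TriggerSettings` wrapper (as in the deleted code), and the URL is a storage path. A hedged sketch of the updated example in that shape — the `/Volumes/main/raw/incoming` path is taken from the new source file added in this commit, and `src/process_files.py` as the notebook path follows the README; treat both, and the exact field names, as assumptions against your library version:

```python
from databricks.bundles.jobs import (
    FileArrivalTriggerConfiguration,
    Job,
    NotebookTask,
    Task,
    TriggerSettings,
)

job = Job(
    name="file_arrival_example",
    trigger=TriggerSettings(
        # File arrival settings nest under TriggerSettings in the typed API.
        file_arrival=FileArrivalTriggerConfiguration(
            url="/Volumes/main/raw/incoming",  # path form, matching the new source file
            min_time_between_triggers_seconds=60,
            wait_after_last_change_seconds=90,
        ),
    ),
    tasks=[
        Task(
            task_key="process_files",
            notebook_task=NotebookTask(notebook_path="src/process_files.py"),
        )
    ],
)
```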

knowledge_base/pydabs_job_file_arrival/src/process_files.ipynb

Lines changed: 0 additions & 40 deletions
This file was deleted.
Lines changed: 5 additions & 0 deletions
```diff
@@ -0,0 +1,5 @@
+from pyspark.sql import functions as F
+
+df = spark.readStream.format("cloudFiles") \
+    .option("cloudFiles.format", "csv") \
+    .load("/Volumes/main/raw/incoming")
```
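The two timing settings on the trigger (`min_time_between_triggers_seconds`, `wait_after_last_change_seconds`) rate-limit and debounce file arrivals so a burst of files produces one run rather than many. An illustrative pure-Python model of that decision logic — the function name and signature are my own, not the Databricks implementation:

```python
def should_fire(now, last_run_at, last_change_at,
                min_time_between_triggers_seconds=60,
                wait_after_last_change_seconds=90):
    """Return True when a detected file arrival should start a job run."""
    # Rate limit: don't start runs more often than the configured interval.
    if last_run_at is not None and now - last_run_at < min_time_between_triggers_seconds:
        return False
    # Debounce: wait until the location has been quiet long enough,
    # so a batch of files is picked up by a single run.
    if now - last_change_at < wait_after_last_change_seconds:
        return False
    return True

# A file landed at t=0; at t=30 the location is still "hot", so no run yet.
print(should_fire(now=30, last_run_at=None, last_change_at=0))   # False
# By t=120 the location has been quiet for at least 90s: fire.
print(should_fire(now=120, last_run_at=None, last_change_at=0))  # True
```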
