Commit 02cad29

add table update trigger example
1 parent 54a2dfa commit 02cad29

6 files changed

Lines changed: 152 additions & 0 deletions

File tree

* README.md
* databricks.yml
* pyproject.toml
* resources/__init__.py
* resources/table_update.py
* src/assets/consume_table.py

README.md
Lines changed: 68 additions & 0 deletions
# pydabs_job_table_update_trigger

This example demonstrates a workflow where producers write to Unity Catalog tables and consumers trigger on table updates instead of time-based schedules.

The Lakeflow Job is configured with:
- **Table update trigger**: Runs the job when new data is ready, without a continuously running cluster or knowledge of the processes that update the table.
- **Configurable wait times**:
  - Minimum time between triggers: 0 seconds
  - Wait after last table change: 3600 seconds
- **Automatic processing**: When updates are detected, the job automatically runs and processes them.

* `src/`: Notebook source code for this project.
* `src/assets/consume_table.py`: Notebook that reads the monitored table when the job runs.
* `resources/`: Resource configurations (jobs, pipelines, etc.)
* `resources/table_update.py`: PyDABs job with table update trigger configuration.

## Getting started

Choose how you want to work on this project:

(a) Directly in your Databricks workspace, see
    https://docs.databricks.com/dev-tools/bundles/workspace.

(b) Locally with an IDE like Cursor or VS Code, see
    https://docs.databricks.com/vscode-ext.

(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html

If you're developing with an IDE, dependencies for this project should be installed using uv:

* Make sure you have the uv package manager installed.
  It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/.
* Run `uv sync --dev` to install the project's dependencies.

## Using this project from the CLI

The Databricks workspace and IDE extensions provide a graphical interface for working
with this project. It's also possible to interact with it directly using the CLI:

1. Authenticate to your Databricks workspace, if you have not done so already:

   ```
   $ databricks configure
   ```

2. To deploy a development copy of this project, type:

   ```
   $ databricks bundle deploy --target dev
   ```

   (Note that "dev" is the default target, so the `--target` parameter
   is optional here.)

   This deploys everything that's defined for this project.
   For example, this project will deploy a job called
   `[dev yourname] table_update_example` to your workspace.
   You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**.

3. Development vs. production behavior:
   - Dev target (mode: development): Schedules and automatic triggers are disabled by design, so the job will not auto-fire on table updates. Run it manually to test the logic:

     ```
     $ databricks bundle run table_update_example
     ```
   - Prod target (mode: production): Automatic triggers are active. An update to the configured Unity Catalog table (`main.analytics.daily_events`) will trigger a job run when the trigger evaluates; see the example below.

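Any committed write to the monitored table (an INSERT, MERGE, or append) counts as an update. As a minimal sketch, a producer-side append along the following lines would make the trigger fire once the configured quiet period has elapsed; the columns shown are purely illustrative, and it assumes it runs in a Databricks notebook where `spark` is already defined:

```
from pyspark.sql import Row

# Hypothetical producer: append one row to the monitored table.
# The columns are illustrative; any committed write counts as a table update.
spark.createDataFrame([Row(event_id=1, event_type="page_view")]) \
    .write.mode("append").saveAsTable("main.analytics.daily_events")
```
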
databricks.yml
Lines changed: 21 additions & 0 deletions
# This is a Databricks asset bundle definition for pydabs_job_table_update_trigger.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: pydabs_job_table_update_trigger

python:
  venv_path: .venv
  # Functions called to load resources defined in Python. See resources/__init__.py
  resources:
    - "resources:load_resources"

include:
  - resources/*.yml
  - resources/*/*.yml

targets:
  dev:
    mode: development
    default: true
    workspace:
      host: https://myworkspace.databricks.com
pyproject.toml
Lines changed: 26 additions & 0 deletions
[project]
name = "pydabs_job_table_update_trigger"
version = "0.0.1"
authors = [{ name = "Databricks Field Engineering" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
    # Any dependencies for jobs and pipelines in this project can be added here
    # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies
    #
    # LIMITATION: for pipelines, dependencies are cached during development;
    # add dependencies to the 'environment' section of pipeline.yml file instead
]

[dependency-groups]
dev = [
    "pytest",
    "databricks-connect>=15.4,<15.5",
    "databricks-bundles==0.275.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.black]
line-length = 125
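Since the dev dependency group pulls in pytest and Databricks Connect, a conftest.py along these lines could provide a remote Spark session for local tests. This is a sketch, not part of this commit, and it assumes Databricks authentication is already configured (for example via `databricks configure`):

```
# conftest.py (hypothetical, not part of this commit)
import pytest


@pytest.fixture(scope="session")
def spark():
    # Databricks Connect builds a remote SparkSession from your configured
    # Databricks credentials.
    from databricks.connect import DatabricksSession

    return DatabricksSession.builder.getOrCreate()
```
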
resources/__init__.py
Lines changed: 16 additions & 0 deletions
from databricks.bundles.core import (
    Bundle,
    Resources,
    load_resources_from_current_package_module,
)


def load_resources(bundle: Bundle) -> Resources:
    """
    The 'load_resources' function is referenced in databricks.yml and is responsible for loading
    bundle resources defined in Python code. It is called by the Databricks CLI during
    bundle deployment; after deployment, it is not used.
    """

    # The default implementation loads all Python files in the 'resources' directory.
    return load_resources_from_current_package_module()
resources/table_update.py
Lines changed: 16 additions & 0 deletions
from databricks.bundles.jobs import Job, NotebookTask, Task, TableUpdateTriggerConfiguration

# Task that runs the consumer notebook whenever the trigger fires.
consume_table = Task(
    task_key="consume_table",
    notebook_task=NotebookTask(notebook_path="src/assets/consume_table.py"),
)

job = Job(
    name="table_update_example",
    trigger=TableUpdateTriggerConfiguration(
        table_names=["main.analytics.daily_events"],
        # No enforced minimum gap between consecutive triggered runs.
        min_time_between_triggers_seconds=0,
        # Wait until the table has seen no new updates for an hour before running.
        wait_after_last_change_seconds=3600,
    ),
    tasks=[consume_table],
)
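A small unit test could guard this job definition against accidental changes. The sketch below is illustrative and not part of this commit: the test file name is hypothetical, it assumes the `resources` package is importable from the test environment, and the assertions rely on the dataclass fields mirroring the constructor arguments shown above:

```
# tests/test_table_update_job.py (hypothetical, not part of this commit)
from resources.table_update import consume_table, job


def test_job_wires_up_consumer_task():
    assert job.name == "table_update_example"
    assert [t.task_key for t in job.tasks] == ["consume_table"]
    assert consume_table.notebook_task.notebook_path == "src/assets/consume_table.py"
```

With the dev dependencies installed, it could be run with `uv run pytest` from the project root.
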
src/assets/consume_table.py
Lines changed: 5 additions & 0 deletions
from pyspark.sql import functions as F

source_table = "main.analytics.daily_events"

# Insert consumer logic here. `spark` is provided by the Databricks notebook runtime.
df = spark.read.table(source_table)
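As an illustration of what the consumer step might do once the trigger fires, a sketch along these lines could replace the placeholder above. The `event_type` column and the `main.analytics.daily_event_counts` target table are hypothetical, and `spark` is assumed to be the notebook-provided session:

```
from pyspark.sql import functions as F

# Hypothetical consumer logic: count rows per event type in the freshly
# updated source table and overwrite a small downstream summary table.
df = spark.read.table("main.analytics.daily_events")
summary = df.groupBy("event_type").agg(F.count("*").alias("event_count"))
summary.write.mode("overwrite").saveAsTable("main.analytics.daily_event_counts")
```
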

0 commit comments
