Commit 5efba43

Add pipeline_with_ai_parse_document example
This example demonstrates incremental document processing using:

- ai_parse_document for extracting structured data from PDFs/images
- ai_query for LLM-based content analysis
- Lakeflow Declarative Pipelines for streaming incremental processing

Key features:

- Incremental streaming pipeline with 3 stages
- Visual debugging notebook with interactive bounding boxes
- Error handling and variant support
- Production-ready DAB configuration
1 parent 9e7da7b commit 5efba43

9 files changed: 1,190 additions & 0 deletions

.gitignore (28 additions & 0 deletions)
```gitignore
# Databricks
.databricks/

# Python
build/
dist/
__pycache__/
*.egg-info
.venv/
*.py[cod]

# Local configuration (keep your settings private)
databricks.local.yml

# IDE
.idea/
.vscode/
.DS_Store

# Scratch/temporary files
scratch/**
!scratch/README.md

# Test documents (don't commit large PDFs)
*.pdf
*.png
*.jpg
*.jpeg
```
README.md (184 additions & 0 deletions)
# AI Document Processing Pipeline with Lakeflow

A Databricks Asset Bundle demonstrating **incremental document processing** using `ai_parse_document`, `ai_query`, and Lakeflow Declarative Pipelines.

## Overview

This example shows how to build an incremental streaming pipeline that:

1. **Parses** PDFs and images using [`ai_parse_document`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
2. **Extracts** clean text with incremental processing
3. **Analyzes** content using [`ai_query`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query) with LLMs

All stages use [Lakeflow Declarative Pipelines](https://docs.databricks.com/aws/en/dlt/concepts) streaming tables for efficient incremental processing.

## Architecture

```
Source Documents (UC Volume)
        ↓
ai_parse_document → parsed_documents_raw (variant)
        ↓
text extraction → parsed_documents_text (string)
        ↓
ai_query → parsed_documents_structured (json)
```

### Key Features

- **Incremental processing**: Only new files are processed, using Lakeflow streaming tables
- **Error handling**: Gracefully handles parsing failures
- **Visual debugging**: Interactive notebook for inspecting results

## Prerequisites

- Databricks workspace with Unity Catalog
- Databricks CLI v0.218.0+
- Unity Catalog volumes for:
  - Source documents (PDFs/images)
  - Parsed output images
- AI functions (`ai_parse_document`, `ai_query`)

## Quick Start

1. **Install and authenticate**

   ```bash
   databricks auth login --host https://your-workspace.cloud.databricks.com
   ```

2. **Configure** `databricks.yml` with your workspace settings

3. **Validate** the bundle configuration

   ```bash
   databricks bundle validate
   ```

4. **Deploy**

   ```bash
   databricks bundle deploy
   ```

5. **Upload documents** to your source volume

6. **Run the pipeline** from the Databricks UI (Lakeflow Pipelines)

## Configuration

Edit `databricks.yml`:

```yaml
variables:
  catalog: main                                          # Your catalog
  schema: default                                        # Your schema
  source_volume_path: /Volumes/main/default/source_docs  # Source PDFs
  output_volume_path: /Volumes/main/default/parsed_out   # Parsed images
```
## Pipeline Stages

### Stage 1: Document Parsing

**File**: `src/transformations/ai_parse_document_variant.sql`

Uses `ai_parse_document` to extract text, tables, and metadata from PDFs/images:

- Reads files from the volume using `READ_FILES`
- Stores variant output with bounding boxes
- Incremental: processes only new files
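A minimal sketch of the pattern this stage uses (table, column, and path names here are illustrative, not necessarily the file's exact contents):

```sql
-- Illustrative sketch; the source path matches the Configuration example above.
CREATE OR REFRESH STREAMING TABLE parsed_documents_raw AS
SELECT
  path,
  -- ai_parse_document returns a VARIANT holding pages, elements, and bounding boxes
  ai_parse_document(content) AS parsed
FROM STREAM READ_FILES(
  '/Volumes/main/default/source_docs/*.{pdf,jpg,png}',
  format => 'binaryFile'
);
```

Because `READ_FILES` is consumed with `STREAM`, each refresh picks up only files added since the last run.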
### Stage 2: Text Extraction

**File**: `src/transformations/ai_parse_document_text.sql`

Extracts clean concatenated text using `transform()`:

- Handles both parser v1.0 and v1.1 formats
- Uses `transform()` for incremental append (no aggregations)
- Includes error handling for failed parses
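A minimal sketch of the extraction pattern, assuming a v1.x layout where elements live under `document:elements` (the real file also branches on the v1.0 format and handles failed parses):

```sql
-- Illustrative sketch; the VARIANT field paths are assumptions about the parser output.
CREATE OR REFRESH STREAMING TABLE parsed_documents_text AS
SELECT
  path,
  -- transform() maps over the elements without aggregating, so the table
  -- can be refreshed as an incremental append instead of in complete mode
  array_join(
    transform(
      try_cast(parsed:document:elements AS ARRAY<VARIANT>),
      element -> element:content::string
    ),
    '\n'
  ) AS text
FROM STREAM(parsed_documents_raw)
WHERE parsed:error_status IS NULL;  -- assumed error field; skip failed parses
```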
### Stage 3: AI Query Extraction

**File**: `src/transformations/ai_query_extraction.sql`

Applies an LLM to extract structured insights:

- Uses `ai_query` with Claude Sonnet 4
- Customizable prompt for domain-specific extraction
- Outputs structured JSON
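A minimal sketch of this stage with a placeholder prompt; the Customization section below shows the `returnType` and `modelParameters` knobs the example exposes:

```sql
-- Illustrative sketch; the prompt and output column are placeholders.
CREATE OR REFRESH STREAMING TABLE parsed_documents_structured AS
SELECT
  path,
  ai_query(
    'databricks-claude-sonnet-4',
    concat('Extract the key facts from this document as JSON: ', text)
  ) AS extracted_json
FROM STREAM(parsed_documents_text);
```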
## Visual Debugger

The included notebook visualizes parsing results with interactive bounding boxes.

**Open**: `src/explorations/ai_parse_document -- debug output.py`

**Configure widgets**:

- `input_file`: `/Volumes/main/default/source_docs/sample.pdf`
- `image_output_path`: `/Volumes/main/default/parsed_out/`
- `page_selection`: `all` (or `1-3`, `1,5,10`)

**Features**:

- Color-coded bounding boxes by element type
- Hover tooltips showing extracted content
- Automatic image scaling
- Page selection support

## Customization

**Change source file patterns**:

```sql
-- In ai_parse_document_variant.sql
FROM STREAM READ_FILES(
  '/Volumes/main/default/source_docs/*.{pdf,jpg,png}',
  format => 'binaryFile'
)
```

**Customize AI extraction**:

```sql
-- In ai_query_extraction.sql
ai_query(
  'databricks-claude-sonnet-4',
  concat('Extract key dates and amounts from: ', text),
  returnType => 'STRING',
  modelParameters => named_struct('max_tokens', 2000, 'temperature', 0.1)
)
```

## Project Structure

```
.
├── databricks.yml                  # Bundle configuration
├── resources/
│   └── ai_parse_document_pipeline.pipeline.yml
├── src/
│   ├── transformations/
│   │   ├── ai_parse_document_variant.sql
│   │   ├── ai_parse_document_text.sql
│   │   └── ai_query_extraction.sql
│   └── explorations/
│       └── ai_parse_document -- debug output.py
└── README.md
```

## Key Concepts

### Incremental Processing with Lakeflow

Streaming tables process only new data:

```sql
CREATE OR REFRESH STREAMING TABLE table_name AS (
  SELECT * FROM STREAM(source_table)
)
```

### Avoiding Complete Mode

Using `transform()` instead of aggregations enables incremental append:

```sql
-- Good: Incremental append
transform(array, element -> element.content)

-- Avoid: Forces complete mode
collect_list(...) GROUP BY ...
```

## Resources

- [Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/)
- [Lakeflow Declarative Pipelines](https://docs.databricks.com/aws/en/dlt/concepts)
- [`ai_parse_document` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
- [`ai_query` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query)
databricks.yml (46 additions & 0 deletions)
```yaml
# This is a Databricks asset bundle definition for ai_parse_document_pipeline.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: ai_parse_document_pipeline

variables:
  catalog:
    description: The catalog name for the pipeline
    default: main
  schema:
    description: The schema name for the pipeline
    default: default
  source_volume_path:
    description: Source volume path for PDF files
    default: /Volumes/main/default/source_documents
  output_volume_path:
    description: Output volume path for processed images
    default: /Volumes/main/default/parsed_output
  raw_table_name:
    description: Name of the raw parsed documents table
    default: parsed_documents_raw
  text_table_name:
    description: Name of the text output table
    default: parsed_documents_text

include:
  - resources/*.yml

targets:
  dev:
    # The default target uses 'mode: development' to create a development copy.
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default.
    # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
    mode: development
    default: true
    workspace:
      host: https://your-workspace.cloud.databricks.com

  prod:
    mode: production
    workspace:
      host: https://your-workspace.cloud.databricks.com
    permissions:
      - group_name: users
        level: CAN_VIEW
```
resources/ai_parse_document_pipeline.pipeline.yml (16 additions & 0 deletions)
```yaml
resources:
  pipelines:
    ai_parse_document_pipeline:
      name: ai_parse_document_pipeline
      libraries:
        - file:
            path: ../src/transformations/ai_parse_document_variant.sql
        - file:
            path: ../src/transformations/ai_parse_document_text.sql
        - file:
            path: ../src/transformations/ai_query_extraction.sql
      catalog: ${var.catalog}
      channel: CURRENT
      photon: true
      schema: ${var.schema}
      serverless: true
```
setup.py (41 additions & 0 deletions)
```python
"""
setup.py configuration script describing how to build and package this project.

This file is primarily used by the setuptools library and typically should not
be executed directly. See README.md for how to deploy, test, and run
the my_project project.
"""

from setuptools import setup, find_packages

import sys

sys.path.append("./src")

import datetime
import my_project

local_version = datetime.datetime.utcnow().strftime("%Y%m%d.%H%M%S")

setup(
    name="my_project",
    # We use timestamp as Local version identifier (https://peps.python.org/pep-0440/#local-version-identifiers.)
    # to ensure that changes to wheel package are picked up when used on all-purpose clusters
    version=my_project.__version__ + "+" + local_version,
    url="https://databricks.com",
    author="jas.bali@databricks.com",
    description="wheel file based on my_project/src",
    packages=find_packages(where="./src"),
    package_dir={"": "src"},
    entry_points={
        "packages": [
            "main=my_project.main:main",
        ],
    },
    install_requires=[
        # Dependencies in case the output wheel file is used as a library dependency.
        # For defining dependencies, when this package is used in Databricks, see:
        # https://docs.databricks.com/dev-tools/bundles/library-dependencies.html
        "setuptools"
    ],
)
```
