Skip to content

Commit 42603d3

Browse files
authored
Merge pull request #94 from UiPath/fix/use_sqllite_llamaindex_storage
fix: store context in SQLite
2 parents 3626d1d + a20cb2a commit 42603d3

6 files changed

Lines changed: 228 additions & 144 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
[project]
22
name = "uipath-llamaindex"
3-
version = "0.1.4"
3+
version = "0.1.5"
44
description = "UiPath LlamaIndex SDK"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"
77
dependencies = [
8+
"aiosqlite>=0.21.0",
89
"llama-index>=0.14.8",
910
"llama-index-embeddings-azure-openai>=0.4.1",
1011
"llama-index-llms-azure-openai>=0.4.2",
1112
"openinference-instrumentation-llama-index>=4.3.9",
12-
"uipath>=2.2.16, <2.3.0",
13+
"uipath>=2.2.26, <2.3.0",
1314
]
1415
classifiers = [
1516
"Intended Audience :: Developers",

src/uipath_llamaindex/runtime/_attribute_normalizer.py renamed to src/uipath_llamaindex/runtime/_telemetry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
logger = logging.getLogger(__name__)
1616

1717

18-
class AttributeNormalizingSpanProcessor(SpanProcessor):
18+
class ToolCallAttributeNormalizer(SpanProcessor):
1919
"""Normalizes LlamaIndex tool call attributes to match other frameworks.
2020
2121
Unwraps {"kwargs": {...}} to flat {...} format for consistency with LangChain.

src/uipath_llamaindex/runtime/factory.py

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717
from uipath.runtime.errors import UiPathErrorCategory
1818
from workflows import Workflow
1919

20-
from uipath_llamaindex.runtime._attribute_normalizer import (
21-
AttributeNormalizingSpanProcessor,
20+
from uipath_llamaindex.runtime._telemetry import (
21+
ToolCallAttributeNormalizer,
2222
)
2323
from uipath_llamaindex.runtime.config import LlamaIndexConfig
2424
from uipath_llamaindex.runtime.errors import (
2525
UiPathLlamaIndexErrorCode,
2626
UiPathLlamaIndexRuntimeError,
2727
)
2828
from uipath_llamaindex.runtime.runtime import UiPathLlamaIndexRuntime
29-
from uipath_llamaindex.runtime.storage import PickleResumableStorage
29+
from uipath_llamaindex.runtime.storage import SQLiteResumableStorage
3030
from uipath_llamaindex.runtime.workflow import LlamaIndexWorkflowLoader
3131

3232

@@ -45,12 +45,14 @@ def __init__(
4545
"""
4646
self.context = context
4747
self._config: LlamaIndexConfig | None = None
48-
self._storage_path: str | None = None
4948

5049
self._workflow_cache: dict[str, Workflow] = {}
5150
self._workflow_loaders: dict[str, LlamaIndexWorkflowLoader] = {}
5251
self._workflow_lock = asyncio.Lock()
5352

53+
self._storage_lock = asyncio.Lock()
54+
self._storage: SQLiteResumableStorage | None = None
55+
5456
self._setup_instrumentation(self.context.trace_manager)
5557

5658
def _setup_instrumentation(self, trace_manager: UiPathTraceManager | None) -> None:
@@ -60,26 +62,37 @@ def _setup_instrumentation(self, trace_manager: UiPathTraceManager | None) -> No
6062

6163
if trace_manager:
6264
trace_manager.tracer_provider.add_span_processor(
63-
AttributeNormalizingSpanProcessor()
65+
ToolCallAttributeNormalizer()
6466
)
6567

6668
def _get_storage_path(self) -> str:
6769
"""Get the storage path for workflow state."""
68-
if self._storage_path is None:
69-
if self.context.runtime_dir and self.context.state_file:
70-
path = os.path.join(self.context.runtime_dir, self.context.state_file)
71-
if not self.context.resume and self.context.job_id is None:
72-
# If not resuming and no job id, delete the previous state file
73-
if os.path.exists(path):
74-
os.remove(path)
75-
os.makedirs(self.context.runtime_dir, exist_ok=True)
76-
self._storage_path = path
77-
else:
78-
default_path = os.path.join("__uipath", "state.db")
79-
os.makedirs(os.path.dirname(default_path), exist_ok=True)
80-
self._storage_path = default_path
81-
82-
return self._storage_path
70+
if self.context.runtime_dir and self.context.state_file:
71+
path = os.path.join(self.context.runtime_dir, self.context.state_file)
72+
if not self.context.resume and self.context.job_id is None:
73+
# If not resuming and no job id, delete the previous state file
74+
if os.path.exists(path):
75+
os.remove(path)
76+
os.makedirs(self.context.runtime_dir, exist_ok=True)
77+
return path
78+
79+
default_path = os.path.join("__uipath", "state.db")
80+
os.makedirs(os.path.dirname(default_path), exist_ok=True)
81+
return default_path
82+
83+
async def _get_storage(self) -> SQLiteResumableStorage:
84+
"""Get or create the shared storage instance."""
85+
if self._storage is not None:
86+
return self._storage
87+
88+
async with self._storage_lock:
89+
if self._storage is not None:
90+
return self._storage
91+
92+
storage_path = self._get_storage_path()
93+
self._storage = SQLiteResumableStorage(storage_path)
94+
await self._storage.setup()
95+
return self._storage
8396

8497
def _load_config(self) -> LlamaIndexConfig:
8598
"""Load llama_index.json configuration."""
@@ -199,7 +212,6 @@ async def discover_runtimes(self) -> list[UiPathRuntimeProtocol]:
199212
List of LlamaIndexRuntime instances, one per entrypoint
200213
"""
201214
entrypoints = self.discover_entrypoints()
202-
storage_path = self._get_storage_path()
203215

204216
runtimes: list[UiPathRuntimeProtocol] = []
205217
for entrypoint in entrypoints:
@@ -209,7 +221,6 @@ async def discover_runtimes(self) -> list[UiPathRuntimeProtocol]:
209221
workflow=workflow,
210222
runtime_id=entrypoint,
211223
entrypoint=entrypoint,
212-
storage_path=storage_path,
213224
)
214225
runtimes.append(runtime)
215226

@@ -220,7 +231,6 @@ async def _create_runtime_instance(
220231
workflow: Workflow,
221232
runtime_id: str,
222233
entrypoint: str,
223-
storage_path: str,
224234
) -> UiPathRuntimeProtocol:
225235
"""
226236
Create a runtime instance from a workflow.
@@ -229,12 +239,12 @@ async def _create_runtime_instance(
229239
workflow: The workflow
230240
runtime_id: Unique identifier for the runtime instance
231241
entrypoint: Workflow entrypoint name
232-
storage_path: Path for state storage
233242
234243
Returns:
235244
Configured runtime instance
236245
"""
237-
storage = PickleResumableStorage(storage_path)
246+
247+
storage = await self._get_storage()
238248

239249
base_runtime = UiPathLlamaIndexRuntime(
240250
workflow=workflow,
@@ -265,15 +275,12 @@ async def new_runtime(
265275
Returns:
266276
Configured runtime instance with workflow
267277
"""
268-
storage_path = self._get_storage_path()
269-
270278
workflow = await self._resolve_workflow(entrypoint)
271279

272280
return await self._create_runtime_instance(
273281
workflow=workflow,
274282
runtime_id=runtime_id,
275283
entrypoint=entrypoint,
276-
storage_path=storage_path,
277284
)
278285

279286
async def dispose(self) -> None:

src/uipath_llamaindex/runtime/runtime.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
UiPathLlamaIndexRuntimeError,
4848
)
4949
from uipath_llamaindex.runtime.schema import get_entrypoints_schema, get_workflow_schema
50-
from uipath_llamaindex.runtime.storage import PickleResumableStorage
50+
from uipath_llamaindex.runtime.storage import SQLiteResumableStorage
5151

5252
from ._serialize import serialize_output
5353

@@ -62,7 +62,7 @@ def __init__(
6262
workflow: Workflow,
6363
runtime_id: str | None = None,
6464
entrypoint: str | None = None,
65-
storage: PickleResumableStorage | None = None,
65+
storage: SQLiteResumableStorage | None = None,
6666
debug_mode: bool = False,
6767
):
6868
"""
@@ -76,7 +76,7 @@ def __init__(
7676
self.workflow: Workflow = workflow
7777
self.runtime_id: str = runtime_id or "default"
7878
self.entrypoint: str | None = entrypoint
79-
self.storage: PickleResumableStorage | None = storage
79+
self.storage: SQLiteResumableStorage | None = storage
8080
self.debug_mode: bool = debug_mode
8181
self._context: Context | None = None
8282

@@ -142,7 +142,7 @@ async def _run_workflow(
142142
is_resuming = bool(options and options.resume)
143143

144144
if is_resuming:
145-
self._context = self._load_context()
145+
self._context = await self._load_context()
146146
else:
147147
self._context = Context(self.workflow)
148148

@@ -215,7 +215,7 @@ async def _run_workflow(
215215

216216
if suspended_event is not None:
217217
await asyncio.sleep(0) # Yield control to event loop
218-
self._save_context()
218+
await self._save_context()
219219
await handler.cancel_run()
220220
if isinstance(suspended_event, BreakpointEvent):
221221
yield self._create_breakpoint_result(suspended_event)
@@ -325,12 +325,12 @@ def _create_runtime_error(self, e: Exception) -> UiPathLlamaIndexRuntimeError:
325325
UiPathErrorCategory.USER,
326326
)
327327

328-
def _load_context(self) -> Context:
328+
async def _load_context(self) -> Context:
329329
"""Load the workflow context from storage."""
330330
if not self.storage:
331331
return Context(self.workflow)
332332

333-
context_dict = self.storage.load_context()
333+
context_dict = await self.storage.load_context(runtime_id=self.runtime_id)
334334

335335
if context_dict:
336336
from workflows.context.serializers import JsonPickleSerializer
@@ -344,7 +344,7 @@ def _load_context(self) -> Context:
344344
else:
345345
return Context(self.workflow)
346346

347-
def _save_context(self) -> None:
347+
async def _save_context(self) -> None:
348348
"""Save the current workflow context to storage."""
349349
if not self.storage or not self._context:
350350
return None
@@ -354,7 +354,9 @@ def _save_context(self) -> None:
354354
serializer = JsonPickleSerializer()
355355
context_dict = self._context.to_dict(serializer=serializer)
356356

357-
self.storage.save_context(context_dict)
357+
await self.storage.save_context(
358+
runtime_id=self.runtime_id, context_dict=context_dict
359+
)
358360

359361
async def dispose(self) -> None:
360362
"""Cleanup runtime resources."""

0 commit comments

Comments
 (0)