-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathtest_asyncio.py
More file actions
40 lines (35 loc) · 1.67 KB
/
test_asyncio.py
File metadata and controls
40 lines (35 loc) · 1.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os
import pytest
from sqlalchemy.ext.asyncio import create_async_engine
from google.cloud.sqlalchemy_spanner.sqlalchemy_spanner_asyncio import (
SpannerDialect_asyncio,
)
from sqlalchemy.testing.plugin.plugin_base import fixtures
class AsyncioTest(fixtures.TestBase):
    """Tests for using the Spanner asyncio dialect through SQLAlchemy's async engine.

    Both tests require the ``SPANNER_EMULATOR_HOST`` environment variable to
    be set and fail immediately when it is missing.
    """

    # Connection URL shared by every test in this class, kept in one place so
    # the tests cannot drift apart.
    _DATABASE_URL = "spanner+spanner_asyncio:///projects/p/instances/i/databases/d"

    @pytest.mark.asyncio
    async def test_async_engine_creation(self):
        """An engine built from the asyncio URL uses the async Spanner dialect."""
        assert os.environ.get("SPANNER_EMULATOR_HOST") is not None, (
            "SPANNER_EMULATOR_HOST must be set to run this test"
        )

        engine = create_async_engine(self._DATABASE_URL)
        try:
            assert engine.dialect.is_async
            assert isinstance(engine.dialect, SpannerDialect_asyncio)
        finally:
            # Release the engine's pool even if an assertion above fails.
            await engine.dispose()

    @pytest.mark.asyncio
    async def test_async_connection(self, mocker):
        """Executing a statement on an async connection drives the sync DBAPI.

        ``google.cloud.spanner_dbapi.connect`` is replaced with a mock, so the
        test only checks how the async layer calls into the DBAPI: one
        ``connect``, one ``cursor.execute`` and one ``close``.

        Args:
            mocker: The pytest-mock fixture used to patch the DBAPI ``connect``.
        """
        from sqlalchemy import text
        from sqlalchemy.pool import NullPool

        assert os.environ.get("SPANNER_EMULATOR_HOST") is not None, (
            "SPANNER_EMULATOR_HOST must be set to run this test"
        )

        # NullPool: the connection is expected to be closed, not pooled, when
        # the ``async with`` block below exits (asserted via ``close``).
        engine = create_async_engine(self._DATABASE_URL, poolclass=NullPool)
        try:
            # We need to mock the underlying sync connect
            mock_connect = mocker.patch("google.cloud.spanner_dbapi.connect")
            mock_sync_conn = mock_connect.return_value
            mock_sync_cursor = mock_sync_conn.cursor.return_value

            # When we call execute, it should work through the async adapter
            async with engine.connect() as conn:
                assert conn.dialect == engine.dialect
                # This will eventually call cursor.execute in a thread
                await conn.execute(text("SELECT 1"))

            # Checked after the block exits so that the close on connection
            # release has already happened.
            mock_connect.assert_called_once()
            mock_sync_conn.close.assert_called_once()
            mock_sync_cursor.execute.assert_called_once()
        finally:
            # Release the engine's pool even if an assertion above fails.
            await engine.dispose()