Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@
}
},
"sync": {
"layers": ["obstacles"],
"layers": [
{
"name": "obstacles",
"query": "query { obstacles { createdBy creationTime deleted entityVersion geography { coordinates { latitude longitude } graphicsObjectKind { value } height obstacleHeightsRange { displayName } } id identifiers { essence { displayName value } name number } lastUpdateTime lastUpdatedBy } }"
}
],
"syncIntervalMs": 500,
"pollIntervalMs": 600000,
"pageSize": 1000,
Expand Down
5 changes: 3 additions & 2 deletions src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
configInstance.initializeMetrics(metricsRegistry);

const syncConfig = getSyncConfig();
const dataSource = await initializeDb(syncConfig.layers);
const layers = syncConfig.layers.map((l) => l.name);
const dataSource = await initializeDb(layers);
const { host, port, database } = dataSource.options as { host: string; port: number; database: string };
logger.info(`Database connected to ${host}:${port}/${database} (layer partitions: ${syncConfig.layers.join(', ')})`);
logger.info(`Database connected to ${host}:${port}/${database} (layer partitions: ${layers.join(', ')})`);

const syncManager = new SyncManager(logger);

Expand Down
4 changes: 2 additions & 2 deletions src/dal/repositories/layerDataRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async function insertRow(layerName: string, object: LayerObject): Promise<void>
.insert()
.into(LayerObjectEntity)
.values({ layerName, id: object.id, geom: object.geom, properties: object.properties } as unknown as ObjectLiteral)
.orIgnore()
.orUpdate(['geom', 'properties'], ['layer_name', 'id'])
.execute();
}

Expand All @@ -41,7 +41,7 @@ export async function insertObjects(logger: Logger, layerName: string, objects:
.insert()
.into(LayerObjectEntity)
.values(rows as unknown as ObjectLiteral[])
.orIgnore()
.orUpdate(['geom', 'properties'], ['layer_name', 'id'])
.execute();
} catch (batchErr) {
logger.warn({ msg: 'Batch insert failed, falling back to per-object inserts', layerName, count: objects.length, err: batchErr });
Expand Down
2 changes: 1 addition & 1 deletion src/externalClients/layersClient/layersClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export async function fetchPage(logger: Logger, layerName: string, sequence: str
async () => {
const response = await axios.post<GraphQLResponse>(
config.thirdPartyBaseUrl,
{ query: buildLayerQuery(layerName) },
{ query: buildLayerQuery(layerName, config.layers) },
{
headers: {
'Content-Type': 'application/json',
Expand Down
44 changes: 10 additions & 34 deletions src/externalClients/layersClientModel.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,12 @@
// Third-party API uses the layer name as the root query field.
// Pagination/auth inputs are passed via HTTP headers, not GraphQL variables.
export function buildLayerQuery(layerName: string): string {
return `query {
${layerName} {
createdBy
creationTime
deleted
entityVersion
geography {
coordinates {
latitude
longitude
}
graphicsObjectKind {
value
}
height
obstacleHeightsRange {
displayName
}
}
id
identifiers {
essence {
displayName
value
}
name
number
}
lastUpdateTime
lastUpdatedBy
import type { LayerConfig } from '../types';

// The query per layer comes from config (sync.layers), matched by layer name.
// The layer name is the root GraphQL query field. A layer with no configured
// query is treated as a misconfiguration and skipped (error thrown).
export function buildLayerQuery(layerName: string, layers: LayerConfig[]): string {
const layer = layers.find((l) => l.name === layerName);
if (layer === undefined || layer.query.trim() === '') {
throw new Error(`No query configured for layer "${layerName}"`);
}
}`;
return layer.query;
}
5 changes: 3 additions & 2 deletions src/scheduler/syncManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ export class SyncManager {
public async start(): Promise<void> {
const config = getSyncConfig();

this.logger.info(`Initializing sync for layers: ${config.layers.join(', ')}`);
const layers = config.layers.map((l) => l.name);
this.logger.info(`Initializing sync for layers: ${layers.join(', ')}`);

await this.initialize(config.layers);
await this.initialize(layers);

this.running = true;
this.loopPromise = this.runSchedulerLoop();
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export { SyncStatus, SyncStateEntry } from './syncState';
export type { ScheduleEntry } from './scheduler';
export type { LayerObject, ThirdPartyResponse } from './thirdParty';
export type { SyncConfig } from './syncConfig';
export type { LayerConfig } from './layerConfig';
export type { DbConfig } from './dbConfig';
4 changes: 4 additions & 0 deletions src/types/layerConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface LayerConfig {
name: string;
query: string;
}
5 changes: 4 additions & 1 deletion src/types/syncConfig.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import type { LayerConfig } from './layerConfig';

export interface SyncConfig {
layers: string[];
// Layers to sync; each entry pairs a layer name with its GraphQL query.
layers: LayerConfig[];
syncIntervalMs: number;
pollIntervalMs: number;
pageSize: number;
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/scheduler/syncManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ vi.mock('@src/handler/layerSyncHandler', () => ({
const TEST_LAYERS = ['layer_alpha', 'layer_beta'];

const syncConfig: SyncConfig = {
layers: TEST_LAYERS,
layers: TEST_LAYERS.map((layer) => ({ name: layer, query: `query { ${layer} { id } }` })),
syncIntervalMs: 10,
pollIntervalMs: 1_000_000,
pageSize: 100,
Expand Down
75 changes: 74 additions & 1 deletion tests/unit/dal/layerDataRepository.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import type { Logger } from '@map-colonies/js-logger';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

import { deleteDeprecatedObjects } from '@src/dal/repositories/layerDataRepository';
import { deleteDeprecatedObjects, insertObjects } from '@src/dal/repositories/layerDataRepository';

const mockExecute = vi.fn();
const mockWhere = vi.fn().mockReturnValue({ execute: mockExecute });
const mockFrom = vi.fn().mockReturnValue({ where: mockWhere });
const mockDelete = vi.fn().mockReturnValue({ from: mockFrom });
const mockCreateQueryBuilder = vi.fn().mockReturnValue({ delete: mockDelete });
// insert chain: createQueryBuilder().insert().into().values().orUpdate().execute()
const mockInsertExecute = vi.fn();
const mockOrUpdate = vi.fn().mockReturnValue({ execute: mockInsertExecute });
const mockValues = vi.fn().mockReturnValue({ orUpdate: mockOrUpdate });
const mockInto = vi.fn().mockReturnValue({ values: mockValues });
const mockInsert = vi.fn().mockReturnValue({ into: mockInto });

const mockGetRepository = vi.fn().mockReturnValue({ createQueryBuilder: mockCreateQueryBuilder });

vi.mock('@src/dal/connection', () => ({
Expand Down Expand Up @@ -95,3 +102,69 @@ describe('deleteDeprecatedObjects', () => {
expect(mockExecute).not.toHaveBeenCalled();
});
});

function makeObject(id: string): LayerObject {
return { id, geom: { type: 'Point', coordinates: [0, 0] }, properties: { foo: id } };
}

describe('insertObjects', () => {
let logger: Logger;

beforeEach(() => {
logger = makeLogger();
vi.clearAllMocks();
mockCreateQueryBuilder.mockReturnValue({ insert: mockInsert });
mockInsert.mockReturnValue({ into: mockInto });
mockInto.mockReturnValue({ values: mockValues });
mockValues.mockReturnValue({ orUpdate: mockOrUpdate });
mockOrUpdate.mockReturnValue({ execute: mockInsertExecute });
});

afterEach(() => {
vi.restoreAllMocks();
});

it('upserts on the (layer_name, id) conflict, overwriting geom and properties', async () => {
mockInsertExecute.mockResolvedValueOnce(undefined);

await insertObjects(logger, 'obstacles', [makeObject('a'), makeObject('b')]);

expect(mockInsertExecute).toHaveBeenCalledTimes(1);
expect(mockOrUpdate).toHaveBeenCalledWith(['geom', 'properties'], ['layer_name', 'id']);
});

it('does nothing when objects is empty', async () => {
await insertObjects(logger, 'obstacles', []);

expect(mockInsertExecute).not.toHaveBeenCalled();
});

it('falls back to per-object upserts when the batch insert throws', async () => {
mockInsertExecute.mockRejectedValueOnce(new Error('batch error')).mockResolvedValueOnce(undefined).mockResolvedValueOnce(undefined);

await insertObjects(logger, 'obstacles', [makeObject('a'), makeObject('b')]);

// batch attempt + 2 per-object attempts
expect(mockInsertExecute).toHaveBeenCalledTimes(3);
expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({ msg: 'Batch insert failed, falling back to per-object inserts', layerName: 'obstacles', count: 2 })
);
});

it('logs and continues when an individual upsert also fails', async () => {
Comment thread
asafmas-rnd marked this conversation as resolved.
mockInsertExecute
.mockRejectedValueOnce(new Error('batch error'))
.mockRejectedValueOnce(new Error('individual error'))
.mockResolvedValueOnce(undefined);

await insertObjects(logger, 'obstacles', [makeObject('a'), makeObject('b')]);

expect(mockInsertExecute).toHaveBeenCalledTimes(3);
expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({ msg: 'Insert failed for object, skipping', layerName: 'obstacles', id: 'a' })
);
expect(logger.error).toHaveBeenCalledTimes(1);

expect(mockValues).toHaveBeenLastCalledWith(expect.objectContaining({ layerName: 'obstacles', id: 'b' }));
});
});
2 changes: 1 addition & 1 deletion tests/unit/externalClients/layersClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ vi.mock('@src/common/syncConfig', () => ({
}));

const syncConfig: SyncConfig = {
layers: ['obstacles'],
layers: [{ name: 'obstacles', query: 'query { obstacles { id } }' }],
syncIntervalMs: 500,
pollIntervalMs: 600_000,
pageSize: 100,
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/handler/layerSyncHandler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ vi.mock('@src/externalClients/layersClient/layersClient', () => ({
}));

const syncConfig: SyncConfig = {
layers: ['obstacles'],
layers: [{ name: 'obstacles', query: 'query { obstacles { id } }' }],
syncIntervalMs: 500,
pollIntervalMs: 600_000,
pageSize: 100,
Expand Down
Loading