diff --git a/config/default.json b/config/default.json index 14b276c..7f33d12 100644 --- a/config/default.json +++ b/config/default.json @@ -28,7 +28,12 @@ } }, "sync": { - "layers": ["obstacles"], + "layers": [ + { + "name": "obstacles", + "query": "query { obstacles { createdBy creationTime deleted entityVersion geography { coordinates { latitude longitude } graphicsObjectKind { value } height obstacleHeightsRange { displayName } } id identifiers { essence { displayName value } name number } lastUpdateTime lastUpdatedBy } }" + } + ], "syncIntervalMs": 500, "pollIntervalMs": 600000, "pageSize": 1000, diff --git a/src/containerConfig.ts b/src/containerConfig.ts index c50f22f..482c447 100644 --- a/src/containerConfig.ts +++ b/src/containerConfig.ts @@ -29,9 +29,10 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise configInstance.initializeMetrics(metricsRegistry); const syncConfig = getSyncConfig(); - const dataSource = await initializeDb(syncConfig.layers); + const layers = syncConfig.layers.map((l) => l.name); + const dataSource = await initializeDb(layers); const { host, port, database } = dataSource.options as { host: string; port: number; database: string }; - logger.info(`Database connected to ${host}:${port}/${database} (layer partitions: ${syncConfig.layers.join(', ')})`); + logger.info(`Database connected to ${host}:${port}/${database} (layer partitions: ${layers.join(', ')})`); const syncManager = new SyncManager(logger); diff --git a/src/dal/repositories/layerDataRepository.ts b/src/dal/repositories/layerDataRepository.ts index ab921e6..46ca515 100644 --- a/src/dal/repositories/layerDataRepository.ts +++ b/src/dal/repositories/layerDataRepository.ts @@ -19,7 +19,7 @@ async function insertRow(layerName: string, object: LayerObject): Promise .insert() .into(LayerObjectEntity) .values({ layerName, id: object.id, geom: object.geom, properties: object.properties } as unknown as ObjectLiteral) - .orIgnore() + .orUpdate(['geom', 'properties'], ['layer_name', 'id']) .execute(); } @@ -41,7 +41,7 @@ export async function insertObjects(logger: Logger, layerName: string, objects: .insert() .into(LayerObjectEntity) .values(rows as unknown as ObjectLiteral[]) - .orIgnore() + .orUpdate(['geom', 'properties'], ['layer_name', 'id']) .execute(); } catch (batchErr) { logger.warn({ msg: 'Batch insert failed, falling back to per-object inserts', layerName, count: objects.length, err: batchErr }); diff --git a/src/externalClients/layersClient/layersClient.ts b/src/externalClients/layersClient/layersClient.ts index 242127c..d7f97ea 100644 --- a/src/externalClients/layersClient/layersClient.ts +++ b/src/externalClients/layersClient/layersClient.ts @@ -64,7 +64,7 @@ export async function fetchPage(logger: Logger, layerName: string, sequence: str async () => { const response = await axios.post( config.thirdPartyBaseUrl, - { query: buildLayerQuery(layerName) }, + { query: buildLayerQuery(layerName, config.layers) }, { headers: { 'Content-Type': 'application/json', diff --git a/src/externalClients/layersClientModel.ts b/src/externalClients/layersClientModel.ts index cc30505..44288ff 100644 --- a/src/externalClients/layersClientModel.ts +++ b/src/externalClients/layersClientModel.ts @@ -1,36 +1,12 @@ -// Third-party API uses the layer name as the root query field. -// Pagination/auth inputs are passed via HTTP headers, not GraphQL variables. -export function buildLayerQuery(layerName: string): string { - return `query { - ${layerName} { - createdBy - creationTime - deleted - entityVersion - geography { - coordinates { - latitude - longitude - } - graphicsObjectKind { - value - } - height - obstacleHeightsRange { - displayName - } - } - id - identifiers { - essence { - displayName - value - } - name - number - } - lastUpdateTime - lastUpdatedBy +import type { LayerConfig } from '../types'; + +// The query per layer comes from config (sync.layers), matched by layer name. +// The layer name is the root GraphQL query field. A layer with no configured +// query is treated as a misconfiguration and skipped (error thrown). +export function buildLayerQuery(layerName: string, layers: LayerConfig[]): string { + const layer = layers.find((l) => l.name === layerName); + if (layer === undefined || layer.query.trim() === '') { + throw new Error(`No query configured for layer "${layerName}"`); } -}`; + return layer.query; } diff --git a/src/scheduler/syncManager.ts b/src/scheduler/syncManager.ts index 3382efd..59fba0f 100644 --- a/src/scheduler/syncManager.ts +++ b/src/scheduler/syncManager.ts @@ -23,9 +23,10 @@ export class SyncManager { public async start(): Promise { const config = getSyncConfig(); - this.logger.info(`Initializing sync for layers: ${config.layers.join(', ')}`); + const layers = config.layers.map((l) => l.name); + this.logger.info(`Initializing sync for layers: ${layers.join(', ')}`); - await this.initialize(config.layers); + await this.initialize(layers); this.running = true; this.loopPromise = this.runSchedulerLoop(); diff --git a/src/types/index.ts b/src/types/index.ts index dffea1d..73b4c2a 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -2,4 +2,5 @@ export { SyncStatus, SyncStateEntry } from './syncState'; export type { ScheduleEntry } from './scheduler'; export type { LayerObject, ThirdPartyResponse } from './thirdParty'; export type { SyncConfig } from './syncConfig'; +export type { LayerConfig } from './layerConfig'; export type { DbConfig } from './dbConfig'; diff --git a/src/types/layerConfig.ts b/src/types/layerConfig.ts new file mode 100644 index 0000000..f9bfa21 --- /dev/null +++ b/src/types/layerConfig.ts @@ -0,0 +1,4 @@ +export interface LayerConfig { + name: string; + query: string; +} diff --git a/src/types/syncConfig.ts b/src/types/syncConfig.ts index b168b2e..e89933b 100644 --- a/src/types/syncConfig.ts +++ b/src/types/syncConfig.ts @@ -1,5 +1,8 @@ +import type { LayerConfig } from './layerConfig'; + export interface SyncConfig { - layers: string[]; + // Layers to sync; each entry pairs a layer name with its GraphQL query. + layers: LayerConfig[]; syncIntervalMs: number; pollIntervalMs: number; pageSize: number; diff --git a/tests/integration/scheduler/syncManager.spec.ts b/tests/integration/scheduler/syncManager.spec.ts index 95a3dd1..ac6b376 100644 --- a/tests/integration/scheduler/syncManager.spec.ts +++ b/tests/integration/scheduler/syncManager.spec.ts @@ -21,7 +21,7 @@ vi.mock('@src/handler/layerSyncHandler', () => ({ const TEST_LAYERS = ['layer_alpha', 'layer_beta']; const syncConfig: SyncConfig = { - layers: TEST_LAYERS, + layers: TEST_LAYERS.map((layer) => ({ name: layer, query: `query { ${layer} { id } }` })), syncIntervalMs: 10, pollIntervalMs: 1_000_000, pageSize: 100, diff --git a/tests/unit/dal/layerDataRepository.spec.ts b/tests/unit/dal/layerDataRepository.spec.ts index 217a9f7..31b1b6c 100644 --- a/tests/unit/dal/layerDataRepository.spec.ts +++ b/tests/unit/dal/layerDataRepository.spec.ts @@ -1,13 +1,20 @@ import type { Logger } from '@map-colonies/js-logger'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { deleteDeprecatedObjects } from '@src/dal/repositories/layerDataRepository'; +import { deleteDeprecatedObjects, insertObjects } from '@src/dal/repositories/layerDataRepository'; const mockExecute = vi.fn(); const mockWhere = vi.fn().mockReturnValue({ execute: mockExecute }); const mockFrom = vi.fn().mockReturnValue({ where: mockWhere }); const mockDelete = vi.fn().mockReturnValue({ from: mockFrom }); const mockCreateQueryBuilder = vi.fn().mockReturnValue({ delete: mockDelete }); +// insert chain: createQueryBuilder().insert().into().values().orUpdate().execute() +const mockInsertExecute = vi.fn(); +const mockOrUpdate = vi.fn().mockReturnValue({ execute: mockInsertExecute }); +const mockValues = vi.fn().mockReturnValue({ orUpdate: mockOrUpdate }); +const mockInto = vi.fn().mockReturnValue({ values: mockValues }); +const mockInsert = vi.fn().mockReturnValue({ into: mockInto }); + const mockGetRepository = vi.fn().mockReturnValue({ createQueryBuilder: mockCreateQueryBuilder }); vi.mock('@src/dal/connection', () => ({ @@ -95,3 +102,69 @@ describe('deleteDeprecatedObjects', () => { expect(mockExecute).not.toHaveBeenCalled(); }); }); + +function makeObject(id: string): LayerObject { + return { id, geom: { type: 'Point', coordinates: [0, 0] }, properties: { foo: id } }; +} + +describe('insertObjects', () => { + let logger: Logger; + + beforeEach(() => { + logger = makeLogger(); + vi.clearAllMocks(); + mockCreateQueryBuilder.mockReturnValue({ insert: mockInsert }); + mockInsert.mockReturnValue({ into: mockInto }); + mockInto.mockReturnValue({ values: mockValues }); + mockValues.mockReturnValue({ orUpdate: mockOrUpdate }); + mockOrUpdate.mockReturnValue({ execute: mockInsertExecute }); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('upserts on the (layer_name, id) conflict, overwriting geom and properties', async () => { + mockInsertExecute.mockResolvedValueOnce(undefined); + + await insertObjects(logger, 'obstacles', [makeObject('a'), makeObject('b')]); + + expect(mockInsertExecute).toHaveBeenCalledTimes(1); + expect(mockOrUpdate).toHaveBeenCalledWith(['geom', 'properties'], ['layer_name', 'id']); + }); + + it('does nothing when objects is empty', async () => { + await insertObjects(logger, 'obstacles', []); + + expect(mockInsertExecute).not.toHaveBeenCalled(); + }); + + it('falls back to per-object upserts when the batch insert throws', async () => { + mockInsertExecute.mockRejectedValueOnce(new Error('batch error')).mockResolvedValueOnce(undefined).mockResolvedValueOnce(undefined); + + await insertObjects(logger, 'obstacles', [makeObject('a'), makeObject('b')]); + + // batch attempt + 2 per-object attempts + expect(mockInsertExecute).toHaveBeenCalledTimes(3); + expect(logger.warn).toHaveBeenCalledWith( + expect.objectContaining({ msg: 'Batch insert failed, falling back to per-object inserts', layerName: 'obstacles', count: 2 }) + ); + }); + + it('logs and continues when an individual upsert also fails', async () => { + mockInsertExecute + .mockRejectedValueOnce(new Error('batch error')) + .mockRejectedValueOnce(new Error('individual error')) + .mockResolvedValueOnce(undefined); + + await insertObjects(logger, 'obstacles', [makeObject('a'), makeObject('b')]); + + expect(mockInsertExecute).toHaveBeenCalledTimes(3); + expect(logger.error).toHaveBeenCalledWith( + expect.objectContaining({ msg: 'Insert failed for object, skipping', layerName: 'obstacles', id: 'a' }) + ); + expect(logger.error).toHaveBeenCalledTimes(1); + + expect(mockValues).toHaveBeenLastCalledWith(expect.objectContaining({ layerName: 'obstacles', id: 'b' })); + }); +}); diff --git a/tests/unit/externalClients/layersClient.spec.ts b/tests/unit/externalClients/layersClient.spec.ts index 05403c8..42aaffd 100644 --- a/tests/unit/externalClients/layersClient.spec.ts +++ b/tests/unit/externalClients/layersClient.spec.ts @@ -12,7 +12,7 @@ vi.mock('@src/common/syncConfig', () => ({ })); const syncConfig: SyncConfig = { - layers: ['obstacles'], + layers: [{ name: 'obstacles', query: 'query { obstacles { id } }' }], syncIntervalMs: 500, pollIntervalMs: 600_000, pageSize: 100, diff --git a/tests/unit/handler/layerSyncHandler.spec.ts b/tests/unit/handler/layerSyncHandler.spec.ts index c19b5a5..9d76f29 100644 --- a/tests/unit/handler/layerSyncHandler.spec.ts +++ b/tests/unit/handler/layerSyncHandler.spec.ts @@ -29,7 +29,7 @@ vi.mock('@src/externalClients/layersClient/layersClient', () => ({ })); const syncConfig: SyncConfig = { - layers: ['obstacles'], + layers: [{ name: 'obstacles', query: 'query { obstacles { id } }' }], syncIntervalMs: 500, pollIntervalMs: 600_000, pageSize: 100,