diff --git a/src/dal/repositories/layerDataRepository.ts b/src/dal/repositories/layerDataRepository.ts index 7de3567..cb018bd 100644 --- a/src/dal/repositories/layerDataRepository.ts +++ b/src/dal/repositories/layerDataRepository.ts @@ -1,6 +1,7 @@ import { trace } from '@opentelemetry/api'; import { asyncCallWithSpan } from '@map-colonies/tracing-utils'; import type { ObjectLiteral, Repository } from 'typeorm'; +import type { Logger } from '@map-colonies/js-logger'; import type { LayerObject } from '../entities'; import { LayerObjectEntity } from '../entities'; import { getDataSource } from '../connection'; @@ -12,7 +13,17 @@ function getRepository(): Repository { return getDataSource().getRepository(LayerObjectEntity); } -export async function insertObjects(layerName: string, objects: LayerObject[]): Promise { +async function insertRow(layerName: string, object: LayerObject): Promise { + await getRepository() + .createQueryBuilder() + .insert() + .into(LayerObjectEntity) + .values({ layerName, id: object.id, geom: object.geom, properties: object.properties } as unknown as ObjectLiteral) + .orIgnore() + .execute(); +} + +export async function insertObjects(logger: Logger, layerName: string, objects: LayerObject[]): Promise { if (objects.length === 0) return; const rows = objects.map((o) => ({ @@ -24,13 +35,31 @@ export async function insertObjects(layerName: string, objects: LayerObject[]): await asyncCallWithSpan( async () => { - await getRepository() - .createQueryBuilder() - .insert() - .into(LayerObjectEntity) - .values(rows as unknown as ObjectLiteral[]) - .orIgnore() - .execute(); + try { + await getRepository() + .createQueryBuilder() + .insert() + .into(LayerObjectEntity) + .values(rows as unknown as ObjectLiteral[]) + .orIgnore() + .execute(); + } catch (batchErr) { + logger.warn({ msg: 'Batch insert failed, falling back to per-object inserts', layerName, count: objects.length, err: batchErr }); + for (const object of objects) { + try { + await insertRow(layerName, object); + } catch (err) { + logger.error({ + msg: 'Insert failed for object, skipping', + layerName, + id: object.id, + geom: object.geom, + properties: object.properties, + err, + }); + } + } + } }, tracer, 'layerDataRepository.insertObjects' diff --git a/src/handler/layerSyncHandler.ts b/src/handler/layerSyncHandler.ts index 3f48b4d..07e5d4a 100644 --- a/src/handler/layerSyncHandler.ts +++ b/src/handler/layerSyncHandler.ts @@ -25,7 +25,7 @@ export async function fetchAndSyncLayerPage(logger: Logger, entry: ScheduleEntry if (response.objects.length > 0) { logger.info(`Inserting ${response.objects.length} new objects into layer "${entry.layerName}"`); - await layerDataRepository.insertObjects(entry.layerName, response.objects); + await layerDataRepository.insertObjects(logger, entry.layerName, response.objects); } if (response.deletedIds.length > 0) { diff --git a/tests/unit/handler/layerSyncHandler.spec.ts b/tests/unit/handler/layerSyncHandler.spec.ts index 7991ed8..af5a1f9 100644 --- a/tests/unit/handler/layerSyncHandler.spec.ts +++ b/tests/unit/handler/layerSyncHandler.spec.ts @@ -99,7 +99,7 @@ describe('layerSyncHandler', () => { await fetchAndSyncLayerPage(makeLogger(), entry); expect(layerClient.fetchPage).toHaveBeenCalledWith(expect.anything(), 'obstacles', '5'); - expect(layerDataRepository.insertObjects).toHaveBeenCalledWith('obstacles', objects); + expect(layerDataRepository.insertObjects).toHaveBeenCalledWith(expect.anything(), 'obstacles', objects); expect(layerDataRepository.deleteDeprecatedObjects).toHaveBeenCalledWith('obstacles', ['old-1']); expect(syncStateRepository.updateSequence).toHaveBeenCalledWith('obstacles', '10'); });