@@ -4,6 +4,7 @@ const crypto = require('crypto');
44const hlc = require ( './hlc.js' ) ;
55const Debugger = require ( 'debug' ) ;
66const EventEmitter = require ( 'node:events' ) ;
7+ const assert = require ( 'node:assert' ) ;
78const { performance } = require ( 'node:perf_hooks' ) ;
89
910const LAST_PATCH_AT_TIMESTAMP = 0 ;
@@ -177,7 +178,8 @@ const SQLiteOnSteroid = (db, myPeerId = null, options) => {
177178 deleteOldPatches : ( timestamp ) => deleteOldPatches . run ( timestamp ) ,
178179 // directUpsert : (patch) => directUpsert.run(_directUpsertSQLParamsFn(patch)),
179180 debounceFromTimestamp : Number . MAX_SAFE_INTEGER ,
180- debounceTimer : null
181+ debounceTimer : null ,
182+ knownColumns : _columns
181183 } ;
182184 return {
183185 applyPatchesSQL : _applyPatchesSQL ,
@@ -495,51 +497,105 @@ const SQLiteOnSteroid = (db, myPeerId = null, options) => {
495497 tab : tableName ,
496498 delta : _tableStatement . cleanRow ( rowPatch )
497499 } ;
498- _tableStatement . savePatch ( _patch ) ; // TODO insert by batch top optimize a bit
499- lastSequenceId ++ ; // only increment lastSequenceId if patch is saved. TODO add tests. Make it cyclic, increment time when it switch
500- lastPatchAtTimestamp = _patch . at ;
501- _tableStatement ?. applyPatches ?. ( _patch . at ) ; // It is not executed if patch is stored only in pending_patches table (happens when a ping stat message is generated internally)
500+ try {
501+ _tableStatement . savePatch ( _patch ) ; // TODO insert by batch top optimize a bit
502+ lastSequenceId ++ ; // only increment lastSequenceId if patch is saved. TODO add tests. Make it cyclic, increment time when it switch
503+ lastPatchAtTimestamp = _patch . at ;
504+ _tableStatement ?. applyPatches ?. ( _patch . at ) ; // It is not executed if patch is stored only in pending_patches table (happens when a ping stat message is generated internally)
505+ }
506+ catch ( e ) {
507+ console . error ( 'Error processing patch %o ' , _patch , e . message ) ;
508+ return callback ( e ) ;
509+ }
502510 debugWrite ( '--> all peers %o' , _patch ) ;
503511 _broadcast ( _patch ) ;
504512 callback ?. ( null , _generateSessionToken ( myPeerId , _patch . seq ) ) ;
505513 }
506514
515+ /**
516+ * Test if the database is correctly initialized and if the rowPatch is correctly stored/retrieved in/from the database.
517+ *
518+ * The extracted patch from the database should be the same as the inserted rowPatch.
519+ *
520+ * This function cannot be used in production because it creates a real patch.
521+ * But it can be used (table by table) in the tests to verify that everything is working as expected (BigInt are converted to strings, blob ...)
522+ *
523+ * Same usage as `upsert`
524+ *
525+ * @param {* } tableName
526+ * @param {* } rowPatch
527+ * @param {* } callback(err) returns an error if the Replic-sqlite is not correctly initialized
528+ */
529+ function selfTest ( tableName , rowPatch , callback ) {
530+ upsert ( tableName , rowPatch , ( err ) => {
531+ if ( err ) {
532+ return callback ( err ) ;
533+ }
534+ const _missingPatch = globalStatements . getPatchFromColumn ( myPeerId , 0 , Number . MAX_SAFE_INTEGER ) ;
535+ const _patchRetrieved = JSON . parse ( _missingPatch [ 0 ] . patch ) ;
536+ const _knownColumns = tableStatements [ tableName ] . knownColumns ;
537+ const _knownColumnsSorted = [ ..._knownColumns ] . sort ( ) ;
538+ try {
539+ assert . deepStrictEqual ( Object . keys ( _patchRetrieved . delta ) . sort ( ) , _knownColumnsSorted ) ;
540+ assert . deepStrictEqual ( Object . keys ( rowPatch ) . sort ( ) , _knownColumnsSorted ) ;
541+ }
542+ catch ( e ) {
543+ e . message += '\nReplic-sqlite: Please check that all columns are tested (no missing columns in the patch).' ;
544+ return callback ( e ) ;
545+ }
546+ try {
547+ assert . deepStrictEqual ( _patchRetrieved . delta , rowPatch ) ;
548+ }
549+ catch ( e ) {
550+ e . message += `\nReplic-sqlite: Please check that "prepareStatementHook" and "${ tableName } _patches" table are correctly set to convert table columns for insert/extract operations (e.g., BigInt → string, BLOB → string), etc.\n` ;
551+ return callback ( e ) ;
552+ }
553+ callback ( null ) ;
554+ } ) ;
555+ }
556+
507557 function _onPatchReceivedFromPeers ( patch ) {
508- if ( parseInt ( patch . peer , 10 ) === myPeerId ) {
509- debug ( 'Received patch from myself. Ignore it.' ) ;
510- return ;
511- }
512- if ( patch . ver !== dbVersion ) {
513- // if version mismatch, save it in pending_patches table for later processing
514- tableStatements [ PENDING_PATCHES_TABLE_NAME ] . savePatch ( patch ) ;
558+ try {
559+ if ( parseInt ( patch . peer , 10 ) === myPeerId ) {
560+ debug ( 'Received patch from myself. Ignore it.' ) ;
561+ return ;
562+ }
563+ if ( patch . ver !== dbVersion ) {
564+ // if version mismatch, save it in pending_patches table for later processing
565+ tableStatements [ PENDING_PATCHES_TABLE_NAME ] . savePatch ( patch ) ;
566+ _detectMissingSequenceIds ( patch ) ;
567+ return ;
568+ }
569+ let _tableStatement = tableStatements [ patch . tab ] ;
570+ if ( ! _tableStatement ) {
571+ // should never happen since we only send patches to peers with the same dbVersion (manage above)
572+ console . warn ( `Table ${ patch . tab } not found when receiving patch from peers. Ignore patch.` ) ;
573+ return ;
574+ }
575+ _tableStatement . savePatch ( patch ) ;
515576 _detectMissingSequenceIds ( patch ) ;
516- return ;
577+
578+ // update real table, if it is not a ping message generated internally (stored only in pending_patches) with the short and reserved table name '_'
579+ if ( _tableStatement . applyPatches ) {
580+ // All the patches will be applied in the next event loop
581+ if ( _tableStatement . debounceFromTimestamp === Number . MAX_SAFE_INTEGER ) {
582+ _tableStatement . debounceFromTimestamp -- ; // make sure the function is called once in the next event loop (security)
583+ setImmediate ( ( ) => {
584+ _tableStatement . applyPatches ( _tableStatement . debounceFromTimestamp ) ;
585+ _tableStatement . debounceFromTimestamp = Number . MAX_SAFE_INTEGER ;
586+ } ) ;
587+ }
588+ if ( patch . at < _tableStatement . debounceFromTimestamp ) {
589+ // store the timestamp of the oldest patch to apply for the debouncer
590+ // TODO tets this, verify that it applies the patches really from this timestamp
591+ _tableStatement . debounceFromTimestamp = patch . at ;
592+ }
593+ }
517594 }
518- let _tableStatement = tableStatements [ patch . tab ] ;
519- if ( ! _tableStatement ) {
520- // should never happen since we only send patches to peers with the same dbVersion (manage above)
521- console . warn ( `Table ${ patch . tab } not found when receiving patch from peers. Ignore patch.` ) ;
595+ catch ( e ) {
596+ console . error ( 'Error processing patch %o ' , patch , e . message ) ;
522597 return ;
523598 }
524- _tableStatement . savePatch ( patch ) ;
525- _detectMissingSequenceIds ( patch ) ;
526-
527- // update real table, if it is not a ping message generated internally (stored only in pending_patches) with the short and reserved table name '_'
528- if ( _tableStatement . applyPatches ) {
529- // All the patches will be applied in the next event loop
530- if ( _tableStatement . debounceFromTimestamp === Number . MAX_SAFE_INTEGER ) {
531- _tableStatement . debounceFromTimestamp -- ; // make sure the function is called once in the next event loop (security)
532- setImmediate ( ( ) => {
533- _tableStatement . applyPatches ( _tableStatement . debounceFromTimestamp ) ;
534- _tableStatement . debounceFromTimestamp = Number . MAX_SAFE_INTEGER ;
535- } ) ;
536- }
537- if ( patch . at < _tableStatement . debounceFromTimestamp ) {
538- // store the timestamp of the oldest patch to apply for the debouncer
539- // TODO tets this, verify that it applies the patches really from this timestamp
540- _tableStatement . debounceFromTimestamp = patch . at ;
541- }
542- }
543599 }
544600
545601 function addRemotePeer ( remotePeerId , socket , connectionInfo ) {
@@ -656,7 +712,14 @@ const SQLiteOnSteroid = (db, myPeerId = null, options) => {
656712 function _onRequestForMissingPatchFromPeers ( msg ) {
657713 // TODO retourner une erreur si la demande est trop vieilel (patch table nettoyée)
658714 // TODO limiter msg.maxSeq (max paquet de 100, 1000 ?)
659- const _missingPatch = globalStatements . getPatchFromColumn ( msg . peer , msg . minSeq , msg . maxSeq ) ;
715+ let _missingPatch = [ ] ;
716+ try {
717+ _missingPatch = globalStatements . getPatchFromColumn ( msg . peer , msg . minSeq , msg . maxSeq ) ;
718+ }
719+ catch ( e ) {
720+ console . error ( 'Error processing request for missing patch %o ' , msg , e . message ) ;
721+ return ;
722+ }
660723 if ( peerSockets [ msg . forPeer ] ) {
661724 if ( Array . isArray ( _missingPatch ) ) {
662725 for ( const _patch of _missingPatch ) {
@@ -917,6 +980,7 @@ const SQLiteOnSteroid = (db, myPeerId = null, options) => {
917980 metrics,
918981 backoff,
919982 readYourWrite,
983+ selfTest,
920984 _parseSessionToken,
921985 _generateSessionToken,
922986 _onPatchReceivedFromPeers,
0 commit comments