diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs
index 54c1a03a896..fd6cd2c4fe0 100644
--- a/store/postgres/src/copy.rs
+++ b/store/postgres/src/copy.rs
@@ -178,7 +178,14 @@ impl CopyState {
         dst: Arc<Layout>,
         target_block: BlockPtr,
     ) -> Result<CopyState, StoreError> {
-        let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref()).await?;
+        let tables = TableState::load(
+            conn,
+            primary,
+            src.as_ref(),
+            dst.as_ref(),
+            target_block.number,
+        )
+        .await?;
         let (finished, mut unfinished): (Vec<_>, Vec<_>) =
             tables.into_iter().partition(|table| table.finished());
         unfinished.sort_by_key(|table| table.dst.object.to_string());
@@ -329,6 +336,7 @@ struct TableState {
     dst_site: Arc<Site>,
     batcher: VidBatcher,
     duration_ms: i64,
+    target_block: BlockNumber,
 }
 
 impl TableState {
@@ -351,6 +359,7 @@ impl TableState {
             dst_site,
             batcher,
             duration_ms: 0,
+            target_block: target_block.number,
         })
     }
 
@@ -363,6 +372,7 @@ impl TableState {
         primary: Primary,
         src_layout: &Layout,
         dst_layout: &Layout,
+        target_block: BlockNumber,
     ) -> Result<Vec<TableState>, StoreError> {
         use copy_table_state as cts;
 
@@ -429,6 +439,7 @@ impl TableState {
                 dst_site: dst_layout.site.clone(),
                 batcher,
                 duration_ms,
+                target_block,
             };
             states.push(state);
         }
@@ -503,15 +514,20 @@ impl TableState {
     }
 
     async fn copy_batch(&mut self, conn: &mut AsyncPgConnection) -> Result<Status, StoreError> {
-        let (duration, count) = self
+        let (duration, count): (_, Option<i32>) = self
             .batcher
-            .step(async |start, end| {
-                let count =
-                    rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
-                        .count_current()
-                        .get_result::<i64>(conn)
-                        .await
-                        .optional()?;
+            .step(async |start: i64, end: i64| {
+                let count = rq::CopyEntityBatchQuery::new(
+                    self.dst.as_ref(),
+                    &self.src,
+                    start,
+                    end,
+                    self.target_block,
+                )?
+                .count_current()
+                .get_result::<i64>(conn)
+                .await
+                .optional()?;
                 Ok(count.unwrap_or(0) as i32)
             })
             .await?;
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index 4d8df5c3a28..d4ab71c7f11 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -1648,11 +1648,21 @@ impl DeploymentStore {
                 .await?;
             }
 
-            // Rewind the subgraph so that entity versions that are
-            // clamped in the future (beyond `block`) become valid for
-            // all blocks after `block`. `revert_block` gets rid of
-            // everything including the block passed to it. We want to
-            // preserve `block` and therefore revert `block+1`
+            // CopyEntityBatchQuery now reverts entity versions
+            // during copying, making this rewind redundant for new
+            // copies. We keep it for backward compatibility: a copy
+            // that was started before this change and is resumed
+            // after upgrading will have already-copied rows that
+            // weren't reverted during copy. For data that was
+            // already reverted during copy, this is a no-op. This
+            // code can be removed once a release with this change
+            // has been out for a while and we are sure that there
+            // are no more copies in progress that started before
+            // the change
+            //
+            // `revert_block` gets rid of everything including the
+            // block passed to it. We want to preserve `block` and
+            // therefore revert `block+1`
             let start = Instant::now();
             let block_to_revert: BlockNumber = block
                 .number
diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs
index 1c746f1338e..34d88585ab0 100644
--- a/store/postgres/src/relational_queries.rs
+++ b/store/postgres/src/relational_queries.rs
@@ -5093,6 +5093,7 @@ pub struct CopyEntityBatchQuery<'a> {
     columns: Vec<&'a Column>,
     first_vid: i64,
     last_vid: i64,
+    target_block: BlockNumber,
 }
 
 impl<'a> CopyEntityBatchQuery<'a> {
@@ -5101,6 +5102,7 @@ impl<'a> CopyEntityBatchQuery<'a> {
         src: &'a Table,
         first_vid: i64,
         last_vid: i64,
+        target_block: BlockNumber,
     ) -> Result<Self, StoreError> {
         let mut columns = Vec::new();
         for dcol in &dst.columns {
@@ -5127,6 +5129,7 @@ impl<'a> CopyEntityBatchQuery<'a> {
             columns,
             first_vid,
             last_vid,
+            target_block,
         })
     }
 
@@ -5211,7 +5214,16 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
                 );
                 out.push_sql(&checked_conversion);
             }
-            (false, false) => out.push_sql(BLOCK_RANGE_COLUMN),
+            (false, false) => {
+                let range_conv = format!(
+                    r#"
+                case when upper({BLOCK_RANGE_COLUMN}) > {}
+                     then int4range(lower({BLOCK_RANGE_COLUMN}), null)
+                     else {BLOCK_RANGE_COLUMN} end"#,
+                    self.target_block
+                );
+                out.push_sql(&range_conv)
+            }
         }
 
         match (self.src.has_causality_region, self.dst.has_causality_region) {
@@ -5241,6 +5253,16 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
         out.push_bind_param::<BigInt, _>(&self.first_vid)?;
         out.push_sql(" and vid <= ");
         out.push_bind_param::<BigInt, _>(&self.last_vid)?;
+        out.push_sql(" and ");
+        if self.src.immutable {
+            out.push_sql(BLOCK_COLUMN);
+        } else {
+            out.push_sql("lower(");
+            out.push_sql(BLOCK_RANGE_COLUMN);
+            out.push_sql(")");
+        }
+        out.push_sql(" <= ");
+        out.push_bind_param::<Integer, _>(&self.target_block)?;
         out.push_sql("\n returning ");
         if self.dst.immutable {
             out.push_sql("true");