Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,14 @@ impl CopyState {
dst: Arc<Layout>,
target_block: BlockPtr,
) -> Result<CopyState, StoreError> {
let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref()).await?;
let tables = TableState::load(
conn,
primary,
src.as_ref(),
dst.as_ref(),
target_block.number,
)
.await?;
let (finished, mut unfinished): (Vec<_>, Vec<_>) =
tables.into_iter().partition(|table| table.finished());
unfinished.sort_by_key(|table| table.dst.object.to_string());
Expand Down Expand Up @@ -329,6 +336,7 @@ struct TableState {
dst_site: Arc<Site>,
batcher: VidBatcher,
duration_ms: i64,
target_block: BlockNumber,
}

impl TableState {
Expand All @@ -351,6 +359,7 @@ impl TableState {
dst_site,
batcher,
duration_ms: 0,
target_block: target_block.number,
})
}

Expand All @@ -363,6 +372,7 @@ impl TableState {
primary: Primary,
src_layout: &Layout,
dst_layout: &Layout,
target_block: BlockNumber,
) -> Result<Vec<TableState>, StoreError> {
use copy_table_state as cts;

Expand Down Expand Up @@ -429,6 +439,7 @@ impl TableState {
dst_site: dst_layout.site.clone(),
batcher,
duration_ms,
target_block,
};
states.push(state);
}
Expand Down Expand Up @@ -503,15 +514,20 @@ impl TableState {
}

async fn copy_batch(&mut self, conn: &mut AsyncPgConnection) -> Result<Status, StoreError> {
let (duration, count) = self
let (duration, count): (_, Option<i32>) = self
.batcher
.step(async |start, end| {
let count =
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
.count_current()
.get_result::<i64>(conn)
.await
.optional()?;
.step(async |start: i64, end: i64| {
let count = rq::CopyEntityBatchQuery::new(
self.dst.as_ref(),
&self.src,
start,
end,
self.target_block,
)?
.count_current()
.get_result::<i64>(conn)
.await
.optional()?;
Ok(count.unwrap_or(0) as i32)
})
.await?;
Expand Down
20 changes: 15 additions & 5 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1648,11 +1648,21 @@ impl DeploymentStore {
.await?;
}

// Rewind the subgraph so that entity versions that are
// clamped in the future (beyond `block`) become valid for
// all blocks after `block`. `revert_block` gets rid of
// everything including the block passed to it. We want to
// preserve `block` and therefore revert `block+1`
// CopyEntityBatchQuery now reverts entity versions
// during copying, making this rewind redundant for new
// copies. We keep it for backward compatibility: a copy
// that was started before this change and is resumed
// after upgrading will have already-copied rows that
// weren't reverted during copy. For data that was
// already reverted during copy, this is a no-op. This
// code can be removed once a release with this change
// has been out for a while and we are sure that there
// are no more copies in progress that started before
// the change.
//
// `revert_block` gets rid of everything including the
// block passed to it. We want to preserve `block` and
// therefore revert `block+1`
let start = Instant::now();
let block_to_revert: BlockNumber = block
.number
Expand Down
24 changes: 23 additions & 1 deletion store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5093,6 +5093,7 @@ pub struct CopyEntityBatchQuery<'a> {
columns: Vec<&'a Column>,
first_vid: i64,
last_vid: i64,
target_block: BlockNumber,
}

impl<'a> CopyEntityBatchQuery<'a> {
Expand All @@ -5101,6 +5102,7 @@ impl<'a> CopyEntityBatchQuery<'a> {
src: &'a Table,
first_vid: i64,
last_vid: i64,
target_block: BlockNumber,
) -> Result<Self, StoreError> {
let mut columns = Vec::new();
for dcol in &dst.columns {
Expand All @@ -5127,6 +5129,7 @@ impl<'a> CopyEntityBatchQuery<'a> {
columns,
first_vid,
last_vid,
target_block,
})
}

Expand Down Expand Up @@ -5211,7 +5214,16 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
);
out.push_sql(&checked_conversion);
}
(false, false) => out.push_sql(BLOCK_RANGE_COLUMN),
(false, false) => {
let range_conv = format!(
r#"
case when upper({BLOCK_RANGE_COLUMN}) > {}
then int4range(lower({BLOCK_RANGE_COLUMN}), null)
else {BLOCK_RANGE_COLUMN} end"#,
self.target_block
);
out.push_sql(&range_conv)
}
}

match (self.src.has_causality_region, self.dst.has_causality_region) {
Expand Down Expand Up @@ -5241,6 +5253,16 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_bind_param::<BigInt, _>(&self.first_vid)?;
out.push_sql(" and vid <= ");
out.push_bind_param::<BigInt, _>(&self.last_vid)?;
out.push_sql(" and ");
if self.src.immutable {
out.push_sql(BLOCK_COLUMN);
} else {
out.push_sql("lower(");
out.push_sql(BLOCK_RANGE_COLUMN);
out.push_sql(")");
}
out.push_sql(" <= ");
out.push_bind_param::<Integer, _>(&self.target_block)?;
out.push_sql("\n returning ");
if self.dst.immutable {
out.push_sql("true");
Expand Down
Loading