Skip to content

Commit 3a57b1b

Browse files
authored
Merge pull request #76 from enigbe/2025-12-bound-incoming-request-and-postgres-service
Bound incoming request and add postgres service
2 parents 2d7cb75 + b6d80c7 commit 3a57b1b

File tree

6 files changed

+203
-15
lines changed

6 files changed

+203
-15
lines changed

rust/docker-compose.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
services:
2+
postgres:
3+
image: postgres:15
4+
environment:
5+
POSTGRES_DB: postgres
6+
POSTGRES_USER: postgres
7+
POSTGRES_PASSWORD: postgres
8+
volumes:
9+
- postgres-data:/var/lib/postgresql/data
10+
ports:
11+
- "5432:5432"
12+
networks:
13+
- app-network
14+
15+
volumes:
16+
postgres-data:
17+
18+
networks:
19+
app-network:
20+
driver: bridge

rust/impls/src/postgres_store.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,10 @@ mod tests {
699699
use super::{drop_database, DUMMY_MIGRATION, MIGRATIONS};
700700
use crate::postgres_store::PostgresPlaintextBackend;
701701
use api::define_kv_store_tests;
702+
use api::kv_store::KvStore;
703+
use api::types::{DeleteObjectRequest, GetObjectRequest, KeyValue, PutObjectRequest};
704+
705+
use bytes::Bytes;
702706
use tokio::sync::OnceCell;
703707
use tokio_postgres::NoTls;
704708

@@ -814,4 +818,70 @@ mod tests {
814818

815819
drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap();
816820
}
821+
822+
#[tokio::test]
823+
async fn supports_objects_up_to_non_large_object_threshold() {
824+
let vss_db = "supports_objects_up_to_non_large_object_threshold";
825+
let _ = drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await;
826+
827+
const MAXIMUM_SUPPORTED_VALUE_SIZE: usize = 1024 * 1024 * 1024;
828+
const PROTOCOL_OVERHEAD_MARGIN: usize = 150;
829+
830+
// Construct entry that's for a field that's the maximum size of a non-"large_object" object
831+
let large_value = vec![0u8; MAXIMUM_SUPPORTED_VALUE_SIZE - PROTOCOL_OVERHEAD_MARGIN];
832+
let kv = KeyValue { key: "k1".into(), version: 0, value: Bytes::from(large_value.clone()) };
833+
834+
{
835+
let store =
836+
PostgresPlaintextBackend::new(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db).await.unwrap();
837+
let (start, end) = store.migrate_vss_database(MIGRATIONS).await.unwrap();
838+
assert_eq!(start, MIGRATIONS_START);
839+
assert_eq!(end, MIGRATIONS_END);
840+
assert_eq!(store.get_upgrades_list().await, [MIGRATIONS_START]);
841+
assert_eq!(store.get_schema_version().await, MIGRATIONS_END);
842+
843+
// Round trip with non-large_object of threshold size
844+
845+
store
846+
.put(
847+
"token".to_string(),
848+
PutObjectRequest {
849+
store_id: "store_id".to_string(),
850+
global_version: None,
851+
transaction_items: vec![kv],
852+
delete_items: vec![],
853+
},
854+
)
855+
.await
856+
.unwrap();
857+
858+
let resp_kv = store
859+
.get(
860+
"token".to_string(),
861+
GetObjectRequest { store_id: "store_id".to_string(), key: "k1".to_string() },
862+
)
863+
.await
864+
.unwrap()
865+
.value
866+
.unwrap();
867+
assert_eq!(
868+
resp_kv.value.len(),
869+
MAXIMUM_SUPPORTED_VALUE_SIZE - PROTOCOL_OVERHEAD_MARGIN
870+
);
871+
assert_eq!(resp_kv.value, large_value);
872+
873+
store
874+
.delete(
875+
"token".to_string(),
876+
DeleteObjectRequest {
877+
store_id: "store_id".to_string(),
878+
key_value: Some(resp_kv),
879+
},
880+
)
881+
.await
882+
.unwrap();
883+
};
884+
885+
drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap();
886+
}
817887
}

rust/server/src/main.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use auth_impls::jwt::JWTAuthorizer;
2929
use auth_impls::signature::SignatureValidatingAuthorizer;
3030
use impls::postgres_store::{PostgresPlaintextBackend, PostgresTlsBackend};
3131
use util::logger::ServerLogger;
32-
use vss_service::VssService;
32+
use vss_service::{VssService, VssServiceConfig};
3333

3434
mod util;
3535
mod vss_service;
@@ -42,6 +42,16 @@ fn main() {
4242
eprintln!("Failed to load configuration: {}", e);
4343
std::process::exit(-1);
4444
});
45+
let vss_service_config = match config.max_request_body_size {
46+
Some(size) => match VssServiceConfig::new(size) {
47+
Ok(config) => config,
48+
Err(e) => {
49+
eprintln!("Configuration validation error: {}", e);
50+
std::process::exit(-1);
51+
},
52+
},
53+
None => VssServiceConfig::default(),
54+
};
4555

4656
let logger = match ServerLogger::init(config.log_level, &config.log_file) {
4757
Ok(logger) => logger,
@@ -162,7 +172,7 @@ fn main() {
162172
match res {
163173
Ok((stream, _)) => {
164174
let io_stream = TokioIo::new(stream);
165-
let vss_service = VssService::new(Arc::clone(&store), Arc::clone(&authorizer));
175+
let vss_service = VssService::new(Arc::clone(&store), Arc::clone(&authorizer), vss_service_config);
166176
runtime.spawn(async move {
167177
if let Err(err) = http1::Builder::new().serve_connection(io_stream, vss_service).await {
168178
warn!("Failed to serve connection: {}", err);

rust/server/src/util/config.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use log::LevelFilter;
22
use serde::Deserialize;
3-
use std::net::SocketAddr;
43
use std::path::PathBuf;
54

65
const BIND_ADDR_VAR: &str = "VSS_BIND_ADDRESS";
6+
const MAX_REQUEST_BODY_SIZE_VAR: &str = "VSS_MAX_REQUEST_BODY_SIZE";
77
const LOG_FILE_VAR: &str = "VSS_LOG_FILE";
88
const LOG_LEVEL_VAR: &str = "VSS_LOG_LEVEL";
99
const JWT_RSA_PEM_VAR: &str = "VSS_JWT_RSA_PEM";
@@ -28,6 +28,7 @@ struct TomlConfig {
2828
#[derive(Deserialize)]
2929
struct ServerConfig {
3030
bind_address: Option<String>,
31+
max_request_body_size: Option<usize>,
3132
}
3233

3334
#[derive(Deserialize)]
@@ -59,6 +60,7 @@ struct LogConfig {
5960
// Encapsulates the result of reading both the environment variables and the config file.
6061
pub(crate) struct Configuration {
6162
pub(crate) bind_address: String,
63+
pub(crate) max_request_body_size: Option<usize>,
6264
pub(crate) rsa_pem: Option<String>,
6365
pub(crate) postgresql_prefix: String,
6466
pub(crate) default_db: String,
@@ -99,14 +101,28 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi
99101
None => TomlConfig::default(), // All fields are set to `None`
100102
};
101103

104+
let (bind_address_config, max_request_body_size_config) = match server_config {
105+
Some(c) => (c.bind_address, c.max_request_body_size),
106+
None => (None, None),
107+
};
108+
102109
let bind_address_env = read_env(BIND_ADDR_VAR)?;
103110
let bind_address = read_config(
104111
bind_address_env,
105-
server_config.and_then(|c| c.bind_address),
112+
bind_address_config,
106113
"VSS server bind address",
107114
BIND_ADDR_VAR,
108115
)?;
109116

117+
let max_request_body_size_env = read_env(MAX_REQUEST_BODY_SIZE_VAR)?
118+
.map(|mrbs| {
119+
mrbs.parse::<usize>().map_err(|e| {
120+
format!("Unable to parse the maximum request body size environment variable: {}", e)
121+
})
122+
})
123+
.transpose()?;
124+
let max_request_body_size = max_request_body_size_env.or(max_request_body_size_config);
125+
110126
let log_level_env: Option<LevelFilter> = read_env(LOG_LEVEL_VAR)?
111127
.map(|level_str| {
112128
level_str
@@ -187,6 +203,7 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi
187203

188204
Ok(Configuration {
189205
bind_address,
206+
max_request_body_size,
190207
log_file,
191208
log_level,
192209
rsa_pem,

rust/server/src/vss_service.rs

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use http_body_util::{BodyExt, Full};
1+
use http_body_util::{BodyExt, Full, Limited};
22
use hyper::body::{Bytes, Incoming};
33
use hyper::service::Service;
44
use hyper::{Request, Response, StatusCode};
@@ -22,15 +22,44 @@ use log::{debug, trace};
2222

2323
use crate::util::KeyValueVecKeyPrinter;
2424

25+
const MAXIMUM_REQUEST_BODY_SIZE: usize = 1024 * 1024 * 1024;
26+
27+
#[derive(Clone, Copy)]
28+
pub(crate) struct VssServiceConfig {
29+
maximum_request_body_size: usize,
30+
}
31+
32+
impl VssServiceConfig {
33+
pub fn new(maximum_request_body_size: usize) -> Result<Self, String> {
34+
if maximum_request_body_size > MAXIMUM_REQUEST_BODY_SIZE {
35+
return Err(format!(
36+
"Maximum request body size {} exceeds maximum {}",
37+
maximum_request_body_size, MAXIMUM_REQUEST_BODY_SIZE
38+
));
39+
}
40+
41+
Ok(Self { maximum_request_body_size })
42+
}
43+
}
44+
45+
impl Default for VssServiceConfig {
46+
fn default() -> Self {
47+
Self { maximum_request_body_size: MAXIMUM_REQUEST_BODY_SIZE }
48+
}
49+
}
50+
2551
#[derive(Clone)]
2652
pub struct VssService {
2753
store: Arc<dyn KvStore>,
2854
authorizer: Arc<dyn Authorizer>,
55+
config: VssServiceConfig,
2956
}
3057

3158
impl VssService {
32-
pub(crate) fn new(store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>) -> Self {
33-
Self { store, authorizer }
59+
pub(crate) fn new(
60+
store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, config: VssServiceConfig,
61+
) -> Self {
62+
Self { store, authorizer, config }
3463
}
3564
}
3665

@@ -45,22 +74,51 @@ impl Service<Request<Incoming>> for VssService {
4574
let store = Arc::clone(&self.store);
4675
let authorizer = Arc::clone(&self.authorizer);
4776
let path = req.uri().path().to_owned();
77+
let maximum_request_body_size = self.config.maximum_request_body_size;
4878

4979
Box::pin(async move {
5080
let prefix_stripped_path = path.strip_prefix(BASE_PATH_PREFIX).unwrap_or_default();
5181

5282
match prefix_stripped_path {
5383
"/getObject" => {
54-
handle_request(store, authorizer, req, handle_get_object_request).await
84+
handle_request(
85+
store,
86+
authorizer,
87+
req,
88+
maximum_request_body_size,
89+
handle_get_object_request,
90+
)
91+
.await
5592
},
5693
"/putObjects" => {
57-
handle_request(store, authorizer, req, handle_put_object_request).await
94+
handle_request(
95+
store,
96+
authorizer,
97+
req,
98+
maximum_request_body_size,
99+
handle_put_object_request,
100+
)
101+
.await
58102
},
59103
"/deleteObject" => {
60-
handle_request(store, authorizer, req, handle_delete_object_request).await
104+
handle_request(
105+
store,
106+
authorizer,
107+
req,
108+
maximum_request_body_size,
109+
handle_delete_object_request,
110+
)
111+
.await
61112
},
62113
"/listKeyVersions" => {
63-
handle_request(store, authorizer, req, handle_list_object_request).await
114+
handle_request(
115+
store,
116+
authorizer,
117+
req,
118+
maximum_request_body_size,
119+
handle_list_object_request,
120+
)
121+
.await
64122
},
65123
_ => {
66124
let error_msg = "Invalid request path.".as_bytes();
@@ -140,7 +198,7 @@ async fn handle_request<
140198
Fut: Future<Output = Result<R, VssError>> + Send,
141199
>(
142200
store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, request: Request<Incoming>,
143-
handler: F,
201+
maximum_request_body_size: usize, handler: F,
144202
) -> Result<<VssService as Service<Request<Incoming>>>::Response, hyper::Error> {
145203
let (parts, body) = request.into_parts();
146204
let headers_map = parts
@@ -155,8 +213,18 @@ async fn handle_request<
155213
Ok(auth_response) => auth_response.user_token,
156214
Err(e) => return Ok(build_error_response(e)),
157215
};
158-
// TODO: we should bound the amount of data we read to avoid allocating too much memory.
159-
let bytes = body.collect().await?.to_bytes();
216+
217+
let limited_body = Limited::new(body, maximum_request_body_size);
218+
let bytes = match limited_body.collect().await {
219+
Ok(body) => body.to_bytes(),
220+
Err(_) => {
221+
return Ok(Response::builder()
222+
.status(StatusCode::PAYLOAD_TOO_LARGE)
223+
.body(Full::new(Bytes::from("Request body too large")))
224+
// unwrap safety: body only errors when previous chained calls failed.
225+
.unwrap());
226+
},
227+
};
160228
match T::decode(bytes) {
161229
Ok(request) => match handler(store.clone(), user_token, request).await {
162230
Ok(response) => Ok(Response::builder()

rust/server/vss-server-config.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
[server_config]
22
bind_address = "127.0.0.1:8080" # Optional in TOML, can be overridden by env var `VSS_BIND_ADDRESS`
3+
# Maximum request body size in bytes. Can be set here or be overridden by env var 'VSS_MAX_REQUEST_BODY_SIZE'
4+
# Defaults to the maximum possible value of 1 GB if unset.
5+
# max_request_body_size = 1073741824
36

47
# Uncomment the table below to verify JWT tokens in the HTTP Authorization header against the given RSA public key,
58
# can be overridden by env var `VSS_JWT_RSA_PEM`
@@ -28,4 +31,4 @@ vss_database = "vss" # Optional in TOML, can be overridden by env var
2831

2932
# [log_config]
3033
# level = "debug" # Uncomment, or set env var `VSS_LOG_LEVEL` to set the log level, the default is "debug"
31-
# file = "vss.log" # Uncomment, or set env var `VSS_LOG_FILE` to set the log file path, the default is "vss.log"
34+
# file = "vss.log" # Uncomment, or set env var `VSS_LOG_FILE` to set the log file path, the default is "vss.log"

0 commit comments

Comments
 (0)