From bc709f0f23be345a1e2ccd06acd36bd5dac40bde Mon Sep 17 00:00:00 2001 From: Rutger Broekhoff Date: Fri, 12 Jul 2024 00:29:57 +0200 Subject: Restructure server --- gitolfs3-server/src/handler.rs | 388 ++++++++++++++++++++--------------------- 1 file changed, 194 insertions(+), 194 deletions(-) (limited to 'gitolfs3-server/src/handler.rs') diff --git a/gitolfs3-server/src/handler.rs b/gitolfs3-server/src/handler.rs index 6516291..b9f9bcf 100644 --- a/gitolfs3-server/src/handler.rs +++ b/gitolfs3-server/src/handler.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use aws_sdk_s3::{error::SdkError, operation::head_object::HeadObjectOutput}; use axum::{ extract::{Path, State}, - http::{header, HeaderMap, StatusCode}, + http, response::{IntoResponse, Response}, Json, }; @@ -33,102 +33,6 @@ pub struct AppState { pub dl_limiter: Arc>, } -fn validate_checksum(oid: Oid, obj: &HeadObjectOutput) -> bool { - if let Some(checksum) = obj.checksum_sha256() { - if let Ok(checksum) = BASE64_STANDARD.decode(checksum) { - if let Ok(checksum32b) = TryInto::<[u8; 32]>::try_into(checksum) { - return Oid::from(checksum32b) == oid; - } - } - } - true -} - -fn validate_size(expected: i64, obj: &HeadObjectOutput) -> bool { - if let Some(length) = obj.content_length() { - return length == expected; - } - true -} - -async fn handle_upload_object( - state: &AppState, - repo: &str, - obj: &BatchRequestObject, -) -> Option { - let (oid0, oid1) = (HexByte(obj.oid[0]), HexByte(obj.oid[1])); - let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, obj.oid); - - match state - .s3_client - .head_object() - .bucket(&state.s3_bucket) - .key(full_path.clone()) - .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled) - .send() - .await - { - Ok(result) => { - if validate_size(obj.size, &result) && validate_checksum(obj.oid, &result) { - return None; - } - } - Err(SdkError::ServiceError(e)) if e.err().is_not_found() => {} - Err(e) => { - println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid); - return Some(BatchResponseObject::error( - obj, - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to query object information".to_string(), - )); - } - }; - - let expires_in = std::time::Duration::from_secs(5 * 60); - let expires_at = Utc::now() + expires_in; - - let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else { - return Some(BatchResponseObject::error( - obj, - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to generate upload URL".to_string(), - )); - }; - let Ok(presigned) = state - .s3_client - .put_object() - .bucket(&state.s3_bucket) - .key(full_path) - .checksum_sha256(obj.oid.to_string()) - .content_length(obj.size) - .presigned(config) - .await - else { - return Some(BatchResponseObject::error( - obj, - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to generate upload URL".to_string(), - )); - }; - Some(BatchResponseObject { - oid: obj.oid, - size: obj.size, - authenticated: Some(true), - actions: BatchResponseObjectActions { - upload: Some(BatchResponseObjectAction { - header: presigned - .headers() - .map(|(k, v)| (k.to_owned(), v.to_owned())) - .collect(), - expires_at, - href: presigned.uri().to_string(), - }), - ..Default::default() - }, - error: None, - }) -} - async fn handle_download_object( state: &AppState, repo: &str, @@ -152,24 +56,24 @@ async fn handle_download_object( println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid); return BatchResponseObject::error( obj, - StatusCode::INTERNAL_SERVER_ERROR, + http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to query object information".to_string(), ); } }; - // Scaleway actually doesn't provide SHA256 suport, but maybe in the future :) - if !validate_checksum(obj.oid, &result) { + // Scaleway actually doesn't provide SHA256 support, but maybe in the future :) + if !s3_validate_checksum(obj.oid, &result) { return BatchResponseObject::error( obj, - StatusCode::UNPROCESSABLE_ENTITY, + http::StatusCode::UNPROCESSABLE_ENTITY, "Object corrupted".to_string(), ); } - if !validate_size(obj.size, &result) { + if !s3_validate_size(obj.size, &result) { return BatchResponseObject::error( obj, - StatusCode::UNPROCESSABLE_ENTITY, + http::StatusCode::UNPROCESSABLE_ENTITY, "Incorrect size specified (or object corrupted)".to_string(), ); } @@ -181,7 +85,7 @@ async fn handle_download_object( let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else { return BatchResponseObject::error( obj, - StatusCode::INTERNAL_SERVER_ERROR, + http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to generate upload URL".to_string(), ); }; @@ -195,7 +99,7 @@ async fn handle_download_object( else { return BatchResponseObject::error( obj, - StatusCode::INTERNAL_SERVER_ERROR, + http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to generate upload URL".to_string(), ); }; @@ -231,7 +135,7 @@ async fn handle_download_object( Ok(false) => { return BatchResponseObject::error( obj, - StatusCode::SERVICE_UNAVAILABLE, + http::StatusCode::SERVICE_UNAVAILABLE, "Public LFS downloads temporarily unavailable".to_string(), ); } @@ -239,7 +143,7 @@ async fn handle_download_object( println!("Failed to request {content_length} bytes from download limiter: {e}"); return BatchResponseObject::error( obj, - StatusCode::INTERNAL_SERVER_ERROR, + http::StatusCode::INTERNAL_SERVER_ERROR, "Internal server error".to_string(), ); } @@ -257,7 +161,7 @@ async fn handle_download_object( ) else { return BatchResponseObject::error( obj, - StatusCode::INTERNAL_SERVER_ERROR, + http::StatusCode::INTERNAL_SERVER_ERROR, "Internal server error".to_string(), ); }; @@ -292,83 +196,6 @@ async fn handle_download_object( } } -fn repo_exists(name: &str) -> bool { - let Ok(metadata) = std::fs::metadata(name) else { - return false; - }; - metadata.is_dir() -} - -fn is_repo_public(name: &str) -> Option { - if !repo_exists(name) { - return None; - } - match std::fs::metadata(format!("{name}/git-daemon-export-ok")) { - Ok(metadata) if metadata.is_file() => Some(true), - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Some(false), - _ => None, - } -} - -pub async fn batch( - State(state): State>, - headers: HeaderMap, - RepositoryName(repo): RepositoryName, - GitLfsJson(Json(payload)): GitLfsJson, -) -> Response { - let Some(public) = is_repo_public(&repo) else { - return REPO_NOT_FOUND.into_response(); - }; - let Trusted(trusted) = match authorize_batch( - &state.authz_conf, - &repo, - public, - payload.operation, - &headers, - ) { - Ok(authn) => authn, - Err(e) => return e.into_response(), - }; - - if !headers - .get_all("Accept") - .iter() - .filter_map(|v| v.to_str().ok()) - .any(is_git_lfs_json_mimetype) - { - let message = format!("Expected `{LFS_MIME}` in list of acceptable response media types"); - return make_error_resp(StatusCode::NOT_ACCEPTABLE, &message).into_response(); - } - - if payload.hash_algo != HashAlgo::Sha256 { - let message = "Unsupported hashing algorithm specified"; - return make_error_resp(StatusCode::CONFLICT, message).into_response(); - } - if !payload.transfers.is_empty() && !payload.transfers.contains(&TransferAdapter::Basic) { - let message = "Unsupported transfer adapter specified (supported: basic)"; - return make_error_resp(StatusCode::CONFLICT, message).into_response(); - } - - let mut resp = BatchResponse { - transfer: TransferAdapter::Basic, - objects: vec![], - hash_algo: HashAlgo::Sha256, - }; - for obj in payload.objects { - match payload.operation { - Operation::Download => resp - .objects - .push(handle_download_object(&state, &repo, &obj, trusted).await), - Operation::Upload => { - if let Some(obj_resp) = handle_upload_object(&state, &repo, &obj).await { - resp.objects.push(obj_resp); - } - } - }; - } - GitLfsJson(Json(resp)).into_response() -} - #[derive(Deserialize, Copy, Clone)] #[serde(remote = "Self")] pub struct FileParams { @@ -382,11 +209,11 @@ impl<'de> Deserialize<'de> for FileParams { where D: serde::Deserializer<'de>, { - let unchecked @ FileParams { + let unchecked @ Self { oid0: HexByte(oid0), oid1: HexByte(oid1), oid, - } = FileParams::deserialize(deserializer)?; + } = Self::deserialize(deserializer)?; if oid0 != oid.as_bytes()[0] { return Err(de::Error::custom( "first OID path part does not match first byte of full OID", @@ -401,9 +228,9 @@ impl<'de> Deserialize<'de> for FileParams { } } -pub async fn obj_download( +pub async fn handle_obj_download( State(state): State>, - headers: HeaderMap, + headers: http::HeaderMap, RepositoryName(repo): RepositoryName, Path(FileParams { oid0, oid1, oid }): Path, ) -> Response { @@ -425,26 +252,26 @@ pub async fn obj_download( Err(e) => { println!("Failed to GetObject (repo {repo}, OID {oid}): {e}"); return ( - StatusCode::INTERNAL_SERVER_ERROR, + http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to query object information", ) .into_response(); } }; - let mut headers = header::HeaderMap::new(); + let mut headers = http::header::HeaderMap::new(); if let Some(content_type) = result.content_type { let Ok(header_value) = content_type.try_into() else { return ( - StatusCode::INTERNAL_SERVER_ERROR, + http::StatusCode::INTERNAL_SERVER_ERROR, "Object has invalid content type", ) .into_response(); }; - headers.insert(header::CONTENT_TYPE, header_value); + headers.insert(http::header::CONTENT_TYPE, header_value); } if let Some(content_length) = result.content_length { - headers.insert(header::CONTENT_LENGTH, content_length.into()); + headers.insert(http::header::CONTENT_LENGTH, content_length.into()); } let async_read = result.body.into_async_read(); @@ -453,3 +280,176 @@ pub async fn obj_download( (headers, body).into_response() } + +async fn handle_upload_object( + state: &AppState, + repo: &str, + obj: &BatchRequestObject, +) -> Option { + let (oid0, oid1) = (HexByte(obj.oid[0]), HexByte(obj.oid[1])); + let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, obj.oid); + + match state + .s3_client + .head_object() + .bucket(&state.s3_bucket) + .key(full_path.clone()) + .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled) + .send() + .await + { + Ok(result) => { + if s3_validate_size(obj.size, &result) && s3_validate_checksum(obj.oid, &result) { + return None; + } + } + Err(SdkError::ServiceError(e)) if e.err().is_not_found() => {} + Err(e) => { + println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid); + return Some(BatchResponseObject::error( + obj, + http::StatusCode::INTERNAL_SERVER_ERROR, + "Failed to query object information".to_string(), + )); + } + }; + + let expires_in = std::time::Duration::from_secs(5 * 60); + let expires_at = Utc::now() + expires_in; + + let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else { + return Some(BatchResponseObject::error( + obj, + http::StatusCode::INTERNAL_SERVER_ERROR, + "Failed to generate upload URL".to_string(), + )); + }; + let Ok(presigned) = state + .s3_client + .put_object() + .bucket(&state.s3_bucket) + .key(full_path) + .checksum_sha256(obj.oid.to_string()) + .content_length(obj.size) + .presigned(config) + .await + else { + return Some(BatchResponseObject::error( + obj, + http::StatusCode::INTERNAL_SERVER_ERROR, + "Failed to generate upload URL".to_string(), + )); + }; + Some(BatchResponseObject { + oid: obj.oid, + size: obj.size, + authenticated: Some(true), + actions: BatchResponseObjectActions { + upload: Some(BatchResponseObjectAction { + header: presigned + .headers() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect(), + expires_at, + href: presigned.uri().to_string(), + }), + ..Default::default() + }, + error: None, + }) +} + +pub async fn handle_batch( + State(state): State>, + headers: http::HeaderMap, + RepositoryName(repo): RepositoryName, + GitLfsJson(Json(payload)): GitLfsJson, +) -> Response { + let Some(public) = is_repo_public(&repo) else { + return REPO_NOT_FOUND.into_response(); + }; + let Trusted(trusted) = match authorize_batch( + &state.authz_conf, + &repo, + public, + payload.operation, + &headers, + ) { + Ok(authn) => authn, + Err(e) => return e.into_response(), + }; + + if !headers + .get_all("Accept") + .iter() + .filter_map(|v| v.to_str().ok()) + .any(is_git_lfs_json_mimetype) + { + let message = format!("Expected `{LFS_MIME}` in list of acceptable response media types"); + return make_error_resp(http::StatusCode::NOT_ACCEPTABLE, &message).into_response(); + } + + if payload.hash_algo != HashAlgo::Sha256 { + let message = "Unsupported hashing algorithm specified"; + return make_error_resp(http::StatusCode::CONFLICT, message).into_response(); + } + if !payload.transfers.is_empty() && !payload.transfers.contains(&TransferAdapter::Basic) { + let message = "Unsupported transfer adapter specified (supported: basic)"; + return make_error_resp(http::StatusCode::CONFLICT, message).into_response(); + } + + let mut resp = BatchResponse { + transfer: TransferAdapter::Basic, + objects: vec![], + hash_algo: HashAlgo::Sha256, + }; + for obj in payload.objects { + match payload.operation { + Operation::Download => resp + .objects + .push(handle_download_object(&state, &repo, &obj, trusted).await), + Operation::Upload => { + if let Some(obj_resp) = handle_upload_object(&state, &repo, &obj).await { + resp.objects.push(obj_resp); + } + } + }; + } + GitLfsJson(Json(resp)).into_response() +} + +fn s3_validate_checksum(oid: Oid, obj: &HeadObjectOutput) -> bool { + if let Some(checksum) = obj.checksum_sha256() { + if let Ok(checksum) = BASE64_STANDARD.decode(checksum) { + if let Ok(checksum32b) = TryInto::<[u8; 32]>::try_into(checksum) { + return Oid::from(checksum32b) == oid; + } + } + } + true +} + +fn s3_validate_size(expected: i64, obj: &HeadObjectOutput) -> bool { + if let Some(length) = obj.content_length() { + return length == expected; + } + true +} + +fn repo_exists(name: &str) -> bool { + let Ok(metadata) = std::fs::metadata(name) else { + return false; + }; + metadata.is_dir() +} + +fn is_repo_public(name: &str) -> Option { + if !repo_exists(name) { + return None; + } + match std::fs::metadata(format!("{name}/git-daemon-export-ok")) { + Ok(metadata) if metadata.is_file() => Some(true), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Some(false), + _ => None, + } +} -- cgit v1.2.3