author    Rutger Broekhoff  2024-07-11 19:11:51 +0200
committer Rutger Broekhoff  2024-07-11 19:11:51 +0200
commit    3e67a3486eed22522f4352503ef7067ca81a8050 (patch)
tree      971f0a84c073731c944069bdefdb848099428bdf /gitolfs3-server/src/handler.rs
parent    0822bdf658e03bdb5b31a90b8c64f3d0b6a6cff7 (diff)
download  gitolfs3-3e67a3486eed22522f4352503ef7067ca81a8050.tar.gz
          gitolfs3-3e67a3486eed22522f4352503ef7067ca81a8050.zip
Split server code into multiple smaller modules
Diffstat (limited to 'gitolfs3-server/src/handler.rs')
-rw-r--r--  gitolfs3-server/src/handler.rs  455
1 file changed, 455 insertions(+), 0 deletions(-)
diff --git a/gitolfs3-server/src/handler.rs b/gitolfs3-server/src/handler.rs
new file mode 100644
index 0000000..6516291
--- /dev/null
+++ b/gitolfs3-server/src/handler.rs
@@ -0,0 +1,455 @@
use std::{collections::HashMap, sync::Arc};

use aws_sdk_s3::{error::SdkError, operation::head_object::HeadObjectOutput};
use axum::{
    extract::{Path, State},
    http::{header, HeaderMap, StatusCode},
    response::{IntoResponse, Response},
    Json,
};
use base64::{prelude::BASE64_STANDARD, Engine};
use chrono::Utc;
use gitolfs3_common::{generate_tag, Claims, HexByte, Oid, Operation, SpecificClaims};
use serde::{de, Deserialize};
use tokio::sync::Mutex;

use crate::{
    api::{
        is_git_lfs_json_mimetype, make_error_resp, BatchRequest, BatchRequestObject, BatchResponse,
        BatchResponseObject, BatchResponseObjectAction, BatchResponseObjectActions, GitLfsJson,
        HashAlgo, RepositoryName, TransferAdapter, LFS_MIME, REPO_NOT_FOUND,
    },
    authz::{authorize_batch, authorize_get, Trusted},
    config::AuthorizationConfig,
    dlimit::DownloadLimiter,
};

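/// Shared state for all handlers: the S3 client and bucket backing object
/// storage, the authorization configuration, the server's public base URL,
/// and the limiter that caps public (untrusted) download traffic.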
pub struct AppState {
    pub s3_client: aws_sdk_s3::Client,
    pub s3_bucket: String,
    pub authz_conf: AuthorizationConfig,
    // Should not end with a slash.
    pub base_url: String,
    pub dl_limiter: Arc<Mutex<DownloadLimiter>>,
}

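/// Checks the object's SHA256 checksum (as reported by S3) against the
/// expected LFS OID. Returns `true` when the object store reports no usable
/// checksum, so a missing checksum is not treated as corruption.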
fn validate_checksum(oid: Oid, obj: &HeadObjectOutput) -> bool {
    if let Some(checksum) = obj.checksum_sha256() {
        if let Ok(checksum) = BASE64_STANDARD.decode(checksum) {
            if let Ok(checksum32b) = TryInto::<[u8; 32]>::try_into(checksum) {
                return Oid::from(checksum32b) == oid;
            }
        }
    }
    true
}

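/// Checks the object's content length against the expected size, again
/// passing objects for which no length is reported.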
fn validate_size(expected: i64, obj: &HeadObjectOutput) -> bool {
    if let Some(length) = obj.content_length() {
        return length == expected;
    }
    true
}

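/// Handles one object of an upload batch. If the object already exists in S3
/// with the expected size and checksum, returns `None` so the client can skip
/// the upload; otherwise returns a presigned S3 PUT action (valid for five
/// minutes), or an error object if anything fails along the way.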
async fn handle_upload_object(
    state: &AppState,
    repo: &str,
    obj: &BatchRequestObject,
) -> Option<BatchResponseObject> {
    let (oid0, oid1) = (HexByte(obj.oid[0]), HexByte(obj.oid[1]));
    let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, obj.oid);

    match state
        .s3_client
        .head_object()
        .bucket(&state.s3_bucket)
        .key(full_path.clone())
        .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
        .send()
        .await
    {
        Ok(result) => {
            if validate_size(obj.size, &result) && validate_checksum(obj.oid, &result) {
                return None;
            }
        }
        Err(SdkError::ServiceError(e)) if e.err().is_not_found() => {}
        Err(e) => {
            println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid);
            return Some(BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to query object information".to_string(),
            ));
        }
    };

    let expires_in = std::time::Duration::from_secs(5 * 60);
    let expires_at = Utc::now() + expires_in;

    let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else {
        return Some(BatchResponseObject::error(
            obj,
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed to generate upload URL".to_string(),
        ));
    };
    let Ok(presigned) = state
        .s3_client
        .put_object()
        .bucket(&state.s3_bucket)
        .key(full_path)
        .checksum_sha256(obj.oid.to_string())
        .content_length(obj.size)
        .presigned(config)
        .await
    else {
        return Some(BatchResponseObject::error(
            obj,
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed to generate upload URL".to_string(),
        ));
    };
    Some(BatchResponseObject {
        oid: obj.oid,
        size: obj.size,
        authenticated: Some(true),
        actions: BatchResponseObjectActions {
            upload: Some(BatchResponseObjectAction {
                header: presigned
                    .headers()
                    .map(|(k, v)| (k.to_owned(), v.to_owned()))
                    .collect(),
                expires_at,
                href: presigned.uri().to_string(),
            }),
            ..Default::default()
        },
        error: None,
    })
}

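/// Handles one object of a download batch. Trusted clients get a presigned S3
/// GET URL directly; untrusted (public) clients are first charged against the
/// download limiter and then get a time-limited, HMAC-tagged URL pointing
/// back at this server (served by `obj_download`).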
async fn handle_download_object(
    state: &AppState,
    repo: &str,
    obj: &BatchRequestObject,
    trusted: bool,
) -> BatchResponseObject {
    let (oid0, oid1) = (HexByte(obj.oid[0]), HexByte(obj.oid[1]));
    let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, obj.oid);

    let result = match state
        .s3_client
        .head_object()
        .bucket(&state.s3_bucket)
        .key(&full_path)
        .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
        .send()
        .await
    {
        Ok(result) => result,
        Err(e) => {
            println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid);
            return BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to query object information".to_string(),
            );
        }
    };

    // Scaleway actually doesn't provide SHA256 support, but maybe in the future :)
    if !validate_checksum(obj.oid, &result) {
        return BatchResponseObject::error(
            obj,
            StatusCode::UNPROCESSABLE_ENTITY,
            "Object corrupted".to_string(),
        );
    }
    if !validate_size(obj.size, &result) {
        return BatchResponseObject::error(
            obj,
            StatusCode::UNPROCESSABLE_ENTITY,
            "Incorrect size specified (or object corrupted)".to_string(),
        );
    }

    let expires_in = std::time::Duration::from_secs(5 * 60);
    let expires_at = Utc::now() + expires_in;

    if trusted {
        let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else {
            return BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to generate download URL".to_string(),
            );
        };
        let Ok(presigned) = state
            .s3_client
            .get_object()
            .bucket(&state.s3_bucket)
            .key(full_path)
            .presigned(config)
            .await
        else {
            return BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to generate download URL".to_string(),
            );
        };
        return BatchResponseObject {
            oid: obj.oid,
            size: obj.size,
            authenticated: Some(true),
            actions: BatchResponseObjectActions {
                download: Some(BatchResponseObjectAction {
                    header: presigned
                        .headers()
                        .map(|(k, v)| (k.to_owned(), v.to_owned()))
                        .collect(),
                    expires_at,
                    href: presigned.uri().to_string(),
                }),
                ..Default::default()
            },
            error: None,
        };
    }

    if let Some(content_length) = result.content_length() {
        if content_length > 0 {
            match state
                .dl_limiter
                .lock()
                .await
                .request(content_length as u64)
                .await
            {
                Ok(true) => {}
                Ok(false) => {
                    return BatchResponseObject::error(
                        obj,
                        StatusCode::SERVICE_UNAVAILABLE,
                        "Public LFS downloads temporarily unavailable".to_string(),
                    );
                }
                Err(e) => {
                    println!("Failed to request {content_length} bytes from download limiter: {e}");
                    return BatchResponseObject::error(
                        obj,
                        StatusCode::INTERNAL_SERVER_ERROR,
                        "Internal server error".to_string(),
                    );
                }
            }
        }
    }

    let Some(tag) = generate_tag(
        Claims {
            specific_claims: SpecificClaims::Download(obj.oid),
            repo_path: repo,
            expires_at,
        },
        &state.authz_conf.key,
    ) else {
        return BatchResponseObject::error(
            obj,
            StatusCode::INTERNAL_SERVER_ERROR,
            "Internal server error".to_string(),
        );
    };

    let download_path = format!(
        "{repo}/info/lfs/objects/{}/{}/{}",
        HexByte(obj.oid[0]),
        HexByte(obj.oid[1]),
        obj.oid,
    );

    BatchResponseObject {
        oid: obj.oid,
        size: obj.size,
        authenticated: Some(true),
        actions: BatchResponseObjectActions {
            download: Some(BatchResponseObjectAction {
                header: {
                    let mut map = HashMap::new();
                    map.insert(
                        "Authorization".to_string(),
                        format!("Gitolfs3-Hmac-Sha256 {tag} {}", expires_at.timestamp()),
                    );
                    map
                },
                expires_at,
                href: format!("{}/{download_path}", state.base_url),
            }),
            ..Default::default()
        },
        error: None,
    }
}

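/// A repository exists iff a directory by its name is present on disk.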
fn repo_exists(name: &str) -> bool {
    let Ok(metadata) = std::fs::metadata(name) else {
        return false;
    };
    metadata.is_dir()
}

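/// Determines repository visibility following the git-daemon convention: a
/// repository is public iff it contains a `git-daemon-export-ok` file.
/// Returns `None` if the repository does not exist or its state is unreadable.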
fn is_repo_public(name: &str) -> Option<bool> {
    if !repo_exists(name) {
        return None;
    }
    match std::fs::metadata(format!("{name}/git-daemon-export-ok")) {
        Ok(metadata) if metadata.is_file() => Some(true),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Some(false),
        _ => None,
    }
}

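/// Handler for the Git LFS batch API. Authorizes the request, validates the
/// negotiated hash algorithm and transfer adapter, and dispatches each
/// requested object to the upload or download handler.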
pub async fn batch(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    RepositoryName(repo): RepositoryName,
    GitLfsJson(Json(payload)): GitLfsJson<BatchRequest>,
) -> Response {
    let Some(public) = is_repo_public(&repo) else {
        return REPO_NOT_FOUND.into_response();
    };
    let Trusted(trusted) = match authorize_batch(
        &state.authz_conf,
        &repo,
        public,
        payload.operation,
        &headers,
    ) {
        Ok(authn) => authn,
        Err(e) => return e.into_response(),
    };

    if !headers
        .get_all("Accept")
        .iter()
        .filter_map(|v| v.to_str().ok())
        .any(is_git_lfs_json_mimetype)
    {
        let message = format!("Expected `{LFS_MIME}` in list of acceptable response media types");
        return make_error_resp(StatusCode::NOT_ACCEPTABLE, &message).into_response();
    }

    if payload.hash_algo != HashAlgo::Sha256 {
        let message = "Unsupported hashing algorithm specified";
        return make_error_resp(StatusCode::CONFLICT, message).into_response();
    }
    if !payload.transfers.is_empty() && !payload.transfers.contains(&TransferAdapter::Basic) {
        let message = "Unsupported transfer adapter specified (supported: basic)";
        return make_error_resp(StatusCode::CONFLICT, message).into_response();
    }

    let mut resp = BatchResponse {
        transfer: TransferAdapter::Basic,
        objects: vec![],
        hash_algo: HashAlgo::Sha256,
    };
    for obj in payload.objects {
        match payload.operation {
            Operation::Download => resp
                .objects
                .push(handle_download_object(&state, &repo, &obj, trusted).await),
            Operation::Upload => {
                if let Some(obj_resp) = handle_upload_object(&state, &repo, &obj).await {
                    resp.objects.push(obj_resp);
                }
            }
        };
    }
    GitLfsJson(Json(resp)).into_response()
}

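/// Path parameters of the object download endpoint: the first two bytes of
/// the OID as separate path segments, followed by the full OID.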
#[derive(Deserialize, Copy, Clone)]
#[serde(remote = "Self")]
pub struct FileParams {
    oid0: HexByte,
    oid1: HexByte,
    oid: Oid,
}

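// Deserialization is overridden (hence `#[serde(remote = "Self")]` above) so
// that the two OID prefix segments can be checked against the full OID.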
impl<'de> Deserialize<'de> for FileParams {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let unchecked @ FileParams {
            oid0: HexByte(oid0),
            oid1: HexByte(oid1),
            oid,
        } = FileParams::deserialize(deserializer)?;
        if oid0 != oid.as_bytes()[0] {
            return Err(de::Error::custom(
                "first OID path part does not match first byte of full OID",
            ));
        }
        if oid1 != oid.as_bytes()[1] {
            return Err(de::Error::custom(
                "second OID path part does not match second byte of full OID",
            ));
        }
        Ok(unchecked)
    }
}

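/// Handler for direct object downloads: authorizes the request via
/// `authorize_get`, fetches the object from S3, and streams it back with its
/// content type and length.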
pub async fn obj_download(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    RepositoryName(repo): RepositoryName,
    Path(FileParams { oid0, oid1, oid }): Path<FileParams>,
) -> Response {
    if let Err(e) = authorize_get(&state.authz_conf, &repo, oid, &headers) {
        return e.into_response();
    }

    let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, oid);
    let result = match state
        .s3_client
        .get_object()
        .bucket(&state.s3_bucket)
        .key(full_path)
        .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
        .send()
        .await
    {
        Ok(result) => result,
        Err(e) => {
            println!("Failed to GetObject (repo {repo}, OID {oid}): {e}");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to query object information",
            )
                .into_response();
        }
    };

    let mut headers = header::HeaderMap::new();
    if let Some(content_type) = result.content_type {
        let Ok(header_value) = content_type.try_into() else {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Object has invalid content type",
            )
                .into_response();
        };
        headers.insert(header::CONTENT_TYPE, header_value);
    }
    if let Some(content_length) = result.content_length {
        headers.insert(header::CONTENT_LENGTH, content_length.into());
    }

    let async_read = result.body.into_async_read();
    let stream = tokio_util::io::ReaderStream::new(async_read);
    let body = axum::body::Body::from_stream(stream);

    (headers, body).into_response()
}