diff options
author | Rutger Broekhoff | 2024-07-11 19:11:51 +0200 |
---|---|---|
committer | Rutger Broekhoff | 2024-07-11 19:11:51 +0200 |
commit | 3e67a3486eed22522f4352503ef7067ca81a8050 (patch) | |
tree | 971f0a84c073731c944069bdefdb848099428bdf /gitolfs3-server/src/handler.rs | |
parent | 0822bdf658e03bdb5b31a90b8c64f3d0b6a6cff7 (diff) | |
download | gitolfs3-3e67a3486eed22522f4352503ef7067ca81a8050.tar.gz gitolfs3-3e67a3486eed22522f4352503ef7067ca81a8050.zip |
Split server code into multiple smaller modules
Diffstat (limited to 'gitolfs3-server/src/handler.rs')
-rw-r--r-- | gitolfs3-server/src/handler.rs | 455 |
1 file changed, 455 insertions, 0 deletions
diff --git a/gitolfs3-server/src/handler.rs b/gitolfs3-server/src/handler.rs new file mode 100644 index 0000000..6516291 --- /dev/null +++ b/gitolfs3-server/src/handler.rs | |||
@@ -0,0 +1,455 @@ | |||
1 | use std::{collections::HashMap, sync::Arc}; | ||
2 | |||
3 | use aws_sdk_s3::{error::SdkError, operation::head_object::HeadObjectOutput}; | ||
4 | use axum::{ | ||
5 | extract::{Path, State}, | ||
6 | http::{header, HeaderMap, StatusCode}, | ||
7 | response::{IntoResponse, Response}, | ||
8 | Json, | ||
9 | }; | ||
10 | use base64::{prelude::BASE64_STANDARD, Engine}; | ||
11 | use chrono::Utc; | ||
12 | use gitolfs3_common::{generate_tag, Claims, HexByte, Oid, Operation, SpecificClaims}; | ||
13 | use serde::{de, Deserialize}; | ||
14 | use tokio::sync::Mutex; | ||
15 | |||
16 | use crate::{ | ||
17 | api::{ | ||
18 | is_git_lfs_json_mimetype, make_error_resp, BatchRequest, BatchRequestObject, BatchResponse, | ||
19 | BatchResponseObject, BatchResponseObjectAction, BatchResponseObjectActions, GitLfsJson, | ||
20 | HashAlgo, RepositoryName, TransferAdapter, LFS_MIME, REPO_NOT_FOUND, | ||
21 | }, | ||
22 | authz::{authorize_batch, authorize_get, Trusted}, | ||
23 | config::AuthorizationConfig, | ||
24 | dlimit::DownloadLimiter, | ||
25 | }; | ||
26 | |||
/// Shared application state, handed to every Axum handler via `State<Arc<AppState>>`.
pub struct AppState {
    /// Client used for all S3 operations (HeadObject, presigned GET/PUT, GetObject).
    pub s3_client: aws_sdk_s3::Client,
    /// Name of the S3 bucket that stores the LFS objects.
    pub s3_bucket: String,
    /// Configuration (including the HMAC key) used to authorize requests.
    pub authz_conf: AuthorizationConfig,
    // Should not end with a slash.
    pub base_url: String,
    /// Rate/volume limiter applied to public (untrusted) downloads.
    pub dl_limiter: Arc<Mutex<DownloadLimiter>>,
}
35 | |||
36 | fn validate_checksum(oid: Oid, obj: &HeadObjectOutput) -> bool { | ||
37 | if let Some(checksum) = obj.checksum_sha256() { | ||
38 | if let Ok(checksum) = BASE64_STANDARD.decode(checksum) { | ||
39 | if let Ok(checksum32b) = TryInto::<[u8; 32]>::try_into(checksum) { | ||
40 | return Oid::from(checksum32b) == oid; | ||
41 | } | ||
42 | } | ||
43 | } | ||
44 | true | ||
45 | } | ||
46 | |||
47 | fn validate_size(expected: i64, obj: &HeadObjectOutput) -> bool { | ||
48 | if let Some(length) = obj.content_length() { | ||
49 | return length == expected; | ||
50 | } | ||
51 | true | ||
52 | } | ||
53 | |||
/// Handles one object of a batch *upload* request.
///
/// Returns `None` when the object already exists in S3 with the expected size
/// and checksum (nothing for the client to do). Otherwise returns a response
/// object carrying either a presigned S3 PUT URL or an error.
async fn handle_upload_object(
    state: &AppState,
    repo: &str,
    obj: &BatchRequestObject,
) -> Option<BatchResponseObject> {
    // Objects are sharded under the first two bytes of their OID.
    let (oid0, oid1) = (HexByte(obj.oid[0]), HexByte(obj.oid[1]));
    let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, obj.oid);

    match state
        .s3_client
        .head_object()
        .bucket(&state.s3_bucket)
        .key(full_path.clone())
        .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
        .send()
        .await
    {
        Ok(result) => {
            // Object already present and consistent: skip the upload entirely.
            if validate_size(obj.size, &result) && validate_checksum(obj.oid, &result) {
                return None;
            }
        }
        // Not found is the normal case for a fresh upload; fall through.
        Err(SdkError::ServiceError(e)) if e.err().is_not_found() => {}
        Err(e) => {
            println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid);
            return Some(BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to query object information".to_string(),
            ));
        }
    };

    // Presigned URL validity window; also reported to the client as expires_at.
    let expires_in = std::time::Duration::from_secs(5 * 60);
    let expires_at = Utc::now() + expires_in;

    let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else {
        return Some(BatchResponseObject::error(
            obj,
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed to generate upload URL".to_string(),
        ));
    };
    // Pin both content length and checksum in the presigned PUT so the client
    // cannot upload different content under this OID.
    // NOTE(review): S3 expects `x-amz-checksum-sha256` base64-encoded; confirm
    // that `Oid`'s Display produces the encoding S3 accepts here.
    let Ok(presigned) = state
        .s3_client
        .put_object()
        .bucket(&state.s3_bucket)
        .key(full_path)
        .checksum_sha256(obj.oid.to_string())
        .content_length(obj.size)
        .presigned(config)
        .await
    else {
        return Some(BatchResponseObject::error(
            obj,
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed to generate upload URL".to_string(),
        ));
    };
    Some(BatchResponseObject {
        oid: obj.oid,
        size: obj.size,
        authenticated: Some(true),
        actions: BatchResponseObjectActions {
            upload: Some(BatchResponseObjectAction {
                // The client must replay the presigned request's headers verbatim.
                header: presigned
                    .headers()
                    .map(|(k, v)| (k.to_owned(), v.to_owned()))
                    .collect(),
                expires_at,
                href: presigned.uri().to_string(),
            }),
            ..Default::default()
        },
        error: None,
    })
}
131 | |||
132 | async fn handle_download_object( | ||
133 | state: &AppState, | ||
134 | repo: &str, | ||
135 | obj: &BatchRequestObject, | ||
136 | trusted: bool, | ||
137 | ) -> BatchResponseObject { | ||
138 | let (oid0, oid1) = (HexByte(obj.oid[0]), HexByte(obj.oid[1])); | ||
139 | let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, obj.oid); | ||
140 | |||
141 | let result = match state | ||
142 | .s3_client | ||
143 | .head_object() | ||
144 | .bucket(&state.s3_bucket) | ||
145 | .key(&full_path) | ||
146 | .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled) | ||
147 | .send() | ||
148 | .await | ||
149 | { | ||
150 | Ok(result) => result, | ||
151 | Err(e) => { | ||
152 | println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid); | ||
153 | return BatchResponseObject::error( | ||
154 | obj, | ||
155 | StatusCode::INTERNAL_SERVER_ERROR, | ||
156 | "Failed to query object information".to_string(), | ||
157 | ); | ||
158 | } | ||
159 | }; | ||
160 | |||
161 | // Scaleway actually doesn't provide SHA256 suport, but maybe in the future :) | ||
162 | if !validate_checksum(obj.oid, &result) { | ||
163 | return BatchResponseObject::error( | ||
164 | obj, | ||
165 | StatusCode::UNPROCESSABLE_ENTITY, | ||
166 | "Object corrupted".to_string(), | ||
167 | ); | ||
168 | } | ||
169 | if !validate_size(obj.size, &result) { | ||
170 | return BatchResponseObject::error( | ||
171 | obj, | ||
172 | StatusCode::UNPROCESSABLE_ENTITY, | ||
173 | "Incorrect size specified (or object corrupted)".to_string(), | ||
174 | ); | ||
175 | } | ||
176 | |||
177 | let expires_in = std::time::Duration::from_secs(5 * 60); | ||
178 | let expires_at = Utc::now() + expires_in; | ||
179 | |||
180 | if trusted { | ||
181 | let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else { | ||
182 | return BatchResponseObject::error( | ||
183 | obj, | ||
184 | StatusCode::INTERNAL_SERVER_ERROR, | ||
185 | "Failed to generate upload URL".to_string(), | ||
186 | ); | ||
187 | }; | ||
188 | let Ok(presigned) = state | ||
189 | .s3_client | ||
190 | .get_object() | ||
191 | .bucket(&state.s3_bucket) | ||
192 | .key(full_path) | ||
193 | .presigned(config) | ||
194 | .await | ||
195 | else { | ||
196 | return BatchResponseObject::error( | ||
197 | obj, | ||
198 | StatusCode::INTERNAL_SERVER_ERROR, | ||
199 | "Failed to generate upload URL".to_string(), | ||
200 | ); | ||
201 | }; | ||
202 | return BatchResponseObject { | ||
203 | oid: obj.oid, | ||
204 | size: obj.size, | ||
205 | authenticated: Some(true), | ||
206 | actions: BatchResponseObjectActions { | ||
207 | download: Some(BatchResponseObjectAction { | ||
208 | header: presigned | ||
209 | .headers() | ||
210 | .map(|(k, v)| (k.to_owned(), v.to_owned())) | ||
211 | .collect(), | ||
212 | expires_at, | ||
213 | href: presigned.uri().to_string(), | ||
214 | }), | ||
215 | ..Default::default() | ||
216 | }, | ||
217 | error: None, | ||
218 | }; | ||
219 | } | ||
220 | |||
221 | if let Some(content_length) = result.content_length() { | ||
222 | if content_length > 0 { | ||
223 | match state | ||
224 | .dl_limiter | ||
225 | .lock() | ||
226 | .await | ||
227 | .request(content_length as u64) | ||
228 | .await | ||
229 | { | ||
230 | Ok(true) => {} | ||
231 | Ok(false) => { | ||
232 | return BatchResponseObject::error( | ||
233 | obj, | ||
234 | StatusCode::SERVICE_UNAVAILABLE, | ||
235 | "Public LFS downloads temporarily unavailable".to_string(), | ||
236 | ); | ||
237 | } | ||
238 | Err(e) => { | ||
239 | println!("Failed to request {content_length} bytes from download limiter: {e}"); | ||
240 | return BatchResponseObject::error( | ||
241 | obj, | ||
242 | StatusCode::INTERNAL_SERVER_ERROR, | ||
243 | "Internal server error".to_string(), | ||
244 | ); | ||
245 | } | ||
246 | } | ||
247 | } | ||
248 | } | ||
249 | |||
250 | let Some(tag) = generate_tag( | ||
251 | Claims { | ||
252 | specific_claims: SpecificClaims::Download(obj.oid), | ||
253 | repo_path: repo, | ||
254 | expires_at, | ||
255 | }, | ||
256 | &state.authz_conf.key, | ||
257 | ) else { | ||
258 | return BatchResponseObject::error( | ||
259 | obj, | ||
260 | StatusCode::INTERNAL_SERVER_ERROR, | ||
261 | "Internal server error".to_string(), | ||
262 | ); | ||
263 | }; | ||
264 | |||
265 | let upload_path = format!( | ||
266 | "{repo}/info/lfs/objects/{}/{}/{}", | ||
267 | HexByte(obj.oid[0]), | ||
268 | HexByte(obj.oid[1]), | ||
269 | obj.oid, | ||
270 | ); | ||
271 | |||
272 | BatchResponseObject { | ||
273 | oid: obj.oid, | ||
274 | size: obj.size, | ||
275 | authenticated: Some(true), | ||
276 | actions: BatchResponseObjectActions { | ||
277 | download: Some(BatchResponseObjectAction { | ||
278 | header: { | ||
279 | let mut map = HashMap::new(); | ||
280 | map.insert( | ||
281 | "Authorization".to_string(), | ||
282 | format!("Gitolfs3-Hmac-Sha256 {tag} {}", expires_at.timestamp()), | ||
283 | ); | ||
284 | map | ||
285 | }, | ||
286 | expires_at, | ||
287 | href: format!("{}/{upload_path}", state.base_url), | ||
288 | }), | ||
289 | ..Default::default() | ||
290 | }, | ||
291 | error: None, | ||
292 | } | ||
293 | } | ||
294 | |||
/// Returns whether `name` refers to an existing directory (i.e. a repository
/// that exists on disk).
fn repo_exists(name: &str) -> bool {
    std::fs::metadata(name)
        .map(|meta| meta.is_dir())
        .unwrap_or(false)
}
301 | |||
302 | fn is_repo_public(name: &str) -> Option<bool> { | ||
303 | if !repo_exists(name) { | ||
304 | return None; | ||
305 | } | ||
306 | match std::fs::metadata(format!("{name}/git-daemon-export-ok")) { | ||
307 | Ok(metadata) if metadata.is_file() => Some(true), | ||
308 | Err(e) if e.kind() == std::io::ErrorKind::NotFound => Some(false), | ||
309 | _ => None, | ||
310 | } | ||
311 | } | ||
312 | |||
/// Git LFS batch API endpoint (`objects/batch`).
///
/// Resolves the repository's visibility, authorizes the request, validates
/// the `Accept` header, hash algorithm, and transfer adapter, and then
/// handles each requested object according to the operation (download or
/// upload), returning a `BatchResponse` as Git-LFS JSON.
pub async fn batch(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    RepositoryName(repo): RepositoryName,
    GitLfsJson(Json(payload)): GitLfsJson<BatchRequest>,
) -> Response {
    // Unknown repositories are reported identically to unauthorized ones
    // via REPO_NOT_FOUND, so existence is checked first.
    let Some(public) = is_repo_public(&repo) else {
        return REPO_NOT_FOUND.into_response();
    };
    let Trusted(trusted) = match authorize_batch(
        &state.authz_conf,
        &repo,
        public,
        payload.operation,
        &headers,
    ) {
        Ok(authn) => authn,
        Err(e) => return e.into_response(),
    };

    // The client must accept the Git LFS JSON media type in at least one
    // of its Accept header values.
    if !headers
        .get_all("Accept")
        .iter()
        .filter_map(|v| v.to_str().ok())
        .any(is_git_lfs_json_mimetype)
    {
        let message = format!("Expected `{LFS_MIME}` in list of acceptable response media types");
        return make_error_resp(StatusCode::NOT_ACCEPTABLE, &message).into_response();
    }

    // Only SHA-256 OIDs and the "basic" transfer adapter are supported.
    if payload.hash_algo != HashAlgo::Sha256 {
        let message = "Unsupported hashing algorithm specified";
        return make_error_resp(StatusCode::CONFLICT, message).into_response();
    }
    if !payload.transfers.is_empty() && !payload.transfers.contains(&TransferAdapter::Basic) {
        let message = "Unsupported transfer adapter specified (supported: basic)";
        return make_error_resp(StatusCode::CONFLICT, message).into_response();
    }

    let mut resp = BatchResponse {
        transfer: TransferAdapter::Basic,
        objects: vec![],
        hash_algo: HashAlgo::Sha256,
    };
    // Objects are processed sequentially; uploads that need no action
    // (already stored) are omitted from the response.
    for obj in payload.objects {
        match payload.operation {
            Operation::Download => resp
                .objects
                .push(handle_download_object(&state, &repo, &obj, trusted).await),
            Operation::Upload => {
                if let Some(obj_resp) = handle_upload_object(&state, &repo, &obj).await {
                    resp.objects.push(obj_resp);
                }
            }
        };
    }
    GitLfsJson(Json(resp)).into_response()
}
371 | |||
/// Path parameters of the object download route: the two OID shard bytes
/// followed by the full OID.
///
/// `#[serde(remote = "Self")]` makes the derived impl available under
/// `FileParams::deserialize` so the manual `Deserialize` impl below can
/// delegate to it and then validate field consistency.
#[derive(Deserialize, Copy, Clone)]
#[serde(remote = "Self")]
pub struct FileParams {
    oid0: HexByte, // first path segment: first byte of the OID
    oid1: HexByte, // second path segment: second byte of the OID
    oid: Oid,      // final path segment: the full object ID
}
379 | |||
380 | impl<'de> Deserialize<'de> for FileParams { | ||
381 | fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> | ||
382 | where | ||
383 | D: serde::Deserializer<'de>, | ||
384 | { | ||
385 | let unchecked @ FileParams { | ||
386 | oid0: HexByte(oid0), | ||
387 | oid1: HexByte(oid1), | ||
388 | oid, | ||
389 | } = FileParams::deserialize(deserializer)?; | ||
390 | if oid0 != oid.as_bytes()[0] { | ||
391 | return Err(de::Error::custom( | ||
392 | "first OID path part does not match first byte of full OID", | ||
393 | )); | ||
394 | } | ||
395 | if oid1 != oid.as_bytes()[1] { | ||
396 | return Err(de::Error::custom( | ||
397 | "second OID path part does not match first byte of full OID", | ||
398 | )); | ||
399 | } | ||
400 | Ok(unchecked) | ||
401 | } | ||
402 | } | ||
403 | |||
/// Object download endpoint used by untrusted clients: streams the object
/// from S3 through this server.
///
/// The request is authorized via `authorize_get` (HMAC tag from the batch
/// response), then the object body is streamed to the client with
/// Content-Type and Content-Length propagated from S3 when available.
pub async fn obj_download(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    RepositoryName(repo): RepositoryName,
    Path(FileParams { oid0, oid1, oid }): Path<FileParams>,
) -> Response {
    if let Err(e) = authorize_get(&state.authz_conf, &repo, oid, &headers) {
        return e.into_response();
    }

    // Same sharded key layout as used by the batch handlers.
    let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, oid);
    let result = match state
        .s3_client
        .get_object()
        .bucket(&state.s3_bucket)
        .key(full_path)
        .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
        .send()
        .await
    {
        Ok(result) => result,
        Err(e) => {
            println!("Failed to GetObject (repo {repo}, OID {oid}): {e}");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to query object information",
            )
                .into_response();
        }
    };

    // Forward S3's content metadata to the client where present.
    let mut headers = header::HeaderMap::new();
    if let Some(content_type) = result.content_type {
        let Ok(header_value) = content_type.try_into() else {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Object has invalid content type",
            )
                .into_response();
        };
        headers.insert(header::CONTENT_TYPE, header_value);
    }
    if let Some(content_length) = result.content_length {
        headers.insert(header::CONTENT_LENGTH, content_length.into());
    }

    // Stream the S3 body to the client without buffering it in memory.
    let async_read = result.body.into_async_read();
    let stream = tokio_util::io::ReaderStream::new(async_read);
    let body = axum::body::Body::from_stream(stream);

    (headers, body).into_response()
}