author    Rutger Broekhoff    2024-07-11 19:11:51 +0200
committer Rutger Broekhoff    2024-07-11 19:11:51 +0200
commit    3e67a3486eed22522f4352503ef7067ca81a8050 (patch)
tree      971f0a84c073731c944069bdefdb848099428bdf /gitolfs3-server/src/handler.rs
parent    0822bdf658e03bdb5b31a90b8c64f3d0b6a6cff7 (diff)
Split server code into multiple smaller modules
Diffstat (limited to 'gitolfs3-server/src/handler.rs')
-rw-r--r--    gitolfs3-server/src/handler.rs    455
1 file changed, 455 insertions, 0 deletions
diff --git a/gitolfs3-server/src/handler.rs b/gitolfs3-server/src/handler.rs
new file mode 100644
index 0000000..6516291
--- /dev/null
+++ b/gitolfs3-server/src/handler.rs
@@ -0,0 +1,455 @@
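//! Handlers for the Git LFS batch API and for object downloads proxied
//! through this server, backed by an S3-compatible object store.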
use std::{collections::HashMap, sync::Arc};

use aws_sdk_s3::{error::SdkError, operation::head_object::HeadObjectOutput};
use axum::{
    extract::{Path, State},
    http::{header, HeaderMap, StatusCode},
    response::{IntoResponse, Response},
    Json,
};
use base64::{prelude::BASE64_STANDARD, Engine};
use chrono::Utc;
use gitolfs3_common::{generate_tag, Claims, HexByte, Oid, Operation, SpecificClaims};
use serde::{de, Deserialize};
use tokio::sync::Mutex;

use crate::{
    api::{
        is_git_lfs_json_mimetype, make_error_resp, BatchRequest, BatchRequestObject, BatchResponse,
        BatchResponseObject, BatchResponseObjectAction, BatchResponseObjectActions, GitLfsJson,
        HashAlgo, RepositoryName, TransferAdapter, LFS_MIME, REPO_NOT_FOUND,
    },
    authz::{authorize_batch, authorize_get, Trusted},
    config::AuthorizationConfig,
    dlimit::DownloadLimiter,
};

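/// State shared by all handlers: the S3 client and bucket, the authorization
/// configuration, the server's public base URL, and the download limiter.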
pub struct AppState {
    pub s3_client: aws_sdk_s3::Client,
    pub s3_bucket: String,
    pub authz_conf: AuthorizationConfig,
    // Should not end with a slash.
    pub base_url: String,
    pub dl_limiter: Arc<Mutex<DownloadLimiter>>,
}

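/// Checks whether the SHA-256 checksum reported by S3 matches `oid`. Objects
/// without a reported checksum pass, since not every provider supplies one.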
fn validate_checksum(oid: Oid, obj: &HeadObjectOutput) -> bool {
    if let Some(checksum) = obj.checksum_sha256() {
        if let Ok(checksum) = BASE64_STANDARD.decode(checksum) {
            if let Ok(checksum32b) = TryInto::<[u8; 32]>::try_into(checksum) {
                return Oid::from(checksum32b) == oid;
            }
        }
    }
    true
}

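/// Checks whether the content length reported by S3 matches the expected
/// size. Objects without a reported length pass.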
fn validate_size(expected: i64, obj: &HeadObjectOutput) -> bool {
    if let Some(length) = obj.content_length() {
        return length == expected;
    }
    true
}

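/// Handles a single object of an upload batch request. Returns `None` when
/// the object already exists in S3 with the expected size and checksum, so
/// the client need not upload it; otherwise returns an upload action with a
/// presigned S3 PUT URL that expires after five minutes.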
async fn handle_upload_object(
    state: &AppState,
    repo: &str,
    obj: &BatchRequestObject,
) -> Option<BatchResponseObject> {
    let (oid0, oid1) = (HexByte(obj.oid[0]), HexByte(obj.oid[1]));
    let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, obj.oid);

    match state
        .s3_client
        .head_object()
        .bucket(&state.s3_bucket)
        .key(full_path.clone())
        .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
        .send()
        .await
    {
        Ok(result) => {
            if validate_size(obj.size, &result) && validate_checksum(obj.oid, &result) {
                return None;
            }
        }
        Err(SdkError::ServiceError(e)) if e.err().is_not_found() => {}
        Err(e) => {
            println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid);
            return Some(BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to query object information".to_string(),
            ));
        }
    };

    let expires_in = std::time::Duration::from_secs(5 * 60);
    let expires_at = Utc::now() + expires_in;

    let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else {
        return Some(BatchResponseObject::error(
            obj,
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed to generate upload URL".to_string(),
        ));
    };
    let Ok(presigned) = state
        .s3_client
        .put_object()
        .bucket(&state.s3_bucket)
        .key(full_path)
        .checksum_sha256(obj.oid.to_string())
        .content_length(obj.size)
        .presigned(config)
        .await
    else {
        return Some(BatchResponseObject::error(
            obj,
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed to generate upload URL".to_string(),
        ));
    };
    Some(BatchResponseObject {
        oid: obj.oid,
        size: obj.size,
        authenticated: Some(true),
        actions: BatchResponseObjectActions {
            upload: Some(BatchResponseObjectAction {
                header: presigned
                    .headers()
                    .map(|(k, v)| (k.to_owned(), v.to_owned()))
                    .collect(),
                expires_at,
                href: presigned.uri().to_string(),
            }),
            ..Default::default()
        },
        error: None,
    })
}

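/// Handles a single object of a download batch request. Trusted clients get a
/// presigned S3 GET URL directly; untrusted clients are accounted against the
/// download limiter and get a time-limited HMAC-tagged URL that routes the
/// download back through this server.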
async fn handle_download_object(
    state: &AppState,
    repo: &str,
    obj: &BatchRequestObject,
    trusted: bool,
) -> BatchResponseObject {
    let (oid0, oid1) = (HexByte(obj.oid[0]), HexByte(obj.oid[1]));
    let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, obj.oid);

    let result = match state
        .s3_client
        .head_object()
        .bucket(&state.s3_bucket)
        .key(&full_path)
        .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
        .send()
        .await
    {
        Ok(result) => result,
        Err(e) => {
            println!("Failed to HeadObject (repo {repo}, OID {}): {e}", obj.oid);
            return BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to query object information".to_string(),
            );
        }
    };

    // Scaleway actually doesn't provide SHA256 support, but maybe in the future :)
    if !validate_checksum(obj.oid, &result) {
        return BatchResponseObject::error(
            obj,
            StatusCode::UNPROCESSABLE_ENTITY,
            "Object corrupted".to_string(),
        );
    }
    if !validate_size(obj.size, &result) {
        return BatchResponseObject::error(
            obj,
            StatusCode::UNPROCESSABLE_ENTITY,
            "Incorrect size specified (or object corrupted)".to_string(),
        );
    }

    let expires_in = std::time::Duration::from_secs(5 * 60);
    let expires_at = Utc::now() + expires_in;

    if trusted {
        let Ok(config) = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in) else {
            return BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to generate download URL".to_string(),
            );
        };
        let Ok(presigned) = state
            .s3_client
            .get_object()
            .bucket(&state.s3_bucket)
            .key(full_path)
            .presigned(config)
            .await
        else {
            return BatchResponseObject::error(
                obj,
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to generate download URL".to_string(),
            );
        };
        return BatchResponseObject {
            oid: obj.oid,
            size: obj.size,
            authenticated: Some(true),
            actions: BatchResponseObjectActions {
                download: Some(BatchResponseObjectAction {
                    header: presigned
                        .headers()
                        .map(|(k, v)| (k.to_owned(), v.to_owned()))
                        .collect(),
                    expires_at,
                    href: presigned.uri().to_string(),
                }),
                ..Default::default()
            },
            error: None,
        };
    }

    if let Some(content_length) = result.content_length() {
        if content_length > 0 {
            match state
                .dl_limiter
                .lock()
                .await
                .request(content_length as u64)
                .await
            {
                Ok(true) => {}
                Ok(false) => {
                    return BatchResponseObject::error(
                        obj,
                        StatusCode::SERVICE_UNAVAILABLE,
                        "Public LFS downloads temporarily unavailable".to_string(),
                    );
                }
                Err(e) => {
                    println!("Failed to request {content_length} bytes from download limiter: {e}");
                    return BatchResponseObject::error(
                        obj,
                        StatusCode::INTERNAL_SERVER_ERROR,
                        "Internal server error".to_string(),
                    );
                }
            }
        }
    }

    let Some(tag) = generate_tag(
        Claims {
            specific_claims: SpecificClaims::Download(obj.oid),
            repo_path: repo,
            expires_at,
        },
        &state.authz_conf.key,
    ) else {
        return BatchResponseObject::error(
            obj,
            StatusCode::INTERNAL_SERVER_ERROR,
            "Internal server error".to_string(),
        );
    };

    let download_path = format!(
        "{repo}/info/lfs/objects/{}/{}/{}",
        HexByte(obj.oid[0]),
        HexByte(obj.oid[1]),
        obj.oid,
    );

    BatchResponseObject {
        oid: obj.oid,
        size: obj.size,
        authenticated: Some(true),
        actions: BatchResponseObjectActions {
            download: Some(BatchResponseObjectAction {
                header: {
                    let mut map = HashMap::new();
                    map.insert(
                        "Authorization".to_string(),
                        format!("Gitolfs3-Hmac-Sha256 {tag} {}", expires_at.timestamp()),
                    );
                    map
                },
                expires_at,
                href: format!("{}/{download_path}", state.base_url),
            }),
            ..Default::default()
        },
        error: None,
    }
}

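/// Reports whether a repository with the given name exists, i.e. whether
/// `name` refers to a directory.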
fn repo_exists(name: &str) -> bool {
    let Ok(metadata) = std::fs::metadata(name) else {
        return false;
    };
    metadata.is_dir()
}

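/// Reports whether the repository is exported for anonymous access, following
/// the git-daemon convention of a `git-daemon-export-ok` marker file. Returns
/// `None` if the repository does not exist or its state cannot be determined.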
fn is_repo_public(name: &str) -> Option<bool> {
    if !repo_exists(name) {
        return None;
    }
    match std::fs::metadata(format!("{name}/git-daemon-export-ok")) {
        Ok(metadata) if metadata.is_file() => Some(true),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Some(false),
        _ => None,
    }
}

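/// Handles a Git LFS batch API request: authorizes the client, validates the
/// Accept header, hash algorithm, and transfer adapter, then dispatches each
/// object to the upload or download handler.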
pub async fn batch(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    RepositoryName(repo): RepositoryName,
    GitLfsJson(Json(payload)): GitLfsJson<BatchRequest>,
) -> Response {
    let Some(public) = is_repo_public(&repo) else {
        return REPO_NOT_FOUND.into_response();
    };
    let Trusted(trusted) = match authorize_batch(
        &state.authz_conf,
        &repo,
        public,
        payload.operation,
        &headers,
    ) {
        Ok(authn) => authn,
        Err(e) => return e.into_response(),
    };

    if !headers
        .get_all("Accept")
        .iter()
        .filter_map(|v| v.to_str().ok())
        .any(is_git_lfs_json_mimetype)
    {
        let message = format!("Expected `{LFS_MIME}` in list of acceptable response media types");
        return make_error_resp(StatusCode::NOT_ACCEPTABLE, &message).into_response();
    }

    if payload.hash_algo != HashAlgo::Sha256 {
        let message = "Unsupported hashing algorithm specified";
        return make_error_resp(StatusCode::CONFLICT, message).into_response();
    }
    if !payload.transfers.is_empty() && !payload.transfers.contains(&TransferAdapter::Basic) {
        let message = "Unsupported transfer adapter specified (supported: basic)";
        return make_error_resp(StatusCode::CONFLICT, message).into_response();
    }

    let mut resp = BatchResponse {
        transfer: TransferAdapter::Basic,
        objects: vec![],
        hash_algo: HashAlgo::Sha256,
    };
    for obj in payload.objects {
        match payload.operation {
            Operation::Download => resp
                .objects
                .push(handle_download_object(&state, &repo, &obj, trusted).await),
            Operation::Upload => {
                if let Some(obj_resp) = handle_upload_object(&state, &repo, &obj).await {
                    resp.objects.push(obj_resp);
                }
            }
        };
    }
    GitLfsJson(Json(resp)).into_response()
}

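/// Path parameters of an object download URL: the first two bytes of the OID
/// followed by the full OID, mirroring the object layout in the bucket.
/// Deserialization is overridden via `#[serde(remote = "Self")]` so the two
/// path parts can be validated against the full OID.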
#[derive(Deserialize, Copy, Clone)]
#[serde(remote = "Self")]
pub struct FileParams {
    oid0: HexByte,
    oid1: HexByte,
    oid: Oid,
}

impl<'de> Deserialize<'de> for FileParams {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let unchecked @ FileParams {
            oid0: HexByte(oid0),
            oid1: HexByte(oid1),
            oid,
        } = FileParams::deserialize(deserializer)?;
        if oid0 != oid.as_bytes()[0] {
            return Err(de::Error::custom(
                "first OID path part does not match first byte of full OID",
            ));
        }
        if oid1 != oid.as_bytes()[1] {
            return Err(de::Error::custom(
                "second OID path part does not match second byte of full OID",
            ));
        }
        Ok(unchecked)
    }
}

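/// Serves an object download through this server: authorizes the request,
/// fetches the object from S3, and streams its body to the client with the
/// stored Content-Type and Content-Length.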
pub async fn obj_download(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    RepositoryName(repo): RepositoryName,
    Path(FileParams { oid0, oid1, oid }): Path<FileParams>,
) -> Response {
    if let Err(e) = authorize_get(&state.authz_conf, &repo, oid, &headers) {
        return e.into_response();
    }

    let full_path = format!("{repo}/lfs/objects/{}/{}/{}", oid0, oid1, oid);
    let result = match state
        .s3_client
        .get_object()
        .bucket(&state.s3_bucket)
        .key(full_path)
        .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
        .send()
        .await
    {
        Ok(result) => result,
        Err(e) => {
            println!("Failed to GetObject (repo {repo}, OID {oid}): {e}");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to retrieve object",
            )
                .into_response();
        }
    };

    let mut headers = header::HeaderMap::new();
    if let Some(content_type) = result.content_type {
        let Ok(header_value) = content_type.try_into() else {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Object has invalid content type",
            )
                .into_response();
        };
        headers.insert(header::CONTENT_TYPE, header_value);
    }
    if let Some(content_length) = result.content_length {
        headers.insert(header::CONTENT_LENGTH, content_length.into());
    }

    let async_read = result.body.into_async_read();
    let stream = tokio_util::io::ReaderStream::new(async_read);
    let body = axum::body::Body::from_stream(stream);

    (headers, body).into_response()
}