Skip to content

Commit

Permalink
Merge pull request #77 from DeterminateSystems/429
Browse files Browse the repository at this point in the history
Back off on 429
  • Loading branch information
grahamc authored Jun 13, 2024
2 parents f9076a8 + 51bb9f9 commit ef5c9ec
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 7 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/check-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,9 @@ jobs:
- name: Run nix to test magic-nix-cache-action
run: |
nix develop --command echo "just testing"
- name: Exhaust our GitHub Actions Cache tokens
# Generally skip this step since it is so intensive
if: ${{ false }}
run: |
date >> README.md
nix build .#veryLongChain -v
37 changes: 36 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,36 @@
magic-nix-cache = pkgs.callPackage ./package.nix { };
#inherit (cranePkgs) magic-nix-cache;
default = magic-nix-cache;

veryLongChain =
let
ctx = ./README.md;

# Function to write the current date to a file
startFile =
pkgs.stdenv.mkDerivation {
name = "start-file";
buildCommand = ''
cat ${ctx} > $out
'';
};

# Recursive function to create a chain of derivations
createChain = n: startFile:
pkgs.stdenv.mkDerivation {
name = "chain-${toString n}";
src =
if n == 0 then
startFile
else createChain (n - 1) startFile;
buildCommand = ''
echo $src > $out
'';
};

in
# Starting point of the chain
createChain 200 startFile;
});

devShells = forEachSupportedSystem ({ pkgs, cranePkgs, lib }: {
Expand All @@ -56,10 +86,15 @@
cargo-bloat
cargo-edit
cargo-udeps
cargo-watch
bacon

age
];
] ++ lib.optionals pkgs.stdenv.isDarwin (with pkgs.darwin.apple_sdk.frameworks; [
SystemConfiguration
]);

NIX_CFLAGS_LINK = lib.optionalString pkgs.stdenv.isDarwin "-lc++abi";
};

/*
Expand Down
79 changes: 74 additions & 5 deletions gha-cache/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

use std::fmt;
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -53,6 +54,11 @@ pub enum Error {
#[error("Failed to initialize the client: {0}")]
InitError(Box<dyn std::error::Error + Send + Sync>),

#[error(
"GitHub Actions Cache throttled Magic Nix Cache. Not trying to use it again on this run."
)]
CircuitBreakerTripped,

#[error("Request error: {0}")]
RequestError(#[from] reqwest::Error), // TODO: Better errors

Expand Down Expand Up @@ -96,6 +102,8 @@ pub struct Api {
/// The concurrent upload limit.
concurrency_limit: Arc<Semaphore>,

circuit_breaker_429_tripped: Arc<AtomicBool>,

/// Backend request statistics.
#[cfg(debug_assertions)]
stats: RequestStats,
Expand Down Expand Up @@ -264,11 +272,16 @@ impl Api {
version_hasher,
client,
concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY)),
circuit_breaker_429_tripped: Arc::new(AtomicBool::from(false)),
#[cfg(debug_assertions)]
stats: Default::default(),
})
}

pub fn circuit_breaker_tripped(&self) -> bool {
self.circuit_breaker_429_tripped.load(Ordering::Relaxed)
}

/// Mutates the cache version/namespace.
pub fn mutate_version(&mut self, data: &[u8]) {
self.version_hasher.update(data);
Expand Down Expand Up @@ -324,6 +337,10 @@ impl Api {
where
S: AsyncRead + Unpin + Send,
{
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}

let mut offset = 0;
let mut futures = Vec::new();
loop {
Expand All @@ -347,6 +364,7 @@ impl Api {
futures.push({
let client = self.client.clone();
let concurrency_limit = self.concurrency_limit.clone();
let circuit_breaker_429_tripped = self.circuit_breaker_429_tripped.clone();
let url = self.construct_url(&format!("caches/{}", allocation.0 .0));

tokio::task::spawn(async move {
Expand Down Expand Up @@ -380,6 +398,8 @@ impl Api {

drop(permit);

circuit_breaker_429_tripped.check_result(&r);

r
})
});
Expand All @@ -401,6 +421,10 @@ impl Api {

/// Downloads a file based on a list of key prefixes.
pub async fn get_file_url(&self, keys: &[&str]) -> Result<Option<String>> {
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}

Ok(self
.get_cache_entry(keys)
.await?
Expand All @@ -419,6 +443,10 @@ impl Api {

/// Retrieves a cache based on a list of key prefixes.
async fn get_cache_entry(&self, keys: &[&str]) -> Result<Option<ArtifactCacheEntry>> {
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}

#[cfg(debug_assertions)]
self.stats.get.fetch_add(1, Ordering::SeqCst);

Expand All @@ -431,6 +459,8 @@ impl Api {
.check_json()
.await;

self.circuit_breaker_429_tripped.check_result(&res);

match res {
Ok(entry) => Ok(Some(entry)),
Err(Error::DecodeError { status, .. }) if status == StatusCode::NO_CONTENT => Ok(None),
Expand All @@ -448,6 +478,10 @@ impl Api {
key: &str,
cache_size: Option<usize>,
) -> Result<ReserveCacheResponse> {
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}

tracing::debug!("Reserving cache for {}", key);

let req = ReserveCacheRequest {
Expand All @@ -466,27 +500,38 @@ impl Api {
.send()
.await?
.check_json()
.await?;
.await;

Ok(res)
self.circuit_breaker_429_tripped.check_result(&res);

res
}

/// Finalizes uploading to a cache.
async fn commit_cache(&self, cache_id: CacheId, size: usize) -> Result<()> {
if self.circuit_breaker_tripped() {
return Err(Error::CircuitBreakerTripped);
}

tracing::debug!("Commiting cache {:?}", cache_id);

let req = CommitCacheRequest { size };

#[cfg(debug_assertions)]
self.stats.post.fetch_add(1, Ordering::SeqCst);

self.client
if let Err(e) = self
.client
.post(self.construct_url(&format!("caches/{}", cache_id.0)))
.json(&req)
.send()
.await?
.check()
.await?;
.await
{
self.circuit_breaker_429_tripped.check_err(&e);
return Err(e);
}

Ok(())
}
Expand Down Expand Up @@ -554,3 +599,27 @@ async fn handle_error(res: reqwest::Response) -> Error {

Error::ApiError { status, info }
}

trait AtomicCircuitBreaker {
fn check_err(&self, e: &Error);
fn check_result<T>(&self, r: &std::result::Result<T, Error>);
}

impl AtomicCircuitBreaker for AtomicBool {
fn check_result<T>(&self, r: &std::result::Result<T, Error>) {
if let Err(ref e) = r {
self.check_err(e)
}
}

fn check_err(&self, e: &Error) {
if let Error::ApiError {
status: reqwest::StatusCode::TOO_MANY_REQUESTS,
info: ref _info,
} = e
{
tracing::info!("Disabling GitHub Actions Cache due to 429: Too Many Requests");
self.store(true, Ordering::Relaxed);
}
}
}
2 changes: 1 addition & 1 deletion magic-nix-cache/src/flakehub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ async fn rewrite_github_actions_token(
let token_response: TokenResponse = token_response
.json()
.await
.with_context(|| format!("converting response into json"))?;
.with_context(|| "converting response into json")?;

let new_github_jwt_string = token_response.value;
let netrc_contents = tokio::fs::read_to_string(netrc_path)
Expand Down
6 changes: 6 additions & 0 deletions magic-nix-cache/src/gha.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ async fn worker(
break;
}
Request::Upload(path) => {
if api.circuit_breaker_tripped() {
tracing::trace!("GitHub Actions gave us a 429, so we're done.",);
continue;
}

if !done.insert(path.clone()) {
continue;
}
Expand Down Expand Up @@ -190,6 +195,7 @@ async fn upload_path(

api.upload_file(narinfo_allocation, narinfo.as_bytes())
.await?;

metrics.narinfos_uploaded.incr();

narinfo_negative_cache
Expand Down

0 comments on commit ef5c9ec

Please sign in to comment.