Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export(pg_release_config)
export(pg_reset_config)
export(pg_set_config)
export(pg_set_rawfolder)
export(pg_update_checksums)
export(pgcitations)
export(pgout_path)
export(pgsearch)
Expand Down
201 changes: 195 additions & 6 deletions R/build_priogrid.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ get_temporal_hash <- function(config = pg_current_config()) {
#' @param type Character string specifying release type (e.g., "05deg_yearly"). Use for official releases.
#' @param spatial_hash Character string with 6-character spatial hash. If NULL, computed from current config.
#' @param temporal_hash Character string with 6-character temporal hash. If NULL, computed from current config.
#' @param config A `pg_config` object. Defaults to [pg_current_config()].
#'
#' @return Character string with file path
#' @export
Expand Down Expand Up @@ -101,6 +102,7 @@ pgout_path <- function(version = NULL,
#' If NULL (default), calculates all available variables.
#' @param overwrite Logical. If FALSE (default), skips variables that already
#' exist in the output folder. If TRUE, recalculates all specified variables.
#' @param config A `pg_config` object. Defaults to [pg_current_config()].
#'
#' @return NULL (invisibly). Called for side effects (saving files).
#' @export
Expand Down Expand Up @@ -177,11 +179,43 @@ calc_pg <- function(varnames = NULL, overwrite = FALSE, config = pg_current_conf
# Calculate variables
message(paste("Variables to calculate:", paste(valid_varnames, collapse = ", ")))
message(paste("Saving to:", save_to))
failed_vars <- character(0)
for (varname in valid_varnames) {
message(paste("Calculating", varname))
gen_fun <- get(paste0("gen_", varname))
r <- gen_fun(config = config)
save_pgvariable(r, varname, save_to = save_to)
result <- tryCatch({
gen_fun <- get(paste0("gen_", varname))
r <- gen_fun(config = config)
save_pgvariable(r, varname, save_to = save_to)
NULL
}, error = function(e) {
is_file_error <- grepl(
"not found|does not exist|No such file|cannot open|missing files|No files in metadata",
conditionMessage(e), ignore.case = TRUE
)
if (is_file_error) {
warning(
sprintf("Skipping '%s': a required source file is missing or unreadable.\n", varname),
" Error: ", conditionMessage(e), "\n",
" Check availability: pg_data_availability()\n",
" Download missing: download_pg_rawdata()",
call. = FALSE)
} else {
warning(
sprintf("Skipping '%s': computation failed.\n", varname),
" Error: ", conditionMessage(e),
call. = FALSE)
}
varname
})
if (!is.null(result)) failed_vars <- c(failed_vars, result)
}

if (length(failed_vars) > 0) {
message(
"calc_pg() completed with ", length(failed_vars), " failure(s): ",
paste(failed_vars, collapse = ", "), "\n",
"Succeeded: ", length(valid_varnames) - length(failed_vars),
" / ", length(valid_varnames))
}

invisible(NULL)
Expand Down Expand Up @@ -222,9 +256,95 @@ save_pgvariable <- function(rast, varname, save_to = pgout_path()) {
}

saveRDS(rast, filepath)
.pg_record_checksum(filepath, varname, save_to)
invisible(NULL)
}

# Internal helper: record an MD5 checksum entry in _checksums.csv
#
# Computes the MD5 hash of `filepath` and upserts one row, keyed by `label`,
# into `<base_path>/_checksums.csv` (columns: varname, filename, md5,
# recorded_at). Any previous row(s) carrying the same label are replaced.
#
# @param filepath Path of the file to hash.
# @param label Key stored in the `varname` column of the manifest.
# @param base_path Directory that holds `_checksums.csv`.
# @return NULL (invisibly). Called for its side effect (writing the manifest).
.pg_record_checksum <- function(filepath, label, base_path) {
  # md5sum() returns a vector *named* by path; drop the names so the path does
  # not leak into the data.frame row names.
  md5_val <- unname(tools::md5sum(filepath))

  # md5sum() yields NA for a missing/unreadable file. Writing NA to the
  # manifest would make a later `actual != expected` comparison evaluate to NA,
  # so skip the entry and tell the caller instead.
  if (anyNA(md5_val)) {
    warning("Could not compute MD5 checksum for: ", filepath,
            "; no checksum recorded.", call. = FALSE)
    return(invisible(NULL))
  }

  checksum_file <- file.path(base_path, "_checksums.csv")
  row <- data.frame(
    varname = label,
    filename = basename(filepath),
    md5 = md5_val,
    recorded_at = as.character(Sys.time()),
    stringsAsFactors = FALSE
  )

  if (file.exists(checksum_file)) {
    # Read every column as character: the default type conversion would turn a
    # label such as "001" (or an all-digit hash) into a number, so the upsert
    # filter below would no longer match and duplicate rows would pile up.
    existing <- utils::read.csv(checksum_file, stringsAsFactors = FALSE,
                                colClasses = "character")
    # %in% never returns NA, so rows with a missing varname are kept intact
    # rather than being turned into all-NA rows by logical-NA subsetting.
    existing <- existing[!(existing$varname %in% label), , drop = FALSE]
    updated <- rbind(existing, row)
  } else {
    updated <- row
  }

  utils::write.csv(updated, checksum_file, row.names = FALSE)
  invisible(NULL)
}

# Internal admin helper: populate _checksums.csv in bulk for every .rds and
# .parquet file found in a resolved output folder. Intended for folders whose
# files were placed there without passing through save_pgvariable() /
# read_pg_static() / read_pg_timevarying().
#
# Labels follow the convention used by the writers: an .rds file is keyed by
# its name without extension, a .parquet file by its full file name.
#
# @param config,version,type,spatial_hash,temporal_hash Passed to resolve_pg_mode().
# @param overwrite_existing Logical. If FALSE (default), files that already
#   have an entry in _checksums.csv are left alone. If TRUE, every entry is
#   recomputed.
# @return A list with elements `n_added` and `n_updated` (integers), invisibly.
# @keywords internal
.pg_bootstrap_checksums <- function(config = NULL,
                                    version = NULL,
                                    type = "05deg_yearly",
                                    spatial_hash = NULL,
                                    temporal_hash = NULL,
                                    overwrite_existing = FALSE) {

  mode <- resolve_pg_mode(config, version, type, spatial_hash, temporal_hash)
  out_dir <- mode$base_path

  if (!dir.exists(out_dir)) {
    stop("Output directory does not exist: ", out_dir, call. = FALSE)
  }

  data_files <- list.files(out_dir, pattern = "\\.(rds|parquet)$", full.names = FALSE)

  if (length(data_files) == 0) {
    message("No .rds or .parquet files found in: ", out_dir)
    return(invisible(list(n_added = 0L, n_updated = 0L)))
  }

  # Labels already present in the manifest (none if there is no manifest yet).
  manifest <- file.path(out_dir, "_checksums.csv")
  known_labels <- character(0)
  if (file.exists(manifest)) {
    known_labels <- utils::read.csv(manifest, stringsAsFactors = FALSE)$varname
  }

  n_added <- 0L
  n_updated <- 0L

  for (fname in data_files) {
    label <- fname
    if (endsWith(fname, ".rds")) {
      label <- tools::file_path_sans_ext(fname)
    }
    is_known <- label %in% known_labels

    # Only touch an entry that is new, or when a refresh was requested.
    if (!is_known || overwrite_existing) {
      .pg_record_checksum(file.path(out_dir, fname), label, out_dir)

      if (is_known) {
        n_updated <- n_updated + 1L
      } else {
        n_added <- n_added + 1L
        # Remember the label so a later file mapping to it counts as an update.
        known_labels <- c(known_labels, label)
      }
    }
  }

  n_skipped <- length(data_files) - n_added - n_updated
  message(
    "Bootstrap complete for: ", out_dir, "\n",
    " Added: ", n_added, " new checksum(s)\n",
    " Updated: ", n_updated, " existing checksum(s)\n",
    " Skipped: ", n_skipped,
    " already-present checksum(s) (use overwrite_existing=TRUE to force)"
  )

  invisible(list(n_added = n_added, n_updated = n_updated))
}


#' Load a PRIO-GRID variable
#'
#' Loads a PRIO-GRID variable from disk and returns it as a terra SpatRaster.
Expand All @@ -245,6 +365,8 @@ save_pgvariable <- function(rast, varname, save_to = pgout_path()) {
#' temporal_hash. Loads from the specified custom folder directly.
#' @param temporal_hash Character string with 6-character temporal hash. Requires
#' spatial_hash.
#' @param verify_checksums Logical. If TRUE, verifies the file's MD5 checksum
#' against stored values. Default FALSE.
#'
#' @return Terra SpatRaster object
#' @export
Expand All @@ -271,7 +393,8 @@ load_pgvariable <- function(varname,
version = NULL,
type = "05deg_yearly",
spatial_hash = NULL,
temporal_hash = NULL) {
temporal_hash = NULL,
verify_checksums = FALSE) {
rlang::check_installed("terra", reason = "to load PRIO-GRID variable rasters")

has_hashes <- !is.null(spatial_hash) && !is.null(temporal_hash)
Expand Down Expand Up @@ -316,6 +439,26 @@ load_pgvariable <- function(varname,
stop("Variable '", varname, "' not found at: ", filepath)
}

if (isTRUE(verify_checksums)) {
cs_file <- file.path(dirname(filepath), "_checksums.csv")
if (file.exists(cs_file)) {
cs_df <- utils::read.csv(cs_file, stringsAsFactors = FALSE)
expected <- cs_df[cs_df$varname == varname, ]
if (nrow(expected) == 1) {
actual <- tools::md5sum(filepath)
if (actual != expected$md5) {
warning(
"Checksum mismatch for '", varname, "': the file may be corrupted.\n",
" Expected MD5: ", expected$md5, "\n",
" Actual MD5: ", actual, "\n",
" File: ", filepath, "\n",
" Recalculate with: calc_pg('", varname, "', overwrite = TRUE)",
call. = FALSE)
}
}
}
}

terra::unwrap(readRDS(filepath))
}

Expand Down Expand Up @@ -438,6 +581,8 @@ resolve_pg_mode <- function(config = NULL,
#' @param test Logical. If TRUE, prints coverage summary for each variable.
#' @param overwrite Logical. If FALSE (default) and cached file exists, returns
#' cached data. If TRUE, rebuilds from individual variables.
#' @param verify_checksums Logical. If TRUE, verifies checksums of cached files
#' against stored MD5 values. Default FALSE.
#'
#' @return data.table with pgid as rows and variables as columns, or list of
#' terra SpatRasters if as_raster=TRUE
Expand Down Expand Up @@ -465,7 +610,8 @@ read_pg_static <- function(config = NULL,
temporal_hash = NULL,
as_raster = FALSE,
test = FALSE,
overwrite = FALSE) {
overwrite = FALSE,
verify_checksums = FALSE) {

cfg <- resolve_pg_mode(config, version, type, spatial_hash, temporal_hash, overwrite)

Expand All @@ -481,6 +627,24 @@ read_pg_static <- function(config = NULL,

# Return cached if available (lightweight path — no terra needed)
if (!as_raster && !test && file.exists(fname) && !cfg$overwrite) {
if (isTRUE(verify_checksums)) {
cs_file <- file.path(cfg$base_path, "_checksums.csv")
if (file.exists(cs_file)) {
cs_df <- utils::read.csv(cs_file, stringsAsFactors = FALSE)
expected <- cs_df[cs_df$varname == "pg_static.parquet", ]
if (nrow(expected) == 1) {
actual <- tools::md5sum(fname)
if (actual != expected$md5) {
warning(
"Checksum mismatch for cached 'pg_static.parquet': the file may be corrupted.\n",
" Expected MD5: ", expected$md5, "\n",
" Actual MD5: ", actual, "\n",
" Rebuild with: read_pg_static(overwrite = TRUE)",
call. = FALSE)
}
}
}
}
return(nanoparquet::read_parquet(fname))
}

Expand Down Expand Up @@ -533,6 +697,7 @@ read_pg_static <- function(config = NULL,
# Save to cache
rlang::check_installed("arrow", reason = "to write parquet files")
arrow::write_parquet(df, fname)
.pg_record_checksum(fname, "pg_static.parquet", cfg$base_path)

return(df)
}
Expand All @@ -558,6 +723,8 @@ read_pg_static <- function(config = NULL,
#' @param test Logical. If TRUE, returns coverage summary data.frame.
#' @param overwrite Logical. If FALSE (default) and cached file exists, returns
#' cached data. If TRUE, rebuilds from individual variables.
#' @param verify_checksums Logical. If TRUE, verifies checksums of cached files
#' against stored MD5 values. Default FALSE.
#'
#' @return data.table with pgid + measurement_date as rows and variables as columns,
#' or list of terra SpatRasters if as_raster=TRUE, or coverage test data.frame if test=TRUE
Expand All @@ -582,7 +749,8 @@ read_pg_timevarying <- function(config = NULL,
temporal_hash = NULL,
as_raster = FALSE,
test = FALSE,
overwrite = FALSE) {
overwrite = FALSE,
verify_checksums = FALSE) {

cfg <- resolve_pg_mode(config, version, type, spatial_hash, temporal_hash, overwrite)

Expand All @@ -598,6 +766,24 @@ read_pg_timevarying <- function(config = NULL,

# Return cached if available (lightweight path — no terra needed)
if (!as_raster && !test && file.exists(fname) && !cfg$overwrite) {
if (isTRUE(verify_checksums)) {
cs_file <- file.path(cfg$base_path, "_checksums.csv")
if (file.exists(cs_file)) {
cs_df <- utils::read.csv(cs_file, stringsAsFactors = FALSE)
expected <- cs_df[cs_df$varname == "pg_timevarying.parquet", ]
if (nrow(expected) == 1) {
actual <- tools::md5sum(fname)
if (actual != expected$md5) {
warning(
"Checksum mismatch for cached 'pg_timevarying.parquet': the file may be corrupted.\n",
" Expected MD5: ", expected$md5, "\n",
" Actual MD5: ", actual, "\n",
" Rebuild with: read_pg_timevarying(overwrite = TRUE)",
call. = FALSE)
}
}
}
}
return(nanoparquet::read_parquet(fname))
}

Expand Down Expand Up @@ -669,6 +855,7 @@ read_pg_timevarying <- function(config = NULL,
compress = "gzip")
rlang::check_installed("arrow", reason = "to write parquet files")
arrow::write_parquet(df, fname)
.pg_record_checksum(fname, "pg_timevarying.parquet", cfg$base_path)

return(df)
}
Expand Down Expand Up @@ -817,6 +1004,8 @@ pg_list_custom <- function() {
#' @param version Character string with release version
#' @param type Character string with release type (default: "05deg_yearly")
#' @param overwrite Logical. If TRUE, re-downloads even if file exists.
#' @param list_releases Logical. If TRUE, prints and returns a data.frame of
#' available releases instead of downloading. Default FALSE.
#'
#' @return NULL (invisibly). Called for side effects (downloading data).
#' @export
Expand Down
10 changes: 7 additions & 3 deletions R/config.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pg_config <- function(nrow = 360L,
start_date = as.Date("1850-12-31"),
end_date = Sys.Date(),
verbose = TRUE,
automatic_download = TRUE) {
automatic_download = TRUE,
verify_checksums = FALSE) {

if (!is.numeric(extent) || length(extent) != 4) stop("extent must be a numeric vector of length 4")

Expand All @@ -48,7 +49,8 @@ pg_config <- function(nrow = 360L,
start_date = as.Date(start_date),
end_date = if (identical(end_date, "today")) Sys.Date() else as.Date(end_date),
verbose = as.logical(verbose),
automatic_download = as.logical(automatic_download)
automatic_download = as.logical(automatic_download),
verify_checksums = as.logical(verify_checksums)
)

validate_pg_config(cfg)
Expand Down Expand Up @@ -87,7 +89,8 @@ pg_set_config <- function(...) {
updates <- list(...)

valid_fields <- c("nrow", "ncol", "crs", "extent", "temporal_resolution",
"start_date", "end_date", "verbose", "automatic_download")
"start_date", "end_date", "verbose", "automatic_download",
"verify_checksums")
invalid <- setdiff(names(updates), valid_fields)
if (length(invalid) > 0) {
stop("Unknown config fields: ", paste(invalid, collapse = ", "),
Expand Down Expand Up @@ -237,6 +240,7 @@ validate_pg_config <- function(cfg) {
if (!inherits(cfg$end_date, "Date")) stop("end_date must be a Date")
if (!is.logical(cfg$verbose) || is.na(cfg$verbose)) stop("verbose must be logical")
if (!is.logical(cfg$automatic_download) || is.na(cfg$automatic_download)) stop("automatic_download must be logical")
if (!is.logical(cfg$verify_checksums) || is.na(cfg$verify_checksums)) stop("verify_checksums must be logical")
invisible(NULL)
}

Expand Down
Loading