diff --git a/.gitignore b/.gitignore
index 5b70ae1..8dab2b1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
*.generation.properties
+*.user.generation.properties
+
loader/logs
logs
diff --git a/dataset-generation/pom.xml b/dataset-generation/pom.xml
index 61b723d..67d60f8 100644
--- a/dataset-generation/pom.xml
+++ b/dataset-generation/pom.xml
@@ -14,7 +14,7 @@
Dataset Generation API
Europeana Dataset API generation module
0.1-SNAPSHOT
- war
+ jar
21
@@ -75,6 +75,8 @@
+
+ ${artifactId}
org.springframework.boot
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/DatasetGenerationApp.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/DatasetGenerationApp.java
index c829ea2..9d3f3a2 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/DatasetGenerationApp.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/DatasetGenerationApp.java
@@ -8,6 +8,7 @@
import eu.europeana.api.dataset.generation.service.DatasetGenerationExecutor;
import eu.europeana.api.dataset.generation.service.ScheduleDatasetService;
import jakarta.annotation.Resource;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.boot.WebApplicationType;
@@ -20,7 +21,6 @@
import java.util.*;
import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.BEAN_BATCH_SCHEDULED_DATASET_SERVICE;
-import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.LAST_HARVEST_DATE_BEAN;
/**
* Main application. Allows deploying as a war and logs instance data when deployed in Cloud Foundry
@@ -44,9 +44,6 @@ public class DatasetGenerationApp extends TaskletSupport {
@Resource
GeneratorSettings settings;
- @Resource(name = LAST_HARVEST_DATE_BEAN)
- Date lastHarvestDate;
-
/**
* Starts the application workflow for dataset generation. This method is triggered upon the application
* being fully initialized and ready to process.
@@ -55,6 +52,9 @@ public class DatasetGenerationApp extends TaskletSupport {
*
* 1. Retrieves the last harvest date from a specified file to determine the datasets to be processed.
* - If no last harvest date is found, logs a message indicating that all datasets will be harvested.
+ * - If a forced harvest is specified, logs a message indicating that all datasets will be harvested.
+ * - If a specific set of datasets is specified, logs a message indicating which datasets will be harvested.
+ *
* 2. Reads datasets from the Search API based on the retrieved last harvest date.
* 3. Maps the response and stores into H2 (in memory DB)
* 4. Executes the scheduled dataset processing using the dataset generation executor.
@@ -65,18 +65,10 @@ public class DatasetGenerationApp extends TaskletSupport {
public void start() throws EuropeanaApiException {
LOG.info("Starting Dataset Generation App ...");
- if (lastHarvestDate == null) {
- LOG.info("No previous harvest date found, All the datasets will be harvested .....");
- }
-
- if (settings.isForceHarvest()) {
- lastHarvestDate = null;
- LOG.info("Forced harvest of all the datasets. datasetsToHarvest: {}", settings.getDatasetToHarvest() );
- }
-
- List datasetToSchedule = searchApiDatasetReader.getDataset(lastHarvestDate);
+ Date lastHarvestDate = resolveLastHarvestDate();
+ List datasetsToSchedule = fetchDatasets(lastHarvestDate);
- scheduleDatasetService.scheduleDatasetsForDownload(datasetToSchedule);
+ scheduleDatasetService.scheduleDatasetsForDownload(datasetsToSchedule);
datasetGenerationExecutor.runScheduledDatasets();
}
@@ -105,8 +97,55 @@ public static void main(String[] args) {
// 😈 wait for completion of scheduled tasks execution
// TODO ADD logic to await for the scheduled dataset completion
-//
context.close();
System.exit(0);
}
+
+
+ /**
+ * Resolves the last harvest date to determine the datasets that need to be processed.
+ *
+ * @return The last harvest date retrieved from the specified file. Returns {@code null}
+ * if a forced harvest is specified or no previous harvest date is found.
+ */
+ private Date resolveLastHarvestDate() {
+ if (settings.isForceHarvest()) {
+ LOG.info("Forced harvest of all datasets. datasetsToHarvest: {}", settings.getDatasetToHarvest());
+ return null;
+ }
+
+ Date lastHarvestDate = TaskletSupport.getLastHarvestDate(settings.getLastHarvestDateFile());
+ if (lastHarvestDate == null) {
+ LOG.info("No previous harvest date found, all datasets will be harvested.");
+ }
+ return lastHarvestDate;
+ }
+
+ /**
+ * Fetches datasets based on the provided last harvest date or a specific set of datasets
+ * specified in the application settings.
+ *
+ * If the application settings contain a non-blank list of datasets to harvest, only those
+ * datasets will be fetched. Otherwise, datasets modified after the provided last harvest
+ * date will be retrieved.
+ *
+ * @param lastHarvestDate The date used to filter datasets modified after this date. If null,
+ * all available datasets are considered.
+ * @return A list of {@code Dataset} objects representing the datasets retrieved from the
+ * Search API.
+ * @throws EuropeanaApiException If an error occurs while interacting with the Search API.
+ */
+ private List fetchDatasets(Date lastHarvestDate) throws EuropeanaApiException {
+ if (StringUtils.isNotBlank(settings.getDatasetToHarvest())) {
+ List datasetsToHarvest =
+ Arrays.stream(settings.getDatasetToHarvest().split(","))
+ .map(String::trim)
+ .toList();
+
+ LOG.info("Harvest specific datasets: {}", datasetsToHarvest);
+ return searchApiDatasetReader.getDataset(null, datasetsToHarvest);
+ }
+
+ return searchApiDatasetReader.getDataset(lastHarvestDate);
+ }
}
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/config/AppAutoConfig.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/config/AppAutoConfig.java
index 8dc9862..2822364 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/config/AppAutoConfig.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/config/AppAutoConfig.java
@@ -15,6 +15,7 @@
import eu.europeana.api.dataset.generation.model.JobParameter;
import eu.europeana.api.dataset.generation.model.ScheduledDataset;
import eu.europeana.api.dataset.generation.processor.DatasetDeletionTasklet;
+import eu.europeana.api.dataset.generation.processor.TaskletSupport;
import eu.europeana.api.dataset.generation.reader.ScheduledDatasetDbReaderJdbc;
import eu.europeana.api.dataset.generation.reader.SearchApiDatasetReader;
import eu.europeana.api.dataset.generation.service.ScheduleDatasetService;
@@ -47,14 +48,12 @@
import javax.sql.DataSource;
import javax.xml.transform.TransformerFactory;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.File;
+import java.nio.file.*;
import java.time.Instant;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.*;
@@ -193,7 +192,7 @@ public FileDeletionService getFileDeletionService() {
public DatasetDeletionTasklet getDeletionTasklet() {
return new DatasetDeletionTasklet(
settings.getSnapshotFile(),
- settings.getCsvReportPath(),
+ getStatusReportCsvPath(),
applicationContext.getBean(SearchApiDatasetReader.class),
new FileDeletionService(settings.getDatasetsFolder())) ;
}
@@ -248,7 +247,7 @@ public DataSourceInitializer batchSchemaInitializer(DataSource dataSource) {
public DatasetReportListener getdDatasetReportListener() {
return new DatasetReportListener(
settings.isForceHarvest(),
- getLastHarvestDate(),
+ TaskletSupport.getLastHarvestDate(settings.getLastHarvestDateFile()),
settings.getSnapshotFile(),
settings.getDatasetsFolder());
}
@@ -265,7 +264,7 @@ public ItemWriter getFlatFileItemWriter() {
FlatFileItemWriter writer = new FlatFileItemWriter<>();
writer.setName("DatasetStatusReport");
- writer.setResource(new FileSystemResource(settings.getCsvReportPath()));
+ writer.setResource(new FileSystemResource(getStatusReportCsvPath()));
writer.setAppendAllowed(true);
writer.setHeaderCallback(w -> w.write(CSV_REPORT_HEADER));
@@ -305,24 +304,30 @@ public DelimitedLineAggregator lineAggregator(
return aggregator;
}
+
/**
- * Retrieves the last harvest date from the specified file. The file is expected to contain
- * a single ISO-8601 formatted date string. If the file cannot be read or contains invalid
- * data, an error is logged and null is returned.
+ * Generates an absolute file path for a CSV report within the specified directory.
+ * The method ensures the generated file name does not overwrite any existing file
+ * by appending an incremented counter if a file with the same base name exists.
*
- * @return the last harvest date as a {@link Date} object if successfully parsed, or null
- * if an error occurs or the date is invalid.
+ * @return the absolute path of the generated CSV report file.
*/
- @Bean(LAST_HARVEST_DATE_BEAN)
- public Date getLastHarvestDate() {
- try {
- String content = Files.readString(Path.of(settings.getLastHarvestDateFile())).trim();
- return Date.from(Instant.parse(content));
- } catch (IOException e) {
- LOG.error("Error reading last harvest date file - {}", e.getMessage());
+ @Bean(STATUS_REPORT_CSV_PATH_BEAN)
+ public String getStatusReportCsvPath() {
+ String baseName = "status_" +
+ LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd"));
+ String extension = ".csv";
+
+ File file = new File(settings.getDatasetsFolder() + baseName + extension);
+
+ int counter = 1;
+ while (file.exists()) {
+ file = new File(settings.getDatasetsFolder() + baseName + "_" + counter + extension);
+ counter++;
}
- return null;
- }
+ LOG.info("Status report CSV file path: {}", file.getAbsolutePath());
+ return file.getAbsolutePath();
+ }
}
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/config/GeneratorSettings.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/config/GeneratorSettings.java
index 0782ab4..0ae9194 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/config/GeneratorSettings.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/config/GeneratorSettings.java
@@ -16,8 +16,9 @@
* @since 23 Feb 2026
*/
@Configuration
-@PropertySource("classpath:dataset.generation.properties")
-@PropertySource(value = "classpath:dataset.generation.user.properties", ignoreResourceNotFound = true)
+@PropertySource(
+ value = {"classpath:dataset.generation.properties", "classpath:dataset.generation.user.properties"},
+ ignoreResourceNotFound = true)
public class GeneratorSettings {
@Value("${search.api.url}")
@@ -94,19 +95,18 @@ public String getSnapshotFile() {
return getDatasetsFolder() + snapshotFile;
}
- public String getCsvReportPath() {
- return getDatasetsFolder() +
- "status_" +
- LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd")) +
- ".csv";
- }
-
public String getFailedSetsFile() {
return getDatasetsFolder() + "failed-sets.txt";
}
+ /**
+ * Determines if the harvest should be forced for all datasets.
+ * Checks whether the dataset to harvest is set to "ALL", ignoring case.
+ *
+ * @return true if the dataset to harvest is "ALL", false otherwise.
+ */
public boolean isForceHarvest() {
- return StringUtils.equalsIgnoreCase(getDatasetToHarvest(), "ALL");
+ return StringUtils.isNotEmpty(getDatasetToHarvest()) && StringUtils.equalsIgnoreCase(getDatasetToHarvest(), "ALL");
}
public String getLastHarvestDateFile() {
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/DataFormatter.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/DataFormatter.java
index 5543dc5..6528233 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/DataFormatter.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/DataFormatter.java
@@ -18,11 +18,12 @@ public interface DataFormatter {
* The method transforms the metadata into a specific output format and streams the formatted
* content to the given output stream.
*
+ * @param recordId the identifier of the record being written
* @param metadata the {@link Document} containing metadata to be formatted and written
* @param zipOut the {@link OutputStream} to which the formatted metadata is written
* @throws EuropeanaApiException if an error occurs during the metadata transformation or writing process
*/
- void write(Document metadata, OutputStream zipOut) throws EuropeanaApiException;
+ void write(String recordId, Document metadata, OutputStream zipOut) throws EuropeanaApiException;
/**
* Retrieves the file extension associated with the output format handled by this formatter.
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/impl/TurtleFormatter.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/impl/TurtleFormatter.java
index 7140b9b..10d4f64 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/impl/TurtleFormatter.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/impl/TurtleFormatter.java
@@ -10,11 +10,10 @@
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdfxml.xmlinput1.DOM2Model;
+import org.apache.jena.shared.JenaException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.w3c.dom.Document;
-import org.xml.sax.SAXParseException;
-
import javax.xml.transform.TransformerFactory;
/**
@@ -40,7 +39,7 @@ public record TurtleFormatter(TransformerFactory transformerFactory) implements
// TODO find a solution to not close zip
@Override
- public void write(Document metadata, OutputStream zipOut) throws EuropeanaApiException {
+ public void write(String recordId, Document metadata, OutputStream zipOut) throws EuropeanaApiException {
OutputStream safeOut = new FilterOutputStream(zipOut) {
@Override
public void close() throws IOException {
@@ -49,9 +48,16 @@ public void close() throws IOException {
};
try (TurtleRecordWriter writer = new TurtleRecordWriter(safeOut)) {
- writer.write(toModel(metadata));
+ Model m = toModel(metadata);
+ if (m != null) {
+ writer.write(m);
+ } else {
+ LOG.error("Skipping record - {} , due to invalid data found - " , recordId);
+ }
} catch (IOException e) {
throw new DataFormatterException("Error writing the metadata in turtle format " + e.getMessage(), e);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ LOG.error("Error disabling the errorForSpaceInURI field in ReaderRDFXML_ARP1 for the TurtleRecordWriter " + e.getMessage(), e);
}
}
@@ -69,7 +75,12 @@ public static Model toModel(Document doc) throws DataFormatterException {
DOM2Model dom2Model = DOM2Model.createD2M("", m);
dom2Model.load(doc);
return m;
- } catch (SAXParseException e) {
+ } catch (Exception e) {
+ // for all invalid Jena errors, skip those records and log them
+ if (e instanceof JenaException) {
+ LOG.error("Invalid data found - " + e.getMessage(), e);
+ return null;
+ }
throw new DataFormatterException("Error converting document to Jena model - " + e.getMessage(), e);
}
}
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/impl/XMLFormatter.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/impl/XMLFormatter.java
index 3cbd38f..c007300 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/impl/XMLFormatter.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/format/impl/XMLFormatter.java
@@ -26,7 +26,7 @@
public record XMLFormatter(TransformerFactory transformerFactory) implements DataFormatter {
@Override
- public void write(Document metadata, OutputStream zipOut) throws EuropeanaApiException {
+ public void write(String recordId, Document metadata, OutputStream zipOut) throws EuropeanaApiException {
try {
Transformer transformer = transformerFactory.newTransformer();
transformer.transform(new DOMSource(metadata), new StreamResult(zipOut));
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/listener/DatasetReportListener.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/listener/DatasetReportListener.java
index efb3157..217329f 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/listener/DatasetReportListener.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/listener/DatasetReportListener.java
@@ -133,6 +133,8 @@ public FileMetadata extractFileMetadata(String fileName) {
}
+ // TODO now with ATOMIC_SWAP, creation date will be updated as well every time,
+ // so we probably need to remove that and only rely on modified_date
public class FileMetadata {
private final Date creationDate;
private final Date modifiedDate;
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/BaseProcessor.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/BaseProcessor.java
index 67fa182..d95b196 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/BaseProcessor.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/BaseProcessor.java
@@ -5,6 +5,13 @@
import org.jspecify.annotations.Nullable;
import org.springframework.batch.item.ItemProcessor;
+import java.io.IOException;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.List;
+
/**
* Abstract base class for processing {@link ScheduledDataset} objects.
*
@@ -22,4 +29,31 @@ public abstract class BaseProcessor implements ItemProcessor tempFiles, List targetFiles) throws IOException {
+ for (int i = 0; i < tempFiles.size(); i++) {
+ Path temp = tempFiles.get(i);
+ Path target = targetFiles.get(i);
+
+ try {
+ Files.move(temp, target,
+ StandardCopyOption.ATOMIC_MOVE,
+ StandardCopyOption.REPLACE_EXISTING);
+ } catch (AtomicMoveNotSupportedException e) {
+ // fallback if FS doesn't support atomic move
+ Files.move(temp, target,
+ StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+ }
}
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/JobCompletionTasklet.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/JobCompletionTasklet.java
index 3c35f03..e38762c 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/JobCompletionTasklet.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/JobCompletionTasklet.java
@@ -1,5 +1,6 @@
package eu.europeana.api.dataset.generation.processor;
+import com.fasterxml.jackson.databind.ObjectMapper;
import eu.europeana.api.commons_sb3.slack.SlackConnection;
import eu.europeana.api.dataset.generation.config.GeneratorSettings;
import eu.europeana.api.dataset.generation.service.ScheduleDatasetService;
@@ -25,6 +26,8 @@
import java.util.Map;
import java.util.stream.Collectors;
+import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.STATUS_REPORT_CSV_PATH_BEAN;
+
/**
* The DatasetReportTasklet class is a Spring Batch Tasklet implementation that performs two main tasks:
* - Updates the last harvest date to a file in the specified location.
@@ -41,6 +44,8 @@ public class JobCompletionTasklet extends TaskletSupport implements Tasklet {
public static final String HARVEST_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
+ private ObjectMapper objectMapper = new ObjectMapper();
+
@Resource
SlackConnection slackConnection;
@@ -50,11 +55,14 @@ public class JobCompletionTasklet extends TaskletSupport implements Tasklet {
@Resource
ScheduleDatasetService scheduleDatasetService;
+ @Resource(name = STATUS_REPORT_CSV_PATH_BEAN)
+ String statusReportCsvFile;
+
@Override
public @Nullable RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
updateLastHarvestDate();
updateFailedSetsFile();
- slackConnection.publishStatusReport(buildSlackMessage(Path.of(settings.getCsvReportPath())).toString());
+ slackConnection.publishStatusReport(buildSlackMessage());
LOG.info("Job completed successfully...!!");
return RepeatStatus.FINISHED;
}
@@ -120,11 +128,11 @@ public void updateFailedSetsFile() {
* The message includes a dataset report header, a summary of status counts,
* and a preview of a table containing dataset information.
*
- * @param csvPath the path to the CSV file containing dataset information
* @return a map representing the message structure in Slack block format
* @throws IOException if an error occurs while reading the CSV file
*/
- public Map buildSlackMessage(Path csvPath) throws IOException {
+ public String buildSlackMessage() throws IOException {
+ Path csvPath = Path.of(statusReportCsvFile);
Map counts = countStatus(csvPath);
String table = buildTable(csvPath, 10);// max 10 rows for now, a full report will be attached
long total = scheduleDatasetService.count();
@@ -137,14 +145,17 @@ public Map buildSlackMessage(Path csvPath) throws IOException {
blocks.add(section("📊 *Dataset Report*")); // Header
blocks.add(section(total + " datasets were processed "));
- blocks.add(section("Status Overview"));
+ blocks.add(section("Status Overview "));
blocks.add(section("```" + summary + "```")); // Summary block
+ blocks.add(section("Full report <" + csvPath + "|here>"));
blocks.add(section("```" + table + "```")); // Table preview
+ String jsonPayload = objectMapper.writeValueAsString(Map.of("blocks", blocks));
+
if (LOG.isTraceEnabled()) {
- LOG.trace("Slack message blocks: {}", blocks);
+ LOG.trace("Slack message blocks: {}", jsonPayload);
}
- return Map.of("blocks", blocks);
+ return jsonPayload;
}
/**
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/OaiPmhZipProcessor.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/OaiPmhZipProcessor.java
index bbc4b8f..232fec4 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/OaiPmhZipProcessor.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/OaiPmhZipProcessor.java
@@ -13,8 +13,13 @@
import org.springframework.stereotype.Component;
import java.io.File;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.DATA_FORMATS_BEAN;
@@ -51,10 +56,19 @@ public class OaiPmhZipProcessor extends BaseProcessor {
ScheduledDataset doProcessing(ScheduledDataset dataset) throws Exception {
List sinks = new ArrayList<>();
+ List tempFiles = new ArrayList<>();
+ List targetFiles = new ArrayList<>();
for (var entry : formats.entrySet()) {
- File zipFile = new File(createFolder(entry.getKey().name()), dataset.getDatasetId() + ".zip");
- sinks.add(new ZipRecordSink(dataset.getDatasetId(), zipFile, entry.getValue()));
+ Path dir = createFolder(entry.getKey()).toPath();
+
+ Path target = dir.resolve(dataset.getDatasetId() + ".zip");
+ Path temp = Files.createTempFile(dir, dataset.getDatasetId() + ".", ".zip.tmp");
+
+ tempFiles.add(temp);
+ targetFiles.add(target);
+
+ sinks.add(new ZipRecordSink(dataset.getDatasetId(), temp.toFile(), entry.getValue()));
}
try (MultiRecordSink multiSink = new MultiRecordSink(sinks)) {
@@ -62,22 +76,39 @@ ScheduledDataset doProcessing(ScheduledDataset dataset) throws Exception {
scheduleDatasetService.updateFailedRecords(dataset, failedRecords); // update failedRecords and hasBeenProcessed
}
+ // 🔥 Atomic swap happens ONLY after all sinks are closed
+ performAtomicSwap(tempFiles, targetFiles);
+
return dataset;
}
/**
* Creates a new folder within the datasets directory if it does not already exist.
*
- * @param folderName the name of the folder to be created
+ * @param rdfFormat rdf format of the folder
* @return the {@link File} object representing the folder
*/
- private File createFolder(String folderName) {
- File folder = new File(settings.getDatasetsFolder(), folderName);
+ private File createFolder(RdfFormat rdfFormat) {
+ File folder = new File(settings.getDatasetsFolder(), getFolderName(rdfFormat));
if (!folder.exists() ) {
folder.mkdirs();
}
return folder;
}
+
+ /**
+ * Determines the folder name based on the provided RDF format.
+ * If the RDF format is XML, the folder name corresponds to the format's name.
+ * Otherwise, the alternative name of the RDF format is used in uppercase.
+ *
+ * This synchronizes the Turtle format folder name to TTL rather than TURTLE.
+ * @param rdfFormat the RDF format used to determine the folder name
+ * @return the folder name as a string, based on the given RDF format
+ */
+ private String getFolderName(RdfFormat rdfFormat) {
+ return rdfFormat.equals(RdfFormat.XML) ? rdfFormat.name() : rdfFormat.getExtension().toUpperCase(Locale.ROOT);
+ }
+
}
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/TaskletSupport.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/TaskletSupport.java
index 148dfdb..6ea81bf 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/TaskletSupport.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/processor/TaskletSupport.java
@@ -10,10 +10,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.time.Instant;
+import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -24,6 +22,25 @@ public class TaskletSupport {
private static final Logger LOG = LogManager.getLogger(TaskletSupport.class);
+
+ /**
+ * Retrieves the last harvest date from the specified file. The file is expected to contain
+ * a single ISO-8601 formatted date string. If the file cannot be read or contains invalid
+ * data, an error is logged and null is returned.
+ *
+ * @return the last harvest date as a {@link Date} object if successfully parsed, or null
+ * if an error occurs or the date is invalid.
+ */
+ public static Date getLastHarvestDate(String lastHarvestFile) {
+ try {
+ String content = Files.readString(Path.of(lastHarvestFile)).trim();
+ return Date.from(Instant.parse(content));
+ } catch (IOException e) {
+ LOG.error("Error reading last harvest date file - {}", e.getMessage());
+ }
+ return null;
+ }
+
/**
* Loads a snapshot of dataset identifiers from a file. If the file does not exist,
* an empty set is returned.
@@ -120,8 +137,8 @@ public static Map countStatus(Path csvPath) throws IOException {
public static String buildTable(Path csvPath, int maxRows) throws IOException {
StringBuilder sb = new StringBuilder();
- sb.append(String.format("%-12s %-10s %-10s %-10s%n",
- "ID", "Status", "Total", "Failed"));
+ sb.append(String.format("%-15s %-15s %-15s %-15s%n\n",
+ "Dataset", "Status", "Total Records", "Failed Records"));
try (Stream lines = Files.lines(csvPath)) {
Iterator it = lines.skip(1).iterator();
@@ -129,7 +146,7 @@ public static String buildTable(Path csvPath, int maxRows) throws IOException {
while (it.hasNext() && count < maxRows) {
String[] c = it.next().split(",");
- sb.append(String.format("%-12s %-10s %-10s %-10s%n",
+ sb.append(String.format("%-15s %-15s %-15s %-15s%n",
c[0], c[1], c[2], c[3]));
count++;
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/reader/SearchApiDatasetReader.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/reader/SearchApiDatasetReader.java
index 51b6e49..3aef1cf 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/reader/SearchApiDatasetReader.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/reader/SearchApiDatasetReader.java
@@ -27,9 +27,11 @@
import java.util.*;
import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.SEARCH_RECORD_AUTH_HANDLER;
-import static eu.europeana.api.dataset.generation.utils.ModelConstants.facets;
+import static eu.europeana.api.dataset.generation.utils.ModelConstants.facets;
+
/**
* This class is responsible for reading datasets from the search API.
+ *
* @author Srishti Singh
* @since 23 Feb 2026
*/
@@ -49,7 +51,27 @@ public class SearchApiDatasetReader {
HttpConnection searchApiClient = new HttpConnection(true);
/**
- * Reads datasets from the search API based on the provided timestamp update.
+ * Retrieves a list of datasets updated after the given timestamp and filtered
+ * by the specified dataset IDs to harvest.
+ *
+ * @param timestampUpdate The timestamp to filter datasets. Only datasets updated after
+ * this timestamp will be included.
+ * @param datasetsToHarvest A list of dataset IDs to filter the datasets. If the list is
+ * empty, no filtering by dataset ID will be applied.
+ * @return A list of datasets that match the update timestamp and dataset ID criteria.
+ * @throws EuropeanaApiException If an error occurs during dataset retrieval or filtering.
+ */
+ public List getDataset(Date timestampUpdate, List datasetsToHarvest) throws EuropeanaApiException {
+ List datasets = getDataset(timestampUpdate);
+ if (datasetsToHarvest.size() > 0) {
+ datasets.removeIf(dataset -> !datasetsToHarvest.contains(dataset.getDatasetId()));
+ }
+ return datasets;
+ }
+
+ /**
+ * Reads datasets from the search API based on the provided timestamp update
+ *
* @param timestampUpdate The timestamp update to filter datasets.
* @return List of datasets retrieved from the search API.
* @throws EuropeanaApiException If there's an error during API interaction.
@@ -71,8 +93,8 @@ public List getDataset(Date timestampUpdate) throws EuropeanaApiExcepti
JSONObject field = fields.getJSONObject(i);
//set names from solr are in the form of _ so we split
datasets.add(new Dataset(
- StringUtils.substringBefore(field.getString("label"), "_"),
- field.getLong("count")));
+ StringUtils.substringBefore(field.getString("label"), "_")
+ , field.getLong("count")));
}
}
} else {
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/service/impl/ZipRecordSink.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/service/impl/ZipRecordSink.java
index 73afe20..3ce99f3 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/service/impl/ZipRecordSink.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/service/impl/ZipRecordSink.java
@@ -47,7 +47,7 @@ public void consume(Record record) throws EuropeanaApiException {
String entryName = (recordId != null ? getDirectoryName(recordId) : "record_" + counter) + formatter.getFileExtension();
ZipEntry entry = new ZipEntry(entryName);
zipOut.putNextEntry(entry);
- formatter.write(record.metadata, zipOut);
+ formatter.write(recordId, record.metadata, zipOut);
zipOut.closeEntry();
counter++;
} catch (IOException e) {
diff --git a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/utils/AppConfigConstants.java b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/utils/AppConfigConstants.java
index c3a5966..add879c 100644
--- a/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/utils/AppConfigConstants.java
+++ b/dataset-generation/src/main/java/eu/europeana/api/dataset/generation/utils/AppConfigConstants.java
@@ -14,7 +14,7 @@ public interface AppConfigConstants {
String SCHEDULED_DATASET_WRITER = "scheduledDatasetWriter";
String SLACK_CONNECTION_BEAN = "slackConnection";
String DATA_FORMATS_BEAN = "dataFormats";
- String LAST_HARVEST_DATE_BEAN = "lastHarvestDate";
+ String STATUS_REPORT_CSV_PATH_BEAN = "statusReportCsvPath";
// other constants
String CSV_REPORT_HEADER = "DatasetId,FileStatus,TotalRecords,FailedRecord";
diff --git a/dataset-generation/src/main/resources/dataset.generation.properties.template b/dataset-generation/src/main/resources/dataset.generation.properties.template
index e1fc123..737f090 100644
--- a/dataset-generation/src/main/resources/dataset.generation.properties.template
+++ b/dataset-generation/src/main/resources/dataset.generation.properties.template
@@ -12,4 +12,7 @@ keycloak.token.grant.params =
dataset.files.location=
snapshot.file=datasets_snapshot.txt
-slack.webhook=
\ No newline at end of file
+slack.webhook=
+
+## Can be set to ALL/all for a forced harvest of all datasets. Specific datasets can also be listed here. Example: 1533, 536, 1514, 1524
+dataset.to.harvest=
\ No newline at end of file
diff --git a/dataset-generation/src/main/resources/log4j2.xml b/dataset-generation/src/main/resources/log4j2.xml
index 21053b4..e4756f7 100644
--- a/dataset-generation/src/main/resources/log4j2.xml
+++ b/dataset-generation/src/main/resources/log4j2.xml
@@ -15,6 +15,7 @@
+