Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
*.generation.properties
*.user.generation.properties


loader/logs
logs
Expand Down
4 changes: 3 additions & 1 deletion dataset-generation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<name>Dataset Generation API</name>
<description>Europeana Dataset API generation module</description>
<version>0.1-SNAPSHOT</version>
<packaging>war</packaging>
<packaging>jar</packaging>

<properties>
<maven.compiler.source>21</maven.compiler.source>
Expand Down Expand Up @@ -75,6 +75,8 @@
</dependencies>

<build>
<!--this removes the version number from the deployed archive-->
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import eu.europeana.api.dataset.generation.service.DatasetGenerationExecutor;
import eu.europeana.api.dataset.generation.service.ScheduleDatasetService;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.boot.WebApplicationType;
Expand All @@ -20,7 +21,6 @@
import java.util.*;

import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.BEAN_BATCH_SCHEDULED_DATASET_SERVICE;
import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.LAST_HARVEST_DATE_BEAN;

/**
* Main application. Allows deploying as a war and logs instance data when deployed in Cloud Foundry
Expand All @@ -44,9 +44,6 @@ public class DatasetGenerationApp extends TaskletSupport {
@Resource
GeneratorSettings settings;

@Resource(name = LAST_HARVEST_DATE_BEAN)
Date lastHarvestDate;

/**
* Starts the application workflow for dataset generation. This method is triggered upon the application
* being fully initialized and ready to process.
Expand All @@ -55,6 +52,9 @@ public class DatasetGenerationApp extends TaskletSupport {
*
* 1. Retrieves the last harvest date from a specified file to determine the datasets to be processed.
* - If no last harvest date is found, logs a message indicating that all datasets will be harvested.
* - If a forced harvest is specified, logs a message indicating that all/ALL datasets will be harvested.
* - If a specific set of datasets is specified, logs a message indicating which datasets will be harvested.
*
* 2. Reads datasets from the Search API based on the retrieved last harvest date.
* 3. Maps the response and stores into H2 (in memory DB)
* 4. Executes the scheduled dataset processing using the dataset generation executor.
Expand All @@ -65,18 +65,10 @@ public class DatasetGenerationApp extends TaskletSupport {
public void start() throws EuropeanaApiException {
LOG.info("Starting Dataset Generation App ...");

if (lastHarvestDate == null) {
LOG.info("No previous harvest date found, All the datasets will be harvested .....");
}

if (settings.isForceHarvest()) {
lastHarvestDate = null;
LOG.info("Forced harvest of all the datasets. datasetsToHarvest: {}", settings.getDatasetToHarvest() );
}

List<Dataset> datasetToSchedule = searchApiDatasetReader.getDataset(lastHarvestDate);
Date lastHarvestDate = resolveLastHarvestDate();
List<Dataset> datasetsToSchedule = fetchDatasets(lastHarvestDate);

scheduleDatasetService.scheduleDatasetsForDownload(datasetToSchedule);
scheduleDatasetService.scheduleDatasetsForDownload(datasetsToSchedule);
datasetGenerationExecutor.runScheduledDatasets();
}

Expand Down Expand Up @@ -105,8 +97,55 @@ public static void main(String[] args) {

// TODO: wait for completion of the scheduled dataset tasks before closing the context
context.close();
System.exit(0);
}


/**
 * Resolves the harvest cut-off date that decides which datasets need processing.
 *
 * <p>A forced harvest short-circuits to {@code null}, meaning "harvest everything";
 * otherwise the date persisted in the configured last-harvest file is used, which
 * may itself be {@code null} when no previous harvest has been recorded.
 *
 * @return the last harvest date read from the configured file, or {@code null}
 *         when a forced harvest is requested or no previous date exists
 */
private Date resolveLastHarvestDate() {
    // A forced harvest ignores any persisted date and processes everything.
    if (settings.isForceHarvest()) {
        LOG.info("Forced harvest of all datasets. datasetsToHarvest: {}", settings.getDatasetToHarvest());
        return null;
    }

    final Date previousHarvest = TaskletSupport.getLastHarvestDate(settings.getLastHarvestDateFile());
    if (previousHarvest == null) {
        LOG.info("No previous harvest date found, all datasets will be harvested.");
    }
    return previousHarvest;
}

/**
 * Fetches datasets based on the provided last harvest date or a specific set of datasets
 * specified in the application settings.
 *
 * If the application settings contain a non-blank list of dataset ids to harvest (and a
 * forced harvest of ALL datasets is not requested), only those datasets are fetched.
 * Otherwise, datasets modified after the provided last harvest date are retrieved.
 *
 * @param lastHarvestDate The date used to filter datasets modified after this date. If null,
 *                        all available datasets are considered.
 * @return A list of {@code Dataset} objects representing the datasets retrieved from the
 *         Search API.
 * @throws EuropeanaApiException If an error occurs while interacting with the Search API.
 */
private List<Dataset> fetchDatasets(Date lastHarvestDate) throws EuropeanaApiException {
    // When a forced harvest is configured, the property holds the literal value "ALL";
    // it must not be interpreted as a dataset id, so fall through to the date-based
    // (full) fetch instead of querying for a dataset named "ALL".
    if (!settings.isForceHarvest() && StringUtils.isNotBlank(settings.getDatasetToHarvest())) {
        // Tolerate stray whitespace and empty tokens such as "1,,2" or trailing commas.
        List<String> datasetsToHarvest =
                Arrays.stream(settings.getDatasetToHarvest().split(","))
                        .map(String::trim)
                        .filter(StringUtils::isNotBlank)
                        .toList();

        if (!datasetsToHarvest.isEmpty()) {
            LOG.info("Harvest specific datasets: {}", datasetsToHarvest);
            return searchApiDatasetReader.getDataset(null, datasetsToHarvest);
        }
    }

    return searchApiDatasetReader.getDataset(lastHarvestDate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import eu.europeana.api.dataset.generation.model.JobParameter;
import eu.europeana.api.dataset.generation.model.ScheduledDataset;
import eu.europeana.api.dataset.generation.processor.DatasetDeletionTasklet;
import eu.europeana.api.dataset.generation.processor.TaskletSupport;
import eu.europeana.api.dataset.generation.reader.ScheduledDatasetDbReaderJdbc;
import eu.europeana.api.dataset.generation.reader.SearchApiDatasetReader;
import eu.europeana.api.dataset.generation.service.ScheduleDatasetService;
Expand Down Expand Up @@ -47,14 +48,12 @@

import javax.sql.DataSource;
import javax.xml.transform.TransformerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.io.File;
import java.nio.file.*;
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static eu.europeana.api.dataset.generation.utils.AppConfigConstants.*;

Expand Down Expand Up @@ -193,7 +192,7 @@ public FileDeletionService getFileDeletionService() {
/**
 * Creates the tasklet that removes stale dataset files after a harvest run,
 * driven by the current snapshot file and the status-report CSV path.
 *
 * @return a configured {@link DatasetDeletionTasklet}
 */
public DatasetDeletionTasklet getDeletionTasklet() {
    return new DatasetDeletionTasklet(
            settings.getSnapshotFile(),
            getStatusReportCsvPath(),
            applicationContext.getBean(SearchApiDatasetReader.class),
            new FileDeletionService(settings.getDatasetsFolder()));
}
Expand Down Expand Up @@ -248,7 +247,7 @@ public DataSourceInitializer batchSchemaInitializer(DataSource dataSource) {
/**
 * Creates the listener that reports on dataset harvesting, supplied with the
 * force-harvest flag, the last harvest date read from the configured file,
 * the snapshot file, and the datasets folder.
 *
 * NOTE(review): method name contains a typo ("getdDataset..."); kept unchanged
 * for compatibility with existing bean wiring.
 *
 * @return a configured {@link DatasetReportListener}
 */
public DatasetReportListener getdDatasetReportListener() {
    return new DatasetReportListener(
            settings.isForceHarvest(),
            TaskletSupport.getLastHarvestDate(settings.getLastHarvestDateFile()),
            settings.getSnapshotFile(),
            settings.getDatasetsFolder());
}
Expand All @@ -265,7 +264,7 @@ public ItemWriter<ScheduledDataset> getFlatFileItemWriter() {
FlatFileItemWriter<ScheduledDataset> writer = new FlatFileItemWriter<>();

writer.setName("DatasetStatusReport");
writer.setResource(new FileSystemResource(settings.getCsvReportPath()));
writer.setResource(new FileSystemResource(getStatusReportCsvPath()));
writer.setAppendAllowed(true);

writer.setHeaderCallback(w -> w.write(CSV_REPORT_HEADER));
Expand Down Expand Up @@ -305,24 +304,30 @@ public DelimitedLineAggregator<ScheduledDataset> lineAggregator(
return aggregator;
}


/**
* Retrieves the last harvest date from the specified file. The file is expected to contain
* a single ISO-8601 formatted date string. If the file cannot be read or contains invalid
* data, an error is logged and null is returned.
* Generates an absolute file path for a CSV report within the specified directory.
* The method ensures the generated file name does not overwrite any existing file
* by appending an incremented counter if a file with the same base name exists.
*
* @return the last harvest date as a {@link Date} object if successfully parsed, or null
* if an error occurs or the date is invalid.
* @return the absolute path of the generated CSV report file.
*/
@Bean(LAST_HARVEST_DATE_BEAN)
public Date getLastHarvestDate() {
try {
String content = Files.readString(Path.of(settings.getLastHarvestDateFile())).trim();
return Date.from(Instant.parse(content));
} catch (IOException e) {
LOG.error("Error reading last harvest date file - {}", e.getMessage());
@Bean(STATUS_REPORT_CSV_PATH_BEAN)
public String getStatusReportCsvPath() {
String baseName = "status_" +
LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd"));
String extension = ".csv";

File file = new File(settings.getDatasetsFolder() + baseName + extension);

int counter = 1;
while (file.exists()) {
file = new File(settings.getDatasetsFolder() + baseName + "_" + counter + extension);
counter++;
}
return null;
}

LOG.info("Status report CSV file path: {}", file.getAbsolutePath());
return file.getAbsolutePath();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
* @since 23 Feb 2026
*/
@Configuration
@PropertySource("classpath:dataset.generation.properties")
@PropertySource(value = "classpath:dataset.generation.user.properties", ignoreResourceNotFound = true)
@PropertySource(
value = {"classpath:dataset.generation.properties", "classpath:dataset.generation.user.properties"},
ignoreResourceNotFound = true)
public class GeneratorSettings {

@Value("${search.api.url}")
Expand Down Expand Up @@ -94,19 +95,18 @@ public String getSnapshotFile() {
return getDatasetsFolder() + snapshotFile;
}

public String getCsvReportPath() {
return getDatasetsFolder() +
"status_" +
LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd")) +
".csv";
}

/**
 * Builds the location of the report listing dataset sets that failed processing.
 *
 * @return the path of "failed-sets.txt" inside the configured datasets folder
 */
public String getFailedSetsFile() {
    final String reportName = "failed-sets.txt";
    return getDatasetsFolder() + reportName;
}

/**
 * Determines if the harvest should be forced for all datasets.
 * Checks whether the dataset to harvest is set to "ALL", ignoring case.
 *
 * @return true if the dataset to harvest is "ALL", false otherwise.
 */
public boolean isForceHarvest() {
    // StringUtils.equalsIgnoreCase is null-safe and null/empty never equals "ALL",
    // so no separate isNotEmpty guard is required.
    return StringUtils.equalsIgnoreCase(getDatasetToHarvest(), "ALL");
}

public String getLastHarvestDateFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ public interface DataFormatter {
* The method transforms the metadata into a specific output format and streams the formatted
* content to the given output stream.
*
* @param recordId the identifier of the record being written
* @param metadata the {@link Document} containing metadata to be formatted and written
* @param zipOut the {@link OutputStream} to which the formatted metadata is written
* @throws EuropeanaApiException if an error occurs during the metadata transformation or writing process
*/
void write(Document metadata, OutputStream zipOut) throws EuropeanaApiException;
void write(String recordId, Document metadata, OutputStream zipOut) throws EuropeanaApiException;

/**
* Retrieves the file extension associated with the output format handled by this formatter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdfxml.xmlinput1.DOM2Model;
import org.apache.jena.shared.JenaException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.w3c.dom.Document;
import org.xml.sax.SAXParseException;

import javax.xml.transform.TransformerFactory;

/**
Expand All @@ -40,7 +39,7 @@ public record TurtleFormatter(TransformerFactory transformerFactory) implements

// TODO find a solution to not close zip
@Override
public void write(Document metadata, OutputStream zipOut) throws EuropeanaApiException {
public void write(String recordId, Document metadata, OutputStream zipOut) throws EuropeanaApiException {
OutputStream safeOut = new FilterOutputStream(zipOut) {
@Override
public void close() throws IOException {
Expand All @@ -49,9 +48,16 @@ public void close() throws IOException {
};

try (TurtleRecordWriter writer = new TurtleRecordWriter(safeOut)) {
writer.write(toModel(metadata));
Model m = toModel(metadata);
if (m != null) {
writer.write(m);
} else {
LOG.error("Skipping record - {} , due to invalid data found - " , recordId);
}
} catch (IOException e) {
throw new DataFormatterException("Error writing the metadata in turtle format " + e.getMessage(), e);
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.error("Error disabling the errorForSpaceInURI field in ReaderRDFXML_ARP1 for the TurtleRecordWriter " + e.getMessage(), e);
}
}

Expand All @@ -69,7 +75,12 @@ public static Model toModel(Document doc) throws DataFormatterException {
DOM2Model dom2Model = DOM2Model.createD2M("", m);
dom2Model.load(doc);
return m;
} catch (SAXParseException e) {
} catch (Exception e) {
// for all invalid jena errors, skip those records and Log them
if (e instanceof JenaException) {
LOG.error("Invalid data found - " + e.getMessage(), e);
return null;
}
throw new DataFormatterException("Error converting document to Jena model - " + e.getMessage(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public record XMLFormatter(TransformerFactory transformerFactory) implements DataFormatter {

@Override
public void write(Document metadata, OutputStream zipOut) throws EuropeanaApiException {
public void write(String recordId, Document metadata, OutputStream zipOut) throws EuropeanaApiException {
try {
Transformer transformer = transformerFactory.newTransformer();
transformer.transform(new DOMSource(metadata), new StreamResult(zipOut));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public FileMetadata extractFileMetadata(String fileName) {
}


// TODO now with ATOMIC_SWAP , creation date will be updated as well everytime,
// so we probably need to remove that and only rely on modified_date
public class FileMetadata {
private final Date creationDate;
private final Date modifiedDate;
Expand Down
Loading
Loading