Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ target/
*.tmp
java_pid*
dump/test-basedir
.settings
44 changes: 33 additions & 11 deletions core/.project
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>core</name>
<buildSpec>
<buildCommand>
<name>org.scala-ide.sdt.core.scalabuilder</name>
</buildCommand>
</buildSpec>
<natures>
<nature>org.scala-ide.sdt.core.scalanature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
<name>core</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.scala-ide.sdt.core.scalabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
<nature>org.scala-ide.sdt.core.scalanature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
<filteredResources>
<filter>
<id>1765562241624</id>
<name></name>
<type>30</type>
<matcher>
<id>org.eclipse.core.resources.regexFilterMatcher</id>
<arguments>node_modules|\.git|__CREATED_BY_JAVA_LANGUAGE_SERVER__</arguments>
</matcher>
</filter>
</filteredResources>
</projectDescription>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.dbpedia.extraction.transform.Quad
import org.dbpedia.extraction.wikiparser._
import org.dbpedia.extraction.config.mappings.HomepageExtractorConfig
import org.dbpedia.extraction.ontology.Ontology
import org.dbpedia.extraction.util.Language
import org.dbpedia.extraction.util.{Language, DataQualityMonitor}
import org.dbpedia.iri.{IRISyntaxException, UriUtils}

import scala.language.reflectiveCalls
Expand All @@ -26,6 +26,9 @@ extends PageNodeExtractor
{
private val language = context.language.wikiCode

// Extraction quality monitor for logging and metrics
private val monitor = DataQualityMonitor.forExtractor("HomepageExtractor")

private val propertyNames = HomepageExtractorConfig.propertyNames(language)

private val official = HomepageExtractorConfig.official(language)
Expand All @@ -48,7 +51,10 @@ extends PageNodeExtractor

override def extract(page: PageNode, subjectUri: String): Seq[Quad] =
{
if(page.title.namespace != Namespace.Main) return Seq.empty
if(page.title.namespace != Namespace.Main) {
monitor.logSkipped(page.title.encoded, s"Not in main namespace: ${page.title.namespace}")
return Seq.empty
}

val list = collectProperties(page).filter(p => propertyNames.contains(p.key.toLowerCase)).flatMap {
NodeUtil.splitPropertyNode(_, splitPropertyNodeLinkStrict, true)
Expand Down Expand Up @@ -118,12 +124,34 @@ extends PageNodeExtractor
{
UriUtils.createURI(url) match{
case Success(u) => UriUtils.cleanLink(u) match{
case Some(c) => Seq(new Quad(context.language, DBpediaDatasets.Homepages, subjectUri, homepageProperty, c , node.sourceIri))
case None => Seq()
case Some(c) =>
monitor.logSuccess(subjectUri, 1)
Seq(new Quad(context.language, DBpediaDatasets.Homepages, subjectUri, homepageProperty, c , node.sourceIri))
case None =>
monitor.logInvalidData(
subjectUri,
"URL could not be cleaned",
data = Some(url)
)
Seq()
}
case Failure(f) => f match{
case _ : IRISyntaxException => Seq() // TODO: log
case _ => Seq()
case ex: IRISyntaxException =>
monitor.logInvalidData(
subjectUri,
"Malformed IRI syntax",
exception = Some(ex),
data = Some(url)
)
Seq()
case ex =>
monitor.logInvalidData(
subjectUri,
"Unexpected error creating URI",
exception = Some(ex),
data = Some(url)
)
Seq()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package org.dbpedia.extraction.util

import java.util.logging.{Level, Logger}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

/**
* Centralized monitoring and logging system for extraction quality and errors.
*
* Features:
* - Structured logging with context (extractor, page, error type)
* - Metrics collection for error rates
* - Thread-safe error counting
* - Export capabilities for failed extractions
*
* Usage:
* {{{
* val monitor = DataQualityMonitor.forExtractor("HomepageExtractor")
* monitor.logInvalidData("Einstein", "Invalid IRI: malformed URL", exception)
* monitor.getMetrics() // Get error statistics
* }}}
*/
object DataQualityMonitor {

// JUL logger for the companion object itself (per-extractor instances create their own)
private val logger = Logger.getLogger(classOf[DataQualityMonitor].getName)

// Global metrics storage (thread-safe)
// errorCounts: per-category counters keyed "Extractor:Category"; AtomicLong makes increments safe.
private val errorCounts = new TrieMap[String, AtomicLong]()
// errorDetails: per-category buffers of error records. NOTE(review): the ListBuffer values are
// mutable — writers synchronize on each buffer (see recordError); readers should too.
private val errorDetails = new TrieMap[String, collection.mutable.ListBuffer[ExtractionError]]()

/**
 * Build a monitor instance dedicated to the named extractor.
 *
 * @param extractorName simple name of the extractor (used as log/metric prefix)
 */
def forExtractor(extractorName: String): DataQualityMonitor =
  new DataQualityMonitor(extractorName)

/**
 * Snapshot of every error counter across all extractors,
 * keyed by "Extractor:Category".
 */
def getGlobalMetrics(): Map[String, Long] = {
  val snapshot = for ((key, counter) <- errorCounts) yield key -> counter.get()
  snapshot.toMap
}

/**
 * Get detailed errors for analysis.
 *
 * Takes a consistent snapshot: the backing ListBuffer is mutated concurrently by
 * recordError (which synchronizes on the buffer), so the read must synchronize on
 * the same monitor to avoid observing a buffer mid-update.
 *
 * @param errorType metric key, formatted "Extractor:Category"
 * @param limit     maximum number of records returned
 */
def getErrorDetails(errorType: String, limit: Int = 100): List[ExtractionError] = {
  errorDetails.get(errorType) match {
    // Copy under the buffer's lock, then return the immutable snapshot.
    case Some(errors) => errors.synchronized { errors.take(limit).toList }
    case None => List.empty
  }
}
Comment on lines +25 to +30
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential thread-safety issue when reading error details.

errors.take(limit) operates on a mutable ListBuffer that may be concurrently modified by recordError. Consider synchronizing the read or returning a snapshot.

   def getErrorDetails(errorType: String, limit: Int = 100): List[ExtractionError] = {
     errorDetails.get(errorType) match {
-      case Some(errors) => errors.take(limit).toList
+      case Some(errors) => errors.synchronized { errors.take(limit).toList }
       case None => List.empty
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def getErrorDetails(errorType: String, limit: Int = 100): List[ExtractionError] = {
errorDetails.get(errorType) match {
case Some(errors) => errors.take(limit).toList
case None => List.empty
}
}
def getErrorDetails(errorType: String, limit: Int = 100): List[ExtractionError] = {
errorDetails.get(errorType) match {
case Some(errors) => errors.synchronized { errors.take(limit).toList }
case None => List.empty
}
}
🤖 Prompt for AI Agents
In core/src/main/scala/org/dbpedia/extraction/util/DataQualityMonitor.scala
around lines 48 to 53, the method getErrorDetails reads from a mutable
ListBuffer without synchronization which can race with concurrent recordError
updates; change the implementation to obtain a thread-safe snapshot by
synchronizing on the ListBuffer (or the map entry) while copying its contents
and then return the copy (e.g., synchronize around errors and call
errors.take(limit).toList or errors.toList.take(limit)), or replace the mutable
ListBuffer with a concurrent/immutable collection and return its immutable
snapshot.


/**
 * Export errors to CSV format for analysis.
 *
 * Fields are escaped per RFC 4180: any field containing a comma, double quote,
 * or line break is wrapped in double quotes with embedded quotes doubled.
 * (Previously only the message field was sanitized, so commas or newlines in
 * extractorName/pageTitle could corrupt the CSV structure.)
 *
 * @param errorType metric key, formatted "Extractor:Category"
 * @param limit     maximum number of rows exported
 */
def exportToCsv(errorType: String, limit: Int = 1000): String = {
  // RFC 4180 quoting for a single field.
  def escape(field: String): String =
    if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field

  val errors = getErrorDetails(errorType, limit)
  val header = "Extractor,PageTitle,ErrorMessage,Timestamp\n"
  val rows = errors.map(e =>
    s"${escape(e.extractorName)},${escape(e.pageTitle)},${escape(e.message)},${e.timestamp}"
  ).mkString("\n")
  header + rows
}
Comment on lines +32 to +39
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

CSV export is vulnerable to malformed output.

Only message is escaped (commas replaced with semicolons), but extractorName and pageTitle could also contain commas, newlines, or quotes that would corrupt the CSV structure.

Consider proper RFC 4180 CSV escaping:

   def exportToCsv(errorType: String, limit: Int = 1000): String = {
     val errors = getErrorDetails(errorType, limit)
     val header = "Extractor,PageTitle,ErrorMessage,Timestamp\n"
+    def escapeCsvField(field: String): String = {
+      if (field.contains(",") || field.contains("\"") || field.contains("\n"))
+        "\"" + field.replace("\"", "\"\"") + "\""
+      else field
+    }
     val rows = errors.map(e =>
-      s"${e.extractorName},${e.pageTitle},${e.message.replaceAll(",", ";")},${e.timestamp}"
+      s"${escapeCsvField(e.extractorName)},${escapeCsvField(e.pageTitle)},${escapeCsvField(e.message)},${e.timestamp}"
     ).mkString("\n")
     header + rows
   }
🤖 Prompt for AI Agents
In core/src/main/scala/org/dbpedia/extraction/util/DataQualityMonitor.scala
around lines 32 to 39, the CSV export currently only sanitizes the message field
and so extractorName and pageTitle (and message newlines/quotes) can break CSV
structure; change the implementation to produce RFC4180-compliant CSV by either:
1) using a CSV library (e.g., OpenCSV or Apache Commons CSV) to build rows
safely, or 2) implementing RFC4180 escaping for each field — wrap every field in
double quotes, double any internal double quotes, and convert newlines to \r\n
(or preserve them inside quoted fields) before joining with commas; ensure the
header remains and apply the escaping to extractorName, pageTitle, and message
(and timestamp if necessary).


/**
 * Drop all recorded counters and stored error details.
 * Intended for test isolation between runs.
 */
def reset(): Unit = {
  errorDetails.clear()
  errorCounts.clear()
}
}

/**
 * Monitor instance for a specific extractor.
 * Thin per-extractor facade: logs with an extractor-specific logger and records
 * metrics into the shared stores on the DataQualityMonitor companion object.
 */
class DataQualityMonitor(val extractorName: String) {

// Dedicated JUL logger per extractor, namespaced under org.dbpedia.extraction.monitor
private val logger = Logger.getLogger(s"org.dbpedia.extraction.monitor.$extractorName")

/**
 * Log a piece of invalid data with full context, and record it in the metrics store.
 *
 * @param pageTitle The Wikipedia page being processed
 * @param reason Description of why the data is invalid
 * @param exception Optional exception that caused the error
 * @param data Optional invalid data sample for debugging
 */
def logInvalidData(
  pageTitle: String,
  reason: String,
  exception: Option[Throwable] = None,
  data: Option[String] = None
): Unit = {
  val message = buildMessage(pageTitle, reason, data)

  // Always WARNING level; attach the throwable when one is available.
  exception.fold(logger.warning(message)) { ex =>
    logger.log(Level.WARNING, message, ex)
  }

  // Feed the shared metrics store.
  recordError(pageTitle, reason, exception)
}

/**
 * Note (at FINE level) that a page was skipped, and why.
 */
def logSkipped(pageTitle: String, reason: String): Unit =
  logger.fine(s"[$extractorName] Skipped '$pageTitle': $reason")

/**
 * Record (at FINE level) how many triples a successful extraction produced.
 */
def logSuccess(pageTitle: String, triplesCount: Int): Unit =
  logger.fine(s"[$extractorName] Extracted $triplesCount triples from '$pageTitle'")

/**
 * Error counters belonging to this extractor only
 * (entries whose key starts with "<extractorName>:").
 */
def getMetrics(): Map[String, Long] = {
  val prefix = s"$extractorName:"
  DataQualityMonitor.errorCounts.collect {
    case (key, counter) if key.startsWith(prefix) => key -> counter.get()
  }.toMap
}

/**
 * Sum of all error counters recorded for this extractor.
 */
def getTotalErrors(): Long =
  getMetrics().values.sum

// Private helper methods

// Assemble the log line: extractor tag, page title, reason, plus an optional
// truncated sample of the offending data (capped at 200 chars).
private def buildMessage(pageTitle: String, reason: String, data: Option[String]): String = {
  val suffix = data.fold("")(d => s" | Data: ${truncate(d, 200)}")
  s"[$extractorName] Invalid data in '$pageTitle': $reason$suffix"
}

// Record one error in the shared metrics store: bump the per-category counter
// and append a detail record (bounded) to the per-category buffer.
private def recordError(pageTitle: String, reason: String, exception: Option[Throwable]): Unit = {
  val errorType = s"$extractorName:${categorizeError(reason, exception)}"

  // Increment counter (TrieMap + AtomicLong: safe under concurrency)
  DataQualityMonitor.errorCounts
    .getOrElseUpdate(errorType, new AtomicLong(0))
    .incrementAndGet()

  val errorDetail = ExtractionError(
    extractorName = extractorName,
    pageTitle = pageTitle,
    message = reason,
    exceptionType = exception.map(_.getClass.getSimpleName),
    timestamp = System.currentTimeMillis()
  )

  // Use the buffer returned by getOrElseUpdate directly instead of re-reading the
  // map inside the lock (the old code did a redundant second lookup). Synchronize
  // on the buffer so concurrent appends and snapshot reads stay consistent.
  val buffer = DataQualityMonitor.errorDetails
    .getOrElseUpdate(errorType, collection.mutable.ListBuffer.empty)
  buffer.synchronized {
    if (buffer.size < 10000) { // Limit storage to prevent unbounded memory growth
      buffer += errorDetail
    }
  }
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Simplify synchronization and fix redundant buffer lookup.

The buffer is fetched via getOrElseUpdate, then re-fetched inside the synchronized block. Use the buffer returned by getOrElseUpdate directly.

-    DataQualityMonitor.errorDetails
-      .getOrElseUpdate(errorType, collection.mutable.ListBuffer.empty)
-      .synchronized {
-        val buffer = DataQualityMonitor.errorDetails(errorType)
-        if (buffer.size < 10000) { // Limit storage
-          buffer += errorDetail
-        }
-      }
+    val buffer = DataQualityMonitor.errorDetails
+      .getOrElseUpdate(errorType, collection.mutable.ListBuffer.empty)
+    buffer.synchronized {
+      if (buffer.size < 10000) { // Limit storage
+        buffer += errorDetail
+      }
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
DataQualityMonitor.errorDetails
.getOrElseUpdate(errorType, collection.mutable.ListBuffer.empty)
.synchronized {
val buffer = DataQualityMonitor.errorDetails(errorType)
if (buffer.size < 10000) { // Limit storage
buffer += errorDetail
}
}
}
val buffer = DataQualityMonitor.errorDetails
.getOrElseUpdate(errorType, collection.mutable.ListBuffer.empty)
buffer.synchronized {
if (buffer.size < 10000) { // Limit storage
buffer += errorDetail
}
}
}
🤖 Prompt for AI Agents
In core/src/main/scala/org/dbpedia/extraction/util/DataQualityMonitor.scala
around lines 164 to 172, the code calls getOrElseUpdate to obtain a buffer but
then re-reads the map inside the synchronized block; replace that by storing the
result of getOrElseUpdate in a local val (e.g. val buffer =
DataQualityMonitor.errorDetails.getOrElseUpdate(errorType,
collection.mutable.ListBuffer.empty)) and synchronize on that buffer
(buffer.synchronized { if (buffer.size < 10000) buffer += errorDetail }) so you
use the retrieved buffer directly and avoid the redundant map lookup.


// Bucket an error for metrics: by exception class when one is present,
// otherwise by keywords found in the reason text.
private def categorizeError(reason: String, exception: Option[Throwable]): String =
  exception.map(_.getClass.getSimpleName).getOrElse {
    val lower = reason.toLowerCase
    if (lower.contains("invalid")) "InvalidData"
    else if (lower.contains("malformed")) "MalformedData"
    else if (lower.contains("missing")) "MissingData"
    else "Other"
  }

// Shorten a string to at most maxLength characters, marking any cut with "...".
private def truncate(str: String, maxLength: Int): String =
  if (str.length > maxLength) str.take(maxLength) + "..." else str
}

/**
 * Immutable record of a single extraction failure.
 *
 * @param extractorName name of the extractor that produced the error
 * @param pageTitle     wiki page (or subject URI) being processed when it occurred
 * @param message       human-readable reason for the failure
 * @param exceptionType simple class name of the causing exception, if any
 * @param timestamp     epoch milliseconds when the error was recorded
 */
case class ExtractionError(
extractorName: String,
pageTitle: String,
message: String,
exceptionType: Option[String],
timestamp: Long
)
Loading
Loading