Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/config/yaml.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Our pipeline can ingest .csv, .txt, or .json data from an input location. See th
#### Fields

- `storage` **StorageConfig**
- `type` **file|blob|cosmosdb** - The storage type to use. Default=`file`
- `type` **FileStorage|AzureBlobStorage|AzureCosmosStorage** - The storage type to use. Default=`FileStorage`
Comment thread
dworthen marked this conversation as resolved.
Outdated
- `base_dir` **str** - The base directory to write output artifacts to, relative to the root.
- `connection_string` **str** - (blob/cosmosdb only) The Azure Storage connection string.
- `container_name` **str** - (blob/cosmosdb only) The Azure Storage container name.
Expand Down Expand Up @@ -115,7 +115,7 @@ This section controls the storage mechanism used by the pipeline used for export

#### Fields

- `type` **file|memory|blob|cosmosdb** - The storage type to use. Default=`file`
- `type` **FileStorage|AzureBlobStorage|AzureCosmosStorage** - The storage type to use. Default=`FileStorage`
- `base_dir` **str** - The base directory to write output artifacts to, relative to the root.
- `connection_string` **str** - (blob/cosmosdb only) The Azure Storage connection string.
- `container_name` **str** - (blob/cosmosdb only) The Azure Storage container name.
Expand All @@ -128,7 +128,7 @@ The section defines a secondary storage location for running incremental indexin

#### Fields

- `type` **file|memory|blob|cosmosdb** - The storage type to use. Default=`file`
- `type` **FileStorage|AzureBlobStorage|AzureCosmosStorage** - The storage type to use. Default=`FileStorage`
- `base_dir` **str** - The base directory to write output artifacts to, relative to the root.
- `connection_string` **str** - (blob/cosmosdb only) The Azure Storage connection string.
- `container_name` **str** - (blob/cosmosdb only) The Azure Storage container name.
Expand Down
14 changes: 14 additions & 0 deletions packages/graphrag-storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# GraphRAG Storage

```python
from graphrag_storage import StorageConfig, create_storage
from graphrag_storage.file_storage import FileStorage

storage = create_storage(
StorageConfig(
type="FileStorage", # or FileStorage.__name__
base_dir="output"
)
)

```
15 changes: 15 additions & 0 deletions packages/graphrag-storage/graphrag_storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""The GraphRAG Storage package."""

from graphrag_storage.storage import Storage
from graphrag_storage.storage_config import StorageConfig
from graphrag_storage.storage_factory import create_storage, register_storage

__all__ = [
"Storage",
"StorageConfig",
"create_storage",
"register_storage",
]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""Azure Blob Storage implementation of PipelineStorage."""
"""Azure Blob Storage implementation of Storage."""

import logging
import re
Expand All @@ -12,53 +12,65 @@
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

from graphrag.storage.pipeline_storage import (
PipelineStorage,
from graphrag_storage.storage import (
Storage,
get_timestamp_formatted_with_local_tz,
)

logger = logging.getLogger(__name__)


class BlobPipelineStorage(PipelineStorage):
class AzureBlobStorage(Storage):
"""The Blob-Storage implementation."""

_connection_string: str | None
_container_name: str
_base_dir: str | None
_encoding: str
_storage_account_blob_url: str | None
_blob_service_client: BlobServiceClient
_storage_account_name: str | None

def __init__(self, **kwargs: Any) -> None:
def __init__(
self,
base_dir: str | None = None,
connection_string: str | None = None,
storage_account_blob_url: str | None = None,
container_name: str | None = None,
encoding: str = "utf-8",
**kwargs: Any,
Comment thread
dworthen marked this conversation as resolved.
) -> None:
"""Create a new BlobStorage instance."""
connection_string = kwargs.get("connection_string")
storage_account_blob_url = kwargs.get("storage_account_blob_url")
base_dir = kwargs.get("base_dir")
container_name = kwargs["container_name"]
if container_name is None:
msg = "No container name provided for blob storage."
raise ValueError(msg)
if connection_string is None and storage_account_blob_url is None:
msg = "No storage account blob url provided for blob storage."
msg = "AzureBlobStorage requires either a connection_string or storage_account_blob_url to be specified."
logger.error(msg)
raise ValueError(msg)

if connection_string is not None and storage_account_blob_url is not None:
msg = "AzureBlobStorage requires only one of connection_string or storage_account_blob_url to be specified, not both."
logger.error(msg)
raise ValueError(msg)

if container_name is None:
msg = "AzureBlobStorage requires a container_name to be specified."
logger.error(msg)
raise ValueError(msg)

_validate_blob_container_name(container_name)

logger.info(
"Creating blob storage at [%s] and base_dir [%s]", container_name, base_dir
)
if connection_string:
self._blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
else:
if storage_account_blob_url is None:
msg = "Either connection_string or storage_account_blob_url must be provided."
raise ValueError(msg)

elif storage_account_blob_url:
self._blob_service_client = BlobServiceClient(
account_url=storage_account_blob_url,
credential=DefaultAzureCredential(),
)
self._encoding = kwargs.get("encoding", "utf-8")
self._encoding = encoding
self._container_name = container_name
self._connection_string = connection_string
self._base_dir = base_dir
Expand Down Expand Up @@ -208,12 +220,12 @@ async def delete(self, key: str) -> None:
async def clear(self) -> None:
"""Clear the cache."""

def child(self, name: str | None) -> "PipelineStorage":
def child(self, name: str | None) -> "Storage":
"""Create a child storage instance."""
if name is None:
return self
path = str(Path(self._base_dir) / name) if self._base_dir else name
return BlobPipelineStorage(
return AzureBlobStorage(
connection_string=self._connection_string,
container_name=self._container_name,
encoding=self._encoding,
Expand Down Expand Up @@ -245,7 +257,7 @@ async def get_creation_date(self, key: str) -> str:
return ""


def validate_blob_container_name(container_name: str):
def _validate_blob_container_name(container_name: str) -> None:
Comment thread
dworthen marked this conversation as resolved.
"""
Check if the provided blob container name is valid based on Azure rules.

Expand All @@ -267,32 +279,25 @@ def validate_blob_container_name(container_name: str):
"""
# Check the length of the name
if len(container_name) < 3 or len(container_name) > 63:
return ValueError(
f"Container name must be between 3 and 63 characters in length. Name provided was {len(container_name)} characters long."
)
msg = f"Container name must be between 3 and 63 characters in length. Name provided was {len(container_name)} characters long."
raise ValueError(msg)

# Check if the name starts with a letter or number
if not container_name[0].isalnum():
return ValueError(
f"Container name must start with a letter or number. Starting character was {container_name[0]}."
)
msg = f"Container name must start with a letter or number. Starting character was {container_name[0]}."
raise ValueError(msg)

# Check for valid characters (letters, numbers, hyphen) and lowercase letters
if not re.match(r"^[a-z0-9-]+$", container_name):
return ValueError(
f"Container name must only contain:\n- lowercase letters\n- numbers\n- or hyphens\nName provided was {container_name}."
)
msg = f"Container name must only contain:\n- lowercase letters\n- numbers\n- or hyphens\nName provided was {container_name}."
raise ValueError(msg)

# Check for consecutive hyphens
if "--" in container_name:
return ValueError(
f"Container name cannot contain consecutive hyphens. Name provided was {container_name}."
)
msg = f"Container name cannot contain consecutive hyphens. Name provided was {container_name}."
raise ValueError(msg)

# Check for hyphens at the end of the name
if container_name[-1] == "-":
return ValueError(
f"Container name cannot end with a hyphen. Name provided was {container_name}."
)

return True
msg = f"Container name cannot end with a hyphen. Name provided was {container_name}."
raise ValueError(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from azure.cosmos.partition_key import PartitionKey
from azure.identity import DefaultAzureCredential

from graphrag.logger.progress import Progress
from graphrag.storage.pipeline_storage import (
PipelineStorage,

from graphrag_storage.storage import (
Storage,
get_timestamp_formatted_with_local_tz,
)

logger = logging.getLogger(__name__)


class CosmosDBPipelineStorage(PipelineStorage):
class AzureCosmosStorage(Storage):
"""The CosmosDB-Storage Implementation."""

_cosmos_client: CosmosClient
Expand All @@ -39,28 +39,40 @@ class CosmosDBPipelineStorage(PipelineStorage):
_encoding: str
_no_id_prefixes: list[str]

def __init__(self, **kwargs: Any) -> None:
def __init__(
self,
base_dir: str | None = None,
container_name: str | None = None,
connection_string: str | None = None,
cosmosdb_account_url: str | None = None,
**kwargs: Any,
) -> None:
"""Create a CosmosDB storage instance."""
logger.info("Creating cosmosdb storage")
cosmosdb_account_url = kwargs.get("cosmosdb_account_url")
connection_string = kwargs.get("connection_string")
database_name = kwargs["base_dir"]
container_name = kwargs["container_name"]
if not database_name:
msg = "No base_dir provided for database name"
database_name = base_dir
if database_name is None:
msg = "CosmosDB Storage requires a base_dir to be specified. This is used as the database name."
logger.error(msg)
raise ValueError(msg)

if connection_string is None and cosmosdb_account_url is None:
msg = "connection_string or cosmosdb_account_url is required."
msg = "CosmosDB Storage requires either a connection_string or cosmosdb_account_url to be specified."
logger.error(msg)
raise ValueError(msg)

if connection_string is not None and cosmosdb_account_url is not None:
msg = "CosmosDB Storage requires either a connection_string or cosmosdb_account_url to be specified, not both."
logger.error(msg)
raise ValueError(msg)

if container_name is None:
msg = "CosmosDB Storage requires a container_name to be specified."
logger.error(msg)
raise ValueError(msg)

if connection_string:
self._cosmos_client = CosmosClient.from_connection_string(connection_string)
else:
if cosmosdb_account_url is None:
msg = (
"Either connection_string or cosmosdb_account_url must be provided."
)
raise ValueError(msg)
elif cosmosdb_account_url:
self._cosmos_client = CosmosClient(
url=cosmosdb_account_url,
credential=DefaultAzureCredential(),
Expand Down Expand Up @@ -307,7 +319,7 @@ def keys(self) -> list[str]:
msg = "CosmosDB storage does yet not support listing keys."
raise NotImplementedError(msg)

def child(self, name: str | None) -> PipelineStorage:
def child(self, name: str | None) -> "Storage":
"""Create a child storage instance."""
return self

Expand Down
Loading
Loading