Blob Storage#

Blob storage is used for storing large and/or ephemeral data that is not suited to the other persistence stores. This includes:

- Large permanent data files, such as the full text of documents.
- Temporary files that are generated during processing but not needed long-term.
- Large payloads that are unsuitable for REST APIs, such as those exchanged with batch processes.

Interface#

External actors such as robots interact with blob storage purely through signed GET/PUT URLs; they are never granted direct access to the storage itself. The signatures are short-lived, typically valid for one hour, but can be refreshed by re-requesting the resource from the Repository API.

The repository codebase interacts with blob storage through the BlobRepository interface, which provides methods for uploading and downloading files and for generating signed URLs. It is primarily a streaming interface, allowing large files to be uploaded and downloaded without being loaded entirely into memory.

Representation#

Blob files are represented in the repository as a BlobStorageFile object.

Models for handling files in blob storage.

class app.persistence.blob.models.BlobSignedUrlType(*values)[source]#

Blob Storage interaction types.

DOWNLOAD = 'download'[source]#
UPLOAD = 'upload'[source]#
pydantic model app.persistence.blob.models.BlobStorageFile[source]#

Model to represent Blob Storage files.

Entity relationship diagram: BlobStorageFile, with fields location (BlobStorageLocation), container (str), path (str), and filename (str).

JSON schema:
{
   "title": "BlobStorageFile",
   "description": "Model to represent Blob Storage files.",
   "type": "object",
   "properties": {
      "location": {
         "$ref": "#/$defs/BlobStorageLocation",
         "description": "The location of the blob storage."
      },
      "container": {
         "description": "The name of the container in Azure Blob Storage.",
         "pattern": "^[^/]*$",
         "title": "Container",
         "type": "string"
      },
      "path": {
         "description": "The path to the file in Azure Blob Storage.",
         "title": "Path",
         "type": "string"
      },
      "filename": {
         "description": "The name of the file in Azure Blob Storage.",
         "pattern": "^[^/]*$",
         "title": "Filename",
         "type": "string"
      }
   },
   "$defs": {
      "BlobStorageLocation": {
         "description": "Blob Storage locations.",
         "enum": [
            "azure",
            "minio"
         ],
         "title": "BlobStorageLocation",
         "type": "string"
      }
   },
   "required": [
      "location",
      "container",
      "path",
      "filename"
   ]
}

Config:
  • frozen: bool = True

Fields:
field container: str [Required][source]#

The name of the container in Azure Blob Storage.

Constraints:
  • pattern = ^[^/]*$

field filename: str [Required][source]#

The name of the file in Azure Blob Storage.

Constraints:
  • pattern = ^[^/]*$

field location: BlobStorageLocation [Required][source]#

The location of the blob storage.

field path: str [Required][source]#

The path to the file in Azure Blob Storage.

async classmethod from_sql(sql: str) Self[source]#

Populate the model from a SQL representation.

async to_sql() str[source]#

Return the SQL persistence representation of the file.

property content_type: str[source]#

Return the content type of the file based on its extension.

class app.persistence.blob.models.BlobStorageLocation(*values)[source]#

Blob Storage locations.

AZURE = 'azure'[source]#
MINIO = 'minio'[source]#
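
To make the constraints above concrete, here is a stdlib-only stand-in for BlobStorageFile (a sketch: the class name and the `.jsonl` content-type mapping are assumptions; the real model is a frozen pydantic model):

```python
import mimetypes
import re
from dataclasses import dataclass

_NO_SLASH = re.compile(r"^[^/]*$")  # the pattern constraint on container and filename


@dataclass(frozen=True)  # the real model is frozen too (Config: frozen = True)
class BlobFileSketch:
    """Stdlib stand-in for BlobStorageFile, illustrating its constraints."""

    location: str   # "azure" or "minio" (BlobStorageLocation)
    container: str  # must not contain "/"
    path: str       # may contain "/" separators
    filename: str   # must not contain "/"

    def __post_init__(self) -> None:
        for value in (self.container, self.filename):
            if not _NO_SLASH.match(value):
                raise ValueError(f"{value!r} must not contain '/'")
        if self.location not in ("azure", "minio"):
            raise ValueError(f"unknown location {self.location!r}")

    @property
    def content_type(self) -> str:
        # The real property derives the type from the extension; mimetypes
        # gives the same idea (.jsonl is not in the stdlib registry, so it
        # is special-cased here as an assumption).
        if self.filename.endswith(".jsonl"):
            return "application/jsonl"
        guessed, _ = mimetypes.guess_type(self.filename)
        return guessed or "application/octet-stream"
```

Attempting to construct a file with a `/` in the container or filename raises a validation error, matching the `^[^/]*$` pattern in the schema above.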

Client#

Each blob storage client implements the GenericBlobStorageClient interface, which defines methods for interacting with blob storage.

class app.persistence.blob.client.GenericBlobStorageClient[source]#

Abstract base class for blob storage clients.

This class defines the interface for blob storage operations.

abstractmethod async generate_signed_url(file: BlobStorageFile, interaction_type: BlobSignedUrlType) str[source]#

Generate a signed URL for the file in blob storage.

Parameters:
  • file (BlobStorageFile) – The file for which to generate the signed URL.

  • interaction_type (BlobSignedUrlType) – The type of interaction (upload or download).

Returns:

The signed URL for the file.

Return type:

str

abstractmethod async stream_file(file: BlobStorageFile) AsyncGenerator[str, None][source]#

Stream a file line-by-line from the blob storage.

Parameters:

file (BlobStorageFile) – The file to stream.

Returns:

An async generator that yields lines from the file.

Return type:

AsyncGenerator[str, None]

abstractmethod async upload_file(content: FileStream | BytesIO, file: BlobStorageFile) None[source]#

Upload a file to the blob storage.

Parameters:
  • content (FileStream | BytesIO) – The content to upload.

  • file (BlobStorageFile) – The file to upload to.

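As an illustration of the interface's shape, a toy in-memory client can stand in for the Azure and MinIO implementations (a sketch only; the real methods take BlobStorageFile objects rather than string keys, and talk to actual storage):

```python
import asyncio
from io import BytesIO
from typing import AsyncGenerator


class InMemoryBlobClient:
    """Toy client shaped like GenericBlobStorageClient, for illustration.

    Keys are plain strings here; the real clients key on BlobStorageFile
    and talk to Azure Blob Storage or MinIO instead of a dict.
    """

    def __init__(self) -> None:
        self._store: dict[str, bytes] = {}

    async def upload_file(self, content: BytesIO, key: str) -> None:
        # The real signature accepts FileStream | BytesIO.
        self._store[key] = content.getvalue()

    async def stream_file(self, key: str) -> AsyncGenerator[str, None]:
        # Yield decoded lines one at a time, as the interface documents.
        for line in self._store[key].decode("utf-8").splitlines():
            yield line


async def demo() -> list[str]:
    client = InMemoryBlobClient()
    await client.upload_file(BytesIO(b'{"id": 1}\n{"id": 2}\n'), "ops/a.jsonl")
    return [line async for line in client.stream_file("ops/a.jsonl")]
```

Uploading a two-line JSONL payload and streaming it back yields the two lines in order, without the consumer ever holding the whole file (in a real client) in memory.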
Blob Repository#

The codebase interacts with the blob clients through the BlobRepository interface:

class app.persistence.blob.repository.BlobRepository[source]#

Repository for managing files in blob storage.

async get_signed_url(file: BlobStorageFile, interaction_type: BlobSignedUrlType) HttpUrl[source]#

Generate a signed URL for a file in Blob Storage.

Parameters:
  • file (BlobStorageFile) – The file for which to generate the signed URL.

  • interaction_type (BlobSignedUrlType) – The type of interaction (upload or download).

Returns:

The signed URL for the file.

Return type:

HttpUrl

stream_file_from_blob_storage(file: BlobStorageFile) AsyncGenerator[AsyncIterator[str], None][source]#

Stream a file line-by-line from Blob Storage.

Usage:

async with blob_repo.stream_file_from_blob_storage(file) as stream:
    async for line in stream:
        print(line)
Parameters:

file (BlobStorageFile) – The file to stream.

Returns:

An async generator that yields lines one at a time from the file.

Return type:

AsyncGenerator[str, None]

Yield:

Lines from the file, one at a time.

async upload_file_to_blob_storage(content: FileStream | BytesIO, path: str, filename: str, container: str | None = None, location: BlobStorageLocation | None = None) BlobStorageFile[source]#

Upload a file to Blob Storage.

See app.persistence.blob.stream.FileStream for examples of how to create and use a FileStream object.

Parameters:
  • content (FileStream | BytesIO) – The content to upload.

  • path (str) – The path to the file in blob storage.

  • filename (str) – The name of the file.

  • container (str | None) – The container to upload to (optional).

  • location (BlobStorageLocation | None) – The storage location (optional).

Returns:

The information of the uploaded file.

Return type:

BlobStorageFile

File Content#

When getting, the file content is streamed line-by-line to the caller, allowing for efficient handling of large files without loading them entirely into memory.

When putting, files can either be streamed with the FileStream class or compiled into an in-memory BytesIO object for simpler cases.
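
For the simpler in-memory path, a JSONL payload can be assembled into a BytesIO before upload (a sketch; the helper is hypothetical and not part of the codebase):

```python
import json
from io import BytesIO


def jsonl_payload(records: list[dict]) -> BytesIO:
    """Assemble records into an in-memory JSONL payload.

    Suitable as the BytesIO branch of an upload; for large datasets,
    prefer FileStream so nothing is held fully in memory.
    """
    buffer = BytesIO()
    for record in records:
        buffer.write(json.dumps(record).encode("utf-8") + b"\n")
    buffer.seek(0)  # rewind so the uploader reads from the start
    return buffer
```

The returned object can then be passed as the `content` argument to upload_file_to_blob_storage.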

class app.domain.base.SDKJsonlMixin[source]#

Mixin for SDK JSONL attributes.

This flags that the model is used by the SDK to marshal data in and out of JSONL files.

model_config: ClassVar[ConfigDict] = {}[source]#

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict.

abstractmethod async to_sdk() _JsonlFileInputMixIn[source]#

Convert the model to an SDK JSONL input mixin.

class app.persistence.blob.stream.FileStream(fn: Callable[[...], Awaitable[Streamable]] | None = None, fn_kwargs: Sequence[dict[str, Any]] | None = None, generator: AsyncGenerator[Streamable, None] | None = None)[source]#

A helper class to convert a service function or generator into an async file stream.

This allows memory-efficient streaming of data from a function that returns a list of objects that inherit from app.domain.base.SDKJsonlMixin, which identifies domain models that can be converted to JSONL format, or from an async generator that yields strings.

Example usage:

class DomainModel(SDKJsonlMixin):
    ...

async def get_chunk(ids: list[UUID4], other_arg: str) -> list[DomainModel]:
    return repository.get_domain_models(ids, other_arg)

file_stream = FileStream(fn=get_chunk, fn_kwargs=[
    {"ids": [id1, id2], "other_arg": "value"},
    {"ids": [id3, id4], "other_arg": "value"}
])
await blob_repository.upload_file_to_blob_storage(
    content=file_stream,
    path="path/to/file.jsonl",
    filename="file.jsonl",
)
async read() BytesIO[source]#

Read all data from the FileStream into memory and return as a file-like object.

For implementations where async generators are not supported, we read all data into memory and return a BytesIO object. Currently this applies only to MinIO, which is used only for local development and testing. If this becomes a problem we can also look into composing blobs: https://min.io/docs/minio/linux/developers/python/API.html#compose_object

Returns:

A BytesIO object containing all the data.

Return type:

BytesIO

async stream() AsyncGenerator[bytes, None][source]#

Stream data from the FileStream’s function or generator.

Returns:

An async generator yielding bytes.

Return type:

AsyncGenerator[bytes, None]

Yield:

The next chunk of data.

Implementations#

Azure#

Azure Blob Storage is used for application deployments. At present there is one container, destiny-repository-<env>-ops, which is used only for ephemeral operational data. The file tree is as below:

destiny-repository-<env>-ops/
    ├── batch_enhancement_result/
    │   ├── <batch_request_id>_robot.jsonl - the enhancement result as published by the robot to the repository
    │   └── <batch_request_id>_repo.jsonl  - the validation result of importing the above, as published by the repository
    └── batch_enhancement_request_reference_data/
        └── <batch_request_id>.jsonl - the reference data provided to the robot for the batch enhancement request

MinIO#

MinIO is used for testing and local development. It is S3-compatible, which would ease a future AWS implementation. However, the current implementation is synchronous and so does not utilise the memory efficiency of the FileStream interface.