Blob Storage#

Blob storage is used for storing large and/or ephemeral data that is not suitable for other persistence stores. This includes:

- Large permanent data files, such as full texts of documents.
- Temporary files that are generated during processing but not needed long-term.
- Large payloads that are not suitable for REST APIs, such as those exchanged during the enhancement process.

Interface#

External actors such as robots interact with blob storage purely through signed GET/PUT URLs; they are not granted direct access to the storage itself. The signatures are short-lived, typically valid for one hour, but a fresh URL can be obtained by re-requesting the resource from the Repository API.
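As a rough sketch of what this looks like from a robot's side, the snippet below builds plain HTTP requests against signed URLs using only the Python standard library. The URL and the helper names are hypothetical, and the `x-ms-blob-type` header is an assumption that applies to Azure-signed uploads; check it against the backend you are targeting.

```python
import urllib.request


def build_download_request(signed_url: str) -> urllib.request.Request:
    """Build a GET request for a signed download URL (nothing is sent yet)."""
    return urllib.request.Request(signed_url, method="GET")


def build_upload_request(signed_url: str, payload: bytes) -> urllib.request.Request:
    """Build a PUT request that would upload `payload` to a signed upload URL."""
    request = urllib.request.Request(signed_url, data=payload, method="PUT")
    # Assumption: Azure's Put Blob operation expects this header; other backends may ignore it.
    request.add_header("x-ms-blob-type", "BlockBlob")
    return request


# Hypothetical signed URL, for illustration only.
example_url = "https://storage.example.invalid/container/result.jsonl?sig=placeholder"
upload = build_upload_request(example_url, b'{"status": "ok"}\n')
download = build_download_request(example_url)
# To actually send a request: urllib.request.urlopen(upload)
```

If a request fails because the signature has expired, re-request the resource from the Repository API to obtain a fresh URL rather than retrying the old one.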

The repository codebase interacts with blob storage through the BlobRepository class, which provides methods for uploading and downloading files, as well as managing file metadata. It is primarily a streaming interface, so large files can be uploaded and downloaded without being loaded entirely into memory. When downloading, files are streamed line-by-line to the caller; when uploading, files can either be streamed with the FileStream class or built into an in-memory BytesIO object for simpler cases.

Representation#

Blob files are represented in the repository as a BlobStorageFile object.

Models for handling files in blob storage.

class app.persistence.blob.models.BlobContainer(*values)[source]#

Blob containers.

FULL_TEXTS = 'full_texts'[source]#
OPERATIONS = 'operations'[source]#
pydantic model app.persistence.blob.models.BlobCopyResult[source]#

Model to represent the result of copying a blob storage file.

Entity relationship diagram: BlobCopyResult (source: BlobStorageFile, destination: BlobStorageFile, byte_size: int, sha256_checksum: str) references BlobStorageFile (location: BlobStorageLocation, container: str, path: str, filename: str) through its source and destination fields.

Show JSON schema
{
   "title": "BlobCopyResult",
   "description": "Model to represent the result of copying a blob storage file.",
   "type": "object",
   "properties": {
      "source": {
         "$ref": "#/$defs/BlobStorageFile",
         "description": "The source file that was copied."
      },
      "destination": {
         "$ref": "#/$defs/BlobStorageFile",
         "description": "The destination file that was copied to."
      },
      "byte_size": {
         "description": "The size of the copied file in bytes.",
         "title": "Byte Size",
         "type": "integer"
      },
      "sha256_checksum": {
         "description": "The SHA256 checksum of the copied file.",
         "title": "Sha256 Checksum",
         "type": "string"
      }
   },
   "$defs": {
      "BlobStorageFile": {
         "description": "Model to represent Blob Storage files.",
         "properties": {
            "location": {
               "$ref": "#/$defs/BlobStorageLocation",
               "description": "The location of the blob storage."
            },
            "container": {
               "description": "The name of the container in blob storage.",
               "pattern": "^[^/]*$",
               "title": "Container",
               "type": "string"
            },
            "path": {
               "description": "The path to the file in blob storage.",
               "title": "Path",
               "type": "string"
            },
            "filename": {
               "description": "The name of the file in blob storage.",
               "pattern": "^[^/]*$",
               "title": "Filename",
               "type": "string"
            }
         },
         "required": [
            "location",
            "container",
            "path",
            "filename"
         ],
         "title": "BlobStorageFile",
         "type": "object"
      },
      "BlobStorageLocation": {
         "description": "Blob Storage locations.",
         "enum": [
            "azure",
            "minio",
            "http",
            "https"
         ],
         "title": "BlobStorageLocation",
         "type": "string"
      }
   },
   "required": [
      "source",
      "destination",
      "byte_size",
      "sha256_checksum"
   ]
}

Fields:
field byte_size: int [Required][source]#

The size of the copied file in bytes.

field destination: BlobStorageFile [Required][source]#

The destination file that was copied to.

field sha256_checksum: str [Required][source]#

The SHA256 checksum of the copied file.

field source: BlobStorageFile [Required][source]#

The source file that was copied.

class app.persistence.blob.models.BlobSignedUrlType(*values)[source]#

Blob Storage interaction types.

DOWNLOAD = 'download'[source]#
UPLOAD = 'upload'[source]#
pydantic model app.persistence.blob.models.BlobStorageFile[source]#

Model to represent Blob Storage files.

Entity relationship diagram: BlobStorageFile (location: BlobStorageLocation, container: str, path: str, filename: str), with no relationships to other models.

Show JSON schema
{
   "title": "BlobStorageFile",
   "description": "Model to represent Blob Storage files.",
   "type": "object",
   "properties": {
      "location": {
         "$ref": "#/$defs/BlobStorageLocation",
         "description": "The location of the blob storage."
      },
      "container": {
         "description": "The name of the container in blob storage.",
         "pattern": "^[^/]*$",
         "title": "Container",
         "type": "string"
      },
      "path": {
         "description": "The path to the file in blob storage.",
         "title": "Path",
         "type": "string"
      },
      "filename": {
         "description": "The name of the file in blob storage.",
         "pattern": "^[^/]*$",
         "title": "Filename",
         "type": "string"
      }
   },
   "$defs": {
      "BlobStorageLocation": {
         "description": "Blob Storage locations.",
         "enum": [
            "azure",
            "minio",
            "http",
            "https"
         ],
         "title": "BlobStorageLocation",
         "type": "string"
      }
   },
   "required": [
      "location",
      "container",
      "path",
      "filename"
   ]
}

Config:
  • frozen: bool = True

Fields:
Validators:
  • _coerce_from_uri » all fields

field container: str [Required][source]#

The name of the container in blob storage.

Constraints:
  • pattern = ^[^/]*$

Validated by:
  • _coerce_from_uri

field filename: str [Required][source]#

The name of the file in blob storage.

Constraints:
  • pattern = ^[^/]*$

Validated by:
  • _coerce_from_uri

field location: BlobStorageLocation [Required][source]#

The location of the blob storage.

Validated by:
  • _coerce_from_uri

field path: str [Required][source]#

The path to the file in blob storage.

Validated by:
  • _coerce_from_uri

classmethod from_uri(uri: str) Self[source]#

Populate the model from its URI representation.

to_uri() str[source]#

Return the URI representation of the file.

property is_remote: bool[source]#

Whether this blob lives at a URL we don’t own (http/https).
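To make the constraints above concrete, here is a minimal stand-in written with a frozen dataclass rather than pydantic. `LocationSketch` and `BlobFileSketch` are hypothetical names, and the sketch deliberately omits `from_uri`/`to_uri` because the URI format is not described here. It only illustrates the immutability, the no-slash pattern on `container` and `filename`, and the `is_remote` check.

```python
import re
from dataclasses import dataclass
from enum import Enum

# Same pattern as the documented constraint: no "/" allowed.
NO_SLASH = re.compile(r"^[^/]*$")


class LocationSketch(str, Enum):
    AZURE = "azure"
    MINIO = "minio"
    HTTP = "http"
    HTTPS = "https"


@dataclass(frozen=True)
class BlobFileSketch:
    """Stand-in for BlobStorageFile; not the real pydantic model."""

    location: LocationSketch
    container: str
    path: str
    filename: str

    def __post_init__(self) -> None:
        # container and filename must not contain "/"; path may.
        for name in ("container", "filename"):
            if not NO_SLASH.match(getattr(self, name)):
                raise ValueError(f"{name} must not contain '/'")

    @property
    def is_remote(self) -> bool:
        # Remote means a plain http/https URL rather than owned storage.
        return self.location in {LocationSketch.HTTP, LocationSketch.HTTPS}


owned = BlobFileSketch(LocationSketch.AZURE, "operations", "enhancement_result", "abc_robot.jsonl")
remote = BlobFileSketch(LocationSketch.HTTPS, "example.invalid", "papers/2024", "paper.pdf")
```

Because the model is frozen, a file reference can be passed around and used as a dictionary key without risk of it being mutated after validation.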

class app.persistence.blob.models.BlobStorageLocation(*values)[source]#

Blob Storage locations.

AZURE = 'azure'[source]#
HTTP = 'http'[source]#
HTTPS = 'https'[source]#
MINIO = 'minio'[source]#
classmethod remote() frozenset[BlobStorageLocation][source]#

Return the set of remote blob storage locations.

app.persistence.blob.models.infer_content_type(filename: str) str[source]#

Infer the MIME content type for filename from its extension.

Falls back to application/octet-stream for unknown extensions.
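The behaviour described above can be approximated with the standard library's `mimetypes` module. This is a sketch under that assumption, not the actual implementation; `infer_content_type_sketch` is a hypothetical name.

```python
import mimetypes


def infer_content_type_sketch(filename: str) -> str:
    """Guess a MIME type from the extension, defaulting to a generic binary type."""
    guessed, _encoding = mimetypes.guess_type(filename)
    return guessed or "application/octet-stream"


text_type = infer_content_type_sketch("report.txt")
unknown_type = infer_content_type_sketch("data.zzunknownext")
```

Note that `mimetypes` may consult system-wide mapping files, so results for less common extensions can differ between machines.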

Client#

Each blob storage client implements the GenericBlobStorageClient interface, which defines methods for interacting with blob storage.

class app.persistence.blob.client.GenericBlobStorageClient[source]#

Abstract base class for blob storage clients.

This class defines the interface for blob storage operations.

abstractmethod async generate_signed_url(file: BlobStorageFile, interaction_type: BlobSignedUrlType, content_disposition: str | None) str[source]#

Generate a signed URL for the file in blob storage.

Parameters:
  • file (BlobStorageFile) – The file for which to generate the signed URL.

  • interaction_type (BlobSignedUrlType) – The type of interaction (upload or download).

  • content_disposition (str | None) – Override for the Content-Disposition response header served when the signed URL is fetched. Set to "attachment" for untrusted content (e.g. full-text blobs) to neutralize inline browser rendering. Included only for DOWNLOAD interactions.

Returns:

The signed URL for the file.

Return type:

str

abstractmethod async stream_chunks(file: BlobStorageFile) AsyncGenerator[bytes, None][source]#

Stream a file as raw byte chunks from the blob storage.

Parameters:

file (BlobStorageFile) – The file to stream.

Returns:

An async generator that yields byte chunks from the file.

Return type:

AsyncGenerator[bytes, None]

async stream_file(file: BlobStorageFile) AsyncGenerator[str, None][source]#

Stream a file line-by-line from the blob storage.

Parameters:

file (BlobStorageFile) – The file to stream.

Returns:

An async generator that yields lines from the file.

Return type:

AsyncGenerator[str, None]

abstractmethod async upload_file(content: FileStream | BytesIO | AsyncIterator[bytes], file: BlobStorageFile, content_type: str | None = None) None[source]#

Upload a file to the blob storage.

Parameters:
  • content (FileStream | BytesIO | AsyncIterator[bytes]) – The content of the file to upload.

  • file (BlobStorageFile) – The file to upload.

  • content_type (str | None) – Optional MIME type to attach to the uploaded object. If not provided, implementations infer it from file.filename.
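Since `stream_file` is the only non-abstract method, it is presumably layered on top of `stream_chunks`. The sketch below shows one way that layering could work, using a reduced stand-in interface and a hypothetical in-memory backend. The names `ClientSketch` and `InMemoryClient`, the string keys, and the choice to strip newlines are all assumptions.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator


class ClientSketch(ABC):
    """Stand-in for the client interface, reduced to the two streaming methods."""

    @abstractmethod
    def stream_chunks(self, key: str) -> AsyncGenerator[bytes, None]:
        """Yield raw byte chunks for the blob identified by `key`."""

    async def stream_file(self, key: str) -> AsyncGenerator[str, None]:
        """Yield decoded lines, buffering only the current partial line."""
        buffer = b""
        async for chunk in self.stream_chunks(key):
            buffer += chunk
            # Everything before the last newline is complete; keep the remainder.
            *lines, buffer = buffer.split(b"\n")
            for line in lines:
                yield line.decode("utf-8")
        if buffer:
            yield buffer.decode("utf-8")


class InMemoryClient(ClientSketch):
    """Hypothetical backend that serves blobs from a dict in fixed-size chunks."""

    def __init__(self, blobs: dict[str, bytes], chunk_size: int = 4) -> None:
        self._blobs = blobs
        self._chunk_size = chunk_size

    async def stream_chunks(self, key: str) -> AsyncGenerator[bytes, None]:
        data = self._blobs[key]
        for start in range(0, len(data), self._chunk_size):
            yield data[start : start + self._chunk_size]


async def read_lines(client: ClientSketch, key: str) -> list[str]:
    return [line async for line in client.stream_file(key)]


lines = asyncio.run(read_lines(InMemoryClient({"a": b"one\ntwo\nthree"}), "a"))
```

Decoding only complete lines means a multi-byte UTF-8 character split across two chunks is reassembled in the buffer before it is decoded.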

Blob Repository#

The codebase interacts with the blob clients through the BlobRepository class:

class app.persistence.blob.repository.BlobRepository[source]#

Repository for managing files in blob storage.

async copy(source: BlobStorageFile, destination: BlobStorageFile, max_bytes: int | None = None, content_type: str | None = None) BlobCopyResult[source]#

Stream a file from source to destination, computing sha256 and size.

Parameters:
  • source (BlobStorageFile) – The source file to copy.

  • destination (BlobStorageFile) – The destination to copy the file to.

  • max_bytes (int | None) – Optional cap on the total bytes streamed. The stream is aborted (raising BlobSizeExceededError) once the cumulative chunk size strictly exceeds this. None disables the check.

  • content_type (str | None) – MIME type to attach to the uploaded destination. If the caller has an authoritative content type (e.g. declared on a full-text enhancement), pass it here so it isn’t lossily re-derived from the destination filename. Defaults to None, which lets the backend infer from destination.filename.

Raises:

BlobSizeExceededError – if max_bytes is set and the source yields more bytes than allowed.

destination(path: str, filename: str, container: BlobContainer = BlobContainer.OPERATIONS) BlobStorageFile[source]#

Reserve a BlobStorageFile destination without performing any I/O.

Useful for pre-allocating a location that will be written to later (e.g. a record stored before its content is uploaded).

async get_signed_url(file: BlobStorageFile, interaction_type: BlobSignedUrlType, content_disposition: str | None = 'attachment') HttpUrl[source]#

Generate a signed URL for a file in Blob Storage.

Parameters:
  • file (BlobStorageFile) – The file for which to generate the signed URL.

  • interaction_type (BlobSignedUrlType) – The type of interaction (upload or download).

  • content_disposition (str | None) – Override for the signed download’s Content-Disposition response header. Defaults to "attachment" so browsers never render fetched bytes inline. Pass None to opt out if a future caller wants inline rendering.

Returns:

The signed URL for the file.

Return type:

HttpUrl

stream_file_from_blob_storage(file: BlobStorageFile) AsyncGenerator[AsyncIterator[str], None][source]#

Stream a file line-by-line from Blob Storage.

Usage:

async with blob_repo.stream_file_from_blob_storage(file) as stream:
    async for line in stream:
        print(line)
Parameters:

file (BlobStorageFile) – The file to stream.

Returns:

An async generator that yields lines one at a time from the file.

Return type:

AsyncGenerator[str, None]

Yield:

Lines from the file, one at a time.

Return type:

Iterator[AsyncGenerator[AsyncIterator[str], None]]
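The nested return type and the `async with` usage suggest the method is an async context manager that yields an async iterator of lines. The following sketch shows that pattern with `contextlib.asynccontextmanager`. It is an assumption about the shape, not the actual implementation, and `open_line_stream` and its `log` parameter are hypothetical.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator
from contextlib import asynccontextmanager


async def _lines(data: list[str]) -> AsyncIterator[str]:
    for line in data:
        yield line


@asynccontextmanager
async def open_line_stream(
    data: list[str], log: list[str]
) -> AsyncGenerator[AsyncIterator[str], None]:
    """Hypothetical stand-in: yield a line iterator, then release resources."""
    log.append("opened")
    try:
        yield _lines(data)
    finally:
        # Runs when the `async with` block exits, even on error.
        log.append("closed")


async def collect(data: list[str], log: list[str]) -> list[str]:
    async with open_line_stream(data, log) as stream:
        return [line async for line in stream]


events: list[str] = []
collected = asyncio.run(collect(["a", "b"], events))
```

Wrapping the stream in a context manager gives the backend a well-defined point at which to close connections once the caller has finished iterating.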

async upload_file_to_blob_storage(content: FileStream | BytesIO, path: str, filename: str, container: BlobContainer = BlobContainer.OPERATIONS, content_type: str | None = None) BlobStorageFile[source]#

Upload a file to Blob Storage.

See app.persistence.blob.stream.FileStream for examples of how to create and use a FileStream object.

Parameters:
  • content (FileStream | BytesIO) – The content of the file to upload.

  • path (str) – The path to upload the file to.

  • filename (str) – The name of the file to upload.

  • container (BlobContainer) – The logical container to upload the file to. The physical container name is resolved via the active blob backend.

  • content_type (str | None) – Optional MIME type to attach to the uploaded object. If not provided, it is inferred from filename.

Returns:

The information of the uploaded file.

Return type:

BlobStorageFile
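For the simpler in-memory case, the content can be assembled in a `BytesIO` before upload. The sketch below builds a JSON Lines payload. `to_jsonl_bytes` is a hypothetical helper, and the upload call is shown only as a comment because it needs a configured repository instance.

```python
import json
from io import BytesIO


def to_jsonl_bytes(records: list[dict]) -> BytesIO:
    """Serialise records as JSON Lines into an in-memory buffer."""
    buffer = BytesIO()
    for record in records:
        buffer.write(json.dumps(record).encode("utf-8") + b"\n")
    buffer.seek(0)  # rewind so a consumer reads from the start
    return buffer


content = to_jsonl_bytes([{"id": 1}, {"id": 2}])
# With a real repository instance (not constructed here), the call would look like:
# file = await blob_repo.upload_file_to_blob_storage(
#     content=content, path="some/path", filename="records.jsonl"
# )
```

This approach holds the whole payload in memory, so it is best reserved for small files; larger outputs are better served by FileStream.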

File Content#

When getting, the file content is streamed line-by-line to the caller, allowing for efficient handling of large files without loading them entirely into memory.

When putting, files can either be streamed with the FileStream class or compiled into an in-memory BytesIO object for simpler cases.

class app.persistence.blob.stream.FileStream(fn: Callable[[...], Awaitable[Streamable]] | None = None, fn_kwargs: Sequence[dict[str, Any]] | None = None, generator: AsyncGenerator[Streamable, None] | None = None)[source]#

A helper class to convert a service function or generator into an async file stream.

This allows memory-efficient streaming of data from a function that returns a string or list of strings.

Example usage:

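async def get_chunk(ids: list[UUID], other_arg: str) -> list[DomainModel]:
    # Fetch a single batch of items for the given ids.
    return repository.get_strings(ids, other_arg)

# Each dict in fn_kwargs supplies the keyword arguments for one call to get_chunk.
file_stream = FileStream(fn=get_chunk, fn_kwargs=[
    {"ids": [id1, id2], "other_arg": "value"},
    {"ids": [id3, id4], "other_arg": "value"},
])
# upload_file_to_blob_storage is a coroutine, so it must be awaited.
await blob_repository.upload_file_to_blob_storage(
    content=file_stream,
    path="path/to/file.jsonl",
    filename="file.jsonl",
)
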
async read() BytesIO[source]#

Read all data from the FileStream into memory and return as a file-like object.

For implementations where async generators are not supported, we read all data into memory and return a BytesIO object. Currently this applies only to MinIO, which is used only for local development and testing. If this becomes a problem, we can also look into composing blobs: https://min.io/docs/minio/linux/developers/python/API.html#compose_object

Returns:

A BytesIO object containing all the data.

Return type:

BytesIO

stream() AsyncGenerator[bytes, None][source]#

Stream data from the FileStream’s function or generator.

Returns:

An async generator yielding bytes.

Return type:

AsyncGenerator[bytes, None]

Yield:

The next chunk of data.

Return type:

Iterator[AsyncGenerator[bytes, None]]

Implementations#

Azure#

Azure Blob Storage is used for application deployments. At present there is one container, destiny-repository-<env>-ops, which is used only for ephemeral operational data. The file tree is as below:

destiny-repository-<env>-ops/
    ├── enhancement_result/
    │   ├── <request_id>_robot.jsonl - the enhancement result as published by the robot to the repository
    │   └── <request_id>_repo.jsonl  - the validation result of importing the above as published by the repository
    └── enhancement_request_reference_data/
        └── <request_id>.jsonl - the reference data provided to the robot for the enhancement request

MinIO#

MinIO is used for testing and local development. It is S3-compatible, which would make it a starting point if an AWS implementation is ever desired. However, the current MinIO implementation is synchronous, so it reads the whole FileStream into memory before uploading and does not benefit from the memory efficiency of the streaming interface.