Blob Storage#
Blob storage is used for storing large and/or ephemeral data that is not suitable for other persistence stores. This includes:
- Large permanent data files such as full texts of documents.
- Temporary files that are generated during processing but not needed long-term.
- Large payloads that are not suitable for REST APIs, such as those exchanged during the enhancement process.
Interface#
External actors such as robots interact with blob storage purely through GET/PUT signed URLs; they are not granted direct access to the storage itself. The signatures are short-lived, typically valid for one hour, but a fresh URL can be obtained by re-requesting the resource from the Repository API.
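The refresh behaviour described above can be sketched from the consumer's side as a small cache that re-requests a signed URL shortly before it expires. This is a minimal sketch, not part of the repository: `SignedUrlCache`, `fetch_url`, and the safety margin are hypothetical names, and the one-hour lifetime is only the typical value mentioned above.

```python
import time
from typing import Callable, Optional


class SignedUrlCache:
    """Caches one signed URL and re-requests it shortly before it expires."""

    def __init__(
        self,
        fetch_url: Callable[[], str],
        ttl_seconds: float = 3600.0,
        safety_margin_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        # fetch_url is hypothetical: it stands for re-requesting the
        # resource from the Repository API to obtain a fresh signed URL.
        self._fetch_url = fetch_url
        self._ttl = ttl_seconds
        self._margin = safety_margin_seconds
        self._clock = clock
        self._url: Optional[str] = None
        self._fetched_at = 0.0

    def get(self) -> str:
        """Return a URL that should still be valid, refreshing it if needed."""
        age = self._clock() - self._fetched_at
        if self._url is None or age >= self._ttl - self._margin:
            self._url = self._fetch_url()
            self._fetched_at = self._clock()
        return self._url
```

Injecting the clock keeps the expiry logic testable without waiting for a real signature to lapse.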
The repository codebase interacts with blob storage through the BlobRepository class, which provides methods for uploading and downloading files, as well as managing file metadata. It is primarily a streaming interface, allowing large files to be uploaded and downloaded without loading them entirely into memory. On download, files are streamed line-by-line to the caller; on upload, files can either be streamed with the FileStream class or compiled into an in-memory BytesIO object for simpler cases.
Representation#
Blob files are represented in the repository as a BlobStorageFile object.
Models for handling files in blob storage.
- pydantic model app.persistence.blob.models.BlobCopyResult
Model to represent the result of copying a blob storage file.
Entity relationship diagram (generated by erdantic): BlobCopyResult (source: BlobStorageFile, destination: BlobStorageFile, byte_size: int, sha256_checksum: str) references BlobStorageFile (location: BlobStorageLocation, container: str, path: str, filename: str) through its source and destination fields.
Show JSON schema
{ "title": "BlobCopyResult", "description": "Model to represent the result of copying a blob storage file.", "type": "object", "properties": { "source": { "$ref": "#/$defs/BlobStorageFile", "description": "The source file that was copied." }, "destination": { "$ref": "#/$defs/BlobStorageFile", "description": "The destination file that was copied to." }, "byte_size": { "description": "The size of the copied file in bytes.", "title": "Byte Size", "type": "integer" }, "sha256_checksum": { "description": "The SHA256 checksum of the copied file.", "title": "Sha256 Checksum", "type": "string" } }, "$defs": { "BlobStorageFile": { "description": "Model to represent Blob Storage files.", "properties": { "location": { "$ref": "#/$defs/BlobStorageLocation", "description": "The location of the blob storage." }, "container": { "description": "The name of the container in blob storage.", "pattern": "^[^/]*$", "title": "Container", "type": "string" }, "path": { "description": "The path to the file in blob storage.", "title": "Path", "type": "string" }, "filename": { "description": "The name of the file in blob storage.", "pattern": "^[^/]*$", "title": "Filename", "type": "string" } }, "required": [ "location", "container", "path", "filename" ], "title": "BlobStorageFile", "type": "object" }, "BlobStorageLocation": { "description": "Blob Storage locations.", "enum": [ "azure", "minio", "http", "https" ], "title": "BlobStorageLocation", "type": "string" } }, "required": [ "source", "destination", "byte_size", "sha256_checksum" ] }
- Fields:
- field destination: BlobStorageFile [Required]
The destination file that was copied to.
- field source: BlobStorageFile [Required]
The source file that was copied.
- class app.persistence.blob.models.BlobSignedUrlType(*values)
Blob Storage interaction types.
- pydantic model app.persistence.blob.models.BlobStorageFile
Model to represent Blob Storage files.
Entity relationship diagram (generated by erdantic): BlobStorageFile (location: BlobStorageLocation, container: str, path: str, filename: str).
Show JSON schema
{ "title": "BlobStorageFile", "description": "Model to represent Blob Storage files.", "type": "object", "properties": { "location": { "$ref": "#/$defs/BlobStorageLocation", "description": "The location of the blob storage." }, "container": { "description": "The name of the container in blob storage.", "pattern": "^[^/]*$", "title": "Container", "type": "string" }, "path": { "description": "The path to the file in blob storage.", "title": "Path", "type": "string" }, "filename": { "description": "The name of the file in blob storage.", "pattern": "^[^/]*$", "title": "Filename", "type": "string" } }, "$defs": { "BlobStorageLocation": { "description": "Blob Storage locations.", "enum": [ "azure", "minio", "http", "https" ], "title": "BlobStorageLocation", "type": "string" } }, "required": [ "location", "container", "path", "filename" ] }
- Config:
frozen: bool = True
- Fields:
- Validators:
_coerce_from_uri » all fields
- field container: str [Required]
The name of the container in blob storage.
- Constraints:
pattern = ^[^/]*$
- Validated by:
_coerce_from_uri
- field filename: str [Required]
The name of the file in blob storage.
- Constraints:
pattern = ^[^/]*$
- Validated by:
_coerce_from_uri
- field location: BlobStorageLocation [Required]
The location of the blob storage.
- Validated by:
_coerce_from_uri
- class app.persistence.blob.models.BlobStorageLocation(*values)
Blob Storage locations.
- classmethod remote() -> frozenset[BlobStorageLocation]
Return the set of remote blob storage locations.
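The constraints documented above (an immutable model, and container and filename values that must not contain `/`) can be illustrated with a standard-library stand-in. This is only a sketch: the real BlobStorageFile is a pydantic model, and `Location` and `FileRef` are hypothetical names used here so the example runs without pydantic. The `_coerce_from_uri` validator is not reproduced because its URI format is not described in this section.

```python
import re
from dataclasses import dataclass
from enum import Enum


class Location(str, Enum):
    """Stand-in for BlobStorageLocation, using the values from the JSON schema."""

    AZURE = "azure"
    MINIO = "minio"
    HTTP = "http"
    HTTPS = "https"


# Same pattern as the documented constraint on container and filename.
_NO_SLASH = re.compile(r"^[^/]*$")


@dataclass(frozen=True)
class FileRef:
    """Stand-in for BlobStorageFile: immutable, with slash-free container and filename."""

    location: Location
    container: str
    path: str
    filename: str

    def __post_init__(self) -> None:
        for name in ("container", "filename"):
            if not _NO_SLASH.match(getattr(self, name)):
                raise ValueError(f"{name} must not contain '/'")


# The path may contain slashes; the container and filename may not.
ref = FileRef(Location.MINIO, "ops", "enhancement_result", "abc_robot.jsonl")
```

Freezing the model means a file reference can be shared and used as a dictionary key without risk of it being repointed after creation.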
Client#
Each blob storage client implements the GenericBlobStorageClient interface, which defines methods for interacting with blob storage.
- class app.persistence.blob.client.GenericBlobStorageClient
Abstract base class for blob storage clients.
This class defines the interface for blob storage operations.
- abstractmethod async generate_signed_url(file: BlobStorageFile, interaction_type: BlobSignedUrlType, content_disposition: str | None) -> str
Generate a signed URL for the file in blob storage.
- Parameters:
file (BlobStorageFile) – The file for which to generate the signed URL.
interaction_type (BlobSignedUrlType) – The type of interaction (upload or download).
content_disposition (str | None) – Override for the Content-Disposition response header served when the signed URL is fetched. Set to "attachment" for untrusted content (e.g. full-text blobs) to neutralize inline browser rendering. Included only for DOWNLOAD interactions.
- Returns:
The signed URL for the file.
- Return type:
str
- abstractmethod async stream_chunks(file: BlobStorageFile) -> AsyncGenerator[bytes, None]
Stream a file as raw byte chunks from the blob storage.
- Parameters:
file (BlobStorageFile) – The file to stream.
- Returns:
An async generator that yields byte chunks from the file.
- Return type:
AsyncGenerator[bytes, None]
- async stream_file(file: BlobStorageFile) -> AsyncGenerator[str, None]
Stream a file line-by-line from the blob storage.
- Parameters:
file (BlobStorageFile) – The file to stream.
- Returns:
An async generator that yields lines from the file.
- Return type:
AsyncGenerator[str, None]
- abstractmethod async upload_file(content: FileStream | BytesIO | AsyncIterator[bytes], file: BlobStorageFile, content_type: str | None = None) -> None
Upload a file to the blob storage.
- Parameters:
content (FileStream | BytesIO | AsyncIterator[bytes]) – The content of the file to upload.
file (BlobStorageFile) – The file to upload.
content_type (str | None) – Optional MIME type to attach to the uploaded object. If not provided, implementations infer it from file.filename.
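The client interface above exposes both raw chunks (stream_chunks) and lines (stream_file). One way a line stream could be layered on top of a chunk stream is sketched below. It is an illustration under stated assumptions rather than the repository's implementation: `iter_lines`, `fake_chunks`, and `collect` are hypothetical names, and the sketch assumes UTF-8 content with `\n` line endings that are stripped from the yielded lines.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator


async def iter_lines(
    chunks: AsyncIterator[bytes], encoding: str = "utf-8"
) -> AsyncGenerator[str, None]:
    """Yield decoded lines from byte chunks, buffering only the current partial line."""
    buffer = b""
    async for chunk in chunks:
        buffer += chunk
        # Everything before the last newline is complete; the remainder is
        # kept until the next chunk arrives.
        *complete, buffer = buffer.split(b"\n")
        for line in complete:
            yield line.decode(encoding)
    if buffer:
        # Final line without a trailing newline.
        yield buffer.decode(encoding)


async def fake_chunks() -> AsyncGenerator[bytes, None]:
    """Stand-in for a chunk stream whose chunk boundaries ignore line boundaries."""
    for chunk in (b'{"id": 1}\n{"i', b'd": 2}\n{"id"', b": 3}"):
        yield chunk


async def collect() -> list[str]:
    """Gather all lines so the result can be inspected synchronously."""
    return [line async for line in iter_lines(fake_chunks())]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Decoding only complete lines avoids splitting a multi-byte UTF-8 character across two chunks, since the newline byte never occurs inside a multi-byte sequence.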
Blob Repository#
The codebase interacts with the blob clients through the BlobRepository class:
- class app.persistence.blob.repository.BlobRepository
Repository for managing files in blob storage.
- async copy(source: BlobStorageFile, destination: BlobStorageFile, max_bytes: int | None = None, content_type: str | None = None) -> BlobCopyResult
Stream a file from source to destination, computing sha256 and size.
- Parameters:
source (BlobStorageFile) – The source file to copy.
destination (BlobStorageFile) – The destination to copy the file to.
max_bytes (int | None) – Optional cap on the total bytes streamed. The stream is aborted (raising BlobSizeExceededError) once the cumulative chunk size strictly exceeds this. None disables the check.
content_type (str | None) – MIME type to attach to the uploaded destination. If the caller has an authoritative content type (e.g. declared on a full-text enhancement), pass it here so it isn't lossily re-derived from the destination filename. Defaults to None, which lets the backend infer from destination.filename.
- Raises:
BlobSizeExceededError – if max_bytes is set and the source yields more bytes than allowed.
- destination(path: str, filename: str, container: BlobContainer = BlobContainer.OPERATIONS) -> BlobStorageFile
Reserve a BlobStorageFile destination without performing any I/O.
Useful for pre-allocating a location that will be written to later (e.g. a record stored before its content is uploaded).
- async get_signed_url(file: BlobStorageFile, interaction_type: BlobSignedUrlType, content_disposition: str | None = 'attachment') -> HttpUrl
Generate a signed URL for a file in Blob Storage.
- Parameters:
file (BlobStorageFile) – The file for which to generate the signed URL.
interaction_type (BlobSignedUrlType) – The type of interaction (upload or download).
content_disposition (str | None) – Override for the signed download's Content-Disposition response header. Defaults to "attachment" so browsers never render fetched bytes inline. Pass None to opt out if a future caller wants inline rendering.
- Returns:
The signed URL for the file.
- Return type:
HttpUrl
- stream_file_from_blob_storage(file: BlobStorageFile) -> AsyncGenerator[AsyncIterator[str], None]
Stream a file line-by-line from Blob Storage.
Usage:
    async with blob_repo.stream_file_from_blob_storage(file) as stream:
        async for line in stream:
            print(line)
- Parameters:
file (BlobStorageFile) – The file to stream.
- Returns:
An async generator that yields lines one at a time from the file.
- Return type:
AsyncGenerator[str, None]
- Yield:
Lines from the file, one at a time.
- Return type:
Iterator[AsyncGenerator[AsyncIterator[str], None]]
- async upload_file_to_blob_storage(content: FileStream | BytesIO, path: str, filename: str, container: BlobContainer = BlobContainer.OPERATIONS, content_type: str | None = None) -> BlobStorageFile
Upload a file to Blob Storage.
See app.persistence.blob.stream.FileStream for examples of how to create and use a FileStream object.
- Parameters:
content (FileStream | BytesIO) – The content of the file to upload.
path (str) – The path to upload the file to.
filename (str) – The name of the file to upload.
container (BlobContainer) – The logical container to upload the file to. The physical container name is resolved via the active blob backend.
content_type (str | None) – Optional MIME type to attach to the uploaded object. If not provided, it is inferred from filename.
- Returns:
The information of the uploaded file.
- Return type:
BlobStorageFile
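The behaviour documented for copy (streaming chunks while computing a SHA-256 checksum and byte count, and aborting once max_bytes is strictly exceeded) can be sketched as follows. This is a simplified illustration, not the repository's code: `copy_stream` and `CopySummary` are hypothetical names, the `BlobSizeExceededError` defined here is a local stand-in, and a plain list stands in for the upload destination.

```python
import asyncio
import hashlib
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Optional


class BlobSizeExceededError(Exception):
    """Local stand-in for the error named in the copy documentation."""


@dataclass
class CopySummary:
    """Hypothetical result type mirroring two fields of BlobCopyResult."""

    byte_size: int
    sha256_checksum: str


async def copy_stream(
    source: AsyncIterator[bytes],
    sink: list[bytes],
    max_bytes: Optional[int] = None,
) -> CopySummary:
    """Forward chunks from source to sink while hashing and counting them."""
    digest = hashlib.sha256()
    total = 0
    async for chunk in source:
        total += len(chunk)
        # Strictly greater than: a source of exactly max_bytes is allowed.
        if max_bytes is not None and total > max_bytes:
            raise BlobSizeExceededError(f"source exceeded {max_bytes} bytes")
        digest.update(chunk)
        sink.append(chunk)  # a real implementation would forward to the upload
    return CopySummary(byte_size=total, sha256_checksum=digest.hexdigest())
```

Because the checksum and size are accumulated per chunk, only one chunk needs to be held in memory at a time, regardless of the size of the file being copied.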
File Content#
When a file is downloaded, its content is streamed line-by-line to the caller, allowing for efficient handling of large files without loading them entirely into memory.
When a file is uploaded, it can either be streamed with the FileStream class or compiled into an in-memory BytesIO object for simpler cases.
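For the simpler in-memory case, the content can be assembled into a BytesIO object before upload. The sketch below assumes JSON Lines content, which matches the `.jsonl` files used elsewhere in this document; `to_jsonl_bytes` is a hypothetical helper, not part of the repository.

```python
import json
from io import BytesIO


def to_jsonl_bytes(records: list[dict]) -> BytesIO:
    """Serialise records as JSON Lines into an in-memory buffer."""
    buffer = BytesIO()
    for record in records:
        buffer.write(json.dumps(record).encode("utf-8") + b"\n")
    buffer.seek(0)  # rewind so the consumer reads from the start
    return buffer
```

The resulting buffer could then be passed as the `content` argument of an upload call. This approach holds the whole payload in memory, so it suits small files only.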
- class app.persistence.blob.stream.FileStream(fn: Callable[[...], Awaitable[Streamable]] | None = None, fn_kwargs: Sequence[dict[str, Any]] | None = None, generator: AsyncGenerator[Streamable, None] | None = None)
A helper class to convert a service function or generator into an async file stream.
This allows memory-efficient streaming of data from a function that returns a string or list of strings.
Example usage:
    async def get_chunk(ids: list[UUID], other_arg: str) -> list[DomainModel]:
        return repository.get_strings(ids, other_arg)

    # fn is called once per entry in fn_kwargs.
    file_stream = FileStream(
        fn=get_chunk,
        fn_kwargs=[
            {"ids": [id1, id2], "other_arg": "value"},
            {"ids": [id3, id4], "other_arg": "value"},
        ],
    )
    # upload_file_to_blob_storage is a coroutine, so it must be awaited.
    await blob_repository.upload_file_to_blob_storage(
        content=file_stream,
        path="path/to/file.jsonl",
        filename="file.jsonl",
    )
- async read() -> BytesIO
Read all data from the FileStream into memory and return as a file-like object.
For implementations where async generators are not supported, we read all data into memory and return a BytesIO object. Currently this applies only to MinIO, which is used only for local development and testing. If this becomes a problem, we can also look into composing blobs: https://min.io/docs/minio/linux/developers/python/API.html#compose_object
- Returns:
A BytesIO object containing all the data.
- Return type:
BytesIO
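The idea behind FileStream, calling a function once per set of keyword arguments and emitting each batch before fetching the next, can be sketched with a simplified stand-in. `LazyStream` is a hypothetical class, not the real FileStream: it assumes the function returns a list of strings and that each string becomes one newline-terminated line, neither of which is confirmed by this document.

```python
import asyncio
from collections.abc import AsyncGenerator, Awaitable, Callable, Sequence
from io import BytesIO
from typing import Any


class LazyStream:
    """Simplified stand-in for FileStream: one fn call per kwargs set, made on demand."""

    def __init__(
        self,
        fn: Callable[..., Awaitable[list[str]]],
        fn_kwargs: Sequence[dict[str, Any]],
    ) -> None:
        self._fn = fn
        self._fn_kwargs = fn_kwargs

    async def stream(self) -> AsyncGenerator[bytes, None]:
        """Yield encoded lines, fetching the next batch only when it is needed."""
        for kwargs in self._fn_kwargs:
            # Only one batch of results is held in memory at a time.
            for item in await self._fn(**kwargs):
                yield (item + "\n").encode("utf-8")

    async def read(self) -> BytesIO:
        """Fallback for backends without async streaming: buffer everything."""
        buffer = BytesIO()
        async for chunk in self.stream():
            buffer.write(chunk)
        buffer.seek(0)
        return buffer
```

The `read` fallback gives up the memory benefit, which mirrors the trade-off described above for backends that cannot consume an async generator.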
Implementations#
Azure#
Azure Blob Storage is used for application deployments. At present there is one container, destiny-repository-<env>-ops, which is used only for ephemeral operational data. The file tree is as below:
destiny-repository-<env>-ops/
├── enhancement_result/
│   ├── <request_id>_robot.jsonl - the enhancement result as published by the robot to the repository
│   └── <request_id>_repo.jsonl - the validation result of importing the above as published by the repository
└── enhancement_request_reference_data/
    └── <request_id>.jsonl - the reference data provided to the robot for the enhancement request
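The naming scheme in the tree above can be expressed as a small helper that derives the three blob paths for a request. This is a sketch only: `enhancement_blob_paths` and its dictionary keys are hypothetical, and treating the request ID as a UUID is an assumption.

```python
from uuid import UUID


def enhancement_blob_paths(request_id: UUID) -> dict[str, str]:
    """Return the blob paths used for one enhancement request, relative to the container."""
    return {
        "robot_result": f"enhancement_result/{request_id}_robot.jsonl",
        "repo_result": f"enhancement_result/{request_id}_repo.jsonl",
        "reference_data": f"enhancement_request_reference_data/{request_id}.jsonl",
    }
```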
MinIO#
MinIO is used for testing and local development. It is S3-compatible, so it could serve as the basis for an AWS implementation if one is ever desired. However, the current implementation is synchronous and so does not benefit from the memory efficiency of the FileStream interface.