Blob Storage#
Blob storage is used for storing large and/or ephemeral data that is not suited to the other persistence stores. This includes:
- Large permanent data files, such as full texts of documents.
- Temporary files that are generated during processing but not needed long-term.
- Large payloads that are not suitable for REST APIs, such as those exchanged in batch processes.
Interface#
External actors such as robots interact with blob storage purely through GET/PUT signed URLs; they are not granted direct access to the storage itself. These URLs carry short-lived signatures, typically valid for one hour, but can be refreshed by re-requesting the resource from the Repository API.
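As a sketch of this flow from the external actor's side, assuming httpx as the HTTP client (the function names are illustrative, not part of the repository's API):

import httpx

async def download_blob(signed_url: str) -> bytes:
    # GET the blob content through its signed URL.
    async with httpx.AsyncClient() as client:
        response = await client.get(signed_url)
        response.raise_for_status()
        return response.content

async def upload_blob(signed_url: str, payload: bytes) -> None:
    # PUT content to the blob through its signed URL. Azure block blobs
    # typically also require the "x-ms-blob-type: BlockBlob" header on
    # signed PUTs.
    async with httpx.AsyncClient() as client:
        response = await client.put(
            signed_url,
            content=payload,
            headers={"x-ms-blob-type": "BlockBlob"},
        )
        response.raise_for_status()

If a signature has expired, the actor simply re-requests the resource from the Repository API to obtain a freshly signed URL.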
The repository codebase interacts with blob storage through the BlobRepository interface, which provides methods for uploading and downloading files, as well as managing file metadata. This is primarily a streaming interface, allowing large files to be uploaded and downloaded efficiently without loading them entirely into memory. When getting, files are streamed line-by-line to the caller; when putting, files can either be streamed with the FileStream class or compiled into an in-memory BytesIO object for simpler cases.
Representation#
Blob files are represented in the repository as a BlobStorageFile object.
Models for handling files in blob storage.
- class app.persistence.blob.models.BlobSignedUrlType(*values)[source]#
Blob Storage interaction types.
- pydantic model app.persistence.blob.models.BlobStorageFile[source]#
Model to represent Blob Storage files.
JSON schema:
{
  "title": "BlobStorageFile",
  "description": "Model to represent Blob Storage files.",
  "type": "object",
  "properties": {
    "location": {
      "$ref": "#/$defs/BlobStorageLocation",
      "description": "The location of the blob storage."
    },
    "container": {
      "description": "The name of the container in Azure Blob Storage.",
      "pattern": "^[^/]*$",
      "title": "Container",
      "type": "string"
    },
    "path": {
      "description": "The path to the file in Azure Blob Storage.",
      "title": "Path",
      "type": "string"
    },
    "filename": {
      "description": "The name of the file in Azure Blob Storage.",
      "pattern": "^[^/]*$",
      "title": "Filename",
      "type": "string"
    }
  },
  "$defs": {
    "BlobStorageLocation": {
      "description": "Blob Storage locations.",
      "enum": ["azure", "minio"],
      "title": "BlobStorageLocation",
      "type": "string"
    }
  },
  "required": ["location", "container", "path", "filename"]
}
- Config:
frozen: bool = True
- Fields:
- field container: str [Required][source]#
The name of the container in Azure Blob Storage.
- Constraints:
pattern = ^[^/]*$
- field filename: str [Required][source]#
The name of the file in Azure Blob Storage.
- Constraints:
pattern = ^[^/]*$
- field location: BlobStorageLocation [Required][source]#
The location of the blob storage.
- field path: str [Required][source]#
The path to the file in Azure Blob Storage.
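For illustration, a BlobStorageFile might be constructed as follows (the values are hypothetical, and the enum member names are assumed to mirror the "azure"/"minio" values):

from app.persistence.blob.models import BlobStorageFile, BlobStorageLocation

file = BlobStorageFile(
    location=BlobStorageLocation.AZURE,  # assumed member name for the "azure" value
    container="destiny-repository-dev-ops",  # must not contain "/"
    path="batch_enhancement_result",
    filename="example.jsonl",  # must not contain "/"
)

Because the model is frozen, instances are immutable and can safely be passed around as value objects.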
Client#
Each blob storage client implements the GenericBlobStorageClient interface, which defines methods for interacting with blob storage.
- class app.persistence.blob.client.GenericBlobStorageClient[source]#
Abstract base class for blob storage clients.
This class defines the interface for blob storage operations.
- abstractmethod async generate_signed_url(file: BlobStorageFile, interaction_type: BlobSignedUrlType) → str [source]#
Generate a signed URL for the file in blob storage.
- Parameters:
file (BlobStorageFile) – The file for which to generate the signed URL.
interaction_type (BlobSignedUrlType) – The type of interaction (upload or download).
- Returns:
The signed URL for the file.
- Return type:
str
- abstractmethod async stream_file(file: BlobStorageFile) → AsyncGenerator[str, None] [source]#
Stream a file line-by-line from the blob storage.
- Parameters:
file (BlobStorageFile) – The file to stream.
- Returns:
An async generator that yields lines from the file.
- Return type:
AsyncGenerator[str, None]
- abstractmethod async upload_file(content: FileStream | BytesIO, file: BlobStorageFile) → None [source]#
Upload a file to the blob storage.
- Parameters:
content (FileStream | BytesIO) – The content of the file to upload.
file (BlobStorageFile) – The file to upload.
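To make the contract concrete, here is a minimal in-memory client satisfying the interface. This is purely an illustrative sketch, not one of the real implementations described below:

from collections.abc import AsyncGenerator
from io import BytesIO

from app.persistence.blob.client import GenericBlobStorageClient
from app.persistence.blob.models import BlobSignedUrlType, BlobStorageFile
from app.persistence.blob.stream import FileStream

class InMemoryBlobStorageClient(GenericBlobStorageClient):
    """Toy client that keeps blobs in a dict, for illustration only."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    @staticmethod
    def _key(file: BlobStorageFile) -> str:
        return f"{file.container}/{file.path}/{file.filename}"

    async def generate_signed_url(
        self, file: BlobStorageFile, interaction_type: BlobSignedUrlType
    ) -> str:
        # A real client signs with storage credentials; this is a stub URL.
        return f"https://blobs.example/{self._key(file)}?op={interaction_type.value}"

    async def stream_file(self, file: BlobStorageFile) -> AsyncGenerator[str, None]:
        # Yield the stored blob line-by-line, as the interface requires.
        for line in self._blobs[self._key(file)].decode().splitlines():
            yield line

    async def upload_file(
        self, content: FileStream | BytesIO, file: BlobStorageFile
    ) -> None:
        # Accept either an in-memory buffer or a FileStream, which is
        # drained into memory via its read() helper.
        buffer = content if isinstance(content, BytesIO) else await content.read()
        self._blobs[self._key(file)] = buffer.getvalue()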
Blob Repository#
The codebase interacts with the blob clients through the BlobRepository interface:
- class app.persistence.blob.repository.BlobRepository[source]#
Repository for managing files in blob storage.
- async get_signed_url(file: BlobStorageFile, interaction_type: BlobSignedUrlType) → HttpUrl [source]#
Generate a signed URL for a file in Blob Storage.
- Parameters:
file (BlobStorageFile) – The file for which to generate the signed URL.
interaction_type (BlobSignedUrlType) – The type of interaction (upload or download).
- Returns:
The signed URL for the file.
- Return type:
HttpUrl
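For example, assuming blob_repo is a BlobRepository instance, file is a BlobStorageFile, and BlobSignedUrlType exposes DOWNLOAD and UPLOAD members for the two interaction types:

from app.persistence.blob.models import BlobSignedUrlType

signed_url = await blob_repo.get_signed_url(
    file=file,
    interaction_type=BlobSignedUrlType.DOWNLOAD,  # assumed member name
)
# The HttpUrl can be handed to an external actor such as a robot; the
# signature is short-lived, so expired URLs are refreshed by
# re-requesting the resource from the Repository API.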
- stream_file_from_blob_storage(file: BlobStorageFile) → AsyncGenerator[AsyncIterator[str], None] [source]#
Stream a file line-by-line from Blob Storage.
Usage:
async with blob_repo.stream_file_from_blob_storage(file) as stream:
    async for line in stream:
        print(line)
- Parameters:
file (BlobStorageFile) – The file to stream.
- Returns:
An async context manager that yields an async iterator over the lines of the file, one at a time.
- Return type:
AsyncGenerator[AsyncIterator[str], None]
- async upload_file_to_blob_storage(content: FileStream | BytesIO, path: str, filename: str, container: str | None = None, location: BlobStorageLocation | None = None) → BlobStorageFile [source]#
Upload a file to Blob Storage.
See app.persistence.blob.stream.FileStream for examples of how to create and use a FileStream object.
- Parameters:
content (FileStream | BytesIO) – The content of the file to upload.
path (str) – The path to upload the file to.
filename (str) – The name of the file to upload.
container (str | None) – The container to upload the file to, defaults to app.core.config.Settings.default_blob_container.
location (BlobStorageLocation | None) – The location of the blob storage, defaults to app.core.config.Settings.default_blob_location.
- Returns:
The information of the uploaded file.
- Return type:
BlobStorageFile
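For a simple in-memory upload, a sketch might look like this (the path and filename are illustrative; container and location fall back to the configured defaults):

from io import BytesIO

file = await blob_repo.upload_file_to_blob_storage(
    content=BytesIO(b'{"id": "example"}\n'),
    path="batch_enhancement_result",
    filename="example.jsonl",
)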
File Content#
When getting, the file content is streamed line-by-line to the caller, allowing for efficient handling of large files without loading them entirely into memory.
When putting, files can either be streamed with the FileStream class or compiled into an in-memory BytesIO object for simpler cases.
- class app.domain.base.SDKJsonlMixin[source]#
Mixin for SDK JSONL attributes.
This flags that the model is used by the SDK to marshal data in and out of JSONL files.
- class app.persistence.blob.stream.FileStream(fn: Callable[[...], Awaitable[Streamable]] | None = None, fn_kwargs: Sequence[dict[str, Any]] | None = None, generator: AsyncGenerator[Streamable, None] | None = None)[source]#
A helper class to convert a service function or generator into an async file stream.
This allows memory-efficient streaming of data from a function that returns a list of objects that inherit from app.domain.base.SDKJsonlMixin, which identifies domain models that can be converted to JSONL format, or from an async generator that yields strings.
Example usage:
class DomainModel(SDKJsonlMixin):
    ...

async def get_chunk(ids: list[UUID4], other_arg: str) -> list[DomainModel]:
    return repository.get_domain_models(ids, other_arg)

file_stream = FileStream(
    fn=get_chunk,
    fn_kwargs=[
        {"ids": [id1, id2], "other_arg": "value"},
        {"ids": [id3, id4], "other_arg": "value"},
    ],
)

blob_repository.upload_file_to_blob_storage(
    content=file_stream,
    path="path/to/file.jsonl",
    filename="file.jsonl",
)
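FileStream can also wrap an async generator directly, which is convenient when the data is already produced as lines. A sketch, assuming each yielded string becomes one line of the resulting JSONL file:

async def generate_lines():
    for i in range(3):
        yield f'{{"row": {i}}}'

file_stream = FileStream(generator=generate_lines())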
- async read() → BytesIO [source]#
Read all data from the FileStream into memory and return as a file-like object.
For implementations where async generators are not supported, we read all data into memory and return a BytesIO object. Currently this applies only to MinIO, which is used only for local development and testing. If this becomes a problem, we can also look into composing blobs: https://min.io/docs/minio/linux/developers/python/API.html#compose_object
- Returns:
A BytesIO object containing all the data.
- Return type:
BytesIO
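For example:

buffer = await file_stream.read()  # BytesIO containing the full file contents
data = buffer.getvalue()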
Implementations#
Azure#
Azure Blob Storage is used for application deployments. At present there is one container, destiny-repository-<env>-ops, which is used only for ephemeral operational data. The file tree is as below:
destiny-repository-<env>-ops/
├── batch_enhancement_result/
│   ├── <batch_request_id>_robot.jsonl - the enhancement result as published by the robot to the repository
│   └── <batch_request_id>_repo.jsonl - the validation result of importing the above, as published by the repository
└── batch_enhancement_request_reference_data/
    └── <batch_request_id>.jsonl - the reference data provided to the robot for the batch enhancement request
MinIO#
MinIO is used for testing and local development. It is S3-compatible, which would ease the path to an AWS implementation if one is ever desired. However, the current implementation is synchronous and so does not benefit from the memory efficiency of the FileStream interface.