# Store Architecture
The `oups.store` module provides the core functionality for managing collections of ordered parquet datasets. It consists of three main components working together to provide efficient storage, indexing, and querying of time-series data.
## Overview
The store architecture is designed around three key components:
- **Indexer**: Provides a schema-based indexing system for organizing datasets
- **OrderedParquetDataset**: Manages individual parquet datasets with ordering validation
- **Store**: Provides a collection interface for multiple indexed datasets
## Main Components
### Indexer
The indexer system allows you to define hierarchical schemas for organizing your datasets using dataclasses decorated with `@toplevel` and optionally `@sublevel`. This provides a structured way to organize related datasets in common directories.
#### Motivation
Datasets are gathered in a parent directory as a collection. Each dataset materializes as parquet files located in a child directory whose name is derived from a user-defined index. By formalizing this index through dataclasses, index management (user scope) is decoupled from path management (oups scope).
#### Decorators
`oups` provides two class decorators for defining indexing logic:

- `@toplevel` is compulsory and defines the naming logic of the first directory level
- `@sublevel` is optional and can be used as many times as needed for sub-directories
#### `@toplevel` Decorator

The `@toplevel` decorator:

- Generates paths from attribute values (`__str__` and `to_path` methods)
- Generates class instances (`from_path` classmethod)
- Validates attribute values at instantiation
- Calls `@dataclass` with `order` and `frozen` parameters set to `True`
- Accepts an optional `fields_sep` parameter (default `-`) to define the field separator
- Only accepts `int` or `str` attribute types
- If an attribute is a `@sublevel`-decorated class, it must be positioned last
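A minimal sketch of these behaviors with a hypothetical `CityIndex` class; passing `fields_sep` as a decorator argument and calling `from_path` with the string form are assumptions based on the list above, so check the API reference for exact signatures:

```python
from oups.store import toplevel


# Hypothetical index, using a non-default field separator
# (assumed decorator-argument syntax).
@toplevel(fields_sep='.')
class CityIndex:
    country: str
    city: str


idx = CityIndex('germany', 'berlin')
print(idx)  # 'germany.berlin', via the __str__ generated by @toplevel

# Frozen, ordered dataclass: instances are hashable and comparable.
assert CityIndex('germany', 'berlin') == idx

# Round-trip from the path form (assumed from_path signature).
assert CityIndex.from_path('germany.berlin') == idx
```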
#### `@sublevel` Decorator

The `@sublevel` decorator:

- Is an alias for `@dataclass` with `order` and `frozen` set to `True`
- Only accepts `int` or `str` attribute types
- If another, deeper sub-level is defined, it must be positioned as the last attribute
#### Hierarchical Example

```python
from oups.store import toplevel, sublevel


@sublevel
class Sampling:
    frequency: str


@toplevel
class Measure:
    quantity: str
    city: str
    sampling: Sampling


# Define different indexes for temperature in Berlin.
berlin_1D = Measure('temperature', 'berlin', Sampling('1D'))
berlin_1W = Measure('temperature', 'berlin', Sampling('1W'))

# When this indexer is connected to a Store, the directory structure
# will look like:
# temperature-berlin/
# ├── 1D/
# │   ├── file_0000.parquet
# │   └── file_0001.parquet
# └── 1W/
#     ├── file_0000.parquet
#     └── file_0001.parquet
```
#### Simple Example

```python
from oups.store import toplevel


@toplevel
class TimeSeriesIndex:
    symbol: str
    date: str


# This creates a schema where datasets are organized as:
# symbol-date/ (e.g., "AAPL-2023.01.01/")
```
### OrderedParquetDataset

`OrderedParquetDataset` is the core class for managing individual parquet datasets with strict ordering validation.

**Key Features:**

- **Ordered Storage**: Data is stored in row groups ordered by a specified column
- **Incremental Updates**: Efficiently merge new data with existing data
- **Row Group Management**: Automatic splitting and merging of row groups
- **Metadata Tracking**: Comprehensive metadata for each row group
- **Metadata Updates**: Add, update, or remove custom key-value metadata
- **Duplicate Handling**: Configurable duplicate detection and removal
- **Write Optimization**: Configurable row group sizes and merge strategies
**File Structure:**

```
parent_directory/
├── my_dataset/              # Dataset directory
│   ├── file_0000.parquet    # Row group files
│   └── file_0001.parquet
├── my_dataset_opdmd         # Metadata file
└── my_dataset.lock          # Lock file
```
**Example:**

```python
import pandas as pd

from oups.store import OrderedParquetDataset

# Create or load a dataset.
dataset = OrderedParquetDataset("/path/to/dataset", ordered_on="timestamp")

# Write data.
df = pd.DataFrame({
    "timestamp": pd.date_range("2023-01-01", periods=1000),
    "value": range(1000),
})
dataset.write(df=df)

# Read data back.
result = dataset.to_pandas()
```
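Because updates are incremental, a second batch can be written into the same dataset. A minimal continuation of the example above; the merge behavior follows the feature list, though exact row group placement may vary:

```python
# Incremental update: the second batch is merged with existing
# row groups while preserving the "timestamp" ordering.
df_new = pd.DataFrame({
    "timestamp": pd.date_range("2025-09-27", periods=100),
    "value": range(1000, 1100),
})
dataset.write(df=df_new)
```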
### Store

The `Store` class provides a collection interface for managing multiple `OrderedParquetDataset` instances organized according to an indexer schema.

**Key Features:**

- **Schema-based Organization**: Uses indexer schemas for dataset discovery
- **Lazy Loading**: Datasets are loaded on demand
- **Collection Interface**: Dictionary-like access to datasets
- **Cross-dataset Operations**: Advanced querying across multiple datasets
- **Automatic Discovery**: Finds existing datasets matching the schema
**Example:**

```python
from oups.store import Store, toplevel


@toplevel
class StockIndex:
    symbol: str
    year: str


# Create the store.
store = Store("/path/to/data", StockIndex)

# Access a dataset by key.
aapl_2023 = store[StockIndex("AAPL", "2023")]

# Iterate over all datasets.
for key in store:
    dataset = store[key]
    print(f"Dataset {key} has {len(dataset)} row groups")
```
## Advanced Features

### Write Method

The `write()` function provides advanced data writing capabilities:
**Parameters:**

- `row_group_target_size`: Control row group sizes (int or pandas frequency string)
- `duplicates_on`: Specify columns for duplicate detection
- `max_n_off_target_rgs`: Control row group coalescing behavior
- `key_value_metadata`: Store custom metadata (supports add/update/remove operations)
**Example:**

```python
from oups.store import write

# Write with time-based row groups and metadata.
write(
    "/path/to/dataset",
    ordered_on="timestamp",
    df=df,
    row_group_target_size="1D",  # One row group per day
    duplicates_on=["timestamp", "symbol"],
    key_value_metadata={
        "source": "market_data",
        "version": "2.1",
        "processed_by": "data_pipeline",
    },
)

# Update existing metadata (add new, update existing, remove with None).
write(
    "/path/to/dataset",
    ordered_on="timestamp",
    df=new_df,
    key_value_metadata={
        "version": "2.2",              # Update existing
        "last_updated": "2023-12-01",  # Add new
        "processed_by": None,          # Remove existing
    },
)
```
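The examples above do not exercise `max_n_off_target_rgs`. A sketch of how it might be combined with frequent small appends, assuming the parameter caps how many undersized ("off target") row groups may accumulate before they are coalesced:

```python
import pandas as pd

from oups.store import write

# Small, frequent appends tend to create undersized row groups.
# Allow at most 3 off-target row groups before coalescing
# (assumed semantics; see the parameter list above).
small_batch = pd.DataFrame({
    "timestamp": pd.date_range("2023-06-01", periods=100, freq="min"),
    "value": range(100),
})
write(
    "/path/to/dataset",
    ordered_on="timestamp",
    df=small_batch,
    row_group_target_size=100_000,
    max_n_off_target_rgs=3,
)
```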
### iter_intersections

The `iter_intersections()` method enables efficient querying across multiple datasets with overlapping ranges:
**Key Features:**

- **Range Queries**: Query specific ranges (time, numeric, etc.) across multiple datasets
- **Intersection Detection**: Automatically finds overlapping row groups
- **Memory Efficient**: Streams data without loading entire datasets
- **Synchronized Iteration**: Iterates through multiple datasets in sync
Example:
# Query multiple datasets for overlapping data
keys = [StockIndex("AAPL", "2023"), StockIndex("GOOGL", "2023")]
for intersection in store.iter_intersections(
keys,
start=pd.Timestamp("2023-01-01"),
end_excl=pd.Timestamp("2023-02-01")
):
for key, df in intersection.items():
print(f"Data from {key}: {len(df)} rows")
## Best Practices

- **Indexer Design**: Design your indexer schema to match your data access patterns
- **Ordered Column**: Choose an appropriate column for ordering (typically a timestamp)
- **Row Group Size**: Balance between query performance and storage efficiency
- **Duplicate Handling**: Use `duplicates_on` when data quality is a concern
- **Metadata**: Use key-value metadata to store important dataset information
## See Also

- API Reference - Complete API reference
- Quickstart - Getting started guide