Tutorial
This guide will get you started with the oups.store
module for managing ordered parquet datasets.
Basic Concepts
The store module is built around three key concepts:
Indexer: Defines how datasets are organized using dataclass schemas
OrderedParquetDataset: Individual datasets with validated ordering
Store: Collection manager for multiple datasets
Let’s walk through a complete example.
Understanding Types and Parameters
Before diving into examples, it’s helpful to understand the key types and parameters used throughout the oups library:
Index Types
Indexer classes are dataclasses decorated with @toplevel that define the schema for organizing datasets. They can optionally include @sublevel classes for hierarchical organization.
Ordered Column Types
The ordered_on parameter accepts (see the sketch just below):
str: Single column name (e.g., "timestamp")
Tuple[str]: Multi-index column name for hierarchical columns (e.g., ("date", "time"))
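A minimal sketch of both forms, assuming dataset is an OrderedParquetDataset obtained from a store (the tutorial below shows how to create one):

import pandas as pd

# Plain columns: pass the column name as a str.
df = pd.DataFrame({
    'timestamp': pd.date_range('2023-01-01', periods=5, freq='D'),
    'temperature': range(5)
})
dataset.write(df=df, ordered_on='timestamp')

# Hierarchical (MultiIndex) columns: pass the full column label as a tuple.
multi_df = pd.DataFrame({
    ('date', 'time'): pd.date_range('2023-01-01', periods=5, freq='D'),
    ('weather', 'temperature'): range(5)
})
dataset.write(df=multi_df, ordered_on=('date', 'time'))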
Row Group Target Size Types
The row_group_target_size parameter accepts (see the sketch just below):
int: Target number of rows per row group (e.g., 10000)
str: Pandas frequency string for time-based grouping (e.g., "1D" for daily, "1H" for hourly)
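A minimal sketch of both forms, again assuming dataset is an OrderedParquetDataset and that its write method accepts the same parameter as the top-level write function used later in this tutorial:

# Fixed-size grouping: target roughly 10,000 rows per row group.
dataset.write(df=df, ordered_on='timestamp', row_group_target_size=10000)

# Time-based grouping: one row group per calendar day of 'timestamp'.
dataset.write(df=df, ordered_on='timestamp', row_group_target_size='1D')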
Key-Value Metadata
Custom metadata stored as Dict[str, str] alongside parquet files. This can include source information, processing parameters, or any other relevant metadata.
Setting Up an Indexer
First, define how you want to organize your datasets using a class decorated with @toplevel:
from oups.store import toplevel

@toplevel
class WeatherIndex:
    country: str
    city: str
This creates a schema where datasets will be organized in directories like germany-berlin/, france-paris/, etc.
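Because indexer classes are plain dataclasses, keys compare by field values, so re-creating a key always addresses the same dataset:

k1 = WeatherIndex('germany', 'berlin')
k2 = WeatherIndex('germany', 'berlin')
assert k1 == k2  # dataclass equality is field-wise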
Creating a Store
Create a store instance that will manage your collection of datasets:
from oups.store import Store
import os
# Define the base directory for your data collection
data_path = os.path.expanduser('~/Documents/data/weather_data')
# Create the store
store = Store(data_path, WeatherIndex)
Working with Datasets
Writing Data
import pandas as pd
# Create an index for Berlin weather data
berlin_key = WeatherIndex('germany', 'berlin')
# Create sample data
df = pd.DataFrame({
    'timestamp': pd.date_range('2023-01-01', periods=30, freq='D'),
    'temperature': range(20, 50),
    'humidity': range(30, 60)
})
# Get reference to the dataset (initializes the dataset if it doesn't exist)
berlin_dataset = store[berlin_key]
# Write the data with timestamp ordering
berlin_dataset.write(df=df, ordered_on='timestamp')
The directory structure will now look like:
weather_data/
├── germany-berlin/
│   ├── file_0000.parquet
│   └── file_0001.parquet
├── germany-berlin_opdmd
└── germany-berlin.lock
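You can confirm this from Python (a quick check; exact file names depend on how much data has been written):

import os

for entry in sorted(os.listdir(data_path)):
    print(entry)
# germany-berlin
# germany-berlin.lock
# germany-berlin_opdmd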
Reading Data
# Read all data back as a pandas DataFrame
result_df = berlin_dataset.to_pandas()
print(f"Dataset has {len(result_df)} rows")
# Check dataset metadata
print(f"Ordered on: {berlin_dataset.ordered_on}")
print(f"Number of row groups: {len(berlin_dataset)}")
Adding More Data
Incremental Updates
# Add more recent data
new_df = pd.DataFrame({
    'timestamp': pd.date_range('2023-02-01', periods=15, freq='D'),
    'temperature': range(15, 30),
    'humidity': range(40, 55)
})
# This will merge with existing data in the correct order
berlin_dataset.write(df=new_df, ordered_on='timestamp')
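A quick check that the merge kept everything ordered: the dataset now spans both batches (30 + 15 = 45 rows) in timestamp order.

merged = berlin_dataset.to_pandas()
print(len(merged))                                  # 45
print(merged['timestamp'].is_monotonic_increasing)  # True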
Adding Another City
# Add data for Paris
paris_key = WeatherIndex('france', 'paris')
paris_df = pd.DataFrame({
    'timestamp': pd.date_range('2023-01-01', periods=25, freq='D'),
    'temperature': range(25, 50),
    'humidity': range(35, 60)
})
store[paris_key].write(df=paris_df, ordered_on='timestamp')
Exploring Your Store
List All Datasets
print(f"Total datasets: {len(store)}")
for key in store:
    dataset = store[key]
    print(f"{key}: {len(dataset)} row groups")
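Membership tests also work. Since Store is iterable over its keys, Python's in operator falls back on iteration even if no dedicated __contains__ is defined:

print(berlin_key in store)                       # True
print(WeatherIndex('spain', 'madrid') in store)  # False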
Query Multiple Datasets
# Query data from multiple cities for a specific time range
keys = [WeatherIndex('germany', 'berlin'), WeatherIndex('france', 'paris')]
start_date = pd.Timestamp('2023-01-15')
end_date = pd.Timestamp('2023-01-25')
for intersection in store.iter_intersections(keys, start=start_date, end_excl=end_date):
    for key, df in intersection.items():
        print(f"Data from {key}: {len(df)} rows")
        print(f"Temperature range: {df['temperature'].min()}-{df['temperature'].max()}")
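Because intersections arrive in order, it is easy to maintain running aggregates across datasets. A small sketch reusing the keys and bounds above (note that end_excl is exclusive, so the window is [start_date, end_date)); it assumes keys are hashable, i.e. frozen dataclasses:

totals = {key: 0 for key in keys}
for intersection in store.iter_intersections(keys, start=start_date, end_excl=end_date):
    for key, df in intersection.items():
        totals[key] += len(df)
print(totals)  # row count per dataset within the window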
Advanced Features
Hierarchical Indexing
For more complex organization, you can create hierarchical indexers using @sublevel:
from oups.store import toplevel, sublevel

@sublevel
class DateInfo:
    year: str
    month: str

@toplevel
class HierarchicalIndex:
    symbol: str
    date_info: DateInfo

# This creates paths like: AAPL/2023-01/
key = HierarchicalIndex("AAPL", DateInfo("2023", "01"))
store_hierarchical = Store("/path/to/financial_data", HierarchicalIndex)
Time-based Row Groups
from oups.store import write
# Organize data into daily row groups
write(
    store[berlin_key],
    ordered_on='timestamp',
    df=df,
    row_group_target_size='1D'  # One row group per day
)
Advanced Write Options
The write function supports many advanced options for optimizing storage and handling duplicates:
from oups.store import write
# Advanced write with all options
write(
    "/path/to/dataset",
    ordered_on="timestamp",
    df=df,
    row_group_target_size="1D",  # Daily row groups
    duplicates_on=["timestamp", "symbol"],  # Drop duplicates based on these columns
    max_n_off_target_rgs=2,  # Coalesce small row groups
    key_value_metadata={
        "source": "bloomberg",
        "version": "1.0",
        "processed_by": "data_pipeline"
    }
)
Handling Duplicates
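For illustration, here is a hypothetical frame with a duplicated timestamp (df_with_duplicates is not defined elsewhere in this tutorial):

import pandas as pd

# Two rows share 2023-03-01; only one row per timestamp will be kept.
df_with_duplicates = pd.DataFrame({
    'timestamp': pd.to_datetime(['2023-03-01', '2023-03-01', '2023-03-02']),
    'temperature': [21, 22, 23],
    'humidity': [40, 41, 42]
})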
# Remove duplicate rows that share the same timestamp
write(
    store[berlin_key],
    ordered_on='timestamp',
    df=df_with_duplicates,
    duplicates_on=['timestamp']  # Drop rows with the same timestamp
)
Custom Metadata
# Add metadata to your dataset
write(
    store[berlin_key],
    ordered_on='timestamp',
    df=df,
    key_value_metadata={
        'source': 'weather_station_001',
        'units': 'celsius',
        'version': '1.0'
    }
)

# Update existing metadata (add new, update existing, remove with None)
write(
    store[berlin_key],
    ordered_on='timestamp',
    df=new_df,
    key_value_metadata={
        'version': '1.1',  # Update existing
        'last_updated': '2023-12-01',  # Add new
        'processed_by': None  # Remove existing
    }
)
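To read the metadata back, the dataset exposes it alongside its other attributes; the accessor name below is an assumption mirroring the parameter name, so check the API Reference for the exact spelling:

# Assumed accessor, mirroring the write parameter name.
print(store[berlin_key].key_value_metadata)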
Cross-Dataset Queries
For more complex scenarios, you can query multiple datasets simultaneously:
# Define a financial indexer for cross-dataset queries
@toplevel
class StockIndex:
    category: str
    subcategory: str

# A Store manages a single index schema, so use a dedicated store
# for this indexer rather than the weather store above.
financial_store = Store("/path/to/financial_data", StockIndex)

# Query multiple datasets simultaneously
keys = [StockIndex("stocks", "tech"), StockIndex("stocks", "finance")]
for intersection in financial_store.iter_intersections(
    keys,
    start=pd.Timestamp("2023-01-01"),
    end_excl=pd.Timestamp("2023-02-01")
):
    for key, df in intersection.items():
        print(f"Processing {key}: {len(df)} rows")
Next Steps
Learn more about indexing and explore the complete Store Architecture documentation
Review the full API Reference
Read Why oups? to understand the design philosophy