Quickstart
This guide will get you started with the oups.store module for managing ordered parquet datasets.
Basic Concepts
The store module is built around three key concepts:
Indexer: Defines how datasets are organized using dataclass schemas
OrderedParquetDataset: Individual datasets with validated ordering
Store: Collection manager for multiple datasets
Let’s walk through a complete example.
Setting Up an Indexer
First, define how you want to organize your datasets using a class decorated with @toplevel:
from oups.store import toplevel
@toplevel
class WeatherIndex:
    country: str
    city: str
This creates a schema where datasets will be organized in directories like germany-berlin/, france-paris/, etc.
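As a quick sanity check, you can build a key and inspect the name it maps to. This sketch assumes the default '-' field separator and that a key's string form matches its directory name; adjust if your version behaves differently.
# Hypothetical check: under the default '-' separator, a key's
# string form should match its directory name.
key = WeatherIndex('germany', 'berlin')
print(key)  # expected: germany-berlin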
Creating a Store
Create a store instance that will manage your collection of datasets:
from oups.store import Store
import os
# Define the base directory for your data collection
data_path = os.path.expanduser('~/Documents/data/weather_data')
# Create the store
store = Store(data_path, WeatherIndex)
Working with Datasets
Writing Data
import pandas as pd
# Create an index for Berlin weather data
berlin_key = WeatherIndex('germany', 'berlin')
# Create sample data
df = pd.DataFrame({
    'timestamp': pd.date_range('2023-01-01', periods=30, freq='D'),
    'temperature': range(20, 50),
    'humidity': range(30, 60)
})
# Get reference to the dataset (initializes the dataset if it doesn't exist)
berlin_dataset = store[berlin_key]
# Write the data with timestamp ordering
berlin_dataset.write(df=df, ordered_on='timestamp')
The directory structure will now look like:
weather_data/
├── germany-berlin/
│   ├── file_0000.parquet
│   └── file_0001.parquet
├── germany-berlin_opdmd
└── germany-berlin.lock
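To confirm the layout on disk, a plain os.listdir of the store directory is enough; nothing oups-specific is involved here.
import os

# List the top-level entries of the store directory; expect the
# dataset folder plus its '_opdmd' metadata file and '.lock' file.
for entry in sorted(os.listdir(data_path)):
    print(entry)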
Reading Data
# Read all data back as a pandas DataFrame
result_df = berlin_dataset.to_pandas()
print(f"Dataset has {len(result_df)} rows")
# Check dataset metadata
print(f"Ordered on: {berlin_dataset.ordered_on}")
print(f"Number of row groups: {len(berlin_dataset)}")
Adding More Data
Incremental Updates
# Add more recent data
new_df = pd.DataFrame({
    'timestamp': pd.date_range('2023-02-01', periods=15, freq='D'),
    'temperature': range(15, 30),
    'humidity': range(40, 55)
})
# This will merge with existing data in the correct order
berlin_dataset.write(df=new_df, ordered_on='timestamp')
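Since writes are merged on the ordered_on column, re-reading the dataset should yield a monotonically increasing timestamp column. A quick check, using only calls already shown above:
# Verify the merged result is still ordered on 'timestamp'.
combined = berlin_dataset.to_pandas()
assert combined['timestamp'].is_monotonic_increasing
print(f"Dataset now has {len(combined)} rows")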
Adding Another City
# Add data for Paris
paris_key = WeatherIndex('france', 'paris')
paris_df = pd.DataFrame({
    'timestamp': pd.date_range('2023-01-01', periods=25, freq='D'),
    'temperature': range(25, 50),
    'humidity': range(35, 60)
})
store[paris_key].write(df=paris_df, ordered_on='timestamp')
Exploring Your Store
List All Datasets
print(f"Total datasets: {len(store)}")
for key in store:
    dataset = store[key]
    print(f"{key}: {len(dataset)} row groups")
Query Multiple Datasets
# Query data from multiple cities for a specific time range
keys = [WeatherIndex('germany', 'berlin'), WeatherIndex('france', 'paris')]
start_date = pd.Timestamp('2023-01-15')
end_date = pd.Timestamp('2023-01-25')
for intersection in store.iter_intersections(keys, start=start_date, end_excl=end_date):
    for key, df in intersection.items():
        print(f"Data from {key}: {len(df)} rows")
        print(f"Temperature range: {df['temperature'].min()}-{df['temperature'].max()}")
Advanced Features
Time-based Row Groups
from oups.store import write
# Organize data into daily row groups
write(
    store[berlin_key],
    ordered_on='timestamp',
    df=df,
    row_group_target_size='1D'  # One row group per day
)
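You can gauge the effect by counting row groups afterwards; with daily data and a '1D' target, roughly one row group per day is the expected outcome (an expectation, not a guarantee, since splitting heuristics may differ between versions).
# Each row group should now cover roughly one day of data.
print(f"Row groups: {len(store[berlin_key])}")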
Handling Duplicates
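The snippet below references a df_with_duplicates input; here is a hypothetical one, built for illustration, in which two rows share a timestamp:
# Hypothetical input for the example below: the first two rows
# share a timestamp, so one of them will be dropped on write.
df_with_duplicates = pd.DataFrame({
    'timestamp': pd.to_datetime(['2023-03-01', '2023-03-01', '2023-03-02']),
    'temperature': [21, 22, 23],
    'humidity': [50, 51, 52]
})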
# Remove duplicates based on timestamp
write(
    store[berlin_key],
    ordered_on='timestamp',
    df=df_with_duplicates,
    duplicates_on=['timestamp']  # Drop rows sharing the same timestamp
)
Custom Metadata
# Add metadata to your dataset
write(
    store[berlin_key],
    ordered_on='timestamp',
    df=df,
    key_value_metadata={
        'source': 'weather_station_001',
        'units': 'celsius',
        'version': '1.0'
    }
)
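To read the entries back, the sketch below assumes the dataset object exposes the stored entries via a key_value_metadata attribute mirroring the write parameter; consult the API Reference if your version differs.
# Read the stored metadata back (attribute name assumed to mirror
# the 'key_value_metadata' write parameter).
print(store[berlin_key].key_value_metadata)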
Next Steps
Explore the complete Store Architecture documentation
Learn more about indexing in Store Architecture (Indexer section)
Review the full API Reference
Understand the design philosophy in Why oups?