ParquetSet
Purpose and creation
An instance of ParquetSet class gathers a collection of datasets.
ParquetSet instantiation requires the definition of a collection path and a dataset indexing logic.
A collection path is a directory path (existing or not) under which the directories for each dataset are (or will be) gathered.
An indexing logic is formalized by use of a @toplevel-decorated class, as presented in Collection indexing.
from os import path as os_path
from oups import ParquetSet, toplevel
# Define an indexing logic to generate each individual dataset folder name.
@toplevel
class DatasetIndex:
country: str
city: str
# Define a collection path.
store_path = os_path.expanduser('~/Documents/data/weather_knowledge_base')
# Initialize a parquet dataset collection.
ps = ParquetSet(store_path, DatasetIndex)
Usage notes
Dataframe format
Row index is dropped when recording. If the index of your dataframe is meaningful, make sure to reset it as a column. This only applies to pandas dataframes, as vaex ones have no row index.
pandas_df = pandas_df.reset_index()
Column multi-index can be recorded. Here again, vaex has no support for column multi-index, but if your vaex dataframe comes from a pandas dataframe that initially had a column multi-index, you can expand it again at recording.
# With 'vaex_df' created from a pandas dataframe with column multi-index.
ps[idx] = {'cmidx_expand': True}, vaex_df
Writing
When recording data to disk, a ParquetSet instance accepts a tuple whose first item is a dict defining the recording settings. Accepted parameters are those of the oups.writer.write function that are complementary to dirpath and data (see the API reference for a review).
ps[idx] = {'row_group_size': 5_000_000, 'compression': 'BROTLI'}, df
New datasets can be added to the same collection, as long as the index used is an instance of the same @toplevel-decorated class as the one used at ParquetSet instantiation.
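For illustration, a minimal sketch of adding a second dataset (df2 is a hypothetical dataframe; idx2 is assumed to be the ‘japan-tokyo’ index re-used in later examples).
# Index a second dataset with another instance of the same DatasetIndex class.
idx2 = DatasetIndex('japan', 'tokyo')
ps[idx2] = df2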
Reading
A ParquetSet instance returns a ParquetHandle which gives access to data either through ‘handles’ (vaex dataframe or fastparquet parquet file) or directly as a pandas dataframe:
fastparquet parquet file: ps[idx].pf
pandas dataframe: ps[idx].pdf
vaex dataframe: ps[idx].vdf
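For instance (a sketch, assuming idx points to an existing dataset of the collection):
# Fastparquet parquet file handle.
pf = ps[idx].pf
# Directly as a pandas dataframe.
pdf = ps[idx].pdf
# Or as a vaex dataframe.
vdf = ps[idx].vdf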
Updating
If using an index already present in a ParquetSet instance, existing data is updated with the new one. Different keywords control the data updating logic. These keywords can also be reviewed in the API reference, looking at the oups.writer.write function signature.
ordered_on, default None
This keyword specifies the name of a column according to which the dataset is ordered (ascending order).
When specified, the position of the new data with respect to existing data is checked. This allows data insertion.
It also enforces sharp row group boundaries, meaning that a row group necessarily starts with a new value in the column specified by ordered_on, at the expense of ensuring a constant row group size. When used, no newly written row group starts in the middle of duplicate values. The main motivation for this feature relates to the need to include the ordered_on column to identify duplicates, as discussed in the next section.
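As a sketch (the 'timestamp' column and the df variable are assumptions for illustration):
# Keep the dataset ordered on the 'timestamp' column; the position of new data
# with respect to existing data is then checked at write time.
ps[idx] = {'ordered_on': 'timestamp'}, df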
duplicates_on, default None
This keyword specifies the names of the columns used to identify duplicates. If it is an empty list [], all columns are used.
Motivation for dropping duplicates is that new values (from new data) can replace old values (in existing data). A typical use case is that of updating OHLC financial datasets, for which the High, Low and Close values of the last candle (in-progress) can change until the candle is completed. When appending newer data, values of this last candle then need to be updated.
The implementation of this logic processes the row groups to be written iteratively, one row group at a time (and not over the full dataset). This makes it a low-memory-footprint operation. It also has 2 important implications. Make sure to understand them and check that they apply correctly to your own use case. If not, a solution is to prepare the data the way you intend it to be before recording it anew.
Duplicates in existing data that is not rewritten are not dropped.
Conversely, duplicates in existing data that is rewritten are dropped.
Values in the ordered_on column also contribute to identifying duplicates. If not already present, the ordered_on column is thus forced into the list of columns defined by duplicates_on.
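As a sketch of the OHLC use case mentioned above (the 'timestamp' column name and the new_df variable are assumptions for illustration):
# Identify duplicates on 'timestamp' only: a row in the new data sharing its
# 'timestamp' with a row in rewritten existing data replaces it
# (the 'ordered_on' column would be forced into 'duplicates_on' anyway).
ps[idx] = {'ordered_on': 'timestamp', 'duplicates_on': ['timestamp']}, new_df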
max_nirgs, default None
This keyword specifies the maximum allowed number of incomplete row groups. An incomplete row group is one that does not quite reach max_row_group_size yet (some approximations of this target are managed within the code).
By using this parameter, you allow a buffer of trailing incomplete row groups. Hence, new data is not systematically merged with existing data, but may simply be appended as new row groups.
The interest is that an appending operation is faster than merging with existing row groups, and for adding only a few more rows, merging seems like a heavy, unjustified operation.
Setting max_nirgs triggers the assessment of 2 conditions to initiate a merge (coalescing all incomplete trailing row groups to try making complete ones). Either one or the other has to be met to validate a merge:
max_nirgs is reached;
the total number of rows within the incomplete row groups, summed with the number of rows in the new data, equals or exceeds max_row_group_size.
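In pseudo-code, the merge decision can be summarized as follows (a sketch of the logic only, with made-up variable names, not the actual oups implementation).
# Coalescing of trailing incomplete row groups is triggered if either condition holds.
merge = (n_incomplete_row_groups >= max_nirgs
         or n_rows_in_incomplete_row_groups + n_rows_new_data >= max_row_group_size)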
Beware that if this feature is used jointly with duplicates_on, and if new data overlaps with existing data, only the overlapping row groups are merged together. ‘Full’ coalescing (i.e. with all trailing incomplete row groups) is triggered only if one of the above-mentioned conditions is met.
# Re-using previous variables.
# Initiating a new dataset
ps[idx1] = df1
# Appending the same data.
ps[idx1] = {'max_nirgs': 4}, df1
# Reading.
ps[idx1].pdf
Out[2]:
timestamp temperature
0 2021-01-01 10
1 2021-01-02 11
2 2021-01-03 12
3 2021-01-04 13
4 2021-01-05 14
5 2021-01-01 10 # new appended data
6 2021-01-02 11
7 2021-01-03 12
8 2021-01-04 13
9 2021-01-05 14
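By contrast, and as a sketch only (not part of the original example), appending df1 with duplicate handling enabled would drop the duplicated timestamps in the row groups being rewritten, keeping the values from the new data.
# Appending the same data, this time dropping duplicates on 'timestamp'.
ps[idx1] = {'ordered_on': 'timestamp', 'duplicates_on': ['timestamp']}, df1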
Other “goodies”
# Re-using previous variables.
# Review store content.
ps
Out[3]:
germany-berlin
japan-tokyo
# Get number of datasets.
len(ps)
Out[4]: 2
# Delete a dataset (delete data from disk).
del ps[idx1]
ps
Out[5]: japan-tokyo
# 'Discover' an existing dataset collection.
# (initial schema definition is needed)
ps2 = ParquetSet(store_path, DatasetIndex)
ps2
Out[6]: japan-tokyo
# Get min and max from a column of a given dataset.
min_, max_ = ps2[idx2].min_max(col='temperature')