Time Series
This tutorial demonstrates time series grouping patterns with Tafra: grouping
by year and month, partitioning for parallel forecasting dispatch, chunking for
batch processing, and reassembling results with Tafra.concat().
Build the dataset
We construct roughly 24 months of synthetic daily revenue data (730 rows; the full two-year span is 731 days because 2024 is a leap year, so the [:730] slice drops the final day).
import numpy as np
from tafra import Tafra
rng = np.random.default_rng(42)
# 730 days starting 2023-01-01
dates = np.arange('2023-01-01', '2025-01-01', dtype='datetime64[D]')[:730]
n = len(dates)
# Synthetic revenue with trend + seasonal component
day_index = np.arange(n, dtype=float)
trend = 100.0 + 0.05 * day_index
seasonal = 20.0 * np.sin(2 * np.pi * day_index / 365.25)
noise = rng.normal(0, 5, n)
revenue = trend + seasonal + noise
ts = Tafra({
'date': dates,
'revenue': revenue,
})
print(f'Rows: {ts.rows}')
print(f'Date range: {ts["date"][0]} to {ts["date"][-1]}')
print(f'Revenue range: {ts["revenue"].min():.1f} to {ts["revenue"].max():.1f}')
Data sample (first 7 rows)
date | revenue
-----------+--------
2023-01-01 | 101.5
2023-01-02 | 99.2
2023-01-03 | 103.8
2023-01-04 | 107.1
2023-01-05 | 96.4
2023-01-06 | 102.3
2023-01-07 | 100.9
(Values are approximate -- the table is illustrative rather than exact output.)
Extract year and month
Tafra stores datetime columns as numpy datetime64 arrays. Extract year and
month using numpy datetime operations:
# numpy datetime64 arithmetic to extract year and month
ts['year'] = dates.astype('datetime64[Y]').astype(int) + 1970
ts['month'] = dates.astype('datetime64[M]').astype(int) % 12 + 1
print(f'Years: {np.unique(ts["year"])}')
print(f'Months: {np.unique(ts["month"])}')
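A 'YYYY-MM' period label (the format used in the monthly table further down) falls out of the same datetime64 casts. A minimal sketch -- the helper name period_label is ours, not part of Tafra:

```python
import numpy as np

def period_label(dates: np.ndarray) -> np.ndarray:
    """Format datetime64 dates as 'YYYY-MM' strings."""
    # Casting to month precision prints as 'YYYY-MM' by construction
    return dates.astype('datetime64[M]').astype(str)

sample = np.arange('2023-01-30', '2023-03-02', dtype='datetime64[D]')
print(np.unique(period_label(sample)))  # → ['2023-01' '2023-02' '2023-03']
```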
Group by year
Aggregate annual totals using group_by:
annual = ts.group_by(
['year'],
{
'total_revenue': (np.sum, 'revenue'),
'mean_revenue': (np.mean, 'revenue'),
'std_revenue': (np.std, 'revenue'),
'days': (len, 'revenue'),
},
)
for i in range(annual.rows):
print(f'{annual["year"][i]}:'
f' total={annual["total_revenue"][i]:,.0f}'
f' mean={annual["mean_revenue"][i]:.1f}'
f' std={annual["std_revenue"][i]:.1f}'
f' days={annual["days"][i]:.0f}')
Output (approximate)
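As a sanity check on the days column: 2024 shows 365 rather than 366 because the [:730] slice drops the leap year's final day. The counts can be verified straight from the date array, independent of Tafra:

```python
import numpy as np

dates = np.arange('2023-01-01', '2025-01-01', dtype='datetime64[D]')[:730]
years = dates.astype('datetime64[Y]').astype(int) + 1970
vals, counts = np.unique(years, return_counts=True)
print(dict(zip(vals.tolist(), counts.tolist())))  # → {2023: 365, 2024: 365}
```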
Group by year and month
Grouping on both year and month yields one row per calendar month:
monthly = ts.group_by(
['year', 'month'],
{
'total_revenue': (np.sum, 'revenue'),
'mean_revenue': (np.mean, 'revenue'),
'days': (len, 'revenue'),
},
)
print(f'Monthly groups: {monthly.rows}')
print()
# Show first 6 months
for i in range(6):
print(f'{monthly["year"][i]}-{monthly["month"][i]:02d}:'
f' total={monthly["total_revenue"][i]:,.0f}'
f' mean={monthly["mean_revenue"][i]:.1f}'
f' days={monthly["days"][i]:.0f}')
Output (approximate)
Monthly aggregation -- table (first year)
period | total_revenue | mean_revenue | days
--------+---------------+--------------+-----
2023-01 | 3,145 | 101.5 | 31
2023-02 | 2,922 | 104.4 | 28
2023-03 | 3,478 | 112.2 | 31
2023-04 | 3,504 | 116.8 | 30
2023-05 | 3,795 | 122.4 | 31
2023-06 | 3,706 | 123.5 | 30
2023-07 | 3,752 | 121.0 | 31
2023-08 | 3,604 | 116.3 | 31
2023-09 | 3,294 | 109.8 | 30
2023-10 | 3,321 | 107.1 | 31
2023-11 | 3,125 | 104.2 | 30
2023-12 | 3,420 | 110.3 | 31
(All values approximate.)
Partition by year for parallel forecasting
partition splits the data by group, preserving all rows within each group.
This is ideal for dispatching each year to a separate worker process:
parts = ts.partition(['year'])
for key, sub in parts:
print(f'Year {key[0]}: {sub.rows} rows,'
f' date range {sub["date"][0]} to {sub["date"][-1]}')
Output
Process each partition and reassemble:
def forecast_year(args):
"""Simple linear trend forecast for one year's data."""
key, sub = args
x = np.arange(sub.rows, dtype=float)
# Fit linear trend: revenue = a + b*x
b = (np.mean(x * sub['revenue']) - np.mean(x) * np.mean(sub['revenue'])) / np.var(x)
a = np.mean(sub['revenue']) - b * np.mean(x)
fitted = a + b * x
return Tafra({
'date': sub['date'],
'actual': sub['revenue'],
'fitted': fitted,
'residual': sub['revenue'] - fitted,
})
# Process each year (in parallel you'd use multiprocessing.Pool.map)
results = [forecast_year(part) for part in parts]
combined = Tafra.concat(results)
print(f'Combined rows: {combined.rows}')
print(f'Columns: {list(combined.columns)}')
print(f'Mean absolute residual: {np.mean(np.abs(combined["residual"])):.2f}')
Output (approximate)
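To actually fan the partitions out, the same map shape drops straight into a pool. A minimal sketch of the dispatch pattern, with plain numpy arrays standing in for the partitions; a ThreadPool is used here so the sketch runs anywhere, and for CPU-bound fits you would swap in multiprocessing.Pool (the worker fit_trend is illustrative, not part of Tafra):

```python
import numpy as np
from multiprocessing.pool import ThreadPool

def fit_trend(args):
    """Least-squares line fit for one partition; returns (key, mean abs residual)."""
    key, revenue = args
    x = np.arange(len(revenue), dtype=float)
    b = (np.mean(x * revenue) - x.mean() * revenue.mean()) / np.var(x)
    a = revenue.mean() - b * x.mean()
    return key, float(np.mean(np.abs(revenue - (a + b * x))))

# Two fake yearly partitions: perfectly linear, so residuals should be ~0
parts = [(2023, np.linspace(100.0, 118.0, 365)),
         (2024, np.linspace(118.0, 136.0, 365))]
with ThreadPool(2) as pool:
    results = pool.map(fit_trend, parts)
for key, mae in results:
    print(f'{key}: mean abs residual = {mae:.6f}')
```

pool.map preserves input order, so the results line up with the partitions and can be concatenated back in sequence.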
Chunk by row count for batch processing
chunk_rows(size) splits the data into pieces of at most size rows each -- useful for batch uploads, memory-constrained processing, or progress reporting:
chunks = ts.chunk_rows(200)
print(f'Number of chunks: {len(chunks)}')
for i, chunk in enumerate(chunks):
print(f'Chunk {i}: {chunk.rows} rows,'
f' {chunk["date"][0]} to {chunk["date"][-1]}')
Output
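The expected chunk count and sizes follow from simple arithmetic -- ceil(730 / 200) = 4 chunks, with the remainder in the last. A quick check in plain Python:

```python
import math

rows, size = 730, 200
n_chunks = math.ceil(rows / size)
sizes = [min(size, rows - i * size) for i in range(n_chunks)]
print(n_chunks, sizes)  # → 4 [200, 200, 200, 130]
```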
Split into equal chunks
chunks(n) splits into approximately equal pieces:
equal_parts = ts.chunks(3)
print(f'Number of chunks: {len(equal_parts)}')
for i, chunk in enumerate(equal_parts):
print(f'Chunk {i}: {chunk.rows} rows')
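For 730 rows in 3 pieces, the chunk sizes can differ by at most one row. np.array_split illustrates one common convention (Tafra's exact split may place the remainder rows differently):

```python
import numpy as np

# Split 730 row indices into 3 near-equal pieces
sizes = [len(piece) for piece in np.array_split(np.arange(730), 3)]
print(sizes)  # → [244, 243, 243]
```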
Reassemble with Tafra.concat()
After processing chunks or partitions in parallel, concatenate results back into a single Tafra:
# Process each chunk -- e.g. compute running statistics
processed = []
for chunk in chunks:
result = Tafra({
'date': chunk['date'],
'revenue': chunk['revenue'],
'cumulative_avg': np.cumsum(chunk['revenue']) / np.arange(1, chunk.rows + 1),
})
processed.append(result)
final = Tafra.concat(processed)
print(f'Final rows: {final.rows}')
print(f'Columns: {list(final.columns)}')
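One caveat: cumulative_avg above restarts at every chunk boundary, since each chunk only sees its own rows. If a global running average is wanted, compute it once on the concatenated column instead. A small numpy sketch of the difference:

```python
import numpy as np

revenue = np.array([10.0, 20.0, 30.0, 40.0])
chunks = [revenue[:2], revenue[2:]]

# Per-chunk running average: resets at the chunk boundary
per_chunk = np.concatenate(
    [np.cumsum(c) / np.arange(1, len(c) + 1) for c in chunks])
# Global running average over the reassembled column
global_avg = np.cumsum(revenue) / np.arange(1, len(revenue) + 1)

print(per_chunk)   # → [10. 15. 30. 35.]
print(global_avg)  # → [10. 15. 20. 25.]
```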
Transform: broadcast monthly statistics
Use transform to add monthly averages to every row without reducing the
data:
monthly_stats = ts.transform(
['year', 'month'],
{
'monthly_mean': (np.mean, 'revenue'),
'monthly_std': (np.std, 'revenue'),
},
)
# Compute z-score for each day relative to its month
z_scores = (ts['revenue'] - monthly_stats['monthly_mean']) / monthly_stats['monthly_std']
ts['z_score'] = z_scores
# Find the most unusual days
extreme = np.argsort(np.abs(ts['z_score']))[-3:][::-1]
for idx in extreme:
print(f'{ts["date"][idx]}: revenue={ts["revenue"][idx]:.1f},'
f' z-score={ts["z_score"][idx]:.2f}')
Output (approximate)
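Ranking by argsort always returns exactly three days, however ordinary they are; an alternative is a fixed threshold on |z|. A minimal sketch with synthetic z-scores (the 2.0 cutoff is an arbitrary choice, not a Tafra convention):

```python
import numpy as np

rng = np.random.default_rng(0)
z = rng.normal(0, 1, 100)
z[7] = 3.5  # plant one obvious outlier
flagged = np.flatnonzero(np.abs(z) > 2.0)
print(f'{len(flagged)} day(s) flagged, including index 7: {7 in flagged}')
```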
Summary
This tutorial covered:
- Constructing a Tafra with datetime64 columns
- Extracting year and month from datetime arrays
- group_by to aggregate by time period (annual, monthly)
- partition to split by year for parallel forecasting
- chunk_rows and chunks for batch splitting
- Tafra.concat() to reassemble processed results
- transform to broadcast monthly statistics back to every row