Time Series Analysis
Techniques for preprocessing, modeling, forecasting, and evaluating time series data, from classical statistical models to deep learning forecasters.
Time Series Preprocessing
Data Loading and Cleaning
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller

def load_time_series(data_path, date_column, target_column):
    # Load data
    df = pd.read_csv(data_path)
    # Convert to datetime
    df[date_column] = pd.to_datetime(df[date_column])
    # Set the datetime column as the index and sort chronologically
    df.set_index(date_column, inplace=True)
    df.sort_index(inplace=True)
    return df[target_column]

def check_stationarity(series):
    # Perform the Augmented Dickey-Fuller test
    # (drop NaNs first; adfuller raises on missing values)
    result = adfuller(series.dropna())
    return {
        'test_statistic': result[0],
        'p_value': result[1],
        'critical_values': result[4],
        'is_stationary': result[1] < 0.05  # reject the unit-root null at 5%
    }
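If the ADF test fails to reject the unit-root null, differencing the series is the usual remedy. A minimal sketch using the helper above (the function name and the max_diff bound are illustrative choices, not from any library):

def difference_until_stationary(series, max_diff=2):
    # Difference repeatedly until the ADF test indicates stationarity;
    # return the differenced series and the order of differencing d
    d = 0
    while d < max_diff and not check_stationarity(series)['is_stationary']:
        series = series.diff().dropna()
        d += 1
    return series, d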
Time Series Decomposition
from statsmodels.tsa.seasonal import seasonal_decompose

def decompose_time_series(series, period=None, model='additive'):
    # Perform additive (or multiplicative) decomposition; period must be
    # given unless the index carries an inferable frequency
    decomposition = seasonal_decompose(
        series,
        model=model,
        period=period,
        extrapolate_trend='freq'
    )
    return {
        'trend': decomposition.trend,
        'seasonal': decomposition.seasonal,
        'residual': decomposition.resid
    }
import matplotlib.pyplot as plt

def plot_decomposition(decomposition):
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 10))
    # Plot each component on its own axis
    decomposition['trend'].plot(ax=ax1)
    ax1.set_title('Trend')
    decomposition['seasonal'].plot(ax=ax2)
    ax2.set_title('Seasonal')
    decomposition['residual'].plot(ax=ax3)
    ax3.set_title('Residual')
    plt.tight_layout()
    return fig
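Putting the pieces together on a monthly series (the file name and column names below are placeholders):

# Hypothetical CSV with 'date' and 'sales' columns, monthly observations
series = load_time_series('sales.csv', date_column='date', target_column='sales')
print(check_stationarity(series))
decomp = decompose_time_series(series, period=12)  # 12 for monthly seasonality
fig = plot_decomposition(decomp)
fig.savefig('decomposition.png')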
Statistical Models
ARIMA Models
from statsmodels.tsa.arima.model import ARIMA
from pmdarima import auto_arima

class ARIMAForecaster:
    def __init__(self, order=None):
        self.order = order
        self.model = None

    def find_optimal_order(self, series):
        # Stepwise search over non-seasonal (p, d, q); d is chosen by
        # auto_arima's unit-root tests rather than fixed in advance
        model = auto_arima(
            series,
            start_p=0, start_q=0,
            max_p=5, max_q=5,
            seasonal=False,  # use the SARIMAX forecaster below for seasonal data
            trace=True,
            error_action='ignore',
            suppress_warnings=True,
            stepwise=True
        )
        self.order = model.order
        return self.order

    def fit(self, series):
        if self.order is None:
            self.find_optimal_order(series)
        self.model = ARIMA(series, order=self.order).fit()
        return self

    def predict(self, steps=1):
        return self.model.forecast(steps)

    def get_diagnostics(self):
        return {
            'aic': self.model.aic,
            'bic': self.model.bic,
            'residuals': self.model.resid
        }
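A minimal usage sketch, assuming series is the pandas Series loaded earlier:

forecaster = ARIMAForecaster().fit(series)
forecast = forecaster.predict(steps=12)           # 12-step-ahead forecast
print(forecaster.get_diagnostics()['aic'])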
SARIMAX Models
from statsmodels.tsa.statespace.sarimax import SARIMAX

class SARIMAXForecaster:
    def __init__(self, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)):
        self.order = order
        self.seasonal_order = seasonal_order
        self.model = None

    def fit(self, series, exog=None):
        # seasonal_order is (P, D, Q, m); m=12 assumes monthly seasonality
        self.model = SARIMAX(
            series,
            order=self.order,
            seasonal_order=self.seasonal_order,
            exog=exog
        ).fit(disp=False)
        return self

    def predict(self, steps=1, exog=None):
        # Future exog values must be supplied if the model was fit with exog
        return self.model.forecast(steps, exog=exog)

    def get_summary(self):
        return self.model.summary()
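Example usage with an exogenous regressor. The data here is synthetic and the promotion indicator is purely illustrative:

import numpy as np
import pandas as pd

# Toy monthly series plus a December-promotion indicator
idx = pd.date_range('2020-01-01', periods=48, freq='MS')
series = pd.Series(100 + 10 * np.sin(2 * np.pi * idx.month / 12) + np.random.randn(48), index=idx)
promo = pd.Series((idx.month == 12).astype(float), index=idx)

sarimax = SARIMAXForecaster().fit(series, exog=promo)
future_idx = pd.date_range(idx[-1] + pd.offsets.MonthBegin(), periods=6, freq='MS')
future_promo = pd.Series(0.0, index=future_idx)   # no promotions planned
forecast = sarimax.predict(steps=6, exog=future_promo)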
Deep Learning Models
LSTM for Time Series
import torch
import torch.nn as nn

class LSTMForecaster(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # x: (batch, seq_len, input_size); predict from the last time step
        lstm_out, _ = self.lstm(x)
        predictions = self.linear(lstm_out[:, -1, :])
        return predictions
def train_lstm_forecaster(model, train_loader, num_epochs=100):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            # Forward pass
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            # Backward pass and parameter update
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        if epoch % 10 == 0:
            print(f'Epoch [{epoch}/{num_epochs}], Loss: {avg_loss:.4f}')
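The trainer above assumes a DataLoader of (window, next value) pairs. One way to build it from a 1-D series; the helper name, window length, and batch size are assumptions to tune:

import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

def make_window_loader(values, window=30, batch_size=32):
    # Slice a 1-D array into overlapping input windows and next-step targets
    X = np.stack([values[i:i + window] for i in range(len(values) - window)])
    y = values[window:]
    X_t = torch.tensor(X, dtype=torch.float32).unsqueeze(-1)  # (N, window, 1)
    y_t = torch.tensor(y, dtype=torch.float32).unsqueeze(-1)  # (N, 1)
    return DataLoader(TensorDataset(X_t, y_t), batch_size=batch_size, shuffle=False)

# loader = make_window_loader(series.values)
# model = LSTMForecaster(input_size=1, hidden_size=64)
# train_lstm_forecaster(model, loader)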
Transformer for Time Series
import math

class PositionalEncoding(nn.Module):
    # Standard sinusoidal positional encoding; without it the transformer
    # has no notion of time-step order
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model)
        return x + self.pe[:x.size(1)]

class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_size, d_model=512, nhead=8, num_layers=6):
        super().__init__()
        self.embedding = nn.Linear(input_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            batch_first=True  # inputs are (batch, seq_len, d_model)
        )
        self.fc = nn.Linear(d_model, 1)

    def forward(self, src, tgt):
        # Embed inputs and add positional information
        src = self.pos_encoder(self.embedding(src))
        tgt = self.pos_encoder(self.embedding(tgt))
        # Encode-decode, then project each decoder step to a scalar
        output = self.transformer(src, tgt)
        return self.fc(output)
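A quick shape check with random data (the dimensions are illustrative):

model = TimeSeriesTransformer(input_size=1, d_model=64, nhead=4, num_layers=2)
src = torch.randn(8, 30, 1)   # 8 sequences, 30 encoder steps, 1 feature
tgt = torch.randn(8, 10, 1)   # 10 decoder steps
out = model(src, tgt)
print(out.shape)              # torch.Size([8, 10, 1])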
Feature Engineering
Time-based Features
def create_time_features(df):
    df = df.copy()
    # Extract calendar components from the DatetimeIndex
    df['hour'] = df.index.hour
    df['dayofweek'] = df.index.dayofweek
    df['quarter'] = df.index.quarter
    df['month'] = df.index.month
    df['year'] = df.index.year
    df['dayofyear'] = df.index.dayofyear
    # Cyclical encodings so that hour 23 sits next to hour 0,
    # and December next to January
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
    df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
    return df

def create_lag_features(series, lags):
    # One column per lag; rows without a full set of lags are dropped
    df = pd.DataFrame(series)
    for lag in lags:
        df[f'lag_{lag}'] = series.shift(lag)
    return df.dropna()
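For example, on a small synthetic daily series, lag features line up past values alongside each observation:

idx = pd.date_range('2024-01-01', periods=10, freq='D')
s = pd.Series(range(10), index=idx, name='value')
features = create_lag_features(s, lags=[1, 7])
print(features.head())  # first valid row appears once lag_7 is available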
Rolling Statistics
def add_rolling_features(df, column='value', windows=(7, 14, 30)):
    # Rolling windows include the current observation; shift the source
    # column first if these features will be used to forecast it
    for window in windows:
        df[f'rolling_mean_{window}'] = df[column].rolling(window=window).mean()
        df[f'rolling_std_{window}'] = df[column].rolling(window=window).std()
        df[f'rolling_min_{window}'] = df[column].rolling(window=window).min()
        df[f'rolling_max_{window}'] = df[column].rolling(window=window).max()
    return df
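A leakage-safe variant shifts the series by one step, so each row's statistics use only past values (the helper name is an assumption, not from any library):

def add_rolling_features_safe(df, column='value', windows=(7, 14, 30)):
    # Statistics at time t are computed from values up to t-1 only
    past = df[column].shift(1)
    for window in windows:
        df[f'rolling_mean_{window}'] = past.rolling(window=window).mean()
        df[f'rolling_std_{window}'] = past.rolling(window=window).std()
    return df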
Evaluation Metrics
def calculate_time_series_metrics(y_true, y_pred):
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    metrics = {}
    # Mean Absolute Error
    metrics['mae'] = np.mean(np.abs(y_true - y_pred))
    # Root Mean Square Error
    metrics['rmse'] = np.sqrt(np.mean((y_true - y_pred) ** 2))
    # Mean Absolute Percentage Error (undefined when y_true contains zeros)
    metrics['mape'] = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    # Symmetric MAPE, bounded in [0, 200]
    metrics['smape'] = np.mean(2 * np.abs(y_pred - y_true) / (np.abs(y_pred) + np.abs(y_true))) * 100
    return metrics
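A quick worked example on three points:

y_true = np.array([100.0, 102.0, 105.0])
y_pred = np.array([98.0, 103.0, 107.0])
print(calculate_time_series_metrics(y_true, y_pred))
# mae ≈ 1.67, rmse ≈ 1.73, mape ≈ 1.63%, smape ≈ 1.63%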
def plot_forecast_vs_actual(y_true, y_pred, title='Forecast vs Actual'):
    fig = plt.figure(figsize=(12, 6))
    plt.plot(y_true, label='Actual')
    plt.plot(y_pred, label='Forecast')
    plt.title(title)
    plt.legend()
    return fig
Cross-Validation
from sklearn.model_selection import TimeSeriesSplit

def time_series_cv(model, X, y, n_splits=5):
    # Expanding-window splits: each fold trains on the past and validates
    # on the following block (X, y as numpy arrays; use .iloc for DataFrames)
    tscv = TimeSeriesSplit(n_splits=n_splits)
    scores = []
    for train_idx, val_idx in tscv.split(X):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]
        # Train and evaluate on this fold
        model.fit(X_train, y_train)
        y_pred = model.predict(X_val)
        scores.append(calculate_time_series_metrics(y_val, y_pred))
    return pd.DataFrame(scores)
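For instance, cross-validating a linear model on synthetic features (any sklearn-style estimator works; mape/smape are skipped here since the toy target crosses zero):

from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))                    # e.g. three lag features
y = X @ np.array([0.5, -0.2, 0.1]) + rng.normal(scale=0.1, size=200)
fold_scores = time_series_cv(LinearRegression(), X, y, n_splits=5)
print(fold_scores[['mae', 'rmse']].mean())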
Best Practices
- Data Preparation
def prepare_time_series_data(df, target_col, test_size=0.2):
    # Sort chronologically, build features, then split without shuffling
    df = df.sort_index()
    df = create_time_features(df)
    df = add_rolling_features(df, column=target_col)
    df = df.dropna()  # drop warm-up rows introduced by rolling windows
    train_size = int(len(df) * (1 - test_size))
    train_data = df[:train_size]
    test_data = df[train_size:]
    return train_data, test_data
- Model Selection
def select_forecasting_model(series, seasonality=False, exog=None):
    # Heuristic only: statistical models for short or seasonal series,
    # a neural model when there is enough data to train one. Note the
    # interfaces differ: LSTMForecaster is trained with
    # train_lstm_forecaster, not .fit()
    if seasonality:
        return SARIMAXForecaster()  # handles exog at fit time if provided
    if len(series) > 1000:
        return LSTMForecaster(input_size=1, hidden_size=64)
    return ARIMAForecaster()
- Forecast Evaluation
from scipy import stats

def evaluate_forecast(y_true, y_pred, confidence_level=0.95):
    # Point-forecast accuracy
    metrics = calculate_time_series_metrics(y_true, y_pred)
    # Rough prediction intervals assuming normally distributed,
    # homoscedastic residuals
    std_error = np.std(y_true - y_pred)
    z_score = stats.norm.ppf((1 + confidence_level) / 2)
    lower_bound = y_pred - z_score * std_error
    upper_bound = y_pred + z_score * std_error
    return {
        'metrics': metrics,
        'prediction_intervals': {
            'lower': lower_bound,
            'upper': upper_bound
        }
    }
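Tying it together on held-out data, a sketch assuming df is a loaded DataFrame and 'sales' is its (illustrative) target column:

train_data, test_data = prepare_time_series_data(df, target_col='sales')
forecaster = ARIMAForecaster().fit(train_data['sales'])
forecast = forecaster.predict(steps=len(test_data))
report = evaluate_forecast(test_data['sales'].values, np.asarray(forecast))
print(report['metrics'])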