Time Series Analysis

Understanding and implementing time series analysis and forecasting techniques for analyzing and forecasting time series data

Time Series Preprocessing

Data Loading and Cleaning

import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller

def load_time_series(data_path, date_column, target_column):
    # Load data
    df = pd.read_csv(data_path)
    
    # Convert to datetime
    df[date_column] = pd.to_datetime(df[date_column])
    
    # Set index
    df.set_index(date_column, inplace=True)
    
    # Sort index
    df.sort_index(inplace=True)
    
    return df[target_column]

def check_stationarity(series):
    # Perform Augmented Dickey-Fuller test
    result = adfuller(series)
    
    return {
        'test_statistic': result[0],
        'p_value': result[1],
        'critical_values': result[4],
        'is_stationary': result[1] < 0.05
    }

Time Series Decomposition

from statsmodels.tsa.seasonal import seasonal_decompose

def decompose_time_series(series, period=None):
    # Perform decomposition
    decomposition = seasonal_decompose(
        series,
        period=period,
        extrapolate_trend='freq'
    )
    
    return {
        'trend': decomposition.trend,
        'seasonal': decomposition.seasonal,
        'residual': decomposition.resid
    }

def plot_decomposition(decomposition):
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 10))
    
    # Plot components
    decomposition['trend'].plot(ax=ax1)
    ax1.set_title('Trend')
    
    decomposition['seasonal'].plot(ax=ax2)
    ax2.set_title('Seasonal')
    
    decomposition['residual'].plot(ax=ax3)
    ax3.set_title('Residual')
    
    plt.tight_layout()
    return fig

Statistical Models

ARIMA Models

from statsmodels.tsa.arima.model import ARIMA
from pmdarima import auto_arima

class ARIMAForecaster:
    def __init__(self, order=None):
        self.order = order
        self.model = None
        
    def find_optimal_order(self, series):
        # Automatically find best parameters
        model = auto_arima(
            series,
            start_p=0, start_q=0,
            max_p=5, max_q=5,
            m=1,
            start_P=0, start_Q=0,
            max_P=5, max_Q=5,
            seasonal=True,
            d=1, D=1,
            trace=True,
            error_action='ignore',
            suppress_warnings=True,
            stepwise=True
        )
        
        self.order = model.order
        return self.order
    
    def fit(self, series):
        if self.order is None:
            self.find_optimal_order(series)
            
        self.model = ARIMA(
            series,
            order=self.order
        ).fit()
        
        return self
    
    def predict(self, steps=1):
        return self.model.forecast(steps)
    
    def get_diagnostics(self):
        return {
            'aic': self.model.aic,
            'bic': self.model.bic,
            'residuals': self.model.resid
        }

SARIMA Models

from statsmodels.tsa.statespace.sarimax import SARIMAX

class SARIMAXForecaster:
    def __init__(self, order=(1,1,1), seasonal_order=(1,1,1,12)):
        self.order = order
        self.seasonal_order = seasonal_order
        self.model = None
        
    def fit(self, series, exog=None):
        self.model = SARIMAX(
            series,
            order=self.order,
            seasonal_order=self.seasonal_order,
            exog=exog
        ).fit()
        
        return self
    
    def predict(self, steps=1, exog=None):
        return self.model.forecast(steps, exog=exog)
    
    def get_summary(self):
        return self.model.summary()

Deep Learning Models

LSTM for Time Series

import torch
import torch.nn as nn

class LSTMForecaster(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super().__init__()
        
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        
        self.linear = nn.Linear(hidden_size, 1)
        
    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        predictions = self.linear(lstm_out[:, -1, :])
        return predictions

def train_lstm_forecaster(model, train_loader, num_epochs=100):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())
    
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            
            # Backward pass
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            
        avg_loss = total_loss / len(train_loader)
        if epoch % 10 == 0:
            print(f'Epoch [{epoch}/{num_epochs}], Loss: {avg_loss:.4f}')

Transformer for Time Series

class TimeSeriesTransformer(nn.Module):
    def __init__(
        self,
        input_size,
        d_model=512,
        nhead=8,
        num_layers=6
    ):
        super().__init__()
        
        self.embedding = nn.Linear(input_size, d_model)
        
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers
        )
        
        self.fc = nn.Linear(d_model, 1)
        
    def forward(self, src, tgt):
        # Embed input
        src = self.embedding(src)
        tgt = self.embedding(tgt)
        
        # Transform
        output = self.transformer(src, tgt)
        
        # Predict
        predictions = self.fc(output)
        return predictions

Feature Engineering

Time-based Features

def create_time_features(df):
    df = df.copy()
    
    # Extract time components
    df['hour'] = df.index.hour
    df['dayofweek'] = df.index.dayofweek
    df['quarter'] = df.index.quarter
    df['month'] = df.index.month
    df['year'] = df.index.year
    df['dayofyear'] = df.index.dayofyear
    
    # Cyclical features
    df['hour_sin'] = np.sin(2 * np.pi * df['hour']/24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour']/24)
    
    return df

def create_lag_features(series, lags):
    df = pd.DataFrame(series)
    
    for lag in lags:
        df[f'lag_{lag}'] = series.shift(lag)
    
    return df.dropna()

Rolling Statistics

def add_rolling_features(df, windows=[7, 14, 30]):
    for window in windows:
        df[f'rolling_mean_{window}'] = df['value'].rolling(window=window).mean()
        df[f'rolling_std_{window}'] = df['value'].rolling(window=window).std()
        df[f'rolling_min_{window}'] = df['value'].rolling(window=window).min()
        df[f'rolling_max_{window}'] = df['value'].rolling(window=window).max()
    
    return df

Evaluation Metrics

def calculate_time_series_metrics(y_true, y_pred):
    metrics = {}
    
    # Mean Absolute Error
    metrics['mae'] = np.mean(np.abs(y_true - y_pred))
    
    # Root Mean Square Error
    metrics['rmse'] = np.sqrt(np.mean((y_true - y_pred)**2))
    
    # Mean Absolute Percentage Error
    metrics['mape'] = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    
    # Symmetric Mean Absolute Percentage Error
    metrics['smape'] = np.mean(2 * np.abs(y_pred - y_true) / (np.abs(y_pred) + np.abs(y_true))) * 100
    
    return metrics

def plot_forecast_vs_actual(y_true, y_pred, title='Forecast vs Actual'):
    plt.figure(figsize=(12, 6))
    plt.plot(y_true, label='Actual')
    plt.plot(y_pred, label='Forecast')
    plt.title(title)
    plt.legend()
    return plt

Cross-Validation

from sklearn.model_selection import TimeSeriesSplit

def time_series_cv(model, X, y, n_splits=5):
    tscv = TimeSeriesSplit(n_splits=n_splits)
    scores = []
    
    for train_idx, val_idx in tscv.split(X):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]
        
        # Train model
        model.fit(X_train, y_train)
        
        # Make predictions
        y_pred = model.predict(X_val)
        
        # Calculate metrics
        metrics = calculate_time_series_metrics(y_val, y_pred)
        scores.append(metrics)
    
    return pd.DataFrame(scores)

Best Practices

  1. Data Preparation
def prepare_time_series_data(df, target_col, test_size=0.2):
    # Sort by date
    df = df.sort_index()
    
    # Create features
    df = create_time_features(df)
    df = add_rolling_features(df)
    
    # Split data
    train_size = int(len(df) * (1 - test_size))
    train_data = df[:train_size]
    test_data = df[train_size:]
    
    return train_data, test_data
  1. Model Selection
def select_forecasting_model(series, seasonality=False, exog=None):
    if seasonality:
        if exog is not None:
            return SARIMAXForecaster()
        else:
            return ARIMAForecaster()
    else:
        if len(series) > 1000:
            return LSTMForecaster(
                input_size=1,
                hidden_size=64
            )
        else:
            return ARIMAForecaster()
  1. Forecast Evaluation
def evaluate_forecast(y_true, y_pred, confidence_level=0.95):
    # Calculate metrics
    metrics = calculate_time_series_metrics(y_true, y_pred)
    
    # Calculate prediction intervals
    std_error = np.std(y_true - y_pred)
    z_score = stats.norm.ppf((1 + confidence_level) / 2)
    
    lower_bound = y_pred - z_score * std_error
    upper_bound = y_pred + z_score * std_error
    
    return {
        'metrics': metrics,
        'prediction_intervals': {
            'lower': lower_bound,
            'upper': upper_bound
        }
    }