# Probabilistic Graphical Models
Probabilistic Graphical Models (PGMs) provide a framework for modeling complex systems by representing probability distributions over sets of random variables using graphs.
## Theoretical Foundations

### 1. Bayesian Networks (Directed Graphical Models)

- **Probability Factorization**
  - The joint probability factorizes as $P(X_1, \ldots, X_n) = \prod_{i=1}^{n} P(X_i \mid \mathrm{Pa}(X_i))$ (a worked instance follows this list)
  - where $\mathrm{Pa}(X_i)$ denotes the parents of node $X_i$
- **Conditional Independence**
  - Each variable is conditionally independent of the rest of the network given its Markov blanket
  - The Markov blanket of a node consists of its parents, its children, and its children's other parents
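As a concrete instance of the factorization, the sketch below evaluates one joint assignment of the rain/sprinkler/wet-grass network built later in this section; the numbers come from that example's CPDs.

```python
# P(Rain=1, Sprinkler=0, Grass_Wet=1)
#   = P(Rain=1) * P(Sprinkler=0 | Rain=1) * P(Grass_Wet=1 | Rain=1, Sprinkler=0)
p_joint = 0.3 * 0.9 * 0.8
print(p_joint)  # 0.216
```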
### 2. Markov Random Fields (Undirected Graphical Models)

- **Gibbs Distribution**

  $$P(X = x) = \frac{1}{Z} \prod_{c \in C} \phi_c(x_c)$$

  where:
  - $Z$ is the partition function (the normalizing constant; see the sketch below)
  - $\phi_c$ are the potential functions
  - $C$ is the set of maximal cliques
- **Hammersley-Clifford Theorem**
  - Any strictly positive distribution that satisfies the local Markov properties with respect to a graph can be represented as a Gibbs distribution over that graph's cliques
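To make the partition function concrete, here is a minimal sketch that computes $Z$ by brute-force enumeration over a tiny chain of binary variables; the pairwise potential used is an illustrative assumption.

```python
import itertools
import numpy as np

def partition_function(n_vars, phi, edges):
    """Sum the unnormalized Gibbs measure over all binary configurations."""
    z = 0.0
    for x in itertools.product([0, 1], repeat=n_vars):
        z += np.prod([phi(x[i], x[j]) for i, j in edges])
    return z

# Chain of three binary variables with an assumed potential phi(s1, s2) = exp(s1 * s2)
z = partition_function(3, lambda s1, s2: np.exp(s1 * s2), edges=[(0, 1), (1, 2)])
print(z)  # normalizer for P(x) = (1/Z) * prod_edges phi(x_i, x_j)
```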
## Implementation Examples

### 1. Simple Bayesian Network
```python
import numpy as np
import networkx as nx

class BayesianNetwork:
    def __init__(self):
        self.graph = nx.DiGraph()
        self.cpds = {}  # Conditional Probability Distributions

    def add_node(self, node_name, cpd):
        """Add node with its conditional probability distribution"""
        self.graph.add_node(node_name)
        self.cpds[node_name] = cpd

    def add_edge(self, parent, child):
        """Add directed edge from parent to child"""
        self.graph.add_edge(parent, child)

    def get_parents(self, node):
        """Get parents of a node"""
        return list(self.graph.predecessors(node))

    def sample(self, node, parent_values=None):
        """Sample from node's distribution given parent values"""
        cpd = self.cpds[node]
        if parent_values is None:
            return cpd()
        return cpd(**parent_values)

# Example: Weather Network
def rain_cpd():
    """Prior probability of rain"""
    return np.random.choice([0, 1], p=[0.7, 0.3])

def sprinkler_cpd(rain):
    """Conditional probability of sprinkler given rain"""
    if rain:
        return np.random.choice([0, 1], p=[0.9, 0.1])
    return np.random.choice([0, 1], p=[0.4, 0.6])

def grass_wet_cpd(rain, sprinkler):
    """Conditional probability of wet grass given rain and sprinkler"""
    if rain and sprinkler:
        return np.random.choice([0, 1], p=[0.1, 0.9])
    elif rain:
        return np.random.choice([0, 1], p=[0.2, 0.8])
    elif sprinkler:
        return np.random.choice([0, 1], p=[0.3, 0.7])
    return np.random.choice([0, 1], p=[0.9, 0.1])

# Create the network
bn = BayesianNetwork()
bn.add_node('Rain', rain_cpd)
bn.add_node('Sprinkler', sprinkler_cpd)
bn.add_node('Grass_Wet', grass_wet_cpd)
bn.add_edge('Rain', 'Sprinkler')
bn.add_edge('Rain', 'Grass_Wet')
bn.add_edge('Sprinkler', 'Grass_Wet')
```
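A quick usage sketch: because `sample` passes parent values as keyword arguments, a joint sample is drawn by visiting nodes in topological order, parents before children (ancestral sampling).

```python
# Ancestral sampling: sample each node after its parents
rain = bn.sample('Rain')
sprinkler = bn.sample('Sprinkler', {'rain': rain})
grass_wet = bn.sample('Grass_Wet', {'rain': rain, 'sprinkler': sprinkler})
print(rain, sprinkler, grass_wet)
```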
### 2. Markov Random Field

```python
class MarkovRandomField:
    def __init__(self):
        self.graph = nx.Graph()
        self.potentials = {}

    def add_node(self, node):
        self.graph.add_node(node)

    def add_edge(self, node1, node2, potential_func):
        """Add edge with associated potential function"""
        self.graph.add_edge(node1, node2)
        self.potentials[(node1, node2)] = potential_func

    def get_potential(self, node1, node2):
        """Look up an edge potential regardless of edge orientation"""
        if (node1, node2) in self.potentials:
            return self.potentials[(node1, node2)]
        return self.potentials[(node2, node1)]

    def get_neighbors(self, node):
        """Get neighbors of a node"""
        return list(self.graph.neighbors(node))

    def compute_energy(self, configuration):
        """Compute energy of a configuration: E(x) = -sum_edges log phi(x_i, x_j)"""
        energy = 0.0
        for node1, node2 in self.graph.edges():
            potential = self.get_potential(node1, node2)
            energy += -np.log(potential(configuration[node1],
                                        configuration[node2]))
        return energy

# Example: Ising Model
def ising_potential(s1, s2, beta=1.0):
    """Potential function for the Ising model"""
    return np.exp(beta * s1 * s2)

# Create 2D Ising model
def create_2d_ising_model(n, beta=1.0):
    mrf = MarkovRandomField()
    # Add nodes
    for i in range(n):
        for j in range(n):
            mrf.add_node((i, j))
    # Add horizontal edges
    for i in range(n):
        for j in range(n - 1):
            mrf.add_edge((i, j), (i, j + 1),
                         lambda s1, s2: ising_potential(s1, s2, beta))
    # Add vertical edges
    for i in range(n - 1):
        for j in range(n):
            mrf.add_edge((i, j), (i + 1, j),
                         lambda s1, s2: ising_potential(s1, s2, beta))
    return mrf
```
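A quick usage sketch: for the all-ones configuration on a 3x3 grid, each of the 12 edges contributes $-\beta$ to the energy.

```python
# Energy of the all-ones configuration on a 3x3 grid (12 edges)
mrf = create_2d_ising_model(3, beta=0.5)
config = {(i, j): 1 for i in range(3) for j in range(3)}
print(mrf.compute_energy(config))  # -12 * 0.5 = -6.0; lower energy = higher probability
```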
## Inference Methods

### 1. Variable Elimination
```python
import itertools

class Factor:
    """A discrete factor: a table over an ordered list of binary variables."""
    def __init__(self, variables, table=None):
        self.variables = list(variables)
        self.table = (np.ones([2] * len(self.variables)) if table is None
                      else np.asarray(table, dtype=float))

def multiply_factors(f1, f2):
    """Multiply two factors over the union of their variables"""
    variables = f1.variables + [v for v in f2.variables if v not in f1.variables]
    new_factor = Factor(variables)
    for assignment in itertools.product([0, 1], repeat=len(variables)):
        values = dict(zip(variables, assignment))
        new_factor.table[assignment] = (
            f1.table[tuple(values[v] for v in f1.variables)]
            * f2.table[tuple(values[v] for v in f2.variables)])
    return new_factor

def sum_out_variable(factor, variable):
    """Sum out a variable from a factor by marginalizing its axis"""
    remaining_vars = [v for v in factor.variables if v != variable]
    axis = factor.variables.index(variable)
    return Factor(remaining_vars, factor.table.sum(axis=axis))

def reduce_factor(factor, variable, value):
    """Condition a factor on observed evidence: variable = value"""
    remaining_vars = [v for v in factor.variables if v != variable]
    axis = factor.variables.index(variable)
    return Factor(remaining_vars, np.take(factor.table, value, axis=axis))

def variable_elimination(factors, query_var, evidence):
    """Variable elimination: compute P(query_var | evidence)"""
    # Condition every factor on the evidence
    for var, value in evidence.items():
        factors = [reduce_factor(f, var, value) if var in f.variables else f
                   for f in factors]
    # Eliminate the hidden variables one by one (a smarter elimination
    # order, e.g. min-fill, matters for efficiency on larger networks)
    hidden_vars = {v for f in factors for v in f.variables} - {query_var}
    for var in hidden_vars:
        # Multiply all factors containing var
        relevant_factors = [f for f in factors if var in f.variables]
        if not relevant_factors:
            continue
        product = relevant_factors[0]
        for f in relevant_factors[1:]:
            product = multiply_factors(product, f)
        # Sum out the variable and update the factor list
        new_factor = sum_out_variable(product, var)
        factors = [f for f in factors if f not in relevant_factors]
        factors.append(new_factor)
    # Multiply the remaining factors and normalize
    result = factors[0]
    for f in factors[1:]:
        result = multiply_factors(result, f)
    result.table /= result.table.sum()
    return result
```
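As a usage sketch, here are the weather-network CPDs from the earlier example encoded as factor tables, queried for the posterior probability of rain given wet grass.

```python
# Factor tables follow the CPDs of the weather network above
f_rain = Factor(['Rain'], [0.7, 0.3])
f_sprinkler = Factor(['Rain', 'Sprinkler'],
                     [[0.4, 0.6],    # P(Sprinkler | Rain=0)
                      [0.9, 0.1]])   # P(Sprinkler | Rain=1)
f_grass = Factor(['Rain', 'Sprinkler', 'Grass_Wet'],
                 [[[0.9, 0.1], [0.3, 0.7]],
                  [[0.2, 0.8], [0.1, 0.9]]])

posterior = variable_elimination([f_rain, f_sprinkler, f_grass],
                                 'Rain', evidence={'Grass_Wet': 1})
print(posterior.table)  # P(Rain | Grass_Wet=1), roughly [0.57, 0.43]
```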
### 2. Belief Propagation

```python
class BeliefPropagation:
    def __init__(self, mrf):
        """Loopy belief propagation over a binary pairwise MarkovRandomField
        (assumes symmetric edge potentials, as in the Ising example)."""
        self.graph = mrf.graph
        self.get_potential = mrf.get_potential
        self.messages = {}
        self.beliefs = {}

    def init_messages(self):
        """Initialize messages to uniform distributions"""
        for edge in self.graph.edges():
            self.messages[edge] = np.ones(2) / 2
            self.messages[edge[::-1]] = np.ones(2) / 2

    def update_message(self, i, j):
        """Update message from node i to node j"""
        neighbors = list(self.graph.neighbors(i))
        neighbors.remove(j)
        msg = np.zeros(2)
        potential = self.get_potential(i, j)
        for x_i in [0, 1]:
            # Product of incoming messages from all neighbors except j
            product = 1.0
            for n in neighbors:
                product *= self.messages[(n, i)][x_i]
            # Sum over x_i of local potential times incoming product
            for x_j in [0, 1]:
                msg[x_j] += potential(x_i, x_j) * product
        # Normalize message
        msg /= np.sum(msg)
        self.messages[(i, j)] = msg

    def run_belief_propagation(self, max_iter=100, tol=1e-6):
        """Run belief propagation algorithm"""
        self.init_messages()
        for iteration in range(max_iter):
            old_messages = dict(self.messages)
            # Update all messages in both directions
            for edge in self.graph.edges():
                self.update_message(*edge)
                self.update_message(*edge[::-1])
            # Check convergence
            diff = max(np.max(np.abs(old_messages[e] - self.messages[e]))
                       for e in self.messages)
            if diff < tol:
                break
        # Compute final beliefs
        self.compute_beliefs()

    def compute_beliefs(self):
        """Compute (approximate) marginal beliefs for all nodes"""
        for node in self.graph.nodes():
            belief = np.ones(2)
            for neighbor in self.graph.neighbors(node):
                belief *= self.messages[(neighbor, node)]
            belief /= np.sum(belief)
            self.beliefs[node] = belief
```
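A usage sketch on the Ising grid defined earlier; on a graph with cycles like this grid, loopy BP returns approximate marginals.

```python
# Loopy BP on a 3x3 Ising grid with {0, 1} states
mrf = create_2d_ising_model(3, beta=0.5)
bp = BeliefPropagation(mrf)
bp.run_belief_propagation()
print(bp.beliefs[(0, 0)])  # marginal tilts toward state 1, which the potential rewards
```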
## Learning Methods

### 1. Maximum Likelihood Estimation
```python
def get_parent_configurations(data, parents):
    """All parent value combinations observed in the data"""
    return sorted(set(zip(*(data[p] for p in parents))))

def get_config_mask(data, parents, config):
    """Boolean mask of the samples where the parents take this configuration"""
    mask = np.ones(len(data[parents[0]]), dtype=bool)
    for parent, value in zip(parents, config):
        mask &= (np.asarray(data[parent]) == value)
    return mask

def mle_parameters(data, structure):
    """Maximum likelihood estimation of parameters.

    `data` maps each node name to an array of observed binary values;
    `structure` is a BayesianNetwork as defined above.
    """
    parameters = {}
    for node in structure.graph.nodes():
        parents = structure.get_parents(node)
        if not parents:
            # Estimate prior probability from relative frequencies
            counts = np.bincount(data[node], minlength=2)
            parameters[node] = counts / counts.sum()
        else:
            # Estimate conditional probabilities per parent configuration
            for config in get_parent_configurations(data, parents):
                mask = get_config_mask(data, parents, config)
                counts = np.bincount(np.asarray(data[node])[mask], minlength=2)
                parameters[(node, config)] = counts / counts.sum()
    return parameters
```
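A usage sketch tying the pieces together: draw ancestral samples from the weather network's CPDs and recover the parameters from the data.

```python
# Fit the weather network's CPDs from 5000 ancestral samples
samples = {'Rain': [], 'Sprinkler': [], 'Grass_Wet': []}
for _ in range(5000):
    r = rain_cpd()
    s = sprinkler_cpd(r)
    w = grass_wet_cpd(r, s)
    samples['Rain'].append(r)
    samples['Sprinkler'].append(s)
    samples['Grass_Wet'].append(w)
data = {name: np.array(values) for name, values in samples.items()}

params = mle_parameters(data, bn)
print(params['Rain'])  # should be close to the true prior [0.7, 0.3]
```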
### 2. Structure Learning

```python
def score_structure(data, structure, score_type='bic'):
    """Score a network structure"""
    def compute_bic(data, structure):
        """BIC score: log-likelihood penalized by model complexity"""
        ll = log_likelihood(data, structure)     # assumed helper, defined elsewhere
        n_params = count_parameters(structure)   # assumed helper, defined elsewhere
        n_samples = len(next(iter(data.values())))  # number of samples
        return ll - 0.5 * n_params * np.log(n_samples)
    if score_type == 'bic':
        return compute_bic(data, structure)
    # Add other scoring functions as needed
    raise ValueError(f'Unknown score type: {score_type}')

def structure_learning(data, score_type='bic'):
    """Greedy hill-climbing search over network structures"""
    current_structure = initialize_structure()   # assumed helper, e.g. an empty DAG
    current_score = score_structure(data, current_structure, score_type)
    while True:
        # Try all single-edge additions/deletions/reversals that keep a DAG
        operations = get_valid_operations(current_structure)
        best_operation = None
        best_score = current_score
        for op in operations:
            new_structure = apply_operation(current_structure, op)
            score = score_structure(data, new_structure, score_type)
            if score > best_score:
                best_score = score
                best_operation = op
        # Stop when no operation improves the score (a local optimum)
        if best_operation is None:
            break
        current_structure = apply_operation(
            current_structure, best_operation)
        current_score = best_score
    return current_structure
```
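The search above leans on helpers the section leaves undefined. Here is a minimal sketch of two of them, assuming the structure is a `networkx.DiGraph` and omitting edge reversals for brevity; `log_likelihood` and `count_parameters` remain assumed.

```python
import itertools
import networkx as nx

def get_valid_operations(structure):
    """Single-edge additions and deletions that keep the graph acyclic"""
    ops = []
    for u, v in itertools.permutations(structure.nodes(), 2):
        if structure.has_edge(u, v):
            ops.append(('delete', u, v))
        elif not nx.has_path(structure, v, u):  # adding u -> v creates no cycle
            ops.append(('add', u, v))
    return ops

def apply_operation(structure, op):
    """Return a copy of the structure with one edge operation applied"""
    kind, u, v = op
    new_structure = structure.copy()
    if kind == 'add':
        new_structure.add_edge(u, v)
    else:
        new_structure.remove_edge(u, v)
    return new_structure
```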
## Advanced Topics

### 1. Dynamic Bayesian Networks

```python
class DynamicBayesianNetwork:
    def __init__(self, transition_model, emission_model):
        self.transition_model = transition_model  # [i, j] = P(s_t = j | s_{t-1} = i)
        self.emission_model = emission_model      # [i, k] = P(o_t = k | s_t = i)

    def forward_algorithm(self, observations, initial_dist):
        """Forward algorithm for filtering: alpha[t, j] = P(o_1..o_t, s_t = j)"""
        T = len(observations)
        n_states = self.transition_model.shape[0]
        alpha = np.zeros((T, n_states))
        # Initialize with the prior distribution over the first hidden state
        alpha[0] = initial_dist * self.emission_model[:, observations[0]]
        # Forward pass
        for t in range(1, T):
            for j in range(n_states):
                alpha[t, j] = self.emission_model[j, observations[t]] * \
                              np.sum(alpha[t - 1] * self.transition_model[:, j])
        return alpha
```
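A usage sketch with a two-state HMM; the transition, emission, and initial parameters here are illustrative assumptions, not values from the text.

```python
# Two-state HMM with assumed illustrative parameters
transition = np.array([[0.9, 0.1],
                       [0.2, 0.8]])   # transition[i, j] = P(state j | state i)
emission = np.array([[0.8, 0.2],
                     [0.3, 0.7]])     # emission[i, k] = P(obs k | state i)
initial = np.array([0.5, 0.5])

dbn = DynamicBayesianNetwork(transition, emission)
alpha = dbn.forward_algorithm([0, 1, 1], initial)
print(alpha[-1] / alpha[-1].sum())    # filtered state distribution at the final step
```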
### 2. Factor Graphs

```python
class FactorGraph:
    def __init__(self):
        self.var_nodes = set()
        self.factor_nodes = set()
        self.edges = set()

    def add_variable(self, var):
        self.var_nodes.add(var)

    def add_factor(self, factor, connected_vars):
        """Add a factor node connected to the given variable nodes"""
        self.factor_nodes.add(factor)
        for var in connected_vars:
            self.edges.add((factor, var))

    def get_neighboring_factors(self, var):
        return {f for f, v in self.edges if v == var}

    def get_neighboring_variables(self, factor):
        return {v for f, v in self.edges if f == factor}
```
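A usage sketch: the weather network from earlier expressed as a factor graph, with one factor per CPD (the factor labels here are just illustrative strings; any hashable object works).

```python
# The weather network as a factor graph
fg = FactorGraph()
for var in ['Rain', 'Sprinkler', 'Grass_Wet']:
    fg.add_variable(var)
fg.add_factor('P(Rain)', ['Rain'])
fg.add_factor('P(Sprinkler|Rain)', ['Rain', 'Sprinkler'])
fg.add_factor('P(Grass_Wet|Rain,Sprinkler)', ['Rain', 'Sprinkler', 'Grass_Wet'])
print(fg.get_neighboring_factors('Rain'))  # all three factors touch Rain
```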
## Best Practices

- **Model Selection**
  - Choose an appropriate graph structure
  - Incorporate domain knowledge
  - Balance model complexity against the available data
  - Validate independence assumptions
- **Inference**
  - Select an inference method suited to the graph (exact for trees, approximate for loopy graphs)
  - Consider computational constraints
  - Handle numerical stability, e.g. by working in log space (see the sketch after this list)
  - Fall back on approximations when exact inference is intractable
- **Learning**
  - Handle missing data
  - Use regularization
  - Cross-validate parameters
  - Monitor convergence
- **Evaluation**
  - Use multiple metrics
  - Test predictive performance
  - Validate causal relationships
  - Consider interpretability
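On the numerical-stability point: long products of small potentials underflow quickly, so practical implementations keep log potentials and normalize with the log-sum-exp trick. A minimal sketch:

```python
import numpy as np

def normalize_log_scores(log_scores):
    """Turn unnormalized log potentials into probabilities without underflow"""
    m = np.max(log_scores)
    log_z = m + np.log(np.sum(np.exp(log_scores - m)))
    return np.exp(log_scores - log_z)

print(normalize_log_scores(np.array([-1000.0, -1001.0])))  # ~[0.73, 0.27]
```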