Mixed Circular and Normal Neural Spline Flow
This is a Neural Spline Flow model that combines circular and unbounded random variables in one random vector.
In [ ]:
# Import packages
import torch
import numpy as np
import normflows as nf
from matplotlib import pyplot as plt
from tqdm import tqdm
In [ ]:
# Set up target
class Target:
    def __init__(self, ndim, ind_circ):
        self.ndim = ndim
        self.ind_circ = ind_circ

    def sample(self, n):
        # Unbounded dimensions: mixture of two Gaussians
        s = torch.randn(n, self.ndim)
        c = torch.rand(n, self.ndim) > 0.6
        s = c * (0.3 * s - 0.5) + (1 - 1. * c) * (s + 1.3)
        # Circular dimensions: draw an angle via arccos, flip its sign
        # with probability 0.7, shift by 1, and wrap to [-pi, pi)
        u = torch.rand(n, len(self.ind_circ))
        s_ = torch.acos(2 * u - 1)
        c = torch.rand(n, len(self.ind_circ)) > 0.3
        s_[c] = -s_[c]
        s[:, self.ind_circ] = (s_ + 1) % (2 * np.pi) - np.pi
        return s

# Visualize target
target = Target(2, [1])
s = target.sample(1000000)
plt.hist(s[:, 0].data.numpy(), bins=200)
plt.show()
plt.hist(s[:, 1].data.numpy(), bins=200)
plt.show()
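The modulo wrap in the last line of sample maps the circular coordinate into [-pi, pi). As a quick added check, not part of the original notebook, the sampled angles should stay inside that interval:
In [ ]:
# Added sanity check: the circular coordinate should lie in [-pi, pi)
s_check = target.sample(1000)
print(s_check[:, 1].min().item(), s_check[:, 1].max().item())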
In [ ]:
# Base distribution: Gaussian in dimension 0, uniform over an interval of
# length 2 * pi in dimension 1 (the circular index)
base = nf.distributions.UniformGaussian(2, [1], torch.tensor([1., 2 * np.pi]))
# Visualize base
s = base.sample(1000000)
plt.hist(s[:, 0].data.numpy(), bins=200)
plt.show()
plt.hist(s[:, 1].data.numpy(), bins=200)
plt.show()
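The UniformGaussian base treats dimension 0 as Gaussian and dimension 1, the circular index, as uniform over an interval of length 2 * pi. A minimal added sketch, assuming the base exposes log_prob as normflows base distributions generally do, evaluates the base log-density on a few samples:
In [ ]:
# Added sketch: the circular coordinate contributes a constant term to the
# log-density, the unbounded coordinate a standard normal term
z = base.sample(5)
print(base.log_prob(z))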
In [ ]:
# Create normalizing flow
K = 20

flow_layers = []
for i in range(K):
    flow_layers += [nf.flows.CircularAutoregressiveRationalQuadraticSpline(
        2, 1, 128, [1],
        tail_bound=torch.tensor([5., np.pi]),
        permute_mask=True)]
model = nf.NormalizingFlow(base, flow_layers)

# Move model to GPU if available
enable_cuda = True
device = torch.device('cuda' if torch.cuda.is_available() and enable_cuda else 'cpu')
model = model.to(device)
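Each layer is a circular autoregressive rational-quadratic spline. The following added sketch builds a single layer and runs one forward pass; the positional arguments are assumed to be the number of dimensions, the number of autoregressive blocks, the number of hidden units, and the circular indices, with tail_bound giving the spline interval per dimension.
In [ ]:
# Added sketch: one spline layer applied to base samples; the forward pass
# is assumed to return the transformed sample and the log-determinant
layer = nf.flows.CircularAutoregressiveRationalQuadraticSpline(
    2, 1, 128, [1],
    tail_bound=torch.tensor([5., np.pi]),
    permute_mask=True)
z = base.sample(4)
z_out, log_det = layer(z)
print(z_out.shape, log_det.shape)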
In [ ]:
# Sample from the untrained model
model.eval()
with torch.no_grad():
    s, _ = model.sample(50000)
model.train()

plt.hist(s[:, 0].cpu().data.numpy(), bins=100)
plt.show()
plt.hist(s[:, 1].cpu().data.numpy(), bins=100)
plt.show()
In [ ]:
# Train model
max_iter = 20000
num_samples = 2 ** 10
show_iter = 5000

loss_hist = np.array([])

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)

for it in tqdm(range(max_iter)):
    optimizer.zero_grad()

    # Get training samples
    x = target.sample(num_samples)

    # Compute loss
    loss = model.forward_kld(x.to(device))

    # Do backprop and optimizer step
    if ~(torch.isnan(loss) | torch.isinf(loss)):
        loss.backward()
        optimizer.step()

    # Log loss
    loss_hist = np.append(loss_hist, loss.to('cpu').data.numpy())

    # Plot learned density
    if (it + 1) % show_iter == 0:
        model.eval()
        with torch.no_grad():
            s, _ = model.sample(50000)
        model.train()
        plt.hist(s[:, 0].cpu().data.numpy(), bins=100)
        plt.show()
        plt.hist((s[:, 1].cpu().data.numpy() - 1) % (2 * np.pi), bins=100)
        plt.show()

# Plot loss
plt.figure(figsize=(10, 10))
plt.plot(loss_hist, label='loss')
plt.legend()
plt.show()
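As a final added check, not in the original notebook, samples from the trained flow can be compared with fresh target samples by overlaying the marginal histograms of both coordinates:
In [ ]:
# Added sketch: overlay marginal histograms of target and model samples
model.eval()
with torch.no_grad():
    s_model, _ = model.sample(50000)
model.train()
s_target = target.sample(50000)
for dim, name in [(0, 'unbounded dimension'), (1, 'circular dimension')]:
    plt.hist(s_target[:, dim].numpy(), bins=100, alpha=0.5, density=True, label='target')
    plt.hist(s_model[:, dim].cpu().numpy(), bins=100, alpha=0.5, density=True, label='model')
    plt.title(name)
    plt.legend()
    plt.show()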