Comparison of Planar, Radial, and Affine Coupling Flows¶
In this notebook, we train normalizing flows to fit a set of predefined prior distributions in order to test their expressivity. We visualize the learned distributions for different flow depths $K$ and compare the training losses of the different flow types.
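The flows are trained by minimizing the reverse Kullback-Leibler divergence between the flow distribution $q$ and the target $p$, $\mathrm{KL}(q \,\|\, p) = \mathbb{E}_{z \sim q}\left[\log q(z) - \log p(z)\right]$, estimated with Monte Carlo samples drawn from the flow; this is what the `reverse_kld` method computes in the training cell below.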
In [ ]:
%load_ext autoreload
%autoreload 2
# Import required packages
import torch
import numpy as np
import normflows as nf
from matplotlib import pyplot as plt
from tqdm import tqdm
print("PyTorch version: %s" % torch.__version__)
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device: %s" % dev)
#z shape is (batch_size, num_samples, dim)
Prior target distribution visualization¶
In [ ]:
priors = []
priors.append(nf.distributions.TwoModes(2.0, 0.2))
priors.append(nf.distributions.Sinusoidal(0.4, 4))
priors.append(nf.distributions.Sinusoidal_gap(0.4, 4))
priors.append(nf.distributions.Sinusoidal_split(0.4, 4))
priors.append(nf.distributions.Smiley(0.15))
# Plot prior distributions
grid_size = 200
grid_length = 4.0
grid_shape = ([-grid_length, grid_length], [-grid_length, grid_length])
space_mesh = torch.linspace(-grid_length, grid_length, grid_size)
xx, yy = torch.meshgrid(space_mesh, space_mesh)
z = torch.cat([xx.unsqueeze(2), yy.unsqueeze(2)], 2)
z = z.reshape(-1, 2)
K_arr = [2, 8, 32]
max_iter = 30000
batch_size = 512
num_samples = 256
save_iter = 1000
for k in range(len(priors)):
    log_prob = priors[k].log_prob(z)
    prob = torch.exp(log_prob)
    plt.figure(figsize=(10, 10))
    plt.pcolormesh(xx, yy, prob.reshape(grid_size, grid_size))
    plt.show()
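Note: on PyTorch 1.10 and later, `torch.meshgrid` emits a warning unless the `indexing` argument is specified. The call above relies on the legacy default, which corresponds to `indexing="ij"`, so the warning-free equivalent is:

xx, yy = torch.meshgrid(space_mesh, space_mesh, indexing="ij")  # same layout as the default above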
Flow training¶
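The cell below sweeps over four flow architectures (planar, radial, NICE-style additive coupling, and RealNVP-style affine coupling), three flow depths $K \in \{2, 8, 32\}$, and the five targets. For the two-modes and smiley targets, the annealing weight $\beta_t = \min(1,\, 0.01 + t / t_{\mathrm{anneal}})$ is passed as the second argument to `reverse_kld` to ease optimization early in training. A quick check of how this schedule ramps up (the iteration numbers here are illustrative):

# Annealing weight as a function of the training iteration
anneal_iter = 10000
for it in [0, 2500, 5000, 10000, 20000]:
    print(it, min(1.0, 0.01 + it / anneal_iter))
# prints 0.01, 0.26, 0.51, 1.0, 1.0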
In [ ]:
flow_types = ("Planar", "Radial", "NICE", "RealNVP")
max_iter = 20000
batch_size = 1024
plot_batches = 10 ** 2
plot_samples = 10 ** 4
save_iter = 50

for name in flow_types:
    K_arr = [2, 8, 32]
    for K in K_arr:
        print("Flow type {} with K = {}".format(name, K))
        for k in range(len(priors)):
            if k == 0 or k == 4:
                anneal_iter = 10000
            else:  # turn annealing off when fitting the sinusoidal distributions
                anneal_iter = 1
            flows = []
            b = torch.tensor([0, 1])
            for i in range(K):
                if name == "Planar":
                    flows += [nf.flows.Planar((2,))]
                elif name == "Radial":
                    flows += [nf.flows.Radial((2,))]
                elif name == "NICE":
                    flows += [nf.flows.MaskedAffineFlow(b, nf.nets.MLP([2, 16, 16, 2], init_zeros=True))]
                elif name == "RealNVP":
                    flows += [nf.flows.MaskedAffineFlow(b, nf.nets.MLP([2, 16, 16, 2], init_zeros=True),
                                                        nf.nets.MLP([2, 16, 16, 2], init_zeros=True))]
                b = 1 - b  # parity alternation for mask
            q0 = nf.distributions.DiagGaussian(2)
            nfm = nf.NormalizingFlow(p=priors[k], q0=q0, flows=flows)
            nfm.to(dev)  # Move model to GPU if available

            # Train model
            loss_hist = np.array([])
            log_q_hist = np.array([])
            log_p_hist = np.array([])
            x = torch.zeros(batch_size, device=dev)
            optimizer = torch.optim.Adam(nfm.parameters(), lr=1e-3, weight_decay=1e-3)
            for it in tqdm(range(max_iter)):
                optimizer.zero_grad()
                loss = nfm.reverse_kld(batch_size, np.min([1.0, 0.01 + it / anneal_iter]))
                if ~(torch.isnan(loss) | torch.isinf(loss)):
                    loss.backward()
                    optimizer.step()
                if (it + 1) % save_iter == 0:
                    loss_hist = np.append(loss_hist, loss.cpu().data.numpy())

            # Plot learned distribution
            z_np = np.zeros((0, 2))
            for i in range(plot_batches):
                z, _ = nfm.sample(plot_samples)
                z_np = np.concatenate((z_np, z.cpu().data.numpy()))
            plt.figure(figsize=(10, 10))
            plt.hist2d(z_np[:, 0], z_np[:, 1], (grid_size, grid_size), grid_shape)
            plt.show()
            np.save("{}-K={}-k={}".format(name, K, k), (z_np, loss.cpu().data.numpy()))

            # Plot training history
            plt.figure(figsize=(10, 10))
            plt.plot(loss_hist, label='loss')
            plt.legend()
            plt.show()
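The NICE and RealNVP variants above differ only in their coupling transform: a masked-affine layer passes the coordinates selected by the binary mask $b$ through unchanged and applies a learned shift (NICE, volume-preserving) or a learned shift and scale (RealNVP) to the rest. A minimal construction sketch, pulled out of the loops above for clarity; the networks are the same [2, 16, 16, 2] MLPs used in the training cell, zero-initialized so each layer starts out close to the identity map:

b = torch.tensor([0, 1])  # binary mask: selects which coordinate passes through unchanged
# NICE-style additive coupling: shift network only
nice_layer = nf.flows.MaskedAffineFlow(b, nf.nets.MLP([2, 16, 16, 2], init_zeros=True))
# RealNVP-style affine coupling: shift and scale networks
realnvp_layer = nf.flows.MaskedAffineFlow(
    b,
    nf.nets.MLP([2, 16, 16, 2], init_zeros=True),
    nf.nets.MLP([2, 16, 16, 2], init_zeros=True),
)
b = 1 - b  # flip the mask so the next layer transforms the other coordinate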
Expressivity plot of flows¶
In [ ]:
fig = plt.figure(figsize=(14, 10))
K_arr = [2, 8, 32]
nrows = 5
ncols = 7
axes = [fig.add_subplot(nrows, ncols, r * ncols + c + 1) for r in range(nrows) for c in range(ncols)]
for ax in axes:
    ax.set_xticks([])
    ax.set_yticks([])

grid_size = 100
grid_length = 4.0
grid_shape = ([-grid_length, grid_length], [-grid_length, grid_length])
space_mesh = torch.linspace(-grid_length, grid_length, grid_size)
xx, yy = torch.meshgrid(space_mesh, space_mesh)
z = torch.cat([xx.unsqueeze(2), yy.unsqueeze(2)], 2)
z = z.reshape(-1, 2)

# First column: target densities
axes[0].annotate('Target', xy=(0.5, 1.10), xytext=(0.5, 1.20), xycoords='axes fraction',
                 fontsize=24, ha='center', va='bottom',
                 arrowprops=dict(arrowstyle='-[, widthB=1.5, lengthB=0.2', lw=2.0))
for k in range(5):
    axes[k*ncols].set_ylabel('{}'.format(k+1), rotation=0, fontsize=20, labelpad=15)
    log_prob = priors[k].log_prob(z)
    prob = torch.exp(log_prob)
    axes[k*ncols + 0].pcolormesh(xx, yy, prob.reshape(grid_size, grid_size))

# Columns 2-4: samples from the trained planar flows
for l in range(len(K_arr)):
    K = K_arr[l]
    if l == 1:
        axes[0*ncols + l+1].annotate('Planar flows', xy=(0.5, 1.10), xytext=(0.5, 1.20), xycoords='axes fraction',
                                     fontsize=24, ha='center', va='bottom',
                                     arrowprops=dict(arrowstyle='-[, widthB=6.0, lengthB=0.2', lw=2.0))
    axes[4*ncols + l+1].set_xlabel('K = {}'.format(K), fontsize=20)
    for k in range(5):
        z_np, _ = np.load("Planar-K={}-k={}.npy".format(K, k), allow_pickle=True)
        axes[k*ncols + l+1].hist2d(z_np[:, 0], z_np[:, 1], (grid_size, grid_size), grid_shape)

# Columns 5-7: samples from the trained radial flows
for l in range(len(K_arr)):
    K = K_arr[l]
    if l == 1:
        axes[0*ncols + l+1+len(K_arr)].annotate('Radial flows', xy=(0.5, 1.10), xytext=(0.5, 1.20), xycoords='axes fraction',
                                                fontsize=24, ha='center', va='bottom',
                                                arrowprops=dict(arrowstyle='-[, widthB=6.0, lengthB=0.2', lw=2.0))
    axes[4*ncols + l+1+len(K_arr)].set_xlabel('K = {}'.format(K), fontsize=20)
    for k in range(5):
        z_np, _ = np.load("Radial-K={}-k={}.npy".format(K, k), allow_pickle=True)
        axes[k*ncols + l+1+len(K_arr)].hist2d(z_np[:, 0], z_np[:, 1], (grid_size, grid_size), grid_shape)

fig.subplots_adjust(hspace=0.02, wspace=0.02)
# Shift the planar and radial column groups slightly to create visual gaps
for l in range(1, 4):
    for k in range(5):
        pos1 = axes[k*ncols + l].get_position()  # get the original position
        pos2 = [pos1.x0 + 0.01, pos1.y0, pos1.width, pos1.height]
        axes[k*ncols + l].set_position(pos2)  # set a new position
for l in range(4, 7):
    for k in range(5):
        pos1 = axes[k*ncols + l].get_position()  # get the original position
        pos2 = [pos1.x0 + 0.02, pos1.y0, pos1.width, pos1.height]
        axes[k*ncols + l].set_position(pos2)  # set a new position
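Each file written by the training cell stores a tuple of (samples from the trained flow, final loss value); `np.save` pickles the tuple as an object array, hence `allow_pickle=True` when loading. For example (the file name assumes the runs above were saved, and the shape comment assumes the sampling settings above):

z_np, final_loss = np.load("Planar-K=8-k=0.npy", allow_pickle=True)
print(z_np.shape)  # (1000000, 2): plot_batches * plot_samples two-dimensional samples
print(float(final_loss))  # reverse-KLD estimate at the last training iteration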
Comparison of Planar, Radial, and Affine Coupling Flows on Selected Prior Distributions¶
In [ ]:
from itertools import repeat

k_arr = [0, 2, 4]  # indices of the targets to compare (targets 1, 3, and 5 in the figure above)
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(15, 5))
markers = ['s', 'o', 'v', 'P', 'd']
for k in range(len(k_arr)):
    loss = [[] for i in repeat(None, len(flow_types))]
    for intt, name in enumerate(flow_types):
        for K in K_arr:
            # Load results for target k_arr[k] (not k) so the data matches the panel title
            _, loss_v = np.load("{}-K={}-k={}.npy".format(name, K, k_arr[k]), allow_pickle=True)
            loss[intt].append(loss_v)
        axes[k].plot(K_arr, loss[intt], marker=markers[intt], label=name)
    axes[k].set_title('Target {}'.format(k_arr[k]+1), fontsize=16)
    axes[k].set_xlabel('Flow length', fontsize=12)
    axes[k].set_ylabel('Variational bound (nats)', fontsize=12)
    axes[k].legend()
    axes[k].grid('major')
fig.tight_layout(pad=2.0)