
Comparison of Planar, Radial, and Affine Coupling Flows

In this notebook, we train normalizing flows of different depths $K$ to fit a set of fixed two-dimensional target distributions (called priors in the code) as a test of their expressivity. For each flow type and depth we visualize the learned distribution, and the final training losses are plotted to compare how expressive the different flows are.
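
All models below are trained by minimizing the reverse Kullback-Leibler divergence between the flow distribution $q_\theta$ and the target $p$, estimated from samples of the flow itself. With the linear annealing schedule used in the training cell (a weight $\beta$ ramping from $0.01$ to $1$ over anneal_iter iterations), the objective is

$$\mathcal{L}(\theta) = \mathbb{E}_{z \sim q_\theta}\left[\log q_\theta(z) - \beta \log p(z)\right],$$

which is what the call nfm.reverse_kld(batch_size, beta) below estimates by Monte Carlo. Down-weighting $\log p$ early in training helps the flow avoid collapsing onto a single mode of a multimodal target.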

In [ ]:
%load_ext autoreload
%autoreload 2

# Import required packages
import torch
import numpy as np

import normflows as nf

from matplotlib import pyplot as plt
from tqdm import tqdm

print("PyTorch version: %s" % torch.__version__)
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device: %s" % dev)


Target distribution visualization

In [ ]:
priors = []
priors.append(nf.distributions.TwoModes(2.0, 0.2))
priors.append(nf.distributions.Sinusoidal(0.4, 4))
priors.append(nf.distributions.Sinusoidal_gap(0.4, 4))
priors.append(nf.distributions.Sinusoidal_split(0.4, 4))
priors.append(nf.distributions.Smiley(0.15))


# Plot prior distributions
grid_size = 200
grid_length = 4.0
grid_shape = ([-grid_length, grid_length], [-grid_length, grid_length])  # axis ranges for the 2D histogram plots

space_mesh = torch.linspace(-grid_length, grid_length, grid_size)
xx, yy = torch.meshgrid(space_mesh, space_mesh, indexing="ij")  # explicit indexing (requires PyTorch >= 1.10)
z = torch.cat([xx.unsqueeze(2), yy.unsqueeze(2)], 2)
z = z.reshape(-1, 2)

for k in range(len(priors)):
    log_prob = priors[k].log_prob(z)
    prob = torch.exp(log_prob)

    plt.figure(figsize=(10, 10))
    plt.pcolormesh(xx, yy, prob.reshape(grid_size, grid_size))
    plt.show()

Flow training
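
For reference, the four flow families compared below transform $z \in \mathbb{R}^2$ as follows:

  • Planar flow (Rezende & Mohamed, 2015): $f(z) = z + u \, h(w^\top z + b)$ with a smooth nonlinearity $h$ such as $\tanh$.
  • Radial flow (Rezende & Mohamed, 2015): $f(z) = z + \beta h(\alpha, r)(z - z_0)$ with $r = \lVert z - z_0 \rVert$ and $h(\alpha, r) = 1 / (\alpha + r)$.
  • NICE (Dinh et al., 2014): additive coupling $z_B' = z_B + t(z_A)$, realized here as a MaskedAffineFlow with a translation network only.
  • Real NVP (Dinh et al., 2017): affine coupling $z_B' = z_B \odot \exp(s(z_A)) + t(z_A)$ with both scale and translation networks.

Here $z_A$ and $z_B$ denote the coordinates selected by the binary mask b, which is flipped after every layer so that all coordinates get updated.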

In [ ]:
flow_types = ("Planar", "Radial", "NICE", "RealNVP")
max_iter = 20000
batch_size = 1024
plot_batches = 10 ** 2
plot_samples = 10 ** 4
save_iter = 50

for name in flow_types:
    K_arr = [2, 8, 32]
    for K in K_arr:
        print("Flow type {} with K = {}".format(name, K))
        for k in range(len(priors)):
            if k == 0 or k == 4:  # TwoModes and Smiley: anneal beta over the first 10k iterations
                anneal_iter = 10000
            else:  # turn annealing off when fitting the sinusoidal distributions
                anneal_iter = 1
        
            flows = []
            b = torch.tensor([0, 1])  # binary mask: which coordinate a coupling layer leaves unchanged
            for i in range(K):
                if name == "Planar":
                    flows += [nf.flows.Planar((2,))]
                elif name == "Radial":
                    flows += [nf.flows.Radial((2,))]
                elif name == "NICE":  # additive coupling: translation network only
                    flows += [nf.flows.MaskedAffineFlow(b, nf.nets.MLP([2, 16, 16, 2], init_zeros=True))]
                elif name == "RealNVP":  # affine coupling: translation and scale networks
                    flows += [nf.flows.MaskedAffineFlow(b, nf.nets.MLP([2, 16, 16, 2], init_zeros=True),
                                                        nf.nets.MLP([2, 16, 16, 2], init_zeros=True))]
                b = 1 - b  # flip the mask so successive layers transform the other coordinate

            q0 = nf.distributions.DiagGaussian(2)
            nfm = nf.NormalizingFlow(p=priors[k], q0=q0, flows=flows)
            nfm.to(dev) # Move model on GPU if available
    
            # Train model
            loss_hist = np.array([])

            optimizer = torch.optim.Adam(nfm.parameters(), lr=1e-3, weight_decay=1e-3)
            for it in tqdm(range(max_iter)):
                optimizer.zero_grad()
                loss = nfm.reverse_kld(batch_size, np.min([1.0, 0.01 + it / anneal_iter]))
                if ~(torch.isnan(loss) | torch.isinf(loss)):
                    loss.backward()
                    optimizer.step()
            
                if (it + 1) % save_iter == 0:
                    loss_hist = np.append(loss_hist, loss.detach().cpu().numpy())

            # Plot samples from the learned distribution
            z_np = np.zeros((0, 2))
            for i in range(plot_batches):
                z, _ = nfm.sample(plot_samples)
                z_np = np.concatenate((z_np, z.detach().cpu().numpy()))
            plt.figure(figsize=(10, 10))
            plt.hist2d(z_np[:, 0], z_np[:, 1], (grid_size, grid_size), grid_shape)
            plt.show()
            # Save samples and final loss as an object array (loaded later with allow_pickle=True)
            np.save("{}-K={}-k={}".format(name, K, k),
                    np.array((z_np, loss.detach().cpu().numpy()), dtype=object))
    
            # Plot training history
            plt.figure(figsize=(10, 10))
            plt.plot(loss_hist, label='loss')
            plt.legend()
            plt.show()
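
Beyond histogramming samples, models whose flows have a tractable inverse (the NICE and RealNVP runs here) can also be checked by evaluating the learned log-density directly; planar and radial flows lack an analytic inverse, so this does not apply to them. A minimal sketch, reusing nfm and the grid variables z, xx, yy, and grid_size from the visualization cell above:

# Evaluate the learned log-density of the last trained model on the plotting grid
nfm.eval()
with torch.no_grad():
    log_q = nfm.log_prob(z.to(dev))
plt.figure(figsize=(10, 10))
plt.pcolormesh(xx, yy, torch.exp(log_q).cpu().reshape(grid_size, grid_size))
plt.show()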

Expressivity plot of flows

In [ ]:
fig = plt.figure(figsize=(14, 10))
K_arr = [2, 8, 32]
nrows = 5
ncols = 7
axes = [fig.add_subplot(nrows, ncols, r * ncols + c + 1)
        for r in range(nrows) for c in range(ncols)]

for ax in axes:
    ax.set_xticks([])
    ax.set_yticks([])

grid_size = 100
grid_length = 4.0
grid_shape = ([-grid_length, grid_length], [-grid_length, grid_length])

space_mesh = torch.linspace(-grid_length, grid_length, grid_size)
xx, yy = torch.meshgrid(space_mesh, space_mesh, indexing="ij")  # explicit indexing (requires PyTorch >= 1.10)
z = torch.cat([xx.unsqueeze(2), yy.unsqueeze(2)], 2)
z = z.reshape(-1, 2)
axes[0].annotate('Target', xy=(0.5, 1.10), xytext=(0.5, 1.20), xycoords='axes fraction', 
            fontsize=24, ha='center', va='bottom',
            arrowprops=dict(arrowstyle='-[, widthB=1.5, lengthB=0.2', lw=2.0))
for k in range(5):
    axes[k*ncols].set_ylabel('{}'.format(k+1), rotation=0, fontsize=20, labelpad=15)
    log_prob = priors[k].log_prob(z)
    prob = torch.exp(log_prob)
    axes[k*ncols + 0].pcolormesh(xx, yy, prob.reshape(grid_size, grid_size))


# Plot the saved samples for both flow families, three depths each
for offset, flow_name in ((1, 'Planar'), (1 + len(K_arr), 'Radial')):
    for l in range(len(K_arr)):
        K = K_arr[l]
        if l == 1:
            axes[0*ncols + l + offset].annotate('{} flows'.format(flow_name), xy=(0.5, 1.10), xytext=(0.5, 1.20),
                xycoords='axes fraction', fontsize=24, ha='center', va='bottom',
                arrowprops=dict(arrowstyle='-[, widthB=6.0, lengthB=0.2', lw=2.0))
        axes[4*ncols + l + offset].set_xlabel('K = {}'.format(K), fontsize=20)
        for k in range(5):
            z_np, _ = np.load("{}-K={}-k={}.npy".format(flow_name, K, k), allow_pickle=True)
            axes[k*ncols + l + offset].hist2d(z_np[:, 0], z_np[:, 1], (grid_size, grid_size), grid_shape)

fig.subplots_adjust(hspace=0.02, wspace=0.02)

for l in range(1,4):
    for k in range(5):
        pos1 = axes[k*ncols + l].get_position() # get the original position 
        pos2 = [pos1.x0 + 0.01, pos1.y0,  pos1.width, pos1.height] 
        axes[k*ncols + l].set_position(pos2) # set a new position
        
for l in range(4,7):
    for k in range(5):
        pos1 = axes[k*ncols + l].get_position() # get the original position 
        pos2 = [pos1.x0 + 0.02, pos1.y0,  pos1.width, pos1.height] 
        axes[k*ncols + l].set_position(pos2) # set a new position

Comparison of Planar, Radial, and Affine Coupling Flows on selected target distributions

In [ ]:
k_arr = [0, 2, 4]  # targets to compare: TwoModes, Sinusoidal_gap, Smiley
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(15, 5))
markers = ['s', 'o', 'v', 'P', 'd']

for k, target in enumerate(k_arr):
    loss = [[] for _ in flow_types]
    for i, name in enumerate(flow_types):
        for K in K_arr:
            # Load the final loss for target k_arr[k]
            _, loss_v = np.load("{}-K={}-k={}.npy".format(name, K, target), allow_pickle=True)
            loss[i].append(loss_v)
        axes[k].plot(K_arr, loss[i], marker=markers[i], label=name)
    axes[k].set_title('Target {}'.format(target + 1), fontsize=16)
    axes[k].set_xlabel('Flow length', fontsize=12)
    axes[k].set_ylabel('Variational bound (nats)', fontsize=12)
    axes[k].legend()
    axes[k].grid(True, which='major')

fig.tight_layout(pad=2.0)
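
Each saved file stores only the loss of the final iteration, so the curves above reflect end-of-training performance; lower values of the variational bound indicate a tighter fit to the target. A small sketch for printing the same numbers as a plain table, using the file layout from the training cell (shown for target 1, i.e. k=0):

# Print the final loss of every flow type and depth for one target
for name in flow_types:
    for K in K_arr:
        _, loss_v = np.load("{}-K={}-k=0.npy".format(name, K), allow_pickle=True)
        print("{:8s}  K={:2d}  final loss: {:.3f}".format(name, K, float(loss_v)))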
