# Notes
import sys
import os
sys.path.append(os.path.abspath("../.."))
import numpy as np
from scipy.signal import convolve2d
from scipy.spatial import ConvexHull
The ConvexHullData class stores the indices and the number of points in the convex hull.
class ConvexHullData:
    """Scratch storage for a convex hull: vertex indices plus a count.

    Only the first `hullCount` entries of `indices` are meaningful; the
    array is pre-allocated to hold up to `n` vertices.
    """

    def __init__(self, n):
        # Room for up to n hull vertices, initially empty.
        self.indices = np.zeros(n, dtype=int)
        self.hullCount = 0
The BFM class stores the dimensions of the source and target matrices and the source distribution.
class BFM:
    """State for the back-and-forth method on an n1 x n2 grid.

    Holds the grid dimensions, convex-hull scratch space, dual-minimizer
    indices, transport-map slope fields, and the current density.
    """

    def __init__(self, n1, n2, mu):
        self.n1 = n1
        self.n2 = n2
        # 1D scratch buffers are sized by the longer grid side so the same
        # routines can run along either axis.
        longest = max(n1, n2)
        self.hull = ConvexHullData(longest)
        self.argmin = np.zeros(longest, dtype=int)
        self.temp = np.zeros((n1, n2))
        # Slope fields live on cell corners, hence the +1 per dimension.
        self.xMap = np.zeros((n1 + 1, n2 + 1))
        self.yMap = np.zeros((n1 + 1, n2 + 1))
        # Working copy of the source density and its mean mass per cell.
        self.rho = np.copy(mu)
        self.totalMass = np.sum(mu) / (n1 * n2)
ctransform delegates to compute_2d_dual_inside, which computes the 2D dual of u into the array dual. It uses a temporary array for intermediate values, calls compute_dual on each row of u, negates the result, and then applies compute_dual along each column — so duality is handled first row-wise and then column-wise.
def ctransform(self, dual, phi):
    """Write the c-transform (2D convex dual) of `phi` into `dual` in place."""
    self.compute_2d_dual_inside(dual, phi)
def compute_2d_dual_inside(self, dual, u):
    """Compute the 2D dual of `u` into `dual` (in place).

    Runs the 1D dual transform along every row of `u`, negates the
    intermediate result, then runs the 1D transform down every column,
    writing directly into `dual`.
    """
    scratch = np.empty((self.n1, self.n2))
    # First pass: 1D dual along each row.
    for row in range(self.n1):
        self.compute_dual(scratch[row], u[row])
    # Negate so the second pass transforms in the opposite direction.
    negated = -scratch
    # Second pass: 1D dual down each column, written into the output.
    for col in range(self.n2):
        self.compute_dual(dual[:, col], negated[:, col])
This function calculates the pushforward map, which describes how a measure changes under a transformation. It calls cal_pushforward_map to compute the map from phi, samples the resulting map with sampling_pushforward applied to nu, and copies the result into rho. Thus phi determines the transport map and nu is the measure being pushed forward.
def pushforward(self, rho, phi, nu):
    """Push `nu` forward through the map induced by the potential `phi`,
    writing the resulting density into `rho` (in place).

    Builds the transport slope fields from `phi`, samples `nu` through
    them into self.rho, then copies self.rho out to the caller's array.
    """
    self.cal_pushforward_map(phi)
    self.sampling_pushforward(nu)
    np.copyto(rho, self.rho)
This function computes the convex dual of a 1D array u and stores the result in dual. It uses the convex hull algorithm to find the relevant points, computes indices, and applies a vectorized computation to efficiently update the dual array based on a set of conditions.
def compute_dual(self, dual, u):
    """Compute the 1D convex dual of `u` into `dual` (in place).

    Builds the convex hull of the graph {(i/n, u[i])}, records its vertex
    indices in self.hull, fills self.argmin with the hull vertex attaining
    the dual at each grid point (via compute_dual_indices), then evaluates
    the dual values in a vectorized pass.

    NOTE(review): scipy's ConvexHull returns the FULL hull (upper and
    lower envelopes), while dual transforms of this kind typically need
    only the lower envelope — confirm the extra upper-hull vertices are
    harmless downstream.
    NOTE(review): ConvexHull raises QhullError on degenerate (e.g. all
    collinear) input — confirm callers never pass affine u.
    """
    # Prepare data points: the graph of u over the normalized grid.
    n = len(u)
    x_coords = np.arange(n) / n  # Normalize indices to [0, 1]
    points = np.column_stack((x_coords, u))
    # Compute convex hull of the graph points.
    hull = ConvexHull(points)
    # Extract hull vertices.
    hull_indices = hull.vertices
    # Sort indices into left-to-right order along the grid.
    hull_indices.sort()
    # Record the hull in the shared scratch structure.
    self.hull.indices[:len(hull_indices)] = hull_indices
    self.hull.hullCount = len(hull_indices)
    # For each dual grid point, find the hull vertex attaining the dual.
    self.compute_dual_indices(self.argmin, u)
    # Vectorized evaluation of the dual (replaces a per-index loop).
    n_dual = len(dual)  # both call sites pass len(dual) == len(u)
    s = (np.arange(n_dual) + 0.5) / n_dual  # Compute s values for all indices
    index = self.argmin  # Array of minimizing indices
    x = (index + 0.5) / n_dual  # Grid positions of the minimizers
    v1 = s * x - u[index]  # Candidate dual values from the hull minimizers
    v2 = s * ((n_dual - 0.5) / n_dual) - u[n_dual - 1]  # Candidate from the last grid point
    # Keep whichever candidate is larger at each dual grid point.
    mask = v1 > v2
    # Update dual and argmin arrays using the mask.
    dual[mask] = v1[mask]
    dual[~mask] = v2[~mask]
    # Where the last point won, record it as the minimizer.
    # NOTE(review): self.argmin has length max(n1, n2) while mask has
    # length n_dual; boolean indexing requires equal lengths, so this
    # raises IndexError when n1 != n2 — verify grids are meant to be square.
    self.argmin[~mask] = n_dual - 1  # Update argmin where v1 <= v2
This function computes the indices needed for duality based on the convex hull of the function u. It calculates the differences between consecutive hull points and slopes, and then uses the searchsorted method to find where the values in the slope array correspond to indices in dualIndices. The result is stored in dualIndices.
def compute_dual_indices(self, dualIndices, u):
    """Fill `dualIndices` with, for each dual grid point, the hull vertex
    attaining the dual of `u` (in place).

    Uses the slopes of consecutive hull segments: a binary search places
    each midpoint sample s_k among the (increasing) segment slopes, and
    the matching hull vertex index is recorded.
    """
    count = self.hull.hullCount
    verts = self.hull.indices[:count]
    m = len(dualIndices)
    # Slopes of consecutive hull segments, scaled to the dual grid size.
    seg_slopes = m * np.diff(u[verts]) / np.diff(verts)
    # Midpoint sample locations s_k = (k + 1/2) / m.
    samples = (np.arange(m) + 0.5) / m
    # Each sample picks the vertex whose segment slope first reaches it.
    dualIndices[:] = verts[np.searchsorted(seg_slopes, samples, side='left')]
This function calculates the pushforward map for the dual array by applying a 2D convolution with a kernel. It pads the dual array and computes an interpolation function based on this convolution. The interpolation result is used to update xMap and yMap, which represent the slope in the x and y directions.
def cal_pushforward_map(self, dual):
    """Build the transport-map slope fields xMap/yMap from `dual`.

    Averages `dual` onto cell corners with a 2x2 box filter (edge-padded),
    then takes centered differences of the corner values to obtain the
    slope in the x direction (axis 1) and the y direction (axis 0).
    """
    # 2x2 averaging kernel: each output is the mean of four neighbors.
    box = np.full((2, 2), 0.25)
    corner_vals = convolve2d(np.pad(dual, 1, mode="edge"), box, mode="valid")
    padded = np.pad(corner_vals, 1, mode="edge")
    # Centered differences scaled by the grid size in each direction.
    self.xMap = 0.5 * self.n2 * (padded[1:-1, 2:] - padded[1:-1, :-2])
    self.yMap = 0.5 * self.n1 * (padded[2:, 1:-1] - padded[:-2, 1:-1])
The sampling_pushforward function takes a distribution \(\mu\) and the BFM object, and updates rho with the pushforward of \(\mu\) in the following steps:
1. Calculate how much each cell of the domain is "stretched" by the pushforward map and choose the number of samples taken near each point based on its maximum stretch.
2. For each sample point, estimate the location of its image under the pushforward.
3. Distribute the sample's mass (1/(number of samples) of the original cell's mass) to the nearest four grid points according to their relative positions to the sample point.
def sampling_pushforward(self, mu):
    """Push the measure `mu` forward through (xMap, yMap) into self.rho.

    Every source cell carrying mass is adaptively super-sampled in
    proportion to how much the map stretches it; each sample's equal
    share of the cell's mass is deposited bilinearly onto the four
    nearest target grid points.  Finally self.rho is rescaled so its
    mean matches self.totalMass.
    """
    pcount = self.n1 * self.n2
    self.rho.fill(0)
    for i in range(self.n1):
        for j in range(self.n2):
            mass = mu[i, j]
            if mass > 0:
                # Stretch of the cell under the map in each direction,
                # measured on both opposite edges of the cell.
                xStretch0 = abs(self.xMap[i, j + 1] - self.xMap[i, j])
                xStretch1 = abs(self.xMap[i + 1, j + 1] - self.xMap[i + 1, j])
                yStretch0 = abs(self.yMap[i + 1, j] - self.yMap[i, j])
                yStretch1 = abs(self.yMap[i + 1, j + 1] - self.yMap[i, j + 1])
                xStretch = max(xStretch0, xStretch1)
                yStretch = max(yStretch0, yStretch1)
                # More stretched cells get more samples (at least 1 each way).
                xSamples = max(int(self.n1 * xStretch), 1)
                ySamples = max(int(self.n2 * yStretch), 1)
                factor = 1 / (xSamples * ySamples)  # equal share per sample
                for l in range(ySamples):
                    for k in range(xSamples):
                        # Sample at cell-relative coordinates (a, b) in (0, 1).
                        a = (k + 0.5) / xSamples
                        b = (l + 0.5) / ySamples
                        # Bilinear interpolation of the four corner map values.
                        xPoint = (1 - b) * (1 - a) * self.xMap[i, j] + (1 - b) * a * self.xMap[i, j + 1] + b * (1 - a) * self.xMap[i + 1, j] + a * b * self.xMap[i + 1, j + 1]
                        yPoint = (1 - b) * (1 - a) * self.yMap[i, j] + (1 - b) * a * self.yMap[i, j + 1] + b * (1 - a) * self.yMap[i + 1, j] + a * b * self.yMap[i + 1, j + 1]
                        # Convert map values (presumably in [0, 1]) to grid
                        # coordinates of the target sample.
                        X = xPoint * self.n1 - 0.5
                        Y = yPoint * self.n2 - 0.5
                        # NOTE(review): int() truncates toward zero, so for
                        # X < 0 the fractional parts below go negative —
                        # confirm map values stay inside the domain.
                        xIndex = int(X)
                        yIndex = int(Y)
                        xFrac = X - xIndex
                        yFrac = Y - yIndex
                        xOther = xIndex + 1
                        yOther = yIndex + 1
                        # Clamp all four target indices into the grid.
                        # NOTE(review): x is clamped by n1 and y by n2, yet
                        # rho is indexed [y, x]; with rho copied from mu of
                        # shape (n1, n2) this is only consistent when
                        # n1 == n2 — verify the intended axis convention.
                        xIndex = min(max(xIndex, 0), self.n1 - 1)
                        xOther = min(max(xOther, 0), self.n1 - 1)
                        yIndex = min(max(yIndex, 0), self.n2 - 1)
                        yOther = min(max(yOther, 0), self.n2 - 1)
                        idx = yIndex, xIndex
                        idx_yOther = yOther, xIndex
                        idx_xOther = yIndex, xOther
                        idx_xyOther = yOther, xOther
                        # Bilinear deposit of this sample's mass share.
                        self.rho[idx] += (1 - xFrac) * (1 - yFrac) * mass * factor
                        self.rho[idx_yOther] += (1 - xFrac) * yFrac * mass * factor
                        self.rho[idx_xOther] += xFrac * (1 - yFrac) * mass * factor
                        self.rho[idx_xyOther] += xFrac * yFrac * mass * factor
    # Renormalize so the mean of rho equals the conserved total mass.
    sum_rho = np.sum(self.rho) / pcount
    self.rho *= self.totalMass / sum_rho