Source code for parallelkdepy.core

"""
Low-level plumbing: Manage Julia session and interfacing between Python and Julia.
"""

from typing import Sequence, Optional

import numpy as np

from juliacall import Main as jl

_initialized = False


def _init_julia():
    global _initialized

    if not _initialized:
        jl.seval("using ParallelKDE")
        _initialized = True


AvailableDevices = ["cpu", "cuda"]
AvailableImplementations = {"cpu": ["serial", "threaded"], "cuda": ["cuda"]}


[docs] def str_to_symbol(s: str): return jl.Symbol(s)
[docs] def device_to_str(device) -> str: devices = { jl.ParallelKDE.Devices.IsCPU(): "cpu", jl.ParallelKDE.Devices.IsCUDA(): "cuda", } return devices[device]
[docs] def create_grid(ranges: Sequence, device: str = "cpu", b32: Optional[bool] = None): """ Create a grid instance of the Julia object `ParallelKDE.Grid`. Parameters ---------- ranges : Sequence The ranges for the grid. device : str, optional The device type, e.g., 'cpu' or 'cuda'. Default is 'cpu'. b32 : Optional[bool], optional Whether to use 32-bit precision for GPU devices. Default is None, which behaves as True (32-bit precision) if the device is 'cuda'. Setting it as False for 'cuda' devices will use 64-bit precision. This keyword argument is ignored when device is 'cpu'. Returns ------- juliacall.AnyValue The created grid object in Julia. """ ranges = [jl.range(start, stop, length) for start, stop, length in ranges] if device not in AvailableDevices: raise ValueError( f"Unsupported device type: {device}. Available devices: {AvailableDevices}" ) b32 = b32 if b32 is not None else (device != "cpu") if device == "cpu": grid = jl.initialize_grid(*ranges, b32=b32) else: grid = jl.initialize_grid(*ranges, device=str_to_symbol(device), b32=b32) return grid
[docs] def grid_shape(grid_jl) -> tuple: return jl.size(grid_jl)
[docs] def grid_device(grid_jl) -> str: device_jl = jl.ParallelKDE.get_device(grid_jl) try: return device_to_str(device_jl) except KeyError: raise ValueError(f"Unsupported device type: {device_jl}")
[docs] def grid_coordinates(grid_jl) -> tuple[np.ndarray, ...]: coords_np = jl.get_coordinates(grid_jl).to_numpy() return tuple(np.ascontiguousarray(coords_np[i]) for i in range(coords_np.shape[0]))
[docs] def grid_step(grid_jl) -> list: return list(jl.spacings(grid_jl).to_numpy())
[docs] def grid_bounds(grid_jl) -> list[tuple]: bounds_np = jl.bounds(grid_jl).to_numpy() return list(zip(bounds_np[0], bounds_np[1]))
[docs] def grid_initial_bandwidth(grid_jl) -> list: return list(jl.initial_bandwidth(grid_jl).to_numpy())
[docs] def grid_fftgrid(grid_jl): return jl.fftgrid(grid_jl)
[docs] def find_grid( data: np.ndarray, grid_bounds: Optional[Sequence[tuple]] = None, grid_dims: Optional[Sequence] = None, grid_steps: Optional[Sequence] = None, grid_padding: Optional[Sequence] = None, device: str = "cpu", ): data = data.transpose() if data.ndim > 1 else data device = str_to_symbol(device) return jl.find_grid( data, grid_bounds=grid_bounds, grid_dims=grid_dims, grid_steps=grid_steps, grid_padding=grid_padding, device=device, )
[docs] def initialize_dirac_sequence( data: np.ndarray, grid_jl=None, bootstrap_indices: Optional[np.ndarray] = None, device: str = "cpu", method: Optional[str] = None, ) -> np.ndarray: """ Creates a numpy array with the dirac sequence obtained from the data on the grid. Parameters ---------- data : np.ndarray Numpy array of the data with shape (n_samples, n_features). grid_jl Julia grid object. bootstrap_indices : Optional[np.ndarray], optional Optional numpy array of bootstrap indices. If provided, it should have shape (n_bootstraps, n_samples). device : str, optional The device to store the array, e.g., 'cpu' or 'cuda'. Default is 'cpu'. method : str, optional The method to use for initializing the Dirac sequence, e.g., 'serial' or 'parallel'. Default is 'serial'. """ if data.ndim != 2: raise ValueError("Data must be 2-dimensional (n_samples, n_features).") data = data.transpose() if data.ndim > 1 else data if device not in AvailableDevices: raise ValueError( f"Unsupported device type: {device}. Available devices: {AvailableDevices}" ) if (method is not None) and (method not in AvailableImplementations[device]): raise ValueError("Unsupported method for the given device type.") device = str_to_symbol(device) method = str_to_symbol(method) if method is not None else method bootstrap_indices = ( bootstrap_indices.transpose() if ((bootstrap_indices is not None) and (bootstrap_indices.ndim > 1)) else bootstrap_indices ) dirac_sequences = jl.initialize_dirac_sequence( data, grid=grid_jl, bootstrap_idxs=bootstrap_indices, device=device, method=method, ).to_numpy() dirac_sequences = np.moveaxis(dirac_sequences, -1, 0) return np.ascontiguousarray(dirac_sequences)
[docs] def create_density_estimation( data: np.ndarray, grid, dims: Optional[Sequence] = None, grid_bounds: Optional[Sequence[tuple]] = None, grid_padding: Optional[Sequence] = None, device: str = "cpu", ): data = data.transpose() if data.ndim > 1 else data return jl.initialize_estimation( data, grid=grid, dims=dims, grid_bounds=grid_bounds, grid_padding=grid_padding, device=str_to_symbol(device), )
[docs] def estimate_density(density_estimation, estimation_method: str, **kwargs): kwargs = { k: str_to_symbol(v) if isinstance(v, str) else v for k, v in kwargs.items() } jl.estimate_density_b( density_estimation, str_to_symbol(estimation_method), **kwargs ) return None
[docs] def get_density(density_estimation, **kwargs) -> np.ndarray: density = jl.get_density(density_estimation, **kwargs) density_np = density.to_numpy() return np.ascontiguousarray(density_np)