blazefl.contrib.FedAvgProcessPoolClientTrainer#

class blazefl.contrib.FedAvgProcessPoolClientTrainer(model_selector: ModelSelector, model_name: str, share_dir: Path, state_dir: Path, dataset: PartitionedDataset[FedAvgPartitionType], device: str, num_clients: int, epochs: int, batch_size: int, lr: float, seed: int, num_parallels: int, ipc_mode: Literal['storage', 'shared_memory'])[source]#

Bases: ProcessPoolClientTrainer[FedAvgProcessPoolUplinkPackage, FedAvgDownlinkPackage, FedAvgClientConfig]

Parallel client trainer for the Federated Averaging (FedAvg) algorithm.

This trainer handles the parallelized training and evaluation of local models across multiple clients, distributing tasks to different processes or devices.

model_selector#

Selector for initializing the local model.

Type:

ModelSelector

model_name#

Name of the model to be used.

Type:

str

share_dir#

Directory to store shared data files between processes.

Type:

Path

state_dir#

Directory to save random states for reproducibility.

Type:

Path

dataset#

Dataset partitioned across clients.

Type:

PartitionedDataset

device#

Device to run the models on (‘cpu’ or ‘cuda’).

Type:

str

num_clients#

Total number of clients in the federation.

Type:

int

epochs#

Number of local training epochs per client.

Type:

int

batch_size#

Batch size for local training.

Type:

int

lr#

Learning rate for the optimizer.

Type:

float

seed#

Seed for reproducibility.

Type:

int

num_parallels#

Number of parallel processes for training.

Type:

int

ipc_mode#

Inter-process communication mode.

Type:

Literal[“storage”, “shared_memory”]

device_count#

Number of CUDA devices available (if using GPU).

Type:

int | None

__init__(model_selector: ModelSelector, model_name: str, share_dir: Path, state_dir: Path, dataset: PartitionedDataset[FedAvgPartitionType], device: str, num_clients: int, epochs: int, batch_size: int, lr: float, seed: int, num_parallels: int, ipc_mode: Literal['storage', 'shared_memory']) None[source]#

Initialize the FedAvgProcessPoolClientTrainer.

Parameters:
  • model_selector (ModelSelector) – Selector for initializing the local model.

  • model_name (str) – Name of the model to be used.

  • share_dir (Path) – Directory to store shared data files between processes.

  • state_dir (Path) – Directory to save random states for reproducibility.

  • dataset (PartitionedDataset) – Dataset partitioned across clients.

  • device (str) – Device to run the models on (‘cpu’ or ‘cuda’).

  • num_clients (int) – Total number of clients in the federation.

  • epochs (int) – Number of local training epochs per client.

  • batch_size (int) – Batch size for local training.

  • lr (float) – Learning rate for the optimizer.

  • seed (int) – Seed for reproducibility.

  • num_parallels (int) – Number of parallel processes for training.

  • ipc_mode (Literal["storage", "shared_memory"]) – Inter-process communication mode.

Methods

__init__(model_selector, model_name, ...)

Initialize the FedAvgProcessPoolClientTrainer.

get_client_config(cid)

Generate the client configuration for a specific client.

get_client_device(cid)

Retrieve the device to use for processing a given client.

local_process(payload, cid_list)

Manage the parallel processing of clients.

prepare_uplink_package_buffer()

Pre-allocate a shared-memory buffer for the uplink package.

progress_fn(it)

A no-op progress function that can be overridden to provide custom progress tracking.

train(model, model_parameters, train_loader, ...)

Train the model with the given training data loader.

uplink_package()

Retrieve the uplink packages for transmission to the server.

worker(config, payload, device, stop_event, *)

Process a single client's local training and evaluation.

get_client_config(cid: int) FedAvgClientConfig[source]#

Generate the client configuration for a specific client.

Parameters:

cid (int) – Client ID.

Returns:

Client configuration data structure.

Return type:

FedAvgClientConfig
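
The configuration object bundles the per-client training settings supplied to the constructor. A hypothetical sketch of what get_client_config assembles — the field names here are illustrative stand-ins, not FedAvgClientConfig's exact definition:

```python
from dataclasses import dataclass

@dataclass
class ClientConfig:
    # Illustrative stand-in for FedAvgClientConfig; real fields may differ.
    cid: int
    epochs: int
    batch_size: int
    lr: float
    seed: int

def get_client_config(cid: int, epochs: int, batch_size: int,
                      lr: float, seed: int) -> ClientConfig:
    # Every client shares the trainer-wide hyperparameters but carries
    # its own client ID (and, in practice, its dataset partition and
    # saved random state from state_dir).
    return ClientConfig(cid=cid, epochs=epochs, batch_size=batch_size,
                        lr=lr, seed=seed)

cfg = get_client_config(cid=3, epochs=5, batch_size=32, lr=0.1, seed=42)
```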

progress_fn(it: list[ApplyResult]) Iterable[ApplyResult][source]#

A no-op progress function that can be overridden to provide custom progress tracking.

Parameters:

it (list[ApplyResult]) – A list of ApplyResult objects.

Returns:

The original iterable.

Return type:

Iterable[ApplyResult]
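
Since progress_fn receives the list of pending result handles and only needs to return an iterable over them, a subclass can wrap it with any progress reporter (tqdm is a common choice). A stdlib-only sketch of the wrapping pattern a custom override would follow:

```python
from collections.abc import Iterable
from typing import TypeVar

T = TypeVar("T")

def logging_progress(it: list[T]) -> Iterable[T]:
    # Drop-in shape for an overridden progress_fn: yield each pending
    # result handle while reporting how many have been dispatched.
    total = len(it)
    for i, item in enumerate(it, start=1):
        print(f"client task {i}/{total}")
        yield item

collected = list(logging_progress(["r1", "r2", "r3"]))
```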

static train(model: Module, model_parameters: Tensor, train_loader: DataLoader, device: str, epochs: int, lr: float, stop_event: Event, cid: int) FedAvgProcessPoolUplinkPackage[source]#

Train the model with the given training data loader.

Parameters:
  • model (torch.nn.Module) – The model to train.

  • model_parameters (torch.Tensor) – Initial global model parameters.

  • train_loader (DataLoader) – DataLoader for the training data.

  • device (str) – Device to run the training on.

  • epochs (int) – Number of local training epochs.

  • lr (float) – Learning rate for the optimizer.

  • stop_event (Event) – Event used to signal early termination of training.

  • cid (int) – Client ID.

Returns:

Uplink package containing updated model parameters and data size.

Return type:

FedAvgProcessPoolUplinkPackage
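
The local-training contract is the standard FedAvg one: start from the global parameters, run the configured number of local epochs, and return the updated parameters together with the local data size (the aggregation weight). A framework-free sketch of that contract on a toy one-parameter least-squares problem — the real method operates on a torch.nn.Module and a DataLoader instead:

```python
def local_train(global_w: float, data: list[tuple[float, float]],
                epochs: int, lr: float) -> tuple[float, int]:
    w = global_w  # load the global model parameters
    for _ in range(epochs):
        for x, y in data:  # one full pass over local data per epoch
            grad = 2 * (w * x - y) * x  # d/dw of (w*x - y)^2
            w -= lr * grad  # SGD step
    # Return updated parameters plus data size for server-side weighting.
    return w, len(data)

# Fit y = 2x from a global start of w = 0.
w, n = local_train(0.0, [(1.0, 2.0), (2.0, 4.0)], epochs=20, lr=0.05)
```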

uplink_package() list[FedAvgProcessPoolUplinkPackage][source]#

Retrieve the uplink packages for transmission to the server.

Returns:

A list of uplink packages.

Return type:

list[FedAvgProcessPoolUplinkPackage]

static worker(config: FedAvgClientConfig | Path, payload: FedAvgDownlinkPackage | Path, device: str, stop_event: Event, *, shm_buffer: FedAvgProcessPoolUplinkPackage | None = None) FedAvgProcessPoolUplinkPackage | Path[source]#

Process a single client’s local training and evaluation.

This method is executed by a worker process and handles loading client configuration and payload, performing the client-specific training, and returning the result.

Parameters:
  • config (FedAvgClientConfig | Path) – The client’s configuration data, or a path to a file containing the configuration if ipc_mode is “storage”.

  • payload (FedAvgDownlinkPackage | Path) – The downlink payload from the server, or a path to a file containing the payload if ipc_mode is “storage”.

  • device (str) – Device to use for processing (e.g., “cpu”, “cuda:0”).

  • stop_event (Event) – Event used to signal the worker to stop early.

  • shm_buffer (FedAvgProcessPoolUplinkPackage | None) – Pre-allocated shared-memory buffer for the result, used when ipc_mode is "shared_memory".

Returns:

The uplink package containing the client’s results, or a path to a file containing the package if ipc_mode is “storage”.

Return type:

FedAvgProcessPoolUplinkPackage | Path
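
In "storage" mode only paths cross the process boundary: the sender serializes each package to a file under share_dir and the worker loads it back, while in "shared_memory" mode the objects themselves are passed. A stdlib sketch of the storage round-trip — pickle-based and purely illustrative of the pattern, not blazefl's serialization code:

```python
import pickle
import tempfile
from pathlib import Path

def save_package(obj: object, share_dir: Path, name: str) -> Path:
    # The parent process writes the package to disk and hands the
    # worker only a Path.
    path = share_dir / f"{name}.pkl"
    with path.open("wb") as f:
        pickle.dump(obj, f)
    return path

def load_package(obj_or_path: object) -> object:
    # A "storage"-mode worker receives a Path and loads the real
    # object; in "shared_memory" mode it would receive the object
    # itself and use it directly.
    if isinstance(obj_or_path, Path):
        with obj_or_path.open("rb") as f:
            return pickle.load(f)
    return obj_or_path

share_dir = Path(tempfile.mkdtemp())
p = save_package({"model_parameters": [0.1, 0.2]}, share_dir, "downlink")
payload = load_package(p)
```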