DiLoCo Training

This guide explains how to set up and run distributed training in Hypha using the DiLoCo (Distributed Low-Communication) algorithm. It covers the training workflow and configuration. For initial setup, see Installation and Quick Start.

Overview

DiLoCo is an approach that enables efficient training across geographically distributed and poorly-connected infrastructure. It achieves comparable performance to traditional synchronous training while drastically reducing communication.

What You'll Do

To run a training job, you will:

  1. Start a data node to serve your dataset
  2. Launch training workers that perform local optimization steps
  3. Start a parameter server worker that aggregates updates
  4. Configure and run the scheduler to orchestrate the job

How It Works

DiLoCo uses a two-level optimization strategy:

  1. Inner optimization: each worker trains locally for k steps on its assigned data slices using the inner optimizer (AdamW), without communicating with other workers.
  2. Outer optimization: after the local steps, workers send pseudo-gradients (weight deltas) to the parameter server, which aggregates them and applies the outer optimizer (Nesterov momentum) to the global model.

This approach dramatically reduces network traffic because workers only synchronize after hundreds of local steps rather than after every batch.
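
Conceptually, one training round looks like the pseudocode below (written in the same style as the Algorithm Details section; function names are illustrative):

# One DiLoCo round (conceptual pseudocode; names are illustrative)
for round in range(num_rounds):
    for worker in workers:                  # runs in parallel, no communication between workers
        θ_local = θ_global.clone()
        for step in range(k):               # inner loop: local AdamW steps
            θ_local = inner_optimizer.step(θ_local, next_batch(worker))
        send_to_parameter_server(θ_local - θ_global)  # pseudo-gradient Δθ

    # Outer step: the parameter server aggregates Δθ and applies Nesterov momentum
    θ_global = outer_optimizer.step(θ_global, aggregate_pseudo_gradients())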

Roles and Workflow

Understanding each component's role helps diagnose issues and optimize performance.

Components

Data Node (docs/data.md): serves dataset slices in SafeTensors format to training workers.

Training Workers (docs/worker.md): run the inner optimization loop and compute pseudo-gradients.

Parameter Server Worker (docs/worker.md): aggregates pseudo-gradients and applies the outer optimizer to the global model.

Scheduler (docs/scheduler.md): selects workers, manages leases, assigns data slices, and decides when to synchronize.

Execution Flow

A DiLoCo training job progresses through several phases, from initial resource discovery to the iterative training loop. The scheduler orchestrates the entire workflow, coordinating between training workers, the parameter server, and data nodes. The process begins with workers bidding on tasks based on their capabilities and pricing, followed by initialization where workers download models and data. Once training begins, workers perform local optimization independently, periodically synchronizing through the parameter server to update the global model. Throughout this process, the scheduler continuously manages resource leases and tracks data slice usage to ensure efficient utilization and prevent conflicts.

sequenceDiagram
    participant S as Scheduler
    participant W as Training Workers
    participant PS as Parameter Server
    participant DN as Data Node

    Note over S,DN: Phase 1: Discovery & Selection
    S--)W: Publish worker requirements (pub/sub)
    W->>W: Evaluate job against capabilities
    W->>S: Submit offers (resources, pricing)
    S->>S: Select offers (greedy strategy)
    S->>W: Establish leases (reserve resources)

    Note over S,DN: Phase 2: Initialization
    S->>W: Dispatch task
    W->>W: Download model from HuggingFace
    W->>W: Download preprocessor

    Note over S,DN: Phase 3: Training Loop (repeats)
    loop Fetch data slices
        W->>S: Request data slice
        S->>W: Assign slice (AVAILABLE → ASSIGNED)
        W->>DN: Fetch SafeTensors slice
        DN->>W: Stream data
        W->>S: Confirm slice received (ASSIGNED → USED)
    end

    loop k local steps
        W->>W: Train on local batch
        W->>S: Report metrics (loss, throughput)
    end

    Note over S,DN: Phase 4: Synchronization
    S->>S: Determine sync timing (simulation)
    S->>W: Trigger synchronization
    W->>W: Compute pseudo-gradient (Δθ)
    W->>PS: Send pseudo-gradient
    PS->>PS: Aggregate gradients (mean)
    PS->>PS: Apply outer optimizer (Nesterov)
    PS->>W: Broadcast updated global gradients

Data Distribution

Efficient data loading and distribution is critical for training throughput. Datasets are served in slices via data nodes. Prepare your data by slicing it and storing the slices in the SafeTensors format for fast loading. Slices are additionally compressed with Snappy to reduce the storage footprint and speed up transmission:

from safetensors.torch import save
from snappy import compress
import torch

# Vision data example
def save_vision_slice(images, labels, output_path):
    tensors = {
        "images": torch.stack(images),  # [N, C, H, W]
        "labels": torch.tensor(labels),  # [N]
    }
    with open(output_path, 'wb') as out_file:
        out_file.write(compress(save(tensors)))

# Language data example
def save_language_slice(input_ids, attention_masks, output_path):
    tensors = {
        "input_ids": torch.tensor(input_ids),      # [N, seq_len]
        "attention_mask": torch.tensor(attention_masks),  # [N, seq_len]
    }
    with open(output_path, 'wb') as out_file:
        out_file.write(compress(save(tensors)))
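
To sanity-check a prepared slice, you can read it back by reversing the two steps, decompressing with Snappy and deserializing the SafeTensors payload (a minimal sketch for verification; in a running job the data node and executor handle slice loading for you, and the file name below is illustrative):

from safetensors.torch import load
from snappy import decompress

def load_slice(path):
    # Read the compressed slice, decompress it, then deserialize the tensors
    with open(path, 'rb') as in_file:
        return load(decompress(in_file.read()))

tensors = load_slice("slice-0000.safetensors")  # e.g. {"images": ..., "labels": ...}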

See Data for complete dataset preparation instructions and how to serve the prepared dataset. Once ready, start your data node to serve the dataset:

hypha-data run -c config.toml

Training Workers

Training workers perform the core computation of the DiLoCo algorithm. Each worker begins by advertising its resources and executor capabilities to the scheduler, then receives job configuration via the Job Bridge socket. Once configured, the worker downloads the model and preprocessor from HuggingFace and begins fetching data slices from the data node. The worker then trains for k local steps using the inner optimizer (AdamW), continuously reporting metrics such as loss and throughput to the scheduler. After completing its local training steps, the worker computes and sends pseudo-gradients (weight deltas) to the parameter server, receives the updated global gradient to update the weights, and repeats the cycle.

Configure workers to run the DiLoCo training executor:

[[executors]]
class = "train"
name = "diloco-transformer"
runtime = "process"
cmd = "uv"
args = [
    "run",
    "--python", "3.12",
    "--no-project",
    "--with", "hypha-accelerate-executor[<extra for CUDA/ROCm version>] @ https://github.com/hypha-space/hypha/releases/download/v<version>/hypha_accelerate_executor-<version without semver channel or metadata>-py3-none-any.whl",
    # Optional: add `--extra`, "<variant>" here to pin a specific torch build (see docs below)
    "--", # N.B. this standalone `--` is the separator between `uv` opts and the cmd to be executed
    "accelerate",
    "launch",
    "--config_file", "<path/to/accelerate.yaml>",
    "-m", "hypha.accelerate_executor.training",
    "--socket", "{SOCKET_PATH}",
    "--work-dir", "{WORK_DIR}",
    "--job", "{JOB_JSON}",
]

See Worker for complete worker configuration.

Parameter Server

The parameter server aggregates updates from training workers and maintains the global model state. It waits for pseudo-gradients from all workers (with a configurable timeout), then aggregates them by computing their mean. The server applies the outer optimizer using Nesterov momentum, where velocity = momentum × velocity_prev + Δθ_avg and Δθ_update = learning_rate × (momentum × velocity + Δθ_avg). Finally, it broadcasts the update to all workers so that they can update their local models and begin the next training cycle.

Run a dedicated worker with the parameter server executor:

[[executors]]
class = "aggregate"
name = "parameter-server"
runtime = "parameter-server"

The parameter server can be colocated with training workers or run separately.

Scheduler

The scheduler orchestrates the entire training workflow and manages resource allocation. It publishes job requirements via pub/sub and evaluates worker offers based on a pricing strategy such as greedy selection or lowest price. Once workers are selected, the scheduler establishes leases to reserve resources and prevent double-booking. Throughout training, it continuously tracks worker performance metrics including batch processing times and throughput, using this data to determine optimal synchronization timing through simulation. The scheduler also manages data slice assignment and epoch progression while forwarding metrics to monitoring systems like AIM and OpenTelemetry.

Performance-Aware Scheduling

The scheduler adapts to heterogeneous workers by tracking per-worker batch processing times and allocating different batch counts based on each worker's speed. It simulates future progress to determine optimal synchronization points, minimizing the time faster workers spend waiting for stragglers.

Configuration

Define your training job and start the scheduler.

[scheduler.job]
dataset_name = "my-dataset"  # Must match data node announcement

[scheduler.job.model]
repository = "microsoft/resnet-50"
type = "vision-classification"
# For private models:
# token = "hf_..."

[scheduler.job.preprocessor]
repository = "microsoft/resnet-50"

[scheduler.job.inner_optimizer]
learning_rate = 0.001
schedule = "cosine-with-warmup"
warmup_steps = 2000
total_steps = 500000

[scheduler.job.outer_optimizer]
learning_rate = 0.7
momentum = 0.9
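
For reference, `schedule = "cosine-with-warmup"` follows the usual shape of such schedules: the learning rate ramps up linearly over `warmup_steps` and then decays along a cosine curve for the remaining steps. A minimal sketch of that shape (illustrative only; the executor's actual schedule implementation may differ in details):

import math

def cosine_with_warmup(step, base_lr=0.001, warmup_steps=2000, total_steps=500000):
    """Inner-optimizer learning rate: linear warmup, then cosine decay toward zero."""
    if step < warmup_steps:
        return base_lr * step / warmup_steps
    progress = (step - warmup_steps) / (total_steps - warmup_steps)
    return base_lr * 0.5 * (1.0 + math.cos(math.pi * min(progress, 1.0)))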

Resource Requirements

Specify minimum worker requirements:

[scheduler.job.resources]
min_memory_mb = 8192
min_gpus = 1

Model Types

Vision Classification:

[scheduler.job.model]
repository = "microsoft/resnet-50"
type = "image-classification"

Uses AutoModelForImageClassification.from_pretrained().

All HuggingFace models compatible with the Auto classes are supported. See the HuggingFace documentation for details.
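
For the example job above, the worker's model and preprocessor download corresponds roughly to the familiar HuggingFace calls below (shown for illustration; the model class is the one named above, while the preprocessor class here is an assumption and the executor may use a different Auto class):

from transformers import AutoImageProcessor, AutoModelForImageClassification

# Roughly what a worker does for the microsoft/resnet-50 example job
model = AutoModelForImageClassification.from_pretrained("microsoft/resnet-50")
preprocessor = AutoImageProcessor.from_pretrained("microsoft/resnet-50")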

See docs/scheduler.md for complete configuration reference.

Monitoring and Metrics

AIM Integration

If you want the scheduler to send metrics to AIM, download and set up our AIM Driver Connector from our releases page, then follow its instructions to configure both the connector and the scheduler.

After that, configure the scheduler to expose metrics for AIM:

[scheduler]
status_bridge = "127.0.0.1:61000"

Once set up and running, access the AIM dashboard at http://localhost:43800 (default) to view training metrics such as per-worker loss and throughput.

Algorithm Details

This section provides technical details of the DiLoCo algorithm for those who want to understand the implementation.

Inner Optimization Loop

Workers train locally for k steps using their assigned data:

# Initialize from the global model (downloaded from HuggingFace on the first round)
θ_worker = download_model_from_huggingface()
θ_initial = θ_worker.clone()

# Local training for k steps
for step in range(k):
    batch = fetch_data_from_data_node()
    loss = forward_pass(model, batch)
    gradients = backward_pass(loss)
    θ_worker = inner_optimizer.step(θ_worker, gradients)  # AdamW

    report_metrics(loss, batch_size)

# Compute pseudo-gradient and weight by processed samples
Δθ = θ_worker - θ_initial
weighted_Δθ = Δθ * (local_sample_size / global_sample_size)

send_to_parameter_server(weighted_Δθ)

Key Points:

  1. Workers start each cycle from the same global model and run k inner steps with AdamW before any communication.
  2. The pseudo-gradient is a weight delta (θ_worker - θ_initial), not a per-batch gradient.
  3. Each worker weights its pseudo-gradient by the share of samples it processed, so it contributes in proportion to its work.

Outer Optimization (Parameter Server)

The parameter server aggregates and applies Nesterov momentum:

# Wait for all workers (with timeout)
pseudo_gradients = [Δθ_1, Δθ_2, ..., Δθ_N]

# Workers pre-weight their deltas, so the plain sum is the sample-weighted mean
Δθ_avg = sum(pseudo_gradients)

# Apply Nesterov momentum
velocity = momentum * velocity_prev + Δθ_avg
Δθ_update = learning_rate * (momentum * velocity + Δθ_avg)

# Broadcast to workers
broadcast_to_workers(Δθ_update)

Key Points:

  1. The server waits for pseudo-gradients from all workers, with a configurable timeout for stragglers.
  2. Nesterov momentum smooths the outer updates across synchronization rounds.
  3. The resulting Δθ_update is broadcast to every worker, which applies it to its local model before the next cycle.

Performance-Aware Synchronization

The scheduler uses simulation to determine optimal sync timing:

def project_completions(workers, target_data_points):
    """Simulate worker progress to find optimal sync point."""
    updates_per_worker = {w: 0 for w in workers}
    next_completion_time = {w: w.avg_batch_time for w in workers}
    current_time = 0
    data_processed = 0

    while data_processed < target_data_points:
        # Find worker finishing next batch
        next_worker = min(workers, key=lambda w: next_completion_time[w])
        current_time = next_completion_time[next_worker]

        # Record completion
        data_processed += next_worker.batch_size
        updates_per_worker[next_worker] += 1
        next_completion_time[next_worker] += next_worker.avg_batch_time

    return current_time, updates_per_worker

def should_schedule_sync(workers, target_remaining):
    """Decide whether to trigger synchronization now."""
    time_to_complete, updates = project_completions(workers, target_remaining)
    slowest_worker_time = max([
        updates[w] * w.avg_batch_time for w in workers
    ])
    threshold = 5 * 60  # 5 minutes
    return slowest_worker_time < threshold

Benefits:

  1. Faster workers spend less time idle waiting for stragglers at synchronization points.
  2. Heterogeneous hardware is used efficiently because batch counts are allocated according to each worker's measured speed.
  3. Synchronization timing is based on observed performance rather than an assumption of uniform worker speed.