In this tutorial, we walk step by step through using Hugging Face’s LeRobot library to train and evaluate a behavior-cloning policy on the PushT dataset. We begin by setting up the environment in Google Colab, installing the required dependencies, and loading the dataset through LeRobot’s unified API. We then design a compact visuomotor policy that combines a convolutional backbone with a small MLP head, allowing us to map image and state observations directly to robot actions. By training on a subset of the dataset for speed, we are able to quickly demonstrate how LeRobot enables reproducible, dataset-driven robot learning pipelines. Check out the FULL CODES here.
!pip -q install --upgrade lerobot torch torchvision timm imageio[ffmpeg]
import os, math, random, io, sys, json, pathlib, time
import torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision.utils import make_grid, save_image
import numpy as np
import imageio.v2 as imageio
try:
    from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
except Exception:
    from lerobot.datasets.lerobot_dataset import LeRobotDataset
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
SEED = 42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)
We begin by installing the required libraries and setting up our environment for training. We import the essential modules, pull in LeRobotDataset through a fallback import that covers both module layouts of the library, and fix the random seed to ensure reproducibility. We also detect whether we are running on a GPU or a CPU so that training and evaluation use the fastest available device.
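As an optional sanity check (not part of the original pipeline), we can confirm the versions and the device the run will actually use before moving on:

# Optional sanity check: confirm library versions and the selected device.
import torch
import numpy as np
print("torch:", torch.__version__)
print("numpy:", np.__version__)
print("device:", "cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
    print("gpu:", torch.cuda.get_device_name(0))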
REPO_ID = "lerobot/pusht"
ds = LeRobotDataset(REPO_ID)
print("Dataset length:", len(ds))
s0 = ds[0]
keys = list(s0.keys())
print("Sample keys:", keys)
def key_with(prefixes):
    for k in keys:
        for p in prefixes:
            if k.startswith(p): return k
    return None
K_IMG = key_with(["observation.image", "observation.images", "observation.rgb"])
K_STATE = key_with(["observation.state"])
K_ACT = "action"
assert K_ACT in s0, f"No 'action' key found in sample. Found: {keys}"
print("Using keys -> IMG:", K_IMG, "STATE:", K_STATE, "ACT:", K_ACT)
We load the PushT dataset with LeRobot and inspect its structure. We check the available keys, identify which ones correspond to images, states, and actions, and map them for consistent access throughout our training pipeline.
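To make the structure concrete, an optional snippet like the one below prints the shape and dtype of the tensors behind the keys we just resolved; the exact values depend on the dataset revision, so treat the output as informational only:

# Optional: peek at the tensors behind the resolved keys (shapes depend on the dataset revision).
for k in (K_IMG, K_STATE, K_ACT):
    if k is not None and k in s0 and torch.is_tensor(s0[k]):
        print(f"{k}: shape={tuple(s0[k].shape)}, dtype={s0[k].dtype}")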
class PushTWrapper(torch.utils.data.Dataset):
    def __init__(self, base):
        self.base = base
    def __len__(self): return len(self.base)
    def __getitem__(self, i):
        x = self.base[i]
        img = x[K_IMG]
        if img.ndim == 4: img = img[-1]
        img = img.float() / 255.0 if img.dtype == torch.uint8 else img.float()
        state = x.get(K_STATE, torch.zeros(2))
        state = state.float().reshape(-1)
        act = x[K_ACT].float().reshape(-1)
        if img.shape[-2:] != (96, 96):
            img = F.interpolate(img.unsqueeze(0), size=(96, 96), mode="bilinear", align_corners=False)[0]
        return {"image": img, "state": state, "action": act}
wrapped = PushTWrapper(ds)
N = len(wrapped)
idx = list(range(N))
random.shuffle(idx)
n_train = int(0.9*N)
train_idx, val_idx = idx[:n_train], idx[n_train:]
train_ds = Subset(wrapped, train_idx[:12000])
val_ds = Subset(wrapped, val_idx[:2000])
BATCH = 128
train_loader = DataLoader(train_ds, batch_size=BATCH, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_ds, batch_size=BATCH, shuffle=False, num_workers=2, pin_memory=True)
We wrap each sample so we consistently get a normalized 96×96 image, a flattened state, and an action, picking the last frame if a temporal stack is present. We then shuffle, split into train/val, and cap sizes for fast Colab runs. Finally, we create efficient DataLoaders with batching, shuffling, and pinned memory to keep training smooth.
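Before training, it can help to pull a single batch and confirm the shapes the policy will receive; this quick check is a sketch added for illustration and is not part of the original pipeline:

# Optional: fetch one batch and confirm the shapes the policy will see.
batch = next(iter(train_loader))
print("image:", tuple(batch["image"].shape))    # expected (BATCH, 3, 96, 96)
print("state:", tuple(batch["state"].shape))    # expected (BATCH, state_dim)
print("action:", tuple(batch["action"].shape))  # expected (BATCH, act_dim)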
class SmallBackbone(nn.Module):
    def __init__(self, out=256):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 32, 5, 2, 2), nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, 3, 2, 1), nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, 3, 2, 1), nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, 3, 1, 1), nn.ReLU(inplace=True),
        )
        self.head = nn.Sequential(nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Linear(128, out), nn.ReLU(inplace=True))
    def forward(self, x): return self.head(self.conv(x))

class BCPolicy(nn.Module):
    def __init__(self, img_dim=256, state_dim=2, hidden=256, act_dim=2):
        super().__init__()
        self.backbone = SmallBackbone(img_dim)
        self.mlp = nn.Sequential(
            nn.Linear(img_dim + state_dim, hidden), nn.ReLU(inplace=True),
            nn.Linear(hidden, hidden // 2), nn.ReLU(inplace=True),
            nn.Linear(hidden // 2, act_dim),
        )
    def forward(self, img, state):
        z = self.backbone(img)
        if state.ndim == 1: state = state.unsqueeze(0)
        z = torch.cat([z, state], dim=-1)
        return self.mlp(z)
policy = BCPolicy().to(DEVICE)
opt = torch.optim.AdamW(policy.parameters(), lr=3e-4, weight_decay=1e-4)
scaler = torch.cuda.amp.GradScaler(enabled=(DEVICE=="cuda"))
@torch.no_grad()
def evaluate():
    policy.eval()
    mse, n = 0.0, 0
    for batch in val_loader:
        img = batch["image"].to(DEVICE, non_blocking=True)
        st = batch["state"].to(DEVICE, non_blocking=True)
        act = batch["action"].to(DEVICE, non_blocking=True)
        pred = policy(img, st)
        mse += F.mse_loss(pred, act, reduction="sum").item()
        n += act.numel()
    return mse / n

def cosine_lr(step, total, base=3e-4, min_lr=3e-5):
    if step >= total: return min_lr
    cos = 0.5 * (1 + math.cos(math.pi * step / total))
    return min_lr + (base - min_lr) * cos
EPOCHS = 4
steps_total = EPOCHS*len(train_loader)
step = 0
best = float("inf")
ckpt = "/content/lerobot_pusht_bc.pt"
for epoch in range(EPOCHS):
    policy.train()
    for batch in train_loader:
        lr = cosine_lr(step, steps_total); step += 1
        for g in opt.param_groups: g["lr"] = lr
        img = batch["image"].to(DEVICE, non_blocking=True)
        st = batch["state"].to(DEVICE, non_blocking=True)
        act = batch["action"].to(DEVICE, non_blocking=True)
        opt.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=(DEVICE=="cuda")):
            pred = policy(img, st)
            loss = F.smooth_l1_loss(pred, act)
        scaler.scale(loss).backward()
        scaler.unscale_(opt)  # unscale before clipping so the norm is measured on the true gradients
        nn.utils.clip_grad_norm_(policy.parameters(), 1.0)
        scaler.step(opt); scaler.update()
    val_mse = evaluate()
    print(f"Epoch {epoch+1}/{EPOCHS} | Val MSE: {val_mse:.6f}")
    if val_mse < best:
        best = val_mse
        torch.save({"state_dict": policy.state_dict(), "val_mse": val_mse}, ckpt)
        print("Saved new best checkpoint to", ckpt)
We define a compact visuomotor policy: a CNN backbone extracts image features that we fuse with the robot state to predict 2-D actions. We train with AdamW, a cosine learning-rate schedule, mixed precision, and gradient clipping, while evaluating with MSE on the validation set. We checkpoint the best model by validation loss so we can reload the strongest policy later.
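As a rough gauge of model size and of the schedule we just described, an optional snippet can report the parameter count and sample a few points of the cosine learning rate; this is illustrative only and not output from the original run:

# Optional: report model size and a few points of the cosine LR schedule.
n_params = sum(p.numel() for p in policy.parameters())
print(f"policy parameters: {n_params/1e6:.2f}M")
for s in (0, steps_total // 2, steps_total - 1):
    print(f"step {s}: lr={cosine_lr(s, steps_total):.2e}")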
policy.load_state_dict(torch.load(ckpt)["state_dict"]); policy.eval()
os.makedirs("/content/vis", exist_ok=True)
def draw_arrow(imgCHW, action_xy, scale=40):
    import PIL.Image, PIL.ImageDraw
    C, H, W = imgCHW.shape
    arr = (imgCHW.clamp(0, 1).permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
    im = PIL.Image.fromarray(arr)
    dr = PIL.ImageDraw.Draw(im)
    cx, cy = W // 2, H // 2
    dx, dy = float(action_xy[0]) * scale, float(-action_xy[1]) * scale
    dr.line((cx, cy, cx + dx, cy + dy), width=3, fill=(0, 255, 0))
    return np.array(im)
frames = []
with torch.no_grad():
    for i in range(60):
        b = wrapped[i]
        img = b["image"].unsqueeze(0).to(DEVICE)
        st = b["state"].unsqueeze(0).to(DEVICE)
        pred = policy(img, st)[0].cpu()
        frames.append(draw_arrow(b["image"], pred))
video_path = "/content/vis/pusht_pred.mp4"
imageio.mimsave(video_path, frames, fps=10)
print("Wrote", video_path)
grid = make_grid(torch.stack([wrapped[i]["image"] for i in range(16)]), nrow=8)
save_image(grid, "/content/vis/grid.png")
print("Saved grid:", "/content/vis/grid.png")
We reload the best checkpoint and switch the policy to eval so we can visualize its behavior. We overlay predicted action arrows on frames, stitch them into a short MP4, and also save a quick image grid for a snapshot view of the dataset. This lets us confirm, at a glance, what actions our model outputs on real PushT observations.
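If we are running in Colab or Jupyter, we can preview both artifacts inline; this optional convenience assumes an IPython display environment:

# Optional (Colab/Jupyter): preview the saved video and image grid inline.
from IPython.display import Image, Video, display
display(Video(video_path, embed=True, width=384))
display(Image("/content/vis/grid.png"))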
In conclusion, we see how easily LeRobot integrates data handling, policy definition, and evaluation into a single framework. By training our lightweight policy and visualizing predicted actions on PushT frames, we confirm that the library gives us a practical entry point into robot learning without needing real-world hardware. We are now equipped to extend the pipeline to more advanced models, such as diffusion or ACT policies, to experiment with different datasets, and even to share our trained policies on the Hugging Face Hub.
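As a pointer for that last step, a minimal sketch of pushing the checkpoint to the Hub with huggingface_hub might look like the following; the repository id is a placeholder, and a write token (for example via huggingface-cli login) is assumed:

# Optional sketch: upload the best checkpoint to the Hugging Face Hub.
# "your-username/pusht-bc-policy" is a placeholder repo id; requires an authenticated HF token.
from huggingface_hub import HfApi
api = HfApi()
repo_id = "your-username/pusht-bc-policy"  # hypothetical repo id
api.create_repo(repo_id, repo_type="model", exist_ok=True)
api.upload_file(path_or_fileobj=ckpt, path_in_repo="lerobot_pusht_bc.pt", repo_id=repo_id)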