"""`nuScenes <https://www.nuscenes.org/>`_ Dataset."""
import os
from typing import Any, ClassVar, override
import numpy as np
import torch
try:
    from nuscenes.eval.detection.constants import DETECTION_NAMES
    from nuscenes.eval.detection.utils import category_to_detection_name
except ImportError as e:  # the class attributes below need the devkit at import time
    msg = "nuscenes-devkit is required. Install with: uv sync --group nuscenes"
    raise ImportError(msg) from e
from PIL import Image
from torch import Tensor
from torch.utils.data import Dataset
from vision3d.datasets import FusionInputs, SampleTargets
from vision3d.tensors import (
BoundingBox3DFormat,
BoundingBoxes3D,
CameraExtrinsics,
CameraImages,
CameraIntrinsics,
PointCloud3D,
)
# Canonical camera ordering (clockwise from front) for a consistent
# multi-camera tensor layout
CAMERA_NAMES: list[str] = [
"CAM_FRONT",
"CAM_FRONT_RIGHT",
"CAM_BACK_RIGHT",
"CAM_BACK",
"CAM_BACK_LEFT",
"CAM_FRONT_LEFT",
]
class NuScenes3D(Dataset[tuple[FusionInputs, SampleTargets]]):
"""`nuScenes <https://www.nuscenes.org/>`_ 3D object detection dataset.

    Returns samples in the **lidar frame** with annotations as
    :class:`~vision3d.tensors.BoundingBoxes3D` in ``XYZLWHY`` format (yaw
    extracted from the global-frame quaternion). Multi-camera images,
    intrinsics, and extrinsics are returned for all 6 cameras.

    Requires the ``nuscenes-devkit`` package.

    Args:
root (str or pathlib.Path): Root directory of the nuScenes dataset.
version (str): Dataset version. Default: ``"v1.0-mini"``.
        split (str): One of ``"train"``, ``"val"``, or ``"test"``
            (``"test"`` only for ``"v1.0-test"``). Default: ``"train"``.
        transforms (Callable, optional): A function/transform that takes an
            input sample and its target and returns a transformed version.
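
    Example (illustrative; assumes the mini split is extracted under ``root``)::

        >>> ds = NuScenes3D("path/to/nuscenes", version="v1.0-mini")
        >>> inputs, targets = ds[0]
        >>> inputs["images"].shape[0]  # one image per camera
        6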
"""
camera_names: ClassVar[list[str]] = CAMERA_NAMES
classes: ClassVar[list[str]] = list(DETECTION_NAMES)
class_to_idx: ClassVar[dict[str, int]] = {name: i for i, name in enumerate(classes)}
def __init__(
self,
root: str | os.PathLike[str],
version: str = "v1.0-mini",
split: str = "train",
transforms: Any | None = None,
) -> None:
try:
from nuscenes.nuscenes import NuScenes
except ImportError as e:
msg = "nuscenes-devkit is required. Install with: uv sync --group nuscenes"
raise ImportError(msg) from e
self.root = str(root)
self.version = version
self.split = split
self.transforms = transforms
self._nusc = NuScenes(version=version, dataroot=self.root, verbose=False)
# Collect sample tokens for the requested split
split_scenes = _get_split_scenes(version, split)
self._sample_tokens: list[str] = []
for scene in self._nusc.scene:
if scene["name"] in split_scenes:
token = scene["first_sample_token"]
while token:
self._sample_tokens.append(token)
sample = self._nusc.get("sample", token)
token = sample["next"]
def __len__(self) -> int:
"""Return the number of samples."""
return len(self._sample_tokens)
@override
def __getitem__(self, index: int) -> tuple[FusionInputs, SampleTargets]:
"""Load a single sample.
Args:
index (int): Index.
Returns:
Tuple of ``(inputs, targets)``.
**inputs** is a dict with keys:
- ``"points"``: :class:`PointCloud3D` in lidar frame ``[N, 5]``
(x, y, z, intensity, ring_index).
- ``"images"``: :class:`CameraImages` ``[6, 3, H, W]``.
- ``"extrinsics"``: :class:`CameraExtrinsics` ``[6, 4, 4]``
(lidar-to-camera).
- ``"intrinsics"``: :class:`CameraIntrinsics` ``[6, 3, 3]``.
**targets** is a dict with keys:
- ``"boxes"``: :class:`BoundingBoxes3D` in lidar frame,
format ``XYZLWHY``.
- ``"labels"``: :class:`~torch.Tensor` of class indices.
"""
sample = self._nusc.get("sample", self._sample_tokens[index])
# Lidar
lidar_data = self._nusc.get("sample_data", sample["data"]["LIDAR_TOP"])
points = self._load_lidar(lidar_data)
lidar_ego_pose = self._nusc.get("ego_pose", lidar_data["ego_pose_token"])
lidar_calib = self._nusc.get(
"calibrated_sensor", lidar_data["calibrated_sensor_token"]
)
        # Lidar-to-global: ego_pose is ego-to-global at the lidar timestamp,
        # calibrated_sensor is lidar-to-ego; composing them gives lidar-to-global.
lidar_to_global = _make_transform(
lidar_ego_pose["translation"],
lidar_ego_pose["rotation"],
) @ _make_transform(
lidar_calib["translation"],
lidar_calib["rotation"],
)
# Cameras
images_list = []
intrinsics_list = []
extrinsics_list = []
for cam_name in self.camera_names:
cam_data = self._nusc.get("sample_data", sample["data"][cam_name])
cam_calib = self._nusc.get(
"calibrated_sensor", cam_data["calibrated_sensor_token"]
)
cam_ego_pose = self._nusc.get("ego_pose", cam_data["ego_pose_token"])
# Camera image
img = self._load_image(cam_data)
images_list.append(img)
# Intrinsics
K = torch.tensor(cam_calib["camera_intrinsic"], dtype=torch.float32)
intrinsics_list.append(K)
# Extrinsics: lidar-to-camera
cam_to_global = _make_transform(
cam_ego_pose["translation"],
cam_ego_pose["rotation"],
) @ _make_transform(
cam_calib["translation"],
cam_calib["rotation"],
)
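            # cam_to_global composes ego-to-global (at the camera timestamp)
            # with camera-to-ego. Cameras and lidar are sampled at slightly
            # different timestamps, hence the per-sensor ego poses.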
lidar_to_cam = torch.linalg.inv(cam_to_global) @ lidar_to_global
extrinsics_list.append(lidar_to_cam)
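        # All six nuScenes cameras share one resolution (1600x900), so a
        # single image_size covers the stacked intrinsics.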
inputs: FusionInputs = {
"points": PointCloud3D(points),
"images": CameraImages(torch.stack(images_list)),
"extrinsics": CameraExtrinsics(torch.stack(extrinsics_list)),
"intrinsics": CameraIntrinsics(
torch.stack(intrinsics_list),
image_size=(images_list[0].shape[-2], images_list[0].shape[-1]),
),
}
# Annotations (in global frame -> convert to lidar frame)
targets = self._load_annotations(sample, lidar_to_global)
if self.transforms is not None:
inputs, targets = self.transforms(inputs, targets)
return inputs, targets
def _load_lidar(self, lidar_data: dict[str, Any]) -> Tensor:
path = os.path.join(self.root, lidar_data["filename"])
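        # nuScenes lidar sweeps are raw float32 blobs with five values per
        # point: x, y, z, intensity, ring index.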
points = np.fromfile(path, dtype=np.float32).reshape(-1, 5)
return torch.from_numpy(points)
def _load_image(self, cam_data: dict[str, Any]) -> Tensor:
path = os.path.join(self.root, cam_data["filename"])
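        # Decode to RGB and convert HWC uint8 -> CHW float32 in [0, 1].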
img = np.array(Image.open(path).convert("RGB"))
return torch.from_numpy(img).permute(2, 0, 1).float() / 255.0
def _load_annotations(
self, sample: dict[str, Any], lidar_to_global: Tensor
) -> SampleTargets:
"""Load annotations and convert from global to lidar frame.
Returns:
Dict with ``"boxes"`` (:class:`BoundingBoxes3D`, XYZLWHY format),
``"labels"`` (int tensor).
"""
global_to_lidar = torch.linalg.inv(lidar_to_global)
label_ids: list[int] = []
boxes: list[list[float]] = []
for ann_token in sample["anns"]:
ann = self._nusc.get("sample_annotation", ann_token)
det_name = category_to_detection_name(ann["category_name"])
if det_name is None:
continue
label_ids.append(self.class_to_idx[det_name])
# Center: global -> lidar
center_global = torch.tensor(
[*ann["translation"], 1.0], dtype=torch.float32
)
center_lidar = (global_to_lidar @ center_global)[:3]
# Dimensions: nuScenes stores (w, l, h), we want (l, w, h)
w, l, h = ann["size"]
# Rotation: quaternion -> yaw
yaw = _quaternion_to_yaw(ann["rotation"], global_to_lidar[:3, :3])
boxes.append(
[
center_lidar[0].item(),
center_lidar[1].item(),
center_lidar[2].item(),
l,
w,
h,
yaw,
]
)
if not boxes:
return {
"boxes": BoundingBoxes3D(
torch.zeros(0, 7), format=BoundingBox3DFormat.XYZLWHY
),
"labels": torch.zeros(0, dtype=torch.int64),
}
return {
"boxes": BoundingBoxes3D(
torch.tensor(boxes, dtype=torch.float32),
format=BoundingBox3DFormat.XYZLWHY,
),
"labels": torch.tensor(label_ids, dtype=torch.int64),
}
def _make_transform(translation: list[float], rotation_wxyz: list[float]) -> Tensor:
"""Build a 4x4 transform from translation + quaternion (wxyz).
Returns:
``[4, 4]`` homogeneous transform matrix.
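
    Example (identity quaternion, so only the translation is applied)::

        >>> T = _make_transform([1.0, 2.0, 3.0], [1.0, 0.0, 0.0, 0.0])
        >>> T[:3, 3]
        tensor([1., 2., 3.])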
"""
from pyquaternion import Quaternion
q = Quaternion(rotation_wxyz)
T = torch.eye(4, dtype=torch.float32)
T[:3, :3] = torch.tensor(q.rotation_matrix, dtype=torch.float32)
T[:3, 3] = torch.tensor(translation, dtype=torch.float32)
return T
def _quaternion_to_yaw(
rotation_wxyz: list[float], global_to_lidar_rot: Tensor
) -> float:
"""Convert a global-frame quaternion to yaw angle in lidar frame.
Args:
rotation_wxyz: Quaternion in wxyz format (global frame).
global_to_lidar_rot: ``[3, 3]`` rotation from global to lidar.
Returns:
Yaw angle in radians.
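
    Example (identity quaternion and identity frame rotation give zero yaw)::

        >>> _quaternion_to_yaw([1.0, 0.0, 0.0, 0.0], torch.eye(3))
        0.0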
"""
from pyquaternion import Quaternion
q = Quaternion(rotation_wxyz)
# Forward vector in global frame
forward_global = q.rotate(np.array([1.0, 0.0, 0.0]))
# Rotate to lidar frame
forward_lidar = (
global_to_lidar_rot @ torch.tensor(forward_global, dtype=torch.float32)
).numpy()
# Yaw = atan2(y, x) in lidar frame
return float(np.arctan2(forward_lidar[1], forward_lidar[0]))
def _get_split_scenes(version: str, split: str) -> set[str]:
"""Get scene names for the given version and split.
Uses the official split definitions from ``nuscenes.utils.splits``.
Returns:
Set of scene name strings.
Raises:
ValueError: If version/split combination is not supported.
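
    Example (the mini split has 10 scenes, 8 train and 2 val)::

        >>> len(_get_split_scenes("v1.0-mini", "train"))
        8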
"""
from nuscenes.utils import splits
version_to_splits: dict[str, dict[str, list[str]]] = {
"v1.0-mini": {"train": splits.mini_train, "val": splits.mini_val},
"v1.0-trainval": {"train": splits.train, "val": splits.val},
"v1.0-test": {"test": splits.test},
}
if version not in version_to_splits:
msg = f"Unsupported version: {version}"
raise ValueError(msg)
split_map = version_to_splits[version]
if split not in split_map:
msg = f"Unsupported split '{split}' for version '{version}'"
raise ValueError(msg)
return set(split_map[split])