"""`KITTI 3D Object Detection <http://www.cvlibs.net/datasets/kitti/eval_object.php?obj_benchmark=3d>`_ Dataset."""
import csv
import os
from typing import Any, ClassVar, override
import numpy as np
import torch
from PIL import Image
from torch import Tensor
from torch.utils.data import Dataset
from torchvision.datasets.utils import download_and_extract_archive
from vision3d.datasets import FusionInputs, SampleTargets
from vision3d.tensors import (
BoundingBox3DFormat,
BoundingBoxes3D,
CameraExtrinsics,
CameraImages,
CameraIntrinsics,
PointCloud3D,
)
class Kitti3D(Dataset[tuple[FusionInputs, SampleTargets | None]]):
"""`KITTI 3D <http://www.cvlibs.net/datasets/kitti/eval_object.php?obj_benchmark=3d>`_ Dataset.
Returns samples in **lidar frame** (X-forward, Y-left, Z-up), converting
from KITTI's camera-frame annotations automatically.
Args:
root (str or ``pathlib.Path``): Root directory where data is downloaded to.
Expects the following folder structure if download=False:
.. code::
<root>
└── Kitti3D/
└── raw/
├── training/
| ├── velodyne/
| ├── label_2/
| ├── calib/
| └── image_2/
└── testing/
├── velodyne/
├── calib/
└── image_2/
train (bool, optional): Use ``train`` split if true, else ``test`` split.
Defaults to ``True``.
        transforms (Callable, optional): A function/transform that takes an
            input sample and its target and returns a transformed version.
        download (bool, optional): If true, downloads the dataset from the
            internet and puts it in the root directory. If the dataset is
            already downloaded, it is not downloaded again.
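
    Example:
        A minimal usage sketch (the ``root`` path is illustrative):

        .. code::

            dataset = Kitti3D("./data", train=True, download=True)
            inputs, targets = dataset[0]
            points = inputs["points"]   # PointCloud3D [N, 4], lidar frame
            boxes = targets["boxes"]    # BoundingBoxes3D, XYZLWHY format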
"""
data_url: ClassVar[str] = "https://s3.eu-central-1.amazonaws.com/avg-kitti/"
resources: ClassVar[list[str]] = [
"data_object_velodyne.zip",
"data_object_image_2.zip",
"data_object_label_2.zip",
"data_object_calib.zip",
]
velodyne_dir_name: ClassVar[str] = "velodyne"
image_dir_name: ClassVar[str] = "image_2"
labels_dir_name: ClassVar[str] = "label_2"
calib_dir_name: ClassVar[str] = "calib"
classes: ClassVar[list[str]] = [
"Car",
"Pedestrian",
"Cyclist",
"Van",
"Truck",
"Person_sitting",
"Tram",
"Misc",
]
class_to_idx: ClassVar[dict[str, int]] = {name: i for i, name in enumerate(classes)}
def __init__(
self,
root: str | os.PathLike[str],
train: bool = True,
transforms: Any | None = None,
download: bool = False,
) -> None:
self.root = str(root)
self.train = train
self.transforms = transforms
self._location = "training" if train else "testing"
if download:
self.download()
if not self._check_exists():
raise RuntimeError(
"Dataset not found. You may use download=True to download it."
)
velodyne_dir = os.path.join(
self._raw_folder, self._location, self.velodyne_dir_name
)
self._frame_ids = sorted(
os.path.splitext(f)[0]
for f in os.listdir(velodyne_dir)
if f.endswith(".bin")
)
def __len__(self) -> int:
"""Return the number of frames."""
return len(self._frame_ids)
@override
def __getitem__(self, index: int) -> tuple[FusionInputs, SampleTargets | None]:
"""Load a single frame.
Args:
index (int): Index.
Returns:
Tuple of ``(inputs, targets)``.
**inputs** is a dict with keys:
- ``"points"``: :class:`PointCloud3D` in lidar frame ``[N, 4]``
(x, y, z, intensity).
- ``"images"``: :class:`CameraImages` ``[1, 3, H, W]`` (left camera).
- ``"extrinsics"``: :class:`CameraExtrinsics` ``[1, 4, 4]``
(lidar-to-camera).
- ``"intrinsics"``: :class:`CameraIntrinsics` ``[1, 3, 3]``.
**targets** is a dict (training) or None (testing) with keys:
- ``"boxes"``: :class:`BoundingBoxes3D` in lidar frame,
format ``XYZLWHY``.
- ``"labels"``: :class:`~torch.Tensor` of class indices.
"""
frame_id = self._frame_ids[index]
base = os.path.join(self._raw_folder, self._location)
points = self._load_velodyne(base, frame_id)
calib = self._load_calib(base, frame_id)
image = self._load_image(base, frame_id)
# Filter points to camera FOV using K @ extrinsics[:3, :]
img_h, img_w = image.shape[2], image.shape[3]
K = calib["intrinsics"][0] # [3, 3]
ext = calib["extrinsics"][0] # [4, 4]
lidar_to_img = K @ ext[:3, :] # [3, 4]
fov_mask = _get_fov_mask(points[:, :3], lidar_to_img, img_h, img_w)
points = points[fov_mask]
inputs: FusionInputs = {
"points": PointCloud3D(points),
"images": CameraImages(image),
"extrinsics": CameraExtrinsics(calib["extrinsics"]),
"intrinsics": CameraIntrinsics(
calib["intrinsics"], image_size=(img_h, img_w)
),
}
targets = None
if self.train:
targets = self._load_targets(base, frame_id, calib)
if self.transforms is not None:
inputs, targets = self.transforms(inputs, targets)
return inputs, targets
@property
def _raw_folder(self) -> str:
return os.path.join(self.root, self.__class__.__name__, "raw")
def _check_exists(self) -> bool:
folders = [self.velodyne_dir_name, self.calib_dir_name]
if self.train:
folders.append(self.labels_dir_name)
return all(
os.path.isdir(os.path.join(self._raw_folder, self._location, d))
for d in folders
)
def download(self) -> None:
"""Download the KITTI data if it doesn't exist already."""
if self._check_exists():
return
os.makedirs(self._raw_folder, exist_ok=True)
for fname in self.resources:
download_and_extract_archive(
url=f"{self.data_url}{fname}",
download_root=self._raw_folder,
filename=fname,
)
def _load_velodyne(self, base: str, frame_id: str) -> Tensor:
path = os.path.join(base, self.velodyne_dir_name, f"{frame_id}.bin")
points = np.fromfile(path, dtype=np.float32).reshape(-1, 4)
return torch.from_numpy(points)
def _load_image(self, base: str, frame_id: str) -> Tensor:
path = os.path.join(base, self.image_dir_name, f"{frame_id}.png")
if os.path.exists(path):
img = np.array(Image.open(path).convert("RGB"))
# [H, W, 3] -> [1, 3, H, W]
return torch.from_numpy(img).permute(2, 0, 1).unsqueeze(0).float() / 255.0
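        # ``_check_exists`` does not require the image archive, so fall back
        # to a 1x1 placeholder when image_2 is absent.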
return torch.zeros(1, 3, 1, 1)
def _load_calib(self, base: str, frame_id: str) -> dict[str, Tensor]:
"""Parse KITTI calibration file.
Returns:
Dict with ``"extrinsics"`` (lidar-to-camera, ``[1, 4, 4]``) and
``"intrinsics"`` (camera P2 projection, ``[1, 3, 3]``).
"""
path = os.path.join(base, self.calib_dir_name, f"{frame_id}.txt")
calib_data: dict[str, np.ndarray] = {}
with open(path) as f:
for line in f:
if ":" not in line:
continue
key, value = line.split(":", 1)
calib_data[key.strip()] = np.array(
[float(x) for x in value.split()], dtype=np.float32
)
# P2: 3x4 projection matrix for left color camera
p2 = calib_data["P2"].reshape(3, 4)
K = p2[:, :3]
intrinsics = torch.from_numpy(K).unsqueeze(0) # [1, 3, 3]
# R0_rect: 3x3 rectification rotation
r0 = np.eye(4, dtype=np.float32)
r0[:3, :3] = calib_data["R0_rect"].reshape(3, 3)
# Tr_velo_to_cam: 3x4 velodyne-to-camera
velo_to_cam = np.eye(4, dtype=np.float32)
velo_to_cam[:3, :] = calib_data["Tr_velo_to_cam"].reshape(3, 4)
# P2's 4th column encodes a stereo baseline offset in camera frame.
# Fold it into the extrinsic so that K @ extrinsics[:3, :] gives
# exact pixel-accurate projection (matching P2 @ R0 @ Tr_velo_to_cam).
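        # Derivation: P2 @ [x; 1] = K @ x + t with t = p2[:, 3], and
        # K @ (x + K^{-1} t) = K @ x + t, so a translation of K^{-1} t
        # prepended to R0 @ Tr reproduces P2 @ R0 @ Tr_velo_to_cam exactly.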
baseline = np.eye(4, dtype=np.float32)
baseline[:3, 3] = np.linalg.solve(K, p2[:, 3])
extrinsics = torch.from_numpy(baseline @ r0 @ velo_to_cam).unsqueeze(
0
) # [1, 4, 4]
return {"extrinsics": extrinsics, "intrinsics": intrinsics}
def _load_targets(
self,
base: str,
frame_id: str,
calib: dict[str, Tensor],
) -> SampleTargets:
"""Parse KITTI label file and convert to lidar frame.
Returns:
Dict with ``"boxes"`` (:class:`BoundingBoxes3D`, XYZLWHY format),
``"labels"`` (int tensor).
"""
path = os.path.join(base, self.labels_dir_name, f"{frame_id}.txt")
label_ids: list[int] = []
boxes_cam: list[list[float]] = []
with open(path) as f:
for line in csv.reader(f, delimiter=" "):
if not line or line[0] == "DontCare":
continue
label_ids.append(self.class_to_idx.get(line[0], -1))
# KITTI label: h, w, l, x, y, z, rotation_y (camera frame)
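                # Full line layout: type(0) truncated(1) occluded(2) alpha(3)
                # bbox left,top,right,bottom(4-7) h,w,l(8-10) x,y,z(11-13) ry(14)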
h, w, l = float(line[8]), float(line[9]), float(line[10])
x, y, z = float(line[11]), float(line[12]), float(line[13])
ry = float(line[14])
boxes_cam.append([x, y, z, h, w, l, ry])
if not boxes_cam:
return {
"boxes": BoundingBoxes3D(
torch.zeros(0, 7), format=BoundingBox3DFormat.XYZLWHY
),
"labels": torch.zeros(0, dtype=torch.int64),
}
boxes_cam_t = torch.tensor(boxes_cam, dtype=torch.float32)
boxes_lidar = _cam_to_lidar_boxes(boxes_cam_t, calib["extrinsics"][0])
return {
"boxes": BoundingBoxes3D(boxes_lidar, format=BoundingBox3DFormat.XYZLWHY),
"labels": torch.tensor(label_ids, dtype=torch.int64),
}
def _cam_to_lidar_boxes(boxes_cam: Tensor, extrinsics: Tensor) -> Tensor:
"""Convert KITTI camera-frame boxes to lidar-frame XYZLWHY format.
Args:
boxes_cam: ``[N, 7]`` boxes as ``(x, y, z, h, w, l, ry)`` in camera
frame.
extrinsics: ``[4, 4]`` lidar-to-camera matrix.
Returns:
``[N, 7]`` boxes as ``(cx, cy, cz, l, w, h, yaw)`` in lidar frame.
"""
x_cam, y_cam, z_cam = boxes_cam[:, 0], boxes_cam[:, 1], boxes_cam[:, 2]
h, w, l = boxes_cam[:, 3], boxes_cam[:, 4], boxes_cam[:, 5]
ry = boxes_cam[:, 6]
# KITTI location is at the bottom center of the box in camera frame.
# Camera Y points down, so shift up by half height to get the geometric center.
y_cam = y_cam - h / 2
# Transform center from camera to lidar using inverse extrinsics
ones = torch.ones_like(x_cam)
centers_cam = torch.stack([x_cam, y_cam, z_cam, ones], dim=-1) # [N, 4]
cam_to_lidar = torch.linalg.inv(extrinsics)
centers_lidar = (cam_to_lidar @ centers_cam.T).T[:, :3] # [N, 3]
    # Dimensions are frame-independent; reorder KITTI (h, w, l) into our
    # (l, w, h) convention: extents along the heading, lateral, and up axes.
    l_lidar = l
    w_lidar = w
    h_lidar = h
# Rotation: KITTI rotation_y is around camera Y (pointing down)
# In lidar frame, yaw is around Z (pointing up)
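    # Sanity check under the canonical mounting (camera Z ~ lidar X,
    # camera X ~ -lidar Y): ry = 0 puts the length axis along camera X,
    # i.e. -lidar Y, giving yaw = -pi/2; ry = -pi/2 faces lidar X, yaw = 0.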
yaw = -ry - np.pi / 2
return torch.stack(
[
centers_lidar[:, 0],
centers_lidar[:, 1],
centers_lidar[:, 2],
l_lidar,
w_lidar,
h_lidar,
yaw,
],
dim=-1,
)
def _get_fov_mask(
points_3d: Tensor,
proj_matrix: Tensor,
img_h: int,
img_w: int,
) -> Tensor:
"""Get boolean mask for points that project into the camera image.
Args:
points_3d: ``[N, 3]`` 3D points.
proj_matrix: ``[3, 4]`` projection matrix that maps ``points_3d`` to
image coordinates (e.g. ``P2`` for camera-frame points, or
``P2 @ R0 @ Tr`` for lidar-frame points).
img_h: Image height in pixels.
img_w: Image width in pixels.
Returns:
Boolean mask ``[N]``. True for points with positive depth that project
within image bounds.
"""
n = points_3d.shape[0]
    ones = torch.ones(n, 1, dtype=points_3d.dtype, device=points_3d.device)
pts_hom = torch.cat([points_3d, ones], dim=1) # [N, 4]
# Project: [3, 4] @ [4, N] -> [3, N]
pts_img = (proj_matrix @ pts_hom.T).T # [N, 3]
depth = pts_img[:, 2]
u = pts_img[:, 0] / depth.clamp(min=1e-6)
v = pts_img[:, 1] / depth.clamp(min=1e-6)
valid = (depth > 0) & (u >= 0) & (u < img_w) & (v >= 0) & (v < img_h)
return valid