Pointcept:Examples:Weld
Training Guide: Semantic Segmentation of Pipe Circumferential Weld (C-Seam) Point Cloud Data
| WARNING |
| This code was generated by Claude Code. It must be verified before use. |
Class Definitions
| ID | Class Name | Description |
| 0 | pipe | Pipe body surface |
| 1 | weld_bead | Normal weld bead |
| 2 | porosity | Porosity defect |
| 3 | crack | Crack defect |
| 4 | undercut | Undercut defect |
| 5 | spatter | Weld spatter |
| 6 | incomplete_fusion | Incomplete fusion defect |
pointcept/datasets/pipe_weld.py
Defines the dataset class (PipeWeldDataset):
"""
Pipe C-Seam Weld Dataset
Dataset for semantic segmentation of pipe circumferential seam (C-seam) welds.
Supports point cloud data from 3D scanners (structured light, laser profilers, etc.)
Classes:
0: pipe - pipe body surface
1: weld_bead - normal weld bead
2: porosity - porosity defect
3: crack - crack defect
4: undercut - undercut defect
5: spatter - weld spatter
6: incomplete_fusion - incomplete fusion defect
"""
import os
import numpy as np
from .builder import DATASETS
from .defaults import DefaultDataset
@DATASETS.register_module()
class PipeWeldDataset(DefaultDataset):
VALID_ASSETS = [
"coord",
"color",
"normal",
"strength",
"segment",
"instance",
]
class2id = {
"pipe": 0,
"weld_bead": 1,
"porosity": 2,
"crack": 3,
"undercut": 4,
"spatter": 5,
"incomplete_fusion": 6,
}
def get_data_name(self, idx):
data_path = self.data_list[idx % len(self.data_list)]
return os.path.basename(data_path)
def get_data(self, idx):
data_dict = super().get_data(idx)
# If strength (intensity) exists, use it as additional feature
if "strength" in data_dict:
data_dict["strength"] = data_dict["strength"].reshape(-1, 1).astype(
np.float32
)
return data_dict
pointcept/datasets/__init__.py
Register the dataset:
from .defaults import DefaultDataset, DefaultImagePointDataset, ConcatDataset
from .builder import build_dataset
from .utils import point_collate_fn, collate_fn
# indoor scene
from .s3dis import S3DISDataset
from .scannet import ScanNetDataset, ScanNet200Dataset
from .scannetpp import ScanNetPPDataset
from .scannet_pair import ScanNetPairDataset
from .hm3d import HM3DDataset
from .structure3d import Structured3DDataset
from .aeo import AEODataset
# outdoor scene
from .semantic_kitti import SemanticKITTIDataset
from .nuscenes import NuScenesDataset
from .waymo import WaymoDataset
# object
from .modelnet import ModelNetDataset
from .shapenet_part import ShapeNetPartDataset
# industrial
from .pipe_weld import PipeWeldDataset
# dataloader
from .dataloader import MultiDatasetDataloader
Data Directory Structure
Raw data (before preprocessing)
data/pipe_weld_raw/
├── train/
│ ├── sample_001.ply # PLY, PCD, XYZ, CSV supported
│ ├── sample_001.labels # (optional) per-point label file
│ ├── sample_002.ply
│ └── ...
├── val/
│ └── ...
└── test/
└── ...
Processed data (Pointcept format)
data/pipe_weld/
├── train/
│ ├── sample_001/
│ │ ├── coord.npy # (N, 3) float32 - XYZ coordinates
│ │ ├── color.npy # (N, 3) float32 - RGB color
│ │ ├── normal.npy # (N, 3) float32 - normal vectors
│ │ ├── strength.npy # (N, 1) float32 - reflectance intensity (optional)
│ │ └── segment.npy # (N,) int32 - class labels
│ ├── sample_002/
│ └── ...
├── val/
│ └── ...
└── test/
└── ...
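The layout above can be sanity-checked with a short script. A minimal sketch (the `check_sample` helper and the throwaway demo directory are illustrative, not part of Pointcept):

```python
import os
import tempfile
import numpy as np

def check_sample(sample_dir):
    """Verify one processed sample against the layout above: coord (N, 3),
    optional color/normal (N, 3), segment (N,) with labels in 0..6."""
    coord = np.load(os.path.join(sample_dir, "coord.npy"))
    n = len(coord)
    assert coord.shape == (n, 3), "coord must be (N, 3)"
    segment = np.load(os.path.join(sample_dir, "segment.npy"))
    assert segment.shape == (n,), "one label per point"
    assert segment.min() >= 0 and segment.max() <= 6, "labels must be 0..6"
    for name in ("color", "normal"):
        path = os.path.join(sample_dir, name + ".npy")
        if os.path.exists(path):
            assert np.load(path).shape == (n, 3), f"{name} must be (N, 3)"
    return n

# demo on a throwaway sample directory
demo_dir = tempfile.mkdtemp()
np.save(os.path.join(demo_dir, "coord.npy"), np.zeros((10, 3), np.float32))
np.save(os.path.join(demo_dir, "segment.npy"), np.zeros(10, np.int32))
n_points = check_sample(demo_dir)
```

Running such a check once over every split before training catches shape and dtype mismatches early, before they surface as cryptic dataloader errors.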
Training Procedure
Step 1: Generate sample data (for testing)
Generate synthetic data to validate the pipeline before real data is available:
python3 pointcept/datasets/preprocessing/pipe_weld/generate_sample_data.py \
--output_root data/pipe_weld \
--num_train 20 \
--num_val 5 \
--num_test 5 \
--seed 42
pointcept/datasets/preprocessing/pipe_weld/generate_sample_data.py
The sample script itself simply generates random synthetic point clouds:
"""
Generate synthetic sample data for Pipe C-Seam Weld dataset.
Creates realistic synthetic pipe + weld bead point clouds with defect labels
for testing the training pipeline before real data is available.
Usage:
python generate_sample_data.py --output_root data/pipe_weld --num_train 20 --num_val 5 --num_test 5
"""
import os
import argparse
import numpy as np
def generate_pipe_cylinder(
radius=50.0, length=200.0, num_points=50000, noise_std=0.1
):
"""Generate a cylindrical pipe surface point cloud (along Z-axis)."""
theta = np.random.uniform(0, 2 * np.pi, num_points)
z = np.random.uniform(-length / 2, length / 2, num_points)
x = radius * np.cos(theta) + np.random.normal(0, noise_std, num_points)
y = radius * np.sin(theta) + np.random.normal(0, noise_std, num_points)
z = z + np.random.normal(0, noise_std, num_points)
coords = np.stack([x, y, z], axis=-1).astype(np.float32)
# Normals point outward radially
normals = np.stack([np.cos(theta), np.sin(theta), np.zeros(num_points)], axis=-1)
normals = normals.astype(np.float32)
return coords, normals, theta, z
def generate_weld_bead(
radius=50.0,
z_center=0.0,
bead_width=5.0,
bead_height=2.0,
num_points=10000,
noise_std=0.15,
):
"""Generate a circumferential weld bead (C-seam) around the pipe."""
theta = np.random.uniform(0, 2 * np.pi, num_points)
z_offset = np.random.normal(0, bead_width / 4, num_points)
# Weld bead profile: Gaussian-like cross-section
bead_profile = bead_height * np.exp(
-0.5 * (z_offset / (bead_width / 4)) ** 2
)
r = radius + bead_profile + np.random.normal(0, noise_std, num_points)
x = r * np.cos(theta)
y = r * np.sin(theta)
z = z_center + z_offset + np.random.normal(0, noise_std, num_points)
coords = np.stack([x, y, z], axis=-1).astype(np.float32)
# Normals (approximately radial)
normals = np.stack([np.cos(theta), np.sin(theta), np.zeros(num_points)], axis=-1)
normals = normals.astype(np.float32)
return coords, normals, theta
def generate_defects(
radius=50.0,
z_center=0.0,
bead_width=5.0,
bead_height=2.0,
num_defects=3,
points_per_defect=500,
):
"""Generate various weld defects near the weld bead."""
all_coords = []
all_normals = []
all_labels = []
defect_types = {
2: "porosity",
3: "crack",
4: "undercut",
5: "spatter",
6: "incomplete_fusion",
}
for _ in range(num_defects):
defect_class = np.random.choice(list(defect_types.keys()))
theta_center = np.random.uniform(0, 2 * np.pi)
n = points_per_defect
if defect_class == 2: # porosity - small spherical voids
theta = theta_center + np.random.normal(0, 0.05, n)
z = z_center + np.random.normal(0, bead_width / 6, n)
r_offset = bead_height * np.exp(
-0.5 * ((z - z_center) / (bead_width / 4)) ** 2
)
r = radius + r_offset + np.random.normal(0, 0.8, n)
# Porosity creates small dips
r -= np.abs(np.random.normal(0, 0.5, n))
elif defect_class == 3: # crack - linear along weld
theta = theta_center + np.linspace(-0.1, 0.1, n) + np.random.normal(
0, 0.005, n
)
z = z_center + np.random.normal(0, bead_width / 8, n)
r_offset = bead_height * np.exp(
-0.5 * ((z - z_center) / (bead_width / 4)) ** 2
)
r = radius + r_offset + np.random.normal(0, 0.3, n)
# Crack creates a groove
r -= np.abs(np.random.normal(0.3, 0.2, n))
elif defect_class == 4: # undercut - groove at weld toe
theta = theta_center + np.random.normal(0, 0.08, n)
z = z_center + np.sign(np.random.randn(n)) * (
bead_width / 2 + np.abs(np.random.normal(0, bead_width / 6, n))
)
r = radius - np.abs(np.random.normal(0.5, 0.3, n))
elif defect_class == 5: # spatter - scattered droplets
theta = theta_center + np.random.normal(0, 0.3, n)
z = z_center + np.random.normal(0, bead_width, n)
r = radius + np.abs(np.random.normal(1.0, 0.5, n))
else: # incomplete_fusion - flat region at weld root
theta = theta_center + np.random.normal(0, 0.06, n)
z = z_center + np.random.normal(0, bead_width / 4, n)
r = radius + np.random.normal(0, 0.2, n) # barely raised
x = r * np.cos(theta)
y = r * np.sin(theta)
coords = np.stack([x, y, z], axis=-1).astype(np.float32)
normals = np.stack(
[np.cos(theta), np.sin(theta), np.zeros(n)], axis=-1
).astype(np.float32)
all_coords.append(coords)
all_normals.append(normals)
all_labels.append(np.full(n, defect_class, dtype=np.int32))
return (
np.concatenate(all_coords),
np.concatenate(all_normals),
np.concatenate(all_labels),
)
def generate_sample(sample_id, output_dir, pipe_radius=None):
"""Generate one complete pipe weld sample."""
os.makedirs(output_dir, exist_ok=True)
# Random pipe parameters
if pipe_radius is None:
pipe_radius = np.random.uniform(30.0, 80.0)
pipe_length = np.random.uniform(150.0, 300.0)
bead_width = np.random.uniform(3.0, 8.0)
bead_height = np.random.uniform(1.0, 3.5)
z_center = np.random.uniform(-20.0, 20.0)
num_pipe_pts = np.random.randint(40000, 80000)
num_weld_pts = np.random.randint(8000, 15000)
num_defects = np.random.randint(1, 6)
pts_per_defect = np.random.randint(200, 800)
# Generate pipe body
pipe_coords, pipe_normals, _, _ = generate_pipe_cylinder(
radius=pipe_radius, length=pipe_length, num_points=num_pipe_pts
)
pipe_labels = np.zeros(num_pipe_pts, dtype=np.int32) # class 0: pipe
# Generate weld bead
weld_coords, weld_normals, _ = generate_weld_bead(
radius=pipe_radius,
z_center=z_center,
bead_width=bead_width,
bead_height=bead_height,
num_points=num_weld_pts,
)
weld_labels = np.ones(num_weld_pts, dtype=np.int32) # class 1: weld_bead
# Generate defects
defect_coords, defect_normals, defect_labels = generate_defects(
radius=pipe_radius,
z_center=z_center,
bead_width=bead_width,
bead_height=bead_height,
num_defects=num_defects,
points_per_defect=pts_per_defect,
)
# Combine all
coord = np.concatenate([pipe_coords, weld_coords, defect_coords], axis=0)
normal = np.concatenate([pipe_normals, weld_normals, defect_normals], axis=0)
segment = np.concatenate([pipe_labels, weld_labels, defect_labels], axis=0)
# Generate synthetic color (grayish metal with weld discoloration)
color = np.ones((len(coord), 3), dtype=np.float32) * 160 # base gray
color += np.random.normal(0, 10, color.shape) # noise
# Weld bead: slightly darker/different color
weld_mask = segment == 1
color[weld_mask] = color[weld_mask] * 0.7 + np.array([30, 20, 10])
# Defects: reddish tint
defect_mask = segment >= 2
color[defect_mask] = color[defect_mask] * 0.8 + np.array([40, -10, -10])
color = np.clip(color, 0, 255).astype(np.float32)
# Save
np.save(os.path.join(output_dir, "coord.npy"), coord)
np.save(os.path.join(output_dir, "normal.npy"), normal)
np.save(os.path.join(output_dir, "color.npy"), color)
np.save(os.path.join(output_dir, "segment.npy"), segment)
return len(coord), {
i: int(np.sum(segment == i)) for i in range(7) if np.sum(segment == i) > 0
}
def main():
parser = argparse.ArgumentParser(
description="Generate synthetic pipe weld point cloud samples"
)
parser.add_argument(
"--output_root",
type=str,
default="data/pipe_weld",
help="Output root directory",
)
parser.add_argument("--num_train", type=int, default=20)
parser.add_argument("--num_val", type=int, default=5)
parser.add_argument("--num_test", type=int, default=5)
parser.add_argument("--seed", type=int, default=42)
args = parser.parse_args()
np.random.seed(args.seed)
splits = {
"train": args.num_train,
"val": args.num_val,
"test": args.num_test,
}
for split, count in splits.items():
print(f"\nGenerating {split}: {count} samples")
split_dir = os.path.join(args.output_root, split)
for i in range(count):
sample_name = f"pipe_weld_{split}_{i:04d}"
sample_dir = os.path.join(split_dir, sample_name)
n_pts, class_dist = generate_sample(i, sample_dir)
print(f" [{i + 1}/{count}] {sample_name}: {n_pts} points, classes: {class_dist}")
print(f"\nDone! Sample data saved to: {args.output_root}")
print("\nClass mapping:")
print(" 0: pipe (pipe body)")
print(" 1: weld_bead (normal weld)")
print(" 2: porosity")
print(" 3: crack")
print(" 4: undercut")
print(" 5: spatter")
print(" 6: incomplete_fusion")
if __name__ == "__main__":
main()
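Before training on generated samples, it is worth confirming the geometry the script produces. A minimal check that re-runs the cylinder sampling from `generate_pipe_cylinder` standalone and verifies that radial distances cluster around the nominal radius:

```python
import numpy as np

# re-create the cylinder sampling from generate_pipe_cylinder above
rng = np.random.default_rng(0)
radius, num_points, noise_std = 50.0, 5000, 0.1
theta = rng.uniform(0.0, 2.0 * np.pi, num_points)
x = radius * np.cos(theta) + rng.normal(0.0, noise_std, num_points)
y = radius * np.sin(theta) + rng.normal(0.0, noise_std, num_points)

# radial distances should cluster tightly around the nominal radius
r = np.sqrt(x**2 + y**2)
mean_err = abs(r.mean() - radius)
r_std = r.std()
```

With `noise_std=0.1` the radial spread stays near 0.1 mm, which is well below the 1.0-3.5 mm bead heights the generator uses, so the weld bead remains clearly separable from sensor noise.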
Step 2: Preprocess real data
Convert real scan data (PLY, PCD, XYZ, CSV) into the Pointcept npy format:
python3 pointcept/datasets/preprocessing/pipe_weld/preprocess_pipe_weld.py \
--raw_root data/pipe_weld_raw \
--output_root data/pipe_weld
pointcept/datasets/preprocessing/pipe_weld/preprocess_pipe_weld.py
The conversion script:
"""
Preprocessing script for Pipe C-Seam Weld Point Cloud data.
Converts raw point cloud files (PLY, PCD, XYZ, CSV) into the Pointcept
.npy format used by DefaultDataset.
Expected raw data structure:
data/pipe_weld_raw/
train/
sample_001.ply (or .pcd, .xyz, .csv)
sample_001.labels (optional: per-point integer labels, one per line)
val/
...
test/
...
Output structure:
data/pipe_weld/
train/
sample_001/
coord.npy (N, 3) float32
color.npy (N, 3) float32 (if available)
normal.npy (N, 3) float32 (if available)
strength.npy (N, 1) float32 (if available: intensity/reflectance)
segment.npy (N,) int32
val/
...
test/
...
Usage:
python preprocess_pipe_weld.py --raw_root data/pipe_weld_raw --output_root data/pipe_weld
"""
import os
import argparse
import numpy as np
import glob
from pathlib import Path
try:
import open3d as o3d
except ImportError:
o3d = None
print(
"Warning: open3d not installed. Only CSV/XYZ formats will be supported. "
"Install with: pip install open3d"
)
def load_point_cloud(filepath):
"""Load point cloud from various formats. Returns dict with available fields."""
ext = os.path.splitext(filepath)[1].lower()
result = {}
if ext in (".ply", ".pcd") and o3d is not None:
pcd = o3d.io.read_point_cloud(filepath)
result["coord"] = np.asarray(pcd.points, dtype=np.float32)
if pcd.has_colors():
result["color"] = (np.asarray(pcd.colors) * 255).astype(np.float32)
if pcd.has_normals():
result["normal"] = np.asarray(pcd.normals, dtype=np.float32)
elif ext == ".xyz":
# Format: x y z [r g b] [nx ny nz]
data = np.loadtxt(filepath, dtype=np.float32)
result["coord"] = data[:, :3]
if data.shape[1] >= 6:
result["color"] = data[:, 3:6]
if data.shape[1] >= 9:
result["normal"] = data[:, 6:9]
elif ext == ".csv":
# Format: x,y,z[,r,g,b][,nx,ny,nz][,intensity][,label]
# Header row expected
data = np.genfromtxt(filepath, delimiter=",", skip_header=1, dtype=np.float32)
result["coord"] = data[:, :3]
if data.shape[1] >= 6:
result["color"] = data[:, 3:6]
if data.shape[1] >= 9:
result["normal"] = data[:, 6:9]
if data.shape[1] >= 10:
result["strength"] = data[:, 9:10]
if data.shape[1] >= 11:
result["segment"] = data[:, 10].astype(np.int32)
elif ext == ".npy":
data = np.load(filepath)
result["coord"] = data[:, :3].astype(np.float32)
if data.shape[1] >= 6:
result["color"] = data[:, 3:6].astype(np.float32)
if data.shape[1] >= 9:
result["normal"] = data[:, 6:9].astype(np.float32)
else:
raise ValueError(f"Unsupported file format: {ext}")
return result
def estimate_normals(coords, knn=30):
"""Estimate normals using open3d if available."""
if o3d is None:
return None
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(coords)
pcd.estimate_normals(
search_param=o3d.geometry.KDTreeSearchParamKNN(knn=knn)
)
pcd.orient_normals_consistent_tangent_plane(k=knn)
return np.asarray(pcd.normals, dtype=np.float32)
def load_labels(label_path, num_points):
"""Load per-point labels from a file."""
if os.path.exists(label_path):
labels = np.loadtxt(label_path, dtype=np.int32)
assert len(labels) == num_points, (
f"Label count ({len(labels)}) != point count ({num_points})"
)
return labels
return np.zeros(num_points, dtype=np.int32) # default: all pipe (class 0)
def process_one(raw_path, output_dir, estimate_normal=True):
"""Process a single point cloud file into .npy format."""
stem = Path(raw_path).stem
sample_dir = os.path.join(output_dir, stem)
os.makedirs(sample_dir, exist_ok=True)
data = load_point_cloud(raw_path)
coord = data["coord"]
num_points = len(coord)
# Save coord
np.save(os.path.join(sample_dir, "coord.npy"), coord)
# Save color (if available)
if "color" in data:
np.save(os.path.join(sample_dir, "color.npy"), data["color"])
# Save or estimate normals
if "normal" in data:
np.save(os.path.join(sample_dir, "normal.npy"), data["normal"])
elif estimate_normal:
normals = estimate_normals(coord)
if normals is not None:
np.save(os.path.join(sample_dir, "normal.npy"), normals)
# Save strength/intensity
if "strength" in data:
np.save(os.path.join(sample_dir, "strength.npy"), data["strength"])
# Save segment labels
if "segment" in data:
segment = data["segment"]
else:
# Look for separate label file
label_path = os.path.splitext(raw_path)[0] + ".labels"
segment = load_labels(label_path, num_points)
np.save(os.path.join(sample_dir, "segment.npy"), segment)
return num_points
def main():
parser = argparse.ArgumentParser(
description="Preprocess pipe weld point cloud data for Pointcept"
)
parser.add_argument(
"--raw_root",
type=str,
required=True,
help="Root directory of raw point cloud data",
)
parser.add_argument(
"--output_root",
type=str,
required=True,
help="Output root directory for processed data",
)
parser.add_argument(
"--no_normal_estimation",
action="store_true",
help="Skip normal estimation for files without normals",
)
args = parser.parse_args()
supported_exts = ("*.ply", "*.pcd", "*.xyz", "*.csv", "*.npy")
for split in ["train", "val", "test"]:
raw_split_dir = os.path.join(args.raw_root, split)
if not os.path.exists(raw_split_dir):
print(f"Skipping {split}: directory not found")
continue
output_split_dir = os.path.join(args.output_root, split)
os.makedirs(output_split_dir, exist_ok=True)
files = []
for ext in supported_exts:
files.extend(glob.glob(os.path.join(raw_split_dir, ext)))
files.sort()
print(f"\nProcessing {split}: {len(files)} files")
for i, f in enumerate(files):
n_pts = process_one(
f,
output_split_dir,
estimate_normal=not args.no_normal_estimation,
)
print(f" [{i + 1}/{len(files)}] {os.path.basename(f)} -> {n_pts} points")
print("\nDone! Processed data saved to:", args.output_root)
if __name__ == "__main__":
main()
Supported input formats:
| Format | Extension | Required Library | Notes |
| PLY | .ply | open3d | XYZ + RGB + normals parsed automatically |
| PCD | .pcd | open3d | Point Cloud Library format |
| XYZ | .xyz | numpy | whitespace-separated: x y z [r g b] [nx ny nz] |
| CSV | .csv | numpy | header row expected: x,y,z[,r,g,b][,nx,ny,nz][,intensity][,label] |
| NPY | .npy | numpy | numpy array (N, 3+) |
Label files:
- Included as a `label` column in the CSV, or
- A separate `.labels` file (one integer label per line, one per point)
- If no label file exists, all points default to `0` (pipe)
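The CSV and `.labels` conventions above can be exercised end to end. A small sketch (the file names and temporary directory are illustrative) that writes both files and reads them back the same way `preprocess_pipe_weld.py` does:

```python
import os
import tempfile
import numpy as np

d = tempfile.mkdtemp()

# CSV per the convention above: a header row, then x,y,z columns
csv_path = os.path.join(d, "sample_001.csv")
np.savetxt(csv_path, np.random.rand(100, 3), delimiter=",",
           header="x,y,z", comments="")

# separate .labels file: one integer label per point, one per line
labels_path = os.path.join(d, "sample_001.labels")
np.savetxt(labels_path, np.zeros(100, dtype=np.int32), fmt="%d")

# read both back the same way preprocess_pipe_weld.py does
data = np.genfromtxt(csv_path, delimiter=",", skip_header=1, dtype=np.float32)
labels = np.loadtxt(labels_path, dtype=np.int32)
```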
Step 3: Run training
# single GPU
python3 tools/train.py configs/pipe_weld/semseg-spunet-v1m1-0-base.py
# multi GPU (e.g., 4 GPUs)
python3 tools/train.py configs/pipe_weld/semseg-spunet-v1m1-0-base.py \
--num-gpus 4
configs/pipe_weld/semseg-spunet-v1m1-0-base.py
_base_ = ["../_base_/default_runtime.py"]
# misc custom setting
batch_size = 8 # total bs in all gpus (pipe weld samples are smaller than indoor scenes)
mix_prob = 0
empty_cache = False
enable_amp = True
# model settings
model = dict(
type="DefaultSegmentor",
backbone=dict(
type="SpUNet-v1m1",
in_channels=6, # color(3) + normal(3)
num_classes=7,
channels=(32, 64, 128, 256, 256, 128, 96, 96),
layers=(2, 3, 4, 6, 2, 2, 2, 2),
),
criteria=[
dict(
type="CrossEntropyLoss",
loss_weight=1.0,
ignore_index=-1,
# Class weights to handle imbalance (defects are rare)
# pipe, weld_bead, porosity, crack, undercut, spatter, incomplete_fusion
weight=[0.5, 1.0, 5.0, 8.0, 6.0, 4.0, 7.0],
),
],
)
# scheduler settings
epoch = 400
optimizer = dict(type="SGD", lr=0.05, momentum=0.9, weight_decay=0.0001, nesterov=True)
scheduler = dict(
type="OneCycleLR",
max_lr=optimizer["lr"],
pct_start=0.05,
anneal_strategy="cos",
div_factor=10.0,
final_div_factor=10000.0,
)
# dataset settings
dataset_type = "PipeWeldDataset"
data_root = "data/pipe_weld"
data = dict(
num_classes=7,
ignore_index=-1,
names=[
"pipe",
"weld_bead",
"porosity",
"crack",
"undercut",
"spatter",
"incomplete_fusion",
],
train=dict(
type=dataset_type,
split="train",
data_root=data_root,
transform=[
dict(type="CenterShift", apply_z=True),
dict(
type="RandomDropout",
dropout_ratio=0.2,
dropout_application_ratio=0.2,
),
dict(
type="RandomRotate",
angle=[-1, 1],
axis="z",
center=[0, 0, 0],
p=0.5,
),
dict(
type="RandomRotate",
angle=[-1 / 64, 1 / 64],
axis="x",
p=0.5,
),
dict(
type="RandomRotate",
angle=[-1 / 64, 1 / 64],
axis="y",
p=0.5,
),
dict(type="RandomScale", scale=[0.9, 1.1]),
dict(type="RandomFlip", p=0.5),
dict(type="RandomJitter", sigma=0.005, clip=0.02),
dict(
type="ElasticDistortion",
distortion_params=[[0.2, 0.4], [0.8, 1.6]],
),
dict(type="ChromaticAutoContrast", p=0.2, blend_factor=None),
dict(type="ChromaticTranslation", p=0.95, ratio=0.05),
dict(type="ChromaticJitter", p=0.95, std=0.05),
dict(
type="GridSample",
grid_size=0.5, # 0.5mm voxel for weld inspection
hash_type="fnv",
mode="train",
return_grid_coord=True,
),
dict(type="SphereCrop", point_max=100000, mode="random"),
dict(type="CenterShift", apply_z=False),
dict(type="NormalizeColor"),
dict(type="ShufflePoint"),
dict(type="ToTensor"),
dict(
type="Collect",
keys=("coord", "grid_coord", "segment"),
feat_keys=("color", "normal"),
),
],
test_mode=False,
),
val=dict(
type=dataset_type,
split="val",
data_root=data_root,
transform=[
dict(type="CenterShift", apply_z=True),
dict(type="Copy", keys_dict={"segment": "origin_segment"}),
dict(
type="GridSample",
grid_size=0.5,
hash_type="fnv",
mode="train",
return_grid_coord=True,
return_inverse=True,
),
dict(type="CenterShift", apply_z=False),
dict(type="NormalizeColor"),
dict(type="ToTensor"),
dict(
type="Collect",
keys=(
"coord",
"grid_coord",
"segment",
"origin_segment",
"inverse",
),
feat_keys=("color", "normal"),
),
],
test_mode=False,
),
test=dict(
type=dataset_type,
split="val",
data_root=data_root,
transform=[
dict(type="CenterShift", apply_z=True),
dict(type="NormalizeColor"),
],
test_mode=True,
test_cfg=dict(
voxelize=dict(
type="GridSample",
grid_size=0.5,
hash_type="fnv",
mode="test",
return_grid_coord=True,
),
crop=None,
post_transform=[
dict(type="CenterShift", apply_z=False),
dict(type="ToTensor"),
dict(
type="Collect",
keys=("coord", "grid_coord", "index"),
feat_keys=("color", "normal"),
),
],
aug_transform=[
[
dict(
type="RandomRotateTargetAngle",
angle=[0],
axis="z",
center=[0, 0, 0],
p=1,
)
],
[
dict(
type="RandomRotateTargetAngle",
angle=[1 / 2],
axis="z",
center=[0, 0, 0],
p=1,
)
],
[
dict(
type="RandomRotateTargetAngle",
angle=[1],
axis="z",
center=[0, 0, 0],
p=1,
)
],
[
dict(
type="RandomRotateTargetAngle",
angle=[3 / 2],
axis="z",
center=[0, 0, 0],
p=1,
)
],
],
),
),
)
# hook
hooks = [
dict(type="CheckpointLoader"),
dict(type="ModelHook"),
dict(type="IterationTimer", warmup_iter=2),
dict(type="InformationWriter"),
dict(type="SemSegEvaluator"),
dict(type="CheckpointSaver", save_freq=None),
dict(type="PreciseEvaluator", test_last=False),
]
Step 4: Test / Inference
python3 tools/test.py configs/pipe_weld/semseg-spunet-v1m1-0-base.py \
--options weight=exp/pipe_weld/model/model_best.pth
Model Configuration
Architecture: SpUNet-v1m1
Backbone: Sparse U-Net v1m1
├── in_channels: 6 (color 3 + normal 3)
├── num_classes: 7
├── channels: (32, 64, 128, 256, 256, 128, 96, 96)
└── layers: (2, 3, 4, 6, 2, 2, 2, 2)
Loss Function
- CrossEntropyLoss with class weights: [0.5, 1.0, 5.0, 8.0, 6.0, 4.0, 7.0]
- Defect classes are rare, so their weights are set higher (crack highest at 8.0)
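The effect of the class weights can be illustrated with a hand-rolled weighted cross entropy. A NumPy sketch that mirrors the normalization `torch.nn.CrossEntropyLoss` uses when `weight` is given (sum of weighted per-point losses divided by the summed target weights); the `weighted_ce` helper is illustrative, not Pointcept code:

```python
import numpy as np

# class weights from the config: pipe ... incomplete_fusion
weights = np.array([0.5, 1.0, 5.0, 8.0, 6.0, 4.0, 7.0])

def weighted_ce(logits, target, weight):
    """Weighted cross entropy over points, normalized by the summed
    target weights (matching torch.nn.CrossEntropyLoss with weight=...)."""
    logits = logits - logits.max(axis=1, keepdims=True)      # stability
    log_p = logits - np.log(np.exp(logits).sum(axis=1, keepdims=True))
    picked = log_p[np.arange(len(target)), target]           # log p(y_i)
    return -(weight[target] * picked).sum() / weight[target].sum()

# with uniform logits every class has probability 1/7, so the loss is
# log(7) regardless of the weights (they cancel under normalization)
logits = np.zeros((4, 7))
target = np.array([0, 3, 2, 1])
loss_uniform = weighted_ce(logits, target, weights)
```

In a mixed batch the weighting shifts the gradient budget toward rare defect classes: a misclassified crack point (weight 8.0) contributes sixteen times the weighted loss of an equally misclassified pipe point (weight 0.5) before normalization.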
Optimizer & Scheduler
| Item | Setting |
| Optimizer | SGD (lr=0.05, momentum=0.9, weight_decay=1e-4, nesterov) |
| Scheduler | OneCycleLR (pct_start=0.05, cosine annealing) |
| Epoch | 400 |
| Batch Size | 8 |
| AMP | Enabled (float16) |
Data Augmentation (Transforms)
Transforms applied during training
| Transform | Parameters | Purpose |
| CenterShift | apply_z=True | Center coordinates |
| RandomDropout | dropout_ratio=0.2 | Randomly drop points |
| RandomRotate (z) | angle=[-1, 1], p=0.5 | Rotation about the Z axis |
| RandomRotate (x, y) | angle=[-1/64, 1/64], p=0.5 | Slight rotation about the X/Y axes |
| RandomScale | scale=[0.9, 1.1] | Random scaling |
| RandomFlip | p=0.5 | Random flipping |
| RandomJitter | sigma=0.005, clip=0.02 | Point noise |
| ElasticDistortion | [[0.2, 0.4], [0.8, 1.6]] | Elastic distortion |
| ChromaticAutoContrast | p=0.2 | Color auto-contrast |
| ChromaticTranslation | p=0.95, ratio=0.05 | Color shift |
| ChromaticJitter | p=0.95, std=0.05 | Color noise |
| GridSample | grid_size=0.5 | Voxelization (0.5-unit voxels) |
| SphereCrop | point_max=100000 | Cap the point count |
| NormalizeColor | - | Normalize colors to [0, 1] |
Transforms during validation/testing
- CenterShift + GridSample + NormalizeColor + ToTensor
- At test time, 4-way rotation TTA (Test-Time Augmentation) about the Z axis is applied
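The four-angle TTA can be sketched outside the framework. A minimal version assuming a `predict_fn` that maps coordinates to per-point class probabilities (a placeholder, not Pointcept's API); the angles match the config's `RandomRotateTargetAngle` entries, which are expressed in units of π:

```python
import numpy as np

def rotate_z(coord, angle_rad):
    """Rotate (N, 3) coordinates about the Z axis."""
    c, s = np.cos(angle_rad), np.sin(angle_rad)
    rot = np.array([[c, -s, 0.0], [s, c, 0.0], [0.0, 0.0, 1.0]])
    return coord @ rot.T

def tta_predict(coord, predict_fn):
    """Average per-point class probabilities over four z rotations."""
    angles = [0.0, 0.5, 1.0, 1.5]   # in units of pi, as in the config
    probs = [predict_fn(rotate_z(coord, a * np.pi)) for a in angles]
    return np.mean(probs, axis=0)

rng = np.random.default_rng(0)
coord = rng.normal(size=(50, 3))
# a full turn must bring the points back (up to float error)
max_err = np.abs(rotate_z(coord, 2.0 * np.pi) - coord).max()
# placeholder classifier: uniform probabilities over the 7 classes
probs = tta_predict(coord, lambda c: np.full((len(c), 7), 1.0 / 7))
```

Rotation about Z is a sensible TTA choice here because a circumferential weld is approximately rotationally symmetric about the pipe axis.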
Key Design Points
grid_size tuning
- Current setting: grid_size=0.5
- If the data unit is mm → grid_size=0.5 (0.5 mm voxels)
- If the data unit is m → grid_size=0.0005 (0.5 mm voxels)
- Weld defects span only a few mm, so tune grid_size to the scanner resolution
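The unit dependence can be demonstrated directly: quantizing the same scan expressed in mm with `grid_size=0.5` and in metres with `grid_size=0.0005` must occupy the same voxels. A sketch of GridSample-style quantization (the `voxel_count` helper is illustrative):

```python
import numpy as np

def voxel_count(coord, grid_size):
    """Occupied-voxel count after GridSample-style quantization."""
    keys = np.floor(coord / grid_size).astype(np.int64)
    return len(np.unique(keys, axis=0))

rng = np.random.default_rng(0)
coord_mm = rng.uniform(0.0, 100.0, size=(2000, 3))  # scan in millimetres
coord_m = coord_mm / 1000.0                         # same scan in metres
n_mm = voxel_count(coord_mm, 0.5)     # 0.5 mm voxels
n_m = voxel_count(coord_m, 0.0005)    # the same 0.5 mm voxels, in metres
```

If the voxel count collapses to a handful of voxels (grid too coarse) or barely reduces the point count at all (grid too fine), the grid_size does not match the data unit.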
Importance of normal vectors
- The surface geometry of the weld bead (bumps and depressions) is the key feature for discriminating defects
- Data without normals has them estimated automatically with open3d during preprocessing (disable with --no_normal_estimation)
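The idea behind normal estimation is a local PCA: the normal of a patch is the eigenvector of the smallest covariance eigenvalue. A NumPy sketch of that principle (open3d's `estimate_normals` additionally does the neighbor search per point and orients the normals consistently):

```python
import numpy as np

def pca_normal(patch):
    """Normal of a local patch: eigenvector belonging to the smallest
    eigenvalue of the centered covariance matrix."""
    centered = patch - patch.mean(axis=0)
    cov = centered.T @ centered / len(patch)
    eigvals, eigvecs = np.linalg.eigh(cov)  # eigenvalues ascending
    return eigvecs[:, 0]

# a noisy patch on the plane z = 0: the normal should align with +-Z
rng = np.random.default_rng(1)
patch = np.column_stack([
    rng.normal(0.0, 1.0, 200),
    rng.normal(0.0, 1.0, 200),
    rng.normal(0.0, 0.01, 200),   # tiny out-of-plane noise
])
normal_vec = pca_normal(patch)
```

Because the sign of an eigenvector is arbitrary, the estimated normals need a consistent orientation pass, which is what `orient_normals_consistent_tangent_plane` handles in the preprocessing script.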
Handling class imbalance
- Defect points make up only about 1-5% of the total
- First-line mitigation: CrossEntropyLoss class weights
- Further options: repeat the data via the loop parameter, or consider oversampling
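If hand-tuned weights like those in the config are unavailable, weights can also be derived from the label distribution. One possible recipe (inverse frequency, mean-normalized and clipped; the helper and the clipping range are illustrative, not what produced the config's weights):

```python
import numpy as np

def inverse_freq_weights(segment, num_classes=7, clip=(0.5, 10.0)):
    """Inverse-frequency class weights, normalized to mean 1 and
    clipped so no single class dominates the loss."""
    counts = np.bincount(segment, minlength=num_classes).astype(np.float64)
    freq = counts / counts.sum()
    w = 1.0 / np.maximum(freq, 1e-6)   # rare classes get large weights
    w = w / w.mean()
    return np.clip(w, *clip)

# e.g. a split where defect points are only a few percent of the total
demo_segment = np.repeat(np.arange(7), [950, 30, 5, 5, 4, 3, 3])
w = inverse_freq_weights(demo_segment)
```

Whatever recipe is used, the weights should be recomputed from the actual training split, since the defect ratio varies between scan batches.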
Generated Files
| File Path | Role |
| pointcept/datasets/pipe_weld.py | Dataset class (PipeWeldDataset) |
| pointcept/datasets/__init__.py | Dataset registration (import added) |
| pointcept/datasets/preprocessing/pipe_weld/preprocess_pipe_weld.py | Raw data → npy preprocessing |
| pointcept/datasets/preprocessing/pipe_weld/generate_sample_data.py | Synthetic sample generation |
| configs/pipe_weld/semseg-spunet-v1m1-0-base.py | Training config |
Public Dataset References
| Dataset | URL | Format | License |
| Mendeley - 3D Butt Weld Scans | | TXT (X,Y,Z,I) | CC BY 4.0 |
| Zenodo - SAW Repository | | HDF5/TXT | CC BY 4.0 |
| Kaggle - AAU Sewer Point Clouds | https://www.kaggle.com/datasets/aalborguniversity/sewerpointclouds | Point Cloud | CC BY 4.0 |
| Kaggle - OpenTrench3D | | PLY | CC BY-NC 4.0 |