
Pointcept:Examples:Weld

Training guide for semantic segmentation of pipe circumferential weld (C-seam) point cloud data

WARNING

The code on this page was generated by Claude Code. It must be verified before use.

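Class definitions

| ID | Class name | Description |
|---|---|---|
| 0 | pipe | Pipe body surface |
| 1 | weld_bead | Normal weld bead |
| 2 | porosity | Porosity defect |
| 3 | crack | Crack defect |
| 4 | undercut | Undercut defect |
| 5 | spatter | Weld spatter |
| 6 | incomplete_fusion | Incomplete fusion defect |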

pointcept/datasets/pipe_weld.py

Defines the dataset class (PipeWeldDataset).

"""
Pipe C-Seam Weld Dataset

Dataset for semantic segmentation of pipe circumferential seam (C-seam) welds.
Supports point cloud data from 3D scanners (structured light, laser profilers, etc.)

Classes:
    0: pipe       - pipe body surface
    1: weld_bead  - normal weld bead
    2: porosity   - porosity defect
    3: crack      - crack defect
    4: undercut   - undercut defect
    5: spatter    - weld spatter
    6: incomplete_fusion - incomplete fusion defect
"""

import os

import numpy as np

from .builder import DATASETS
from .defaults import DefaultDataset


@DATASETS.register_module()
class PipeWeldDataset(DefaultDataset):
    VALID_ASSETS = [
        "coord",
        "color",
        "normal",
        "strength",
        "segment",
        "instance",
    ]

    class2id = {
        "pipe": 0,
        "weld_bead": 1,
        "porosity": 2,
        "crack": 3,
        "undercut": 4,
        "spatter": 5,
        "incomplete_fusion": 6,
    }

    def get_data_name(self, idx):
        data_path = self.data_list[idx % len(self.data_list)]
        return os.path.basename(data_path)

    def get_data(self, idx):
        data_dict = super().get_data(idx)

        # If strength (intensity) exists, use it as additional feature
        if "strength" in data_dict:
            data_dict["strength"] = data_dict["strength"].reshape(-1, 1).astype(
                np.float32
            )
        return data_dict

pointcept/datasets/__init__.py

Register the dataset by adding its import (see the "industrial" entry below).

from .defaults import DefaultDataset, DefaultImagePointDataset, ConcatDataset
from .builder import build_dataset
from .utils import point_collate_fn, collate_fn

# indoor scene
from .s3dis import S3DISDataset
from .scannet import ScanNetDataset, ScanNet200Dataset
from .scannetpp import ScanNetPPDataset
from .scannet_pair import ScanNetPairDataset
from .hm3d import HM3DDataset
from .structure3d import Structured3DDataset
from .aeo import AEODataset

# outdoor scene
from .semantic_kitti import SemanticKITTIDataset
from .nuscenes import NuScenesDataset
from .waymo import WaymoDataset

# object
from .modelnet import ModelNetDataset
from .shapenet_part import ShapeNetPartDataset

# industrial
from .pipe_weld import PipeWeldDataset

# dataloader
from .dataloader import MultiDatasetDataloader

Data directory layout

Raw data (before preprocessing)

data/pipe_weld_raw/
├── train/
│   ├── sample_001.ply       # PLY, PCD, XYZ, CSV, or NPY
│   ├── sample_001.labels    # (optional) per-point label file
│   ├── sample_002.ply
│   └── ...
├── val/
│   └── ...
└── test/
    └── ...

Processed data (Pointcept format)

data/pipe_weld/
├── train/
│   ├── sample_001/
│   │   ├── coord.npy      # (N, 3) float32 - XYZ coordinates
│   │   ├── color.npy      # (N, 3) float32 - RGB color (0-255)
│   │   ├── normal.npy     # (N, 3) float32 - normal vectors
│   │   ├── strength.npy   # (N, 1) float32 - reflectance intensity (optional)
│   │   └── segment.npy    # (N,)   int32   - class labels
│   ├── sample_002/
│   └── ...
├── val/
│   └── ...
└── test/
    └── ...
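
Before training, it can help to confirm that every sample directory actually contains the files the config reads. The following is a minimal sketch using only the standard library. `find_incomplete_samples` and `REQUIRED_ASSETS` are hypothetical names, not part of Pointcept, and the required set assumes the config in this guide (color and normal as input features).

```python
from pathlib import Path

# Assets the config in this guide reads: coord and segment for the task,
# color and normal as input features. strength.npy is treated as optional.
REQUIRED_ASSETS = ("coord.npy", "color.npy", "normal.npy", "segment.npy")


def find_incomplete_samples(data_root, splits=("train", "val", "test"),
                            required=REQUIRED_ASSETS):
    """Return {sample_dir: [missing file names]} for incomplete samples."""
    problems = {}
    for split in splits:
        split_dir = Path(data_root) / split
        if not split_dir.is_dir():
            continue  # a missing split is simply skipped
        for sample_dir in sorted(p for p in split_dir.iterdir() if p.is_dir()):
            missing = [n for n in required if not (sample_dir / n).is_file()]
            if missing:
                problems[str(sample_dir)] = missing
    return problems
```

Calling `find_incomplete_samples("data/pipe_weld")` returns an empty dict when every sample is complete. The check only looks at file names, not at array shapes or dtypes.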

Training procedure

Step 1: Generate sample data (for testing)

Generate synthetic data to validate the pipeline before real data is available:

python3 pointcept/datasets/preprocessing/pipe_weld/generate_sample_data.py \
    --output_root data/pipe_weld \
    --num_train 20 \
    --num_val 5 \
    --num_test 5 \
    --seed 42

pointcept/datasets/preprocessing/pipe_weld/generate_sample_data.py

The sample script simply generates randomized synthetic point clouds (pipe, weld bead, and defects):

"""
Generate synthetic sample data for Pipe C-Seam Weld dataset.

Creates realistic synthetic pipe + weld bead point clouds with defect labels
for testing the training pipeline before real data is available.

Usage:
    python generate_sample_data.py --output_root data/pipe_weld --num_train 20 --num_val 5 --num_test 5
"""

import os
import argparse
import numpy as np


def generate_pipe_cylinder(
    radius=50.0, length=200.0, num_points=50000, noise_std=0.1
):
    """Generate a cylindrical pipe surface point cloud (along Z-axis)."""
    theta = np.random.uniform(0, 2 * np.pi, num_points)
    z = np.random.uniform(-length / 2, length / 2, num_points)

    x = radius * np.cos(theta) + np.random.normal(0, noise_std, num_points)
    y = radius * np.sin(theta) + np.random.normal(0, noise_std, num_points)
    z = z + np.random.normal(0, noise_std, num_points)

    coords = np.stack([x, y, z], axis=-1).astype(np.float32)

    # Normals point outward radially
    normals = np.stack([np.cos(theta), np.sin(theta), np.zeros(num_points)], axis=-1)
    normals = normals.astype(np.float32)

    return coords, normals, theta, z


def generate_weld_bead(
    radius=50.0,
    z_center=0.0,
    bead_width=5.0,
    bead_height=2.0,
    num_points=10000,
    noise_std=0.15,
):
    """Generate a circumferential weld bead (C-seam) around the pipe."""
    theta = np.random.uniform(0, 2 * np.pi, num_points)
    z_offset = np.random.normal(0, bead_width / 4, num_points)

    # Weld bead profile: Gaussian-like cross-section
    bead_profile = bead_height * np.exp(
        -0.5 * (z_offset / (bead_width / 4)) ** 2
    )
    r = radius + bead_profile + np.random.normal(0, noise_std, num_points)

    x = r * np.cos(theta)
    y = r * np.sin(theta)
    z = z_center + z_offset + np.random.normal(0, noise_std, num_points)

    coords = np.stack([x, y, z], axis=-1).astype(np.float32)

    # Normals (approximately radial)
    normals = np.stack([np.cos(theta), np.sin(theta), np.zeros(num_points)], axis=-1)
    normals = normals.astype(np.float32)

    return coords, normals, theta


def generate_defects(
    radius=50.0,
    z_center=0.0,
    bead_width=5.0,
    bead_height=2.0,
    num_defects=3,
    points_per_defect=500,
):
    """Generate various weld defects near the weld bead."""
    all_coords = []
    all_normals = []
    all_labels = []

    defect_types = {
        2: "porosity",
        3: "crack",
        4: "undercut",
        5: "spatter",
        6: "incomplete_fusion",
    }

    for _ in range(num_defects):
        defect_class = np.random.choice(list(defect_types.keys()))
        theta_center = np.random.uniform(0, 2 * np.pi)
        n = points_per_defect

        if defect_class == 2:  # porosity - small spherical voids
            theta = theta_center + np.random.normal(0, 0.05, n)
            z = z_center + np.random.normal(0, bead_width / 6, n)
            r_offset = bead_height * np.exp(
                -0.5 * ((z - z_center) / (bead_width / 4)) ** 2
            )
            r = radius + r_offset + np.random.normal(0, 0.8, n)
            # Porosity creates small dips
            r -= np.abs(np.random.normal(0, 0.5, n))

        elif defect_class == 3:  # crack - linear along weld
            theta = theta_center + np.linspace(-0.1, 0.1, n) + np.random.normal(
                0, 0.005, n
            )
            z = z_center + np.random.normal(0, bead_width / 8, n)
            r_offset = bead_height * np.exp(
                -0.5 * ((z - z_center) / (bead_width / 4)) ** 2
            )
            r = radius + r_offset + np.random.normal(0, 0.3, n)
            # Crack creates a groove
            r -= np.abs(np.random.normal(0.3, 0.2, n))

        elif defect_class == 4:  # undercut - groove at weld toe
            theta = theta_center + np.random.normal(0, 0.08, n)
            z = z_center + np.sign(np.random.randn(n)) * (
                bead_width / 2 + np.abs(np.random.normal(0, bead_width / 6, n))
            )
            r = radius - np.abs(np.random.normal(0.5, 0.3, n))

        elif defect_class == 5:  # spatter - scattered droplets
            theta = theta_center + np.random.normal(0, 0.3, n)
            z = z_center + np.random.normal(0, bead_width, n)
            r = radius + np.abs(np.random.normal(1.0, 0.5, n))

        else:  # incomplete_fusion - flat region at weld root
            theta = theta_center + np.random.normal(0, 0.06, n)
            z = z_center + np.random.normal(0, bead_width / 4, n)
            r = radius + np.random.normal(0, 0.2, n)  # barely raised

        x = r * np.cos(theta)
        y = r * np.sin(theta)
        coords = np.stack([x, y, z], axis=-1).astype(np.float32)
        normals = np.stack(
            [np.cos(theta), np.sin(theta), np.zeros(n)], axis=-1
        ).astype(np.float32)

        all_coords.append(coords)
        all_normals.append(normals)
        all_labels.append(np.full(n, defect_class, dtype=np.int32))

    return (
        np.concatenate(all_coords),
        np.concatenate(all_normals),
        np.concatenate(all_labels),
    )


def generate_sample(sample_id, output_dir, pipe_radius=None):
    """Generate one complete pipe weld sample."""
    os.makedirs(output_dir, exist_ok=True)

    # Random pipe parameters
    if pipe_radius is None:
        pipe_radius = np.random.uniform(30.0, 80.0)
    pipe_length = np.random.uniform(150.0, 300.0)
    bead_width = np.random.uniform(3.0, 8.0)
    bead_height = np.random.uniform(1.0, 3.5)
    z_center = np.random.uniform(-20.0, 20.0)

    num_pipe_pts = np.random.randint(40000, 80000)
    num_weld_pts = np.random.randint(8000, 15000)
    num_defects = np.random.randint(1, 6)
    pts_per_defect = np.random.randint(200, 800)

    # Generate pipe body
    pipe_coords, pipe_normals, _, _ = generate_pipe_cylinder(
        radius=pipe_radius, length=pipe_length, num_points=num_pipe_pts
    )
    pipe_labels = np.zeros(num_pipe_pts, dtype=np.int32)  # class 0: pipe

    # Generate weld bead
    weld_coords, weld_normals, _ = generate_weld_bead(
        radius=pipe_radius,
        z_center=z_center,
        bead_width=bead_width,
        bead_height=bead_height,
        num_points=num_weld_pts,
    )
    weld_labels = np.ones(num_weld_pts, dtype=np.int32)  # class 1: weld_bead

    # Generate defects
    defect_coords, defect_normals, defect_labels = generate_defects(
        radius=pipe_radius,
        z_center=z_center,
        bead_width=bead_width,
        bead_height=bead_height,
        num_defects=num_defects,
        points_per_defect=pts_per_defect,
    )

    # Combine all
    coord = np.concatenate([pipe_coords, weld_coords, defect_coords], axis=0)
    normal = np.concatenate([pipe_normals, weld_normals, defect_normals], axis=0)
    segment = np.concatenate([pipe_labels, weld_labels, defect_labels], axis=0)

    # Generate synthetic color (grayish metal with weld discoloration)
    color = np.ones((len(coord), 3), dtype=np.float32) * 160  # base gray
    color += np.random.normal(0, 10, color.shape)  # noise
    # Weld bead: slightly darker/different color
    weld_mask = segment == 1
    color[weld_mask] = color[weld_mask] * 0.7 + np.array([30, 20, 10])
    # Defects: reddish tint
    defect_mask = segment >= 2
    color[defect_mask] = color[defect_mask] * 0.8 + np.array([40, -10, -10])
    color = np.clip(color, 0, 255).astype(np.float32)

    # Save
    np.save(os.path.join(output_dir, "coord.npy"), coord)
    np.save(os.path.join(output_dir, "normal.npy"), normal)
    np.save(os.path.join(output_dir, "color.npy"), color)
    np.save(os.path.join(output_dir, "segment.npy"), segment)

    return len(coord), {
        i: int(np.sum(segment == i)) for i in range(7) if np.sum(segment == i) > 0
    }


def main():
    parser = argparse.ArgumentParser(
        description="Generate synthetic pipe weld point cloud samples"
    )
    parser.add_argument(
        "--output_root",
        type=str,
        default="data/pipe_weld",
        help="Output root directory",
    )
    parser.add_argument("--num_train", type=int, default=20)
    parser.add_argument("--num_val", type=int, default=5)
    parser.add_argument("--num_test", type=int, default=5)
    parser.add_argument("--seed", type=int, default=42)
    args = parser.parse_args()

    np.random.seed(args.seed)

    splits = {
        "train": args.num_train,
        "val": args.num_val,
        "test": args.num_test,
    }

    for split, count in splits.items():
        print(f"\nGenerating {split}: {count} samples")
        split_dir = os.path.join(args.output_root, split)

        for i in range(count):
            sample_name = f"pipe_weld_{split}_{i:04d}"
            sample_dir = os.path.join(split_dir, sample_name)
            n_pts, class_dist = generate_sample(i, sample_dir)
            print(f"  [{i + 1}/{count}] {sample_name}: {n_pts} points, classes: {class_dist}")

    print(f"\nDone! Sample data saved to: {args.output_root}")
    print("\nClass mapping:")
    print("  0: pipe (pipe body)")
    print("  1: weld_bead (normal weld)")
    print("  2: porosity")
    print("  3: crack")
    print("  4: undercut")
    print("  5: spatter")
    print("  6: incomplete_fusion")


if __name__ == "__main__":
    main()
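
The weld bead in `generate_weld_bead` uses a Gaussian cross-section with a standard deviation of `bead_width / 4`. The sketch below isolates that formula so the profile can be inspected on its own. `bead_height_at` is a hypothetical helper name, and the default values are just the function defaults from the script above.

```python
import math


def bead_height_at(z_offset, bead_width=5.0, bead_height=2.0):
    """Radial height of the synthetic bead at an axial offset from its center."""
    sigma = bead_width / 4.0  # same spread as z_offset in generate_weld_bead
    return bead_height * math.exp(-0.5 * (z_offset / sigma) ** 2)


if __name__ == "__main__":
    for z in (0.0, 1.25, 2.5):
        print(f"z_offset={z:5.2f}  height={bead_height_at(z):.3f}")
```

At `z_offset = bead_width / 2` (two standard deviations) the height has dropped to `exp(-2)`, about 13.5% of the peak, which is why the undercut defects in the script are placed just beyond that distance.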

Step 2: Preprocess real data

Convert real scan data (PLY, PCD, XYZ, CSV) into the Pointcept npy format:

python3 pointcept/datasets/preprocessing/pipe_weld/preprocess_pipe_weld.py \
    --raw_root data/pipe_weld_raw \
    --output_root data/pipe_weld

pointcept/datasets/preprocessing/pipe_weld/preprocess_pipe_weld.py

The Python conversion script is as follows:

"""
Preprocessing script for Pipe C-Seam Weld Point Cloud data.

Converts raw point cloud files (PLY, PCD, XYZ, CSV) into the Pointcept
.npy format used by DefaultDataset.

Expected raw data structure:
    data/pipe_weld_raw/
        train/
            sample_001.ply   (or .pcd, .xyz, .csv)
            sample_001.labels (optional: per-point integer labels, one per line)
        val/
            ...
        test/
            ...

Output structure:
    data/pipe_weld/
        train/
            sample_001/
                coord.npy    (N, 3) float32
                color.npy    (N, 3) float32 (if available)
                normal.npy   (N, 3) float32 (if available)
                strength.npy (N, 1) float32 (if available: intensity/reflectance)
                segment.npy  (N,)   int32
        val/
            ...
        test/
            ...

Usage:
    python preprocess_pipe_weld.py --raw_root data/pipe_weld_raw --output_root data/pipe_weld
"""

import os
import argparse
import numpy as np
import glob
from pathlib import Path

try:
    import open3d as o3d
except ImportError:
    o3d = None
    print(
        "Warning: open3d not installed. Only CSV/XYZ formats will be supported. "
        "Install with: pip install open3d"
    )


def load_point_cloud(filepath):
    """Load point cloud from various formats. Returns dict with available fields."""
    ext = os.path.splitext(filepath)[1].lower()
    result = {}

    if ext in (".ply", ".pcd") and o3d is not None:
        pcd = o3d.io.read_point_cloud(filepath)
        result["coord"] = np.asarray(pcd.points, dtype=np.float32)

        if pcd.has_colors():
            result["color"] = (np.asarray(pcd.colors) * 255).astype(np.float32)

        if pcd.has_normals():
            result["normal"] = np.asarray(pcd.normals, dtype=np.float32)

    elif ext == ".xyz":
        # Format: x y z [r g b] [nx ny nz]
        data = np.loadtxt(filepath, dtype=np.float32)
        result["coord"] = data[:, :3]
        if data.shape[1] >= 6:
            result["color"] = data[:, 3:6]
        if data.shape[1] >= 9:
            result["normal"] = data[:, 6:9]

    elif ext == ".csv":
        # Format: x,y,z[,r,g,b][,nx,ny,nz][,intensity][,label]
        # Header row expected
        data = np.genfromtxt(filepath, delimiter=",", skip_header=1, dtype=np.float32)
        result["coord"] = data[:, :3]
        if data.shape[1] >= 6:
            result["color"] = data[:, 3:6]
        if data.shape[1] >= 9:
            result["normal"] = data[:, 6:9]
        if data.shape[1] >= 10:
            result["strength"] = data[:, 9:10]
        if data.shape[1] >= 11:
            result["segment"] = data[:, 10].astype(np.int32)

    elif ext == ".npy":
        data = np.load(filepath)
        result["coord"] = data[:, :3].astype(np.float32)
        if data.shape[1] >= 6:
            result["color"] = data[:, 3:6].astype(np.float32)
        if data.shape[1] >= 9:
            result["normal"] = data[:, 6:9].astype(np.float32)

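    else:
        if ext in (".ply", ".pcd"):
            # Reached only when open3d failed to import at module load time
            raise ImportError(f"open3d is required to read {ext} files")
        raise ValueError(f"Unsupported file format: {ext}")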

    return result


def estimate_normals(coords, knn=30):
    """Estimate normals using open3d if available."""
    if o3d is None:
        return None
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(coords)
    pcd.estimate_normals(
        search_param=o3d.geometry.KDTreeSearchParamKNN(knn=knn)
    )
    pcd.orient_normals_consistent_tangent_plane(k=knn)
    return np.asarray(pcd.normals, dtype=np.float32)


def load_labels(label_path, num_points):
    """Load per-point labels from a file."""
    if os.path.exists(label_path):
        labels = np.loadtxt(label_path, dtype=np.int32)
        assert len(labels) == num_points, (
            f"Label count ({len(labels)}) != point count ({num_points})"
        )
        return labels
    return np.zeros(num_points, dtype=np.int32)  # default: all pipe (class 0)


def process_one(raw_path, output_dir, estimate_normal=True):
    """Process a single point cloud file into .npy format."""
    stem = Path(raw_path).stem
    sample_dir = os.path.join(output_dir, stem)
    os.makedirs(sample_dir, exist_ok=True)

    data = load_point_cloud(raw_path)
    coord = data["coord"]
    num_points = len(coord)

    # Save coord
    np.save(os.path.join(sample_dir, "coord.npy"), coord)

    # Save color (if available)
    if "color" in data:
        np.save(os.path.join(sample_dir, "color.npy"), data["color"])

    # Save or estimate normals
    if "normal" in data:
        np.save(os.path.join(sample_dir, "normal.npy"), data["normal"])
    elif estimate_normal:
        normals = estimate_normals(coord)
        if normals is not None:
            np.save(os.path.join(sample_dir, "normal.npy"), normals)

    # Save strength/intensity
    if "strength" in data:
        np.save(os.path.join(sample_dir, "strength.npy"), data["strength"])

    # Save segment labels
    if "segment" in data:
        segment = data["segment"]
    else:
        # Look for separate label file
        label_path = os.path.splitext(raw_path)[0] + ".labels"
        segment = load_labels(label_path, num_points)
    np.save(os.path.join(sample_dir, "segment.npy"), segment)

    return num_points


def main():
    parser = argparse.ArgumentParser(
        description="Preprocess pipe weld point cloud data for Pointcept"
    )
    parser.add_argument(
        "--raw_root",
        type=str,
        required=True,
        help="Root directory of raw point cloud data",
    )
    parser.add_argument(
        "--output_root",
        type=str,
        required=True,
        help="Output root directory for processed data",
    )
    parser.add_argument(
        "--no_normal_estimation",
        action="store_true",
        help="Skip normal estimation for files without normals",
    )
    args = parser.parse_args()

    supported_exts = ("*.ply", "*.pcd", "*.xyz", "*.csv", "*.npy")

    for split in ["train", "val", "test"]:
        raw_split_dir = os.path.join(args.raw_root, split)
        if not os.path.exists(raw_split_dir):
            print(f"Skipping {split}: directory not found")
            continue

        output_split_dir = os.path.join(args.output_root, split)
        os.makedirs(output_split_dir, exist_ok=True)

        files = []
        for ext in supported_exts:
            files.extend(glob.glob(os.path.join(raw_split_dir, ext)))
        files.sort()

        print(f"\nProcessing {split}: {len(files)} files")
        for i, f in enumerate(files):
            n_pts = process_one(
                f,
                output_split_dir,
                estimate_normal=not args.no_normal_estimation,
            )
            print(f"  [{i + 1}/{len(files)}] {os.path.basename(f)} -> {n_pts} points")

    print("\nDone! Processed data saved to:", args.output_root)


if __name__ == "__main__":
    main()

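Supported input formats:

| Format | Extension | Required library | Notes |
|---|---|---|---|
| PLY | .ply | open3d | XYZ, RGB, and normals parsed automatically |
| PCD | .pcd | open3d | Point Cloud Library format |
| XYZ | .xyz | numpy | x y z [r g b] [nx ny nz], whitespace-separated |
| CSV | .csv | numpy | x,y,z[,r,g,b][,nx,ny,nz][,intensity][,label], with a header row |
| NPY | .npy | numpy | NumPy array of shape (N, 3+) |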
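
The CSV branch of `load_point_cloud` decides which fields exist purely from the number of columns, not from the header names. The sketch below restates that rule as a small lookup so the consequence is easy to see. `fields_for_column_count` is a hypothetical helper written for illustration, not part of the preprocessing script.

```python
def fields_for_column_count(num_columns):
    """Map a CSV column count to the fields the CSV loader would extract.

    Returns {field name: (start column, end column)} with an exclusive end.
    """
    if num_columns < 3:
        raise ValueError("at least the x, y, z columns are required")
    layout = [  # (minimum column count, field name, column range)
        (3, "coord", (0, 3)),
        (6, "color", (3, 6)),
        (9, "normal", (6, 9)),
        (10, "strength", (9, 10)),
        (11, "segment", (10, 11)),
    ]
    return {name: cols for need, name, cols in layout if num_columns >= need}


if __name__ == "__main__":
    for n in (3, 4, 6, 11):
        print(n, fields_for_column_count(n))
```

Because the layout is positional, a file with only `x,y,z,intensity` (4 columns) yields coordinates alone, and a file with `x,y,z,nx,ny,nz` (6 columns) would have its normals read as color. Files that skip an optional group need to be padded or converted before preprocessing.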

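Label files:

  • Labels can be included in the label column of a CSV file, or
  • supplied in a separate .labels file (one integer label per line, one line per point).
  • If neither is present, every point silently defaults to 0 (pipe), so confirm that the training and validation splits really have labels.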
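
Since a missing or malformed label file is easy to overlook, a stricter reader can be useful while preparing data. This is a minimal standard-library sketch. `read_labels` here is a hypothetical stand-in (not the `load_labels` function from the script), and it assumes that -1 is acceptable as an "unlabeled" value because the training config sets `ignore_index=-1`.

```python
NUM_CLASSES = 7    # class IDs 0..6 as defined in this guide
IGNORE_INDEX = -1  # matches ignore_index in the training config


def read_labels(path, expected_points):
    """Read one integer label per line and validate count and ID range."""
    with open(path, "r", encoding="utf-8") as f:
        labels = [int(line) for line in f if line.strip()]
    if len(labels) != expected_points:
        raise ValueError(
            f"label count ({len(labels)}) != point count ({expected_points})"
        )
    invalid = sorted(
        {v for v in labels if v != IGNORE_INDEX and not 0 <= v < NUM_CLASSES}
    )
    if invalid:
        raise ValueError(f"unknown class IDs in {path}: {invalid}")
    return labels
```

Unlike the preprocessing script, this version raises when the file is absent (via `open`) instead of falling back to all-zero labels.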

Step 3: Run training

# Single GPU
python3 tools/train.py configs/pipe_weld/semseg-spunet-v1m1-0-base.py

# Multi-GPU (e.g. 4 GPUs)
python3 tools/train.py configs/pipe_weld/semseg-spunet-v1m1-0-base.py \
    --num-gpus 4
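
The config for this run sets `batch_size = 8` and comments it as the total across all GPUs, so the per-GPU batch shrinks as GPUs are added. The sketch below shows the arithmetic, assuming the total is split evenly and must be divisible by the GPU count. `per_gpu_batch_size` is an illustrative name, not a Pointcept function.

```python
def per_gpu_batch_size(total_batch_size, num_gpus):
    """Split a total batch size evenly across GPUs."""
    if num_gpus < 1:
        raise ValueError("num_gpus must be at least 1")
    if total_batch_size % num_gpus != 0:
        raise ValueError(
            f"batch_size={total_batch_size} is not divisible by {num_gpus} GPUs"
        )
    return total_batch_size // num_gpus


if __name__ == "__main__":
    for gpus in (1, 2, 4, 8):
        print(f"{gpus} GPU(s): {per_gpu_batch_size(8, gpus)} sample(s) per GPU")
```

With `--num-gpus 4` each GPU would therefore see 2 samples per step. A GPU count such as 3 does not divide 8, so either the batch size or the GPU count would need to change.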

configs/pipe_weld/semseg-spunet-v1m1-0-base.py

_base_ = ["../_base_/default_runtime.py"]

# misc custom setting
batch_size = 8  # total bs in all gpus (pipe weld samples are smaller than indoor scenes)
mix_prob = 0
empty_cache = False
enable_amp = True

# model settings
model = dict(
    type="DefaultSegmentor",
    backbone=dict(
        type="SpUNet-v1m1",
        in_channels=6,  # color(3) + normal(3)
        num_classes=7,
        channels=(32, 64, 128, 256, 256, 128, 96, 96),
        layers=(2, 3, 4, 6, 2, 2, 2, 2),
    ),
    criteria=[
        dict(
            type="CrossEntropyLoss",
            loss_weight=1.0,
            ignore_index=-1,
            # Class weights to handle imbalance (defects are rare)
            # pipe, weld_bead, porosity, crack, undercut, spatter, incomplete_fusion
            weight=[0.5, 1.0, 5.0, 8.0, 6.0, 4.0, 7.0],
        ),
    ],
)

# scheduler settings
epoch = 400
optimizer = dict(type="SGD", lr=0.05, momentum=0.9, weight_decay=0.0001, nesterov=True)
scheduler = dict(
    type="OneCycleLR",
    max_lr=optimizer["lr"],
    pct_start=0.05,
    anneal_strategy="cos",
    div_factor=10.0,
    final_div_factor=10000.0,
)

# dataset settings
dataset_type = "PipeWeldDataset"
data_root = "data/pipe_weld"

data = dict(
    num_classes=7,
    ignore_index=-1,
    names=[
        "pipe",
        "weld_bead",
        "porosity",
        "crack",
        "undercut",
        "spatter",
        "incomplete_fusion",
    ],
    train=dict(
        type=dataset_type,
        split="train",
        data_root=data_root,
        transform=[
            dict(type="CenterShift", apply_z=True),
            dict(
                type="RandomDropout",
                dropout_ratio=0.2,
                dropout_application_ratio=0.2,
            ),
            dict(
                type="RandomRotate",
                angle=[-1, 1],
                axis="z",
                center=[0, 0, 0],
                p=0.5,
            ),
            dict(
                type="RandomRotate",
                angle=[-1 / 64, 1 / 64],
                axis="x",
                p=0.5,
            ),
            dict(
                type="RandomRotate",
                angle=[-1 / 64, 1 / 64],
                axis="y",
                p=0.5,
            ),
            dict(type="RandomScale", scale=[0.9, 1.1]),
            dict(type="RandomFlip", p=0.5),
            dict(type="RandomJitter", sigma=0.005, clip=0.02),
            dict(
                type="ElasticDistortion",
                distortion_params=[[0.2, 0.4], [0.8, 1.6]],
            ),
            dict(type="ChromaticAutoContrast", p=0.2, blend_factor=None),
            dict(type="ChromaticTranslation", p=0.95, ratio=0.05),
            dict(type="ChromaticJitter", p=0.95, std=0.05),
            dict(
                type="GridSample",
                grid_size=0.5,  # 0.5mm voxel for weld inspection
                hash_type="fnv",
                mode="train",
                return_grid_coord=True,
            ),
            dict(type="SphereCrop", point_max=100000, mode="random"),
            dict(type="CenterShift", apply_z=False),
            dict(type="NormalizeColor"),
            dict(type="ShufflePoint"),
            dict(type="ToTensor"),
            dict(
                type="Collect",
                keys=("coord", "grid_coord", "segment"),
                feat_keys=("color", "normal"),
            ),
        ],
        test_mode=False,
    ),
    val=dict(
        type=dataset_type,
        split="val",
        data_root=data_root,
        transform=[
            dict(type="CenterShift", apply_z=True),
            dict(type="Copy", keys_dict={"segment": "origin_segment"}),
            dict(
                type="GridSample",
                grid_size=0.5,
                hash_type="fnv",
                mode="train",
                return_grid_coord=True,
                return_inverse=True,
            ),
            dict(type="CenterShift", apply_z=False),
            dict(type="NormalizeColor"),
            dict(type="ToTensor"),
            dict(
                type="Collect",
                keys=(
                    "coord",
                    "grid_coord",
                    "segment",
                    "origin_segment",
                    "inverse",
                ),
                feat_keys=("color", "normal"),
            ),
        ],
        test_mode=False,
    ),
    test=dict(
        type=dataset_type,
        split="val",  # evaluates on the val split; use "test" to run on data/pipe_weld/test
        data_root=data_root,
        transform=[
            dict(type="CenterShift", apply_z=True),
            dict(type="NormalizeColor"),
        ],
        test_mode=True,
        test_cfg=dict(
            voxelize=dict(
                type="GridSample",
                grid_size=0.5,
                hash_type="fnv",
                mode="test",
                return_grid_coord=True,
            ),
            crop=None,
            post_transform=[
                dict(type="CenterShift", apply_z=False),
                dict(type="ToTensor"),
                dict(
                    type="Collect",
                    keys=("coord", "grid_coord", "index"),
                    feat_keys=("color", "normal"),
                ),
            ],
            aug_transform=[
                [
                    dict(
                        type="RandomRotateTargetAngle",
                        angle=[0],
                        axis="z",
                        center=[0, 0, 0],
                        p=1,
                    )
                ],
                [
                    dict(
                        type="RandomRotateTargetAngle",
                        angle=[1 / 2],
                        axis="z",
                        center=[0, 0, 0],
                        p=1,
                    )
                ],
                [
                    dict(
                        type="RandomRotateTargetAngle",
                        angle=[1],
                        axis="z",
                        center=[0, 0, 0],
                        p=1,
                    )
                ],
                [
                    dict(
                        type="RandomRotateTargetAngle",
                        angle=[3 / 2],
                        axis="z",
                        center=[0, 0, 0],
                        p=1,
                    )
                ],
            ],
        ),
    ),
)

# hook
hooks = [
    dict(type="CheckpointLoader"),
    dict(type="ModelHook"),
    dict(type="IterationTimer", warmup_iter=2),
    dict(type="InformationWriter"),
    dict(type="SemSegEvaluator"),
    dict(type="CheckpointSaver", save_freq=None),
    dict(type="PreciseEvaluator", test_last=False),
]

Step 4: Testing / inference

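python3 tools/test.py configs/pipe_weld/semseg-spunet-v1m1-0-base.py \
    --options weight=exp/pipe_weld/model/model_best.pth

The weight path must point at the checkpoint the training run actually wrote. If training was started with the Step 3 command as written, without overriding save_path, the checkpoint is likely under exp/default/model/ (the save_path default in Pointcept's default_runtime.py), so adjust the path accordingly.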

Model configuration

Architecture: SpUNet-v1m1

Backbone: Sparse U-Net v1m1
├── in_channels: 6 (color 3 + normal 3)
├── num_classes: 7
├── channels: (32, 64, 128, 256, 256, 128, 96, 96)
└── layers:   (2,  3,  4,   6,   2,   2,  2,  2)
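
The value `in_channels=6` follows from the `feat_keys=("color", "normal")` entry in the Collect transform: the listed arrays are concatenated per point, so the channel count is the sum of their widths. The sketch below makes that bookkeeping explicit. `FEATURE_DIMS` and `in_channels_for` are illustrative names, and the widths come from the array shapes listed in the data layout section.

```python
# Per-point width of each asset, from the processed data layout in this guide
FEATURE_DIMS = {"coord": 3, "color": 3, "normal": 3, "strength": 1}


def in_channels_for(feat_keys):
    """Number of input channels produced by concatenating the given features."""
    unknown = [k for k in feat_keys if k not in FEATURE_DIMS]
    if unknown:
        raise KeyError(f"unknown feature keys: {unknown}")
    return sum(FEATURE_DIMS[k] for k in feat_keys)


if __name__ == "__main__":
    print(in_channels_for(("color", "normal")))              # current config
    print(in_channels_for(("color", "normal", "strength")))  # with intensity
```

Note that PipeWeldDataset loads `strength` when it exists, but the config above does not pass it to the model. Using it would presumably require changing both `feat_keys` and `in_channels` (to 7) in the train, val, and test pipelines.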

Loss function

  • CrossEntropyLoss with class weights: [0.5, 1.0, 5.0, 8.0, 6.0, 4.0, 7.0]
  • Defect classes are rare, so they receive higher weights (crack has the highest, 8.0)
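
The weights above appear to be hand-picked; the page does not say how they were derived. Once real per-class point counts are available, one common alternative is median-frequency balancing, sketched below. This is a swapped-in technique for illustration, not the method used to produce the config values, and the counts in the example are made up.

```python
from statistics import median


def median_frequency_weights(point_counts):
    """Weight each class by median(count) / count; absent classes get 0.0."""
    present = [c for c in point_counts if c > 0]
    if not present:
        raise ValueError("at least one class must have points")
    med = median(present)
    return [med / c if c > 0 else 0.0 for c in point_counts]


if __name__ == "__main__":
    # Hypothetical counts for pipe, weld_bead, porosity, crack,
    # undercut, spatter, incomplete_fusion
    counts = [1_200_000, 230_000, 9_000, 5_000, 7_000, 11_000, 6_000]
    print([round(w, 2) for w in median_frequency_weights(counts)])
```

Raw inverse-frequency weights can become very large for tiny classes, so in practice they are often clipped or compressed (for example with a square root) before being placed in the `weight` list.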

Optimizer & scheduler

| Item | Setting |
|---|---|
| Optimizer | SGD (lr=0.05, momentum=0.9, weight_decay=1e-4, nesterov) |
| Scheduler | OneCycleLR (pct_start=0.05, cosine annealing) |
| Epochs | 400 |
| Batch size | 8 (total across all GPUs) |
| AMP | Enabled (float16) |
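
To see what these scheduler settings mean in terms of actual learning rates, the sketch below evaluates a two-phase cosine one-cycle schedule in plain Python. It follows my understanding of how PyTorch's OneCycleLR behaves with its default two-phase mode and is not a substitute for the real scheduler. `one_cycle_lr` is a hypothetical name and `total_steps` is whatever the trainer computes from epochs and iterations.

```python
import math


def one_cycle_lr(step, total_steps, max_lr=0.05, pct_start=0.05,
                 div_factor=10.0, final_div_factor=10000.0):
    """Learning rate at a 0-based step of a two-phase cosine one-cycle schedule."""
    if not 0 <= step < total_steps:
        raise ValueError("step must be in [0, total_steps)")
    initial_lr = max_lr / div_factor          # 0.005 with the config values
    min_lr = initial_lr / final_div_factor    # 5e-7 with the config values
    warmup_end = pct_start * total_steps - 1  # last step of the warm-up phase
    if warmup_end <= 0:
        raise ValueError("total_steps is too small for this pct_start")
    last_step = total_steps - 1

    def cos_anneal(start, end, pct):
        # Cosine interpolation from start (pct=0) to end (pct=1)
        return end + (start - end) / 2.0 * (math.cos(math.pi * pct) + 1.0)

    if step <= warmup_end:
        return cos_anneal(initial_lr, max_lr, step / warmup_end)
    pct = (step - warmup_end) / (last_step - warmup_end)
    return cos_anneal(max_lr, min_lr, pct)
```

With these settings the rate starts at 0.005, peaks at 0.05 after the first 5% of steps, and then decays toward 5e-7 by the final step.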

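Data augmentation (transforms)

Transforms applied during training

| Transform | Parameters | Purpose |
|---|---|---|
| CenterShift | apply_z=True | Shift coordinates to the center |
| RandomDropout | dropout_ratio=0.2, dropout_application_ratio=0.2 | Randomly remove points |
| RandomRotate | angle=[-1, 1], axis="z", p=0.5 | Rotation about the Z axis |
| RandomRotate | angle=[-1/64, 1/64], axis="x" and "y", p=0.5 | Small rotations about the X/Y axes |
| RandomScale | scale=[0.9, 1.1] | Random scaling |
| RandomFlip | p=0.5 | Random flip |
| RandomJitter | sigma=0.005, clip=0.02 | Point noise |
| ElasticDistortion | distortion_params=[[0.2, 0.4], [0.8, 1.6]] | Elastic deformation |
| ChromaticAutoContrast | p=0.2 | Automatic color contrast |
| ChromaticTranslation | p=0.95, ratio=0.05 | Color shift |
| ChromaticJitter | p=0.95, std=0.05 | Color noise |
| GridSample | grid_size=0.5 | Voxelization (0.5 units) |
| SphereCrop | point_max=100000, mode="random" | Cap on the number of points |
| NormalizeColor | - | Normalize color to [0, 1] |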

Validation/test transforms

  • CenterShift + GridSample + NormalizeColor + ToTensor
  • At test time, 4-way rotation TTA (test-time augmentation) is applied: rotations of 0, 90, 180, and 270 degrees about the Z axis
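
The four TTA entries in `aug_transform` use the angles 0, 1/2, 1, and 3/2. The sketch below assumes, as Pointcept's rotation transforms appear to do, that these are multiples of pi, and shows the resulting rotations about the Z axis on a single point. `rotate_z` and `TTA_ANGLES` are illustrative names.

```python
import math

# Angles from aug_transform in the test config, assumed to be multiples of pi
TTA_ANGLES = (0, 1 / 2, 1, 3 / 2)


def rotate_z(point, angle_in_pi):
    """Rotate an (x, y, z) point about the Z axis by angle_in_pi * pi radians."""
    theta = angle_in_pi * math.pi
    x, y, z = point
    c, s = math.cos(theta), math.sin(theta)
    return (c * x - s * y, s * x + c * y, z)


if __name__ == "__main__":
    for a in TTA_ANGLES:
        x, y, z = rotate_z((1.0, 0.0, 2.0), a)
        print(f"{math.degrees(a * math.pi):5.0f} deg -> ({x:+.2f}, {y:+.2f}, {z:+.2f})")
```

For a pipe aligned with the Z axis, these rotations move the scan around the circumference without changing the axial position of the weld, which is why Z is a natural axis for both the training rotation and the TTA.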

Key design points

Adjusting grid_size

  • Current setting: grid_size=0.5
  • If the data is in mm: grid_size=0.5 (0.5 mm voxels)
  • If the data is in m: grid_size=0.0005 (0.5 mm voxels)
  • Weld defects are only a few mm in size, so tune the value to the scanner resolution
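
The unit conversion above, and its effect on how many voxels a scan occupies, can be checked with a few lines of plain Python. This is a simplified sketch of grid voxelization (floor of coordinate divided by grid size), not Pointcept's GridSample implementation. All function names are hypothetical.

```python
import math

# How many data units correspond to one millimetre
UNITS_PER_MM = {"mm": 1.0, "cm": 0.1, "m": 0.001}


def grid_size_for(voxel_mm, data_unit):
    """grid_size value that yields voxels of voxel_mm millimetres."""
    return voxel_mm * UNITS_PER_MM[data_unit]


def voxel_index(point, grid_size):
    """Integer voxel coordinate containing the point."""
    return tuple(math.floor(c / grid_size) for c in point)


def count_occupied_voxels(points, grid_size):
    """Number of distinct voxels touched by the points."""
    return len({voxel_index(p, grid_size) for p in points})


if __name__ == "__main__":
    pts_mm = [(0.1, 0.1, 0.1), (0.2, 0.2, 0.2), (0.6, 0.0, 0.0)]
    print(grid_size_for(0.5, "m"))
    print(count_occupied_voxels(pts_mm, grid_size_for(0.5, "mm")))
```

Comparing the occupied voxel count with the raw point count gives a quick sense of whether `grid_size` is discarding detail (far fewer voxels than points) or doing almost nothing (about as many voxels as points).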

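Importance of normal vectors

  • The surface shape of the weld bead (protrusions and depressions) is the key feature for identifying defects
  • For data without normals, preprocessing estimates them automatically with open3d (disable with --no_normal_estimation); if open3d is not installed, no normal.npy is written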

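Handling class imbalance

  • Defect points make up roughly 1-5% of all points
  • The CrossEntropyLoss class weights are the first line of defense
  • Further options: repeat the data with the loop parameter, or consider oversampling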
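
The `loop` option works by making the dataset look longer than it is: indices wrap around with `idx % len(self.data_list)`, the same expression used in `get_data_name` above. The sketch below reproduces just that indexing behavior in a hypothetical `LoopedIndex` class; it assumes the dataset length is the sample count multiplied by `loop`.

```python
class LoopedIndex:
    """Minimal stand-in for a dataset that repeats its sample list `loop` times."""

    def __init__(self, data_list, loop=1):
        if loop < 1:
            raise ValueError("loop must be at least 1")
        self.data_list = list(data_list)
        self.loop = loop

    def __len__(self):
        # One epoch visits every sample `loop` times
        return len(self.data_list) * self.loop

    def __getitem__(self, idx):
        # Wrap around the underlying list, as in get_data_name
        return self.data_list[idx % len(self.data_list)]


if __name__ == "__main__":
    ds = LoopedIndex(["sample_001", "sample_002", "sample_003"], loop=2)
    print(len(ds), [ds[i] for i in range(len(ds))])
```

Because random augmentations are re-drawn on every access, each repeated visit yields a different view of the same scan. Looping alone does not change the class ratio, so it complements rather than replaces class weights or oversampling of defect-rich samples.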

Generated files

| File path | Role |
|---|---|
| pointcept/datasets/pipe_weld.py | Dataset class (PipeWeldDataset) |
| pointcept/datasets/__init__.py | Dataset registration (import added) |
| pointcept/datasets/preprocessing/pipe_weld/preprocess_pipe_weld.py | Raw data to npy preprocessing |
| pointcept/datasets/preprocessing/pipe_weld/generate_sample_data.py | Synthetic sample generation |
| configs/pipe_weld/semseg-spunet-v1m1-0-base.py | Training config |

Public datasets for reference

| Dataset | URL | Format | License |
|---|---|---|---|
| Mendeley - 3D Butt Weld Scans | https://data.mendeley.com/datasets/ktcsh7sykv/1 | TXT (X,Y,Z,I) | CC BY 4.0 |
| Zenodo - SAW Repository | https://zenodo.org/records/15083865 | HDF5/TXT | CC BY 4.0 |
| Kaggle - AAU Sewer Point Clouds | https://www.kaggle.com/datasets/aalborguniversity/sewerpointclouds | Point Cloud | CC BY 4.0 |
| Kaggle - OpenTrench3D | https://www.kaggle.com/datasets/hestogpony/opentrench3d | PLY | CC BY-NC 4.0 |
