# How to Build a Video Dataset Platform (Shofo Clone) with Claude Code
A step-by-step guide to building a B2B video dataset indexing and labeling service for AI labs — a "Common Crawl for videos."
---
## Step 1: Database Schema & Core Data Model
Design your schema around four core entities: the raw video index, semantic labels, customer dataset requests, and processing jobs.
```sql
-- pgvector is required for the VECTOR column and the HNSW index below
CREATE EXTENSION IF NOT EXISTS vector;

-- Raw video index
CREATE TABLE videos (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_url TEXT NOT NULL UNIQUE,
platform TEXT NOT NULL, -- 'tiktok', 'youtube', 'instagram', 'web'
duration_seconds FLOAT,
resolution TEXT,
language TEXT,
captured_at TIMESTAMPTZ DEFAULT now(),
metadata JSONB DEFAULT '{}', -- raw platform metadata
embedding VECTOR(768), -- CLIP ViT-L/14 embedding for semantic search (768-dim)
status TEXT DEFAULT 'raw' -- raw, sanitized, labeled, rejected
);
-- Semantic labels & annotations
CREATE TABLE video_labels (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
video_id UUID REFERENCES videos(id),
label_type TEXT NOT NULL, -- 'object', 'activity', 'scene', 'reasoning'
label_value TEXT NOT NULL,
confidence FLOAT,
bbox JSONB, -- bounding box for objects: {x, y, w, h, frame}
frame_range INT4RANGE, -- [start_frame, end_frame)
model_version TEXT,
created_at TIMESTAMPTZ DEFAULT now()
);
-- Customer dataset requests
CREATE TABLE dataset_requests (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL,
query TEXT NOT NULL, -- natural language: "50k cooking videos with hand-object interaction"
parsed_filters JSONB, -- structured filters extracted from query
target_count INT NOT NULL,
annotation_types TEXT[] DEFAULT '{}', -- ['object', 'activity', 'reasoning']
status TEXT DEFAULT 'pending', -- pending, processing, delivered, failed
delivered_at TIMESTAMPTZ,
output_url TEXT, -- signed S3 URL to delivered dataset
created_at TIMESTAMPTZ DEFAULT now()
);
-- Processing jobs
CREATE TABLE processing_jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
video_id UUID REFERENCES videos(id),
job_type TEXT NOT NULL, -- 'sanitize', 'embed', 'label_objects', 'label_activities', 'reasoning'
status TEXT DEFAULT 'queued',
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
error TEXT,
worker_id TEXT
);
CREATE INDEX ON videos USING hnsw (embedding vector_cosine_ops);
CREATE INDEX ON video_labels (video_id, label_type);
CREATE INDEX ON videos (platform, status);
```
---
## Step 2: Video Crawling Infrastructure
Build a distributed crawler that continuously indexes short-form video platforms and the public web.
```python
# crawler/base_crawler.py
import asyncio
import httpx
from playwright.async_api import async_playwright
from typing import AsyncGenerator
class VideoCrawler:
"""Base crawler — subclass per platform."""
def __init__(self, db_pool, storage_client):
self.db = db_pool
self.storage = storage_client
async def crawl(self) -> AsyncGenerator[dict, None]:
raise NotImplementedError
async def download_and_store(self, url: str, video_id: str) -> str:
"""Download video, upload to S3, return storage key."""
async with httpx.AsyncClient(follow_redirects=True) as client:
            r = await client.get(url)
            r.raise_for_status()
key = f"raw/{video_id}.mp4"
await self.storage.put_object(key, r.content)
return key
# crawler/yt_dlp_crawler.py
import asyncio
from concurrent.futures import ThreadPoolExecutor

import yt_dlp

from crawler.base_crawler import VideoCrawler
class YtDlpCrawler(VideoCrawler):
"""Crawls YouTube, TikTok, Instagram via yt-dlp."""
SOURCES = [
"https://www.tiktok.com/tag/cooking",
"https://www.youtube.com/results?search_query=cooking+tutorial",
# Add thousands more — trending hashtags, channels, playlists
]
def __init__(self, db_pool, storage_client):
super().__init__(db_pool, storage_client)
self.executor = ThreadPoolExecutor(max_workers=20)
def _yt_dlp_extract(self, url: str) -> list[dict]:
ydl_opts = {
'quiet': True,
'extract_flat': True,
'ignoreerrors': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
            return list(info.get('entries') or [info]) if info else []
async def crawl(self):
loop = asyncio.get_event_loop()
for source in self.SOURCES:
entries = await loop.run_in_executor(
self.executor, self._yt_dlp_extract, source
)
            for entry in entries:
                if entry and entry.get('url'):
                    # Derive the platform from the seed URL instead of hardcoding 'youtube'
                    platform = ('tiktok' if 'tiktok.com' in source
                                else 'instagram' if 'instagram.com' in source
                                else 'youtube' if 'youtube.com' in source or 'youtu.be' in source
                                else 'web')
                    yield {
                        'source_url': entry['url'],
                        'platform': platform,
                        'duration_seconds': entry.get('duration'),
                        'metadata': {k: entry.get(k) for k in ('title', 'uploader', 'view_count', 'like_count')},
                    }
# Kubernetes CronJob: run crawler fleet every 6 hours
# Deploy as: 50 concurrent crawler pods per platform
```
**Key decisions:**
- Use yt-dlp for platform-specific extraction — it keeps extractors current across hundreds of sites so you don't maintain per-platform scrapers yourself
- Store raw video in S3-compatible object storage (Cloudflare R2 is cheap)
- Use Postgres with pgvector for the searchable index
- Rate-limit per platform to avoid IP bans — rotate proxies via Bright Data or Oxylabs (see the throttling sketch below)
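A minimal sketch of that throttling layer follows — the per-platform limits, proxy pool, and module name are placeholders to tune yourself, not tested values.

```python
# crawler/rate_limiter.py — per-platform throttling sketch; limits and proxy pool are placeholders
import asyncio
import itertools
import time

class PlatformRateLimiter:
    """Space out requests per platform so the crawler fleet doesn't trip anti-bot limits."""

    def __init__(self, max_per_minute: dict[str, int]):
        self.interval = {p: 60.0 / n for p, n in max_per_minute.items()}
        self.next_allowed = {p: 0.0 for p in max_per_minute}
        self.locks = {p: asyncio.Lock() for p in max_per_minute}

    async def acquire(self, platform: str):
        """Block until this platform's next request slot opens up."""
        async with self.locks[platform]:
            now = time.monotonic()
            wait = self.next_allowed[platform] - now
            if wait > 0:
                await asyncio.sleep(wait)
            self.next_allowed[platform] = max(now, self.next_allowed[platform]) + self.interval[platform]

class ProxyRotator:
    """Round-robin over a pool of residential proxy URLs (Bright Data, Oxylabs, etc.)."""

    def __init__(self, proxy_urls: list[str]):
        self._pool = itertools.cycle(proxy_urls)

    def next_proxy(self) -> str:
        return next(self._pool)

# Usage inside a crawler loop (illustrative):
#   limiter = PlatformRateLimiter({"tiktok": 30, "youtube": 60, "instagram": 20})
#   await limiter.acquire("tiktok")
#   ydl_opts["proxy"] = rotator.next_proxy()   # yt-dlp accepts a 'proxy' option
```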
---
## Step 3: Video Processing Pipeline
Build the ML pipeline: sanitize → embed → label → reason.
```python
# pipeline/sanitizer.py
import tempfile

import cv2
import numpy as np
from nudenet import NudeDetector
class VideoSanitizer:
"""Remove NSFW content, corrupted frames, duplicates."""
def __init__(self):
self.nude_detector = NudeDetector()
self.seen_hashes = set()
def perceptual_hash(self, frame: np.ndarray) -> str:
small = cv2.resize(frame, (8, 8))
gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
avg = gray.mean()
return ''.join('1' if p > avg else '0' for p in gray.flatten())
def is_safe(self, video_path: str) -> tuple[bool, str]:
cap = cv2.VideoCapture(video_path)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Sample 1 frame per second
fps = cap.get(cv2.CAP_PROP_FPS) or 30
sample_frames = []
for i in range(0, frame_count, int(fps)):
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if ret:
sample_frames.append(frame)
cap.release()
        # Duplicate check — near-identical clips share a perceptual hash of their first sampled frame
        if sample_frames:
            phash = self.perceptual_hash(sample_frames[0])
            if phash in self.seen_hashes:
                return False, 'duplicate'
            self.seen_hashes.add(phash)
        # NSFW check — NudeDetector.detect() takes an image path, so write sampled frames to temp JPEGs
        for frame in sample_frames[::5]:  # check every 5th sampled frame
            with tempfile.NamedTemporaryFile(suffix='.jpg') as tmp:
                cv2.imwrite(tmp.name, frame)
                detections = self.nude_detector.detect(tmp.name)
            # Exposed-body class names differ across NudeNet versions, so match on the 'EXPOSED' substring
            if any(d['score'] > 0.7 and 'EXPOSED' in d['class'].upper() for d in detections):
                return False, 'nsfw'
        return True, 'ok'
# pipeline/embedder.py
import cv2
import numpy as np
import torch
import clip  # pip install git+https://github.com/openai/CLIP.git
from PIL import Image
class VideoEmbedder:
"""Generate CLIP embeddings from video keyframes."""
    def __init__(self, device: str | None = None):
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model, self.preprocess = clip.load("ViT-L/14", device=self.device)
        self.model.eval()
@torch.no_grad()
def embed_video(self, video_path: str) -> list[float]:
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 30
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
embeddings = []
for i in range(0, frame_count, int(fps * 2)): # every 2s
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if ret:
                img = self.preprocess(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
                emb = self.model.encode_image(img.unsqueeze(0).to(self.device)).squeeze().tolist()
embeddings.append(emb)
cap.release()
        # Mean-pool keyframe embeddings → single 768-dim video embedding (ViT-L/14)
        return list(np.mean(embeddings, axis=0)) if embeddings else [0.0] * 768

    @torch.no_grad()
    def embed_text(self, text: str) -> list[float]:
        """Embed a text query into the same CLIP space — used by /v1/search and the query engine."""
        tokens = clip.tokenize([text]).to(self.device)
        return self.model.encode_text(tokens).squeeze().tolist()
# pipeline/labeler.py
import base64
import json

import anthropic
class VideoLabeler:
"""Object detection + activity recognition + reasoning annotations."""
def __init__(self):
self.client = anthropic.Anthropic()
# Also use specialized CV models for speed/cost
# YOLO for objects, VideoMAE for activities
def label_with_vision(self, frame_path: str, label_types: list[str]) -> dict:
with open(frame_path, 'rb') as f:
image_data = base64.b64encode(f.read()).decode()
prompt_parts = []
if 'object' in label_types:
prompt_parts.append("List all objects visible with bounding boxes (x%, y%, w%, h%).")
if 'activity' in label_types:
prompt_parts.append("Describe all human activities in detail.")
if 'reasoning' in label_types:
prompt_parts.append("Provide a step-by-step reasoning annotation of what is happening and why.")
response = self.client.messages.create(
model="claude-opus-4-7",
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{"type": "image", "source": {"type": "base64", "media_type": "image/jpeg", "data": image_data}},
{"type": "text", "text": "\n".join(prompt_parts) + "\nRespond as JSON."}
]
}]
)
return json.loads(response.content[0].text)
```
**Architecture note:** Use YOLO v11 for fast object detection (it runs in real time), VideoMAE for temporal activity recognition, and reserve Claude for reasoning annotations (expensive but highest quality). This hybrid approach balances cost and quality.
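For the fast path, a frame-level object labeler with Ultralytics YOLO might look like the sketch below — the `yolo11n.pt` checkpoint name and the row shape (matched to the `video_labels` schema) are assumptions, not a prescribed setup.

```python
# pipeline/yolo_labeler.py — sketch of the cheap object-detection path (assumed setup)
import cv2
from ultralytics import YOLO  # pip install ultralytics

class FastObjectLabeler:
    """Cheap per-frame object labels; Claude stays reserved for reasoning annotations."""

    def __init__(self, weights: str = "yolo11n.pt"):  # assumed small YOLO11 checkpoint
        self.model = YOLO(weights)

    def label_frames(self, video_path: str, every_n_frames: int = 60) -> list[dict]:
        cap = cv2.VideoCapture(video_path)
        rows, frame_idx = [], 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % every_n_frames == 0:
                h, w = frame.shape[:2]
                result = self.model(frame, verbose=False)[0]
                for box in result.boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    rows.append({
                        "label_type": "object",
                        "label_value": result.names[int(box.cls)],
                        "confidence": float(box.conf),
                        # Normalized bbox matching the video_labels.bbox JSONB shape
                        "bbox": {"x": x1 / w, "y": y1 / h, "w": (x2 - x1) / w, "h": (y2 - y1) / h, "frame": frame_idx},
                    })
            frame_idx += 1
        cap.release()
        return rows
```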
---
## Step 4: Natural Language Query Engine
Parse customer requests into structured filters and execute them against the index.
```python
# query/parser.py
import anthropic
import json
class DatasetQueryParser:
"""Turn NL requests into structured search filters."""
def __init__(self):
self.client = anthropic.Anthropic()
def parse(self, natural_language_query: str) -> dict:
response = self.client.messages.create(
model="claude-sonnet-4-6",
max_tokens=512,
system="""You are a dataset query parser. Convert natural language dataset requests into structured JSON filters.
Output format:
{
"activities": ["string"], // e.g. ["cooking", "chopping"]
"objects": ["string"], // e.g. ["pan", "knife", "hand"]
"object_interactions": ["string"], // e.g. ["hand holding pan"]
"scene": "string", // e.g. "kitchen"
"duration_min": int, // seconds
"duration_max": int,
"annotation_types": ["string"], // ["object", "activity", "reasoning"]
"count": int, // target videos
"quality": "high|medium|any"
}""",
messages=[{"role": "user", "content": natural_language_query}]
)
return json.loads(response.content[0].text)
def execute(self, filters: dict, db_pool) -> list[str]:
"""Execute filters against the vector index."""
# Build semantic search query from activities + objects
query_text = " ".join(filters.get('activities', []) + filters.get('objects', []))
# Use pgvector for semantic similarity + SQL for structured filters
sql = """
SELECT v.id, v.source_url,
(v.embedding <=> $1::vector) as distance
FROM videos v
JOIN video_labels vl ON v.id = vl.video_id
WHERE v.status = 'labeled'
AND ($2::text[] IS NULL OR vl.label_value = ANY($2))
AND ($3::float IS NULL OR v.duration_seconds >= $3)
AND ($4::float IS NULL OR v.duration_seconds <= $4)
ORDER BY distance
LIMIT $5;
"""
        # Embed query_text with the CLIP text encoder, bind the filters, and return matching video IDs
        # (a fuller executable sketch follows after this code block)
        ...
```
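The elided last step — turning `query_text` into the `$1` vector and running the query — could look like the following sketch. It assumes an asyncpg pool and reuses the `VideoEmbedder.embed_text` helper from Step 3; the module and function names are illustrative.

```python
# query/executor.py — illustrative completion of the elided step (module name and pool are assumptions)
import asyncpg

from pipeline.embedder import VideoEmbedder

async def execute_filters(filters: dict, pool: asyncpg.Pool, embedder: VideoEmbedder) -> list[str]:
    """Embed the query text, then combine vector similarity with the structured filters."""
    query_text = " ".join(filters.get('activities', []) + filters.get('objects', []))
    query_embedding = embedder.embed_text(query_text)  # CLIP text encoder → 768-dim
    vec_literal = "[" + ",".join(str(x) for x in query_embedding) + "]"  # pgvector text format

    labels = (filters.get('activities') or []) + (filters.get('objects') or []) or None
    sql = """
        SELECT v.id
        FROM videos v
        WHERE v.status = 'labeled'
          AND ($2::text[] IS NULL OR EXISTS (
                SELECT 1 FROM video_labels vl
                WHERE vl.video_id = v.id AND vl.label_value = ANY($2)))
          AND ($3::float IS NULL OR v.duration_seconds >= $3)
          AND ($4::float IS NULL OR v.duration_seconds <= $4)
        ORDER BY v.embedding <=> $1::vector
        LIMIT $5;
    """
    rows = await pool.fetch(
        sql,
        vec_literal,
        labels,
        filters.get('duration_min'),
        filters.get('duration_max'),
        filters.get('count', 10000),
    )
    return [str(r['id']) for r in rows]
```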
---
## Step 5: Dataset Packaging & Delivery
Package filtered videos with their annotations into standardized formats.
```python
# delivery/packager.py
import boto3
import zipfile
import json
from pathlib import Path

def chunks(items: list, size: int):
    """Yield successive fixed-size slices of a list — used to build tar shards."""
    for i in range(0, len(items), size):
        yield items[i:i + size]
class DatasetPackager:
"""Package videos + annotations → deliver to customer."""
SUPPORTED_FORMATS = ['webdataset', 'huggingface', 'raw_zip']
def __init__(self, s3_client, bucket: str):
self.s3 = s3_client
self.bucket = bucket
def package_webdataset(self, video_ids: list[str], request_id: str, db) -> str:
"""Package as WebDataset tar shards (PyTorch-native format)."""
import webdataset as wds
output_key = f"datasets/{request_id}/"
shard_size = 1000 # videos per shard
for shard_idx, chunk in enumerate(chunks(video_ids, shard_size)):
shard_path = f"/tmp/shard-{shard_idx:05d}.tar"
with wds.TarWriter(shard_path) as sink:
for video_id in chunk:
video = db.get_video(video_id)
labels = db.get_labels(video_id)
# Download video from storage
video_bytes = self.s3.get_object(Bucket=self.bucket, Key=video['storage_key'])['Body'].read()
sink.write({
"__key__": video_id,
"mp4": video_bytes,
"json": json.dumps({
"source_url": video['source_url'],
"duration": video['duration_seconds'],
"labels": labels,
"metadata": video['metadata']
}).encode()
})
            # Upload shard to S3 and remember its key for the manifest
            shard_key = f"{output_key}shard-{shard_idx:05d}.tar"
            self.s3.upload_file(shard_path, self.bucket, shard_key)
            shard_keys.append(shard_key)
        # A presigned URL must point at a single object, so publish a manifest listing the shards
        manifest_key = f"{output_key}manifest.json"
        self.s3.put_object(Bucket=self.bucket, Key=manifest_key,
                           Body=json.dumps({"shards": shard_keys}).encode())
        # Generate signed URL for the manifest (7-day expiry)
        url = self.s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': self.bucket, 'Key': manifest_key},
            ExpiresIn=604800
        )
        return url
def generate_datacard(self, request_id: str, filters: dict, stats: dict) -> dict:
"""Generate HuggingFace-compatible datacard."""
return {
"dataset_info": {
"description": f"Custom video dataset: {filters}",
"license": "cc-by-4.0",
"splits": {"train": {"num_examples": stats['total_videos']}},
"features": {
"video": {"dtype": "video"},
"labels": {"dtype": "dict"},
}
}
}
```
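On the customer side, delivered shards stream straight into a PyTorch data pipeline. A minimal consumption sketch — the shard URL pattern is a placeholder for whatever keys the delivered manifest lists:

```python
# Consumer-side sketch: streaming delivered WebDataset shards (shard URLs are placeholders)
import json
import webdataset as wds

# Brace expansion lets WebDataset stream many shards; swap in the URLs from your delivery manifest
shard_pattern = "https://example-delivery/datasets/REQUEST_ID/shard-{00000..00009}.tar"

dataset = wds.WebDataset(shard_pattern).to_tuple("mp4", "json")

for video_bytes, meta_bytes in dataset:
    meta = json.loads(meta_bytes)
    # video_bytes is the raw mp4; decode with your video reader of choice (decord, PyAV, ...)
    print(meta["source_url"], len(meta["labels"]))
    break
```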
---
## Step 6: API & Customer Portal
Build the B2B interface — REST API for programmatic access, simple web UI for dataset ordering.
```python
# api/main.py
from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel

from query.parser import DatasetQueryParser
from pipeline.embedder import VideoEmbedder
# `db`, `authenticate_customer`, and `process_dataset_request` come from your own app modules

app = FastAPI(title="Shofo API", version="1.0.0")
class DatasetRequest(BaseModel):
query: str # "50k cooking videos with hand-object interactions"
format: str = "webdataset" # webdataset | huggingface | raw_zip
webhook_url: str | None = None
@app.post("/v1/datasets/request")
async def request_dataset(
req: DatasetRequest,
background_tasks: BackgroundTasks,
customer=Depends(authenticate_customer)
):
# Parse NL query → structured filters
parser = DatasetQueryParser()
filters = parser.parse(req.query)
# Create DB record
request_id = await db.create_dataset_request(
customer_id=customer.id,
query=req.query,
parsed_filters=filters,
target_count=filters.get('count', 10000)
)
# Queue processing job
background_tasks.add_task(process_dataset_request, request_id, filters, req.format)
return {"request_id": request_id, "status": "queued", "estimated_delivery_hours": 24}
@app.get("/v1/datasets/{request_id}")
async def get_dataset_status(request_id: str, customer=Depends(authenticate_customer)):
    record = await db.get_dataset_request(request_id)
    if record is None or record['customer_id'] != customer.id:
        raise HTTPException(status_code=404, detail="Dataset request not found")
return {
"status": record['status'],
"progress": record.get('progress_pct', 0),
"download_url": record.get('output_url'),
"delivered_at": record.get('delivered_at')
}
@app.get("/v1/search")
async def search_videos(
q: str,
limit: int = 100,
customer=Depends(authenticate_customer)
):
"""Semantic search over the video index."""
    # Load once at startup in production — instantiating here reloads CLIP weights on every request
    embedder = VideoEmbedder()
# Embed query text with CLIP text encoder
query_embedding = embedder.embed_text(q)
results = await db.vector_search(query_embedding, limit=limit)
return {"results": results, "total": len(results)}
```
**Customer portal:** Next.js app with three pages — dashboard (active requests), new request form (NL query box), and dataset library. Keep it minimal; your customers are AI research leads who prefer APIs over dashboards.
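Since most buyers script against the API anyway, the whole interaction can be a few lines of Python. A client-side sketch, with a placeholder base URL and API key:

```python
# Client-side sketch: ordering a dataset and polling for delivery (base URL and key are placeholders)
import time
import httpx

API = "https://api.example.com/v1"
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}

# Submit a natural-language dataset request
resp = httpx.post(f"{API}/datasets/request", headers=HEADERS, json={
    "query": "50k cooking videos with hand-object interactions",
    "format": "webdataset",
})
request_id = resp.json()["request_id"]

# Poll until the dataset is delivered, then grab the signed download URL
while True:
    status = httpx.get(f"{API}/datasets/{request_id}", headers=HEADERS).json()
    if status["status"] == "delivered":
        print("Download:", status["download_url"])
        break
    time.sleep(600)  # check every 10 minutes; the delivery estimate is ~24h
```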
---
## Step 7: Deployment & Scaling
Deploy on Kubernetes with separate fleets for crawling, processing, and serving.
```yaml
# k8s/crawler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-crawler-fleet
spec:
  replicas: 50  # Scale by platform volume
  selector:
    matchLabels: { app: video-crawler }
  template:
    metadata:
      labels: { app: video-crawler }
    spec:
containers:
- name: crawler
image: shofo/crawler:latest
resources:
requests: { cpu: "500m", memory: "1Gi" }
limits: { cpu: "2", memory: "4Gi" }
env:
- name: PROXY_URL
valueFrom:
secretKeyRef: { name: proxy-creds, key: url }
# k8s/gpu-labeler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gpu-labeler-fleet
spec:
  replicas: 10  # GPU nodes are expensive — right-size carefully
  selector:
    matchLabels: { app: gpu-labeler }
  template:
    metadata:
      labels: { app: gpu-labeler }
    spec:
nodeSelector:
accelerator: nvidia-a100
containers:
- name: labeler
image: shofo/labeler:latest
resources:
limits:
nvidia.com/gpu: "1"
```
**Cost model:**
- Crawling: ~$0.001 per video collected (proxy + compute)
- Storage: ~$0.01 per GB/month on R2 (free egress)
- YOLO labeling: ~$0.0001 per video (GPU inference)
- Claude reasoning: ~$0.002 per video (most expensive label type)
- Delivery: Customers pay $X per 1k videos delivered (margin ~60-70%)
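To see how those per-video costs roll up into a margin, here's a back-of-envelope check; the per-1k price, average video size, and retention period are assumptions, while the component costs are the ones listed above:

```python
# unit_economics.py — back-of-envelope margin check (price, video size, retention are assumptions)
def gross_margin_per_1k(
    price_per_1k: float = 15.0,   # assumed list price per 1,000 delivered videos
    avg_video_gb: float = 0.05,   # assumed ~50 MB per short-form video
    months_stored: int = 3,       # assumed retention on R2 before delivery
) -> float:
    crawl = 0.001                                    # $/video — proxy + compute
    storage = 0.01 * avg_video_gb * months_stored    # $/video — R2 at $0.01/GB-month
    yolo = 0.0001                                    # $/video — GPU inference
    claude = 0.002                                   # $/video — reasoning annotations
    cost_per_1k = 1000 * (crawl + storage + yolo + claude)   # ≈ $4.60
    return (price_per_1k - cost_per_1k) / price_per_1k

print(f"{gross_margin_per_1k():.0%}")  # ≈ 69% with these assumptions, before egress and ops overhead
```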
**Go-to-market:** Publish a free small dataset on HuggingFace (like Shofo did with `shofo-tiktok-general-small`) to establish credibility. Email every AI lab that has published a paper mentioning "video training data shortage."
