Claude’s Corner: Shofo — Common Crawl for Video, Sold to AI Labs


# How to Build a Video Dataset Platform (Shofo Clone) with Claude Code

A step-by-step guide to building a B2B video dataset indexing and labeling service for AI labs — a "Common Crawl for videos."

---

## Step 1: Database Schema & Core Data Model

Design your schema around four tables: the video index, per-video labels, customer dataset requests, and processing jobs.

```sql
-- The VECTOR type and the HNSW index below come from the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Videos raw index
CREATE TABLE videos (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  source_url TEXT NOT NULL UNIQUE,
  platform TEXT NOT NULL, -- 'tiktok', 'youtube', 'instagram', 'web'
  storage_key TEXT, -- object-storage key of the downloaded file, e.g. 'raw/<id>.mp4'
  duration_seconds FLOAT,
  resolution TEXT,
  language TEXT,
  captured_at TIMESTAMPTZ DEFAULT now(),
  metadata JSONB DEFAULT '{}', -- raw platform metadata
  embedding VECTOR(768), -- CLIP ViT-L/14 embedding (768 dims) for semantic search
  status TEXT DEFAULT 'raw' -- raw, sanitized, labeled, rejected
);

-- Semantic labels & annotations
CREATE TABLE video_labels (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  video_id UUID REFERENCES videos(id),
  label_type TEXT NOT NULL, -- 'object', 'activity', 'scene', 'reasoning'
  label_value TEXT NOT NULL,
  confidence FLOAT,
  bbox JSONB, -- bounding box for objects: {x, y, w, h, frame}
  frame_range INT4RANGE, -- [start_frame, end_frame)
  model_version TEXT,
  created_at TIMESTAMPTZ DEFAULT now()
);

-- Customer dataset requests
CREATE TABLE dataset_requests (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  customer_id UUID NOT NULL,
  query TEXT NOT NULL, -- natural language: "50k cooking videos with hand-object interaction"
  parsed_filters JSONB, -- structured filters extracted from query
  target_count INT NOT NULL,
  annotation_types TEXT[] DEFAULT '{}', -- ['object', 'activity', 'reasoning']
  status TEXT DEFAULT 'pending', -- pending, processing, delivered, failed
  delivered_at TIMESTAMPTZ,
  output_url TEXT, -- signed S3 URL to delivered dataset
  created_at TIMESTAMPTZ DEFAULT now()
);

-- Processing jobs
CREATE TABLE processing_jobs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  video_id UUID REFERENCES videos(id),
  job_type TEXT NOT NULL, -- 'sanitize', 'embed', 'label_objects', 'label_activities', 'reasoning'
  status TEXT DEFAULT 'queued',
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  error TEXT,
  worker_id TEXT
);

CREATE INDEX ON videos USING hnsw (embedding vector_cosine_ops);
CREATE INDEX ON video_labels (video_id, label_type);
CREATE INDEX ON videos (platform, status);
```
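
The `status` column and the `frame_range` column both carry conventions that the schema itself does not enforce. As a rough sketch, application code could guard them as follows; the transition table and both helper names are assumptions made for illustration, not part of the original design.

```python
import math

# Assumed lifecycle for videos.status; the schema only lists the values.
ALLOWED_TRANSITIONS = {
    "raw": {"sanitized", "rejected"},
    "sanitized": {"labeled", "rejected"},
    "labeled": set(),
    "rejected": set(),
}


def can_transition(current: str, new: str) -> bool:
    """Return True if a video may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


def to_int4range(start_s: float, end_s: float, fps: float) -> str:
    """Format a time span as a Postgres int4range literal: [start_frame, end_frame)."""
    if end_s < start_s or fps <= 0:
        raise ValueError("need end_s >= start_s and fps > 0")
    start_frame = int(start_s * fps)      # round down to include the first frame
    end_frame = math.ceil(end_s * fps)    # exclusive upper bound
    return f"[{start_frame},{end_frame})"
```

For example, a label spanning seconds 1.0 to 2.5 of a 30 fps clip would be stored as `[30,75)`.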

---

## Step 2: Video Crawling Infrastructure

Build a distributed crawler that continuously indexes short-form video platforms and the public web.

```python
# crawler/base_crawler.py
import httpx
from typing import AsyncGenerator

class VideoCrawler:
    """Base crawler; subclass per platform."""
    
    def __init__(self, db_pool, storage_client):
        self.db = db_pool
        self.storage = storage_client  # assumed to expose an async put_object(key, data)
    
    async def crawl(self) -> AsyncGenerator[dict, None]:
        raise NotImplementedError
    
    async def download_and_store(self, url: str, video_id: str) -> str:
        """Download video, upload to S3, return storage key."""
        async with httpx.AsyncClient(follow_redirects=True, timeout=60.0) as client:
            r = await client.get(url)
            r.raise_for_status()  # don't store an error page as a video
            key = f"raw/{video_id}.mp4"
            # r.content holds the whole file in memory; stream large videos instead
            await self.storage.put_object(key, r.content)
            return key

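# crawler/yt_dlp_crawler.py
import asyncio
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse

import yt_dlp

from crawler.base_crawler import VideoCrawler

PLATFORM_HOSTS = {
    'tiktok.com': 'tiktok',
    'youtube.com': 'youtube',
    'youtu.be': 'youtube',
    'instagram.com': 'instagram',
}

def platform_from_url(url: str) -> str:
    """Map a URL to one of the platform values used in the videos table."""
    host = urlparse(url).netloc.lower()
    for suffix, platform in PLATFORM_HOSTS.items():
        if host == suffix or host.endswith('.' + suffix):
            return platform
    return 'web'

class YtDlpCrawler(VideoCrawler):
    """Crawls YouTube, TikTok, Instagram via yt-dlp."""
    
    SOURCES = [
        "https://www.tiktok.com/tag/cooking",
        "https://www.youtube.com/results?search_query=cooking+tutorial",
        # Add thousands more: trending hashtags, channels, playlists
    ]
    
    def __init__(self, db_pool, storage_client):
        super().__init__(db_pool, storage_client)
        self.executor = ThreadPoolExecutor(max_workers=20)
    
    def _yt_dlp_extract(self, url: str) -> list[dict]:
        ydl_opts = {
            'quiet': True,
            'extract_flat': True,   # list entries without resolving each video
            'ignoreerrors': True,   # failed entries come back as None
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
            return list(info.get('entries') or [info]) if info else []
    
    async def crawl(self):
        # yt-dlp is blocking, so run extraction in the thread pool
        loop = asyncio.get_running_loop()
        for source in self.SOURCES:
            entries = await loop.run_in_executor(
                self.executor, self._yt_dlp_extract, source
            )
            for entry in entries:
                if entry and entry.get('url'):
                    yield {
                        'source_url': entry['url'],
                        # derive the platform per entry instead of hardcoding 'youtube'
                        'platform': platform_from_url(entry['url']),
                        'duration_seconds': entry.get('duration'),
                        'metadata': {k: entry.get(k) for k in ['title', 'uploader', 'view_count', 'like_count']},
                    }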

# Kubernetes CronJob: run crawler fleet every 6 hours
# Deploy as: 50 concurrent crawler pods per platform
```

**Key decisions:**
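- Use yt-dlp for platform-specific extraction: it maintains per-site extractors and accepts cookies and a proxy, but credential rotation and anti-bot handling remain your responsibility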
- Store raw video in S3-compatible object storage (Cloudflare R2 is cheap)
- Use Postgres with pgvector for the searchable index
- Rate-limit per platform to avoid IP bans — rotate proxies via Bright Data or Oxylabs
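
One common way to implement the per-platform rate limit is a token bucket per platform. The sketch below is a minimal single-process version; the class names and the example limits are hypothetical, and a crawler fleet spread across pods would need a shared store rather than in-memory state.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class PlatformLimiter:
    """One bucket per platform, keyed by the `platform` value in the index."""

    def __init__(self, limits: dict[str, tuple[float, float]],
                 clock: Callable[[], float] = time.monotonic):
        self._buckets = {
            platform: TokenBucket(rate, capacity, clock)
            for platform, (rate, capacity) in limits.items()
        }

    def allow(self, platform: str) -> bool:
        return self._buckets[platform].try_acquire()


# Example limits are placeholders, not recommendations.
limiter = PlatformLimiter({"tiktok": (1.0, 2.0), "youtube": (5.0, 10.0)})
```

A crawler would call `limiter.allow(platform)` before each request and back off or requeue the URL when it returns `False`.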

---

## Step 3: Video Processing Pipeline

Build the ML pipeline: sanitize → embed → label → reason.

```python
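# pipeline/sanitizer.py
import cv2
import numpy as np
from nudenet import NudeDetector

# Class names assume NudeNet 3.x; check them against the version you install.
NSFW_CLASSES = {'FEMALE_BREAST_EXPOSED', 'FEMALE_GENITALIA_EXPOSED', 'MALE_GENITALIA_EXPOSED'}

class VideoSanitizer:
    """Reject NSFW, unreadable, and duplicate videos."""
    
    def __init__(self):
        self.nude_detector = NudeDetector()
        self.seen_hashes = set()  # in-memory only; share it across workers in production
    
    def perceptual_hash(self, frame: np.ndarray) -> str:
        """64-bit average hash of a frame, returned as a bit string."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        small = cv2.resize(gray, (8, 8), interpolation=cv2.INTER_AREA)
        avg = small.mean()
        return ''.join('1' if p > avg else '0' for p in small.flatten())
    
    def is_safe(self, video_path: str) -> tuple[bool, str]:
        cap = cv2.VideoCapture(video_path)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Sample 1 frame per second
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        step = max(1, int(fps))  # a step of 0 would make range() raise
        sample_frames = []
        
        for i in range(0, frame_count, step):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if ret:
                sample_frames.append(frame)
        
        cap.release()
        
        # Corruption check: nothing could be decoded
        if not sample_frames:
            return False, 'corrupted'
        
        # Duplicate check: fingerprint the first few sampled frames
        video_hash = ''.join(self.perceptual_hash(f) for f in sample_frames[:5])
        if video_hash in self.seen_hashes:
            return False, 'duplicate'
        self.seen_hashes.add(video_hash)
        
        # NSFW check
        for frame in sample_frames[::5]:  # check every 5th sampled frame
            # Assumes NudeNet 3.x, where detect() returns dicts with 'class' and 'score';
            # if your version only accepts file paths, write the frame to disk first.
            detections = self.nude_detector.detect(frame)
            if any(d['score'] > 0.7 and d['class'] in NSFW_CLASSES for d in detections):
                return False, 'nsfw'
        
        return True, 'ok'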

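# pipeline/embedder.py
import clip
import cv2
import numpy as np
import torch
from PIL import Image

class VideoEmbedder:
    """Generate CLIP embeddings from video keyframes."""
    
    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model, self.preprocess = clip.load("ViT-L/14", device=self.device)
        self.model.eval()
    
    @torch.no_grad()
    def embed_video(self, video_path: str) -> list[float]:
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(1, int(fps * 2))  # every 2s; never a zero step
        
        embeddings = []
        for i in range(0, frame_count, step):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if ret:
                img = self.preprocess(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
                # Move the input to the model's device before encoding
                emb = self.model.encode_image(img.unsqueeze(0).to(self.device))
                embeddings.append(emb.squeeze(0).float().cpu().numpy())
        
        cap.release()
        # Mean-pool keyframe embeddings → single video embedding.
        # ViT-L/14 yields 768 dims; the zero-vector fallback means "no embedding".
        return np.mean(embeddings, axis=0).tolist() if embeddings else [0.0] * 768
    
    @torch.no_grad()
    def embed_text(self, text: str) -> list[float]:
        """Embed a text query into the same space (used by /v1/search)."""
        tokens = clip.tokenize([text], truncate=True).to(self.device)
        return self.model.encode_text(tokens).squeeze(0).float().cpu().tolist()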

# pipeline/labeler.py
import base64
import json

import anthropic

class VideoLabeler:
    """Object detection + activity recognition + reasoning annotations."""
    
    def __init__(self):
        self.client = anthropic.Anthropic()
        # Also use specialized CV models for speed/cost
        # YOLO for objects, VideoMAE for activities
    
    def label_with_vision(self, frame_path: str, label_types: list[str]) -> dict:
        with open(frame_path, 'rb') as f:
            image_data = base64.b64encode(f.read()).decode()
        
        prompt_parts = []
        if 'object' in label_types:
            prompt_parts.append("List all objects visible with bounding boxes (x%, y%, w%, h%).")
        if 'activity' in label_types:
            prompt_parts.append("Describe all human activities in detail.")
        if 'reasoning' in label_types:
            prompt_parts.append("Provide a step-by-step reasoning annotation of what is happening and why.")
        
        response = self.client.messages.create(
            model="claude-opus-4-7",
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image", "source": {"type": "base64", "media_type": "image/jpeg", "data": image_data}},
                    {"type": "text", "text": "\n".join(prompt_parts) + "\nRespond as JSON."}
                ]
            }]
        )
        return json.loads(response.content[0].text)
```
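
The `perceptual_hash` method in the sanitizer returns a 64-character bit string, and re-encoded or lightly cropped copies of a video usually differ in only a few bits. A minimal sketch of near-duplicate detection compares hashes by Hamming distance. The `NearDuplicateIndex` class and the threshold of 5 bits are assumptions for illustration, and the linear scan would need replacing with a proper index at crawl scale.

```python
def hamming(a: str, b: str) -> int:
    """Number of positions at which two equal-length bit strings differ."""
    if len(a) != len(b):
        raise ValueError("hashes must have the same length")
    return sum(x != y for x, y in zip(a, b))


class NearDuplicateIndex:
    """Keeps hashes seen so far and flags new ones that are too close."""

    def __init__(self, max_distance: int = 5):
        self.max_distance = max_distance
        self._hashes: list[str] = []

    def is_duplicate(self, frame_hash: str) -> bool:
        return any(hamming(frame_hash, seen) <= self.max_distance
                   for seen in self._hashes)

    def add_if_new(self, frame_hash: str) -> bool:
        """Store the hash and return True unless a near-duplicate exists."""
        if self.is_duplicate(frame_hash):
            return False
        self._hashes.append(frame_hash)
        return True
```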

**Architecture note:** Use YOLO11 for fast, real-time object detection and VideoMAE for temporal activity recognition, and reserve Claude for reasoning annotations, which cost more per video but are higher quality. This hybrid approach balances cost and quality.

---

## Step 4: Natural Language Query Engine

Parse customer requests into structured filters and execute them against the index.

```python
# query/parser.py
import anthropic
import json

class DatasetQueryParser:
    """Turn NL requests into structured search filters."""
    
    def __init__(self):
        self.client = anthropic.Anthropic()
    
    def parse(self, natural_language_query: str) -> dict:
        response = self.client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            system="""You are a dataset query parser. Convert natural language dataset requests into structured JSON filters.
Output format:
{
  "activities": ["string"],       // e.g. ["cooking", "chopping"]  
  "objects": ["string"],          // e.g. ["pan", "knife", "hand"]
  "object_interactions": ["string"], // e.g. ["hand holding pan"]
  "scene": "string",              // e.g. "kitchen"
  "duration_min": int,            // seconds
  "duration_max": int,
  "annotation_types": ["string"], // ["object", "activity", "reasoning"]
  "count": int,                   // target videos
  "quality": "high|medium|any"
}""",
            messages=[{"role": "user", "content": natural_language_query}]
        )
        return json.loads(response.content[0].text)
    
    def execute(self, filters: dict, db_pool) -> list[str]:
        """Execute filters against the vector index."""
        # Build semantic search query from activities + objects.
        # Embed this text with the CLIP text encoder to get the $1 parameter.
        query_text = " ".join(filters.get('activities', []) + filters.get('objects', []))
        
        # Use pgvector for semantic similarity + SQL for structured filters.
        # EXISTS (rather than a JOIN) keeps one row per video even when
        # several labels match.
        sql = """
        SELECT v.id, v.source_url,
               (v.embedding <=> $1::vector) AS distance
        FROM videos v
        WHERE v.status = 'labeled'
          AND ($2::text[] IS NULL OR EXISTS (
                SELECT 1 FROM video_labels vl
                WHERE vl.video_id = v.id AND vl.label_value = ANY($2)))
          AND ($3::float IS NULL OR v.duration_seconds >= $3)
          AND ($4::float IS NULL OR v.duration_seconds <= $4)
        ORDER BY distance
        LIMIT $5;
        """
        # Execute and return video IDs
        ...
```
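
Calling `json.loads` directly on a model reply fails whenever the reply wraps the JSON in a Markdown fence or adds a sentence around it, so it may be worth validating before the filters reach SQL. The following is a sketch under assumptions: `extract_json` and `normalize_filters` are hypothetical helpers, and the `max_count` cap is an arbitrary example value.

```python
import json
import re

ALLOWED_ANNOTATIONS = {"object", "activity", "reasoning"}


def extract_json(text: str) -> dict:
    """Parse the outermost JSON object in a reply that may include prose or a fence."""
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in model reply")
    return json.loads(match.group(0))


def normalize_filters(raw: dict, max_count: int = 1_000_000) -> dict:
    """Apply defaults and reject values the query layer cannot use."""
    count = int(raw.get("count") or 0)
    if not 0 < count <= max_count:
        raise ValueError(f"count must be between 1 and {max_count}")

    dmin, dmax = raw.get("duration_min"), raw.get("duration_max")
    if dmin is not None and dmax is not None and dmin > dmax:
        raise ValueError("duration_min exceeds duration_max")

    requested = set(raw.get("annotation_types") or [])
    return {
        "activities": list(raw.get("activities") or []),
        "objects": list(raw.get("objects") or []),
        "duration_min": dmin,
        "duration_max": dmax,
        # Silently drop annotation types the pipeline does not produce
        "annotation_types": sorted(requested & ALLOWED_ANNOTATIONS),
        "count": count,
    }
```

In `parse`, the final line could then become `normalize_filters(extract_json(response.content[0].text))`.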

---

## Step 5: Dataset Packaging & Delivery

Package filtered videos with their annotations into standardized formats.

```python
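# delivery/packager.py
import json
import os
import tempfile
from itertools import islice

def chunks(items, size):
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch

class DatasetPackager:
    """Package videos + annotations → deliver to customer."""
    
    SUPPORTED_FORMATS = ['webdataset', 'huggingface', 'raw_zip']
    
    def __init__(self, s3_client, bucket: str):
        self.s3 = s3_client  # a boto3 S3 client
        self.bucket = bucket
    
    def _presign(self, key: str, expires_in: int = 604800) -> str:
        """Signed GET URL for one object (default expiry: 7 days)."""
        return self.s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': self.bucket, 'Key': key},
            ExpiresIn=expires_in
        )
    
    def package_webdataset(self, video_ids: list[str], request_id: str, db) -> str:
        """Package as WebDataset tar shards; return a signed URL to a manifest."""
        import webdataset as wds
        
        prefix = f"datasets/{request_id}/"
        shard_size = 1000  # videos per shard
        shard_urls = []
        
        for shard_idx, chunk in enumerate(chunks(video_ids, shard_size)):
            shard_name = f"shard-{shard_idx:05d}.tar"
            shard_path = os.path.join(tempfile.gettempdir(), f"{request_id}-{shard_name}")
            
            with wds.TarWriter(shard_path) as sink:
                for video_id in chunk:
                    video = db.get_video(video_id)
                    labels = db.get_labels(video_id)
                    
                    # Download video from storage
                    video_bytes = self.s3.get_object(Bucket=self.bucket, Key=video['storage_key'])['Body'].read()
                    
                    sink.write({
                        "__key__": str(video_id),
                        "mp4": video_bytes,
                        # default=str covers UUIDs and timestamps coming from the DB
                        "json": json.dumps({
                            "source_url": video['source_url'],
                            "duration": video['duration_seconds'],
                            "labels": labels,
                            "metadata": video['metadata']
                        }, default=str).encode()
                    })
            
            # Upload shard to S3, then free local disk
            self.s3.upload_file(shard_path, self.bucket, prefix + shard_name)
            os.remove(shard_path)
            shard_urls.append(self._presign(prefix + shard_name))
        
        # A presigned URL covers a single object, not a prefix, so publish a
        # manifest listing one signed URL per shard and return its URL.
        manifest_key = prefix + "manifest.json"
        self.s3.put_object(
            Bucket=self.bucket,
            Key=manifest_key,
            Body=json.dumps({"shards": shard_urls}).encode()
        )
        return self._presign(manifest_key)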
    
    def generate_datacard(self, request_id: str, filters: dict, stats: dict) -> dict:
        """Generate HuggingFace-compatible datacard."""
        return {
            "dataset_info": {
                "description": f"Custom video dataset: {filters}",
                "license": "cc-by-4.0",
                "splits": {"train": {"num_examples": stats['total_videos']}},
                "features": {
                    "video": {"dtype": "video"},
                    "labels": {"dtype": "dict"},
                }
            }
        }
```
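
To make the delivered format concrete: a WebDataset shard is an ordinary tar archive in which files sharing a basename form one sample, so `<id>.mp4` and `<id>.json` travel together. The sketch below reproduces that layout with only the standard library, which may help when debugging a shard without installing `webdataset`. Both function names are hypothetical, and the key is taken as everything before the first dot.

```python
import io
import json
import tarfile
from collections import defaultdict


def write_shard(samples: list[dict]) -> bytes:
    """Write samples of the form {'__key__': id, ext: bytes, ...} to an in-memory tar."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for sample in samples:
            key = sample["__key__"]
            for ext, payload in sample.items():
                if ext == "__key__":
                    continue
                info = tarfile.TarInfo(name=f"{key}.{ext}")
                info.size = len(payload)
                tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


def read_shard(data: bytes) -> dict[str, dict[str, bytes]]:
    """Group tar members back into samples keyed by basename."""
    grouped: dict[str, dict[str, bytes]] = defaultdict(dict)
    with tarfile.open(fileobj=io.BytesIO(data), mode="r") as tar:
        for member in tar:
            key, _, ext = member.name.partition(".")
            grouped[key][ext] = tar.extractfile(member).read()
    return dict(grouped)


shard = write_shard([
    {"__key__": "vid1", "mp4": b"\x00\x01", "json": json.dumps({"labels": []}).encode()},
])
sample = read_shard(shard)["vid1"]
```

Here `sample` holds the `mp4` bytes and the `json` annotation payload for `vid1`, mirroring what `sink.write` produces per video.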

---

## Step 6: API & Customer Portal

Build the B2B interface — REST API for programmatic access, simple web UI for dataset ordering.

```python
# api/main.py
from typing import Literal

from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException, Query
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

from pipeline.embedder import VideoEmbedder
from query.parser import DatasetQueryParser
# Assumed to be defined elsewhere in the project: `db` (async data-access layer),
# `authenticate_customer` (auth dependency), `process_dataset_request` (worker entry point).

app = FastAPI(title="Shofo API", version="1.0.0")
parser = DatasetQueryParser()
embedder = VideoEmbedder()  # load CLIP once at startup, not on every request

class DatasetRequest(BaseModel):
    query: str           # "50k cooking videos with hand-object interactions"
    format: Literal["webdataset", "huggingface", "raw_zip"] = "webdataset"
    webhook_url: str | None = None

@app.post("/v1/datasets/request")
async def request_dataset(
    req: DatasetRequest,
    background_tasks: BackgroundTasks,
    customer=Depends(authenticate_customer)
):
    # Parse NL query → structured filters (blocking API call, so use a worker thread)
    filters = await run_in_threadpool(parser.parse, req.query)
    
    # Create DB record
    request_id = await db.create_dataset_request(
        customer_id=customer.id,
        query=req.query,
        parsed_filters=filters,
        target_count=filters.get('count', 10000)
    )
    
    # Queue processing job
    background_tasks.add_task(process_dataset_request, request_id, filters, req.format)
    
    return {"request_id": request_id, "status": "queued", "estimated_delivery_hours": 24}

@app.get("/v1/datasets/{request_id}")
async def get_dataset_status(request_id: str, customer=Depends(authenticate_customer)):
    record = await db.get_dataset_request(request_id)
    # Return 404 for unknown IDs and for requests owned by another customer
    if record is None or record['customer_id'] != customer.id:
        raise HTTPException(status_code=404, detail="Dataset request not found")
    return {
        "status": record['status'],
        "progress": record.get('progress_pct', 0),
        "download_url": record.get('output_url'),
        "delivered_at": record.get('delivered_at')
    }

@app.get("/v1/search")
async def search_videos(
    q: str,
    limit: int = Query(100, ge=1, le=1000),
    customer=Depends(authenticate_customer)
):
    """Semantic search over the video index."""
    # Embed query text with CLIP text encoder (CPU/GPU bound, so off the event loop)
    query_embedding = await run_in_threadpool(embedder.embed_text, q)
    results = await db.vector_search(query_embedding, limit=limit)
    return {"results": results, "total": len(results)}
```

**Customer portal:** Next.js app with three pages — dashboard (active requests), new request form (NL query box), and dataset library. Keep it minimal; your customers are AI research leads who prefer APIs over dashboards.
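
Since customers are expected to prefer the API, a client will typically submit a request and then poll `GET /v1/datasets/{request_id}` until the status is `delivered` or `failed`. The helper below is a hypothetical client-side sketch with exponential backoff; `fetch_status` stands in for whatever HTTP call the customer uses, and the interval and timeout defaults are arbitrary.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"delivered", "failed"}


def wait_for_dataset(
    fetch_status: Callable[[], dict],
    *,
    interval_s: float = 30.0,
    max_interval_s: float = 600.0,
    timeout_s: float = 86400.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll until the request reaches a terminal status, doubling the wait each time."""
    deadline = clock() + timeout_s
    delay = interval_s
    while True:
        record = fetch_status()
        if record["status"] in TERMINAL_STATUSES:
            return record
        if clock() + delay > deadline:
            raise TimeoutError("dataset request did not finish in time")
        sleep(delay)
        delay = min(delay * 2, max_interval_s)
```

With `httpx`, `fetch_status` could be as small as `lambda: client.get(url, headers=auth).json()`; supplying `webhook_url` in the request avoids polling altogether.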

---

## Step 7: Deployment & Scaling

Deploy on Kubernetes with separate fleets for crawling, processing, and serving.

```yaml
# k8s/crawler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-crawler-fleet
spec:
  replicas: 50  # Scale by platform volume
  selector:  # required in apps/v1; must match the template labels
    matchLabels:
      app: video-crawler
  template:
    metadata:
      labels:
        app: video-crawler
    spec:
      containers:
      - name: crawler
        image: shofo/crawler:latest
        resources:
          requests: { cpu: "500m", memory: "1Gi" }
          limits: { cpu: "2", memory: "4Gi" }
        env:
        - name: PROXY_URL
          valueFrom:
            secretKeyRef: { name: proxy-creds, key: url }
---
# k8s/gpu-labeler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gpu-labeler-fleet
spec:
  replicas: 10  # GPU nodes are expensive; right-size carefully
  selector:
    matchLabels:
      app: gpu-labeler
  template:
    metadata:
      labels:
        app: gpu-labeler
    spec:
      nodeSelector:
        accelerator: nvidia-a100  # node label name depends on your cluster
      containers:
      - name: labeler
        image: shofo/labeler:latest
        resources:
          limits:
            nvidia.com/gpu: "1"

**Cost model:**
- Crawling: ~$0.001 per video collected (proxy + compute)
- Storage: ~$0.015 per GB/month on R2 (no egress fees)
- YOLO labeling: ~$0.0001 per video (GPU inference)
- Claude reasoning: ~$0.002 per video (most expensive label type)
- Delivery: Customers pay $X per 1k videos delivered (margin ~60-70%)
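
The per-video figures above can be turned into a rough price floor. The sketch below takes the crawl, YOLO, and Claude costs from the list as defaults and deliberately leaves storage out, since that depends on file size and retention. The function names and the `reasoning_share` parameter (the fraction of videos that receive Claude reasoning labels) are assumptions for illustration.

```python
def cost_per_1k(
    crawl: float = 0.001,
    yolo: float = 0.0001,
    claude: float = 0.002,
    reasoning_share: float = 1.0,
) -> float:
    """Variable cost in dollars to collect and label 1,000 videos (storage excluded)."""
    if not 0.0 <= reasoning_share <= 1.0:
        raise ValueError("reasoning_share must be between 0 and 1")
    return 1000 * (crawl + yolo + claude * reasoning_share)


def price_for_margin(cost: float, margin: float) -> float:
    """Price at which (price - cost) / price equals the target gross margin."""
    if not 0.0 <= margin < 1.0:
        raise ValueError("margin must be in [0, 1)")
    return cost / (1.0 - margin)


full_cost = cost_per_1k()                      # every video gets reasoning labels
floor_price = price_for_margin(full_cost, 0.65)
```

With these inputs the variable cost is about $3.10 per 1k videos, so a 65% margin implies a price of roughly $8.86 per 1k before storage and overhead.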

**Go-to-market:** Publish a free small dataset on HuggingFace (like Shofo did with `shofo-tiktok-general-small`) to establish credibility. Email every AI lab that has published a paper mentioning "video training data shortage."




