A coding guide to building scalable, end-to-end machine learning data pipelines for high-performance structured and image data processing with Daft.



In this tutorial, we explore how to use Daft as a high-performance, Python-native data engine for building end-to-end analytic pipelines. We load the real-world MNIST dataset and transform it step by step with UDFs, feature engineering, aggregations, joins, and lazy execution, showing how structured data processing, numerical computation, and machine learning combine seamlessly. By the end, you will go beyond manipulating data to building complete, model-aware pipelines on Daft’s scalable execution engine.

!pip -q install daft pyarrow pandas numpy scikit-learn


import os
os.environ["DO_NOT_TRACK"] = "true"


import numpy as np
import pandas as pd
import daft
from daft import col


print("Daft version:", getattr(daft, "__version__", "unknown"))


URL = "https://github.com/Eventual-Inc/mnist-json/raw/master/mnist_handwritten_test.json.gz"


df = daft.read_json(URL)
print("\nSchema (sampled):")
print(df.schema())


print("\nPeek:")
df.show(5)

Install Daft and its supporting libraries directly in Google Colab for a clean, reproducible environment, opt out of telemetry, and verify the installed version. Then load the real MNIST JSON dataset straight from a remote URL with Daft’s native reader, and inspect the schema and a preview to understand the column types before applying any transformations.
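Because Daft evaluates lazily, the read and the show above are the only points where work actually happens. As an optional sanity check (a sketch, not part of the original walkthrough), you can inspect the logical plan and count rows before committing to heavier transformations:

# Hypothetical sanity checks on the loaded DataFrame `df`.
# Daft builds a lazy plan; explain() prints it without executing.
df.explain()

# count_rows() forces a cheap execution just to count records.
n_rows = df.count_rows()
print("Rows loaded:", n_rows)

# Column names come straight from the schema, with no data scan needed.
print("Columns:", df.column_names)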

def to_28x28(pixels):
    # Each MNIST row stores a flat list of 784 pixel intensities.
    arr = np.array(pixels, dtype=np.float32)
    if arr.size != 784:
        return None  # malformed row: skip instead of raising
    return arr.reshape(28, 28)


df2 = (
    df
    .with_column(
        "img_28x28",
        col("image").apply(to_28x28, return_dtype=daft.DataType.python())
    )
    .with_column(
        "pixel_mean",
        col("img_28x28").apply(lambda x: float(np.mean(x)) if x is not None else None,
                               return_dtype=daft.DataType.float32())
    )
    .with_column(
        "pixel_std",
        col("img_28x28").apply(lambda x: float(np.std(x)) if x is not None else None,
                               return_dtype=daft.DataType.float32())
    )
)


print("\nAfter reshaping + simple features:")
df2.select("label", "pixel_mean", "pixel_std").show(5)

Reshape each raw pixel array into a structured 28 × 28 image with a row-wise UDF, then enrich the dataset with simple statistical features such as the per-image mean and standard deviation. These transformations turn raw pixel lists into a representation that downstream expressions and models can work with.
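To see exactly what the row-wise UDF does, here is a self-contained check with made-up pixel values: a flat list of 784 intensities becomes a 28 × 28 array, and anything else maps to None rather than raising:

# Fake "image": 784 pixel intensities, as MNIST JSON stores them.
fake_pixels = list(range(784))

img = to_28x28(fake_pixels)
print(img.shape)            # (28, 28)
print(float(img.mean()))    # the value pixel_mean would hold for this row

# A malformed row (wrong length) maps to None instead of raising,
# so downstream expressions can skip it.
print(to_28x28([1, 2, 3]))  # None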

@daft.udf(return_dtype=daft.DataType.list(daft.DataType.float32()), batch_size=512)
def featurize(images_28x28):
    out = []
    for img in images_28x28.to_pylist():
        if img is None:
            out.append(None)
            continue
        img = np.asarray(img, dtype=np.float32)
        # Normalized row/column intensity profiles.
        row_sums = img.sum(axis=1) / 255.0
        col_sums = img.sum(axis=0) / 255.0
        total = img.sum() + 1e-6
        ys, xs = np.indices(img.shape)
        # Intensity-weighted centroid, normalized to [0, 1].
        cy = float((ys * img).sum() / total) / 28.0
        cx = float((xs * img).sum() / total) / 28.0
        vec = np.concatenate([
            row_sums,
            col_sums,
            np.array([cy, cx, img.mean() / 255.0, img.std() / 255.0], dtype=np.float32),
        ])
        out.append(vec.astype(np.float32).tolist())
    return out


df3 = df2.with_column("features", featurize(col("img_28x28")))


print("\nFeature column created (list[float]):")
df3.select("label", "features").show(2)

Implement a batch UDF that extracts a richer feature vector from each reshaped image: normalized row and column sums, an intensity-weighted centroid, and global intensity statistics. Processing images in batches of 512 keeps the per-row Python overhead low while Daft handles the orchestration.
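The centroid features cy and cx are intensity-weighted means of the pixel coordinates. A toy worked example (a synthetic 4 × 4 image, not from the dataset) shows the arithmetic the UDF performs per image:

# Synthetic 4x4 "image" with all mass in one pixel at row 1, col 2.
toy_img = np.zeros((4, 4), dtype=np.float32)
toy_img[1, 2] = 255.0

total = toy_img.sum() + 1e-6
ys, xs = np.indices(toy_img.shape)

# Intensity-weighted centroid, normalized by image height/width.
cy = float((ys * toy_img).sum() / total) / toy_img.shape[0]
cx = float((xs * toy_img).sum() / total) / toy_img.shape[1]
print(cy, cx)  # ~0.25, ~0.5 -> the bright pixel's relative position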

label_stats = (
    df3.groupby("label")
    .agg(
        col("label").count().alias("n"),
        col("pixel_mean").mean().alias("mean_pixel_mean"),
        col("pixel_std").mean().alias("mean_pixel_std"),
    )
    .sort("label")
)


print("\nLabel distribution + summary stats:")
label_stats.show(10)


df4 = df3.join(label_stats, on="label", how="left")


print("\nJoined label stats back onto each row:")
df4.select("label", "n", "mean_pixel_mean", "mean_pixel_std").show(5)

Compute per-label aggregates with a group-by, counting examples and averaging the pixel statistics for each digit class, then left-join those summaries back onto every row so each example carries its class-level context. This shows how scalable aggregation and relational operations combine naturally inside Daft.
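The pattern here, aggregate per group and then broadcast the aggregates back onto every row via a left join, generalizes well beyond MNIST. A minimal sketch with toy data illustrates the same two steps:

toy = daft.from_pydict({
    "label": [0, 0, 1, 1, 1],
    "value": [1.0, 3.0, 2.0, 4.0, 6.0],
})

# Per-group summary: count and mean per label.
stats = toy.groupby("label").agg(
    col("value").count().alias("n"),
    col("value").mean().alias("mean_value"),
)

# A left join attaches each group's stats to every member row,
# exactly as label_stats is joined back onto df3 above.
enriched = toy.join(stats, on="label", how="left")
enriched.sort("label").show()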

small = df4.select("label", "features").collect().to_pandas()


small = small.dropna(subset=["label", "features"]).reset_index(drop=True)


X = np.vstack(small["features"].apply(np.array).values).astype(np.float32)
y = small["label"].astype(int).values


from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report


X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

clf = LogisticRegression(max_iter=1000)
clf.fit(X_train, y_train)


pred = clf.predict(X_test)
acc = accuracy_score(y_test, pred)


print("\nBaseline accuracy (feature-engineered LogisticRegression):", round(acc, 4))
print("\nClassification report:")
print(classification_report(y_test, pred, digits=4))


out_df = df4.select("label", "features", "pixel_mean", "pixel_std", "n")
out_path = "/content/daft_mnist_features.parquet"
out_df.write_parquet(out_path)


print("\nWrote parquet to:", out_path)


df_back = daft.read_parquet(out_path)
print("\nRead-back check:")
df_back.show(3)

Materialize the selected columns into pandas, split the data, and train a baseline logistic regression model, evaluating accuracy to confirm that the engineered features carry real signal. Finally, persist the processed dataset to Parquet and read it back, completing an end-to-end pipeline from raw ingestion to production-ready storage.
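Because the features now live in Parquet, any downstream consumer can reopen them lazily and apply filters and projections in Daft before materializing into pandas. A hypothetical follow-on read (the filter and column choice are illustrative, not from the original code):

# Reopen the persisted features lazily; nothing is read yet.
feats = daft.read_parquet("/content/daft_mnist_features.parquet")

# Filter and project in Daft so only the needed rows and columns
# are materialized into pandas (here: all examples of digit 1).
ones = (
    feats
    .where(col("label") == 1)
    .select("label", "features")
    .to_pandas()
)
print(ones.shape)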

In this tutorial, we used Daft to build a production-style data workflow, moving from raw JSON ingestion to feature engineering, aggregation, model training, and Parquet persistence. We integrated batch UDF logic, performed efficient group-by and join operations, and materialized results for downstream machine learning, all within a clean, scalable framework. Along the way, we saw how Daft handles complex transformations while staying Pythonic and efficient. The result is a reusable end-to-end pipeline that shows how modern data engineering and machine learning workflows can be combined in a single environment.
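As an optional refactor (not part of the original code), the lazy steps can be wrapped in a single function that returns an unmaterialized DataFrame, so callers decide when and where to execute or persist it:

def build_feature_pipeline(source_url: str) -> daft.DataFrame:
    """Build the lazy feature pipeline; nothing executes until materialized."""
    df = daft.read_json(source_url)
    df = df.with_column(
        "img_28x28",
        col("image").apply(to_28x28, return_dtype=daft.DataType.python()),
    )
    df = df.with_column("features", featurize(col("img_28x28")))
    return df

# Callers choose when to materialize or persist, e.g.:
# build_feature_pipeline(URL).select("label", "features").write_parquet("/content/features.parquet")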






