In this tutorial, we design an end-to-end, production-style analysis and modeling pipeline using Vaex to efficiently manipulate millions of rows without materializing the data in memory. We generate a realistic large-scale dataset, engineer rich behavioral and city-level features using lazy expressions and approximate statistics, and aggregate insights at scale. We then integrate Vaex with scikit-learn to train and evaluate predictive models, demonstrating how Vaex can serve as the backbone of high-performance exploratory analysis and machine-learning workflows.
!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"
import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score
# Log the interpreter and key library versions so results can be tied to an environment.
_version_report = [
    ("Python:", __import__("sys").version.split()[0]),
    ("vaex:", vaex.__version__),
    ("numpy:", np.__version__),
    ("pandas:", pd.__version__),
]
for _label, _ver in _version_report:
    print(_label, _ver)
# ---- Synthetic customer-like dataset -------------------------------------------
# NOTE: every rng.* call below draws from the same seeded generator, so the order
# of these statements determines the generated data; reordering them changes it.
rng = np.random.default_rng(7)  # fixed seed -> reproducible dataset
n = 2_000_000  # number of rows to generate
cities = np.array(["Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"], dtype=object)
# City per row, sampled with fixed (non-uniform) probabilities that sum to 1.
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")  # ages in [18, 75)
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")  # tenure in months, [0, 180)
tx = rng.poisson(lam=22, size=n).astype("int32")  # transaction count, Poisson with mean 22
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
# Per-row city income multiplier: look up each row's city in the mapping via reindex.
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
# Income scaled by city, a linear age effect around 35, and a tenure effect capped at 120 months.
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, 420_000)  # bound extreme incomes
noise = rng.normal(0, 1, size=n).astype("float64")
# Latent propensity score: log-income, log-activity and sqrt-tenure push it up,
# age above 40 pushes it down, three cities get a bonus, plus Gaussian noise.
# (np.maximum(tenure_m, 0) is a no-op since tenure_m >= 0; 1e-9 keeps the sqrt
# argument strictly positive.)
score_latent = (
0.55*np.log1p(income/1000.0)
+ 0.28*np.log1p(tx)
+ 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
- 0.012*(age-40)
+ 0.22*(city == "Vancouver").astype("float64")
+ 0.15*(city == "Toronto").astype("float64")
+ 0.10*(city == "Ottawa").astype("float64")
+ 0.65*noise
)
# Logistic link centred on the 70th percentile of the latent score: rows above it get p > 0.5.
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")  # Bernoulli(p) binary label
df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)
# Derived columns are assigned as vaex expressions on the DataFrame.
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)  # +0.25 avoids division by zero when tenure is 0
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)  # hand-weighted composite
# Boolean flags; NOTE(review): neither is_new nor is_senior appears in features_num used for modeling.
df["is_new"] = df.tenure_m < 6
df["is_senior"] = df.age >= 60
print("\nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))
We generate a large, realistic synthetic dataset and wrap it in a Vaex DataFrame that works lazily over millions of rows. Because the core numerical features are defined directly as expressions, no intermediate data is materialized. We validate the setup by inspecting the row and column counts and a small sample of raw and derived values.
# Integer-encode the city column; the fitted encoder adds a "label_encoded_city" column.
encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]  # city name -> integer code
inv_city_map = {v: k for k, v in city_map.items()}  # integer code -> city name
n_cities = len(city_map)

# One bin per integer code: n_cities bins of width 1 centred on 0 .. n_cities-1.
city_binning = {
    "binby": "label_encoded_city",
    "shape": n_cities,
    "limits": [-0.5, n_cities - 0.5],
}

# Schedule every per-city statistic lazily (delay=True) and evaluate them together
# with a single df.execute(), so vaex can share passes over the data instead of
# scanning all rows once per eager aggregation call.
p95_income_k_task = df.percentile_approx("income_k", 95, delay=True, **city_binning)
p50_value_task = df.percentile_approx("value_score", 50, delay=True, **city_binning)
avg_income_k_task = df.mean("income_k", delay=True, **city_binning)
# NOTE: target_rate is computed over ALL rows, including rows that are held out
# for testing later in the pipeline, so as a model feature it carries a small
# amount of target leakage.
target_rate_task = df.mean("target", delay=True, **city_binning)
n_task = df.count(delay=True, **city_binning)
df.execute()

# Flatten each binned result to a 1-D array indexed by city code.
p95_income_k_by_city = np.asarray(p95_income_k_task.get()).reshape(-1)
p50_value_by_city = np.asarray(p50_value_task.get()).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_task.get()).reshape(-1)
target_rate_by_city = np.asarray(target_rate_task.get()).reshape(-1)
n_by_city = np.asarray(n_task.get()).reshape(-1)

# Small in-memory city summary (one row per city), most positive-heavy cities first.
city_table = pd.DataFrame({
    "city_id": np.arange(n_cities),
    "city": [inv_city_map[i] for i in range(n_cities)],
    "n": n_by_city.astype("int64"),
    "avg_income_k": avg_income_k_by_city,
    "p95_income_k": p95_income_k_by_city,
    "median_value_score": p50_value_by_city,
    "target_rate": target_rate_by_city,
}).sort_values(["target_rate", "p95_income_k"], ascending=False)
print("\nCity summary:")
print(city_table.to_string(index=False))

# Join the city-level aggregates back onto every row by city name.
df_city_features = vaex.from_pandas(
    city_table[["city", "p95_income_k", "avg_income_k", "median_value_score", "target_rate"]],
    copy_index=False,
)
df = df.join(df_city_features, on="city", rsuffix="_city")
# Relative features: how each row compares with its own city's statistics.
df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)  # epsilon guards against a zero p95
df["value_vs_city_median"] = df.value_score - df.median_value_score
We label-encode the categorical city column and use binned aggregations to compute scalable, approximate per-city statistics. We combine these aggregates into a city-level table and join it back to the main dataset. We then derive relative features that compare each record with its city's statistics.
# Numeric columns to standardise; each gets a "z_"-prefixed scaled counterpart.
features_num = [
    "age", "tenure_y", "tx", "income_k", "log_income", "tx_per_year", "value_score",
    "p95_income_k", "avg_income_k", "median_value_score", "target_rate",
    "income_vs_city_p95", "value_vs_city_median",
]
# Model inputs: the standardised numerics plus the integer city code.
features = ["z_" + f for f in features_num] + ["label_encoded_city"]

# Split first, then fit the scaler on the training rows only, so held-out rows
# do not influence the means/stds used to prepare the training data.
df_train, df_test = df.split_random([0.80, 0.20], random_state=42)
scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
scaler.fit(df_train)
df_train = scaler.transform(df_train)
df_test = scaler.transform(df_test)
# Attach the same (train-fitted) z_* columns to the full frame as well, so code
# that later reads z_* columns from df keeps working.
df = scaler.transform(df)

model = LogisticRegression(max_iter=250, solver="lbfgs")
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")
t0 = time.perf_counter()
vaex_model.fit(df=df_train)
fit_s = time.perf_counter() - t0

# The "pred" column added by transform holds hard 0/1 class labels (Predictor's
# default prediction_type is "predict"). ROC-AUC and average precision need a
# continuous ranking score, so evaluate on the positive-class probability from
# the fitted estimator, using the same feature columns in the same order.
df_test = vaex_model.transform(df_test)
y_true = df_test["target"].to_numpy()
X_test = df_test[features].values
y_pred = vaex_model.model.predict_proba(X_test)[:, 1]
auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)
print("\nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))
We standardize all numerical features with Vaex’s ML utilities to build a consistent feature vector for modeling, and split the dataset without loading it entirely into memory. We then train a logistic regression model through Vaex’s scikit-learn wrapper and evaluate its predictive performance.
# Equal-frequency score buckets built from ranks: bucket 0 holds the lowest-scored
# ~10% of test rows and bucket 9 the highest. Assigning by rank (ties broken by a
# stable sort) always yields 10 near-equal buckets, whereas quantile cut points
# collapse into fewer, uneven buckets when many scores are tied.
n_buckets = 10
n_scored = len(y_pred)
order = np.argsort(y_pred, kind="stable")
ranks = np.empty(n_scored, dtype=np.int64)
ranks[order] = np.arange(n_scored)
bucket = (ranks * n_buckets // max(n_scored, 1)).astype("int32")

# Aggregate per bucket with vaex: size, observed positive rate and mean score.
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(
    by="bucket",
    agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")},
).sort("bucket")
lift_pd = lift.to_pandas_df()
# Lift = bucket positive rate relative to the overall test positive rate.
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)  # epsilon guards a zero baseline
print("\nDecile lift table:")
print(lift_pd.to_string(index=False))
We analyze model behavior by dividing the predicted scores into deciles and computing lift metrics. We assess ranking quality by comparing each bucket's positive rate with the overall baseline rate. The results are collected in a compact lift table of the kind used in real-world model diagnostics.
# Artifact locations (a Colab-style path; adjust for other environments).
out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)
parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")

# Export base columns plus the standardised z_* features for a random sample.
base_cols = ["city", "label_encoded_city", "age", "tenure_m", "tenure_y", "tx", "income", "income_k", "value_score", "target"]
export_cols = base_cols + ["z_" + f for f in features_num]
# Cap the sample at the frame size so a smaller input does not make sample() fail.
df_export = df[export_cols].sample(n=min(500_000, len(df)), random_state=1)
if os.path.exists(parquet_path):
    os.remove(parquet_path)  # overwrite any previous export
df_export.export_parquet(parquet_path)

# City-level aggregates that get joined onto every row. They are persisted in the
# state file so the reload path below rebuilds them from the artifacts alone,
# not from the in-memory city_table.
city_feature_cols = ["city", "p95_income_k", "avg_income_k", "median_value_score", "target_rate"]
pipeline_state = {
    "vaex_version": vaex.__version__,
    "encoder_labels": {k: {str(kk): int(vv) for kk, vv in v.items()} for k, v in encoder.labels_.items()},
    "scaler_mean": [float(x) for x in scaler.mean_],
    "scaler_std": [float(x) for x in scaler.std_],
    "features_num": features_num,
    "export_cols": export_cols,
    "city_features": {col: city_table[col].tolist() for col in city_feature_cols},
}
with open(state_path, "w", encoding="utf-8") as f:
    json.dump(pipeline_state, f)

# ---- Reload path: rebuild model inputs from the Parquet sample + JSON state ----
with open(state_path, "r", encoding="utf-8") as f:
    st = json.load(f)

df_reopen = vaex.open(parquet_path)
# Recompute the row-level derived features with the same formulas used for training.
df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)

# Rebuild and join the city-level features from the saved state.
df_city_features = vaex.from_pandas(pd.DataFrame(st["city_features"]), copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score

# Re-apply the saved label encoding; cities unseen at fit time map to -1.
labels_city = {k: int(v) for k, v in st["encoder_labels"]["city"].items()}
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)
# Re-apply the saved standardisation; a zero std falls back to 1.0 to avoid division by zero.
for feat, mean_i, std_i in zip(st["features_num"], st["scaler_mean"], st["scaler_std"]):
    std_i = std_i if std_i != 0 else 1.0
    df_reopen["z_" + feat] = (df_reopen[feat] - mean_i) / std_i

# NOTE(review): the fitted model itself is not persisted; vaex_model is the
# in-memory Predictor trained earlier in this session.
df_reopen = vaex_model.transform(df_reopen)
print("\nArtifacts written:")
print(parquet_path)
print(state_path)
print("\nReopened parquet predictions (head):")
print(df_reopen[["city", "income_k", "value_score", "pred", "target"]].head(8))
print("\nDone.")
We export a large, feature-complete sample to Parquet and save the preprocessing state (label encodings, scaler statistics, and feature lists) to JSON for reproducibility. We then reload the data, deterministically rebuild the engineered features, and re-apply the saved encoding and scaling. Finally, we run inference on the reloaded dataset to confirm that the pipeline remains stable and deployable end-to-end.
In conclusion, we demonstrated how Vaex enables fast, memory-efficient data processing while supporting advanced feature engineering, aggregation, and model integration. We showed that approximate statistics, lazy computation, and out-of-core execution scale cleanly from analysis to deployable artifacts. By exporting reproducible features and preserving the pipeline state, we closed the loop from raw data to inference and showed how Vaex fits naturally into modern big-data Python workflows.
Check out the complete code here. Also, feel free to follow us on Twitter, and don’t forget to join our 120k+ ML SubReddit and subscribe to our newsletter. Wait! Are you on Telegram? Now you can join us on Telegram as well.
