How to build data pipelines for machine learning | By Shaw Talebi | May 2024


First, import some libraries and a secret YouTube API key. If you don't have an API key, you can create one by following this guide.

import requests
import json
import polars as pl
from my_sk import my_key

from youtube_transcript_api import YouTubeTranscriptApi

Next, we'll define variables that will help us extract video data from the YouTube API. Here, we specify the YouTube channel ID and the API URL, initialize the page token, and create a list to store video data.

# define channel ID
channel_id = 'UCa9gErQ9AE5jT2DZLjXBIdA'

# define url for API
url = 'https://www.googleapis.com/youtube/v3/search'

# initialize page token
page_token = None

# initialize list to store video data
video_record_list = []

The next part of the code may look intimidating, so I'll explain what's going on first. We make GET requests to YouTube's search API. This is the same as searching for videos on YouTube, but instead of using the UI, we perform the search programmatically.

Search results are limited to 50 per page, so we need to repeat the search page by page, passing along the page token each time, until all videos matching the search criteria have been returned. In Python code it looks like this:

# extract video data across multiple search result pages

while page_token != 0:
    # define parameters for API call
    params = {'key': my_key, 'channelId': channel_id,
              'part': ["snippet","id"], 'order': "date",
              'maxResults':50, 'pageToken': page_token}
    # make get request
    response = requests.get(url, params=params)

    # append video data from page results to list
    video_record_list += getVideoRecords(response)

    try:
        # grab next page token
        page_token = json.loads(response.text)['nextPageToken']
    except KeyError:
        # if there is no next page token, end the while loop
        page_token = 0
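
To make the pagination logic easier to see on its own, here is a minimal sketch that follows page tokens without touching the network. FAKE_PAGES, fake_fetch() and iter_items() are hypothetical names I made up for illustration; the only thing taken from the loop above is the idea that each page may carry a nextPageToken and that the last page does not.

```python
from typing import Callable, Iterator, Optional

# Hypothetical stand-in for the API: three "pages" keyed by page token.
FAKE_PAGES = {
    None: {"items": ["a", "b"], "nextPageToken": "p2"},
    "p2": {"items": ["c", "d"], "nextPageToken": "p3"},
    "p3": {"items": ["e"]},  # last page: no nextPageToken key
}


def fake_fetch(page_token: Optional[str]) -> dict:
    """Return one page of results for the given token (no network involved)."""
    return FAKE_PAGES[page_token]


def iter_items(fetch: Callable[[Optional[str]], dict]) -> Iterator[str]:
    """Yield items from every page, following nextPageToken until it is absent."""
    page_token = None
    while True:
        page = fetch(page_token)
        yield from page.get("items", [])
        page_token = page.get("nextPageToken")
        if page_token is None:
            break


all_items = list(iter_items(fake_fetch))
print(all_items)
```

Using dict.get() for the token avoids the try/except, at the cost of treating a missing key and an explicit None the same way.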

getVideoRecords() is a user-defined function that extracts the relevant information from an API response.

# extract video data from single search result page

def getVideoRecords(response: requests.models.Response) -> list:
    """
        Function to extract YouTube video data from GET request response
    """

    # initialize list to store video data from page results
    video_record_list = []

    for raw_item in json.loads(response.text)['items']:

        # only execute for youtube videos
        if raw_item['id']['kind'] != "youtube#video":
            continue

        # extract relevant data
        video_record = {}
        video_record['video_id'] = raw_item['id']['videoId']
        video_record['datetime'] = raw_item['snippet']['publishedAt']
        video_record['title'] = raw_item['snippet']['title']

        # append record to list
        video_record_list.append(video_record)

    return video_record_list
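
If you want to try the extraction logic without an API key, the sketch below runs the same filtering on a hand-written payload. The sample_payload dictionary is made up and only contains the fields the function above reads (id.kind, id.videoId, snippet.publishedAt, snippet.title); real responses carry more. get_video_records() is a hypothetical variant that takes an already-parsed dictionary instead of a requests response.

```python
def get_video_records(payload: dict) -> list:
    """Keep only video items and pull out the id, publish time, and title."""
    records = []
    for raw_item in payload.get("items", []):
        # search results can include non-video items; skip them
        if raw_item["id"]["kind"] != "youtube#video":
            continue
        records.append({
            "video_id": raw_item["id"]["videoId"],
            "datetime": raw_item["snippet"]["publishedAt"],
            "title": raw_item["snippet"]["title"],
        })
    return records


# made-up payload for illustration; not real API output
sample_payload = {
    "items": [
        {
            "id": {"kind": "youtube#video", "videoId": "abc123"},
            "snippet": {"publishedAt": "2024-05-01T12:00:00Z", "title": "Example video"},
        },
        {
            "id": {"kind": "youtube#playlist", "playlistId": "PL000"},
            "snippet": {"publishedAt": "2024-04-01T09:30:00Z", "title": "Example playlist"},
        },
    ]
}

sample_records = get_video_records(sample_payload)
print(sample_records)
```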

Now that we have information about all the videos on the channel, let's extract the auto-generated captions. To make the video IDs easy to access, we store the video data in a Polars dataframe.

# store data in polars dataframe
df = pl.DataFrame(video_record_list)
print(df.head())
The beginning of the data frame. Image by author.

To get the video captions, we use the youtube_transcript_api Python library. We loop through each video ID in the dataframe and extract the associated transcript.

# initialize list to store video captions
transcript_text_list = []

# loop through each row of dataframe
for i in range(len(df)):

    # try to extract captions
    try:
        # get transcript
        transcript = YouTubeTranscriptApi.get_transcript(df['video_id'][i])
        # extract text transcript
        transcript_text = extract_text(transcript)
    # if no captions are available, set as n/a
    except Exception:
        transcript_text = "n/a"

    # append transcript text to list
    transcript_text_list.append(transcript_text)
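
The try/except fallback in the loop above can be pulled into a small helper, which makes it easy to check how missing captions are handled. This is only a sketch: fetch_or_default() and fake_get_transcript() are hypothetical names, and the fake fetcher simply raises for one ID to mimic a video without captions.

```python
from typing import Callable


def fetch_or_default(fetch: Callable[[str], str], video_id: str, default: str = "n/a") -> str:
    """Return fetch(video_id), or the default value if the fetch fails."""
    try:
        return fetch(video_id)
    except Exception:
        return default


def fake_get_transcript(video_id: str) -> str:
    """Hypothetical fetcher: only one made-up ID has captions."""
    captions = {"has_captions": "hello world"}
    if video_id not in captions:
        raise LookupError("no captions for " + video_id)
    return captions[video_id]


results = [fetch_or_default(fake_get_transcript, vid) for vid in ["has_captions", "no_captions"]]
print(results)
```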

Again, we use a user-defined function, extract_text(), to extract the required information from the API response.

def extract_text(transcript: list) -> str:
    """
        Function to extract text from transcript dictionary
    """

    text_list = [transcript[i]['text'] for i in range(len(transcript))]
    return ' '.join(text_list)
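
As a quick check of what this function does, here is a self-contained sketch run on a made-up transcript. I'm assuming each transcript segment is a dictionary with a 'text' key alongside timing fields; the 'start' and 'duration' values below are invented for illustration.

```python
def extract_text(transcript: list) -> str:
    """Join the 'text' field of every transcript segment into one string."""
    return " ".join(segment["text"] for segment in transcript)


# made-up transcript segments; timing values are placeholders
sample_transcript = [
    {"text": "hey everyone", "start": 0.0, "duration": 1.5},
    {"text": "welcome back", "start": 1.5, "duration": 1.2},
]

joined = extract_text(sample_transcript)
print(joined)
```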

Then you can add each video's transcript to the dataframe.

# add transcripts to dataframe
df = df.with_columns(pl.Series(name="transcript", values=transcript_text_list))
print(df.head())
The beginning of the data frame containing the transcript. Image by author.

Transform the extracted data to support downstream use cases. This requires exploratory data analysis (EDA).

Handling duplicates

A good starting point for EDA is to look at the number of unique rows and elements in each column. Here, we expect each row to be uniquely identified by video_id. Additionally, no column should have repeating elements, except for the transcript column, where videos without an available transcript are set to "n/a".

Here's the code to check that. From the output, we can see that the data match our expectations.

# shape + unique values
print("shape:", df.shape)
print("n unique rows:", df.n_unique())
for j in range(df.shape[1]):
    print("n unique elements (" + df.columns[j] + "):", df[:,j].n_unique())

### output
# shape: (84, 4)
# n unique rows: 84
# n unique elements (video_id): 84
# n unique elements (datetime): 84
# n unique elements (title): 84
# n unique elements (transcript): 82
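
To see why the transcript column can have fewer unique values than rows, here is a tiny sketch of the same check in plain Python on made-up records. Two of the four fake videos share the "n/a" placeholder, so the transcript column has one fewer unique value than the others.

```python
# made-up records for illustration
records = [
    {"video_id": "v1", "title": "A", "transcript": "first transcript"},
    {"video_id": "v2", "title": "B", "transcript": "n/a"},
    {"video_id": "v3", "title": "C", "transcript": "n/a"},
    {"video_id": "v4", "title": "D", "transcript": "fourth transcript"},
]

# count distinct values per column
unique_counts = {col: len({row[col] for row in records}) for col in records[0]}
print(unique_counts)
```

By the same reasoning, 84 rows with 82 unique transcripts is what you would see if three videos had no captions and every other transcript were distinct.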

Checking dtypes

Next, examine the data type of each column. In the image above you can see that all columns are strings.

While this is appropriate for the video_id, title, and transcript columns, it is not a good choice for the datetime column. We can change its type as follows:

# change datetime to Datetime dtype
df = df.with_columns(pl.col('datetime').cast(pl.Datetime))
print(df.head())
The beginning of the data frame after updating the datetime dtype. Image by author.
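
To illustrate what the conversion buys us, here is a standard-library sketch that parses a single timestamp string. The value below is made up, and I'm assuming the publish times arrive as ISO 8601 strings ending in "Z"; once parsed, the value supports comparisons and date arithmetic that a plain string does not.

```python
from datetime import datetime, timezone

# made-up timestamp in the assumed ISO 8601 format
published_at = "2024-05-01T12:00:00Z"

# %z accepts the trailing "Z" (UTC) on Python 3.7+
parsed = datetime.strptime(published_at, "%Y-%m-%dT%H:%M:%S%z")

# a real datetime can be compared and subtracted
age = datetime(2024, 5, 2, 12, 0, tzinfo=timezone.utc) - parsed
print(parsed.year, age.days)
```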

Handling special characters

Since we are working with text data, it is important to look out for special character strings. This requires manually skimming the text, but after a few minutes I found two special cases: &#39; → ' and &amp; → &

In the code below, I replace these strings with the appropriate characters and change "sha " to "Shaw ".

# list all special strings and their replacements
special_strings = ['&#39;', '&amp;', 'sha ']
special_string_replacements = ["'", "&", "Shaw "]

# replace each special string appearing in title and transcript columns
# (replace_all substitutes every occurrence; str.replace only replaces the first match)
for special_string, replacement in zip(special_strings, special_string_replacements):
    df = df.with_columns(
        pl.col('title').str.replace_all(special_string, replacement, literal=True),
        pl.col('transcript').str.replace_all(special_string, replacement, literal=True),
    )
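
The two special strings are HTML character references, so a more general option is to decode them with Python's built-in html module rather than listing each one by hand. This is an alternative sketch, not what the pipeline above does, and the title below is made up.

```python
import html

# made-up title containing HTML character references
raw_title = "Shaw&#39;s Q&amp;A on data pipelines"

# html.unescape converts character references back to the characters they stand for
clean_title = html.unescape(raw_title)
print(clean_title)
```

This covers any entity that shows up later, but it would not handle transcription quirks like "sha ", which still need an explicit replacement.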

The dataset here is very small (84 rows × 4 columns, approximately 900,000 characters), so we can save the data directly to the project directory. This can be done in one line of code using the write_parquet() method in Polars. The final file size is 341 KB.

# write data to file
df.write_parquet('data/video-transcripts.parquet')
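
One practical note: the line above writes into a data/ folder, and I'm assuming that folder needs to exist before the file is written. A minimal sketch for creating it first is shown below; ensure_parent_dir() is a hypothetical helper, and the example runs inside a temporary directory so it does not touch your project.

```python
import tempfile
from pathlib import Path


def ensure_parent_dir(file_path: str) -> Path:
    """Create the parent folder of file_path if it is missing and return the path."""
    path = Path(file_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    return path


# demonstrate inside a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    target = ensure_parent_dir(str(Path(tmp) / "data" / "video-transcripts.parquet"))
    parent_exists = target.parent.is_dir()

print(parent_exists)
```

In the pipeline, you would call ensure_parent_dir('data/video-transcripts.parquet') right before writing the file.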

We've covered the basics of building data pipelines in the context of full-stack data science and looked at concrete examples using real-world data.

In the next article in this series, we'll continue to dive into the data science technology stack and learn how to use this data pipeline to develop a semantic search system for YouTube videos.

