Understanding the Problem: Calculating Watch Time Based on Play/Stop Events
=====================================
In this article, we will explore how to calculate watch time for a video based on play and stop events. We will use Apache Spark, a popular open-source data processing engine, to achieve this.
Background
The problem statement involves analyzing event logs from devices that play videos. The goal is to calculate the total watch time for each video ID by considering the differences in timestamps between consecutive “play” and “stop” events.
Sample Input Data
The sample input data provided consists of a table with columns device_id, video_id, event_timestamp, and event_type. This represents a log of events where each row corresponds to an event occurring on a device playing a video.
data1 = [("Android", 1, '2021-07-24 12:01:19.000', "play"),
("Android", 1, '2021-07-24 12:02:19.000', "stop"),
("Apple", 1, '2021-07-24 12:03:19.000', "play"),
("Apple", 1, '2021-07-24 12:04:19.000', "stop")]
Schema Definition
The data is stored in a schema defined using Apache Spark’s StructType and StructField. This schema defines the structure of each column:
schema1 = StructType([
StructField('device_id', StringType(), True),
StructField('video_id', IntegerType(), True),
StructField('event_timestamp', StringType(), True),
StructField('event_type', StringType(), True)
])
Creating a DataFrame
We create a DataFrame from the input data using spark.createDataFrame:
transaction = spark.createDataFrame(data1, schema=schema1)
Converting Timestamps to Date/Time Format
We convert the event timestamps from string format to date/time format using to_timestamp:
transaction = transaction.withColumn("Converted_timestamp", to_timestamp("event_timestamp"))
Solution Overview
Our solution involves several steps to calculate watch time based on play and stop events. We will outline these steps below.
Step 1: Grouping Events by Device ID and Video ID
We group the events by device_id and video_id using Window.partitionBy. This allows us to consider events from the same device playing the same video together:
next_event_timestamp = lead('Converted_timestamp').over(Window.partitionBy('device_id', 'video_id').orderBy('event_timestamp'))
Step 2: Filtering Play Events
We filter the grouped events to only include play events using where:
transaction = transaction.where(f.col('event_type') == 'play')
Step 3: Calculating Play Time Minutes
For each group, we calculate the difference between the next event timestamp (i.e., the timestamp of the next “play” event) and the current event timestamp. We then divide this by 60 to convert minutes:
transaction = transaction.withColumn('play_time_minutes', (f.col('next_event_timestamp').cast(IntegerType()) - f.col('Converted_timestamp').cast(IntegerType()))/60)
Step 4: Grouping by Video ID and Calculating Total Play Time Minutes
Finally, we group the results by video_id and calculate the sum of play time minutes for each video:
transaction = transaction.groupby('video_id')
.agg(f.sum('play_time_minutes').alias('total_play_time_minutes'))
Putting it All Together
Here’s the complete code snippet that combines all the steps above:
from pyspark.sql import SparkSession, functions as F
import pandas as pd
# Create a Spark Session
spark = SparkSession.builder.appName("Watch Time Calculator").getOrCreate()
# Sample input data
data1 = [("Android", 1, '2021-07-24 12:01:19.000', "play"),
("Android", 1, '2021-07-24 12:02:19.000', "stop"),
("Apple", 1, '2021-07-24 12:03:19.000', "play"),
("Apple", 1, '2021-07-24 12:04:19.000', "stop")]
# Define the schema
schema = SparkSession.builder.getSpecificSchema(
["device_id", "video_id", "event_timestamp", "event_type"]
)
# Create a DataFrame from the input data
df = spark.createDataFrame(data1, schema)
# Convert timestamps to date/time format
df = df.withColumn("Converted_timestamp", F.to_datetime("event_timestamp"))
# Group events by device ID and video ID
next_event_timestamp = df.lead("Converted_timestamp").over(
Window.partitionBy("device_id", "video_id").orderBy("event_timestamp")
)
# Filter play events
play_df = df.where(df["event_type"] == "play")
# Calculate play time minutes
play_df = play_df.withColumn("play_time_minutes", (next_event_timestamp - F.col("Converted_timestamp")).cast(float))
play_df = play_df.withColumn("play_time_minutes", play_df["play_time_minutes"].div(60))
# Group by video ID and calculate total play time minutes
result = play_df.groupBy("video_id")
.agg(F.sum("play_time_minutes").alias("total_play_time_minutes"))
print(result)
Conclusion
In this article, we demonstrated how to calculate watch time based on play and stop events using Apache Spark. We outlined the steps involved in achieving this goal, including grouping events by device ID and video ID, filtering play events, calculating play time minutes, and grouping by video ID to calculate total play time minutes.
We also provided a complete code snippet that combines all these steps into a single script.
This solution can be applied to various scenarios where event logs from devices need to be analyzed to determine watch time.
Last modified on 2025-03-09