Structured Streaming vs Auto Loader
Structured Streaming
Structured Streaming is a scalable, fault-tolerant stream processing engine built on Spark SQL. It treats a live data stream as an unbounded table that is continuously appended to, so streaming queries can be written with the same DataFrame and SQL APIs used for batch, enabling real-time ingestion and processing of streaming data.
Key Features
- Supports real-time stream processing.
- Uses micro-batch or continuous mode.
- Works with Kafka, Azure Event Hubs, Delta Lake, S3, and ADLS.
- Fault tolerance and checkpointing via WAL (Write-Ahead Log).
- Supports exactly-once processing (with replayable sources and idempotent sinks).
from pyspark.sql.functions import col
# Define input path in DBFS
input_path = "dbfs:/mnt/streaming-data/"
# Read streaming data from DBFS
df = (spark.readStream
.format("json")
.schema("id INT, name STRING, timestamp STRING") # Define schema
.load(input_path))
# Transform the data
transformed_df = df.withColumn("timestamp", col("timestamp").cast("timestamp"))
# Write to Delta Table in DBFS
query = (transformed_df.writeStream
.format("delta")
.option("checkpointLocation", "dbfs:/mnt/checkpoints/structured_streaming")
.start("dbfs:/mnt/delta/structured_streaming_output"))
Micro-Batch Streaming with GROUP BY Aggregation
Micro-Batch Trigger Modes
(A) Run Every Fixed Interval (Processing Time)
.trigger(Trigger.ProcessingTime("5 seconds")) # Runs every 5 seconds
- Best for near real-time processing.
- Controls the batch frequency to balance latency vs. resource utilization.
(B) Run Once and Stop
.trigger(once=True) # Runs once, processes all available data, then stops
- Useful for batch-style processing with streaming APIs.
- Can be scheduled periodically via Databricks Jobs.
(C) Run for Available Data and Stop
.trigger(availableNow=True) # Processes all available data, then stops
- Similar to once=True, but can process the backlog in multiple micro-batches (respecting rate limits such as maxFilesPerTrigger), which scales better when a large amount of data has accumulated. See the run-and-stop sketch below.
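A minimal sketch of a run-and-stop ingestion job using availableNow (available in newer Spark/Databricks Runtime versions), reusing the schema and paths assumed in the first example. The query drains whatever has arrived since the last checkpoint and then terminates, which pairs well with a scheduled Databricks Job.
df = (spark.readStream
    .format("json")
    .schema("id INT, name STRING, timestamp STRING")
    .load("dbfs:/mnt/streaming-data/"))
query = (df.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/available_now")  # assumed path
    .trigger(availableNow=True)  # process the backlog in one or more micro-batches, then stop
    .start("dbfs:/mnt/delta/available_now_output"))  # assumed path
query.awaitTermination()  # returns once all available data has been processed
The full GROUP BY example below uses the processingTime trigger instead, so it keeps running until stopped.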
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
# Define schema
schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("category", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("timestamp", TimestampType(), True)
])
# Define input path
input_path = "dbfs:/mnt/streaming-sales/"
# Read streaming data in micro-batches
df = (spark.readStream
.format("json")
.schema(schema)
.load(input_path))
# Perform Group By aggregation (Total sales per category)
aggregated_df = (df
.groupBy("category")
.agg(sum("amount").alias("total_sales")))
# Write output with micro-batching (every 10 seconds)
query = (aggregated_df.writeStream
.format("delta")
.option("checkpointLocation", "dbfs:/mnt/checkpoints/microbatch_groupby")
.trigger(Trigger.ProcessingTime("10 seconds")) # Micro-batch every 10 seconds
.outputMode("complete") # Required for aggregations without watermarking
.start("dbfs:/mnt/delta/microbatch_groupby_output"))
When to Use foreachBatch() in Structured Streaming?
foreachBatch() is used when you need more control over each micro-batch, such as:
✅ Writing to multiple destinations (e.g., Delta + Kafka).
✅ Running custom transformations before writing.
✅ Writing to JDBC databases (e.g., MySQL, PostgreSQL, SQL Server); see the JDBC sketch after the example below.
✅ Implementing custom business logic for each batch.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
# Define schema
schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("category", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("timestamp", TimestampType(), True)
])
# Define input path
input_path = "dbfs:/mnt/streaming-sales/"
# Read streaming data in micro-batches
df = (spark.readStream
.format("json")
.schema(schema)
.load(input_path))
# Define custom function for `foreachBatch()`
def write_to_multiple_sinks(batch_df, batch_id):
    # Write to Delta Lake
    batch_df.write.format("delta").mode("append").save("dbfs:/mnt/delta/foreachbatch_output")
    # Write to Console (for debugging)
    batch_df.show()
# Write with foreachBatch (runs on each micro-batch)
query = (df.writeStream
.foreachBatch(write_to_multiple_sinks) # Apply function for each batch
.trigger(Trigger.ProcessingTime("10 seconds")) # Micro-batch every 10 seconds
.option("checkpointLocation", "dbfs:/mnt/checkpoints/foreachbatch_example")
.start())
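The use cases above mention JDBC sinks. A minimal sketch of a foreachBatch handler that writes each micro-batch to a PostgreSQL table; the JDBC URL, table name, and credentials are placeholders, and the matching JDBC driver must be installed on the cluster:
def write_to_postgres(batch_df, batch_id):
    # Each micro-batch arrives as a static DataFrame, so the regular batch JDBC writer applies
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://db-host:5432/sales")  # placeholder connection string
        .option("dbtable", "public.streaming_orders")           # placeholder table name
        .option("user", "etl_user")                             # placeholder credentials
        .option("password", "etl_password")
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save())
jdbc_query = (df.writeStream
    .foreachBatch(write_to_postgres)
    .trigger(processingTime="10 seconds")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/foreachbatch_jdbc")  # assumed path
    .start())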
Auto Loader Examples
from pyspark.sql.functions import col
# Define input path in DBFS
input_path = "dbfs:/mnt/autoloader-data/"
# Read new files using Auto Loader
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "dbfs:/mnt/schema/autoloader/")
.load(input_path))
# Transform the data
transformed_df = df.withColumn("timestamp", col("timestamp").cast("timestamp"))
# Write to Delta Table in DBFS
query = (transformed_df.writeStream
.format("delta")
.option("checkpointLocation", "dbfs:/mnt/checkpoints/autoloader")
.start("dbfs:/mnt/delta/autoloader_output"))
Summary: When to Use Each?
| Feature | Structured Streaming | Auto Loader |
|---|---|---|
| Best for | Continuous event-based streaming | Incremental file ingestion |
| Use Case | Kafka, Event Hubs, logs | JSON, CSV, Parquet files in DBFS |
| Schema Evolution | Needs manual handling | Auto-managed |
| Performance | Good for real-time data | Optimized for cloud storage & DBFS |
🚀 Auto Loader (which itself runs on Structured Streaming) is the best choice for efficiently ingesting files from cloud storage or DBFS incrementally, whereas plain Structured Streaming sources such as Kafka or Event Hubs are better for real-time, event-based processing.