Structured Streaming vs Auto Loader



Structured Streaming

Structured Streaming is an extension of the Spark SQL engine that treats a continuous data stream as an unbounded, continuously appended table and processes it incrementally. It enables real-time ingestion and processing of streaming data.

Key Features

  • Supports real-time stream processing.
  • Uses micro-batch or continuous processing mode.
  • Works with Kafka, Azure Event Hubs, Delta Lake, S3, and ADLS (a Kafka source sketch follows the file-based example below).
  • Fault tolerance via checkpointing and write-ahead logs (WAL).
  • Supports exactly-once processing semantics.



from pyspark.sql.functions import col

# Define input path in DBFS
input_path = "dbfs:/mnt/streaming-data/"

# Read streaming data from DBFS
df = (spark.readStream
    .format("json")
    .schema("id INT, name STRING, timestamp STRING")  # Define schema
    .load(input_path))

# Transform the data
transformed_df = df.withColumn("timestamp", col("timestamp").cast("timestamp"))

# Write to Delta Table in DBFS
query = (transformed_df.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/structured_streaming")
    .start("dbfs:/mnt/delta/structured_streaming_output"))



Micro-Batch Streaming with GROUP BY Aggregation

Micro-Batch Trigger Modes

(A) Run Every Fixed Interval (Processing Time)

.trigger(Trigger.ProcessingTime("5 seconds")) # Runs every 5 seconds
  • Best for near real-time processing.
  • Controls the batch frequency to balance latency vs. resource utilization.

(B) Run Once and Stop

.trigger(once=True) # Runs once, processes all available data in a single batch, then stops
  • Useful for batch-style processing with streaming APIs.
  • Can be scheduled periodically via Databricks Jobs.

(C) Run for Available Data and Stop

.trigger(availableNow=True) # Processes all available data, then stops
  • Similar to once=True, but processes the backlog in multiple micro-batches that respect source rate limits (e.g., maxFilesPerTrigger), which scales better for large backlogs.


from pyspark.sql.functions import col, sum as spark_sum
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
# Note: in PySpark, trigger modes are keyword arguments to .trigger(); there is no Trigger class to import

# Define schema
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Define input path
input_path = "dbfs:/mnt/streaming-sales/"

# Read streaming data in micro-batches
df = (spark.readStream
    .format("json")
    .schema(schema)
    .load(input_path))

# Perform Group By aggregation (Total sales per category)
aggregated_df = (df
    .groupBy("category")
    .agg(sum("amount").alias("total_sales")))

# Write output with micro-batching (every 10 seconds)
query = (aggregated_df.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/microbatch_groupby")
    .trigger(Trigger.ProcessingTime("10 seconds"))  # Micro-batch every 10 seconds
    .outputMode("complete")  # Required for aggregations without watermarking
    .start("dbfs:/mnt/delta/microbatch_groupby_output"))




When to Use foreachBatch() in Structured Streaming?

foreachBatch() is used when you need more control over each micro-batch, such as:
✅ Writing to multiple destinations (e.g., Delta + Kafka).
✅ Running custom transformations before writing.
✅ Writing to JDBC databases (e.g., MySQL, PostgreSQL, SQL Server); a JDBC sketch follows the example below.
✅ Implementing custom business logic for each batch.


from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# Define schema
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Define input path
input_path = "dbfs:/mnt/streaming-sales/"

# Read streaming data in micro-batches
df = (spark.readStream
    .format("json")
    .schema(schema)
    .load(input_path))

# Define custom function for `foreachBatch()`
def write_to_multiple_sinks(batch_df, batch_id):
    # Write to Delta Lake
    batch_df.write.format("delta").mode("append").save("dbfs:/mnt/delta/foreachbatch_output")

    # Write to Console (for debugging)
    batch_df.show()

# Write with foreachBatch (runs on each micro-batch)
query = (df.writeStream
    .foreachBatch(write_to_multiple_sinks)  # Apply function for each batch
    .trigger(Trigger.ProcessingTime("10 seconds"))  # Micro-batch every 10 seconds
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/foreachbatch_example")
    .start())
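
The list above also mentions JDBC sinks. Below is a minimal sketch of writing each micro-batch to PostgreSQL from foreachBatch(); the JDBC URL, table name, and credentials are placeholders, and in practice they should come from a secret scope rather than being hard-coded. It assumes the PostgreSQL JDBC driver is installed on the cluster.

# Write each micro-batch to a (hypothetical) PostgreSQL table via JDBC
def write_to_postgres(batch_df, batch_id):
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://db-host:5432/sales")  # placeholder host/database
        .option("dbtable", "orders")                            # placeholder table
        .option("user", "etl_user")                             # placeholder credentials
        .option("password", "<password>")
        .mode("append")
        .save())

jdbc_query = (df.writeStream
    .foreachBatch(write_to_postgres)
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/foreachbatch_jdbc")  # assumed path
    .start())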


Auto Loader Examples

Auto Loader (the cloudFiles source) incrementally ingests new files as they arrive in cloud storage, tracking which files have already been processed so each file is ingested only once.

from pyspark.sql.functions import col

# Define input path in DBFS
input_path = "dbfs:/mnt/autoloader-data/"

# Read new files using Auto Loader
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/schema/autoloader/")
    .load(input_path))

# Transform the data
transformed_df = df.withColumn("timestamp", col("timestamp").cast("timestamp"))

# Write to Delta Table in DBFS
query = (transformed_df.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/autoloader")
    .start("dbfs:/mnt/delta/autoloader_output"))



Summary: When to Use Each?

| Feature          | Structured Streaming             | Auto Loader                        |
|------------------|----------------------------------|------------------------------------|
| Best for         | Continuous event-based streaming | Incremental file ingestion         |
| Use case         | Kafka, event hubs, logs          | JSON, CSV, Parquet files in DBFS   |
| Schema evolution | Needs manual handling            | Auto-managed                       |
| Performance      | Good for real-time data          | Optimized for cloud storage & DBFS |

🚀 Auto Loader is the best choice for ingesting files from DBFS efficiently, whereas Structured Streaming is better for real-time event-based processing.