Fabric notebook orchestration
This page provides a complete notebook implementation for running Ploosh in Microsoft Fabric and tracking results over time.
Notebook structure
The notebook is composed of 4 cells:
- Parameters — Input variables for flexible execution
- Imports — Load the required libraries
- Ploosh execution — Run the test cases
- Results history — Persist results into a Delta table
Cell 1: Parameters
# Parameters (can be overridden by Fabric Pipeline)
cases_sub_folder = "/"
cases_filter = "*.yaml"
These parameters enable flexible execution:
- cases_sub_folder: Target a specific subfolder within ploosh_cases/
- cases_filter: Filter which YAML files to process
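As a rough illustration of how the two parameters interact, the sketch below composes the cases folder path the same way Cell 3 does and uses Python's standard `fnmatch` module to show which file names a glob-style filter would select. The parameter values, the file names, and the `matches_filter` helper are hypothetical, and Ploosh's own matching logic may differ in detail.

```python
from fnmatch import fnmatch

# Hypothetical parameter values, as a pipeline might override them
cases_sub_folder = "/sales"
cases_filter = "orders_*.yaml"

# Same path composition as in Cell 3
cases_folder_path = f"/lakehouse/default/Files/ploosh_cases{cases_sub_folder}"


def matches_filter(file_name: str, pattern: str) -> bool:
    """Return True when file_name matches the glob-style pattern."""
    return fnmatch(file_name, pattern)


# Hypothetical file names, for illustration only
candidates = ["orders_daily.yaml", "orders_weekly.yaml", "customers.yaml"]
selected = [name for name in candidates if matches_filter(name, cases_filter)]

print(cases_folder_path)
print(selected)
```

With the default values (`"/"` and `"*.yaml"`), every YAML file directly under the cases folder would match.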
Cell 2: Imports
from ploosh import execute_cases
from pyspark.sql.functions import col, lit, to_date
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType,
    LongType, TimestampType
)
Cell 3: Ploosh execution
output_folder = "ploosh_outputs"
cases_folder_path = f"/lakehouse/default/Files/ploosh_cases{cases_sub_folder}"
connections_file = "/lakehouse/default/Files/ploosh_connections.yaml"
output_path = f"/lakehouse/default/Files/{output_folder}"

execute_cases(
    cases=cases_folder_path,
    connections=connections_file,
    spark_session=spark,
    filter=cases_filter,
    path_output=output_path
)
The spark variable is automatically available in Fabric notebooks.
Cell 4: Results history
This cell reads the JSON output generated by Ploosh and appends it to a Delta table for historical tracking.
spark_output_path = f"Files/{output_folder}/json/test_results.json"

schema = StructType([
    StructField("execution_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("state", StringType(), True),
    StructField("source", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True),
        StructField("duration", DoubleType(), True),
        StructField("count", LongType(), True),
        StructField("executed_action", StringType(), True)
    ]), True),
    StructField("expected", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True),
        StructField("duration", DoubleType(), True),
        StructField("count", LongType(), True),
        StructField("executed_action", StringType(), True)
    ]), True),
    StructField("compare", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True),
        StructField("duration", DoubleType(), True),
        StructField("success_rate", DoubleType(), True)
    ]), True),
    StructField("error", StructType([
        StructField("type", StringType(), True),
        StructField("message", StringType(), True),
        StructField("detail_file_path", StringType(), True)
    ]), True)
])

df = spark.read.schema(schema).option("multiline", "true").json(sparkoutputpath)

flatten_cols = [
    col("execution_id"),
    col("name"),
    col("state"),
    to_date(col("source.start")).alias("execution_date"),
    col("source.start").alias("source_start"),
    col("source.end").alias("source_end"),
    col("source.duration").alias("source_duration"),
    col("source.count").cast("integer").alias("source_count"),
    col("source.executed_action").alias("source_executed_action"),
    col("expected.start").alias("expected_start"),
    col("expected.end").alias("expected_end"),
    col("expected.duration").alias("expected_duration"),
    col("expected.count").cast("integer").alias("expected_count"),
    col("expected.executed_action").alias("expected_executed_action"),
    col("compare.start").alias("compare_start"),
    col("compare.end").alias("compare_end"),
    col("compare.duration").alias("compare_duration"),
    col("compare.success_rate").alias("compare_success_rate"),
    col("error.type").alias("error_type"),
    col("error.message").alias("error_message"),
    col("error.detail_file_path").alias("error_detail_file_path"),
]

df_flat = df.select(*flatten_cols)

df_flat.write.mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("ploosh_results")
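The select list in this cell follows a simple naming rule: each nested field becomes a top-level column named `<parent>_<field>`. The plain-Python sketch below applies that rule to a single record so the mapping is easy to see outside Spark. The `flatten_result` helper and the sample record are hypothetical, and the sketch does not reproduce the derived date column or the integer casts.

```python
def flatten_result(record: dict) -> dict:
    """Flatten one level of nesting using the <parent>_<field> rule."""
    flat = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # Nested struct: prefix each field with the parent name
            for sub_key, sub_value in value.items():
                flat[f"{key}_{sub_key}"] = sub_value
        else:
            flat[key] = value
    return flat


# Hypothetical record shaped like the schema in this cell (values are made up)
sample = {
    "execution_id": "run-001",
    "name": "orders_check",
    "state": "passed",
    "source": {"duration": 1.5, "count": 100},
    "compare": {"success_rate": 1.0},
}

flat = flatten_result(sample)
print(sorted(flat))
```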
Results table schema
The ploosh_results Delta table contains:
| Column | Type | Description |
|---|---|---|
| execution_id | string | Unique identifier for the test run |
| name | string | Test case name |
| state | string | Result: passed, failed, error |
| execution_date | date | Date of execution |
| source_start | timestamp | Source data loading start time |
| source_end | timestamp | Source data loading end time |
| source_duration | double | Source loading duration in seconds |
| source_count | integer | Number of rows in source dataset |
| source_executed_action | string | Query or path executed for source |
| expected_start | timestamp | Expected data loading start time |
| expected_end | timestamp | Expected data loading end time |
| expected_duration | double | Expected loading duration in seconds |
| expected_count | integer | Number of rows in expected dataset |
| expected_executed_action | string | Query or path executed for expected |
| compare_start | timestamp | Comparison start time |
| compare_end | timestamp | Comparison end time |
| compare_duration | double | Comparison duration in seconds |
| compare_success_rate | double | Percentage of matching rows (0.0 to 1.0) |
| error_type | string | Error category (headers, count, data, compare) |
| error_message | string | Error description |
| error_detail_file_path | string | Path to XLSX gap analysis file |
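
One way to use this table is to track the pass rate per execution date. The sketch below runs such a query against an in-memory SQLite table that stands in for `ploosh_results`, so it can be tried outside Fabric; the rows are made up for illustration. In a Fabric notebook, the same query text could presumably be passed to `spark.sql(...)` instead, though that is an assumption and untested here.

```python
import sqlite3

# In-memory stand-in for the ploosh_results table (only the columns used here)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ploosh_results (execution_date TEXT, name TEXT, state TEXT)"
)

# Made-up rows, for illustration only
conn.executemany(
    "INSERT INTO ploosh_results VALUES (?, ?, ?)",
    [
        ("2024-01-01", "case_a", "passed"),
        ("2024-01-01", "case_b", "failed"),
        ("2024-01-02", "case_a", "passed"),
        ("2024-01-02", "case_b", "passed"),
    ],
)

# Share of test cases in state 'passed' for each execution date
PASS_RATE_QUERY = """
SELECT
    execution_date,
    COUNT(*) AS total_cases,
    AVG(CASE WHEN state = 'passed' THEN 1.0 ELSE 0.0 END) AS pass_rate
FROM ploosh_results
GROUP BY execution_date
ORDER BY execution_date
"""

rows = conn.execute(PASS_RATE_QUERY).fetchall()
for execution_date, total_cases, pass_rate in rows:
    print(execution_date, total_cases, pass_rate)
```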
Pipeline integration
This notebook can be called from a Fabric Pipeline using a Notebook activity:
- Create a new Pipeline
- Add a Notebook activity
- Point it to the orchestration notebook
- Override parameters (cases_sub_folder, cases_filter) as needed
- Schedule the pipeline or trigger it after upstream pipelines complete
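
If the pipeline should stop when test cases do not pass, a final notebook cell could inspect the JSON results and raise an exception, which should make the Notebook activity fail. The sketch below assumes the results file is a JSON array of records with a `state` field, as the schema in Cell 4 suggests. The `summarize_states` and `assert_all_passed` helpers are hypothetical and not part of Ploosh.

```python
import json
from collections import Counter


def summarize_states(results_path: str) -> Counter:
    """Count test cases per state in a Ploosh JSON results file."""
    with open(results_path, encoding="utf-8") as handle:
        results = json.load(handle)  # assumed: a JSON array of records
    return Counter(record.get("state") for record in results)


def assert_all_passed(state_counts: Counter) -> None:
    """Raise if any test case ended in a state other than 'passed'."""
    not_passed = {s: n for s, n in state_counts.items() if s != "passed"}
    if not_passed:
        raise RuntimeError(f"Ploosh test cases not passed: {not_passed}")
```

In the notebook, this might be called with the results file written by Cell 3, for example `assert_all_passed(summarize_states("/lakehouse/default/Files/ploosh_outputs/json/test_results.json"))`.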