Thursday, May 7, 2026

A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner

In this tutorial, we demonstrate how to build a unified Apache Beam pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time–aware data and apply fixed windowing with triggers and allowed lateness to show how Apache Beam consistently handles both on-time and late events. By switching only the input source, we keep the core aggregation logic identical, which helps us clearly understand how Beam's event-time model, windows, and panes behave without relying on external streaming infrastructure. Check out the FULL CODES here.

!pip -q set up -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q set up -U apache-beam crcmod


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

We install the required dependencies and pin compatible gRPC versions so that Apache Beam imports cleanly. We import the core Beam APIs, including the windowing, trigger, and TestStream utilities needed later in the pipeline. We also bring in standard Python modules for time handling and JSON formatting. Check out the FULL CODES here.
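
As a quick, optional sanity check in the notebook, we can print the installed Beam version to confirm the environment resolved correctly:

print("Apache Beam version:", beam.__version__)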

MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def make_event(user_id, event_type, amount, event_time_epoch_s):
    return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}


base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())


BATCH_EVENTS = [
   make_event("u1", "purchase", 20, t0 + 5),
   make_event("u1", "purchase", 15, t0 + 20),
   make_event("u2", "purchase",  8, t0 + 35),
   make_event("u1", "refund",   -5, t0 + 62),
   make_event("u2", "purchase", 12, t0 + 70),
   make_event("u3", "purchase",  9, t0 + 75),
   make_event("u2", "purchase",  3, t0 + 50),
]

We define the global configuration that controls window size, lateness, and execution mode. We create synthetic events with explicit event-time timestamps so that windowing behavior is deterministic and easy to reason about. We prepare a small dataset that deliberately includes out-of-order and late events to observe Beam's event-time semantics. Check out the FULL CODES here.
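
One subtlety worth making explicit: FixedWindows aligns window boundaries to the Unix epoch, not to t0, so the window containing timestamp t starts at t - (t % WINDOW_SIZE_SECS). A small sketch (window_bounds is our own helper, not a Beam API) shows where each synthetic event lands:

def window_bounds(event_time_epoch_s, size=WINDOW_SIZE_SECS):
    # Fixed windows are epoch-aligned, half-open intervals [start, start + size).
    start = event_time_epoch_s - (event_time_epoch_s % size)
    return start, start + size

for e in BATCH_EVENTS:
    ws, we = window_bounds(e["event_time"])
    print(f'{e["user_id"]} at t0+{e["event_time"] - t0}s -> window [{ws}, {we})')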

def format_joined_record(kv):
    user_id, d = kv
    return {
        "user_id": user_id,
        "count": int(d["count"][0]) if d["count"] else 0,
        "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
    }


class WindowedUserAgg(beam.PTransform):
    def expand(self, pcoll):
        stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
        windowed = stamped | beam.WindowInto(
            FixedWindows(WINDOW_SIZE_SECS),
            allowed_lateness=ALLOWED_LATENESS_SECS,
            trigger=AfterWatermark(
                early=AfterProcessingTime(10),
                late=AfterProcessingTime(10),
            ),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )
        keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
        counts = keyed | beam.combiners.Count.PerKey()
        sums = keyed | beam.CombinePerKey(sum)
        return (
            {"count": counts, "sum_amount": sums}
            | beam.CoGroupByKey()
            | beam.Map(format_joined_record)
        )

We build a reusable Beam PTransform that encapsulates all the windowed aggregation logic. We apply fixed windows, triggers, and accumulation rules, then group events by user and compute counts and sums. We keep this transform independent of the data source, so the same logic applies to both batch and streaming inputs. Check out the FULL CODES here.
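
Because the transform is source-agnostic, we can also exercise it in isolation with Beam's testing utilities. A minimal sketch, assuming the BATCH_EVENTS defined above, that simply asserts the aggregation produces output:

from apache_beam.testing.util import assert_that, is_not_empty

with beam.Pipeline() as p:
    agg = p | beam.Create(BATCH_EVENTS) | WindowedUserAgg()
    assert_that(agg, is_not_empty())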

class AddWindowInfo(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
        ws = float(window.start)
        we = float(window.end)
        yield {
            **element,
            "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
            "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
            "pane_timing": str(pane_info.timing),
            "pane_is_first": pane_info.is_first,
            "pane_is_last": pane_info.is_last,
        }


def build_test_stream():
   return (
       TestStream()
       .advance_watermark_to(t0)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
           beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
           beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
       ])
       .advance_processing_time(5)
       .advance_watermark_to(t0 + 61)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
           beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
           beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
       ])
       .advance_processing_time(5)
       .add_elements([
           beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
       ])
       .advance_watermark_to(t0 + 121)
       .advance_watermark_to_infinity()
   )

We enrich each aggregated record with window and pane metadata so we can clearly see when and why results are emitted. We convert Beam's internal timestamps into human-readable UTC times for readability. We also define a TestStream that simulates real streaming behavior using watermarks, processing-time advances, and late data. Check out the FULL CODES here.
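
Note that pane_info.timing is an integer code, so the pane_timing field in our output reads "0", "1", or "2". If we want readable labels, a small sketch using Beam's PaneInfoTiming constants (assuming the to_string helper available in recent SDK versions) maps them:

from apache_beam.utils.windowed_value import PaneInfoTiming

# Map the integer timing codes to readable names (EARLY / ON_TIME / LATE).
for timing in (PaneInfoTiming.EARLY, PaneInfoTiming.ON_TIME, PaneInfoTiming.LATE):
    print(timing, "->", PaneInfoTiming.to_string(timing))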

def run_batch():
    with beam.Pipeline(options=PipelineOptions([])) as p:
        (
            p
            | beam.Create(BATCH_EVENTS)
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )


def run_stream():
    opts = PipelineOptions([])
    opts.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=opts) as p:
        (
            p
            | build_test_stream()
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )


run_stream() if MODE == "stream" else run_batch()

We wire everything together into executable batch and stream-like pipelines. We toggle between modes by changing a single flag while reusing the same aggregation transform. We run the pipeline and print the windowed results directly, making the execution flow and outputs easy to inspect.
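
To compare both modes in one session, we can flip the flag and rerun; only the source changes while the aggregation transform stays identical:

MODE = "batch"
run_batch()  # same WindowedUserAgg, bounded input via beam.Create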

In conclusion, we demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. We observed how watermarks, triggers, and accumulation modes influence when results are emitted and how late data updates previously computed windows. We also focused on the conceptual foundations of Beam's unified model, providing a solid base for later scaling the same design to real streaming runners and production environments.


Check out the FULL CODES here.



Michal Sutter is a data science professional with a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michal excels at transforming complex datasets into actionable insights.
