Prepared for: Printing/Card Manufacturing MIS
Date: February 2026
Your manufacturing process has a property that breaks most off-the-shelf systems: a single tracked object transforms into N individually-tracked objects, each of which must later reconverge into an exact, validated set.
Most MIS/ERP systems track at the order or batch level. They answer "how many cards were completed?" They cannot answer "where is card #7 of job J-1044 right now, what operations has it been through, does it have a serial number, and has it been matched to its complement cards for packing?"
This is not a software feature gap. It is a data modeling gap. These systems have no concept of an individually-tracked sub-unit that is born mid-process, lives its own lifecycle, and must later reconverge into an exact, validated set.
The semiconductor industry solved this first — a wafer becomes hundreds of individual dies, each serialized and tracked individually through packaging and test. The pharmaceutical industry solved it second — a batch of ingredients becomes thousands of individually-tracked pills. You are solving the same class of problem for trading cards.
This report is about how to model this precisely, build the foundation correctly, and evolve the architecture to support it at scale.
Before touching code, define the system's job in mathematical terms.
That's it. Everything else is derived from that.
A physical object (sheet, card) has:
A production operation is a function that takes one or more objects, applies work, and returns:
This is the mathematical core. Your software is recording the application of these functions over time, with enough fidelity to answer any question about any object at any point.
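What "operation as a function" means can be sketched in a few lines of code. This is an illustrative minimum, not the report's actual model; `ObjectState`, `Event`, and `mark_printed` are hypothetical names:

```python
from dataclasses import dataclass, replace
from typing import Callable

@dataclass(frozen=True)
class ObjectState:
    object_id: str
    status: str

@dataclass(frozen=True)
class Event:
    name: str
    object_id: str

# An operation: state in, (new state, facts about what happened) out
Operation = Callable[[ObjectState], tuple[ObjectState, list[Event]]]

def mark_printed(state: ObjectState) -> tuple[ObjectState, list[Event]]:
    """Apply work, return the new state plus the fact that it happened."""
    new_state = replace(state, status="printed")
    return new_state, [Event(name="Printed", object_id=state.object_id)]

sheet = ObjectState(object_id="S-001", status="ready")
sheet2, events = mark_printed(sheet)
# The original state is untouched; the event records the fact
```

Recording the application of such functions over time, with full fidelity, is the whole job.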
Your system must answer two classes of questions:
Forward-looking (capacity planning):
Backward-looking (tracking and traceability):
These two question types have fundamentally different data shapes. This observation is the seed of CQRS — but we don't need to act on it yet.
The most important work you can do is model the domain purely, without any persistence, framework, or infrastructure concern. This is not an intermediate step. It is the foundation that determines the quality of everything else.
Before writing a single class, establish the vocabulary. Every term should mean exactly one thing:
| Term | Definition |
|---|---|
| ProductionOrder | A commitment to produce a specific quantity of a specific product for a specific customer |
| Job | The work unit created to fulfill a ProductionOrder (one Order may spawn multiple Jobs) |
| Sheet | A physical substrate (paper) that enters the press. The unit of pre-cut work. |
| Cut | The operation that destroys a Sheet as a tracked entity and creates N CardUnits |
| CardUnit | An individual card. Born at Cut, dies when Packed. Has its own lifecycle. |
| Operation | A discrete unit of work performed on a Sheet or CardUnit at a Station |
| Station | A physical location where an operation type is performed (e.g., FOIL-1, QA-BENCH-3) |
| Embellishment | Any decorative operation: foil stamping, UV coating, die cutting, etc. |
| Memorabilia | A physical insert placed into a card (autograph, relic, etc.) |
| Set | The N CardUnits that originated from the same Sheet and must be reunited for packing |
| Assembly | The gathering of all N cards in a Set, validated as complete before packing |
| Pack | A physical shipment container (box, case, pallet) |
| QA | Quality inspection of a CardUnit. Result is pass or fail with reason. |
Getting this language right means your code reads like the business. When a developer reads sheet.cut(n_up=18) and gets back List[CardUnit], they understand the domain without needing comments.
This distinction from Domain-Driven Design is fundamental:
Entities have identity. Two entities with identical properties are still different things if they have different IDs. A CardUnit is an entity — card #7 and card #8 from the same sheet are different things even if they look identical.
Value Objects have no identity — they are defined entirely by their values. Two value objects with the same values are interchangeable. An Embellishment specification (type: "foil", color: "gold", die: "wave") is a value object — it doesn't need its own ID.
For your system:
```python
# Entity — identity-based equality
@dataclass(eq=False)  # suppress the generated field-wise __eq__; identity decides
class CardUnit:
    card_unit_id: CardUnitId
    sheet_id: SheetId
    position: int              # position on sheet (1-18)
    job_id: JobId
    status: CardUnitStatus
    # ... operations history stored separately as events

    def __eq__(self, other):
        return isinstance(other, CardUnit) and self.card_unit_id == other.card_unit_id


# Value Object — value-based equality
@dataclass(frozen=True)
class EmbellishmentSpec:
    embellishment_type: str    # "foil" | "uv" | "die_cut" | "glue"
    color: str | None
    die_name: str | None
    position: str | None       # "front" | "back"
    # No ID. Immutable. Two identical specs are the same thing.
```
An aggregate is a cluster of objects that must change together to maintain consistency. The aggregate root controls all access.
The critical design question for your system: Is the Sheet the aggregate root, or is each CardUnit its own aggregate?
The answer has major implications for how you model the cut operation and how you handle concurrent updates.
Option A: Sheet as root, CardUnits as children
```
Sheet (aggregate root)
└── List[CardUnit]
```
This makes the cut operation clean (it's a method on Sheet that spawns children), and invariants like "all cards come from a valid sheet" are easy to enforce. But it means every operation on a single CardUnit requires loading the whole Sheet aggregate, and concurrent operations on different cards from the same sheet can conflict.
Option B: CardUnit as its own aggregate after cut
```
Sheet (aggregate root, pre-cut)
    ↓ cut event
CardUnit (new aggregate root, post-cut)
```
This is the right answer for your domain. After the cut, CardUnits need to be independently updateable — one card is at QA while another is at the foil stamp station. They can't be locked to each other. Sheet remains the aggregate for pre-cut operations. At the cut, it fires a domain event that spawns N new CardUnit aggregates.
The invariant that bridges them: a Set or Assembly aggregate is responsible for enforcing that all CardUnits are present and accounted for before packing. This aggregate holds the set membership rules and validates completeness.
```python
@dataclass
class Assembly:
    assembly_id: AssemblyId
    job_id: JobId
    sheet_id: SheetId               # which sheet these cards came from
    expected_count: int             # 18 (or N)
    gathered_card_ids: List[CardUnitId]
    status: AssemblyStatus          # "in_progress" | "complete" | "error"

    def gather(self, card_unit: CardUnit) -> 'Assembly':
        """Pure function. Returns new Assembly state. Raises if wrong card."""
        if card_unit.sheet_id != self.sheet_id:
            raise WrongCardForAssembly(card_unit.card_unit_id, self.assembly_id)
        if card_unit.card_unit_id in self.gathered_card_ids:
            raise DuplicateCardInAssembly(card_unit.card_unit_id)

        new_gathered = self.gathered_card_ids + [card_unit.card_unit_id]
        new_status = (
            AssemblyStatus.COMPLETE
            if len(new_gathered) == self.expected_count
            else AssemblyStatus.IN_PROGRESS
        )
        return Assembly(
            assembly_id=self.assembly_id,
            job_id=self.job_id,
            sheet_id=self.sheet_id,
            expected_count=self.expected_count,
            gathered_card_ids=new_gathered,
            status=new_status,
        )
```
Notice: pure function, no I/O, no database, returns new state. This is the right model.
This is the architectural heart of your system. Most resources don't address it directly, so let's be precise.
The cut operation is a destructive transformation — the Sheet as a tracked entity ceases to exist, and N CardUnits come into existence. This is not just an update; it is a lifecycle event that creates new aggregate roots.
```
Before Cut:        After Cut:
Sheet S-001        CardUnit S-001-01
                   CardUnit S-001-02
                   CardUnit S-001-03
                   ...
                   CardUnit S-001-18
```
Key design decision: the CardUnit ID encodes its lineage. `S-001-07` tells you immediately that it is position 7 from Sheet S-001. This is deterministic identity, not a UUID.

The assembly operation is a convergence — N separate CardUnit lifecycles must merge into a single, validated Set. This is where the zero-error requirement lives.
The Assembly aggregate is responsible for holding the set membership rules and validating completeness before packing.
This is fundamentally a Process Manager (see section 11) — it tracks the state of a multi-actor, long-running process and enforces completion rules.
Your system must have a policy for what happens when a CardUnit fails QA after the cut. This is a domain decision, not a software decision — but your data model must support it. The right approach is:
- The failed CardUnit is marked VOIDED (with reason) — it exits the set
- A ReplacementCardUnit can be created with an explicit reference to the original it replaces

Document this as a domain decision before you model it. Don't let the software make this choice implicitly.
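Whatever policy you settle on, it can be made explicit as two domain events. A sketch with hypothetical event names, following the frozen-dataclass event conventions used elsewhere in this report:

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class CardUnitVoided:
    """The failed card exits its set, with the reason on record."""
    occurred_at: datetime
    card_unit_id: str
    reason: str

@dataclass(frozen=True)
class ReplacementCardUnitCreated:
    """A new card takes the voided card's place, with explicit lineage."""
    occurred_at: datetime
    card_unit_id: str            # ID of the replacement
    replaces_card_unit_id: str   # explicit reference to the original
```

Because the replacement carries a reference to the original, the history of the set survives the swap.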
Before introducing any event bus, message queue, or CQRS infrastructure, you should think in events. This is the modeling discipline — not the technology.
A domain event is a fact about something that happened in the past in your domain. It is named in the past tense, immutable, and carries the data the fact needs: `SheetCut`, `CardUnitEnteredStation`, `QAPassed`, `AssemblyCompleted`.
```python
@dataclass(frozen=True)
class SheetCut:
    """Fired when a sheet is cut into individual card units."""
    occurred_at: datetime
    sheet_id: SheetId
    job_id: JobId
    n_up: int                  # how many cards were cut
    operator_id: OperatorId
    station_id: StationId
    # All 18 CardUnit IDs determined at this moment
    card_unit_ids: tuple[CardUnitId, ...]


@dataclass(frozen=True)
class CardUnitEnteredStation:
    occurred_at: datetime
    card_unit_id: CardUnitId
    station_id: StationId
    operator_id: OperatorId
    # Inferred from sequence: previous station exit.
    # No "exited" event needed — the next "entered" implies exit from the previous.


@dataclass(frozen=True)
class QAResultRecorded:
    occurred_at: datetime
    card_unit_id: CardUnitId
    result: QAOutcome                 # PASS | FAIL
    inspector_id: OperatorId
    failure_reason: str | None        # None if PASS
    failure_codes: tuple[str, ...]    # empty if PASS
```
A common mistake: treating events as structured logs ("INFO: card scanned at station"). Events are domain facts that could trigger other behavior, update read models, or be replayed to reconstruct state. They carry business meaning, not technical meaning.
A log message: "Card S-001-07 scanned at QA-BENCH-3 by operator OP-42 at 14:32:07". A domain event: `QAResultRecorded(card_unit_id="S-001-07", result=PASS, inspector_id="OP-42", occurred_at=...)`.
The log is for debugging. The event is for the domain.
Your domain functions should return events, not emit them to a bus. The adapter layer does the emission. This keeps the core pure.
```python
import dataclasses

# In core — pure function returns events
def cut_sheet(
    sheet: Sheet,
    n_up: int,
    operator_id: OperatorId,
    station_id: StationId,
    at: datetime,
) -> tuple[Sheet, list[DomainEvent]]:
    if sheet.status != SheetStatus.READY_TO_CUT:
        raise SheetNotReadyForCut(sheet.sheet_id, sheet.status)

    card_unit_ids = [CardUnitId.for_position(sheet.sheet_id, i) for i in range(1, n_up + 1)]
    new_sheet = dataclasses.replace(sheet, status=SheetStatus.CUT)

    events = [
        SheetCut(
            occurred_at=at,
            sheet_id=sheet.sheet_id,
            job_id=sheet.job_id,
            n_up=n_up,
            operator_id=operator_id,
            station_id=station_id,
            card_unit_ids=tuple(card_unit_ids),
        )
    ] + [
        CardUnitCreated(
            occurred_at=at,
            card_unit_id=cuid,
            sheet_id=sheet.sheet_id,
            job_id=sheet.job_id,
            position=i + 1,
        )
        for i, cuid in enumerate(card_unit_ids)
    ]
    return new_sheet, events
```

```python
# In adapter layer — emits events
sheet, events = cut_sheet(sheet, n_up=18, operator_id=op, station_id=station, at=now())
for event in events:
    event_store.append(event)
    # optionally: message_bus.publish(event)
```
CQRS traces back to Bertrand Meyer's Command-Query Separation (CQS) principle (1988), which states:
Every method should either be a command that performs an action, or a query that returns data to the caller, but not both.
Meyer applied this at the method level. Greg Young extended it to the object and system level in 2010. The insight: if a command and a query can't share the same method, they may not need to share the same model either.
Greg Young's original definition:
"CQRS uses the same definition of commands and queries that Meyer used and maintains the viewpoint that they should be pure. The fundamental difference is that, in CQRS, objects are split into two objects, one containing the commands, one containing the queries."
Udi Dahan added the business context:
"CQRS addresses two driving forces: collaboration (multiple actors modifying shared data) and staleness (data shown to users becomes outdated). The pattern separates command processing from query handling to handle these realities more effectively."
Martin Fowler's warning (critically important):
"Be very cautious about using CQRS. CQRS adds risky complexity to your system. For most systems, sharing a model is easier. I've seen CQRS implementations cause significant drag on productivity in real projects."
CQRS solves a specific problem: your write model and read model want to be different shapes.
When you write a command, you care about enforcing invariants and business rules. When you read data, you care about speed and shapes that are convenient to display. These are fundamentally different concerns. A normalized, invariant-protecting domain model is terrible at fast reads. A denormalized read table is terrible at enforcing business rules.
CQRS says: use separate models for each. The write model handles commands and produces events. Events update the read model. The read model answers queries.
The most important practical question with CQRS is: how long can the read model lag behind the write model?
Synchronous (same transaction): Update the read model in the same database transaction as the event. Zero lag. This is the minimum viable CQRS and is correct for most applications.
Asynchronous (eventual consistency): Update the read model after the transaction commits, via a queue or event stream. Lag exists — could be milliseconds or seconds. Requires your UI to handle "your action was recorded, results will appear shortly." Adds significant complexity. Only worth it at scale.
For a manufacturing tracking system with dozens of concurrent operators (not thousands of concurrent users), synchronous CQRS is sufficient and correct. Do not add eventual consistency until you have a proven performance problem.
This is the most important section for your build strategy.
Your current tool-hub-try5 is here. Pure functions, no persistence. Excellent foundation.
You add state, but you structure it so CQRS can be introduced without rewriting anything. What you're building in Stage 1 looks like CQRS but uses the same database for reads and writes: the separation is conceptual and code-level, not infrastructure-level. This is the minimum viable approach and it carries you very far.
You separate read and write databases. Events are published to a bus or queue. Read models are updated asynchronously.
Only do this if you have proven the need; the signals for splitting read and write stores are covered below.
The event log is the source of truth. You never update entity state — you replay events from scratch to reconstruct current state. Read models are always projections.
Only do this if you need the event log itself to be the sole source of truth, e.g., hard audit or full-replay-from-day-one requirements.
For a card manufacturing shop, Stage 1 (CQRS-Ready) will serve you for years. Stage 2 only if you grow to hundreds of concurrent operators and measurable DB bottlenecks. Stage 3 almost certainly never.
The signals that tell you it's time to split read and write stores:
Your domain model requires complex joins, aggregations, or transformations for every read query. You're spending more time transforming data for display than processing commands. Example: to show the dashboard, you join 6 tables and aggregate 3 datasets on every page load.
Dashboards and reporting queries are taking table locks that delay write operations. Workers scanning barcodes are waiting because someone ran a report.
You have 10 operators doing writes but 500 managers refreshing dashboards. Or the inverse: massive batch import jobs with few concurrent readers.
The queries your business needs are so complex that they require materialized, pre-computed views that the write model cannot efficiently produce.
Different departments need completely different views of the same underlying events: operations wants real-time station load, finance wants job cost rollups, shipping wants pack manifests. Maintaining these as separate read models from a shared event log is cleaner than trying to query-engineer everything from a normalized schema.
The inverse also holds: if none of these signals are present, that is the warning signal that you don't need CQRS infrastructure yet, and sharing a model is easier.
This is the practical section. Here is exactly how to structure the code so that introducing full CQRS later requires no rewrites — only additions.
```sql
CREATE TABLE production_events (
    -- Global ordering (safe for concurrent writers)
    sequence_num      BIGSERIAL PRIMARY KEY,

    -- Per-aggregate ordering (for optimistic concurrency)
    aggregate_id      TEXT NOT NULL,
    aggregate_type    TEXT NOT NULL,       -- 'sheet' | 'card_unit' | 'assembly' | 'job'
    aggregate_version INTEGER NOT NULL,    -- monotonically increasing per aggregate

    -- The event
    event_type        TEXT NOT NULL,       -- 'SheetCut' | 'CardUnitEnteredStation' | ...
    event_payload     JSONB NOT NULL,      -- the full event, serialized

    -- Context
    occurred_at       TIMESTAMPTZ NOT NULL,
    recorded_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    operator_id       TEXT,
    station_id        TEXT,
    correlation_id    TEXT,                -- links events from the same user action

    -- Enforce no duplicate versions per aggregate (optimistic concurrency)
    UNIQUE (aggregate_id, aggregate_type, aggregate_version)
);

CREATE INDEX ON production_events (aggregate_id, aggregate_type);
CREATE INDEX ON production_events (event_type);
CREATE INDEX ON production_events (occurred_at);
```
This table never gets UPDATE or DELETE. Ever. It is append-only.
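The `UNIQUE (aggregate_id, aggregate_type, aggregate_version)` constraint is what makes concurrent writers safe: two writers that both read version N will both try to append version N+1, and the database rejects one. A minimal in-memory sketch of that mechanism (a hypothetical stand-in for the Postgres table, not the report's adapter code):

```python
class ConcurrencyConflict(Exception):
    pass

class InMemoryEventLog:
    """Append-only log enforcing one event per (aggregate_id, version)."""
    def __init__(self):
        self.events = []
        self._versions = set()

    def append(self, aggregate_id: str, version: int, event: dict):
        key = (aggregate_id, version)
        if key in self._versions:  # the DB's UNIQUE constraint plays this role
            raise ConcurrencyConflict(key)
        self._versions.add(key)
        self.events.append({**event, "aggregate_id": aggregate_id, "version": version})

log = InMemoryEventLog()
log.append("S-001", 1, {"event_type": "SheetCreated"})
log.append("S-001", 2, {"event_type": "SheetCut"})
try:
    # A second writer that also read version 1 tries to write version 2: rejected
    log.append("S-001", 2, {"event_type": "SheetCut"})
except ConcurrencyConflict:
    pass
```

The losing writer reloads the aggregate, re-validates, and retries with the new version.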
These are updated synchronously when events are appended. In Stage 1, they live in the same database:
```sql
-- Current state of each card unit
CREATE TABLE rm_card_units (
    card_unit_id       TEXT PRIMARY KEY,
    sheet_id           TEXT NOT NULL,
    job_id             TEXT NOT NULL,
    position           INTEGER NOT NULL,   -- 1..N on sheet
    current_station_id TEXT,
    status             TEXT NOT NULL,      -- 'in_progress' | 'qa_passed' | 'qa_failed' | 'assembled' | 'packed'
    qa_result          TEXT,               -- 'pass' | 'fail' | null
    qa_failure_reason  TEXT,
    sequential_number  TEXT,               -- if serialized
    last_event_seq     BIGINT,             -- which event last updated this
    updated_at         TIMESTAMPTZ
);

-- Station current load
CREATE TABLE rm_station_load (
    station_id       TEXT PRIMARY KEY,
    card_unit_count  INTEGER NOT NULL DEFAULT 0,
    sheet_count      INTEGER NOT NULL DEFAULT 0,
    last_event_seq   BIGINT,
    updated_at       TIMESTAMPTZ
);

-- Job progress
CREATE TABLE rm_job_progress (
    job_id           TEXT PRIMARY KEY,
    total_sheets     INTEGER,
    total_cards      INTEGER,
    cards_at_qa      INTEGER DEFAULT 0,
    cards_qa_passed  INTEGER DEFAULT 0,
    cards_qa_failed  INTEGER DEFAULT 0,
    cards_assembled  INTEGER DEFAULT 0,
    cards_packed     INTEGER DEFAULT 0,
    status           TEXT,
    last_event_seq   BIGINT,
    updated_at       TIMESTAMPTZ
);

-- Assembly (gathering) state
CREATE TABLE rm_assemblies (
    assembly_id       TEXT PRIMARY KEY,
    sheet_id          TEXT NOT NULL,
    job_id            TEXT NOT NULL,
    expected_count    INTEGER NOT NULL,
    gathered_count    INTEGER NOT NULL DEFAULT 0,
    missing_positions INTEGER[],
    status            TEXT NOT NULL,      -- 'pending' | 'in_progress' | 'complete' | 'error'
    last_event_seq    BIGINT,
    updated_at        TIMESTAMPTZ
);
```
These live in core/ with no I/O:
```python
# core/projections/card_unit_projection.py
import dataclasses
from dataclasses import dataclass

@dataclass
class CardUnitReadModel:
    card_unit_id: str
    sheet_id: str
    job_id: str
    position: int
    current_station_id: str | None
    status: str
    qa_result: str | None
    qa_failure_reason: str | None
    sequential_number: str | None
    last_event_seq: int


def apply_event(state: CardUnitReadModel | None, event: dict) -> CardUnitReadModel | None:
    """
    Pure function. Given current read model state and an event dict,
    return new read model state.

    This is a left-fold: fold(events) -> current_state.
    """
    event_type = event["event_type"]
    payload = event["event_payload"]
    seq = event["sequence_num"]

    match event_type:
        case "CardUnitCreated":
            return CardUnitReadModel(
                card_unit_id=payload["card_unit_id"],
                sheet_id=payload["sheet_id"],
                job_id=payload["job_id"],
                position=payload["position"],
                current_station_id=None,
                status="created",
                qa_result=None,
                qa_failure_reason=None,
                sequential_number=None,
                last_event_seq=seq,
            )
        case "CardUnitEnteredStation":
            if state is None:
                return None
            return dataclasses.replace(
                state,
                current_station_id=payload["station_id"],
                last_event_seq=seq,
            )
        case "QAResultRecorded":
            if state is None:
                return None
            return dataclasses.replace(
                state,
                status="qa_passed" if payload["result"] == "PASS" else "qa_failed",
                qa_result=payload["result"],
                qa_failure_reason=payload.get("failure_reason"),
                last_event_seq=seq,
            )
        case _:
            return state  # event not relevant to this projection
```
This is the mathematical core of CQRS. Each read model is a left-fold over the event stream:

```python
current_state = fold(apply_event, initial_state, events)
```

This is functionally equivalent to:

```python
state = None
for event in events:
    state = apply_event(state, event)
return state
```
The power: if you need a new read model, write a new reducer. You never change the event log. You replay historical events through the new reducer and you have your new view — historically complete, back to day one.
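As a concrete illustration, here is a brand-new read model — QA failures per station — built purely by replaying an event list through a new reducer. A self-contained sketch with illustrative sample events:

```python
from functools import reduce

events = [
    {"event_type": "QAResultRecorded", "station_id": "QA-BENCH-3", "result": "FAIL"},
    {"event_type": "QAResultRecorded", "station_id": "QA-BENCH-3", "result": "PASS"},
    {"event_type": "QAResultRecorded", "station_id": "QA-BENCH-1", "result": "FAIL"},
    {"event_type": "CardUnitEnteredStation", "station_id": "FOIL-1"},  # ignored
]

def failures_per_station(state: dict, event: dict) -> dict:
    """New reducer: counts QA failures by station. The event log is untouched."""
    if event["event_type"] == "QAResultRecorded" and event["result"] == "FAIL":
        station = event["station_id"]
        return {**state, station: state.get(station, 0) + 1}
    return state  # events irrelevant to this projection pass through

view = reduce(failures_per_station, events, {})
# → {"QA-BENCH-3": 1, "QA-BENCH-1": 1}
```

The same replay works against years of historical events, which is exactly why the view is historically complete from day one.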
In the adapter layer, when you write an event, you immediately update the read model in the same transaction:
```python
# adapters/event_store/postgres.py
import json

def append_event(conn, event: dict, aggregate_version: int) -> dict:
    """
    Append event to log and update read models — all in one transaction.
    This is synchronous CQRS. No eventual consistency. No message bus.
    """
    with conn.transaction():
        # 1. Append to event log
        row = conn.execute("""
            INSERT INTO production_events
              (aggregate_id, aggregate_type, aggregate_version, event_type,
               event_payload, occurred_at, operator_id, station_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            RETURNING sequence_num, recorded_at
        """, [
            event["aggregate_id"],
            event["aggregate_type"],
            aggregate_version,
            event["event_type"],
            json.dumps(event["payload"]),
            event["occurred_at"],
            event.get("operator_id"),
            event.get("station_id"),
        ]).fetchone()

        event_with_seq = {**event, "sequence_num": row["sequence_num"]}

        # 2. Update all relevant read models (pure functions decide new state)
        _update_card_unit_projection(conn, event_with_seq)
        _update_station_load_projection(conn, event_with_seq)
        _update_job_progress_projection(conn, event_with_seq)
        _update_assembly_projection(conn, event_with_seq)

    return event_with_seq
```
When you're ready to go to full CQRS, you move the read model updates out of the synchronous transaction and behind a queue or event stream. Zero rewrites to core. The reducer functions stay identical.
A projection is a left-fold (functional programming term) over an ordered sequence of events:
```
fold :: (state → event → state) → initial_state → [event] → state
```
This is mathematically equivalent to the reduce/accumulate pattern:
```python
from functools import reduce

def build_state(events: list[Event], initial_state: State) -> State:
    return reduce(apply_event, events, initial_state)
```
Live projection: rebuild on every query by replaying events. Simple, always correct, slow for large event streams.
Materialized projection: maintain a pre-built read model table, update it when events come in. Fast queries, requires keeping the table in sync.
The recommended strategy (from EventStoreDB/Kurrent's documentation):
"Start with the live projection strategy — rebuild on demand. Then, if necessary, switch to the full-blown implementation with a separate service."
For your system: materialize the critical dashboards (station load, job progress, assembly status). Keep less-critical views as live projections.
If a Sheet has been through 50 events before being cut, you don't want to replay all 50 to get its current state every time you process a command. A snapshot is a cached state at a known version:
```python
@dataclass
class SheetSnapshot:
    sheet_id: str
    at_version: int      # which event version this snapshot represents
    state: dict          # serialized Sheet state
    created_at: datetime
```
When loading a Sheet: load the latest snapshot, then replay only events after that snapshot's version. For small systems, you won't need snapshots for a while — aggregate streams rarely exceed 100-200 events.
These two patterns are often confused. The distinction matters for your fan-in problem.
A Saga has no centralized state. Each step publishes an event, and other components react. No coordinator knows the full picture.
CardUnit QA Passes → QAPassed event → Assembly hears it → updates its own state
Simple, decoupled, but hard to debug. If something breaks mid-process, there's no single place to look. Hard to enforce "all 18 cards must be gathered before packing" because nobody owns that invariant.
A Process Manager is a stateful coordinator. It knows where the process is, listens for events, and issues commands to advance it.
```python
@dataclass
class AssemblyProcessManager:
    """
    Coordinates the gathering of N CardUnits into a complete Set.
    Knows the expected membership. Tracks who has arrived.
    Enforces the invariant: all members present before completion.
    """
    assembly_id: AssemblyId
    job_id: JobId
    sheet_id: SheetId
    expected_positions: frozenset[int]   # {1, 2, 3, ... 18}
    gathered_positions: frozenset[int]   # which have arrived
    status: ProcessStatus

    def on_card_unit_scanned_for_assembly(self, event: CardScannedForAssembly) -> list[Command]:
        """React to event. Return commands to issue."""
        # Validate: does this card belong here?
        if event.sheet_id != self.sheet_id:
            return [RejectCard(card_id=event.card_unit_id, reason="wrong_sheet")]

        # Validate: is this a duplicate?
        if event.position in self.gathered_positions:
            return [RejectCard(card_id=event.card_unit_id, reason="duplicate")]

        new_gathered = self.gathered_positions | {event.position}

        # Are we complete?
        if new_gathered == self.expected_positions:
            return [CompleteAssembly(assembly_id=self.assembly_id)]

        # Report missing positions
        missing = self.expected_positions - new_gathered
        return [UpdateAssemblyProgress(assembly_id=self.assembly_id, missing=missing)]
```
For your system: use a Process Manager for assembly. The zero-error requirement means you need centralized state that can enforce the completeness invariant. A saga would let cards slip through the cracks.
The Process Manager's state is persisted to its own table and updated via its own events. It is a first-class entity in your domain.
Your problem has been solved at scale in semiconductor manufacturing. Wafer-to-die is the canonical example:
Wafer → dies is a 1:300+ transformation. Each die's identity is (Wafer-ID, X, Y): deterministic, and it encodes lineage. That deterministic, lineage-encoding identity is the part you should copy. For your system:
```
Sheet ID:      J1044-S003      (Job 1044, Sheet 3)
CardUnit IDs:  J1044-S003-01   (position 1)
               J1044-S003-02   (position 2)
               ...
               J1044-S003-18   (position 18)
```
The ID is a string, but it is not opaque — it carries meaning. A new developer can look at J1044-S003-07 and understand exactly what it is.
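Deterministic IDs like these are pure functions of sheet and position, and they parse back without any lookup table. One possible sketch (hypothetical helper names):

```python
def card_unit_id_for_position(sheet_id: str, position: int) -> str:
    """Deterministic: the same inputs always produce the same ID."""
    return f"{sheet_id}-{position:02d}"

def parse_card_unit_id(card_unit_id: str) -> tuple[str, int]:
    """Recover lineage from the ID itself."""
    sheet_id, pos = card_unit_id.rsplit("-", 1)
    return sheet_id, int(pos)

assert card_unit_id_for_position("J1044-S003", 7) == "J1044-S003-07"
assert parse_card_unit_id("J1044-S003-07") == ("J1044-S003", 7)
```

Zero-padding the position keeps IDs sortable; the round-trip property (generate then parse) is worth a unit test in the real codebase.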
Event Modeling (Adam Dymitruk, 2018) is a methodology for designing information systems using events as the primary building block. It is designed to eliminate rework by producing a blueprint that all stakeholders (developers, domain experts, UX designers) can read.
Events: `SheetCut`, `CardUnitEnteredStation`, `QAPassed`. Commands: `CutSheet`, `ScanCardAtStation`, `RecordQAResult`. Reading left to right:
```
[View: Job Queue]
    ↓ operator reviews queue
[Command: StartJob]
    ↓
[Event: JobStarted]
    ↓ creates
[Event: SheetsCreated]
    ↓
[View: Press Queue / Sheet List]
    ↓ pressman selects next sheet
[Command: CheckSheetIntoPressStation]
    ↓
[Event: SheetEnteredPressStation]
    ↓
[Command: RecordPrintComplete]
    ↓
[Event: PrintCompleted]
    ↓
[Event: SheetExitedPressStation]
    ↓
... (foil stamping, embellishment operations) ...
    ↓
[View: Cutting Queue]
    ↓ operator selects sheet
[Command: CutSheet]
    ↓
[Event: SheetCut]
    ↓
[Events: CardUnit01Created, CardUnit02Created, ... CardUnit18Created]
    ↓
[View: Individual Card Queues — 18 parallel lanes]
    ↓
... (individual operations, QA per card) ...
    ↓
[View: Assembly Station — shows which cards in set have arrived]
    ↓ gather operator scans each card
[Command: ScanCardForAssembly]
    ↓
[Event: CardScannedForAssembly]
    ↓
[Automation: when all 18 scanned → AssemblyComplete]
    ↓
[Event: AssemblyCompleted]
    ↓
[Command: PackSet]
    ↓
[Event: SetPacked]
```
This blueprint is your specification. Before writing a single class, you can walk a domain expert through this and verify it matches reality. Every box that doesn't make sense is a domain question to resolve before coding.
Event modeling naturally produces CQRS: when you draw out the event model, with commands flowing in and views projected out, you're implicitly designing your CQRS architecture. The only thing missing is the infrastructure to run it.
Capacity planning requires knowing: how much work can a station do, and how much work is queued?
Station — a physical resource with a capacity rate:
```python
@dataclass
class Station:
    station_id: StationId
    name: str
    station_type: StationType      # PRESS | FOIL_STAMP | QA | CUTTING | ASSEMBLY
    capacity_unit: CapacityUnit    # SHEETS_PER_HOUR | CARDS_PER_HOUR
    rated_capacity: Decimal        # e.g., 500 (cards/hour)
    setup_time_minutes: int        # changeover time between jobs
    operating_hours: OperatingSchedule   # when this station is staffed
```
Operation — a unit of work for a specific entity type:
```python
@dataclass(frozen=True)
class OperationSpec:
    operation_type: str
    station_type: StationType
    standard_time: Decimal   # expected time per unit (minutes)
    # Used for scheduling and capacity planning
```
Queue — derived from events. You don't store the queue — you project it:
```python
def project_station_queue(events: list[Event], station_id: StationId) -> StationQueue:
    """
    Project current queue for a station from events.
    An entity is 'in queue' if it last entered this station and hasn't left.
    """
    ...
```
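One plausible shape for that elided body, under the "last entered, not yet left" rule stated in its docstring — a hedged sketch over plain dict events, where the real system would consume typed events:

```python
def project_station_queue(events: list[dict], station_id: str) -> list[str]:
    """A card is in this station's queue if its most recent
    'entered station' event was for this station."""
    last_station: dict[str, str] = {}
    for event in events:  # events are assumed ordered by occurrence
        if event["event_type"] == "CardUnitEnteredStation":
            last_station[event["card_unit_id"]] = event["station_id"]
    return [card for card, station in last_station.items() if station == station_id]

events = [
    {"event_type": "CardUnitEnteredStation", "card_unit_id": "S-001-01", "station_id": "FOIL-1"},
    {"event_type": "CardUnitEnteredStation", "card_unit_id": "S-001-02", "station_id": "FOIL-1"},
    {"event_type": "CardUnitEnteredStation", "card_unit_id": "S-001-01", "station_id": "QA-BENCH-3"},
]
# Card 01 has moved on to QA; only card 02 is still at FOIL-1
assert project_station_queue(events, "FOIL-1") == ["S-001-02"]
```

Note this depends only on "entered" events, which matches the earlier decision that no "exited" event is needed.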
The simplest correct approach is to project queues on demand and treat sequencing as a directed acyclic graph (DAG) scheduling problem. Your product spec defines the DAG structure (what operations must happen in what order). Your event data gives you the current position in the DAG.
```python
@dataclass
class OperationNode:
    operation_id: str
    operation_type: str
    station_type: StationType
    required_for_entity_type: str   # "sheet" or "card_unit"
    predecessors: list[str]         # operation_ids that must complete first
    standard_time_minutes: Decimal


@dataclass
class ProductSpec:
    product_type: str
    operation_dag: list[OperationNode]
    n_up: int   # cards per sheet
```
Start simple: estimate only. Build the data structures that let you later add constraint-based scheduling. Don't try to build SAP-level scheduling on day one.
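The "estimate only" starting point can be very small. A hypothetical helper, assuming rated capacities come from the Station records:

```python
from decimal import Decimal

def estimated_hours_to_clear(queued_units: int, rated_capacity_per_hour: Decimal) -> Decimal:
    """Naive estimate: queue length divided by rated throughput.
    Deliberately ignores setup time, operating hours, and variability."""
    if rated_capacity_per_hour <= 0:
        raise ValueError("station has no rated capacity")
    return Decimal(queued_units) / rated_capacity_per_hour

# 120 cards queued at a station rated 500 cards/hour
assert estimated_hours_to_clear(120, Decimal(500)) == Decimal("0.24")
```

Summing this per station along a job's remaining DAG path gives a first-cut completion estimate; constraint-based refinements can come later.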
This is the master plan. Each phase is self-contained and useful. No phase requires the next phase to deliver value.
What: Define the ubiquitous language as a DECISIONS.md. Build identity primitives as pure functions. Write tests.
Deliverables:
- `DECISIONS.md` — every domain concept defined, every invariant stated
- `core/identity/` — SheetId, CardUnitId, JobId with deterministic generation
- `core/domain/models.py` — Sheet, CardUnit, Job, Assembly, Station as pure dataclasses
- `core/domain/events.py` — all domain events as frozen dataclasses
- `core/domain/commands.py` — all commands as frozen dataclasses
- `core/domain/state_machines.py` — valid transitions for Sheet and CardUnit

No database. No adapters. No HTTP. Just math.
This phase is the most important. Get the vocabulary right here and everything else follows. Get it wrong and you'll be refactoring across your entire codebase.
What: Implement the business rules as pure functions. The full domain logic for every operation.
Deliverables:
- `core/production/sheet_operations.py` — create_sheet(), cut_sheet(), enter_station(), exit_station()
- `core/production/card_unit_operations.py` — create_card_unit(), record_qa(), apply_sequential_number(), place_memorabilia()
- `core/production/assembly.py` — scan_card_for_assembly(), validate_assembly_complete()
- `core/projections/` — all reducer functions (pure folds)
- `core/capacity/` — station capacity calculations, queue length projection

All functions have the signature `(state, command) -> (new_state, list[events])` or `(state, event) -> new_state`.
What: Postgres adapter. Append-only event log + synchronous read model updates.
Deliverables:
- `adapters/event_store/` — Postgres event log writer
- `adapters/projections/` — synchronous read model updaters (call core reducers, write to DB)

What: Minimal HTTP API for barcode scanning. Workers scan once; the system infers transitions.
Deliverables:
- `adapters/flask/scan_api/` — single endpoint: POST /scan with {barcode, station_id, operator_id, timestamp}

What: HTTP endpoints for all read model queries. No writes.
Deliverables:
- GET /jobs/{id}/progress — job completion status
- GET /stations/{id}/queue — current queue at station
- GET /card-units/{id} — current state + history of a card
- GET /assemblies/{id} — which cards present, which missing
- GET /capacity — current system-wide load

These are trivial because the read models are already maintained.
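To see why these endpoints are trivial, consider GET /assemblies/{id} stripped of its HTTP plumbing. The read-model row shape below (`required`, `scanned`) is a hypothetical illustration; the point is that the handler does set arithmetic, not domain logic.

```python
def assembly_status(row: dict) -> dict:
    """GET /assemblies/{id}, reduced to its essence: the projection already
    maintains which cards were scanned; the endpoint only compares sets."""
    required, present = set(row["required"]), set(row["scanned"])
    return {
        "present": sorted(present),
        "missing": sorted(required - present),
        "complete": required <= present,  # exact, validated set reconverged?
    }
```

All the hard work happened earlier, in the projections; the read API is a thin lookup layer.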
What: Create and manage production orders and jobs. Connect to existing quoting tools.
Deliverables:
- POST /jobs — create a new job (spawns sheets, defines operation plan)
- GET /jobs — list all active jobs
- Integration with the existing order_quote and trading_card_sheet_normalizer tools

What: Forward-looking scheduling from current state.
Deliverables:
- core/scheduling/ — DAG-based schedule computation
- GET /capacity/forecast — predicted load at each station for the next N days
- GET /jobs/{id}/estimated-completion — derived from current state + remaining operations

What: Build on top of the structured event log.
Deliverables:
The AI layer is easy to build when you have a clean, structured event log. You are building the training data in Phases 0-6.
Every event in your log is a structured data point:
{
"event_type": "QAResultRecorded",
"occurred_at": "2026-03-15T14:32:07Z",
"card_unit_id": "J1044-S003-07",
"sheet_id": "J1044-S003",
"job_id": "J1044",
"inspector_id": "OP-42",
"station_id": "QA-BENCH-3",
"result": "FAIL",
"failure_reason": "foil_adhesion",
"failure_codes": ["F-201"]
}
After six months of operation, you have tens of thousands of events like this. You can answer:
An LLM agent can query this structured event log and produce insights that would take weeks to surface from a traditional system. But only if the data is structured. Unstructured logs, spreadsheets, or ad-hoc database schemas produce noise.
Build the event log correctly now. The AI reads it later.
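As a sketch of what "the AI reads it later" means in practice, here is one of those questions answered as a plain fold over structured events. The two inline events are abridged illustrations in the shape shown above; a real query would read the event log.

```python
from collections import Counter

# Two illustrative events in the shape shown above (fields abridged).
events = [
    {"event_type": "QAResultRecorded", "station_id": "QA-BENCH-3",
     "result": "FAIL", "failure_reason": "foil_adhesion"},
    {"event_type": "QAResultRecorded", "station_id": "QA-BENCH-3",
     "result": "PASS", "failure_reason": None},
]

def failure_reasons_by_station(events: list[dict]) -> Counter:
    """Which failure reasons cluster at which stations? A pure fold over events."""
    counts = Counter()
    for e in events:
        if e["event_type"] == "QAResultRecorded" and e["result"] == "FAIL":
            counts[(e["station_id"], e["failure_reason"])] += 1
    return counts
```

Any agent — human, script, or LLM — can run this kind of aggregation because every event carries the same structured fields.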
Wrong: UPDATE card_units SET status = 'qa_passed' WHERE id = 'J1044-S003-07'
Right: INSERT INTO production_events (event_type = 'QAResultRecorded', ...) → update read model
Once you update in place, you lose the history. You can't answer "when did this card pass QA?" or "how many QA attempts did it take?"
Wrong: card_unit_id = uuid4() — tells you nothing about the card
Right: card_unit_id = "J1044-S003-07" — tells you the job, sheet, and position at a glance
Opaque IDs force database lookups to understand what something is. Semantic IDs make the system self-describing.
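"Self-describing" means the ID can be decomposed without touching the database. A minimal sketch, assuming the `J1044-S003-07` format above; the field names are illustrative.

```python
def parse_card_unit_id(card_unit_id: str) -> dict:
    """'J1044-S003-07' -> job, sheet, position — no lookup required."""
    job_id, sheet_part, pos_part = card_unit_id.split("-")
    return {
        "job_id": job_id,
        "sheet_no": int(sheet_part[1:]),  # strip the 'S' prefix
        "position": int(pos_part),
    }
```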
Wrong: One ProductionOrder aggregate that contains sheets, cards, operations, QA results, everything. Right: Small, focused aggregates. Sheet knows about sheet-level operations. CardUnit knows about card-level operations. Assembly coordinates gathering.
God aggregates create massive contention — you can't update a single card without locking the entire order.
Eventual consistency is a performance optimization with a significant UX and operational cost. Don't add it until you have proven you need it. A synchronous, single-database CQRS can handle thousands of operations per day without breaking a sweat.
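The synchronous, single-database pattern looks like this: append the event and update the read model in the same transaction. This sketch uses SQLite purely so it is self-contained; the report's target is Postgres (where the upsert would be `INSERT ... ON CONFLICT`), and the table and column names here are illustrative.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for Postgres; same pattern applies
conn.executescript("""
    CREATE TABLE production_events (seq INTEGER PRIMARY KEY, event_type TEXT, payload TEXT);
    CREATE TABLE card_unit_status (card_unit_id TEXT PRIMARY KEY, status TEXT);
""")

def record_event(event_type: str, payload: dict) -> None:
    """Append the event AND update the read model atomically: no bus, no lag."""
    with conn:  # commits both writes together, or rolls both back
        conn.execute(
            "INSERT INTO production_events (event_type, payload) VALUES (?, ?)",
            (event_type, json.dumps(payload)))
        if event_type == "QAResultRecorded":  # reducer inlined for brevity
            status = "QA_PASSED" if payload["result"] == "PASS" else "QA_FAILED"
            conn.execute(
                "INSERT OR REPLACE INTO card_unit_status VALUES (?, ?)",
                (payload["card_unit_id"], status))

record_event("QAResultRecorded", {"card_unit_id": "J1044-S003-07", "result": "PASS"})
```

When (and only when) write volume demands it, the projection update moves out of the transaction and behind a message bus — the reducer itself does not change.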
Wrong: emitting events like DashboardViewed or ReportGenerated — these are queries, not domain facts. Right: events are state changes in the domain. Only things that happened to physical objects or business entities are events.
If your code uses "item", "thing", "record", or "object" where the business says "card unit", "sheet", and "assembly", you've already introduced translation costs. Every time a developer reads a variable named item, they have to translate. Name things what the business calls them.
You will be tempted to build a nice interface to see your progress. Don't. The UI is a trap — it shapes the data model to what looks good on screen rather than what is correct in the domain. Build the domain model first. Build the API second. Build the UI last.
Udi Dahan: "CQRS is not a top-level architecture. CQRS is something that happens at a much lower level." Apply it within a bounded context (e.g., production tracking), not across your entire system. Your quoting tool doesn't need CQRS. Your label generator doesn't need CQRS. Your production tracking module does.
PHASE 0: Language and Identity
→ DECISIONS.md (domain vocabulary, invariants, non-goals)
→ Identity primitives (deterministic IDs with lineage)
→ Domain model (pure dataclasses)
→ Domain events (frozen dataclasses, past tense)
→ State machines (valid transitions)
→ No persistence. No HTTP. 100% testable.
PHASE 1: Domain Logic
→ Sheet operations (create, station entry/exit, cut → N cards)
→ CardUnit operations (create, individual ops, QA, sequential numbers)
→ Assembly (scan, validate, complete)
→ Projections/reducers (pure left-fold functions)
→ Capacity calculations
→ No persistence. No HTTP. 100% testable.
PHASE 2: Event Store
→ Postgres append-only event log
→ Synchronous read model updates (same transaction)
→ This is "CQRS-Ready" — full CQRS requires only adding a message bus
→ Integration tests with real DB
PHASE 3: Scan API
→ POST /scan → command → events → read model updated
→ Zero-friction for workers
PHASE 4: Read API
→ GET endpoints for all dashboards
→ Trivially implemented from read models
PHASE 5: Job Management
→ Order intake → job creation → sheet spawning
→ Integration with existing tools
PHASE 6: Capacity Planning
→ Scheduling, forecasting, deadline risk
PHASE 7: AI Layer
→ Anomaly detection, prediction, optimization
→ Easy because the event log is clean structured data
CQRS UPGRADE (when signals appear — not before):
→ Separate read database
→ Event publishing after transaction commit
→ Async projection updaters
→ Zero core rewrites — only adapter changes