Stop Subscribing to Domain Events

The notification fires before the projection commits. The user sees a stale balance. Emitting derived events from inside the projection, in the same transaction as the read model update, closes the race for good.

You have a wallet balance projection. It listens to MoneyWasAdded and MoneyWasWithdrawn, computes the current balance, and writes it to a read model table. A notification service pushes a WebSocket update whenever a user's balance changes.

But are you sure that when your event subscriber is triggered to fetch the balance and send it over the WebSocket, the balance it reads is actually the current one?

The Race Condition Nobody Warns You About

The obvious way to react to an event is to subscribe the notification service to the same domain events the projection uses. MoneyWasAdded fires, the notification service picks it up, sends a push to the user's browser.

The user sees the notification. They tap it. The page refreshes.

And shows the old balance.

The notification arrived before the projection finished updating the read model. The user is now staring at a number that contradicts the message they just received. They refresh again. Still old. They refresh a third time — now it is correct.

It gets worse when we need to send a notification (email, SMS, postal mail, etc.) that includes the actual balance. The notification service does not have the computed value; it only has the raw event saying money was added. So it reaches for data from the projection's read model, which is not yet up to date, and sends an outdated balance to the customer.

To solve that, we may reach for different workarounds:

Version the read model and retry. Stamp each row with a version, attach the same version to the notification event, and have consumers retry until the read model has caught up. The race is gone — but every notification now triggers a retry loop hitting the database, and most of those reads are wasted because the projection has not finished yet. You have not removed the polling, you have moved it out of the timer and into the consumer.
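
To make the cost of this workaround concrete, here is a minimal sketch in plain PHP. It is not Ecotone code: waitForVersion, the version field, and the callable standing in for the database query are all hypothetical names chosen for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Polls a read model until the row for $walletId has reached $expectedVersion.
 * $loadRow is a hypothetical callable standing in for a database query.
 *
 * @return array{row: ?array, attempts: int}
 */
function waitForVersion(callable $loadRow, string $walletId, int $expectedVersion, int $maxAttempts = 5): array
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $row = $loadRow($walletId);
        if ($row !== null && $row['version'] >= $expectedVersion) {
            return ['row' => $row, 'attempts' => $attempt];
        }
        // A real consumer would sleep or re-queue the message here.
    }

    return ['row' => null, 'attempts' => $maxAttempts];
}

// Simulated read model that only catches up on the third read.
$reads = 0;
$loadRow = function (string $walletId) use (&$reads): array {
    $reads++;

    return $reads < 3
        ? ['walletId' => $walletId, 'balance' => 100, 'version' => 1]
        : ['walletId' => $walletId, 'balance' => 150, 'version' => 2];
};

$result = waitForVersion($loadRow, 'wallet-1', 2);
echo $result['row']['balance'], ' after ', $result['attempts'], " reads\n"; // 150 after 3 reads
```

The consumer does get the right balance, but two of the three reads were wasted, which is the polling cost described above.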

Make projections synchronous. Update every read model in the same transaction as the command. The user always sees fresh data. But every projection is now on the critical path of every write. Action latency grows with each projection you add — and it keeps growing, because new projections keep landing. Worse, a slow or failing projection blocks writes entirely. The isolation between read and write models is gone, and that isolation was the reason you separated them in the first place.

Enrich the events. Pack the projected fields into the domain event itself, so subscribers do not need to load anything. Now MoneyWasAdded carries the new balance, the user's tier, and whatever else current consumers happen to need. Every new subscriber adds new requirements. Events grow until they are no longer events: they stop describing what happened in the domain and start describing what consumers want to know. And if we ever calculate the balance wrong, that wrong balance is now part of the event stream.

The above solutions will work to some degree. Each brings its own pains, but they are still good to have in your toolkit, as no architecture is ideal. In this article, however, we will not focus on workarounds. Instead, we will look at a different, less well-known approach to solving this problem.

So the real problem is not the notification itself. The problem is that your projection computes derived facts — and nothing else in your system has access to those facts at the moment they become true.

Event Emission from Projections

What we actually need is a triggering event that carries the computed balance, emitted only after the projection has updated the read model. A derived fact, not a raw domain occurrence.

The shape of the solution is straightforward: let the projection do its job first, then announce what changed. The domain event arrives, the projection updates the read model, and only then does it emit a derived event carrying the computed balance for everyone else to react to. By the time any consumer sees that event, the read model already reflects the new state.

In Ecotone, you inject EventStreamEmitter into your projection's event handlers. After updating the read model, you call emit() to publish derived events.

#[ProjectionV2('wallet_balance')]
#[FromAggregateStream(Wallet::class)]
class WalletBalanceProjection
{
    public function __construct(
        private DocumentStore $documentStore
    ) {}

    #[EventHandler]
    public function whenMoneyWasAdded(
        MoneyWasAddedToWallet $event,
        EventStreamEmitter $emitter
    ): void {
        $wallet = $this->getWalletFor($event->walletId);
        $wallet = $wallet->add($event->amount);
        $this->saveWallet($wallet);

        $emitter->emit([
            new WalletBalanceWasChanged(
                $event->walletId, 
                $wallet->currentBalance
            )
        ]);
    }
}

The projection updates the read model first, then emits a derived event carrying the computed balance.

The emit() method stores the event in a stream named project_{projectionName} — in this case, project_wallet_balance. Any service can subscribe to that stream like it would to any other event stream.

The emitted event must live in the same storage as the read model. Push it straight to Kafka or RabbitMQ and the event flies out before the projection commits — the subscriber picks it up, queries the read model, and sees stale data. The same race we set out to fix. The projection's own stream acts as the outbox; a relay can ship events to a broker afterwards.
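
The "same storage" requirement can be illustrated outside of Ecotone with a small sketch using PDO and an in-memory SQLite database. The table names, the projectBalance function, and the failBeforeCommit switch are assumptions made for this example; they do not describe how Ecotone stores emitted events internally.

```php
<?php
declare(strict_types=1);

// Hypothetical schema: one table for the read model, one for the projection's emitted events.
function createSchema(PDO $db): void
{
    $db->exec('CREATE TABLE wallet_balance (wallet_id TEXT PRIMARY KEY, balance INTEGER NOT NULL)');
    $db->exec('CREATE TABLE emitted_events (id INTEGER PRIMARY KEY AUTOINCREMENT, wallet_id TEXT NOT NULL, balance INTEGER NOT NULL)');
}

/** Writes the read model row and the derived event atomically: both commit or neither does. */
function projectBalance(PDO $db, string $walletId, int $newBalance, bool $failBeforeCommit = false): void
{
    $db->beginTransaction();
    try {
        $db->prepare('INSERT OR REPLACE INTO wallet_balance (wallet_id, balance) VALUES (?, ?)')
            ->execute([$walletId, $newBalance]);
        $db->prepare('INSERT INTO emitted_events (wallet_id, balance) VALUES (?, ?)')
            ->execute([$walletId, $newBalance]);

        if ($failBeforeCommit) {
            throw new RuntimeException('simulated crash before commit');
        }

        $db->commit();
    } catch (Throwable $e) {
        $db->rollBack(); // neither the read model nor the event survives
        throw $e;
    }
}

$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
createSchema($db);
projectBalance($db, 'wallet-1', 150);
```

Because both rows live in one database, a subscriber reading emitted_events can never observe an event whose read model row has not been committed. A broker publish has no such coupling, which is why it belongs in a relay that runs after the commit.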

The race condition from our opening example becomes structurally impossible. When the notification service receives WalletBalanceWasChanged, the read model is guaranteed to already reflect that balance.

Subscribing to emitted events works like subscribing to any other event:

class WalletNotificationService
{
    #[EventHandler]
    public function when(
        WalletBalanceWasChanged $event
    ): void {
        // Send WebSocket push — the read model
        // is guaranteed to reflect this balance
    }
}

The notification service subscribes to the derived event. By the time it fires, the read model already contains the updated balance.

The Cost of Per-Event Emission

There is a price tag on the pattern above. Every projected event now does three things: read the current state, write the updated read model, and emit a derived event. That is at least one extra write to the event stream per domain event — on top of the read model update that was already there. At low volume nobody notices, but with higher volume it becomes significant.

Worse, most of those emissions are noise. If a wallet receives twenty MoneyWasAdded events in the same batch, do you really want to emit twenty WalletBalanceWasChanged events?

This is exactly what #[ProjectionState] was built for. Instead of reading and writing the read model on every event, you carry the computed state in memory across the batch and persist it once at the end. Combine that with emission inside #[ProjectionFlush] and you announce changes only when there is something worth announcing.

#[Partitioned]
#[ProjectionV2('wallet_balance')]
#[FromAggregateStream(Wallet::class)]
class WalletBalanceProjection
{
    #[EventHandler]
    public function whenWalletInitialized(
        WalletWasInitialized $event,
        #[ProjectionState] array $wallet = []
    ): array {
        return [
            'walletId' => $event->walletId,
            'balance' => 0,
        ];
    }

    #[EventHandler]
    public function whenMoneyWasAdded(
        MoneyWasAddedToWallet $event,
        #[ProjectionState] array $wallet
    ): array {
        $wallet['balance'] += $event->amount;
        return $wallet;
    }

    #[ProjectionFlush]
    public function flush(
        #[ProjectionState] array $wallet,
        EventStreamEmitter $emitter
    ): void {
        $this->saveWallet($wallet);
        $emitter->emit([
            new WalletBalanceWasChanged(
                $wallet['walletId'], 
                $wallet['balance']
            )
        ]);
    }
}

Event handlers accumulate state in memory. The flush handler runs once per batch per partition — a single read model write and a single emission, regardless of how many events arrived.

Twenty events into the same wallet now produce one read model write and one WalletBalanceWasChanged event carrying the final balance. The race condition guarantee still holds — flush and emission share the same transaction — but you are no longer paying per-event for it.

Notice there is no read from the read model anywhere in this projection. Ecotone carries the state for you between events and persists it on flush. You decide whether to emit based on the state you already have in memory — no getWalletFor(), no extra query just to figure out what changed.
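
The saving is easy to see in a framework-free sketch. The projectBatch function below is hypothetical and only mimics the idea of folding a batch into in-memory state and announcing once per wallet at the end; it uses plain arrays in place of Ecotone's events and state handling.

```php
<?php
declare(strict_types=1);

/**
 * Folds a batch of events into per-wallet balances, then "flushes" once per wallet touched.
 *
 * @param list<array{walletId: string, amount: int}> $events
 * @param array<string, int> $balances state carried over from earlier batches
 * @return array{balances: array<string, int>, emitted: list<array>}
 */
function projectBatch(array $events, array $balances = []): array
{
    $touched = [];
    foreach ($events as $event) {
        $id = $event['walletId'];
        $balances[$id] = ($balances[$id] ?? 0) + $event['amount'];
        $touched[$id] = true;
    }

    // Flush step: one derived event per wallet, carrying the final balance of the batch.
    $emitted = [];
    foreach (array_keys($touched) as $id) {
        $emitted[] = ['type' => 'WalletBalanceWasChanged', 'walletId' => $id, 'balance' => $balances[$id]];
    }

    return ['balances' => $balances, 'emitted' => $emitted];
}

$batch = array_fill(0, 20, ['walletId' => 'wallet-1', 'amount' => 5]);
$result = projectBatch($batch);
echo count($result['emitted']), ' event(s), final balance ', $result['emitted'][0]['balance'], "\n"; // 1 event(s), final balance 100
```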

What Happens When You Rebuild

Say your wallet balance projection has been running for two years. Millions of events processed, millions of WalletBalanceWasChanged events emitted. Downstream consumers — notifications, compliance checks, analytics — have processed all of them.

Now you need to rebuild. Maybe you added a new field to the read model. You reset the projection and replay from the beginning.

Without suppression, every single historical event gets re-emitted. Your notification service sends two years of balance change alerts to every user. Your compliance system re-triggers every check it has already completed. Your analytics pipeline double-counts everything.

Ecotone prevents this automatically. During a rebuild, emitted events are suppressed. The projection replays and reconstructs its read model, but emit() calls produce nothing. Once the rebuild catches up to the live stream, emission resumes normally. No duplicate notifications. No phantom events flooding downstream consumers.

This is a key difference from using the EventBus directly — the EventBus would happily republish every event during replay. The EventStreamEmitter knows the difference between "processing historical events" and "processing new events," and only emits for the latter.
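
The behaviour described above can be modelled in a few lines of plain PHP. This is only a sketch of the observable effect, not Ecotone's implementation: SuppressibleEmitter, setRebuilding, and the project function are hypothetical names, and the source does not show how Ecotone detects a rebuild internally.

```php
<?php
declare(strict_types=1);

/** Hypothetical emitter that drops events while its projection is replaying history. */
final class SuppressibleEmitter
{
    /** @var list<array> events appended to the projection's output stream */
    public array $stream = [];
    private bool $rebuilding = false;

    public function setRebuilding(bool $rebuilding): void
    {
        $this->rebuilding = $rebuilding;
    }

    public function emit(array $events): void
    {
        if ($this->rebuilding) {
            return; // historical replay: the read model is rebuilt, nothing is re-announced
        }
        foreach ($events as $event) {
            $this->stream[] = $event;
        }
    }
}

/** Applies wallet events to a balance map and emits one derived event per input event. */
function project(array $events, SuppressibleEmitter $emitter, array $balances = []): array
{
    foreach ($events as $event) {
        $id = $event['walletId'];
        $balances[$id] = ($balances[$id] ?? 0) + $event['amount'];
        $emitter->emit([['walletId' => $id, 'balance' => $balances[$id]]]);
    }

    return $balances;
}

$history = [
    ['walletId' => 'wallet-1', 'amount' => 100],
    ['walletId' => 'wallet-1', 'amount' => 50],
];

$emitter = new SuppressibleEmitter();
$emitter->setRebuilding(true);
$balances = project($history, $emitter); // rebuild: state restored, no emissions
$emitter->setRebuilding(false);
$balances = project([['walletId' => 'wallet-1', 'amount' => 25]], $emitter, $balances); // live event

echo count($emitter->stream), ' emitted, balance ', $balances['wallet-1'], "\n"; // 1 emitted, balance 175
```

The projection code calls emit() the same way in both phases; only the emitter decides whether anything leaves.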

Remapping into New Event Streams

Raw event streams are fine-grained. Each aggregate produces its own stream of domain events: OrderWasPlaced, PaymentWasReceived, ShipmentWasDispatched. But downstream consumers rarely want that granularity. They want to know: "Is this order fully completed?"

That answer requires correlating events from Order, Payment, and Shipping aggregates. A projection is the natural place to do that correlation — subscribe to multiple streams, accumulate state in #[ProjectionState], and emit a single derived event from #[ProjectionFlush] once all conditions are met.

#[ProjectionV2('order_lifecycle')]
#[FromAggregateStream(Order::class)]
#[FromAggregateStream(Payment::class)]
#[FromAggregateStream(Shipment::class)]
class OrderLifecycleProjection
{
    #[EventHandler]
    public function whenOrderPlaced(
        OrderWasPlaced $event,
        #[ProjectionState] array $order = []
    ): array {
        return [
            'orderId' => $event->orderId,
            'ordered' => true,
            'paid' => false,
            'shipped' => false,
        ];
    }

    #[EventHandler]
    public function whenPaymentReceived(
        PaymentWasReceived $event,
        #[ProjectionState] array $order
    ): array {
        $order['paid'] = true;
        return $order;
    }

    #[EventHandler]
    public function whenShipped(
        ShipmentWasDispatched $event,
        #[ProjectionState] array $order
    ): array {
        $order['shipped'] = true;
        return $order;
    }

    #[ProjectionFlush]
    public function flush(
        #[ProjectionState] array $order,
        EventStreamEmitter $emitter
    ): void {
        if ($order['ordered'] 
            && $order['paid'] 
            && $order['shipped']
        ) {
            $emitter->linkTo('completed_orders', [
                new OrderFullyCompleted($order['orderId'])
            ]);
        }
    }
}

Each order's progress accumulates in #[ProjectionState] as events arrive across the three aggregates. No trackStep(), no allStepsComplete(), no helper to duplicate across handlers — the state is right there in memory. #[ProjectionFlush] checks once per batch and writes a single OrderFullyCompleted into the completed_orders stream when the order is done.

Notice the switch from emit() to linkTo('completed_orders', ...). The EventStreamEmitter interface offers both: emit() writes to the projection's own stream (project_{name}), while linkTo() writes to any stream name you choose, with the same transactional guarantee and the same rebuild suppression. The projection becomes an event translator: it converts low-level domain events from three aggregates into a single high-level business fact, written to a stream that describes what it contains rather than where it came from.

That naming choice is what turns this into a high-level abstraction. You can build a stream of "the things our business considers important" — completed_orders, cancelled_subscriptions, kyc_approved_customers — and feed each one from whatever combination of raw aggregates and projections it takes to compute. Consumers do not see the choreography. They see facts.

This is also where the design starts paying off across team boundaries. Streams like completed_orders do not have to be consumed by your own code at all — they can become the public API other teams build on. The reporting team subscribes to it for their dashboards. Finance ingests it into the books. A data pipeline pulls it into the warehouse. None of them need to know that an order is currently spread across three aggregates with their own internal vocabularies, and none of them break when you split, merge, or rename those aggregates tomorrow.

Your domain events stay where they belong — internal to the bounded context that produces them, free to evolve as the domain evolves. The completed_orders stream becomes the contract you maintain for the outside world: stable, intention-revealing, and decoupled from the implementation behind it.

Sharing Events Across Applications

The same mechanism extends across application boundaries. Another application — a billing system, a fraud-detection service, a CRM — wants to react when an order is fully completed. It runs in its own process, owns its own database, and has no business calling into yours to ask for details.

If all you publish is an order ID, every consuming application has to call back to fetch the rest. That couples them to your HTTP API, your read model schema, and your availability — exactly the boundaries event-driven integration was supposed to avoid. So the published event needs to carry enough for a remote application to act on its own.

By the time #[ProjectionFlush] runs, the projection has everything it could need: the order ID from the placed event, the amount and currency from the payment event, the carrier and timestamp from the shipment event. Pack what the contract requires into the published event, and let your domain events stay terse.

#[ProjectionFlush]
public function flush(
    #[ProjectionState] array $order,
    EventStreamEmitter $emitter
): void {
    if ($order['ordered'] 
        && $order['paid'] 
        && $order['shipped']
    ) {
        $emitter->linkTo('completed_orders', [
            new OrderFullyCompleted(
                orderId:     $order['orderId'],
                customerId:  $order['customerId'],
                totalAmount: $order['totalAmount'],
                currency:    $order['currency'],
                paidAt:      $order['paidAt'],
                shippedAt:   $order['shippedAt'],
                carrier:     $order['carrier'],
            )
        ]);
    }
}

The published OrderFullyCompleted carries everything a remote application needs to act on the order without calling back into your system. The domain events that fed the projection — OrderWasPlaced, PaymentWasReceived, ShipmentWasDispatched — stay focused on diffs.

This is the same idea as the "enrich the events" workaround from the opening of the article — except aimed at the right target. Enriching domain events was a bad idea because it pulled subscribers into your internal schema and forced events to grow to fit consumers you had not even met yet. Enriching a published stream is the opposite move: you decide what the contract carries, you control how it evolves, and the domain events behind it stay clean.

End to end, you now have a real boundary:

  • Internal domain events — crisp, diff-shaped, describing what changed inside an aggregate.
  • Projections — correlate across aggregates, accumulate state, compute derived facts.
  • Public streams (completed_orders, kyc_approved_customers, cancelled_subscriptions), carrying everything an outside application needs to act.

A billing application subscribes for invoicing. A fraud service subscribes to flag suspicious patterns. A partner integration relays the event downstream. Each one acts on the data in the event itself, never querying your read models, never depending on your aggregate shapes, never blocking your domain from evolving. Your application stays isolated. Theirs stay self-sufficient.

Derived Streams as Input for Other Projections

Once projections emit events, you can chain them. Projection A reads from aggregate streams and emits derived events. Projection B subscribes to A's output stream and builds further derived views.

Picture a pipeline: raw domain events flow into a wallet_balance projection, which emits WalletBalanceWasChanged. A high_value_wallets projection subscribes to that stream and maintains a list of wallets above a threshold. A risk_assessment projection subscribes to the high-value list changes and triggers compliance checks.

Each projection in the chain consumes events and produces derived facts that feed the next stage.
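
As a rough illustration of the second stage, the sketch below consumes WalletBalanceWasChanged-shaped records and emits its own derived events when a wallet crosses a threshold. It is plain PHP rather than an Ecotone projection, and the function name, the threshold, and the WalletBecameHighValue and WalletLeftHighValue event names are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Second-stage projection: maintains the set of high-value wallets and emits
 * an event only when a wallet enters or leaves that set.
 *
 * @param list<array{walletId: string, balance: int}> $changes output of the first projection
 * @param array<string, bool> $highValue current membership, keyed by wallet ID
 * @return array{highValue: array<string, bool>, emitted: list<array>}
 */
function projectHighValueWallets(array $changes, int $threshold, array $highValue = []): array
{
    $emitted = [];
    foreach ($changes as $change) {
        $id = $change['walletId'];
        $isHigh = $change['balance'] >= $threshold;
        $wasHigh = isset($highValue[$id]);

        if ($isHigh && !$wasHigh) {
            $highValue[$id] = true;
            $emitted[] = ['type' => 'WalletBecameHighValue', 'walletId' => $id];
        } elseif (!$isHigh && $wasHigh) {
            unset($highValue[$id]);
            $emitted[] = ['type' => 'WalletLeftHighValue', 'walletId' => $id];
        }
        // No membership change: nothing for the next stage to react to.
    }

    return ['highValue' => $highValue, 'emitted' => $emitted];
}

$result = projectHighValueWallets([
    ['walletId' => 'wallet-1', 'balance' => 500],
    ['walletId' => 'wallet-1', 'balance' => 1200],
    ['walletId' => 'wallet-2', 'balance' => 3000],
    ['walletId' => 'wallet-1', 'balance' => 800],
], 1000);

echo implode(',', array_keys($result['highValue'])), ' / ', count($result['emitted']), " events\n"; // wallet-2 / 3 events
```

A third stage, such as risk_assessment, would consume only the three membership events rather than every balance change.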

The natural concern: is this fragile? If the middle projection fails, does the whole pipeline collapse?

No — and this is where the foundations from earlier articles pay off. Each projection in the chain is independently resumable through position tracking. Each is independently recoverable through self-healing. Each can be independently scaled through partitioning.

A failure in one projection pauses its downstream consumers. But when it recovers, it catches up from where it stopped, and the downstream projections resume automatically. No manual intervention. No coordinated restart sequence. The chain is resilient by construction, not by accident.

Think of it like a river system. If a dam upstream temporarily stops flow, the downstream lakes do not drain — they simply wait. When the dam reopens, water flows again and every reservoir fills back to where it should be. Each node in the pipeline is autonomous and tracks its own state.

This is where the rebuild suppression from earlier becomes critical again. If you rebuild the wallet_balance projection at the head of this chain, downstream projections do not receive a flood of re-emitted events. The suppression applies at each level independently, so you can rebuild any single projection without cascading side effects through the pipeline.

What Comes Next

Your projections emit derived events, chain into pipelines, feed downstream consumers, and publish enriched streams to other applications.

Along the way we have brushed against the harder operational questions. Rebuild suppression stops a replay from flooding downstream consumers with years of re-emitted events. Transactional emission keeps every consumer in lockstep with the read model. But the moment you actually have to evolve a projection in production — add a column derived from the original event, fix a bug in the calculation, change what a field means — none of those guarantees, on their own, tell you how to do it without taking the system down or applying today's logic to yesterday's data.

The next article shows those mechanisms in practice. We will tackle blue-green deployments for projections — deploying ticket_list_v2 next to ticket_list_v1, backfilling V2 in the background while V1 keeps serving, comparing both tables, and switching only when V2 is provably correct. Around that, we will look at how #[ProjectionName] lets a single class power both versions writing to different tables, how #[ProjectionDeployment] keeps the new projection dormant until you say go, how partitioned rebuilds clear and recompute one aggregate at a time so the read model stays available, and how async backfill with parallel workers turns an overnight replay into a coffee break.

In short: evolving live projections without breaking the consumers that depend on them is the next piece.