使用领域事件的一种直接做法是:在 应用服务 (Application Service) 中产生事件并发布出去。例如,对于“用户昵称更新”的场景来讲,对应的应用服务 UserCommandService 实现如下:

1
2
3
4
5
6
7
8
9
10
member public this.UpdateMyName (command: UpdateUsernameCommand) (user: User) =
let user = userRepository.GetById user.Id
let oldName = user.Username
let newName = command.Username

user.UpdateUsername newName
|> userRepository.Save

UsernameChangeEvent (user.Id, newName, oldName)
|> eventPublisher.Publish

这里,在更新了用户姓名之后,即刻调用事件发布器 eventPublisher.Publish 将事件发送到消息队列中。虽然这种方式比较流行,但它至少存在两个问题:

  1. 领域事件本应属于领域模型的一部分,也即应该从领域模型中产生,而这里却在应用服务中产生
  2. 对聚合根(本例中的 User )的持久化和对事件的发布可能导致数据不一致问题。

对于第1个问题,可以采用“从领域模型中返回领域事件”的方式:

1
2
3
4
5
6
7
8
9
member public this.UpdateMyName (command: UpdateUsernameCommand) (user: User) =
let user = userRepository.GetById user.Id
let oldName = user.Username
let newName = command.Username

user.UpdateUsername newName // UpdateUsername 中构建 UsernameChangeEvent
|> fun user event ->
userRepository.Save user
eventPublisher.Publish event

这种方式保证了领域事件是从领域模型中产生,但仍然存在第二个问题。

第二个问题中所谓的“数据一致性”,表示的是将聚合根保存到数据库和将领域事件发布到消息队列之间的一致性。由于数据库和消息队列属于异构的数据源,要保证他们之间的数据一致性需要引入分布式事务。

但是分布式事务通常是比较重量级的,再加上当下的诸多常见消息队列均不支持分布式事务(比如Kafka),因此并不建议使用分布式事务来解决这个问题。

Transactional Outbox 便是一种方案,概括来说,这种方式将一个分布式事务的问题拆解为多个本地事务,并采用“至少一次投递(At Least Once Delivery)”原则保证消息的发布。具体来讲,发布方在与业务数据相同的数据库中为领域事件创建相应的事件发布表(Outbox table),然后在保存业务数据的同时将所产生的事件保存到事件发布表中,由于此时二者都属于同一个数据库的本地事务所管辖,因此保证了“业务操作”与“事件产生”之间的一致性。此时的代码变成了:

1
2
3
4
5
6
7
8
9
member public this.UpdateMyName (command: UpdateUsernameCommand) (user: User) =
let user = userRepository.GetById user.Id
let oldName = user.Username
let newName = command.Username

user.UpdateUsername newName // UpdateUsername 中构建 UsernameChangeEvent
|> fun user event ->
userRepository.Save user
eventStore.Save event // 这儿用 eventStore 代替 eventPublisher 啦

应用服务不再将事件直接发布出去,而是将事件保存到数据库中,之后,另一个模块将从数据库中读取事件并发布。

然而,这种方式依然有个缺点:每个需要产生领域事件的场景都需要应用服务先后调用repository.Save()eventStore.Save(),导致了代码重复。解决方法也很简单——在聚合根中临时保存领域事件,然后在资源库中同时保存聚合根和领域事件到数据库。

在这种方式下,首先需要在聚合根的基类中完成与领域事件相关的各种设施,包括创建临时性的事件容器events以及通用的事件产生方法RaiseEvent()

1
2
3
4
5
6
7
8
9
[<AbstractClass>]
type IAggregateRoot =
...
let events = Collections.Generic.List<DomainEvent> ()

member private this.RaiseEvent (event: DomainEvent) =
events.Add event

...

在聚合根基类AggregateRoot中,events字段用于临时保存聚合根中所产生的所有事件,各实际的聚合根类通过调用RaiseEvent()events中添加事件。比如,对于“用户修改昵称”而言,User实现如下:

1
2
3
4
5
6
7
8
member public this.UpdateUsername (name: string, user: User) =
if this.Username = name then
()
else
let oldName = this.Username
this.Username <- name
UsernameChangeEvent (user.Id, name, oldName)
|> this.RaiseEvent

这里,聚合根 User 不再返回领域事件,而是将领域事件通过AggregateRoot.RaiseEvent()暂时性地保存到自身的events中。之后在保存User时,资源库的公共基类BaseRepositorySave()方法同时完成对聚合根和领域事件的持久化:

1
2
3
4
5
6
7
8
9
member public this.Save<AR: when AR :> AggrateRoot> (it: AR) =
match it with
| null -> failwith "..."
| it when it.Events |> isEmpty |> not ->
this.SaveEvents it.Events
this.CleanEvents ()
| _ -> ()

db.Save it

Save()方法中,首先获取到聚合根中的所有领域事件,然后通过SaveEvents()方法将它们保存到发布事件表中,最后通过db.Save it保存聚合根。需要注意的是,在这种方式下,AggregateRoot中的events字段是不能被持久化的,因为需要保证每次从数据库中加载出聚合根时events都是空的,为此在SaveEvents()保存了领域事件后,立即调用it.clearEvents()将所有的领域事件清空掉,以免领域事件随着聚合根一道被持久化到数据库中。

到目前为止,对领域事件的处理都还没有涉及到与任何消息中间件相关的内容,也即事件的产生是一个完全独立于消息队列的关注点,此时不用关心领域事件之后将以何种形式发布出去,Kafka 也好,RabbitMQ 也罢。除了关注点分离的好处外,这种解耦也使得系统在有可能切换消息中间件时更加的简单。

对于“在应用服务中通过eventPublisher.Publish()直接发布事件”而言,事件的产生和发布是同时完成的;但是对于“在聚合根中临时性保存领域事件”的方式来说,它只解决了事件的产生问题,并未解决事件的发布问题,事件的发布方应该采用“发射后不管(Fire And Forget)”的原则,即发布方无需了解消费方是如何处理领域事件的,甚至都不需要知道事件被哪些消费方所消费。

但是因为发送事件需要操作消息中间件,而更新事件状态需要操作数据库。在不使用分布式事务的情况下,此时的代码对于“事件发布成功 + 数据库落库成功”来讲是皆大欢喜的,但是依然无法排除有很小的概率导致事件发送成功了但是状态却为得到更新的情况。要解决这个问题,有一个选择是做妥协,即事件发布方无法保证事件的“精确一次性投递(Exactly Once)”,而是保证“至少一次投递(At Least Once)”。假设在事件发布成功之后,由于种种原因导致事件的状态未得到更新,即依然为CREATED状态,那么稍后,当事件兜底机制启动时,它将加载系统中尚未发布的事件进行发布,其中就包含状态为CREATED的事件,进而导致事件的重复投递。

“至少一次投递”将更多的负担转嫁给了事件的消费方,使得事件发送方得以全身而退。

事件消费的重点在于如何解决发布方的“至少一次投递”问题。举个例子,假设在电商系统中,订单子系统发布了“订单已成交”(OrderPlacedEvent)事件,积分子系统消费这个事件时会给用户新增与订单价格等额的积分,但是对事件的“至少一次投递”有可能导致该事件被重复投递进而导致重复给用户积分的情况产生。解决这个问题通常有2种方式:

  1. 将消费方自身的处理逻辑设计为幂等的,即多次执行和一次执行的结果是相同的
  2. 消费方在数据库中建立一个事件消费表,用于跟踪已经被消费的事件

第一种方式是最理想的,消费方不用引入额外的支撑性机制,但是这种方式对消费方的要求太高,并不是所有场景都能将消费方本身的处理逻辑设计为幂等。因此,实践中主要采用第二种方式。