您的位置:首页 > 编程语言 > Java开发

基于RxJava实现事件总线

2016-12-28 16:06 513 查看
本文介绍RxJava中的Subject,同时对事件总线进行抽象,并给出两种实现扩展,方便使用。

RxJava实现事件总线的方法

Subject: RxJava的事件总线

RxJava实现事件总线很简单,就是创建一个Subject对象即可。

Subject<Object, Object> bus = PublishSubject.create()


Subject既是被观察者Observable,又是观察者(或叫订阅者,Subscriber)。

PublishSubject不是线程安全的,但它提供toSerialized方法可以转化为线程安全的SerializedSubject,事件总线的线程安全版本,如下所示:

Subject<Object, Object> bus = PublishSubject.create().toSerialized()


事件发送

当我们要发送事件(可以是任意对象)时,只需调用onNext方法:

bus.onNext(evt)


其中,evt是任意对象,作为事件发送出去(具体说,应该是“广播”),任何总线的监听者都能够收到这个事件。

事件监听

当我们要监听事件时,只需订阅总线:

bus.subscribe(evt -> {
// 当监听到事件时,相应的处理代码,(略)
});


其中evt为接受到的事件对象,即发送端发送出去的对象。

关闭事件监听

subscribe方法返回一个Subscription对象,即:

Subscription subscription = bus.subscribe(evt -> ...);


当我们不需要观察事件时,可以取消订阅:

if (!subscription.isUnsubscribed()) {
subscription.unsubscribe();
}


事件过滤

RxJava有大量强大的操作符,可以方便我们做很多处理, 比如过滤filter,转化map,线程调度observedOn。

实际上,很多情况下我们的监听者不需要处理任何事件,而只关注某些事件,因此需要对监听到的事件做过滤操作,这是RxJava擅长的,在subscribe前用filter操作符,即可:

bus.filter(evt -> ...)
.subscribe(evt -> ...)


事件转化

当我们只对事件中某些属性感兴趣,只需用map操作符将事件转化为需要的属性:

bus.filter(evt -> ...)
.map(evt -> evt.getXXX())
.subscribe(xxx -> ...)


线程调度

上面的事件发送到接受处理的过程,都是在单线程里进行的。我们可以使用observeOn操作符将接收处理逻辑指定到相应的线程池中处理:

bus.filter(evt -> ...)
.map(evt -> evt.getXXX())
.observeOn(Schedulers.computation())   // 使用计算线程池处理
.subscribe(xxx -> ...)


抽象与扩展

事件的定义

事件应该包含以下信息:事件发生的时间,事件源,具体事件。

对于具体事件,需要实现
Event
接口:

/**
* @author wen-zhe.liu@asml.com
*
*/
public interface Event {

}


下面是总线中事件的定义:

/**
* The Class BusEvent.
*
* @author wen-zhe.liu@asml.com
* @param <E>
*          the element type
*/
public class BusEvent<E extends Event> {

private final long occurTime;
private final Object eventSource;
private final E event;

/**
* Instantiates a new bus event.
*
* @param eventSource
*          the event source
* @param event
*          the event
*/
public BusEvent(Object eventSource, E event) {
occurTime = System.currentTimeMillis();
this.eventSource = eventSource;
this.event = event;
}

/**
* Gets the occur time.
*
* @return the occur time
*/
public long getOccurTime() {
return occurTime;
}

/**
* usually user don't have to get the content of event source, just to filter its reference.
*
* @return the event source
*/
public Object getEventSource() {
return eventSource;
}

/**
* Gets the event.
*
* @return the event
*/
public E getEvent() {
return event;
}
}


事件总线的抽象

/**
* The Interface EventBus.
*
* @author wen-zhe.liu@asml.com
*/
public interface EventBus {

/**
* Post.
*
* @param o
*          the o
*/
void post(Object o);

/**
* Async post.
*
* @param o
*          the o
*/
void asyncPost(Object o);

/**
* Observe.
*
* @param <T>
*          the generic type
* @param eventType
*          the event type
* @return the observable
*/
<T> Observable<T> observe(Class<T> eventType);

/**
* Observe event.
*
* @param <T>
*          the generic type
* @param eventType
*          the event type
* @return the bus event filter
*/
<T extends Event> BusEventFilter<T> observeEvent(Class<T> eventType);

/**
* Observe event.
*
* @param <T>
*          the generic type
* @param event
*          the event
* @return the bus event filter
*/
<T extends Event> BusEventFilter<T> observeEvent(T event);

/**
* Removes the event bus.
*
* @param eventType
*          the event type
*/
void removeEventBus(Class<?> eventType);

/**
* Release
*/
void release();
}


其中,为了方便使用,定义了内部抽象类BusEventFilter。

/**
* The Class BusEventFilter.
*
* @param <T>
*          the generic type
*/
public abstract class BusEventFilter<T extends Event> {

protected final Class<T> eventType;
private final T matchedEvent;  // null for don't have to matched, type is enough
@SuppressWarnings("rawtypes")
private final List<Func1<? super BusEvent, Boolean>> filters = new ArrayList<>();

protected BusEventFilter(Class<T> eventType) {
this.eventType = eventType;
this.matchedEvent = null;
}

@SuppressWarnings("unchecked")
protected BusEventFilter(T event) {
this.eventType = (Class<T>) event.getClass();
this.matchedEvent = event;
}

/** filter bus event.
*
* @param filter
*          the filter
* @return the bus event filter
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public BusEventFilter<T> filter(Func1<? super BusEvent<T>, Boolean> filter) {
filters.add((Func1<? super BusEvent, Boolean>) filter);
return this;
}

/**
* filter by event source
* <p>
* supplier to get event source, to make it calculate in run time, not in first time.
*
* @param eventSourceSupplier
*          the event source supplier
* @return the bus event filter
*/
public BusEventFilter<T> filterEventSource(Supplier<Object> eventSourceSupplier) {
return filter(evt -> evt.getEventSource() == eventSourceSupplier.get());
}

/**
* filter by event source in a collection
* <p>
* supplier to get event source list, to make it calculate in run time, not in first time.
*
* @param eventSourceCollectionSupplier
*          the event source collection supplier
* @return the bus event filter
*/
public BusEventFilter<T> filterEventSourceIn(Supplier<Collection<?>> eventSourceCollectionSupplier) {
return filter(evt -> eventSourceCollectionSupplier.get().stream().anyMatch(
obj -> obj == evt.getEventSource()));
}

/** Then, convert to observable event stream.
*
* @return the observable
*/
public Observable<T> then() {
return andThen()
.map(evt -> evt.getEvent())
.cast(eventType)
// if don't have to match event, match event type is enough (the case matchedEvent == null)
.filter(evt -> matchedEvent == null || matchedEvent.equals(evt));
}

/** And then.
*
* @return the observable
*/
public Observable<BusEvent> andThen() {
return filters.stream().reduce(
observeBusEvent().filter(busEvent -> eventType.isInstance(busEvent.getEvent())),
(busEvent, filter) -> busEvent.filter(filter),
(a, b) -> a);
}

protected abstract Observable<BusEvent> observeBusEvent();
}


单一总线实现

单一总线是在整个程序范围内,任何事件都发送给这个总线,任何总线的监听器都能够响应事件,这样设计简单、使用方便。其实现简单,就是对RxJava的进行简单的封装而已。

/**
* The Class RxEventBus. Single event bus for all event.
*
* @author wen-zhe.liu@asml.com
*/
public class RxEventBus implements EventBus {

private final Subject<Object, Object> bus = PublishSubject.create().toSerialized();

/* (non-Javadoc)
* @see com.asml.jex.rx.event.EventBus#post(java.lang.Object)
*/
@Override
public void post(Object o) {
bus.onNext(o);
}

/* (non-Javadoc)
* @see com.asml.jex.rx.event.EventBus#asyncPost(java.lang.Object)
*/
@Override
public void asyncPost(Object o) {
Schedulers.computation().createWorker().schedule(() ->
bus.onNext(o)
);
}

@Override
public void removeEventBus(Class<?> eventType) {
// nothing to do as it is a single event bus
}

@Override
public void release() {
bus.onCompleted();
}

/* (non-Javadoc)
* @see com.asml.jex.rx.event.EventBus#observe(java.lang.Class)
*/
@Override
public <T> Observable<T> observe(Class<T> eventType) {
return bus.ofType(eventType);
}

/* (non-Javadoc)
* @see com.asml.jex.rx.event.EventBus#observeEvent(java.lang.Class)
*/
@Override
public <T extends Event> BusEventFilter<T> observeEvent(Class<T> eventType) {
return new BusEventFilter<>(eventType);
}

/* (non-Javadoc)
* @see com.asml.jex.rx.event.EventBus#observeEvent(T)
*/
@Override
public <T extends Event> BusEventFilter<T> observeEvent(T event) {
return new BusEventFilter<>(event);
}

/**
* The Class BusEventFilter.
*
* @param <T>
*          the generic type
*/
public class BusEventFilter<T extends Event> extends EventBus.BusEventFilter<T> {

private BusEventFilter(Class<T> eventType) {
super(eventType);
}

@SuppressWarnings("unchecked")
private BusEventFilter(T event) {
super(event);
}

@Override
protected Observable<BusEvent> observeBusEvent() {
return observe(BusEvent.class);
}
}
}


多总线实现

如果有太多的监听器(比如10000个),任何一个事件发送给单一总线,所有监听器都会响应,尽管绝大多数都在过滤阶段忽略掉,但仍然会带来一定的系统开销。更好的办法就是采用多总线的方式,通过多个总线来分担事件,事件发往响应的总线,而只有少数几个监听器接收到事件,从而提高效率。

关于多总线如何分工,有多种办法。这里根据不同事件类型,选择不同的事件总线。一种事件类型对应一个事件总线。

/**
* The Class RxEventBus. Multiple event buses, each one to one event type.
*
* @author wen-zhe.liu@asml.com
*/
public class RxEventBuses implements EventBus {

private final Map<Class<?>, Subject<Object, Object>> eventBuses = new IdentityHashMap<>();

/**
* Post event, support BusEvent, support hierarchy event.
*
* @param o
*          the o
*/
@Override
public synchronized void post(Object o) {
Class<? extends Object> eventType = checkBusEvent(o).getClass();

List<Subject<Object, Object>> evtBuses = eventBuses.entrySet().stream()
.filter(eventBus -> eventBus.getKey().isAssignableFrom(eventType))
.map(eventBus -> eventBus.getValue())
.collect(Collectors.toList());  // to list: prevent ConcurrentModificationException in forEach as stream

for (Subject<Object, Object> eventBus : evtBuses) {
eventBus.onNext(o);
}
}

private Object checkBusEvent(Object o) {
return o instanceof BusEvent<?> ? ((BusEvent<?>) o).getEvent() : o;
}

private synchronized Subject<Object, Object> getOrCreateBus(Class<?> eventType) {
Subject<Object, Object> bus = eventBuses.get(eventType);
if (bus == null) {
bus = PublishSubject.create().toSerialized();
eventBuses.put(eventType, bus);
}
return bus;
}

/* (non-Javadoc)
* @see com.asml.jex.rx.event.EventBus#removeEventBus(java.lang.Class)
*/
@Override
public void removeEventBus(Class<?> eventType) {
Subject<Object, Object> bus = null;
synchronized (this) {
bus = eventBuses.remove(eventType);
}
if (bus != null) {
bus.onCompleted();
}
}

@Override
public synchronized void release() {
eventBuses.values().forEach(bus -> bus.onCompleted());
eventBuses.clear();
}

/**
* Async post.
*
* @param o
*          the o
*/
@Override
public void asyncPost(Object o) {
Schedulers.computation().createWorker().schedule(() ->
post(o)
);
}

/**
* Observe.
*
* @param <T>
*          the generic type
* @param eventType
*          the event type
* @return the observable
*/
@Override
public <T> Observable<T> observe(Class<T> eventType) {
Subject<Object, Object> bus = getOrCreateBus(eventType);
return bus.ofType(eventType);
}

/**
* Observe event.
*
* @param <T>
*          the generic type
* @param eventType
*          the event type
* @return the bus event filter
*/
@Override
public <T extends Event> BusEventFilter<T> observeEvent(Class<T> eventType) {
return new BusEventFilter<>(eventType);
}

/**
* Observe event.
*
* @param <T>
*          the generic type
* @param event
*          the event
* @return the bus event filter
*/
@Override
public <T extends Event> BusEventFilter<T> observeEvent(T event) {
return new BusEventFilter<>(event);
}

/**
* The Class BusEventFilter.
*
* @param <T>
*          the generic type
*/
public class BusEventFilter<T extends Event> extends EventBus.BusEventFilter<T> {

protected BusEventFilter(Class<T> eventType) {
super(eventType);
}

private BusEventFilter(T event) {
super(event);
}

@Override
protected Observable<BusEvent> observeBusEvent() {
return getOrCreateBus(eventType).ofType(BusEvent.class);
}
}
}


事件总线仓库

为了方便使用,用户代码无需手动创建具体的事件总线,而交给事件总线仓库,从仓库中拿就行了。

/**
* The Class EventBusRepo.
*
* @author wen-zhe.liu@asml.com
*/
public class EventBusRepo {

public static EventBus eventBus = new RxEventBuses();

static {
Runtime.getRuntime().addShutdownHook(new Thread(() -> eventBus.release()));
}

/**
* Gets the.
*
* @return the event bus
*/
public static EventBus get() {
return eventBus;
}

/**
* Sets the event bus.
*
* @param evtBus
*          the new event bus
*/
public static void setEventBus(EventBus evtBus) {
eventBus = evtBus;
}
}


缺省采用多总线,用户也可以采用setEventBus更改。

使用示例

定义事件

用户可以定义事件,只要实现
Event
接口即可,可以说普通类,也可以说枚举。举例如下:

public enum XxEvent implements Event {
XX_LOADED, XX_CHANGED
}
public class XxRemoved implements Event {
// ... code of XxRemoved
}


根据惯例,事件命名规则为:名称+过去分词,如XX_LOADED, XX_CHANGED, XxRemoved,等。

如果具体事件无状态,可以定义为枚举值比较方便(这也是为什么Event要定义为接口的原因,因为枚举只能实现接口,不能继承类)。

如果事件有状态,只能定义为普通类。由于事件可能在多个线程中传输,建议设计成不可变类。否则,要考虑线程安全问题。

发送事件

可以从事件总线仓库中拿到具体的事件总线,调用post方法,将事件封装BusEvent中,指定事件源,如下所示:

EventBusRepo.get().post(new BusEvent<>(事件源对象, XX_LOADED));


在DDD中,领域实体可以发布领域事件,可以将上面代码封装在领域实体的基类里。

/**
* @author wen-zhe.liu@asml.com
*
*/
public abstract class DomainEntity<T> {
...
protected void postEvent(DomainEvent event) {
EventBusRepo.get().post(new BusEvent<>(this, event));
...
}


在具体的领域实体类中,可以使用postEvent方法简单地发布事件:

postEvent(XX_LOADED);


接收事件

接收事件代码一般如下所示:

EventBusRepo.get().observeEvent(eventType)
.filter(evt -> ...)
.then()
.subscribe(evt -> ...)


通过observeEvent(eventType)观察事件,这里eventType可以说事件的类对象,如
XxRemoved.class
, 也可以说是对象(一般为枚举值),如
XX_LOADED


然后一般都要对事件进行过滤。通常情况下会对事件源进行过滤,即只对某些事件源感兴趣,除了上面的
filter
方法之外,我还提供了针对事件源过滤更便捷的方法,如下:

.filterEventSource(() -> 事件源对象)


只有事件源对象匹配(引用相等)才能通过。这里函数参数使用一等函数而不是直接的事件源对象,是为了避免在初始化时设置,以解决事件源对象重新赋值的情况。

有时候我们很难定位到具体的事件源对象,只能定位到事件源的集合,那么可以使用以下过滤方法:

.filterEventSourceIn(() -> 事件源对象的集合)


在DDD中,聚合根对象可以监听其内部实体对象发送到事件,因此为了简化用户代码,在聚合根对象的基类中进行封装:

public abstract class DomainAggregateRoot<T> extends DomainEntity<T> {

private final transient RxSubscriptions subscriptions = new RxSubscriptions();

public synchronized void subscribeEvents() {
if (subscriptions.isEmpty()) {
subscriptions.addAll(createEventObservers());
}
}

public synchronized void unsubscribeEvents() {
subscriptions.release();
}

protected Collection<Subscription> createEventObservers() {
return Collections.emptyList();
}

protected static <T extends DomainEvent> BusEventFilter<T> observeEvent(Class<T> eventType) {
return EventBusRepo.get().observeEvent(eventType);
}

protected static <T extends DomainEvent> BusEventFilter<T> observeEvent(T event) {
return EventBusRepo.get().observeEvent(event);
}

public DomainAggregateRoot<T> observe(Subscription... ss) {
subscriptions.observe(ss);
return this;
}
}


具体的聚合根类可以覆盖
createEventObservers
方法,从而实现订阅生命周期管理。

—— 本博客所有内容均为原创,转载请注明作者和出处 ——–

作者:刘文哲

联系方式:liuwenzhe2008@qq.com

博客:http://blog.csdn.net/liuwenzhe2008
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java-响应式编程