基于RxJava实现事件总线
2016-12-28 16:06
513 查看
本文介绍RxJava中的Subject,同时对事件总线进行抽象,并给出两种实现扩展,方便使用。
Subject既是被观察者Observable,又是观察者(或叫订阅者,Subscriber)。
PublishSubject不是线程安全的,但它提供toSerialized方法可以转化为线程安全的SerializedSubject,事件总线的线程安全版本,如下所示:
其中,evt是任意对象,作为事件发送出去(具体说,应该是“广播”),任何总线的监听者都能够收到这个事件。
其中evt为接受到的事件对象,即发送端发送出去的对象。
当我们不需要观察事件时,可以取消订阅:
实际上,很多情况下我们的监听者不需要处理任何事件,而只关注某些事件,因此需要对监听到的事件做过滤操作,这是RxJava擅长的,在subscribe前用filter操作符,即可:
对于具体事件,需要实现
下面是总线中事件的定义:
其中,为了方便使用,定义了内部抽象类BusEventFilter。
关于多总线如何分工,有多种办法。这里根据不同事件类型,选择不同的事件总线。一种事件类型对应一个事件总线。
缺省采用多总线,用户也可以采用setEventBus更改。
根据惯例,事件命名规则为:名称+过去分词,如XX_LOADED, XX_CHANGED, XxRemoved,等。
如果具体事件无状态,可以定义为枚举值比较方便(这也是为什么Event要定义为接口的原因,因为枚举只能实现接口,不能继承类)。
如果事件有状态,只能定义为普通类。由于事件可能在多个线程中传输,建议设计成不可变类。否则,要考虑线程安全问题。
在DDD中,领域实体可以发布领域事件,可以将上面代码封装在领域实体的基类里。
在具体的领域实体类中,可以使用postEvent方法简单地发布事件:
通过observeEvent(eventType)观察事件,这里eventType可以说事件的类对象,如
然后一般都要对事件进行过滤。通常情况下会对事件源进行过滤,即只对某些事件源感兴趣,除了上面的
只有事件源对象匹配(引用相等)才能通过。这里函数参数使用一等函数而不是直接的事件源对象,是为了避免在初始化时设置,以解决事件源对象重新赋值的情况。
有时候我们很难定位到具体的事件源对象,只能定位到事件源的集合,那么可以使用以下过滤方法:
在DDD中,聚合根对象可以监听其内部实体对象发送到事件,因此为了简化用户代码,在聚合根对象的基类中进行封装:
具体的聚合根类可以覆盖
—— 本博客所有内容均为原创,转载请注明作者和出处 ——–
作者:刘文哲
联系方式:liuwenzhe2008@qq.com
博客:http://blog.csdn.net/liuwenzhe2008
RxJava实现事件总线的方法
Subject: RxJava的事件总线
RxJava实现事件总线很简单,就是创建一个Subject对象即可。Subject<Object, Object> bus = PublishSubject.create()
Subject既是被观察者Observable,又是观察者(或叫订阅者,Subscriber)。
PublishSubject不是线程安全的,但它提供toSerialized方法可以转化为线程安全的SerializedSubject,事件总线的线程安全版本,如下所示:
Subject<Object, Object> bus = PublishSubject.create().toSerialized()
事件发送
当我们要发送事件(可以是任意对象)时,只需调用onNext方法:bus.onNext(evt)
其中,evt是任意对象,作为事件发送出去(具体说,应该是“广播”),任何总线的监听者都能够收到这个事件。
事件监听
当我们要监听事件时,只需订阅总线:bus.subscribe(evt -> { // 当监听到事件时,相应的处理代码,(略) });
其中evt为接受到的事件对象,即发送端发送出去的对象。
关闭事件监听
subscribe方法返回一个Subscription对象,即:Subscription subscription = bus.subscribe(evt -> ...);
当我们不需要观察事件时,可以取消订阅:
if (!subscription.isUnsubscribed()) { subscription.unsubscribe(); }
事件过滤
RxJava有大量强大的操作符,可以方便我们做很多处理, 比如过滤filter,转化map,线程调度observedOn。实际上,很多情况下我们的监听者不需要处理任何事件,而只关注某些事件,因此需要对监听到的事件做过滤操作,这是RxJava擅长的,在subscribe前用filter操作符,即可:
bus.filter(evt -> ...) .subscribe(evt -> ...)
事件转化
当我们只对事件中某些属性感兴趣,只需用map操作符将事件转化为需要的属性:bus.filter(evt -> ...) .map(evt -> evt.getXXX()) .subscribe(xxx -> ...)
线程调度
上面的事件发送到接受处理的过程,都是在单线程里进行的。我们可以使用observeOn操作符将接收处理逻辑指定到相应的线程池中处理:bus.filter(evt -> ...) .map(evt -> evt.getXXX()) .observeOn(Schedulers.computation()) // 使用计算线程池处理 .subscribe(xxx -> ...)
抽象与扩展
事件的定义
事件应该包含以下信息:事件发生的时间,事件源,具体事件。对于具体事件,需要实现
Event接口:
/** * @author wen-zhe.liu@asml.com * */ public interface Event { }
下面是总线中事件的定义:
/** * The Class BusEvent. * * @author wen-zhe.liu@asml.com * @param <E> * the element type */ public class BusEvent<E extends Event> { private final long occurTime; private final Object eventSource; private final E event; /** * Instantiates a new bus event. * * @param eventSource * the event source * @param event * the event */ public BusEvent(Object eventSource, E event) { occurTime = System.currentTimeMillis(); this.eventSource = eventSource; this.event = event; } /** * Gets the occur time. * * @return the occur time */ public long getOccurTime() { return occurTime; } /** * usually user don't have to get the content of event source, just to filter its reference. * * @return the event source */ public Object getEventSource() { return eventSource; } /** * Gets the event. * * @return the event */ public E getEvent() { return event; } }
事件总线的抽象
/** * The Interface EventBus. * * @author wen-zhe.liu@asml.com */ public interface EventBus { /** * Post. * * @param o * the o */ void post(Object o); /** * Async post. * * @param o * the o */ void asyncPost(Object o); /** * Observe. * * @param <T> * the generic type * @param eventType * the event type * @return the observable */ <T> Observable<T> observe(Class<T> eventType); /** * Observe event. * * @param <T> * the generic type * @param eventType * the event type * @return the bus event filter */ <T extends Event> BusEventFilter<T> observeEvent(Class<T> eventType); /** * Observe event. * * @param <T> * the generic type * @param event * the event * @return the bus event filter */ <T extends Event> BusEventFilter<T> observeEvent(T event); /** * Removes the event bus. * * @param eventType * the event type */ void removeEventBus(Class<?> eventType); /** * Release */ void release(); }
其中,为了方便使用,定义了内部抽象类BusEventFilter。
/** * The Class BusEventFilter. * * @param <T> * the generic type */ public abstract class BusEventFilter<T extends Event> { protected final Class<T> eventType; private final T matchedEvent; // null for don't have to matched, type is enough @SuppressWarnings("rawtypes") private final List<Func1<? super BusEvent, Boolean>> filters = new ArrayList<>(); protected BusEventFilter(Class<T> eventType) { this.eventType = eventType; this.matchedEvent = null; } @SuppressWarnings("unchecked") protected BusEventFilter(T event) { this.eventType = (Class<T>) event.getClass(); this.matchedEvent = event; } /** filter bus event. * * @param filter * the filter * @return the bus event filter */ @SuppressWarnings({ "unchecked", "rawtypes" }) public BusEventFilter<T> filter(Func1<? super BusEvent<T>, Boolean> filter) { filters.add((Func1<? super BusEvent, Boolean>) filter); return this; } /** * filter by event source * <p> * supplier to get event source, to make it calculate in run time, not in first time. * * @param eventSourceSupplier * the event source supplier * @return the bus event filter */ public BusEventFilter<T> filterEventSource(Supplier<Object> eventSourceSupplier) { return filter(evt -> evt.getEventSource() == eventSourceSupplier.get()); } /** * filter by event source in a collection * <p> * supplier to get event source list, to make it calculate in run time, not in first time. * * @param eventSourceCollectionSupplier * the event source collection supplier * @return the bus event filter */ public BusEventFilter<T> filterEventSourceIn(Supplier<Collection<?>> eventSourceCollectionSupplier) { return filter(evt -> eventSourceCollectionSupplier.get().stream().anyMatch( obj -> obj == evt.getEventSource())); } /** Then, convert to observable event stream. * * @return the observable */ public Observable<T> then() { return andThen() .map(evt -> evt.getEvent()) .cast(eventType) // if don't have to match event, match event type is enough (the case matchedEvent == null) .filter(evt -> matchedEvent == null || matchedEvent.equals(evt)); } /** And then. * * @return the observable */ public Observable<BusEvent> andThen() { return filters.stream().reduce( observeBusEvent().filter(busEvent -> eventType.isInstance(busEvent.getEvent())), (busEvent, filter) -> busEvent.filter(filter), (a, b) -> a); } protected abstract Observable<BusEvent> observeBusEvent(); }
单一总线实现
单一总线是在整个程序范围内,任何事件都发送给这个总线,任何总线的监听器都能够响应事件,这样设计简单、使用方便。其实现简单,就是对RxJava的进行简单的封装而已。/** * The Class RxEventBus. Single event bus for all event. * * @author wen-zhe.liu@asml.com */ public class RxEventBus implements EventBus { private final Subject<Object, Object> bus = PublishSubject.create().toSerialized(); /* (non-Javadoc) * @see com.asml.jex.rx.event.EventBus#post(java.lang.Object) */ @Override public void post(Object o) { bus.onNext(o); } /* (non-Javadoc) * @see com.asml.jex.rx.event.EventBus#asyncPost(java.lang.Object) */ @Override public void asyncPost(Object o) { Schedulers.computation().createWorker().schedule(() -> bus.onNext(o) ); } @Override public void removeEventBus(Class<?> eventType) { // nothing to do as it is a single event bus } @Override public void release() { bus.onCompleted(); } /* (non-Javadoc) * @see com.asml.jex.rx.event.EventBus#observe(java.lang.Class) */ @Override public <T> Observable<T> observe(Class<T> eventType) { return bus.ofType(eventType); } /* (non-Javadoc) * @see com.asml.jex.rx.event.EventBus#observeEvent(java.lang.Class) */ @Override public <T extends Event> BusEventFilter<T> observeEvent(Class<T> eventType) { return new BusEventFilter<>(eventType); } /* (non-Javadoc) * @see com.asml.jex.rx.event.EventBus#observeEvent(T) */ @Override public <T extends Event> BusEventFilter<T> observeEvent(T event) { return new BusEventFilter<>(event); } /** * The Class BusEventFilter. * * @param <T> * the generic type */ public class BusEventFilter<T extends Event> extends EventBus.BusEventFilter<T> { private BusEventFilter(Class<T> eventType) { super(eventType); } @SuppressWarnings("unchecked") private BusEventFilter(T event) { super(event); } @Override protected Observable<BusEvent> observeBusEvent() { return observe(BusEvent.class); } } }
多总线实现
如果有太多的监听器(比如10000个),任何一个事件发送给单一总线,所有监听器都会响应,尽管绝大多数都在过滤阶段忽略掉,但仍然会带来一定的系统开销。更好的办法就是采用多总线的方式,通过多个总线来分担事件,事件发往响应的总线,而只有少数几个监听器接收到事件,从而提高效率。关于多总线如何分工,有多种办法。这里根据不同事件类型,选择不同的事件总线。一种事件类型对应一个事件总线。
/** * The Class RxEventBus. Multiple event buses, each one to one event type. * * @author wen-zhe.liu@asml.com */ public class RxEventBuses implements EventBus { private final Map<Class<?>, Subject<Object, Object>> eventBuses = new IdentityHashMap<>(); /** * Post event, support BusEvent, support hierarchy event. * * @param o * the o */ @Override public synchronized void post(Object o) { Class<? extends Object> eventType = checkBusEvent(o).getClass(); List<Subject<Object, Object>> evtBuses = eventBuses.entrySet().stream() .filter(eventBus -> eventBus.getKey().isAssignableFrom(eventType)) .map(eventBus -> eventBus.getValue()) .collect(Collectors.toList()); // to list: prevent ConcurrentModificationException in forEach as stream for (Subject<Object, Object> eventBus : evtBuses) { eventBus.onNext(o); } } private Object checkBusEvent(Object o) { return o instanceof BusEvent<?> ? ((BusEvent<?>) o).getEvent() : o; } private synchronized Subject<Object, Object> getOrCreateBus(Class<?> eventType) { Subject<Object, Object> bus = eventBuses.get(eventType); if (bus == null) { bus = PublishSubject.create().toSerialized(); eventBuses.put(eventType, bus); } return bus; } /* (non-Javadoc) * @see com.asml.jex.rx.event.EventBus#removeEventBus(java.lang.Class) */ @Override public void removeEventBus(Class<?> eventType) { Subject<Object, Object> bus = null; synchronized (this) { bus = eventBuses.remove(eventType); } if (bus != null) { bus.onCompleted(); } } @Override public synchronized void release() { eventBuses.values().forEach(bus -> bus.onCompleted()); eventBuses.clear(); } /** * Async post. * * @param o * the o */ @Override public void asyncPost(Object o) { Schedulers.computation().createWorker().schedule(() -> post(o) ); } /** * Observe. * * @param <T> * the generic type * @param eventType * the event type * @return the observable */ @Override public <T> Observable<T> observe(Class<T> eventType) { Subject<Object, Object> bus = getOrCreateBus(eventType); return bus.ofType(eventType); } /** * Observe event. * * @param <T> * the generic type * @param eventType * the event type * @return the bus event filter */ @Override public <T extends Event> BusEventFilter<T> observeEvent(Class<T> eventType) { return new BusEventFilter<>(eventType); } /** * Observe event. * * @param <T> * the generic type * @param event * the event * @return the bus event filter */ @Override public <T extends Event> BusEventFilter<T> observeEvent(T event) { return new BusEventFilter<>(event); } /** * The Class BusEventFilter. * * @param <T> * the generic type */ public class BusEventFilter<T extends Event> extends EventBus.BusEventFilter<T> { protected BusEventFilter(Class<T> eventType) { super(eventType); } private BusEventFilter(T event) { super(event); } @Override protected Observable<BusEvent> observeBusEvent() { return getOrCreateBus(eventType).ofType(BusEvent.class); } } }
事件总线仓库
为了方便使用,用户代码无需手动创建具体的事件总线,而交给事件总线仓库,从仓库中拿就行了。/** * The Class EventBusRepo. * * @author wen-zhe.liu@asml.com */ public class EventBusRepo { public static EventBus eventBus = new RxEventBuses(); static { Runtime.getRuntime().addShutdownHook(new Thread(() -> eventBus.release())); } /** * Gets the. * * @return the event bus */ public static EventBus get() { return eventBus; } /** * Sets the event bus. * * @param evtBus * the new event bus */ public static void setEventBus(EventBus evtBus) { eventBus = evtBus; } }
缺省采用多总线,用户也可以采用setEventBus更改。
使用示例
定义事件
用户可以定义事件,只要实现Event接口即可,可以说普通类,也可以说枚举。举例如下:
public enum XxEvent implements Event { XX_LOADED, XX_CHANGED } public class XxRemoved implements Event { // ... code of XxRemoved }
根据惯例,事件命名规则为:名称+过去分词,如XX_LOADED, XX_CHANGED, XxRemoved,等。
如果具体事件无状态,可以定义为枚举值比较方便(这也是为什么Event要定义为接口的原因,因为枚举只能实现接口,不能继承类)。
如果事件有状态,只能定义为普通类。由于事件可能在多个线程中传输,建议设计成不可变类。否则,要考虑线程安全问题。
发送事件
可以从事件总线仓库中拿到具体的事件总线,调用post方法,将事件封装BusEvent中,指定事件源,如下所示:EventBusRepo.get().post(new BusEvent<>(事件源对象, XX_LOADED));
在DDD中,领域实体可以发布领域事件,可以将上面代码封装在领域实体的基类里。
/** * @author wen-zhe.liu@asml.com * */ public abstract class DomainEntity<T> { ... protected void postEvent(DomainEvent event) { EventBusRepo.get().post(new BusEvent<>(this, event)); ... }
在具体的领域实体类中,可以使用postEvent方法简单地发布事件:
postEvent(XX_LOADED);
接收事件
接收事件代码一般如下所示:EventBusRepo.get().observeEvent(eventType) .filter(evt -> ...) .then() .subscribe(evt -> ...)
通过observeEvent(eventType)观察事件,这里eventType可以说事件的类对象,如
XxRemoved.class, 也可以说是对象(一般为枚举值),如
XX_LOADED。
然后一般都要对事件进行过滤。通常情况下会对事件源进行过滤,即只对某些事件源感兴趣,除了上面的
filter方法之外,我还提供了针对事件源过滤更便捷的方法,如下:
.filterEventSource(() -> 事件源对象)
只有事件源对象匹配(引用相等)才能通过。这里函数参数使用一等函数而不是直接的事件源对象,是为了避免在初始化时设置,以解决事件源对象重新赋值的情况。
有时候我们很难定位到具体的事件源对象,只能定位到事件源的集合,那么可以使用以下过滤方法:
.filterEventSourceIn(() -> 事件源对象的集合)
在DDD中,聚合根对象可以监听其内部实体对象发送到事件,因此为了简化用户代码,在聚合根对象的基类中进行封装:
public abstract class DomainAggregateRoot<T> extends DomainEntity<T> { private final transient RxSubscriptions subscriptions = new RxSubscriptions(); public synchronized void subscribeEvents() { if (subscriptions.isEmpty()) { subscriptions.addAll(createEventObservers()); } } public synchronized void unsubscribeEvents() { subscriptions.release(); } protected Collection<Subscription> createEventObservers() { return Collections.emptyList(); } protected static <T extends DomainEvent> BusEventFilter<T> observeEvent(Class<T> eventType) { return EventBusRepo.get().observeEvent(eventType); } protected static <T extends DomainEvent> BusEventFilter<T> observeEvent(T event) { return EventBusRepo.get().observeEvent(event); } public DomainAggregateRoot<T> observe(Subscription... ss) { subscriptions.observe(ss); return this; } }
具体的聚合根类可以覆盖
createEventObservers方法,从而实现订阅生命周期管理。
—— 本博客所有内容均为原创,转载请注明作者和出处 ——–
作者:刘文哲
联系方式:liuwenzhe2008@qq.com
博客:http://blog.csdn.net/liuwenzhe2008
相关文章推荐
- 用RxJava实现事件总线(Event Bus)
- 用RxJava实现Rxbus替换EventBus事件总线
- RxJava实现事件总线(RxBus)及详解
- EventBus使用详解-2-用RxJava实现事件总线(Event Bus)
- 重温.NET下Assembly的加载过程 ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线
- 用RxJava实现事件总线(Event Bus)
- 使用RxJava实现的超简单事件总线RxBus
- 用RxJava实现事件总线(Event Bus)
- ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线
- ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线
- 详解用RxJava实现事件总线(Event Bus)
- 用RxJava实现事件总线(Event Bus)
- 利用RxJava实现的事件总线(Event Bus)
- RxJava实现事件总线——RxBus
- 用RxJava实现事件总线(Event Bus)
- js学习心得之js的自定义事件-基于观察者模式的实现
- silverlight线程与基于事件驱动javascript引擎(实现轨迹回放功能)
- silverlight线程与基于事件驱动javascript引擎(实现轨迹回放功能)
- 浅谈Observer模式的实现.基于继承和基于事件/委托的两种方式
- 使用UnrealScript实现基于观察者模式的事件系统