您的位置:首页 > 其它

osgi之Eventadmin通信

2016-07-25 10:01 357 查看

osgi之Eventadmin通信

osgi中bundle之间的通信,可以使用eventadmin来完成,eventadmin是osgi中的一种基于发布订阅的方式,一个Bundle进行发布发布一个事件之后,另外一个Bundle订阅相应主题,从而进行通信,在使用过相同的通信方式中,guava中有一个eventbus可以达到相同的效果,以及mq的发布订阅均是如此,但是osgi的eventadmin服务,在通信过程中event事件不是持久化的,现在开始我们的osgi之间的event事件通信。

发布事件

我们在拟定事件的主题之后可以进行相应事件的发布,这使用的还是compedium中的Eventadmin服务,如同上一篇文章中引入的依赖一样,需要使用到compedium这个包,发布事件相当简单,获取到Eventadmin之后就可以进行相应的事件发布,但同时要指定topic,指定主题之后,订阅者才能根据相应的主题获取到订阅事件。发布事件的代码相当简单,如下所示:

package cn.com.ds;

import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventConstants;

import java.util.HashMap;

/**
* Created by xiaxuan on 16/7/11.
*/
public class Activator implements BundleActivator {

private BundleContext context;

boolean flag = true;

private ServiceReference sr;

EventAdmin eventAdmin = null;

HashMap properties = null;

Event event = null;

public void start(BundleContext context) throws Exception {
this.context = context;
sr = context.getServiceReference(EventAdmin.class.getName());
if (sr == null) {
throw new Exception("Failed to obtain EventAdmin service reference!");
}
eventAdmin = (EventAdmin) context.getService(sr);
if (eventAdmin == null) {
throw new Exception("Failed to obtain EventAdmin service object!");
}
while (flag) {
if (eventAdmin != null) {
properties = new HashMap();
properties.put(EventConstants.BUNDLE_SYMBOLICNAME, "est.first");
//create event topic
event = new Event("my_osgi_test_event", properties);
eventAdmin.postEvent(event);        //asynchronous
System.out.println("Send Event!");
try {
Thread.sleep(5000);
} catch (Exception e) {}

}
}
System.out.println("ds service registered..");
}

public void stop(BundleContext context) throws Exception {
//flag = false;
System.out.println("service stoping...");
}
}


在获取到EventAdmin服务诸侯就可以进行事件发布了,其中我指定的Event的topic为my_osgi_test,在进行Event初始化的时候指定topic,我在发布事件的时候我使用的是postEvent,这是一种异步发送事件的方式,有一种同步的事件发送为sendEvent,两者的区别在于,异步发送无论是监听者是否接收成功都会返回,而同步发送则是会等到监听者接收成功才会返回,所以在这里会出现一个问题,就是当监听的Bundle没有启动起来,或者stop之后,当前发送Event的Bundle暂时也会阻塞起来,造成其他的问题。

EventHandler

在Event订阅者中需要一个EventHandler来处理相应Event,这个EventHandler需要实现EventHandler接口,实现一个handler方法,我在这写了一个简单的EventHandler为以下:

package cn.com.event.handler;

import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;

/**
* Created by xiaxuan on 16/7/15.
*/
public class MyEventHandler implements EventHandler {

public void handleEvent(Event event) {
System.out.println("test event received..");
System.out.println("handle event start--" + event);
try {
Thread.currentThread().sleep(2000);
} catch (Exception e) {}
}
}


在handleEvent方法中,我仅仅只是打出了event监听到了一些标志以及休息2秒,其实可以从event中拿到一些之前在Event中注入的属性,以及当前的主题等等。

订阅Activator

在订阅者中,我还是将相应监听的事件放在了Activator中,这个也是相当简单,只需将EventHandler进行注册并指定监听的topic即可,代码如下:

package cn.com.example;

import cn.com.event.handler.MyEventHandler;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;

import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;

/**
* Created by xiaxuan on 16/7/15.
*/
public class Activator4 implements BundleActivator {

private ServiceReference sr;

EventAdmin eventAdmin = null;

HashMap properties = null;

Event event = null;

/**
* event  topic
*/
final static String[] topic = {"my_osgi_test_event"};

ServiceRegistration registration = null;

public void start(BundleContext context) throws Exception {
System.out.println("activator4 start");
Dictionary dict = new Hashtable();
dict.put(EventConstants.EVENT_TOPIC, topic);
EventHandler eventHandler = new MyEventHandler();
//registering the eventHandler
registration = context.registerService(EventHandler.class.getName(), eventHandler, dict);
if (registration != null) {
System.out.println("event handler registered.");
}
}

public void stop(BundleContext context) throws Exception {
registration.unregister();
System.out.println("event handler unregistered!");
}
}


使用EventHandler的方法和之前注册其他服务的方式相同,都是通过context.registerService的方式,在指定订阅的topic之后,就可以进行进行Bundle之间的通信了。

Bundle通信

在启动karaf之前,需要改变我们的karaf插件,karaf插件需要再启动一个feature来支持Eventadmin同性,在karaf插件中增加的如下:

<startupFeatures>
<feature>eventadmin</feature>
</startupFeatures>


这个写在configuration中,这样在karaf启动时候同样回家再eventadmin这个feature,如果不加载这个feature,会报空指针异常,这种feature加载的方式并不够优雅,还有其他方式的加载办法,但一时没有找到,日后找到了再提一下.

启动karaf之后,观察控制台,就能清楚的看到我们的事件的发布和订阅了,如下:



以上就能清楚的观察到我们的事件的发布订阅都正常进行。

总结

以上中,事件的发布和订阅中,发布Event的Bundle和订阅的Bundle之间可以没有任何的关系它们之间以 Event Admin 服务为中间人 (Broker),以事件 (Event) 为消息载体,进行 Bundle 间的松散协作。

osgi中的Event Admin的事件发布相对与各种消息中间件的发布订阅来说,功能上还是单薄了许多,而且在发布过程中,事件并不能持久化保存。意味着如果当前Bundle重启,那么Event就会全部丢失。

在事件的发布订阅中,发布事件的Bundle发布事件之后,如果订阅的Bundle处于未Active状态的时候,这个时候发布的事件就不会被接收到,并且在随后也不会被接收到。

据上,在使用Bundle之间的通信的时候,还是更加推荐使用各种开源的消息中间件来进行消息通信更好。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: