您的位置:首页 > 其它

使用“阿里大于”和 “企信通” 短信异步发送

2017-03-07 14:08 651 查看
因为项目里有许多发送短信的地方,所以我们就把 发送短信的功能单独拿出来,做成一个service给其他app使用。

    程序的大概流程是这样的 :



首先 我选择生产者模式创建队列。

@Component
public class SmsQueueFactory {

private static SmsQueueFactory smsQueueFactory =null;

static final BlockingQueue<SmsMessageDto> messageQueue =new ArrayBlockingQueue<SmsMessageDto>(1024);

private SmsQueueFactory (){};

private static Logger log = LoggerFactory.getLogger(SmsQueueFactory.class);

/**
* 获取对象
* @return
*/
public static  SmsQueueFactory getSmsQueueFactory() {
if (smsQueueFactory == null) {
smsQueueFactory = new SmsQueueFactory();
}
return smsQueueFactory;
}

public BlockingQueue<SmsMessageDto> getMessageQueue(){
return this.messageQueue;
}

public void setMessageDto(SmsMessageDto messageDto) {
try {
this.messageQueue.offer(messageDto, 100, TimeUnit.MILLISECONDS);
}catch (Exception e){
log.error("setMessageDto出现错误"+e.getMessage());
}

}

}


一个接收器来 接收请求,并保存到数据库, 我在这里只是 写了一些伪代码,关键的地方我也会写上去的:

@Service("smsAcceptorImpl")
public class SmsAcceptorImpl {

public void sendSms(SendMessageReq req) throws Exception{
validation(req); //校验参数
SmsMessageDto sms  = genSmsObj(req);//封装成 message 对象
if (sms==null){
return;
}
sms= saveMessage(sms); // 先保存到数据库这样可以拿到 这个消息对象的 ID,方便以后使用。
sendToQueue(sms);// 放到队列
}

/**
* 添加到发送队列
* @param
*/
private  void  sendToQueue(SmsMessageDto messageDto) {
try {
BlockingQueue<SmsMessageDto> queue =SmsQueueFactory.getSmsQueueFactory().getMessageQueue();
queue.offer(messageDto, 100, TimeUnit.MILLISECONDS);
}catch (Exception e){
log.error("加入队列出错"+e.getMessage());
}
}

}


然后写一个 init() 方法 ,定时到 队列中 看看,有没有可以发送的消息。

@Component
public class SmsQueueProcess {

@Autowired
private SmsSupplierSelector smsSupplierSelector;

private static Logger log = LoggerFactory.getLogger(SmsQueueProcess.class);

@PostConstruct
public void init(){
getMesForQueue();
}

//短信量不大 暂时不使用多线程处理
private void process(SmsMessageDto sms){
ISupplier supplier = smsSupplierSelector.getSupplier(sms);
supplier.validation(sms); // 验证的方法就不写了
supplier.send(sms); // 重点 把 这发送的代码展示一下
}

public void getMesForQueue() {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
fixedThreadPool.execute(new getQueueList());
}

class getQueueList implements Runnable{
//todo 获取 队列中的message 对象
public void getQueueList() throws InterruptedException{
while(true){
BlockingQueue<SmsMessageDto> queue = SmsQueueFactory.getSmsQueueFactory().getMessageQueue();
SmsMessageDto smsMessage=queue.take();
process(smsMessage);
}
}

@Override
public void run() {
try{
getQueueList();
}catch (Exception e){
log.error("获取队列 中的对象出错"+e.getMessage());
}
}
}


//  这个类是 运营商选择器,默认是使用 阿里大于,如果发送失败了,就使用其他的(企信通),目前就只选用了两个运营商
@Component
public class SmsSupplierSelector {

@Autowired
private ISupplier smsSupplierAliYu ;
@Autowired
private ISupplier smsSupplierQxt ;

private final Map<String ,ISupplier> suppliers = new HashMap<>();

@PostConstruct
private void init(){
suppliers.put("alidayu",smsSupplierAliYu);
suppliers.put("qxt",smsSupplierQxt);
}

public ISupplier getSupplier(SmsMessageDto sms) {

if(StringUtil.isBlank(sms.getSupplier())){
sms.setSupplier("alidayu");
return   smsSupplierAliYu;
}
HashMap<String ,ISupplier> cloneSuppliers = new HashMap<>(suppliers);

cloneSuppliers.remove(sms.getSupplier());

String[] keys = cloneSuppliers.keySet().toArray(new String[0]);
Random random = new Random();
String randomKey = keys[random.nextInt(keys.length)];
sms.setSupplier(randomKey);
return cloneSuppliers.get(randomKey);
}
}


运营商选择器,返回的是运营商对象。两个运营商,每个运营商都有自己的验证和发送的方法,这里展示一下 ,发送的方法。

@Override
public void send(SmsMessageDto sms) {
//todo 使用企信通的发送
try {
String message=sms.getMessage();
String   params = "DesMobile="+sms.getPhone()+"&Content=" + URLEncoder.encode(message, "GBK")
+"&t="+ new Date().getTime();
String logMes = HttpClientUtil.doGet(GET_URL + params);
sms.setSendCount(sms.getSendCount()==null ?0:(sms.getSendCount()+ 1)); //发送的次数加1
String code = XmlUtil.getContentFromXml(logMes, "code");
if (!("01".equals(code) || "03".equals(code))) {
sms.setStatus(2);   //发送失败
}else{
sms.setStatus(3); //发送成功
}
} catch (Exception e) {
log.error("短信发送失败!方法名[send]"+e.getMessage());
}finally {
SmsMessage smsMessage=new SmsMessage();
smsMessage.setId(sms.getId());
smsMessage.setStatus(sms.getStatus());
smsMessage.setSendCount(sms.getSendCount());
smsMessage.setUpdateTime(new Date());
smsMessage.setSupplier(sms.getSupplier());
smsMessage.setRequestJson(JsonUtil.bean2Json(sms));
messageService.updateStatus(smsMessage);

}
} 


@Override
public void send(SmsMessageDto sms)  {
//todo  使用阿里大于发送

TaobaoClient client = new DefaultTaobaoClient("http://gw.api.taobao.com/router/rest", "app-key", "app-secret");
AlibabaAliqinFcSmsNumSendRequest req = new AlibabaAliqinFcSmsNumSendRequest();
req.setExtend(sms.getId()+"sms");
req.setSmsType("normal");
Map<String,String > stringStringMap=sms.getSmsParamMap();
req.setSmsFreeSignName(stringStringMap.get("product"));
req.setSmsParamString(JsonUtil.bean2Json(stringStringMap));
req.setRecNum(sms.getPhone() );
sms.setSendCount(sms.getSendCount()==null ?0:(sms.getSendCount()+ 1));
req.setSmsTemplateCode(sms.getTemplateCode());
try {
client.execute(req);
sms.setSendCount(sms.getSendCount()==null ?(1):(sms.getSendCount()+ 1));
sms.setStatus(1);

SmsMessage smsMessage=new SmsMessage();
smsMessage.setStatus(sms.getStatus()); // 发送状态
smsMessage.setId(sms.getId());
smsMessage.setUpdateTime(new Date());
smsMessage.setSendCount(sms.getSendCount());
smsMessage.setSupplier(sms.getSupplier());
smsMessage.setRequestJson(JsonUtil.bean2Json(sms));
messageService.updateStatus(smsMessage);

} catch (ApiException e) {
log.error("阿里大于发送消息失败,类名[SmsSupplierAliYu]"+e.getMessage());
}
}


企信通的发送是实时接收消息的状态,阿里大鱼是异步的,所以就需要有个守护线程,查询数据库,和阿里服务器,修改消息的状态。

@Scheduled(cron="0/10 * * * * ? ")
// 获取消息的返回参数
public void ckMessageStatusTask() throws  Exception {

try {
//todo 守护线程 ,定时查询数据库,未发送成功的加入队列,重新发送
BlockingQueue<SmsMessageDto> queue = SmsQueueFactory.getSmsQueueFactory().getMessageQueue();
List<SmsMessage> smsMessageList = messageService.selectByStatusAndSendCount();// 要按照时间排序
List<SmsMessage> listAl =new ArrayList<>();
for (SmsMessage smsMessage : smsMessageList) {
if (aLiDaYu.equals(smsMessage.getSupplier())){
listAl.add(smsMessage);
}
}
if (null !=listAl && listAl.size()>0){
getMessageStatusForAli(listAl);
}
smsMessageList.removeAll(listAl);
for (int j = 0; j < smsMessageList.size(); j++) {
SmsMessage smsMessage=smsMessageList.get(j);
if (smsMessage.getStatus()==2 || smsMessage.getStatus()==0){ //
SmsMessageDto smsMessageDto = JsonUtil.json2Bean(smsMessage.getRequestJson(), SmsMessageDto.class);
queue.offer(smsMessageDto, 100, TimeUnit.MILLISECONDS);
}
}
} catch (Exception e) {
log.error("短信守护线程出错,方法名[ckMessageStatusTask]" + e.getMessage());
}
}

private void  getMessageStatusForAli( List<SmsMessage> smsMessageList){

//todo 从阿里大鱼 查询消息的状态
BlockingQueue<SmsMessageDto> queue = SmsQueueFactory.getSmsQueueFactory().getMessageQueue();
      // 这里需要在阿里大于 网站上注册一个用户,并创建应用。使用上面的app证书 创建 client
TaobaoClient client = new DefaultTaobaoClient("http://gw.api.taobao.com/router/rest", "app-key", "app-secret");
AlibabaAliqinFcSmsNumQueryRequest req = new AlibabaAliqinFcSmsNumQueryRequest();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
Date date = new Date();
try {
for (SmsMessage smsMessage:smsMessageList){
String extend=smsMessage.getId()+"sms";
req.setRecNum(smsMessage.getPhone());
req.setQueryDate(sdf.format(date));
req.setCurrentPage(1L); // 当前页码
req.setPageSize(10L);
AlibabaAliqinFcSmsNumQueryResponse rsp = client.execute(req);
List<AlibabaAliqinFcSmsNumQueryResponse.FcPartnerSmsDetailDto> smsDetailDtos=rsp.getValues();
if (null !=smsDetailDtos){
for (AlibabaAliqinFcSmsNumQueryResponse.FcPartnerSmsDetailDto fcPartnerSmsDetailDto : smsDetailDtos) { // 只有一个 消息
if (extend.equals( fcPartnerSmsDetailDto.getExtend())) {
Integer state = Integer.parseInt(Long.toString(fcPartnerSmsDetailDto.getSmsStatus()));
if (state==3)
smsMessageList.remove(smsMessage);
if (state==2){
SmsMessageDto smsMessageDto = JsonUtil.json2Bean(smsMessage.getRequestJson(), SmsMessageDto.class);
queue.offer(smsMessageDto, 100, TimeUnit.MILLISECONDS);
}
smsMessage.setStatus(state);
messageService.updateMessageStatusById(smsMessage);
// 添加日志
SmsLog smsLog=new SmsLog();
smsLog.setCreateTime(new Date());
smsLog.setMessageId(smsMessage.getId());
smsLog.setMessageDetails(smsMessage.getMessage());
smsLog.setMessageType("(ali)修改记录");
smsLog.setMessageStatus(smsMessage.getStatus());
smsLog.setMessageSupplier("ali");
messageLogService.insertSelective(smsLog);
}
}
}
}
}catch (Exception e){
log.error("查询消息状态出错,方法名[getMessageStatusForAli]"+e.getMessage());
}
}


基本上这么多,就已经把这个服务的代码都展示出来了。 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: