您的位置:首页 > 编程语言 > Java开发

java ee wildfly spring 在线程池的线程中注入

2017-08-31 15:54 337 查看
/**
 * Reproduction case: data saved and flushed on the test thread is read back
 * successfully there, but a task run on the thread pool reads an empty list.
 *
 * NOTE(review): presumably the save happens inside the test-managed
 * transaction of AbstractTransactionalJUnit4SpringContextTests, which is not
 * committed when the pool thread queries - confirm against the Spring
 * TestContext documentation.
 */
public class RtmpSpyingTests extends AbstractTransactionalJUnit4SpringContextTests {

    @Autowired
    ThreadPoolTaskExecutor rtmpSpyingTaskExecutor;

    @Autowired
    ApplicationContext ctx;

    @Autowired
    RtmpSourceRepository rtmpRep;

    /**
     * Saves one sample row, reads it back on the test thread, then hands a
     * RtmpSpyingTask to the executor so the same query runs on a pool thread.
     */
    @Test
    public void test() {
        // Persist the sample row and flush it.
        rtmpRep.save(new RtmpSource("test"));
        rtmpRep.flush();

        // On the test thread this list contains the sample saved above.
        List<RtmpSource> seenByTestThread = rtmpRep.findAll();

        // Look the task bean up with its two constructor arguments and run it
        // on the thread pool.
        rtmpSpyingTaskExecutor.execute(ctx.getBean(RtmpSpyingTask.class, "arg1", "arg2"));
    }
}

/**
 * Runnable that queries every RtmpSource through the injected repository.
 * It carries a channel name id and an RTMP URL supplied at construction.
 */
public class RtmpSpyingTask implements Runnable {

    @Autowired
    RtmpSourceRepository rtmpRep;

    String nameIdCh;
    String rtmpUrl;

    /**
     * @param nameIdCh value stored in {@code nameIdCh}
     * @param rtmpUrl  value stored in {@code rtmpUrl}
     */
    public RtmpSpyingTask(String nameIdCh, String rtmpUrl) {
        this.rtmpUrl = rtmpUrl;
        this.nameIdCh = nameIdCh;
    }

    /**
     * Loads all RtmpSource rows. Expected: a list containing the sample saved
     * by the caller; observed (per the original author): an empty list.
     */
    @Override
    public void run() {
        List<RtmpSource> allSources = rtmpRep.findAll();
    }
}

应该改用下面这种带事务注解的 Service 来访问数据库:
/**
 * Service that wraps RtmpSourceRepository access in explicit transactions.
 */
@Service
public class AsyncTransactionService {

    @Autowired
    RtmpSourceRepository rtmpRep;

    /**
     * Reads every RtmpSource inside a read-only transaction.
     *
     * @return the list returned by {@code rtmpRep.findAll()}
     */
    @Transactional(readOnly = true)
    public List<RtmpSource> getRtmpSources() {
        List<RtmpSource> sources = rtmpRep.findAll();
        return sources;
    }

    /**
     * Saves the given source; the method is annotated to run with
     * {@code Propagation.REQUIRES_NEW}.
     *
     * @param rtmpSource the entity to save
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void insertRtmpSource(RtmpSource rtmpSource) {
        rtmpRep.save(rtmpSource);
    }
}


或者使用内部类:内部类的任务可以直接访问外部类(Spring 管理的 Bean)中已注入的字段。

package com.italktv.platform.audioDist.service;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.italktv.platform.audioDist.mongo.CustomerRepository;
import com.italktv.platform.audioDist.mongo.PlayUrl;
import com.italktv.platform.audioDist.mongo.PlayUrl.MyUrl;
import com.italktv.platform.audioDist.mongo.PlayUrlRepository;
import com.italktv.platform.audioDist.mysql.SubSet;
import com.italktv.platform.audioDist.mysql.UserRepository;
import com.italktv.platform.audioDist.task.MyTask;
import com.italktv.platform.audioDist.task.TaskManager;

/**
 * Scheduled component that reads the upload-file map from the user repository
 * and submits one {@link MessagePublish} task per entry to the
 * {@link TaskManager}.
 */
@Component
public class ScheduleJobs {
    private static final Logger log = LoggerFactory.getLogger(ScheduleJobs.class);

    /** One second expressed in milliseconds. */
    public final static long SECOND = 1 * 1000;

    /** Start time of the most recent scheduled run. */
    LocalDateTime nowDate = LocalDateTime.now();

    @Autowired
    // This means to get the bean called userRepository
    // Which is auto-generated by Spring, we will use it to handle the data
    private UserRepository userRepository;

    @Autowired
    private PlayUrlRepository repository;
    @Autowired
    private CustomerRepository cc;

    @Autowired
    private UserRepository user;

    @Autowired
    TaskManager taskManager;

    /**
     * Domain written into every PlayUrl record.
     *
     * Declared on this Spring-managed bean rather than on MessagePublish:
     * MessagePublish instances are created with {@code new} in dotask(), so
     * Spring never processes annotations on their fields and a {@code @Value}
     * there would stay null. The property must be resolvable when this bean
     * is created.
     */
    @Value("${platform.audio.dist.domain}")
    private String domain;

    /**
     * Runs every 400 seconds: records the start time and starts distribution.
     */
    @Scheduled(fixedRate = SECOND * 400)
    public void fixedRateJob() {
        nowDate = LocalDateTime.now();
        System.out.println("=== start distribution: " + nowDate);
        dotask();
    }

    // TaskManager is injected above and declares its own @PostConstruct /
    // @PreDestroy methods, so no lifecycle handling is needed here.

    /**
     * Submits one MessagePublish task for every SubSet of every subject in
     * the upload-file map.
     */
    void dotask() {
        Map<Integer, List<SubSet>> map = userRepository.getUploadFileMap();
        for (Entry<Integer, List<SubSet>> subject : map.entrySet()) {
            int subjectId = subject.getKey();
            log.info(" subject id:" + subjectId);
            List<SubSet> allsub = subject.getValue();
            for (SubSet item : allsub) {
                log.info(" sub:" + item.toString());
                taskManager.add(new MessagePublish(item.id, item.path));
            }

            //wait them finished
            //TODO:

            //update subject status
            //TODO

        }
    }

    //////////////////////// inner class ////////////////////////
    /**
     * Task that handles one source file and then stores a PlayUrl record.
     * As an inner class it reads {@code repository} and {@code domain} from
     * the enclosing ScheduleJobs bean.
     *
     * NOTE(review): declared Serializable, but a non-static inner class keeps
     * a reference to its enclosing ScheduleJobs instance, which is not
     * Serializable - confirm whether serialization is really required.
     */
    public class MessagePublish extends MyTask implements Serializable {
        public MessagePublish() {
            super();
        }

        /**
         * @param id   part id stored in {@code partId}
         * @param name source file stored in {@code srcFile}
         */
        public MessagePublish(int id, String name) {
            this.srcFile = name;
            this.partId = id;
        }

        /**
         * Performs the (currently stubbed) upload, then saves a PlayUrl.
         *
         * @return always {@code "ok"}
         */
        @Override
        public String call() {
            System.out.println(srcFile + " is uploading...");
            try {
                // Get the region where the message is published.
                // For now a random 1-10 second sleep stands in for the real work.
                TimeUnit.SECONDS.sleep(new Random().nextInt(10) + 1);
            } catch (InterruptedException e) {
                // Restore the flag so the executor / canceller can observe the
                // interruption; the rest of the flow continues as before.
                Thread.currentThread().interrupt();
                log.warn("upload of {} interrupted", srcFile, e);
            }
            System.out.println(srcFile + " uploaded.");

            //2.RECORD TO MONGO DB
            PlayUrl play = new PlayUrl();
            play.programid = "programid fake";
            play.domain = domain;
            play.protocol = "HTTP";
            MyUrl myurl = new MyUrl();
            myurl.high = "http://xxx.xxx/xi//";
            play.url = myurl;
            repository.save(play);
            //TODO:

            //IF FAILED, RETRY, RECORD RETRY TIMES.
            //TODO:

            return "ok";
        }

    }
}

package com.italktv.platform.audioDist.task;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Spring component that runs {@link MyTask} instances on a fixed pool of
 * three threads and tracks their futures by source-file name.
 */
@Component
public class TaskManager {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TaskManager.class);

    // Disabled alternative: a container-provided executor.
    //    @Resource(lookup = "java:comp/DefaultManagedScheduledExecutorService")
    //    ManagedScheduledExecutorService executor;

    /** Submitted tasks keyed by {@code srcFile}; a later task with the same key replaces the entry. */
    Map<String, Future<String>> tasks;
    ExecutorService executor;

    /** Creates the task map and the three-thread pool after construction. */
    @PostConstruct
    public void init() {
        logger.info(" === init TaskManager===");
        tasks = new HashMap<String, Future<String>>();
        executor = Executors.newFixedThreadPool(3);
    }

    /**
     * Submits the task to the pool and remembers its future under
     * {@code task.srcFile}.
     *
     * @param task the task to run
     */
    public void add(MyTask task) {
        logger.info("add delay:" + task.partId + task.srcFile);
        Future<String> future = executor.submit(task);
        tasks.put(task.srcFile, future);
    }

    /**
     * Cancels (with interruption) the task registered under {@code name} and
     * removes it from the map.
     *
     * @param name key the task was registered under
     * @return the result of {@code Future.cancel(true)}, or {@code false}
     *         when no task is registered under that name
     */
    public boolean cancel(String name) {
        logger.info("cancel " + name);
        boolean ret = false;
        Future<String> future = tasks.get(name);
        if (future == null) {
            logger.info("Not found name:" + name);
        } else {
            ret = future.cancel(true);
            logger.info("cancel " + name + ":" + ret);
            tasks.remove(name);
        }
        return ret;
    }

    /**
     * Blocks until every registered task has finished, printing each result.
     * If the waiting thread is interrupted, its interrupt flag is restored
     * and the method returns without waiting for the remaining tasks.
     */
    public void waitTaskDone() {
        Collection<Future<String>> futuretasks = tasks.values();
        for (Future<String> future : futuretasks) {
            System.out.println("future done? " + future.isDone());

            String result = "";
            try {
                result = future.get();
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller and stop
                // waiting; further get() calls would only be interrupted again.
                Thread.currentThread().interrupt();
                logger.error("future exec failed.", e);
                return;
            } catch (ExecutionException e) {
                // The task itself threw; log the cause and move on.
                logger.error("future exec failed.", e);
            }

            System.out.println("future done? " + future.isDone());
            System.out.print("result: " + result);
        }
    }

    /**
     * Shuts the pool down before the bean is destroyed: waits up to five
     * seconds for running tasks, then forces shutdown.
     */
    @PreDestroy
    public void destroy() {
        try {
            System.out.println("attempt to shutdown executor");
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            System.err.println("tasks interrupted");
            // Restore the flag for whoever is driving the shutdown.
            Thread.currentThread().interrupt();
        } finally {
            if (!executor.isTerminated()) {
                System.err.println("cancel non-finished tasks");
            }
            executor.shutdownNow();
            System.out.println("shutdown finished");
        }
    }
}

package com.italktv.platform.audioDist.task;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * Base class for tasks that produce a String result. Subclasses supply
 * {@code call()} and fill in the fields below.
 */
public abstract class MyTask implements Callable<String> {

    /** Source file this task works on. */
    protected String srcFile;

    /** Id of the part this task belongs to. */
    protected int partId;

    /** Program id; visible within this package only. */
    String programId;

    /** Leaves every field at its default value. */
    protected MyTask() {
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: