MongoDB自增序列实现 - Java多线程同步 synchronized 用法
2018-01-11 18:21
639 查看
转 : 简书 - MongoDB自增序列实现 - Java多线程同步 synchronized 用法
在使用MongoDB的时候 (基于spring-mongo) ,我想在插入对象时获取有序自增的主键 ,但是MongoDB的默认规则是生成一串无序 (大致有序) 的字串 .而Spring Data提供的主键生成方法也是随机的 String/BigInteger.
因为分布式情况下 ,有序ID会变得困难 ( ID中心/分布式锁 )
* 创建sequence :
* 获取sequence ,
* 保存
但是在多线程情况下 , 2-3可能被多个线程同时运行 ,导致sequence还未保存成功就被下一个获取.
模拟数据库Sequence模型
非同步代码读写sequence
可以从结果看出 ,最终的sequence不是1001(从1开始取1000个) ,说明中途有重复的sequence产生 .原因就是在
同步代码读写sequence
因此set/get必须在一个同步代码块中 ,这段代码每次只能被一个线程访问 .
这次结果正确 ,所有ID都是有序取用的 ,但是运行的时间足足慢了10倍 !! 因为 synchronized 标记加在了
当然 ,同步代码约束了多线程的运行 ,效率上自然有所下降 .不过我们发现 ,我们所取的AB两个Sequence是互不影响的 ,当 A.get/B.get 同时发生也是允许的 ,所以需要调整同步规则 ,只对同一个sequence取用进行同步.
同步指定对象运行代码读写sequence
很容易看出 ,运行时间比上一个少了一半 .因为我们对sequence对象进行同步 ,不同的sequence对象就像多个锁 ,只有争抢同一个锁的人才需要进行等待 .
测试服务
测试结果
可以看到测试成功 ,已经可以获取到有序自增的sequence . 而批量主键的执行速度明显优于数据库存取 ,而且可以减轻数据库压力 ,在不需要严格连续的主键时 ,应该采用批量获取 ,内存发放的方式 ( 分布式ID实现方式之一 ) .
代码见 todolist-server / SequenceServiceTest.java.
在使用MongoDB的时候 (基于spring-mongo) ,我想在插入对象时获取有序自增的主键 ,但是MongoDB的默认规则是生成一串无序 (大致有序) 的字串 .而Spring Data提供的主键生成方法也是随机的 String/BigInteger.
因为分布式情况下 ,有序ID会变得困难 ( ID中心/分布式锁 )
同步问题
获取有序ID的通常做法是 :* 创建sequence :
key-start-end-step-current标识/起值/止值/步长/当前值
* 获取sequence ,
current作为主键值
* 保存
current = current + step到数据库作为下一个主键值
但是在多线程情况下 , 2-3可能被多个线程同时运行 ,导致sequence还未保存成功就被下一个获取.
模拟数据库Sequence模型
@AllArgsConstructor public class Sequence { @Setter long current; public long getCurrent() { transTime(); return current; } public void setCurrent(long current) { transTime(); this.current = current; } private void transTime() { try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } public class SequenceServiceTest { Sequence A;//A型sequence Sequence B;//B型sequence /** * 初始化数据 */ @Before public void before() { A = new Sequence(1L); B = new Sequence(1L); } /** * 模拟数据库取值 */ private Sequence getSequence(String name) { switch (name) { case "A": return A; case "B": return B; default: return null; } } private void waitService(ExecutorService executorService) { executorService.shutdown(); try { while (!executorService.awaitTermination(1000, TimeUnit.SECONDS)) { } System.out.println("final A:" + A.getCurrent()); System.out.println("final B:" + B.getCurrent()); } catch (InterruptedException e) { e.printStackTrace(); } } }
非同步代码读写sequence
@Test public void noSynchronizedTest() { //10个用户同时需要获取id ExecutorService executorService = Executors.newFixedThreadPool(10); //一部分需要A ,一部分需要B for (int i = 0; i < 1000; i++) { executorService.submit(() -> System.out.println("A:" + getNextWithNoSync("A"))); executorService.submit(() -> System.out.println("B:" + getNextWithNoSync("B"))); } waitService(executorService); } private long getNextWithNoSync(String name) { Sequence sequence = getSequence(name); long current = sequence.getCurrent(); long next = current + 1; sequence.setCurrent(next); return current; }
运行时间 : 1s369ms ... B:360 A:332 B:361 B:361 A:332 A:332 final A:333 final B:362
可以从结果看出 ,最终的sequence不是1001(从1开始取1000个) ,说明中途有重复的sequence产生 .原因就是在
getNextWithNoSync函数 , get取值 在上一个set回写 之前运行 ,造成混乱. (网络传输越慢 ,影响越大)
同步代码读写sequence
因此set/get必须在一个同步代码块中 ,这段代码每次只能被一个线程访问 .
@Test public void synchronizedMethodTest() { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 1000; i++) { executorService.submit(() -> System.out.println("A:" + getNextWithSync("A"))); executorService.submit(() -> System.out.println("B:" + getNextWithSync("B"))); } waitService(executorService); } private synchronized long getNextWithSync(String name) { Sequence sequence = getSequence(name); long current = sequence.getCurrent(); long next = current + 1; sequence.setCurrent(next); return current; }
运行时间 : 13s277ms ... A:998 B:999 A:999 B:1000 A:1000 final A:1001 final B:1001
这次结果正确 ,所有ID都是有序取用的 ,但是运行的时间足足慢了10倍 !! 因为 synchronized 标记加在了
getNextWithSync方法上 ,线程会等待只有一个线程能运行这个函数 .差值 = 数据库读写时间*方法执行数量 :
(3+3) * 1000 * 2 = 12,000 ms.
当然 ,同步代码约束了多线程的运行 ,效率上自然有所下降 .不过我们发现 ,我们所取的AB两个Sequence是互不影响的 ,当 A.get/B.get 同时发生也是允许的 ,所以需要调整同步规则 ,只对同一个sequence取用进行同步.
同步指定对象运行代码读写sequence
@Test public void synchronizedObjectTest() { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 1000; i++) { executorService.submit(() -> System.out.println("A:" + getNextWithSyncObject("A"))); executorService.submit(() -> System.out.println("B:" + getNextWithSyncObject("B"))); } waitService(executorService); } private long getNextWithSyncObject(String name) { Sequence sequence = getSequence(name); synchronized (sequence) { long current = sequence.getCurrent(); long next = current + 1; sequence.setCurrent(next); return current; } }
运行时间 : 6s869ms ... A:997 B:999 A:998 A:999 B:1000 A:1000 final A:1001 final B:1001
很容易看出 ,运行时间比上一个少了一半 .因为我们对sequence对象进行同步 ,不同的sequence对象就像多个锁 ,只有争抢同一个锁的人才需要进行等待 .
实践
@Service public class SequenceServiceImpl implements SequenceService, ApplicationListener<ContextRefreshedEvent> { final Map<String, SequenceLock> collection2SequenceLock = new ConcurrentHashMap<>(); @Autowired SequenceRepository sequenceRepository; /** * 初始化各sequence的同步锁 */ @Override public void onApplicationEvent(ContextRefreshedEvent event) { //避免spring-mvc中的servlet context事件 if (event.getApplicationContext().getParent() == null) { for (SysSequence sequence : sequenceRepository.findAll()) { collection2SequenceLock.put(sequence.getCollectionName(), new SequenceLock(sequence)); log.info("初始化sequence lock列表 : {} - {}", sequence.getCollectionName(), sequence.getCurrent()); } } } @Override public void register(SysSequence sequence) { sequenceRepository.insert(sequence); } /** * 获取主键 * * @param entity collection对象 */ @Override public Long getNext(Class entity) { String collection = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, entity.getSimpleName()); Document docAnnotation = (Document) entity.getAnnotation(Document.class); if (docAnnotation != null) { collection = docAnnotation.collection(); } return getNext(collection); } @Override public Long getNext(String collection) { if (!collection2SequenceLock.containsKey(collection)) { throw CheckedException.of("找不到%s表的主键sequence", collection); } return getNextWithLock(collection); } /** * 多线程情况下获取同一个collection的id时 ,可能出现同步问题 ;但又不能影响不同的collection的流程 */ private Long getNextWithLock(String collection) { SequenceLock lock = collection2SequenceLock.get(collection); synchronized (lock) { //已载入的批量主键 if (lock.isBatchInit) { long current = lock.current; long next = current + lock.step; lock.current = next; if (current < lock.max) { log.debug("批量序列分发 - {}:{}", collection, current); return current; } } //获取主键记录值 SysSequence sequence = sequenceRepository.findOne(collection); long current = sequence.getCurrent(); long next; if (sequence.isBatch()) { //未载入的批量主键 : 直接取一段sequence next = current + sequence.getStep() * sequence.getBatchCount(); //缓存到lock中 long batchNext = current + sequence.getStep(); lock.init(batchNext, sequence.getStep(), next); } else { next = current + sequence.getStep(); } if (next > sequence.getEnd()) { throw CheckedException .of("%s表的sequence超限 - next[%s] max[%s]", collection, next, sequence.getEnd()); } sequence.setCurrent(next); sequenceRepository.save(sequence); log.debug("数据库获取 - {}:{}", collection, current); return current; } } /** * 获取sequence的同步lock ,同时用来分发批量主键 */ class SequenceLock { String collection; boolean isBatchInit; long current = -1; long step = -1; long max = -1; SequenceLock(SysSequence sequence) { this.collection = sequence.getCollectionName(); } void init(long current, long step, long max) { this.isBatchInit = true; this.current = current; this.step = step; this.max = max; log.debug("初始化批量序列 - {} ,当前 - {} ,步长 - {} ,分发最大值 - {}", collection, current, step, max); } } }
测试服务
@SpringBootTest(classes = {Application.class}) @RunWith(SpringRunner.class) public class SequenceServiceTest { @Autowired SequenceService sequenceService; @Autowired SequenceRepository sequenceRepository; @Before public void before() { sequenceRepository.save( new SysSequence() .setCollectionName("sys_user") .setStart(0L).setEnd(Long.MAX_VALUE).setStep(1L) .setCurrent(0L) ); sequenceRepository.save( new SysSequence() .setCollectionName("sys_user2") .setStart(0L).setEnd(Long.MAX_VALUE).setStep(10L) .setCurrent(0L) ); sequenceRepository.save( new SysSequence() .setCollectionName("sys_user3") .setStart(0L).setEnd(Long.MAX_VALUE).setStep(1L) .setCurrent(0L) .setBatch(true).setBatchCount(10) ); } @Test public void getSequence1() { for (int i = 0; i < 1000; i++) { sequenceService.getNext(SysUser.class); } Assert.assertEquals(sequenceService.getNext(SysUser.class), Long.valueOf(1000L)); } @Test public void getSequence2() { for (int i = 0; i < 1000; i++) { sequenceService.getNext("sys_user2"); } Assert.assertEquals(sequenceService.getNext("sys_user2"), Long.valueOf(10000L)); } @Test public void getSequence3() { for (int i = 0; i < 1000; i++) { sequenceService.getNext("sys_user3"); } Assert.assertEquals(sequenceService.getNext("sys_user3"), Long.valueOf(1000L)); } }
测试结果
可以看到测试成功 ,已经可以获取到有序自增的sequence . 而批量主键的执行速度明显优于数据库存取 ,而且可以减轻数据库压力 ,在不需要严格连续的主键时 ,应该采用批量获取 ,内存发放的方式 ( 分布式ID实现方式之一 ) .
代码见 todolist-server / SequenceServiceTest.java.
相关文章推荐
- Java多线程同步 – synchronized 用法
- Java中利用synchronized关键字实现多线程同步问题 .
- 使用synchronized和volatile实现Java多线程同步
- Java中利用synchronized关键字实现多线程同步问题
- Java 多线程同步 锁机制与synchronized
- [转]java中synchronized用法
- 最大子序列算法的JAVA实现
- java中synchronized的用法
- java中synchronized用法
- java中synchronized用法
- 最大子序列算法的JAVA实现
- java中synchronized用法
- java的多线程同步互斥:synchronized的乱用
- Java synchronized用法
- java中的synchronized关键字 用法
- Java synchronized用法搜集
- java.lang.ThreadLocal类的实现,用法
- Java为其for循环实现了针对数组和集合类的新用法
- java中synchronized用法(zz)
- java中synchronized用法(zz) 文章出处:http://www.diybl.com/course/3_program/java/javajs/2007917/71563.html