并发控制
RogueMap 是线程安全的,支持高并发读写操作。本文档介绍 RogueMap 的并发控制机制。
线程安全保证
RogueMap 的所有操作都是线程安全的:
java
RogueMap<String, Long> map = RogueMap.<String, Long>offHeap()
.keyCodec(StringCodec.INSTANCE)
.valueCodec(PrimitiveCodecs.LONG)
.build();
// 多线程安全
ExecutorService executor = Executors.newFixedThreadPool(16);
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
map.put("key", 100L); // 线程安全
Long value = map.get("key"); // 线程安全
});
}SegmentedHashIndex 并发机制
分段锁设计
SegmentedHashIndex 采用 64 个独立段,每个段有独立的锁:
Segment 分布:
┌──────────┬──────────┬──────────┬──────────┐
│ Segment │ Segment │ Segment │ ... │
│ 0 │ 1 │ 2 │ 63 │
└──────────┴──────────┴──────────┴──────────┘
↓ ↓ ↓ ↓
StampedLock StampedLock StampedLock StampedLock
哈希分布:
hash(key) % 64 → Segment IndexStampedLock 乐观锁
每个段使用 StampedLock 实现乐观读:
java
// 读操作流程
long stamp = lock.tryOptimisticRead(); // 1. 获取乐观读戳
V value = doRead(); // 2. 读取数据
if (lock.validate(stamp)) { // 3. 验证读戳
return value; // 4. 验证成功,返回
}
// 验证失败,降级为悲观读
stamp = lock.readLock();
try {
return doRead();
} finally {
lock.unlockRead(stamp);
}写操作流程
java
// 写操作流程
long stamp = lock.writeLock(); // 1. 获取写锁
try {
doWrite(); // 2. 执行写入
} finally {
lock.unlockWrite(stamp); // 3. 释放写锁
}并发优势
- ✅ 减少锁竞争 - 64 个段独立加锁
- ✅ 乐观读无锁 - 大部分读操作无需加锁
- ✅ 读写分离 - 读写操作互不影响(不同段)
- ✅ 高并发性能 - 读性能提升 3-5 倍
LongPrimitiveIndex 并发机制
单锁 + 乐观读
LongPrimitiveIndex 使用单个 StampedLock:
java
// 读操作:乐观读
long stamp = lock.tryOptimisticRead();
V value = doRead();
if (lock.validate(stamp)) {
return value;
}
// 降级为悲观读
stamp = lock.readLock();
try {
return doRead();
} finally {
lock.unlockRead(stamp);
}
// 写操作:写锁
long stamp = lock.writeLock();
try {
doWrite();
} finally {
lock.unlockWrite(stamp);
}并发特点
- ✅ 乐观读无锁
- ⚠️ 高并发写入性能不如 SegmentedHashIndex
- ✅ 适合读多写少场景
并发性能测试
读性能(100 万次操作)
| 索引类型 | 1 线程 | 4 线程 | 16 线程 | 64 线程 |
|---|---|---|---|---|
| HashMap | 200ms | 300ms | 800ms | 2000ms |
| BasicIndex | 210ms | 280ms | 600ms | 1500ms |
| SegmentedHashIndex | 220ms | 150ms | 100ms | 120ms |
| LongPrimitiveIndex | 200ms | 220ms | 250ms | 300ms |
写性能(100 万次操作)
| 索引类型 | 1 线程 | 4 线程 | 16 线程 | 64 线程 |
|---|---|---|---|---|
| HashMap | 250ms | 500ms | 1500ms | 4000ms |
| BasicIndex | 260ms | 450ms | 1200ms | 3000ms |
| SegmentedHashIndex | 270ms | 200ms | 150ms | 180ms |
| LongPrimitiveIndex | 260ms | 300ms | 400ms | 600ms |
混合读写(70% 读,30% 写)
| 索引类型 | 1 线程 | 4 线程 | 16 线程 | 64 线程 |
|---|---|---|---|---|
| SegmentedHashIndex | 230ms | 170ms | 120ms | 140ms |
| LongPrimitiveIndex | 220ms | 250ms | 300ms | 400ms |
最佳实践
1. 高并发场景使用 SegmentedHashIndex
java
// 推荐:高并发 Web 应用
RogueMap<String, User> cache = RogueMap.<String, User>offHeap()
.keyCodec(StringCodec.INSTANCE)
.valueCodec(KryoObjectCodec.create(User.class))
.segmentedIndex(64) // 默认,高并发优化
.build();2. 读多写少使用 LongPrimitiveIndex
java
// 适合:读多写少的 ID 映射
RogueMap<Long, Long> idMap = RogueMap.<Long, Long>offHeap()
.keyCodec(PrimitiveCodecs.LONG)
.valueCodec(PrimitiveCodecs.LONG)
.primitiveIndex() // 节省内存,读多写少
.build();3. 避免长时间持有引用
java
// 好的做法 ✅
Long value = map.get("key");
processValue(value); // 快速处理
// 避免 ❌
Long value = map.get("key");
// 长时间持有 value 引用
Thread.sleep(10000);
processValue(value);4. 批量操作
java
// 批量读取
List<String> keys = Arrays.asList("key1", "key2", "key3");
Map<String, Long> results = new HashMap<>();
for (String key : keys) {
Long value = map.get(key);
if (value != null) {
results.put(key, value);
}
}
// 批量写入
Map<String, Long> updates = new HashMap<>();
updates.put("key1", 100L);
updates.put("key2", 200L);
updates.put("key3", 300L);
for (Map.Entry<String, Long> entry : updates.entrySet()) {
map.put(entry.getKey(), entry.getValue());
}并发陷阱
1. 复合操作非原子性
java
// 非原子性操作 ❌
if (!map.containsKey("key")) {
map.put("key", 100L);
}
// 两个线程可能同时通过 containsKey 检查
// 解决方案:使用 putIfAbsent(如果实现)
// 或使用外部同步
synchronized (lock) {
if (!map.containsKey("key")) {
map.put("key", 100L);
}
}2. 迭代器并发修改
java
// 当前版本不支持迭代器
// 如需遍历,建议先获取所有键
Set<String> keys = map.keySet(); // 如果实现
for (String key : keys) {
Long value = map.get(key);
// 处理
}内存可见性
happens-before 保证
RogueMap 保证以下 happens-before 关系:
java
// Thread 1
map.put("key", 100L); // A
// Thread 2
Long value = map.get("key"); // B
// A happens-before B
// Thread 2 一定能看到 Thread 1 的写入volatile 语义
所有索引都使用了适当的同步机制,保证内存可见性。
死锁预防
RogueMap 内部不会产生死锁,因为:
- ✅ 单一锁定顺序(按段索引)
- ✅ 锁定时间短
- ✅ 无嵌套锁
性能调优
1. 调整段数量
java
// 高并发场景:增加段数
RogueMap<String, Long> map = RogueMap.<String, Long>offHeap()
.keyCodec(StringCodec.INSTANCE)
.valueCodec(PrimitiveCodecs.LONG)
.segmentedIndex(128) // 增加到 128 段
.build();
// 低并发场景:减少段数
RogueMap<String, Long> map = RogueMap.<String, Long>offHeap()
.keyCodec(StringCodec.INSTANCE)
.valueCodec(PrimitiveCodecs.LONG)
.segmentedIndex(32) // 减少到 32 段
.build();2. 线程池大小
java
// CPU 密集型
int threads = Runtime.getRuntime().availableProcessors();
// I/O 密集型
int threads = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService executor = Executors.newFixedThreadPool(threads);监控和诊断
并发性能监控
java
long startTime = System.nanoTime();
// 执行并发操作
ExecutorService executor = Executors.newFixedThreadPool(16);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
futures.add(executor.submit(() -> {
map.put("key" + i, (long) i);
}));
}
// 等待完成
for (Future<?> future : futures) {
future.get();
}
long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1_000_000; // ms
System.out.println("Duration: " + duration + " ms");