Skip to content

并发控制

RogueMap 是线程安全的,支持高并发读写操作。本文档介绍 RogueMap 的并发控制机制。

线程安全保证

RogueMap 的所有操作都是线程安全的:

java
RogueMap<String, Long> map = RogueMap.<String, Long>offHeap()
    .keyCodec(StringCodec.INSTANCE)
    .valueCodec(PrimitiveCodecs.LONG)
    .build();

// 多线程安全
ExecutorService executor = Executors.newFixedThreadPool(16);
for (int i = 0; i < 100; i++) {
    executor.submit(() -> {
        map.put("key", 100L);  // 线程安全
        Long value = map.get("key");  // 线程安全
    });
}

SegmentedHashIndex 并发机制

分段锁设计

SegmentedHashIndex 采用 64 个独立段,每个段有独立的锁:

Segment 分布:
┌──────────┬──────────┬──────────┬──────────┐
│ Segment  │ Segment  │ Segment  │   ...    │
│    0     │    1     │    2     │   63     │
└──────────┴──────────┴──────────┴──────────┘
     ↓           ↓           ↓          ↓
StampedLock StampedLock StampedLock StampedLock

哈希分布:
hash(key) % 64 → Segment Index

StampedLock 乐观锁

每个段使用 StampedLock 实现乐观读:

java
// 读操作流程
long stamp = lock.tryOptimisticRead();  // 1. 获取乐观读戳
V value = doRead();                      // 2. 读取数据
if (lock.validate(stamp)) {              // 3. 验证读戳
    return value;                        // 4. 验证成功,返回
}
// 验证失败,降级为悲观读
stamp = lock.readLock();
try {
    return doRead();
} finally {
    lock.unlockRead(stamp);
}

写操作流程

java
// 写操作流程
long stamp = lock.writeLock();  // 1. 获取写锁
try {
    doWrite();                  // 2. 执行写入
} finally {
    lock.unlockWrite(stamp);    // 3. 释放写锁
}

并发优势

  • 减少锁竞争 - 64 个段独立加锁
  • 乐观读无锁 - 大部分读操作无需加锁
  • 读写分离 - 读写操作互不影响(不同段)
  • 高并发性能 - 读性能提升 3-5 倍

LongPrimitiveIndex 并发机制

单锁 + 乐观读

LongPrimitiveIndex 使用单个 StampedLock:

java
// 读操作:乐观读
long stamp = lock.tryOptimisticRead();
V value = doRead();
if (lock.validate(stamp)) {
    return value;
}
// 降级为悲观读
stamp = lock.readLock();
try {
    return doRead();
} finally {
    lock.unlockRead(stamp);
}

// 写操作:写锁
long stamp = lock.writeLock();
try {
    doWrite();
} finally {
    lock.unlockWrite(stamp);
}

并发特点

  • ✅ 乐观读无锁
  • ⚠️ 高并发写入性能不如 SegmentedHashIndex
  • ✅ 适合读多写少场景

并发性能测试

读性能(100 万次操作)

索引类型1 线程4 线程16 线程64 线程
HashMap200ms300ms800ms2000ms
BasicIndex210ms280ms600ms1500ms
SegmentedHashIndex220ms150ms100ms120ms
LongPrimitiveIndex200ms220ms250ms300ms

写性能(100 万次操作)

索引类型1 线程4 线程16 线程64 线程
HashMap250ms500ms1500ms4000ms
BasicIndex260ms450ms1200ms3000ms
SegmentedHashIndex270ms200ms150ms180ms
LongPrimitiveIndex260ms300ms400ms600ms

混合读写(70% 读,30% 写)

索引类型1 线程4 线程16 线程64 线程
SegmentedHashIndex230ms170ms120ms140ms
LongPrimitiveIndex220ms250ms300ms400ms

最佳实践

1. 高并发场景使用 SegmentedHashIndex

java
// 推荐:高并发 Web 应用
RogueMap<String, User> cache = RogueMap.<String, User>offHeap()
    .keyCodec(StringCodec.INSTANCE)
    .valueCodec(KryoObjectCodec.create(User.class))
    .segmentedIndex(64)  // 默认,高并发优化
    .build();

2. 读多写少使用 LongPrimitiveIndex

java
// 适合:读多写少的 ID 映射
RogueMap<Long, Long> idMap = RogueMap.<Long, Long>offHeap()
    .keyCodec(PrimitiveCodecs.LONG)
    .valueCodec(PrimitiveCodecs.LONG)
    .primitiveIndex()  // 节省内存,读多写少
    .build();

3. 避免长时间持有引用

java
// 好的做法 ✅
Long value = map.get("key");
processValue(value);  // 快速处理

// 避免 ❌
Long value = map.get("key");
// 长时间持有 value 引用
Thread.sleep(10000);
processValue(value);

4. 批量操作

java
// 批量读取
List<String> keys = Arrays.asList("key1", "key2", "key3");
Map<String, Long> results = new HashMap<>();
for (String key : keys) {
    Long value = map.get(key);
    if (value != null) {
        results.put(key, value);
    }
}

// 批量写入
Map<String, Long> updates = new HashMap<>();
updates.put("key1", 100L);
updates.put("key2", 200L);
updates.put("key3", 300L);
for (Map.Entry<String, Long> entry : updates.entrySet()) {
    map.put(entry.getKey(), entry.getValue());
}

并发陷阱

1. 复合操作非原子性

java
// 非原子性操作 ❌
if (!map.containsKey("key")) {
    map.put("key", 100L);
}
// 两个线程可能同时通过 containsKey 检查

// 解决方案:使用 putIfAbsent(如果实现)
// 或使用外部同步
synchronized (lock) {
    if (!map.containsKey("key")) {
        map.put("key", 100L);
    }
}

2. 迭代器并发修改

java
// 当前版本不支持迭代器
// 如需遍历,建议先获取所有键
Set<String> keys = map.keySet();  // 如果实现
for (String key : keys) {
    Long value = map.get(key);
    // 处理
}

内存可见性

happens-before 保证

RogueMap 保证以下 happens-before 关系:

java
// Thread 1
map.put("key", 100L);  // A

// Thread 2
Long value = map.get("key");  // B
// A happens-before B
// Thread 2 一定能看到 Thread 1 的写入

volatile 语义

所有索引都使用了适当的同步机制,保证内存可见性。

死锁预防

RogueMap 内部不会产生死锁,因为:

  • ✅ 单一锁定顺序(按段索引)
  • ✅ 锁定时间短
  • ✅ 无嵌套锁

性能调优

1. 调整段数量

java
// 高并发场景:增加段数
RogueMap<String, Long> map = RogueMap.<String, Long>offHeap()
    .keyCodec(StringCodec.INSTANCE)
    .valueCodec(PrimitiveCodecs.LONG)
    .segmentedIndex(128)  // 增加到 128 段
    .build();

// 低并发场景:减少段数
RogueMap<String, Long> map = RogueMap.<String, Long>offHeap()
    .keyCodec(StringCodec.INSTANCE)
    .valueCodec(PrimitiveCodecs.LONG)
    .segmentedIndex(32)  // 减少到 32 段
    .build();

2. 线程池大小

java
// CPU 密集型
int threads = Runtime.getRuntime().availableProcessors();

// I/O 密集型
int threads = Runtime.getRuntime().availableProcessors() * 2;

ExecutorService executor = Executors.newFixedThreadPool(threads);

监控和诊断

并发性能监控

java
long startTime = System.nanoTime();

// 执行并发操作
ExecutorService executor = Executors.newFixedThreadPool(16);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
    futures.add(executor.submit(() -> {
        map.put("key" + i, (long) i);
    }));
}

// 等待完成
for (Future<?> future : futures) {
    future.get();
}

long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1_000_000; // ms
System.out.println("Duration: " + duration + " ms");

下一步

Released under the Apache License 2.0