forked from skyzh/mini-lsm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwal.rs
More file actions
93 lines (86 loc) · 2.92 KB
/
wal.rs
File metadata and controls
93 lines (86 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use std::fs::{File, OpenOptions};
use std::hash::Hasher;
use std::io::{BufWriter, Read, Write};
use std::path::Path;
use std::sync::Arc;
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes};
use crossbeam_skiplist::SkipMap;
use parking_lot::Mutex;
/// Write-ahead log handle.
///
/// Wraps the log file in a `BufWriter` guarded by a mutex inside an `Arc`, so writes through a
/// shared `&Wal` are serialized by the lock.
pub struct Wal {
/// Buffered writer over the underlying WAL file.
file: Arc<Mutex<BufWriter<File>>>,
}
impl Wal {
    /// Creates a brand-new WAL file at `path` and wraps it in a buffered writer.
    ///
    /// # Errors
    ///
    /// Returns an error with the context "failed to create WAL" if the file cannot be opened.
    /// The file is opened with `create_new`, so an already existing `path` is an error too.
    pub fn create(path: impl AsRef<Path>) -> Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .create_new(true)
            .write(true)
            .open(path)
            .context("failed to create WAL")?;
        Ok(Self {
            file: Arc::new(Mutex::new(BufWriter::new(file))),
        })
    }

    /// Replays the WAL stored at `path` into `skiplist` and returns a handle that appends to
    /// the same file.
    ///
    /// Every record is laid out as
    /// `key_len: u16 | key | value_len: u16 | value | crc32: u32`, with the integers stored
    /// big-endian. The checksum covers both length prefixes and both payloads.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read, if a record is cut short
    /// (truncated file), or if a record's checksum does not match. Records that were validated
    /// before the failing one have already been inserted into `skiplist` at that point.
    pub fn recover(path: impl AsRef<Path>, skiplist: &SkipMap<Bytes, Bytes>) -> Result<Self> {
        let mut file = OpenOptions::new()
            .read(true)
            .append(true)
            .open(path)
            .context("failed to recover from WAL")?;
        let mut contents = Vec::new();
        file.read_to_end(&mut contents)?;

        let mut rest: &[u8] = contents.as_slice();
        while rest.has_remaining() {
            let mut hasher = crc32fast::Hasher::new();
            let key = Bytes::copy_from_slice(Self::read_field(&mut rest, &mut hasher, "key")?);
            let value = Bytes::copy_from_slice(Self::read_field(&mut rest, &mut hasher, "value")?);

            // `Buf::get_u32` panics on a short buffer, so check before reading.
            if rest.len() < std::mem::size_of::<u32>() {
                bail!("WAL is truncated: incomplete checksum");
            }
            let checksum = rest.get_u32();
            if hasher.finalize() != checksum {
                bail!("checksum mismatch");
            }
            skiplist.insert(key, value);
        }

        Ok(Self {
            file: Arc::new(Mutex::new(BufWriter::new(file))),
        })
    }

    /// Splits one length-prefixed field (`len: u16` followed by `len` bytes) off the front of
    /// `buf`, feeds the length and the payload into `hasher`, and returns the payload.
    ///
    /// `what` names the field ("key" / "value") for error messages. Returns an error instead
    /// of panicking when `buf` holds fewer bytes than the field needs.
    fn read_field<'a>(
        buf: &mut &'a [u8],
        hasher: &mut crc32fast::Hasher,
        what: &str,
    ) -> Result<&'a [u8]> {
        if buf.len() < std::mem::size_of::<u16>() {
            bail!("WAL is truncated: incomplete {} length", what);
        }
        let len = buf.get_u16();

        // Copy the inner shared reference out so the returned slice keeps the full lifetime
        // of the underlying buffer rather than the lifetime of the `&mut` borrow.
        let data: &'a [u8] = *buf;
        if data.len() < usize::from(len) {
            bail!("WAL is truncated: incomplete {}", what);
        }
        let (field, tail) = data.split_at(usize::from(len));
        *buf = tail;

        // NOTE(review): `write_u16` comes from `std::hash::Hasher`; its default implementation
        // presumably hashes the native-endian bytes of the length, which would tie the checksum
        // to the host byte order. Kept as-is so existing WAL files stay readable -- confirm
        // before changing.
        hasher.write_u16(len);
        hasher.write(field);
        Ok(field)
    }

    /// Appends one key/value record to the WAL buffer.
    ///
    /// The record uses the layout read back by [`Wal::recover`]:
    /// `key_len: u16 | key | value_len: u16 | value | crc32: u32`.
    /// The data is only handed to the buffered writer; call [`Wal::sync`] to flush and fsync.
    ///
    /// # Errors
    ///
    /// Returns an error if `key` or `value` is longer than `u16::MAX` bytes (the length prefix
    /// could not represent it), or if writing to the file fails.
    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        const LEN_BYTES: usize = std::mem::size_of::<u16>();
        const CRC_BYTES: usize = std::mem::size_of::<u32>();
        const MAX_FIELD_LEN: usize = u16::MAX as usize;

        // A silently truncated length prefix would produce a record that cannot be parsed
        // back, so oversized inputs are rejected up front.
        if key.len() > MAX_FIELD_LEN {
            bail!(
                "WAL key is too long: {} bytes (limit is {})",
                key.len(),
                MAX_FIELD_LEN
            );
        }
        if value.len() > MAX_FIELD_LEN {
            bail!(
                "WAL value is too long: {} bytes (limit is {})",
                value.len(),
                MAX_FIELD_LEN
            );
        }
        // Lossless: both lengths were checked against `u16::MAX` above.
        let key_len = key.len() as u16;
        let value_len = value.len() as u16;

        // Two length prefixes, both payloads and the trailing checksum.
        let mut record: Vec<u8> =
            Vec::with_capacity(key.len() + value.len() + 2 * LEN_BYTES + CRC_BYTES);
        let mut hasher = crc32fast::Hasher::new();
        hasher.write_u16(key_len);
        record.put_u16(key_len);
        hasher.write(key);
        record.put_slice(key);
        hasher.write_u16(value_len);
        record.put_u16(value_len);
        hasher.write(value);
        record.put_slice(value);
        // add checksum: week 2 day 7
        record.put_u32(hasher.finalize());

        // Take the lock only for the actual write so serialization happens outside of it.
        let mut file = self.file.lock();
        file.write_all(&record)?;
        Ok(())
    }

    /// Implement this in week 3, day 5.
    ///
    /// # Panics
    ///
    /// Always panics: this is still an `unimplemented!()` placeholder.
    pub fn put_batch(&self, _data: &[(&[u8], &[u8])]) -> Result<()> {
        unimplemented!()
    }

    /// Flushes the buffered writer and then calls `sync_all` on the underlying file.
    ///
    /// # Errors
    ///
    /// Returns an error if either the flush or the `sync_all` call fails.
    pub fn sync(&self) -> Result<()> {
        let mut writer = self.file.lock();
        writer.flush()?;
        writer.get_mut().sync_all()?;
        Ok(())
    }
}