forked from skyzh/mini-lsm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtable.rs
More file actions
250 lines (225 loc) · 7.93 KB
/
table.rs
File metadata and controls
250 lines (225 loc) · 7.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
pub(crate) mod bloom;
mod builder;
mod iterator;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, bail, Result};
pub use builder::SsTableBuilder;
use bytes::{Buf, BufMut};
pub use iterator::SsTableIterator;
use crate::block::Block;
use crate::key::{KeyBytes, KeySlice};
use crate::lsm_storage::BlockCache;
use self::bloom::Bloom;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockMeta {
/// Offset of this data block.
pub offset: usize,
/// The first key of the data block.
pub first_key: KeyBytes,
/// The last key of the data block.
pub last_key: KeyBytes,
}
impl BlockMeta {
/// Encode block meta to a buffer.
pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
let mut estimated_size = std::mem::size_of::<u32>();
for meta in block_meta {
// The size of offset
estimated_size += std::mem::size_of::<u32>();
// The size of key length
estimated_size += std::mem::size_of::<u16>();
// The size of actual key
estimated_size += meta.first_key.len();
// The size of key length
estimated_size += std::mem::size_of::<u16>();
// The size of actual key
estimated_size += meta.last_key.len();
}
estimated_size += std::mem::size_of::<u32>();
// Reserve the space to improve performance, especially when the size of incoming data is
// large
buf.reserve(estimated_size);
let original_len = buf.len();
buf.put_u32(block_meta.len() as u32);
for meta in block_meta {
buf.put_u32(meta.offset as u32);
buf.put_u16(meta.first_key.len() as u16);
buf.put_slice(meta.first_key.raw_ref());
buf.put_u16(meta.last_key.len() as u16);
buf.put_slice(meta.last_key.raw_ref());
}
buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
assert_eq!(estimated_size, buf.len() - original_len);
}
/// Decode block meta from a buffer.
pub fn decode_block_meta(mut buf: &[u8]) -> Result<Vec<BlockMeta>> {
let mut block_meta = Vec::new();
let num = buf.get_u32() as usize;
let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);
for _ in 0..num {
let offset = buf.get_u32() as usize;
let first_key_len = buf.get_u16() as usize;
let first_key = KeyBytes::from_bytes(buf.copy_to_bytes(first_key_len));
let last_key_len: usize = buf.get_u16() as usize;
let last_key = KeyBytes::from_bytes(buf.copy_to_bytes(last_key_len));
block_meta.push(BlockMeta {
offset,
first_key,
last_key,
});
}
if buf.get_u32() != checksum {
bail!("meta checksum mismatched");
}
Ok(block_meta)
}
}
/// A file object.
pub struct FileObject(Option<File>, u64);
impl FileObject {
pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
use std::os::unix::fs::FileExt;
let mut data = vec![0; len as usize];
self.0
.as_ref()
.unwrap()
.read_exact_at(&mut data[..], offset)?;
Ok(data)
}
pub fn size(&self) -> u64 {
self.1
}
/// Create a new file object (day 2) and write the file to the disk (day 4).
pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> {
std::fs::write(path, &data)?;
File::open(path)?.sync_all()?;
Ok(FileObject(
Some(File::options().read(true).write(false).open(path)?),
data.len() as u64,
))
}
pub fn open(path: &Path) -> Result<Self> {
let file = File::options().read(true).write(false).open(path)?;
let size = file.metadata()?.len();
Ok(FileObject(Some(file), size))
}
}
/// An SSTable.
pub struct SsTable {
/// The actual storage unit of SsTable, the format is as above.
pub(crate) file: FileObject,
/// The meta blocks that hold info for data blocks.
pub(crate) block_meta: Vec<BlockMeta>,
/// The offset that indicates the start point of meta blocks in `file`.
pub(crate) block_meta_offset: usize,
id: usize,
block_cache: Option<Arc<BlockCache>>,
first_key: KeyBytes,
last_key: KeyBytes,
pub(crate) bloom: Option<Bloom>,
max_ts: u64,
}
impl SsTable {
#[cfg(test)]
pub(crate) fn open_for_test(file: FileObject) -> Result<Self> {
Self::open(0, None, file)
}
/// Open SSTable from a file.
pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
let len = file.size();
let raw_bloom_offset = file.read(len - 4, 4)?;
let bloom_offset = (&raw_bloom_offset[..]).get_u32() as u64;
let raw_bloom = file.read(bloom_offset, len - 4 - bloom_offset)?;
let bloom_filter = Bloom::decode(&raw_bloom)?;
let raw_meta_offset = file.read(bloom_offset - 4, 4)?;
let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?;
let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?;
Ok(Self {
file,
first_key: block_meta.first().unwrap().first_key.clone(),
last_key: block_meta.last().unwrap().last_key.clone(),
block_meta,
block_meta_offset: block_meta_offset as usize,
id,
block_cache,
bloom: Some(bloom_filter),
max_ts: 0,
})
}
/// Create a mock SST with only first key + last key metadata
pub fn create_meta_only(
id: usize,
file_size: u64,
first_key: KeyBytes,
last_key: KeyBytes,
) -> Self {
Self {
file: FileObject(None, file_size),
block_meta: vec![],
block_meta_offset: 0,
id,
block_cache: None,
first_key,
last_key,
bloom: None,
max_ts: 0,
}
}
/// Read a block from the disk.
pub fn read_block(&self, block_idx: usize) -> Result<Arc<Block>> {
let offset = self.block_meta[block_idx].offset;
let offset_end = self
.block_meta
.get(block_idx + 1)
.map_or(self.block_meta_offset, |x| x.offset);
let block_len = offset_end - offset - 4;
let block_data_with_chksum: Vec<u8> = self
.file
.read(offset as u64, (offset_end - offset) as u64)?;
let block_data = &block_data_with_chksum[..block_len];
let checksum = (&block_data_with_chksum[block_len..]).get_u32();
if checksum != crc32fast::hash(block_data) {
bail!("block checksum mismatched");
}
Ok(Arc::new(Block::decode(block_data)))
}
/// Read a block from disk, with block cache.
pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
if let Some(ref block_cache) = self.block_cache {
let blk = block_cache
.try_get_with((self.id, block_idx), || self.read_block(block_idx))
.map_err(|e| anyhow!("{}", e))?;
Ok(blk)
} else {
self.read_block(block_idx)
}
}
/// Find the block that may contain `key`.
pub fn find_block_idx(&self, key: KeySlice) -> usize {
self.block_meta
.partition_point(|meta| meta.first_key.as_key_slice() <= key)
.saturating_sub(1)
}
/// Get number of data blocks.
pub fn num_of_blocks(&self) -> usize {
self.block_meta.len()
}
pub fn first_key(&self) -> &KeyBytes {
&self.first_key
}
pub fn last_key(&self) -> &KeyBytes {
&self.last_key
}
pub fn table_size(&self) -> u64 {
self.file.1
}
pub fn sst_id(&self) -> usize {
self.id
}
pub fn max_ts(&self) -> u64 {
self.max_ts
}
}