write patch engine
This commit is contained in:
@ -31,6 +31,7 @@ if(STATUS_LENGTH EQUAL 0)
|
|||||||
else()
|
else()
|
||||||
add_definitions(-DGIT_COMMIT_HASH="[developing]")
|
add_definitions(-DGIT_COMMIT_HASH="[developing]")
|
||||||
endif()
|
endif()
|
||||||
|
add_definitions(-DZSTD_STATIC_LINKING_ONLY)
|
||||||
|
|
||||||
# 设置一个布尔类型的选项,用于控制是否启用高级功能,如日志(业务日志、程序日志、容错校验日志)、并发、快照等
|
# 设置一个布尔类型的选项,用于控制是否启用高级功能,如日志(业务日志、程序日志、容错校验日志)、并发、快照等
|
||||||
option(ENABLE_ADVANCED_FEATURE "Enable advanced features" OFF)
|
option(ENABLE_ADVANCED_FEATURE "Enable advanced features" OFF)
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
#include "map.hpp"
|
#include "map.hpp"
|
||||||
#include "storage/driver.h"
|
#include "storage/driver.h"
|
||||||
#include "vector.hpp"
|
#include "vector.hpp"
|
||||||
|
void GenerateDiff(const std::string &old_file, const std::string &new_file, const std::string &diff_file);
|
||||||
|
void ApplyPatch(const std::string &old_file, const std::string &diff_file, const std::string &new_file, bool is_reverse);
|
||||||
class SnapShotManager {
|
class SnapShotManager {
|
||||||
bool has_connected = false;
|
bool has_connected = false;
|
||||||
sjtu::vector<DataDriverBase *> drivers;
|
sjtu::vector<DataDriverBase *> drivers;
|
||||||
|
@ -1,2 +1,280 @@
|
|||||||
#include "dataguard/snapshot.h"
|
#include "dataguard/snapshot.h"
|
||||||
|
#include <sys/stat.h>
|
||||||
#include <zstd.h>
|
#include <zstd.h>
|
||||||
|
#include <cstdint>
|
||||||
|
#include "storage/config.h"
|
||||||
|
#include "vector.hpp"
|
||||||
|
|
||||||
|
default_numeric_index_t GetFileSize(const std::string &file) {
|
||||||
|
struct stat stat_buf;
|
||||||
|
int rc = stat(file.c_str(), &stat_buf);
|
||||||
|
if (rc != 0) {
|
||||||
|
throw std::runtime_error("stat failed");
|
||||||
|
}
|
||||||
|
return stat_buf.st_size;
|
||||||
|
}
|
||||||
|
struct uint8_t_reader {
|
||||||
|
FILE *f;
|
||||||
|
uint8_t *buf, *p1, *p2;
|
||||||
|
int size;
|
||||||
|
uint8_t_reader(FILE *fin, int bufsize = 1 << 12) {
|
||||||
|
f = fin;
|
||||||
|
size = bufsize;
|
||||||
|
p1 = p2 = 0;
|
||||||
|
buf = new uint8_t[size];
|
||||||
|
}
|
||||||
|
~uint8_t_reader() { delete[] buf; }
|
||||||
|
inline int operator()() { return p1 == p2 && (p2 = (p1 = buf) + fread(buf, 1, size, f), p1 == p2) ? EOF : *p1++; }
|
||||||
|
};
|
||||||
|
struct uint8_t_writer {
|
||||||
|
FILE *f;
|
||||||
|
uint8_t *buf, *p, *end;
|
||||||
|
int size;
|
||||||
|
uint8_t_writer(FILE *fout, int bufsize = 1 << 12) {
|
||||||
|
f = fout;
|
||||||
|
size = bufsize;
|
||||||
|
buf = new uint8_t[size];
|
||||||
|
p = buf;
|
||||||
|
end = buf + bufsize;
|
||||||
|
}
|
||||||
|
void Flush() {
|
||||||
|
fwrite(buf, p - buf, 1, f);
|
||||||
|
p = buf;
|
||||||
|
}
|
||||||
|
~uint8_t_writer() {
|
||||||
|
fwrite(buf, p - buf, 1, f);
|
||||||
|
delete[] buf;
|
||||||
|
}
|
||||||
|
inline uint8_t operator()(uint8_t ch) {
|
||||||
|
if (end == p) [[unlikely]] {
|
||||||
|
fwrite(buf, end - buf, 1, f);
|
||||||
|
p = buf;
|
||||||
|
}
|
||||||
|
return *p++ = ch;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
void GenerateDiff(const std::string &old_file, const std::string &new_file, const std::string &diff_file) {
|
||||||
|
/**
|
||||||
|
* Step 1: compare content of old_file and new_file, write it to buf
|
||||||
|
* Step 2: use zstd to compress buf, write it to diff_file
|
||||||
|
*/
|
||||||
|
sjtu::vector<uint8_t> buf;
|
||||||
|
// Step 1
|
||||||
|
default_numeric_index_t old_file_size = GetFileSize(old_file);
|
||||||
|
default_numeric_index_t new_file_size = GetFileSize(new_file);
|
||||||
|
default_numeric_index_t shared_size = std::min(old_file_size, new_file_size);
|
||||||
|
bool current_is_diff = false;
|
||||||
|
default_numeric_index_t current_diff_len = 0, current_diff_pos = 0;
|
||||||
|
sjtu::vector<uint8_t> diff_buff_in_old, diff_buff_in_new;
|
||||||
|
FILE *old_fp = fopen(old_file.c_str(), "rb");
|
||||||
|
FILE *new_fp = fopen(new_file.c_str(), "rb");
|
||||||
|
uint8_t_reader old_reader(old_fp), new_reader(new_fp);
|
||||||
|
for (size_t i = 0; i < shared_size; i++) {
|
||||||
|
uint8_t o_c = old_reader(), n_c = new_reader();
|
||||||
|
if (o_c == n_c) {
|
||||||
|
if (current_is_diff) {
|
||||||
|
buf.push_back(0);
|
||||||
|
buf.push_back((current_diff_len >> 24) & 0xFF);
|
||||||
|
buf.push_back((current_diff_len >> 16) & 0xFF);
|
||||||
|
buf.push_back((current_diff_len >> 8) & 0xFF);
|
||||||
|
buf.push_back(current_diff_len & 0xFF);
|
||||||
|
buf.push_back((current_diff_pos >> 24) & 0xFF);
|
||||||
|
buf.push_back((current_diff_pos >> 16) & 0xFF);
|
||||||
|
buf.push_back((current_diff_pos >> 8) & 0xFF);
|
||||||
|
buf.push_back(current_diff_pos & 0xFF);
|
||||||
|
for (uint8_t c : diff_buff_in_old) {
|
||||||
|
buf.push_back(c);
|
||||||
|
}
|
||||||
|
for (uint8_t c : diff_buff_in_new) {
|
||||||
|
buf.push_back(c);
|
||||||
|
}
|
||||||
|
current_is_diff = false;
|
||||||
|
current_diff_len = 0;
|
||||||
|
diff_buff_in_old.clear();
|
||||||
|
diff_buff_in_new.clear();
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (current_is_diff) {
|
||||||
|
diff_buff_in_old.push_back(o_c);
|
||||||
|
diff_buff_in_new.push_back(n_c);
|
||||||
|
current_diff_len++;
|
||||||
|
} else {
|
||||||
|
current_is_diff = true;
|
||||||
|
current_diff_len = 1;
|
||||||
|
current_diff_pos = i;
|
||||||
|
diff_buff_in_old.push_back(o_c);
|
||||||
|
diff_buff_in_new.push_back(n_c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (current_is_diff) {
|
||||||
|
buf.push_back(0);
|
||||||
|
buf.push_back((current_diff_len >> 24) & 0xFF);
|
||||||
|
buf.push_back((current_diff_len >> 16) & 0xFF);
|
||||||
|
buf.push_back((current_diff_len >> 8) & 0xFF);
|
||||||
|
buf.push_back(current_diff_len & 0xFF);
|
||||||
|
buf.push_back((current_diff_pos >> 24) & 0xFF);
|
||||||
|
buf.push_back((current_diff_pos >> 16) & 0xFF);
|
||||||
|
buf.push_back((current_diff_pos >> 8) & 0xFF);
|
||||||
|
buf.push_back(current_diff_pos & 0xFF);
|
||||||
|
for (uint8_t c : diff_buff_in_old) {
|
||||||
|
buf.push_back(c);
|
||||||
|
}
|
||||||
|
for (uint8_t c : diff_buff_in_new) {
|
||||||
|
buf.push_back(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (old_file_size > shared_size) {
|
||||||
|
buf.push_back(1);
|
||||||
|
for (size_t i = shared_size; i < old_file_size; i++) {
|
||||||
|
buf.push_back(old_reader());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (new_file_size > shared_size) {
|
||||||
|
buf.push_back(2);
|
||||||
|
for (size_t i = shared_size; i < new_file_size; i++) {
|
||||||
|
buf.push_back(new_reader());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Step 2
|
||||||
|
size_t compressed_size_bound = ZSTD_compressBound(buf.size());
|
||||||
|
uint8_t *compressed_buf = new uint8_t[compressed_size_bound];
|
||||||
|
size_t compressed_size = ZSTD_compress(compressed_buf, compressed_size_bound, buf.data(), buf.size(), 12);
|
||||||
|
if (ZSTD_isError(compressed_size)) {
|
||||||
|
delete[] compressed_buf;
|
||||||
|
throw std::runtime_error(ZSTD_getErrorName(compressed_size));
|
||||||
|
}
|
||||||
|
FILE *fp = fopen(diff_file.c_str(), "wb");
|
||||||
|
if (fp == nullptr) {
|
||||||
|
delete[] compressed_buf;
|
||||||
|
throw std::runtime_error("fopen failed");
|
||||||
|
}
|
||||||
|
fwrite(compressed_buf, 1, compressed_size, fp);
|
||||||
|
fclose(fp);
|
||||||
|
delete[] compressed_buf;
|
||||||
|
}
|
||||||
|
void ApplyPatch(const std::string &old_file, const std::string &diff_file, const std::string &new_file,
|
||||||
|
bool is_reverse) {
|
||||||
|
default_numeric_index_t compressed_size = GetFileSize(diff_file);
|
||||||
|
uint8_t *compressed_buf = new uint8_t[compressed_size + 5];
|
||||||
|
FILE *fp = fopen(diff_file.c_str(), "rb");
|
||||||
|
fread(compressed_buf, 1, compressed_size, fp);
|
||||||
|
fclose(fp);
|
||||||
|
size_t decompressed_buf_size = ZSTD_decompressBound(compressed_buf, compressed_size);
|
||||||
|
uint8_t *decompressed_buf = new uint8_t[decompressed_buf_size];
|
||||||
|
size_t decompressed_size = ZSTD_decompress(decompressed_buf, decompressed_buf_size, compressed_buf, compressed_size);
|
||||||
|
if (ZSTD_isError(decompressed_size)) {
|
||||||
|
delete[] compressed_buf;
|
||||||
|
delete[] decompressed_buf;
|
||||||
|
throw std::runtime_error(ZSTD_getErrorName(decompressed_size));
|
||||||
|
}
|
||||||
|
delete[] compressed_buf;
|
||||||
|
fp = fopen(old_file.c_str(), "rb");
|
||||||
|
size_t old_file_size = GetFileSize(old_file);
|
||||||
|
FILE *fp2 = fopen(new_file.c_str(), "wb");
|
||||||
|
uint8_t_reader reader(fp);
|
||||||
|
uint8_t_writer writer(fp2);
|
||||||
|
size_t diff_buf_cnt = 0;
|
||||||
|
size_t reader_cursor = 0;
|
||||||
|
while (diff_buf_cnt < decompressed_size) {
|
||||||
|
uint8_t flag;
|
||||||
|
flag = decompressed_buf[diff_buf_cnt++];
|
||||||
|
if (flag == 0) {
|
||||||
|
default_numeric_index_t current_diff_len = 0, current_diff_pos = 0, tmp = 0;
|
||||||
|
tmp = decompressed_buf[diff_buf_cnt++];
|
||||||
|
current_diff_len |= tmp << 24;
|
||||||
|
tmp = decompressed_buf[diff_buf_cnt++];
|
||||||
|
current_diff_len |= tmp << 16;
|
||||||
|
tmp = decompressed_buf[diff_buf_cnt++];
|
||||||
|
current_diff_len |= tmp << 8;
|
||||||
|
tmp = decompressed_buf[diff_buf_cnt++];
|
||||||
|
current_diff_len |= tmp;
|
||||||
|
tmp = decompressed_buf[diff_buf_cnt++];
|
||||||
|
current_diff_pos |= tmp << 24;
|
||||||
|
tmp = decompressed_buf[diff_buf_cnt++];
|
||||||
|
current_diff_pos |= tmp << 16;
|
||||||
|
tmp = decompressed_buf[diff_buf_cnt++];
|
||||||
|
current_diff_pos |= tmp << 8;
|
||||||
|
tmp = decompressed_buf[diff_buf_cnt++];
|
||||||
|
current_diff_pos |= tmp;
|
||||||
|
while (reader_cursor < current_diff_pos) {
|
||||||
|
writer(reader());
|
||||||
|
reader_cursor++;
|
||||||
|
}
|
||||||
|
uint8_t *old_buf = new uint8_t[current_diff_len];
|
||||||
|
uint8_t *new_buf = new uint8_t[current_diff_len];
|
||||||
|
for (size_t i = 0; i < current_diff_len; i++) {
|
||||||
|
old_buf[i] = decompressed_buf[diff_buf_cnt++];
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < current_diff_len; i++) {
|
||||||
|
new_buf[i] = decompressed_buf[diff_buf_cnt++];
|
||||||
|
}
|
||||||
|
if (!is_reverse) {
|
||||||
|
for (size_t i = 0; i < current_diff_len; i++) {
|
||||||
|
writer(new_buf[i]);
|
||||||
|
reader_cursor++;
|
||||||
|
reader();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (size_t i = 0; i < current_diff_len; i++) {
|
||||||
|
writer(old_buf[i]);
|
||||||
|
reader_cursor++;
|
||||||
|
reader();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete[] old_buf;
|
||||||
|
delete[] new_buf;
|
||||||
|
} else if (flag == 1) {
|
||||||
|
size_t delta_len = decompressed_size - diff_buf_cnt;
|
||||||
|
if (!is_reverse) {
|
||||||
|
// Just make the last bytes disappear
|
||||||
|
while (reader_cursor < old_file_size - delta_len) {
|
||||||
|
writer(reader());
|
||||||
|
reader_cursor++;
|
||||||
|
}
|
||||||
|
goto ed;
|
||||||
|
} else {
|
||||||
|
while (reader_cursor < old_file_size) {
|
||||||
|
writer(reader());
|
||||||
|
reader_cursor++;
|
||||||
|
}
|
||||||
|
while (diff_buf_cnt < decompressed_size) {
|
||||||
|
writer(decompressed_buf[diff_buf_cnt++]);
|
||||||
|
}
|
||||||
|
goto ed;
|
||||||
|
}
|
||||||
|
} else if (flag == 2) {
|
||||||
|
size_t delta_len = decompressed_size - diff_buf_cnt;
|
||||||
|
if (is_reverse) {
|
||||||
|
// Just make the last bytes disappear
|
||||||
|
while (reader_cursor < old_file_size - delta_len) {
|
||||||
|
writer(reader());
|
||||||
|
reader_cursor++;
|
||||||
|
}
|
||||||
|
goto ed;
|
||||||
|
} else {
|
||||||
|
while (reader_cursor < old_file_size) {
|
||||||
|
writer(reader());
|
||||||
|
reader_cursor++;
|
||||||
|
}
|
||||||
|
while (diff_buf_cnt < decompressed_size) {
|
||||||
|
writer(decompressed_buf[diff_buf_cnt++]);
|
||||||
|
}
|
||||||
|
goto ed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (reader_cursor < old_file_size) {
|
||||||
|
while (reader_cursor < old_file_size) {
|
||||||
|
writer(reader());
|
||||||
|
reader_cursor++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ed:;
|
||||||
|
writer.Flush();
|
||||||
|
fclose(fp);
|
||||||
|
fclose(fp2);
|
||||||
|
delete[] decompressed_buf;
|
||||||
|
}
|
@ -534,6 +534,8 @@ class vector {
|
|||||||
// allocated_length = new_allocated_length;
|
// allocated_length = new_allocated_length;
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
T *data() { return raw_beg; }
|
||||||
};
|
};
|
||||||
template <typename T>
|
template <typename T>
|
||||||
std::allocator<T> vector<T>::alloc;
|
std::allocator<T> vector<T>::alloc;
|
||||||
|
@ -68,11 +68,15 @@ class DiskMap : public DataDriverBase {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
bool Put(const Key &key, Value &value) {
|
bool Put(const Key &key, Value &value) {
|
||||||
if (indexer->Put(key, data_storage->preview_next_blank())) {
|
b_plus_tree_value_index_t data_id;
|
||||||
data_storage->write(value);
|
data_id = indexer->Get(key);
|
||||||
return true;
|
if (data_id != kInvalidValueIndex) {
|
||||||
}
|
data_storage->update(value, data_id);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
data_id = data_storage->write(value);
|
||||||
|
indexer->Put(key, data_id);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
#endif // DISK_MAP_H
|
#endif // DISK_MAP_H
|
@ -60,18 +60,18 @@ class SingleValueStorage {
|
|||||||
memcpy(raw_mem + n * sizeof(int), &tmp, sizeof(int));
|
memcpy(raw_mem + n * sizeof(int), &tmp, sizeof(int));
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t preview_next_blank() {
|
// size_t preview_next_blank() {
|
||||||
if (first_blank_element_pair_id != 0) return first_blank_element_pair_id;
|
// if (first_blank_element_pair_id != 0) return first_blank_element_pair_id;
|
||||||
frame_id_t frame_id;
|
// frame_id_t frame_id;
|
||||||
BasicPageGuard guard = bpm->NewPageGuarded(&frame_id);
|
// BasicPageGuard guard = bpm->NewPageGuarded(&frame_id);
|
||||||
first_blank_element_pair_id = frame_id * max_element_in_page;
|
// first_blank_element_pair_id = frame_id * max_element_in_page;
|
||||||
for (size_t i = 0; i < max_element_in_page - 1; i++) {
|
// for (size_t i = 0; i < max_element_in_page - 1; i++) {
|
||||||
guard.AsMut<Page>()->dat.elements[i].nxt_blank = first_blank_element_pair_id + i + 1;
|
// guard.AsMut<Page>()->dat.elements[i].nxt_blank = first_blank_element_pair_id + i + 1;
|
||||||
}
|
// }
|
||||||
guard.AsMut<Page>()->dat.elements[max_element_in_page - 1].nxt_blank = 0;
|
// guard.AsMut<Page>()->dat.elements[max_element_in_page - 1].nxt_blank = 0;
|
||||||
guard.AsMut<Page>()->dat.elements_count = 0;
|
// guard.AsMut<Page>()->dat.elements_count = 0;
|
||||||
return first_blank_element_pair_id;
|
// return first_blank_element_pair_id;
|
||||||
}
|
// }
|
||||||
int write(T &t) {
|
int write(T &t) {
|
||||||
size_t element_id = first_blank_element_pair_id;
|
size_t element_id = first_blank_element_pair_id;
|
||||||
size_t res_id = 0;
|
size_t res_id = 0;
|
||||||
|
@ -44,12 +44,12 @@ TEST(Basic, DiskMap) {
|
|||||||
std::map<int, int> std_map;
|
std::map<int, int> std_map;
|
||||||
remove("/tmp/index.db");
|
remove("/tmp/index.db");
|
||||||
remove("/tmp/data.db");
|
remove("/tmp/data.db");
|
||||||
const int total_opts = 10;
|
const int total_opts = 1000000;
|
||||||
{
|
{
|
||||||
DiskMap<int, int> disk_map("index", "/tmp/index.db", "data", "/tmp/data.db");
|
DiskMap<int, int> disk_map("index", "/tmp/index.db", "data", "/tmp/data.db");
|
||||||
for (int i = 0; i < total_opts; i++) {
|
for (int i = 0; i < total_opts; i++) {
|
||||||
int opt_id = rnd() % 100;
|
int opt_id = rnd() % 100;
|
||||||
if (opt_id <= 30) {
|
if (opt_id <= 40) {
|
||||||
if (keys_container.Size() > 0 && rnd() % 5 <= 2) {
|
if (keys_container.Size() > 0 && rnd() % 5 <= 2) {
|
||||||
// overrite and existing key
|
// overrite and existing key
|
||||||
int key = keys_container.GetRandomKey(rnd);
|
int key = keys_container.GetRandomKey(rnd);
|
||||||
@ -91,3 +91,41 @@ TEST(Basic, DiskMap) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(Basic, T1) {
|
||||||
|
remove("/tmp/1.dat");
|
||||||
|
remove("/tmp/2.dat");
|
||||||
|
remove("/tmp/diff.dat");
|
||||||
|
remove("/tmp/3.dat");
|
||||||
|
remove("/tmp/4.dat");
|
||||||
|
const unsigned int RndSeed = testing::GTEST_FLAG(random_seed);
|
||||||
|
std::mt19937 rnd(RndSeed);
|
||||||
|
const int str_len_s1 = 10000;
|
||||||
|
const int str_len_s2 = 9900;
|
||||||
|
char s1[str_len_s1], s2[str_len_s2];
|
||||||
|
for (int i = 0; i < str_len_s1 - 1; i++) {
|
||||||
|
s1[i] = 'a' + rnd() % 26;
|
||||||
|
}
|
||||||
|
s1[str_len_s1 - 1] = '\0';
|
||||||
|
memcpy(s2, s1, str_len_s2);
|
||||||
|
for (int i = 0; i < str_len_s2 - 1; i++) {
|
||||||
|
if (i >= str_len_s1) {
|
||||||
|
s2[i] = 'a' + rnd() % 26;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (rnd() % 3 == 0) {
|
||||||
|
s2[i] = 'a' + rnd() % 26;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s2[str_len_s2 - 1] = '\0';
|
||||||
|
// write to file
|
||||||
|
FILE *fp = fopen("/tmp/1.dat", "wb");
|
||||||
|
fwrite(s1, 1, str_len_s1, fp);
|
||||||
|
fclose(fp);
|
||||||
|
fp = fopen("/tmp/2.dat", "wb");
|
||||||
|
fwrite(s2, 1, str_len_s2, fp);
|
||||||
|
fclose(fp);
|
||||||
|
GenerateDiff("/tmp/1.dat", "/tmp/2.dat", "/tmp/diff.dat");
|
||||||
|
ApplyPatch("/tmp/1.dat", "/tmp/diff.dat", "/tmp/3.dat", false);
|
||||||
|
ApplyPatch("/tmp/2.dat", "/tmp/diff.dat", "/tmp/4.dat", true);
|
||||||
|
}
|
Reference in New Issue
Block a user