diff --git a/CMakeLists.txt b/CMakeLists.txt
index 627a08a..f61882e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -31,6 +31,9 @@ if(STATUS_LENGTH EQUAL 0)
 else()
   add_definitions(-DGIT_COMMIT_HASH="[developing]")
 endif()
+# Exposes zstd's ZSTD_STATIC_LINKING_ONLY API section; presumably needed for
+# ZSTD_decompressBound() in dataguard/src/snapshot.cpp (verify against zstd.h).
+add_definitions(-DZSTD_STATIC_LINKING_ONLY)
 
-# 设置一个布尔类型的选项,用于控制是否启用高级功能,如日志(业务日志、程序日志、容错校验日志)、并发、快照等
+# Boolean option controlling whether advanced features are enabled, e.g. logging (business, program and fault-tolerance check logs), concurrency, snapshots
 option(ENABLE_ADVANCED_FEATURE "Enable advanced features" OFF)
diff --git a/dataguard/include/dataguard/snapshot.h b/dataguard/include/dataguard/snapshot.h
index c8803f5..40136c3 100644
--- a/dataguard/include/dataguard/snapshot.h
+++ b/dataguard/include/dataguard/snapshot.h
@@ -4,6 +4,16 @@
 #include "map.hpp"
 #include "storage/driver.h"
 #include "vector.hpp"
+#include <string>
+
+/// Writes to `diff_file` a zstd-compressed, bidirectional byte diff between
+/// `old_file` and `new_file`. Throws std::runtime_error on I/O or zstd failure.
+void GenerateDiff(const std::string &old_file, const std::string &new_file, const std::string &diff_file);
+/// Rebuilds a file from `old_file` and `diff_file`, writing it to `new_file`.
+/// With `is_reverse == false`, `old_file` is the diff's old side and the new side
+/// is produced; with `is_reverse == true`, `old_file` is the diff's new side and
+/// the old side is produced. Throws std::runtime_error on I/O or zstd failure.
+void ApplyPatch(const std::string &old_file, const std::string &diff_file, const std::string &new_file, bool is_reverse);
 class SnapShotManager {
   bool has_connected = false;
   sjtu::vector drivers;
diff --git a/dataguard/src/snapshot.cpp b/dataguard/src/snapshot.cpp
index f46d56d..e5ca522 100644
--- a/dataguard/src/snapshot.cpp
+++ b/dataguard/src/snapshot.cpp
@@ -1,2 +1,332 @@
 #include "dataguard/snapshot.h"
+#include <sys/stat.h>
+#include <zstd.h>
+#include <algorithm>
 #include <cstdio>
+#include <cstdint>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include "storage/config.h"
+#include "vector.hpp"
+
+namespace {
+
+// Record tags of the uncompressed diff stream; GenerateDiff() documents the layout.
+constexpr uint8_t kTagChangedRun = 0;
+constexpr uint8_t kTagOldTail = 1;
+constexpr uint8_t kTagNewTail = 2;
+
+/// Deleter that lets std::unique_ptr own a C stdio stream.
+struct FileCloser {
+  void operator()(FILE *stream) const noexcept {
+    if (stream != nullptr) {
+      fclose(stream);
+    }
+  }
+};
+using FilePtr = std::unique_ptr<FILE, FileCloser>;
+
+/// Opens `path` in `mode`; throws std::runtime_error instead of returning null.
+FilePtr OpenFile(const std::string &path, const char *mode) {
+  FilePtr stream(fopen(path.c_str(), mode));
+  if (!stream) {
+    throw std::runtime_error("fopen failed");
+  }
+  return stream;
+}
+
+/// Appends `value` to `out` as four bytes, most significant byte first.
+void AppendBigEndian32(sjtu::vector<uint8_t> &out, uint32_t value) {
+  for (int shift = 24; shift >= 0; shift -= 8) {
+    out.push_back(static_cast<uint8_t>((value >> shift) & 0xFF));
+  }
+}
+
+/// Decodes the four big-endian bytes starting at `p`.
+uint32_t ReadBigEndian32(const uint8_t *p) {
+  return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
+         (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
+}
+
+/// Appends one changed-run record to `out`. The byte vectors are taken by
+/// non-const reference because sjtu::vector is only iterated through its
+/// non-const interface in this file.
+void AppendChangedRun(sjtu::vector<uint8_t> &out, uint32_t pos, sjtu::vector<uint8_t> &old_bytes,
+                      sjtu::vector<uint8_t> &new_bytes) {
+  out.push_back(kTagChangedRun);
+  AppendBigEndian32(out, static_cast<uint32_t>(old_bytes.size()));
+  AppendBigEndian32(out, pos);
+  for (uint8_t byte : old_bytes) {
+    out.push_back(byte);
+  }
+  for (uint8_t byte : new_bytes) {
+    out.push_back(byte);
+  }
+}
+
+}  // namespace
+
+/// Returns the size of `file` in bytes as reported by stat().
+/// @throws std::runtime_error if stat() fails.
+default_numeric_index_t GetFileSize(const std::string &file) {
+  struct stat stat_buf;
+  if (stat(file.c_str(), &stat_buf) != 0) {
+    throw std::runtime_error("stat failed");
+  }
+  // NOTE(review): st_size is converted to default_numeric_index_t here; confirm
+  // that type is wide enough for the files this is used on.
+  return stat_buf.st_size;
+}
+
+/// Buffered byte reader over a stdio stream. Does not own the stream.
+struct uint8_t_reader {
+  FILE *f;
+  uint8_t *buf, *p1, *p2;
+  int size;
+  explicit uint8_t_reader(FILE *fin, int bufsize = 1 << 12)
+      : f(fin), buf(new uint8_t[bufsize]), p1(nullptr), p2(nullptr), size(bufsize) {}
+  // The buffer is owned through a raw pointer, so copying would double-free it.
+  uint8_t_reader(const uint8_t_reader &) = delete;
+  uint8_t_reader &operator=(const uint8_t_reader &) = delete;
+  ~uint8_t_reader() { delete[] buf; }
+  /// Returns the next byte (0..255), or EOF once the stream is exhausted.
+  int operator()() {
+    if (p1 == p2) {
+      p1 = buf;
+      p2 = buf + fread(buf, 1, size, f);
+      if (p1 == p2) {
+        return EOF;
+      }
+    }
+    return *p1++;
+  }
+};
+
+/// Buffered byte writer over a stdio stream. Does not own the stream, but its
+/// destructor flushes to it, so the stream must outlive the writer.
+struct uint8_t_writer {
+  FILE *f;
+  uint8_t *buf, *p, *end;
+  int size;
+  bool failed = false;  // set once any fwrite() comes up short
+  explicit uint8_t_writer(FILE *fout, int bufsize = 1 << 12)
+      : f(fout), buf(new uint8_t[bufsize]), p(buf), end(buf + bufsize), size(bufsize) {}
+  uint8_t_writer(const uint8_t_writer &) = delete;
+  uint8_t_writer &operator=(const uint8_t_writer &) = delete;
+  ~uint8_t_writer() {
+    Flush();
+    delete[] buf;
+  }
+  /// Writes out the buffered bytes. Errors are recorded in `failed` rather than
+  /// thrown because this also runs from the destructor.
+  void Flush() {
+    const size_t pending = static_cast<size_t>(p - buf);
+    // Skip the call when nothing is buffered so an already-flushed writer never
+    // touches the stream again.
+    if (pending != 0 && fwrite(buf, 1, pending, f) != pending) {
+      failed = true;
+    }
+    p = buf;
+  }
+  /// Buffers `ch`, flushing first if the buffer is full; returns `ch`.
+  inline uint8_t operator()(uint8_t ch) {
+    if (end == p) [[unlikely]] {
+      Flush();
+    }
+    return *p++ = ch;
+  }
+};
+
+/// Returns the next byte from `reader`; throws if the file ends early.
+static uint8_t RequireByte(uint8_t_reader &reader) {
+  const int c = reader();
+  if (c == EOF) {
+    throw std::runtime_error("unexpected end of file");
+  }
+  return static_cast<uint8_t>(c);
+}
+
+/**
+ * Compares `old_file` and `new_file` byte by byte and writes a zstd-compressed
+ * diff to `diff_file`.
+ *
+ * The uncompressed stream is a sequence of records:
+ *   - tag 0: a run of differing bytes inside the shared prefix: 4-byte
+ *     big-endian length, 4-byte big-endian offset, the old bytes, the new bytes;
+ *   - tag 1 / tag 2: always last; every remaining byte is the part of the old
+ *     (1) or new (2) file that extends past the shorter file.
+ * Both sides of each run are stored so the diff can be applied in either
+ * direction. Lengths and offsets are stored in 32 bits.
+ *
+ * @throws std::runtime_error on stat/open/read/write or zstd failure.
+ */
+void GenerateDiff(const std::string &old_file, const std::string &new_file, const std::string &diff_file) {
+  const default_numeric_index_t old_file_size = GetFileSize(old_file);
+  const default_numeric_index_t new_file_size = GetFileSize(new_file);
+  const default_numeric_index_t shared_size = std::min(old_file_size, new_file_size);
+
+  // Step 1: collect the records in memory.
+  sjtu::vector<uint8_t> buf;
+  {
+    // Scoped so both input streams are closed as soon as the comparison is done.
+    const FilePtr old_fp = OpenFile(old_file, "rb");
+    const FilePtr new_fp = OpenFile(new_file, "rb");
+    uint8_t_reader old_reader(old_fp.get()), new_reader(new_fp.get());
+    sjtu::vector<uint8_t> run_old, run_new;  // both sides of the run being collected
+    uint32_t run_pos = 0;
+    for (size_t i = 0; i < shared_size; i++) {
+      const uint8_t o_c = RequireByte(old_reader), n_c = RequireByte(new_reader);
+      if (o_c != n_c) {
+        if (run_old.size() == 0) {
+          run_pos = static_cast<uint32_t>(i);
+        }
+        run_old.push_back(o_c);
+        run_new.push_back(n_c);
+      } else if (run_old.size() != 0) {
+        // A matching byte ends the current run.
+        AppendChangedRun(buf, run_pos, run_old, run_new);
+        run_old.clear();
+        run_new.clear();
+      }
+    }
+    if (run_old.size() != 0) {
+      AppendChangedRun(buf, run_pos, run_old, run_new);
+    }
+    // At most one of the two tails exists, since shared_size equals the smaller size.
+    if (old_file_size > shared_size) {
+      buf.push_back(kTagOldTail);
+      for (size_t i = shared_size; i < old_file_size; i++) {
+        buf.push_back(RequireByte(old_reader));
+      }
+    }
+    if (new_file_size > shared_size) {
+      buf.push_back(kTagNewTail);
+      for (size_t i = shared_size; i < new_file_size; i++) {
+        buf.push_back(RequireByte(new_reader));
+      }
+    }
+  }
+
+  // Step 2: compress the records and write them to diff_file.
+  const size_t compressed_size_bound = ZSTD_compressBound(buf.size());
+  const auto compressed_buf = std::make_unique<uint8_t[]>(compressed_size_bound);
+  const size_t compressed_size =
+      ZSTD_compress(compressed_buf.get(), compressed_size_bound, buf.data(), buf.size(), 12);
+  if (ZSTD_isError(compressed_size)) {
+    throw std::runtime_error(ZSTD_getErrorName(compressed_size));
+  }
+  FilePtr fp = OpenFile(diff_file, "wb");
+  if (fwrite(compressed_buf.get(), 1, compressed_size, fp.get()) != compressed_size) {
+    throw std::runtime_error("fwrite failed");
+  }
+  // Close explicitly so a failed final flush is reported instead of ignored.
+  if (fclose(fp.release()) != 0) {
+    throw std::runtime_error("fclose failed");
+  }
+}
+
+/**
+ * Applies a diff produced by GenerateDiff().
+ *
+ * @param old_file   Input file: the diff's old side, or its new side when
+ *                   `is_reverse` is true.
+ * @param diff_file  Compressed diff written by GenerateDiff().
+ * @param new_file   Output file; created or truncated.
+ * @param is_reverse false: old side -> new side; true: new side -> old side.
+ * @throws std::runtime_error on I/O or zstd failure, or if the diff is malformed
+ *         or does not fit the input file.
+ */
+void ApplyPatch(const std::string &old_file, const std::string &diff_file, const std::string &new_file,
+                bool is_reverse) {
+  // Load and decompress the whole diff.
+  const size_t compressed_size = GetFileSize(diff_file);
+  auto compressed_buf = std::make_unique<uint8_t[]>(compressed_size);
+  {
+    const FilePtr diff_fp = OpenFile(diff_file, "rb");
+    if (fread(compressed_buf.get(), 1, compressed_size, diff_fp.get()) != compressed_size) {
+      throw std::runtime_error("fread failed");
+    }
+  }
+  const unsigned long long decompressed_bound = ZSTD_decompressBound(compressed_buf.get(), compressed_size);
+  if (decompressed_bound == ZSTD_CONTENTSIZE_ERROR) {
+    throw std::runtime_error("invalid diff file");
+  }
+  const auto decompressed_buf = std::make_unique<uint8_t[]>(decompressed_bound);
+  const size_t decompressed_size =
+      ZSTD_decompress(decompressed_buf.get(), decompressed_bound, compressed_buf.get(), compressed_size);
+  if (ZSTD_isError(decompressed_size)) {
+    throw std::runtime_error(ZSTD_getErrorName(decompressed_size));
+  }
+  compressed_buf.reset();
+  const uint8_t *const diff = decompressed_buf.get();
+
+  const size_t old_file_size = GetFileSize(old_file);
+  // The streams are declared before the reader/writer so that, on every exit
+  // path, the writer's destructor flushes before its stream is closed.
+  const FilePtr in_fp = OpenFile(old_file, "rb");
+  FilePtr out_fp = OpenFile(new_file, "wb");
+  uint8_t_reader reader(in_fp.get());
+  uint8_t_writer writer(out_fp.get());
+  size_t reader_cursor = 0;  // number of input bytes consumed so far
+  // Copies input bytes to the output unchanged until `limit` bytes are consumed.
+  const auto copy_until = [&](size_t limit) {
+    for (; reader_cursor < limit; reader_cursor++) {
+      writer(RequireByte(reader));
+    }
+  };
+
+  size_t diff_buf_cnt = 0;
+  bool tail_seen = false;
+  while (diff_buf_cnt < decompressed_size && !tail_seen) {
+    const uint8_t flag = diff[diff_buf_cnt++];
+    if (flag == kTagChangedRun) {
+      if (decompressed_size - diff_buf_cnt < 8) {
+        throw std::runtime_error("corrupted diff");
+      }
+      const size_t current_diff_len = ReadBigEndian32(diff + diff_buf_cnt);
+      const size_t current_diff_pos = ReadBigEndian32(diff + diff_buf_cnt + 4);
+      diff_buf_cnt += 8;
+      // The record stores the old bytes followed by the new bytes.
+      if ((decompressed_size - diff_buf_cnt) / 2 < current_diff_len) {
+        throw std::runtime_error("corrupted diff");
+      }
+      const uint8_t *const old_bytes = diff + diff_buf_cnt;
+      const uint8_t *const new_bytes = old_bytes + current_diff_len;
+      diff_buf_cnt += 2 * current_diff_len;
+      copy_until(current_diff_pos);
+      const uint8_t *const replacement = is_reverse ? old_bytes : new_bytes;
+      for (size_t i = 0; i < current_diff_len; i++, reader_cursor++) {
+        RequireByte(reader);  // skip the input byte being replaced
+        writer(replacement[i]);
+      }
+    } else if (flag == kTagOldTail || flag == kTagNewTail) {
+      const size_t delta_len = decompressed_size - diff_buf_cnt;
+      // The tail belongs to the longer side. If the input is that side, drop the
+      // tail; otherwise append it after the rest of the input.
+      const bool input_has_tail = (flag == kTagOldTail) != is_reverse;
+      if (input_has_tail) {
+        if (delta_len > old_file_size) {
+          throw std::runtime_error("corrupted diff");
+        }
+        copy_until(old_file_size - delta_len);
+      } else {
+        copy_until(old_file_size);
+        while (diff_buf_cnt < decompressed_size) {
+          writer(diff[diff_buf_cnt++]);
+        }
+      }
+      tail_seen = true;
+    } else {
+      throw std::runtime_error("corrupted diff");
+    }
+  }
+  if (!tail_seen) {
+    // Same-size files: everything after the last changed run is unchanged.
+    copy_until(old_file_size);
+  }
+  writer.Flush();
+  if (writer.failed || fclose(out_fp.release()) != 0) {
+    throw std::runtime_error("fwrite failed");
+  }
+}
diff --git a/stlite/vector.hpp b/stlite/vector.hpp
index 8a9aea9..cfe2b52 100644
--- a/stlite/vector.hpp
+++ b/stlite/vector.hpp
@@ -534,6 +534,10 @@ class vector {
     //   allocated_length = new_allocated_length;
     // }
   }
+
+  /// Direct access to the underlying element storage (raw_beg).
+  T *data() { return raw_beg; }
+  const T *data() const { return raw_beg; }
 };
 template <typename T>
 std::allocator<T> vector<T>::alloc;
diff --git a/storage/include/storage/disk_map.hpp b/storage/include/storage/disk_map.hpp
index 3b3c401..358210a 100644
--- a/storage/include/storage/disk_map.hpp
+++ b/storage/include/storage/disk_map.hpp
@@ -68,11 +68,16 @@ class DiskMap : public DataDriverBase {
     return true;
   }
   bool Put(const Key &key, Value &value) {
-    if (indexer->Put(key, data_storage->preview_next_blank())) {
-      data_storage->write(value);
-      return true;
+    // Upsert. Returns true when `key` was newly inserted and false when an
+    // existing value was overwritten in place.
+    const b_plus_tree_value_index_t existing_id = indexer->Get(key);
+    if (existing_id != kInvalidValueIndex) {
+      data_storage->update(value, existing_id);
+      return false;
     }
-    return false;
+    const b_plus_tree_value_index_t new_id = data_storage->write(value);
+    indexer->Put(key, new_id);
+    return true;
   }
 };
 #endif // DISK_MAP_H
\ No newline at end of file
diff --git a/storage/include/storage/single_value_storage.hpp b/storage/include/storage/single_value_storage.hpp
index c60d635..77d6c0c 100644
--- a/storage/include/storage/single_value_storage.hpp
+++ b/storage/include/storage/single_value_storage.hpp
@@ -60,18 +60,6 @@ class SingleValueStorage {
     memcpy(raw_mem + n * sizeof(int), &tmp, sizeof(int));
   }
 
-  size_t preview_next_blank() {
-    if (first_blank_element_pair_id != 0) return first_blank_element_pair_id;
-    frame_id_t frame_id;
-    BasicPageGuard guard = bpm->NewPageGuarded(&frame_id);
-    first_blank_element_pair_id = frame_id * max_element_in_page;
-    for (size_t i = 0; i < max_element_in_page - 1; i++) {
-      guard.AsMut()->dat.elements[i].nxt_blank = first_blank_element_pair_id + i + 1;
-    }
-    guard.AsMut()->dat.elements[max_element_in_page - 1].nxt_blank = 0;
-    guard.AsMut()->dat.elements_count = 0;
-    return first_blank_element_pair_id;
-  }
   int write(T &t) {
     size_t element_id = first_blank_element_pair_id;
     size_t res_id = 0;
diff --git a/test/snapshot_test.cpp b/test/snapshot_test.cpp
index 34df0b1..d5cb608 100644
--- a/test/snapshot_test.cpp
+++ b/test/snapshot_test.cpp
@@ -44,12 +44,12 @@ TEST(Basic, DiskMap) {
   std::map std_map;
   remove("/tmp/index.db");
   remove("/tmp/data.db");
-  const int total_opts = 10;
+  const int total_opts = 1000000;
   {
     DiskMap disk_map("index", "/tmp/index.db", "data", "/tmp/data.db");
     for (int i = 0; i < total_opts; i++) {
       int opt_id = rnd() % 100;
-      if (opt_id <= 30) {
+      if (opt_id <= 40) {
         if (keys_container.Size() > 0 && rnd() % 5 <= 2) {
-          // overrite and existing key
+          // overwrite an existing key
           int key = keys_container.GetRandomKey(rnd);
@@ -90,4 +90,58 @@ TEST(Basic, DiskMap) {
       }
     }
   }
+}
+
+// Round-trips a diff in both directions: 1.dat -> 3.dat must reproduce 2.dat
+// and 2.dat -> 4.dat must reproduce 1.dat.
+TEST(Basic, T1) {
+  remove("/tmp/1.dat");
+  remove("/tmp/2.dat");
+  remove("/tmp/diff.dat");
+  remove("/tmp/3.dat");
+  remove("/tmp/4.dat");
+  const unsigned int RndSeed = testing::GTEST_FLAG(random_seed);
+  std::mt19937 rnd(RndSeed);
+  const int str_len_s1 = 10000;
+  const int str_len_s2 = 9900;
+  // s1 is random lowercase text ending in '\0'.
+  std::string s1(str_len_s1, '\0');
+  for (int i = 0; i < str_len_s1 - 1; i++) {
+    s1[i] = 'a' + rnd() % 26;
+  }
+  // s2 is a shorter copy of s1 in which each byte is re-rolled with probability 1/3.
+  std::string s2 = s1.substr(0, str_len_s2);
+  for (int i = 0; i < str_len_s2 - 1; i++) {
+    if (rnd() % 3 == 0) {
+      s2[i] = 'a' + rnd() % 26;
+    }
+  }
+  s2[str_len_s2 - 1] = '\0';
+  const auto write_file = [](const char *path, const std::string &content) {
+    FILE *fp = fopen(path, "wb");
+    ASSERT_NE(fp, nullptr);
+    EXPECT_EQ(fwrite(content.data(), 1, content.size(), fp), content.size());
+    fclose(fp);
+  };
+  const auto read_file = [](const char *path) {
+    std::string content;
+    FILE *fp = fopen(path, "rb");
+    if (fp == nullptr) {
+      ADD_FAILURE() << "cannot open " << path;
+      return content;
+    }
+    char chunk[4096];
+    for (size_t got; (got = fread(chunk, 1, sizeof(chunk), fp)) > 0;) {
+      content.append(chunk, got);
+    }
+    fclose(fp);
+    return content;
+  };
+  write_file("/tmp/1.dat", s1);
+  write_file("/tmp/2.dat", s2);
+  GenerateDiff("/tmp/1.dat", "/tmp/2.dat", "/tmp/diff.dat");
+  ApplyPatch("/tmp/1.dat", "/tmp/diff.dat", "/tmp/3.dat", false);
+  ApplyPatch("/tmp/2.dat", "/tmp/diff.dat", "/tmp/4.dat", true);
+  EXPECT_EQ(read_file("/tmp/3.dat"), s2);
+  EXPECT_EQ(read_file("/tmp/4.dat"), s1);
 }
\ No newline at end of file