|  | /* | 
|  | Copyright 2020 Google LLC | 
|  |  | 
|  | Use of this source code is governed by a BSD-style | 
|  | license that can be found in the LICENSE file or at | 
|  | https://developers.google.com/open-source/licenses/bsd | 
|  | */ | 
|  |  | 
|  | #include "merged.h" | 
|  |  | 
|  | #include "constants.h" | 
|  | #include "iter.h" | 
|  | #include "pq.h" | 
|  | #include "reader.h" | 
|  | #include "record.h" | 
|  | #include "generic.h" | 
|  | #include "reftable-merged.h" | 
|  | #include "reftable-error.h" | 
|  | #include "system.h" | 
|  |  | 
|  | static int merged_iter_init(struct merged_iter *mi) | 
|  | { | 
|  | int i = 0; | 
|  | for (i = 0; i < mi->stack_len; i++) { | 
|  | struct reftable_record rec = reftable_new_record(mi->typ); | 
|  | int err = iterator_next(&mi->stack[i], &rec); | 
|  | if (err < 0) { | 
|  | return err; | 
|  | } | 
|  |  | 
|  | if (err > 0) { | 
|  | reftable_iterator_destroy(&mi->stack[i]); | 
|  | reftable_record_destroy(&rec); | 
|  | } else { | 
|  | struct pq_entry e = { | 
|  | .rec = rec, | 
|  | .index = i, | 
|  | }; | 
|  | merged_iter_pqueue_add(&mi->pq, e); | 
|  | } | 
|  | } | 
|  |  | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | static void merged_iter_close(void *p) | 
|  | { | 
|  | struct merged_iter *mi = p; | 
|  | int i = 0; | 
|  | merged_iter_pqueue_release(&mi->pq); | 
|  | for (i = 0; i < mi->stack_len; i++) { | 
|  | reftable_iterator_destroy(&mi->stack[i]); | 
|  | } | 
|  | reftable_free(mi->stack); | 
|  | } | 
|  |  | 
|  | static int merged_iter_advance_nonnull_subiter(struct merged_iter *mi, | 
|  | size_t idx) | 
|  | { | 
|  | struct reftable_record rec = reftable_new_record(mi->typ); | 
|  | struct pq_entry e = { | 
|  | .rec = rec, | 
|  | .index = idx, | 
|  | }; | 
|  | int err = iterator_next(&mi->stack[idx], &rec); | 
|  | if (err < 0) | 
|  | return err; | 
|  |  | 
|  | if (err > 0) { | 
|  | reftable_iterator_destroy(&mi->stack[idx]); | 
|  | reftable_record_destroy(&rec); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | merged_iter_pqueue_add(&mi->pq, e); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | static int merged_iter_advance_subiter(struct merged_iter *mi, size_t idx) | 
|  | { | 
|  | if (iterator_is_null(&mi->stack[idx])) | 
|  | return 0; | 
|  | return merged_iter_advance_nonnull_subiter(mi, idx); | 
|  | } | 
|  |  | 
|  | static int merged_iter_next_entry(struct merged_iter *mi, | 
|  | struct reftable_record *rec) | 
|  | { | 
|  | struct strbuf entry_key = STRBUF_INIT; | 
|  | struct pq_entry entry = { 0 }; | 
|  | int err = 0; | 
|  |  | 
|  | if (merged_iter_pqueue_is_empty(mi->pq)) | 
|  | return 1; | 
|  |  | 
|  | entry = merged_iter_pqueue_remove(&mi->pq); | 
|  | err = merged_iter_advance_subiter(mi, entry.index); | 
|  | if (err < 0) | 
|  | return err; | 
|  |  | 
|  | /* | 
|  | One can also use reftable as datacenter-local storage, where the ref | 
|  | database is maintained in globally consistent database (eg. | 
|  | CockroachDB or Spanner). In this scenario, replication delays together | 
|  | with compaction may cause newer tables to contain older entries. In | 
|  | such a deployment, the loop below must be changed to collect all | 
|  | entries for the same key, and return new the newest one. | 
|  | */ | 
|  | reftable_record_key(&entry.rec, &entry_key); | 
|  | while (!merged_iter_pqueue_is_empty(mi->pq)) { | 
|  | struct pq_entry top = merged_iter_pqueue_top(mi->pq); | 
|  | struct strbuf k = STRBUF_INIT; | 
|  | int err = 0, cmp = 0; | 
|  |  | 
|  | reftable_record_key(&top.rec, &k); | 
|  |  | 
|  | cmp = strbuf_cmp(&k, &entry_key); | 
|  | strbuf_release(&k); | 
|  |  | 
|  | if (cmp > 0) { | 
|  | break; | 
|  | } | 
|  |  | 
|  | merged_iter_pqueue_remove(&mi->pq); | 
|  | err = merged_iter_advance_subiter(mi, top.index); | 
|  | if (err < 0) { | 
|  | return err; | 
|  | } | 
|  | reftable_record_destroy(&top.rec); | 
|  | } | 
|  |  | 
|  | reftable_record_copy_from(rec, &entry.rec, hash_size(mi->hash_id)); | 
|  | reftable_record_destroy(&entry.rec); | 
|  | strbuf_release(&entry_key); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | static int merged_iter_next(struct merged_iter *mi, struct reftable_record *rec) | 
|  | { | 
|  | while (1) { | 
|  | int err = merged_iter_next_entry(mi, rec); | 
|  | if (err == 0 && mi->suppress_deletions && | 
|  | reftable_record_is_deletion(rec)) { | 
|  | continue; | 
|  | } | 
|  |  | 
|  | return err; | 
|  | } | 
|  | } | 
|  |  | 
|  | static int merged_iter_next_void(void *p, struct reftable_record *rec) | 
|  | { | 
|  | struct merged_iter *mi = p; | 
|  | if (merged_iter_pqueue_is_empty(mi->pq)) | 
|  | return 1; | 
|  |  | 
|  | return merged_iter_next(mi, rec); | 
|  | } | 
|  |  | 
|  | static struct reftable_iterator_vtable merged_iter_vtable = { | 
|  | .next = &merged_iter_next_void, | 
|  | .close = &merged_iter_close, | 
|  | }; | 
|  |  | 
|  | static void iterator_from_merged_iter(struct reftable_iterator *it, | 
|  | struct merged_iter *mi) | 
|  | { | 
|  | assert(!it->ops); | 
|  | it->iter_arg = mi; | 
|  | it->ops = &merged_iter_vtable; | 
|  | } | 
|  |  | 
|  | int reftable_new_merged_table(struct reftable_merged_table **dest, | 
|  | struct reftable_table *stack, int n, | 
|  | uint32_t hash_id) | 
|  | { | 
|  | struct reftable_merged_table *m = NULL; | 
|  | uint64_t last_max = 0; | 
|  | uint64_t first_min = 0; | 
|  | int i = 0; | 
|  | for (i = 0; i < n; i++) { | 
|  | uint64_t min = reftable_table_min_update_index(&stack[i]); | 
|  | uint64_t max = reftable_table_max_update_index(&stack[i]); | 
|  |  | 
|  | if (reftable_table_hash_id(&stack[i]) != hash_id) { | 
|  | return REFTABLE_FORMAT_ERROR; | 
|  | } | 
|  | if (i == 0 || min < first_min) { | 
|  | first_min = min; | 
|  | } | 
|  | if (i == 0 || max > last_max) { | 
|  | last_max = max; | 
|  | } | 
|  | } | 
|  |  | 
|  | m = reftable_calloc(sizeof(struct reftable_merged_table)); | 
|  | m->stack = stack; | 
|  | m->stack_len = n; | 
|  | m->min = first_min; | 
|  | m->max = last_max; | 
|  | m->hash_id = hash_id; | 
|  | *dest = m; | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | /* clears the list of subtable, without affecting the readers themselves. */ | 
|  | void merged_table_release(struct reftable_merged_table *mt) | 
|  | { | 
|  | FREE_AND_NULL(mt->stack); | 
|  | mt->stack_len = 0; | 
|  | } | 
|  |  | 
|  | void reftable_merged_table_free(struct reftable_merged_table *mt) | 
|  | { | 
|  | if (!mt) { | 
|  | return; | 
|  | } | 
|  | merged_table_release(mt); | 
|  | reftable_free(mt); | 
|  | } | 
|  |  | 
|  | uint64_t | 
|  | reftable_merged_table_max_update_index(struct reftable_merged_table *mt) | 
|  | { | 
|  | return mt->max; | 
|  | } | 
|  |  | 
|  | uint64_t | 
|  | reftable_merged_table_min_update_index(struct reftable_merged_table *mt) | 
|  | { | 
|  | return mt->min; | 
|  | } | 
|  |  | 
|  | static int reftable_table_seek_record(struct reftable_table *tab, | 
|  | struct reftable_iterator *it, | 
|  | struct reftable_record *rec) | 
|  | { | 
|  | return tab->ops->seek_record(tab->table_arg, it, rec); | 
|  | } | 
|  |  | 
|  | static int merged_table_seek_record(struct reftable_merged_table *mt, | 
|  | struct reftable_iterator *it, | 
|  | struct reftable_record *rec) | 
|  | { | 
|  | struct reftable_iterator *iters = reftable_calloc( | 
|  | sizeof(struct reftable_iterator) * mt->stack_len); | 
|  | struct merged_iter merged = { | 
|  | .stack = iters, | 
|  | .typ = reftable_record_type(rec), | 
|  | .hash_id = mt->hash_id, | 
|  | .suppress_deletions = mt->suppress_deletions, | 
|  | }; | 
|  | int n = 0; | 
|  | int err = 0; | 
|  | int i = 0; | 
|  | for (i = 0; i < mt->stack_len && err == 0; i++) { | 
|  | int e = reftable_table_seek_record(&mt->stack[i], &iters[n], | 
|  | rec); | 
|  | if (e < 0) { | 
|  | err = e; | 
|  | } | 
|  | if (e == 0) { | 
|  | n++; | 
|  | } | 
|  | } | 
|  | if (err < 0) { | 
|  | int i = 0; | 
|  | for (i = 0; i < n; i++) { | 
|  | reftable_iterator_destroy(&iters[i]); | 
|  | } | 
|  | reftable_free(iters); | 
|  | return err; | 
|  | } | 
|  |  | 
|  | merged.stack_len = n; | 
|  | err = merged_iter_init(&merged); | 
|  | if (err < 0) { | 
|  | merged_iter_close(&merged); | 
|  | return err; | 
|  | } else { | 
|  | struct merged_iter *p = | 
|  | reftable_malloc(sizeof(struct merged_iter)); | 
|  | *p = merged; | 
|  | iterator_from_merged_iter(it, p); | 
|  | } | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | int reftable_merged_table_seek_ref(struct reftable_merged_table *mt, | 
|  | struct reftable_iterator *it, | 
|  | const char *name) | 
|  | { | 
|  | struct reftable_ref_record ref = { | 
|  | .refname = (char *)name, | 
|  | }; | 
|  | struct reftable_record rec = { NULL }; | 
|  | reftable_record_from_ref(&rec, &ref); | 
|  | return merged_table_seek_record(mt, it, &rec); | 
|  | } | 
|  |  | 
|  | int reftable_merged_table_seek_log_at(struct reftable_merged_table *mt, | 
|  | struct reftable_iterator *it, | 
|  | const char *name, uint64_t update_index) | 
|  | { | 
|  | struct reftable_log_record log = { | 
|  | .refname = (char *)name, | 
|  | .update_index = update_index, | 
|  | }; | 
|  | struct reftable_record rec = { NULL }; | 
|  | reftable_record_from_log(&rec, &log); | 
|  | return merged_table_seek_record(mt, it, &rec); | 
|  | } | 
|  |  | 
|  | int reftable_merged_table_seek_log(struct reftable_merged_table *mt, | 
|  | struct reftable_iterator *it, | 
|  | const char *name) | 
|  | { | 
|  | uint64_t max = ~((uint64_t)0); | 
|  | return reftable_merged_table_seek_log_at(mt, it, name, max); | 
|  | } | 
|  |  | 
|  | uint32_t reftable_merged_table_hash_id(struct reftable_merged_table *mt) | 
|  | { | 
|  | return mt->hash_id; | 
|  | } | 
|  |  | 
|  | static int reftable_merged_table_seek_void(void *tab, | 
|  | struct reftable_iterator *it, | 
|  | struct reftable_record *rec) | 
|  | { | 
|  | return merged_table_seek_record(tab, it, rec); | 
|  | } | 
|  |  | 
|  | static uint32_t reftable_merged_table_hash_id_void(void *tab) | 
|  | { | 
|  | return reftable_merged_table_hash_id(tab); | 
|  | } | 
|  |  | 
|  | static uint64_t reftable_merged_table_min_update_index_void(void *tab) | 
|  | { | 
|  | return reftable_merged_table_min_update_index(tab); | 
|  | } | 
|  |  | 
|  | static uint64_t reftable_merged_table_max_update_index_void(void *tab) | 
|  | { | 
|  | return reftable_merged_table_max_update_index(tab); | 
|  | } | 
|  |  | 
|  | static struct reftable_table_vtable merged_table_vtable = { | 
|  | .seek_record = reftable_merged_table_seek_void, | 
|  | .hash_id = reftable_merged_table_hash_id_void, | 
|  | .min_update_index = reftable_merged_table_min_update_index_void, | 
|  | .max_update_index = reftable_merged_table_max_update_index_void, | 
|  | }; | 
|  |  | 
|  | void reftable_table_from_merged_table(struct reftable_table *tab, | 
|  | struct reftable_merged_table *merged) | 
|  | { | 
|  | assert(!tab->ops); | 
|  | tab->ops = &merged_table_vtable; | 
|  | tab->table_arg = merged; | 
|  | } |