16 #include "leveldb/db.h"
17 #include <json/json.h>
30 #include <unordered_map>
31 #include <unordered_set>
34 #include "scv/scv_util.h"
35 #include "scv/scv_introspection.h"
36 #include "scv/scv_tr.h"
41 using namespace leveldb;
// printf-style conversion specifiers for 64-bit values; scv_tr_TEXT_16LLX is used below
// to format the fixed-width hexadecimal parts of the database keys.
// NOTE(review): two alternative sets are defined; presumably a platform conditional that is
// not visible in this excerpt selects between the "I64" and the "ll"/"l" variants - confirm.
45 #define scv_tr_TEXT_LLU "%I64u"
46 #define scv_tr_TEXT_LLX "%I64x"
47 #define scv_tr_TEXT_16LLX "%016I64x"
49 #define scv_tr_TEXT_LLU "%llu"
50 #define scv_tr_TEXT_LLX "%llx"
// NOTE(review): this variant uses the "l" length modifier while scv_tr_TEXT_LLX uses "ll";
// verify that it matches the width of uint64_t on every targeted platform.
51 #define scv_tr_TEXT_16LLX "%016lx"
// Kind of transaction event a database entry is associated with.
55 enum EventType { BEGIN, RECORD, END };
// Text form of each EventType value, indexed by the enumerator (used as key suffix).
56 const char* EventTypeStr[] = {
"BEGIN",
"RECORD",
"END"};
// Shorthand for the data-type enumeration of the SCV extension interface.
57 using data_type = scv_extensions_if::data_type;
// Creates a fresh database with the given name.
// An existing database of that name is destroyed first (DestroyDB); the new one is opened
// with create_if_missing and Snappy compression.
// Throws runtime_error("Could not create database") when the database cannot be opened.
63 Database(
const string& name)
// serialise JSON without indentation
65 wbuilder.settings_[
"indentation"] =
"";
// apply JsonCpp's strict-mode settings to the reader
66 CharReaderBuilder::strictMode(&rbuilder.settings_);
68 options.create_if_missing =
true;
69 options.compression = kSnappyCompression;
// start from an empty database: remove whatever is stored under this name
70 DestroyDB(name, options);
71 if(!DB::Open(options, name, &db).ok())
72 throw runtime_error(
"Could not create database");
// scratch buffer into which the write* functions format their keys
73 key_buf =
new char[key_len];
85 inline bool writeEntry(
string& key, Value& val) {
86 return db->Put(write_options, Slice(key.c_str(), key.size()), writeString(wbuilder, val)).ok();
93 inline bool writeEntry(
string&& key, Value& val) {
94 return db->Put(write_options, Slice(key.c_str(), key.size()), writeString(wbuilder, val)).ok();
// Stores the entry of a stream under the key "s~<id>", the id formatted as 16 hex digits.
// NOTE(review): the statements filling the JSON object are not visible in this excerpt;
// presumably they store id, name and kind - confirm.
102 inline void writeStream(uint64_t
id,
string name,
string kind) {
// format the key into the shared scratch buffer; len is the number of characters written
103 auto len = sprintf(key_buf,
"s~" scv_tr_TEXT_16LLX,
id);
104 Value node{objectValue};
108 db->Put(write_options, Slice(key_buf, len), writeString(wbuilder, node));
// Stores the entry of a generator under the key "sg~<stream>~<id>", both numbers formatted
// as 16 hex digits. The JSON object records the owning stream under "stream".
// NOTE(review): further fields are presumably set in statements not visible here - confirm.
116 inline void writeGenerator(uint64_t
id,
string name, uint64_t stream) {
// the stream id comes first in the key, followed by the generator id
117 auto len = sprintf(key_buf,
"sg~" scv_tr_TEXT_16LLX
"~" scv_tr_TEXT_16LLX, stream,
id);
118 Value node{objectValue};
121 node[
"stream"] = stream;
122 db->Put(write_options, Slice(key_buf, len), writeString(wbuilder, node));
// Writes the record of a transaction.
// The JSON object carries the stream id ("s"), the generator id ("g") and the concurrency
// level ("conc"); the key is "sgx~<stream>~<generator>~<id>", each number as 16 hex digits.
130 inline void writeTransaction(uint64_t
id, uint64_t stream_id, uint64_t generator_id, uint64_t concurrencyLevel) {
131 Value val{objectValue};
133 val[
"s"] = stream_id;
134 val[
"g"] = generator_id;
135 val[
"conc"] = concurrencyLevel;
136 db->Put(write_options,
// sprintf formats the key into key_buf; its return value is the key length
138 sprintf(key_buf,
"sgx~" scv_tr_TEXT_16LLX
"~" scv_tr_TEXT_16LLX
"~" scv_tr_TEXT_16LLX, stream_id, generator_id,
id)),
150 inline void writeTxTimepoint(uint64_t
id, uint64_t streamid, EventType type, uint64_t time) {
152 Value node{arrayValue};
153 auto len = sprintf(key_buf,
"st~" scv_tr_TEXT_16LLX
"~" scv_tr_TEXT_16LLX
"~%s", streamid, time, EventTypeStr[type]);
154 if(db->Get(read_options, key_buf, &value).ok() &&
155 rbuilder.newCharReader()->parse(value.data(), value.data() + value.size(), &node,
nullptr)) {
156 node[node.size()] = Value(
id);
158 node[0u] = Value(
id);
160 db->Put(write_options, Slice(key_buf, len), writeString(wbuilder, node));
161 updateTx(
id, type, time);
// Records the time of an event in the cached JSON node of a transaction (tx_lut) and writes
// the node under the key "x~<id>", the id formatted as 16 hex digits.
164 inline void updateTx(uint64_t
id, EventType type, uint64_t time) {
// field name per EventType, indexed by the enumerator
// NOTE(review): RECORD maps to an empty field name; presumably this overload is only
// called with BEGIN and END - confirm.
165 static const char* typeStr[] = {
"START_TIME",
"",
"END_TIME"};
// operator[] creates an empty node the first time an id is seen
166 auto& node = tx_lut[id];
167 node[typeStr[
type]] = time;
169 db->Put(write_options, Slice(key_buf, sprintf(key_buf,
"x~" scv_tr_TEXT_16LLX,
id)), writeString(wbuilder, node));
// Adds attribute data to the cached JSON node of a transaction; the key formatted here is
// "x~<id>" with the id as 16 hex digits.
// NOTE(review): the statements that append val and write the node are not visible in this
// excerpt - confirm the exact behaviour against the full file.
174 inline void updateTx(uint64_t
id, Value&& val) {
175 auto len = sprintf(key_buf,
"x~" scv_tr_TEXT_16LLX,
id);
176 auto& node = tx_lut[id];
177 auto& arrNode = node[
"attr"];
// no attribute list yet: create an array value and store it under "attr"
178 if(arrNode.isNull()) {
179 Value newNode{arrayValue};
181 node[
"attr"] = newNode;
// Records a string-valued attribute of a transaction.
// The JSON value stores the attribute value under "value" and the associated event kind
// under "assoc", and is handed to updateTx().
// NOTE(review): statements setting further fields (presumably name and type) are not
// visible in this excerpt - confirm.
195 inline void writeAttribute(uint64_t
id, EventType event,
const string& name, data_type type,
const string& value) {
199 val[
"value"] = value;
200 val[
"assoc"] = event;
201 updateTx(
id, move(val));
// Records an integer-valued attribute of a transaction.
// The JSON value stores the attribute value under "value" and the associated event kind
// under "assoc", and is handed to updateTx().
// NOTE(review): statements setting further fields (presumably name and type) are not
// visible in this excerpt - confirm.
211 inline void writeAttribute(uint64_t
id, EventType event,
const string& name, data_type type, uint64_t value) {
215 val[
"value"] = value;
216 val[
"assoc"] = event;
217 updateTx(
id, move(val));
// Records a floating-point attribute of a transaction.
// The JSON value stores the attribute value under "value" and the associated event kind
// under "assoc", and is handed to updateTx().
// NOTE(review): statements setting further fields (presumably name and type) are not
// visible in this excerpt - confirm.
227 inline void writeAttribute(uint64_t
id, EventType event,
const string& name, data_type type,
double value) {
231 val[
"value"] = value;
232 val[
"assoc"] = event;
233 updateTx(
id, move(val));
241 inline void writeRelation(
const string& name, uint64_t sink_id, uint64_t src_id) {
242 if(key_len < (name.size() + 32 + 5)) {
244 key_len = name.size() + 32 + 5;
245 key_buf =
new char[key_len];
247 db->Put(write_options,
248 Slice(key_buf, sprintf(key_buf,
"ro~" scv_tr_TEXT_16LLX
"~" scv_tr_TEXT_16LLX
"~%s", src_id, sink_id, name.c_str())),
"");
249 db->Put(write_options,
250 Slice(key_buf, sprintf(key_buf,
"ri~" scv_tr_TEXT_16LLX
"~" scv_tr_TEXT_16LLX
"~%s", sink_id, src_id, name.c_str())),
"");
// options passed to every database read (Get)
255 ReadOptions read_options;
// options passed to every database write (Put)
256 WriteOptions write_options;
// JSON serialiser settings, configured in the constructor
259 StreamWriterBuilder wbuilder;
// JSON parser settings, configured in the constructor
260 CharReaderBuilder rbuilder;
// cached JSON node per transaction id, updated by updateTx()
261 unordered_map<uint64_t, Value> tx_lut;
// indexed by stream id; each slot of the inner vector holds the id of the transaction
// occupying that concurrency level, 0 marking a free slot (see transactionCb)
264 vector<vector<uint64_t>> concurrencyLevel;
// Class callback for scv_tr_db events.
// CREATE: opens a new Database, named after the scv_tr_db when it has a non-empty name and
// after a default name otherwise, and stores the time resolution under the key "__config".
// DELETE: failures are reported as "Can't close recording file" (the statements performing
// the close are not visible in this excerpt).
// Errors are reported through _scv_message instead of being propagated to the caller.
268 void dbCb(
const scv_tr_db& _scv_tr_db, scv_tr_db::callback_reason reason,
void* data) {
// NOTE(review): the default name mentions sqlite although a LevelDB database is created.
270 static string fName(
"DEFAULT_scv_tr_sqlite");
272 case scv_tr_db::CREATE:
273 if((_scv_tr_db.get_name() !=
nullptr) && (strlen(_scv_tr_db.get_name()) != 0))
274 fName = _scv_tr_db.get_name();
276 db =
new Database(fName);
277 Value val{objectValue};
// time resolution in seconds scaled by 1e15
278 val[
"resolution"] = (long)(sc_get_time_resolution().to_seconds() * 1e15);
279 db->writeEntry(
"__config", val);
280 }
catch(runtime_error& e) {
281 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL, e.what());
283 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't open recording file");
286 case scv_tr_db::DELETE:
290 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't close recording file");
294 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Unknown reason in scv_tr_db callback");
298 void streamCb(
const scv_tr_stream& s, scv_tr_stream::callback_reason reason,
void* data) {
299 if(reason == scv_tr_stream::CREATE) {
301 db->writeStream(s.get_id(), s.get_name(), s.get_stream_kind());
302 }
catch(runtime_error& e) {
303 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't create stream");
// Stores a string-valued attribute of a transaction in the database.
// A runtime_error raised while writing is reported as "Can't create attribute entry".
308 void recordAttribute(uint64_t
id, EventType event,
const string& name, data_type type,
const string& value) {
310 db->writeAttribute(
id, event, name, type, value);
311 }
catch(runtime_error& e) {
312 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't create attribute entry");
// Stores an integer-valued attribute of a transaction in the database.
// A runtime_error raised while writing is reported as "Can't create attribute entry".
316 void recordAttribute(uint64_t
id, EventType event,
const string& name, data_type type,
long long value) {
// the value is converted to uint64_t, i.e. negative numbers are stored in their
// unsigned 64-bit representation
318 db->writeAttribute(
id, event, name, type,
static_cast<uint64_t
>(value));
319 }
catch(runtime_error& e) {
320 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't create attribute entry");
// Stores a floating-point attribute of a transaction in the database.
// A runtime_error raised while writing is reported as "Can't create attribute entry".
324 inline void recordAttribute(uint64_t
id, EventType event,
const string& name, data_type type,
double value) {
326 db->writeAttribute(
id, event, name, type, value);
327 }
catch(runtime_error& e) {
328 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't create attribute entry");
// Records the value(s) described by an SCV extension object as attributes of a transaction.
// Scalar types are written with recordAttribute(); RECORD and ARRAY extensions are walked
// recursively, one call per field or element. Unsupported types are reported through
// _scv_message.
//   id        - id of the transaction the attributes belong to
//   eventType - event kind the attributes are associated with
//   prefix    - name prefix; the attribute name is derived from it and the extension name
//   my_exts_p - extension object to record; may be null (checked first)
332 void recordAttributes(uint64_t
id, EventType eventType,
string& prefix,
const scv_extensions_if* my_exts_p) {
333 if(my_exts_p ==
nullptr)
337 name = my_exts_p->get_name();
339 if((my_exts_p->get_name() ==
nullptr) || (strlen(my_exts_p->get_name()) == 0)) {
342 name = prefix +
"." + my_exts_p->get_name();
347 switch(my_exts_p->get_type()) {
// composite: record every field
// NOTE(review): the recursive calls pass prefix, not the name computed above - confirm
// that this is intended.
348 case scv_extensions_if::RECORD: {
349 int num_fields = my_exts_p->get_num_fields();
351 for(
int field_counter = 0; field_counter < num_fields; field_counter++) {
352 const scv_extensions_if* field_data_p = my_exts_p->get_field(field_counter);
353 recordAttributes(
id, eventType, prefix, field_data_p);
// enumerations are stored by the string of the current enumerator
357 case scv_extensions_if::ENUMERATION:
358 recordAttribute(
id, eventType, name, scv_extensions_if::ENUMERATION, my_exts_p->get_enum_string((
int)(my_exts_p->get_integer())));
// booleans are stored as the strings "TRUE" / "FALSE"
360 case scv_extensions_if::BOOLEAN:
361 recordAttribute(
id, eventType, name, scv_extensions_if::BOOLEAN, my_exts_p->get_bool() ?
"TRUE" :
"FALSE");
// fixed-point integers are recorded with the INTEGER type tag
363 case scv_extensions_if::INTEGER:
364 case scv_extensions_if::FIXED_POINT_INTEGER:
365 recordAttribute(
id, eventType, name, scv_extensions_if::INTEGER, my_exts_p->get_integer());
367 case scv_extensions_if::UNSIGNED:
368 recordAttribute(
id, eventType, name, scv_extensions_if::UNSIGNED, my_exts_p->get_integer());
// pointers are recorded as their numeric value
370 case scv_extensions_if::POINTER:
371 recordAttribute(
id, eventType, name, scv_extensions_if::POINTER, (
long long)my_exts_p->get_pointer());
373 case scv_extensions_if::STRING:
374 recordAttribute(
id, eventType, name, scv_extensions_if::STRING, my_exts_p->get_string());
376 case scv_extensions_if::FLOATING_POINT_NUMBER:
377 recordAttribute(
id, eventType, name, scv_extensions_if::FLOATING_POINT_NUMBER, my_exts_p->get_double());
// bit and logic vectors are recorded as the result of their to_string()
379 case scv_extensions_if::BIT_VECTOR: {
380 sc_bv_base tmp_bv(my_exts_p->get_bitwidth());
381 my_exts_p->get_value(tmp_bv);
382 recordAttribute(
id, eventType, name, scv_extensions_if::BIT_VECTOR, tmp_bv.to_string());
384 case scv_extensions_if::LOGIC_VECTOR: {
385 sc_lv_base tmp_lv(my_exts_p->get_bitwidth());
386 my_exts_p->get_value(tmp_lv);
387 recordAttribute(
id, eventType, name, scv_extensions_if::LOGIC_VECTOR, tmp_lv.to_string());
// arrays: record every element
389 case scv_extensions_if::ARRAY:
390 for(
int array_elt_index = 0; array_elt_index < my_exts_p->get_array_size(); array_elt_index++) {
391 const scv_extensions_if* field_data_p = my_exts_p->get_array_elt(array_elt_index);
392 recordAttributes(
id, eventType, prefix, field_data_p);
// any other type: report it through _scv_message
396 array<char, 100> tmpString;
397 sprintf(tmpString.data(),
"Unsupported attribute type = %d", my_exts_p->get_type());
398 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL, tmpString.data());
// Class callback for scv_tr_generator_base events; records newly created generators.
// Only the CREATE reason is handled, and only while a database object exists.
// A runtime_error raised while writing is reported as "Can't create generator entry".
403 void generatorCb(
const scv_tr_generator_base& g, scv_tr_generator_base::callback_reason reason,
void* data) {
404 if(reason == scv_tr_generator_base::CREATE && db) {
// store the generator together with the id of the stream it belongs to
406 db->writeGenerator(g.get_id(), g.get_name(), g.get_scv_tr_stream().get_id());
407 }
catch(runtime_error& e) {
408 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't create generator entry");
// Class callback for scv_tr_handle events; records the begin and the end of transactions.
// BEGIN: assigns the transaction the first free concurrency slot of its stream, writes the
//        transaction record and its begin time point, then records the begin attributes.
// END:   records the end attributes, writes the end time point and frees the slot again.
// The database pointer and the recording flag of the stream's scv_tr_db are checked first
// (the statements executed when a check fails are not visible in this excerpt).
// NOTE(review): db itself is not checked before it is dereferenced - confirm that it cannot
// be null here.
413 void transactionCb(
const scv_tr_handle& t, scv_tr_handle::callback_reason reason,
void* data) {
416 if(t.get_scv_tr_stream().get_scv_tr_db() ==
nullptr)
418 if(t.get_scv_tr_stream().get_scv_tr_db()->get_recording() ==
false)
421 uint64_t
id = t.get_id();
422 uint64_t streamId = t.get_scv_tr_stream().get_id();
423 vector<uint64_t>::size_type concurrencyIdx;
424 const scv_extensions_if* my_exts_p;
426 case scv_tr_handle::BEGIN: {
// make sure a slot list exists for this stream
428 if(concurrencyLevel.size() <= streamId)
429 concurrencyLevel.resize(streamId + 1);
430 vector<uint64_t>& levels = concurrencyLevel.at(streamId);
// search for the first free slot; a slot holding 0 is free
431 for(concurrencyIdx = 0; concurrencyIdx < levels.size(); ++concurrencyIdx)
432 if(levels[concurrencyIdx] == 0)
// no free slot: open a new concurrency level for this transaction
434 if(concurrencyIdx == levels.size())
435 levels.push_back(
id);
437 levels[concurrencyIdx] = id;
// the slot index is stored as the concurrency level of the transaction
438 db->writeTransaction(
id, t.get_scv_tr_stream().get_id(), t.get_scv_tr_generator_base().get_id(), concurrencyIdx);
439 db->writeTxTimepoint(
id, t.get_scv_tr_stream().get_id(), BEGIN, t.get_begin_sc_time().value());
440 }
catch(runtime_error& e) {
441 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL, e.what());
// use the begin extensions of the handle, falling back to those of the generator
443 my_exts_p = t.get_begin_exts_p();
444 if(my_exts_p ==
nullptr)
445 my_exts_p = t.get_scv_tr_generator_base().get_begin_exts_p();
// a missing begin attribute name is replaced by an empty string
448 t.get_scv_tr_generator_base().get_begin_attribute_name() ? t.get_scv_tr_generator_base().get_begin_attribute_name() :
"";
449 recordAttributes(
id, BEGIN, tmp_str, my_exts_p);
452 case scv_tr_handle::END: {
// use the end extensions of the handle, falling back to those of the generator
453 my_exts_p = t.get_end_exts_p();
454 if(my_exts_p ==
nullptr)
455 my_exts_p = t.get_scv_tr_generator_base().get_end_exts_p();
// a missing end attribute name is replaced by an empty string
458 t.get_scv_tr_generator_base().get_end_attribute_name() ? t.get_scv_tr_generator_base().get_end_attribute_name() :
"";
459 recordAttributes(t.get_id(), END, tmp_str, my_exts_p);
462 db->writeTxTimepoint(
id, t.get_scv_tr_stream().get_id(), END, t.get_end_sc_time().value());
// NOTE(review): unlike the BEGIN branch this indexes without a bounds check; presumably
// BEGIN has always been seen for the stream before - confirm.
463 vector<uint64_t>& levels = concurrencyLevel[streamId];
// locate the slot occupied by this transaction
464 for(concurrencyIdx = 0; concurrencyIdx < levels.size(); ++concurrencyIdx)
465 if(levels[concurrencyIdx] ==
id)
467 if(concurrencyIdx == levels.size())
// mark the slot as free again
470 levels[concurrencyIdx] = 0;
471 }
catch(runtime_error& e) {
472 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't create transaction end");
// Callback for attributes recorded on a transaction handle.
// The database pointer and the recording flag of the stream's scv_tr_db are checked first
// (the statements executed when a check fails are not visible in this excerpt); then the
// extension data is recorded with event type RECORD, using the attribute name as prefix.
479 void attributeCb(
const scv_tr_handle& t,
const char* name,
const scv_extensions_if* ext,
void* data) {
482 if(t.get_scv_tr_stream().get_scv_tr_db() ==
nullptr)
484 if(t.get_scv_tr_stream().get_scv_tr_db()->get_recording() ==
false)
// a null name is treated as an empty prefix
486 string tmp_str(name ==
nullptr ?
"" : name);
487 recordAttributes(t.get_id(), RECORD, tmp_str, ext);
// Callback for relations established between two transaction handles.
// The database pointer and the recording flag of tr_1's scv_tr_db are checked first (the
// statements executed when a check fails are not visible in this excerpt); then the
// relation is stored under the name registered for relation_handle.
// A runtime_error raised while writing is reported as "Can't create transaction relation".
490 void relationCb(
const scv_tr_handle& tr_1,
const scv_tr_handle& tr_2,
void* data, scv_tr_relation_handle_t relation_handle) {
493 if(tr_1.get_scv_tr_stream().get_scv_tr_db() ==
nullptr)
495 if(tr_1.get_scv_tr_stream().get_scv_tr_db()->get_recording() ==
false)
// writeRelation(name, sink_id, src_id): tr_1 is passed as sink, tr_2 as source
498 db->writeRelation(tr_1.get_scv_tr_stream().get_scv_tr_db()->get_relation_name(relation_handle), tr_1.get_id(), tr_2.get_id());
499 }
catch(runtime_error& e) {
500 _scv_message::message(_scv_message::TRANSACTION_RECORDING_INTERNAL,
"Can't create transaction relation");
// Registers the callbacks of this recording back-end with the SCV transaction classes:
// database, stream, generator and transaction events as well as attribute and relation
// recording.
505 void scv_tr_ldb_init() {
506 scv_tr_db::register_class_cb(dbCb);
507 scv_tr_stream::register_class_cb(streamCb);
508 scv_tr_generator_base::register_class_cb(generatorCb);
509 scv_tr_handle::register_class_cb(transactionCb);
510 scv_tr_handle::register_record_attribute_cb(attributeCb);
511 scv_tr_handle::register_relation_cb(relationCb);