Coverage for pySDC/projects/Monodomain/utils/data_management.py: 92%
24 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-20 14:51 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-20 14:51 +0000
1import sqlite3
2import json
3import os
6class database:
7 def __init__(self, name):
8 self.name = name
9 path = os.path.dirname(self.name)
10 if path != "" and not os.path.exists(path):
11 os.makedirs(path)
12 self.conn = sqlite3.connect(f'{self.name}.db')
13 self.cursor = self.conn.cursor()
15 # def write_arrays(self, table, arrays, columns_names=None):
16 # if not isinstance(arrays, list):
17 # arrays = [arrays]
18 # n = len(arrays)
19 # if columns_names is None:
20 # columns_names = ["val_" + str(i) for i in range(n)]
22 # self.cursor.execute(f'DROP TABLE IF EXISTS {table}')
23 # self.cursor.execute(f'CREATE TABLE {table} ({self._convert_list_str_to_arg(columns_names)})')
24 # self.cursor.execute(f'INSERT INTO {table} VALUES ({self._convert_list_str_to_arg(["?"]*n)})', arrays)
25 # self.conn.commit()
27 def write_dictionary(self, table, dic):
28 self.cursor.execute(f'DROP TABLE IF EXISTS {table}')
29 self.cursor.execute(f'CREATE TABLE {table} (dic TEXT)')
30 self.cursor.execute(f'INSERT INTO {table} VALUES (?)', [json.dumps(dic)])
31 self.conn.commit()
33 # def read_arrays(self, table, columns_names=None, dtype=np.double):
34 # if columns_names is None:
35 # self.cursor.execute(f"SELECT * FROM {table}")
36 # else:
37 # self.cursor.execute(f"SELECT {self._convert_list_str_to_arg(columns_names)} FROM {table}")
38 # result = self.cursor.fetchone()
39 # result_new = list()
40 # for res in result:
41 # result_new.append(np.frombuffer(res, dtype=dtype))
42 # if len(result_new) > 1:
43 # return result_new
44 # else:
45 # return result_new[0]
47 def read_dictionary(self, table):
48 self.cursor.execute(f"SELECT dic FROM {table}")
49 (json_dic,) = self.cursor.fetchone()
50 return json.loads(json_dic)
52 def _convert_list_str_to_arg(self, str_list):
53 return str(str_list).replace("'", "").replace("[", "").replace("]", "")
55 def __del__(self):
56 self.conn.close()
59# def main():
60# data = database("test")
61# a = np.array([1.0, 2.0, 3.0])
62# b = np.array([10.0, 11.0, 12.0])
63# data.write_arrays("ab_table", [a, b], ['a', 'b'])
64# data.write_arrays("a_table", [a], ['a'])
65# data.write_arrays("b_table", b, 'b')
66# data.write_arrays("ab_table_noname", [a, b])
67# data.write_arrays("b_table_noname", b)
69# a_new, b_new = data.read_arrays("ab_table", ['a', 'b'])
70# print(a)
71# print(a_new)
72# print(b)
73# print(b_new)
75# a_new = data.read_arrays("a_table", "a")
76# print(a)
77# print(a_new)
78# b_new = data.read_arrays("b_table", "b")
79# print(b)
80# print(b_new)
82# a_new, b_new = data.read_arrays("ab_table_noname")
83# print(a)
84# print(a_new)
85# print(b)
86# print(b_new)
88# b_new = data.read_arrays("b_table_noname")
89# print(b)
90# print(b_new)
92# dic = {"name": "Giacomo", "age": 33}
93# data.write_dictionary("dic_table", dic)
94# dic_new = data.read_dictionary("dic_table")
95# print(dic)
96# print(dic_new)
98# data_read = database("test")
99# a_new, b_new = data_read.read_arrays("ab_table", ['a', 'b'])
100# print(a)
101# print(a_new)
102# print(b)
103# print(b_new)
106# if __name__ == "__main__":
107# main()