Coverage for pySDC/projects/Monodomain/utils/data_management.py: 92%

24 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-20 14:51 +0000

1import sqlite3 

2import json 

3import os 

4 

5 

6class database: 

7 def __init__(self, name): 

8 self.name = name 

9 path = os.path.dirname(self.name) 

10 if path != "" and not os.path.exists(path): 

11 os.makedirs(path) 

12 self.conn = sqlite3.connect(f'{self.name}.db') 

13 self.cursor = self.conn.cursor() 

14 

15 # def write_arrays(self, table, arrays, columns_names=None): 

16 # if not isinstance(arrays, list): 

17 # arrays = [arrays] 

18 # n = len(arrays) 

19 # if columns_names is None: 

20 # columns_names = ["val_" + str(i) for i in range(n)] 

21 

22 # self.cursor.execute(f'DROP TABLE IF EXISTS {table}') 

23 # self.cursor.execute(f'CREATE TABLE {table} ({self._convert_list_str_to_arg(columns_names)})') 

24 # self.cursor.execute(f'INSERT INTO {table} VALUES ({self._convert_list_str_to_arg(["?"]*n)})', arrays) 

25 # self.conn.commit() 

26 

27 def write_dictionary(self, table, dic): 

28 self.cursor.execute(f'DROP TABLE IF EXISTS {table}') 

29 self.cursor.execute(f'CREATE TABLE {table} (dic TEXT)') 

30 self.cursor.execute(f'INSERT INTO {table} VALUES (?)', [json.dumps(dic)]) 

31 self.conn.commit() 

32 

33 # def read_arrays(self, table, columns_names=None, dtype=np.double): 

34 # if columns_names is None: 

35 # self.cursor.execute(f"SELECT * FROM {table}") 

36 # else: 

37 # self.cursor.execute(f"SELECT {self._convert_list_str_to_arg(columns_names)} FROM {table}") 

38 # result = self.cursor.fetchone() 

39 # result_new = list() 

40 # for res in result: 

41 # result_new.append(np.frombuffer(res, dtype=dtype)) 

42 # if len(result_new) > 1: 

43 # return result_new 

44 # else: 

45 # return result_new[0] 

46 

47 def read_dictionary(self, table): 

48 self.cursor.execute(f"SELECT dic FROM {table}") 

49 (json_dic,) = self.cursor.fetchone() 

50 return json.loads(json_dic) 

51 

52 def _convert_list_str_to_arg(self, str_list): 

53 return str(str_list).replace("'", "").replace("[", "").replace("]", "") 

54 

55 def __del__(self): 

56 self.conn.close() 

57 

58 

59# def main(): 

60# data = database("test") 

61# a = np.array([1.0, 2.0, 3.0]) 

62# b = np.array([10.0, 11.0, 12.0]) 

63# data.write_arrays("ab_table", [a, b], ['a', 'b']) 

64# data.write_arrays("a_table", [a], ['a']) 

65# data.write_arrays("b_table", b, 'b') 

66# data.write_arrays("ab_table_noname", [a, b]) 

67# data.write_arrays("b_table_noname", b) 

68 

69# a_new, b_new = data.read_arrays("ab_table", ['a', 'b']) 

70# print(a) 

71# print(a_new) 

72# print(b) 

73# print(b_new) 

74 

75# a_new = data.read_arrays("a_table", "a") 

76# print(a) 

77# print(a_new) 

78# b_new = data.read_arrays("b_table", "b") 

79# print(b) 

80# print(b_new) 

81 

82# a_new, b_new = data.read_arrays("ab_table_noname") 

83# print(a) 

84# print(a_new) 

85# print(b) 

86# print(b_new) 

87 

88# b_new = data.read_arrays("b_table_noname") 

89# print(b) 

90# print(b_new) 

91 

92# dic = {"name": "Giacomo", "age": 33} 

93# data.write_dictionary("dic_table", dic) 

94# dic_new = data.read_dictionary("dic_table") 

95# print(dic) 

96# print(dic_new) 

97 

98# data_read = database("test") 

99# a_new, b_new = data_read.read_arrays("ab_table", ['a', 'b']) 

100# print(a) 

101# print(a_new) 

102# print(b) 

103# print(b_new) 

104 

105 

106# if __name__ == "__main__": 

107# main()