ATLAS Offline Software
Loading...
Searching...
No Matches
AtlTriggerDBCopy.py
Go to the documentation of this file.
1#!/usr/bin/env python
2
3# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4
# Script to produce a one-to-one copy of the Oracle Run 2 MC TriggerDB in an SQLite file
6
7from TrigConfIO.TriggerConfigAccessBase import ConfigDBLoader
8from copy import deepcopy
9import sqlite3
10
def parseCmdline():
    """Parse and validate the command-line options of AtlTriggerDBCopy.py.

    Both the source alias (-d/--dbalias) and the sqlite target file
    (-c/--create) have to be given. If one of them is missing, a short
    message followed by the usage help is printed and None is returned.

    Returns:
        The argparse namespace (attributes ``dbalias``, ``createfile`` and
        ``v``), or None when a required option is missing.
    """
    import argparse
    parser = argparse.ArgumentParser(prog = "AtlTriggerDBCopy.py", description="Example: %(prog)s -d TRIGGERDBMC -c triggerDBMC_Run2.db")
    parser.add_argument("-d", "--dbalias", dest="dbalias", help="TriggerDB connection alias for the source DB")
    parser.add_argument("-c", "--create", dest="createfile", help="create sqlite db file")
    parser.add_argument("-v", help="increase output verbosity", action="count", default=0)
    args = parser.parse_args()
    if not args.dbalias:
        print("No source Trigger DB specified")
        parser.print_help()
        return None
    if not args.createfile:
        print("No sqlite target file specified")
        # print_help() takes an optional output *file*; the parser itself must
        # not be passed, otherwise the help text is not written to stdout.
        parser.print_help()
        return None
    return args
27
# Class to handle the Oracle DB connection.
# It uses the CORAL authentication method implemented in TrigConfIO.TriggerConfigAccessBase (ConfigDBLoader).
class DBConnection:
    """Context manager wrapping the connection to the source TriggerDB.

    The connection is obtained through ConfigDBLoader when the ``with`` block
    is entered and closed again when it is left.

    Attributes:
        dbalias: connection alias passed to the constructor.
        connection: connection object returned by ConfigDBLoader.getConnection
            (None until the context is entered).
        schema: schema returned by ConfigDBLoader.getConnection
            (None until the context is entered).
    """

    def __init__(self, dbalias):
        self.dbalias = dbalias
        self.connection = None
        # NOTE: 'schema' is a plain data attribute. A former accessor method of
        # the same name was always shadowed by this attribute on every instance
        # (so it could never be called) and has been dropped; users read
        # 'obj.schema' directly.
        self.schema = None

    def __enter__(self):
        """Resolve the alias, open the connection and return this object."""
        credentials = ConfigDBLoader.getConnectionParameters(self.dbalias)
        self.connection, self.schema = ConfigDBLoader.getConnection(credentials)
        print(f"Opening connection to {self.dbalias}")
        return self

    def __exit__(self, type, value, traceback):
        """Close the connection; exceptions from the with-body are not suppressed."""
        # guard against a connection that was never established
        if self.connection is not None:
            self.connection.close()
        print(f"Closing connection to {self.dbalias}" )

    def cursor(self):
        """Return a new cursor of the underlying connection."""
        return self.connection.cursor()
51
# Class to handle the insertion of schema and data on the SQLite side
54
55 def __init__(self, filename = None):
56 self.filename = filename
57 self.connection = None
58 self.activeTransaction = False
61 if self.filename is None:
62 print("Not connected to an output file, will print the insert commands instead")
63 import os
64 if filename and os.path.exists(filename):
65 print(f"Target file {filename} exists already, will abort in order to prevent overwriting")
66 raise RuntimeError("Target file already exists")
67 if self.filename is not None:
68 self.connection = sqlite3.connect(self.filename)
69 self.cursor = self.connection.cursor()
def commit(self):
    """Close the currently open sqlite transaction, if there is one.

    Issues an explicit COMMIT on the cursor, then clears the
    active-transaction flag and resets the per-transaction insert counter.
    Does nothing when no transaction is active.
    """
    if not self.activeTransaction:
        return
    self.cursor.execute('COMMIT')
    self.activeTransaction = False
    self.insCountInTrans = 0
75
76 def insert(self, statement, data):
77 if self.connection is None:
78 return
79 if not self.activeTransaction:
80 self.cursor.execute('BEGIN TRANSACTION')
81 self.activeTransaction = True
82 self.cursor.execute(statement, data)
83 self.insCountInTrans += 1
85 self.commit()
86
def insertBulk(self, statement, data):
    """Insert all rows of *data* with a single executemany call and commit.

    Args:
        statement: parameterised INSERT statement.
        data: iterable of rows, one parameter sequence per row.

    Nothing is done when no sqlite connection is open.
    """
    if self.connection is not None:
        rows = iter(data)
        self.cursor.executemany(statement, rows)
        self.connection.commit()
92
def createSchema(self, creationFileName):
    """Run the SQL script stored in *creationFileName* on the sqlite db.

    Args:
        creationFileName: path of a text file with the schema creation
            statements; its whole content is passed to executescript().

    Nothing is done when no sqlite connection is open.
    """
    if self.connection is not None:
        with open(creationFileName) as scriptFile:
            print("Creating sqlite db %s from %s" % (self.filename, creationFileName) )
            script = scriptFile.read()
            self.cursor.executescript( script )
99
100
# Class to handle the extraction of schema and data on the Oracle side
103
104 def __init__(self, connection):
105 self.connection = connection
106 self.primaryKeys = None
107 self.foreignKeys = None
108 self.indexes = None
109 self.allTables = None
110 self.ignoreTablesCreate = [ 'TT_WRITELOCK', 'HLT_SMT_TO_HRE', 'HLT_PARAMETER', 'HLT_RELEASE', 'TRIGGER_LOG',
111 'DBCOPY_SOURCE_DATABASE', 'HLT_RULE_SET', 'TEMP', 'HLT_RULE_PARAMETER', 'HLT_RULE_COMPONENT',
112 'HLT_SETUP', 'TT_USERS', 'HLT_RULE', "HLT_HRC_TO_HRP", "HLT_HRE_TO_HRS",
113 "HLT_HRS_TO_HRU", "HLT_HRU_TO_HRC", "HLT_PRESCALE_SET_ALIAS", "HLT_PRESCALE_SET_COLL",
114 "PRESCALE_SET_ALIAS", "TRIGGER_ALIAS", "L1_PRESCALE_SET_ALIAS", "L1_CALO_SIN_COS",
115 "L1_CI_TO_CSC", "L1_JET_INPUT", "L1_MUON_THRESHOLD_SET"]
117 print("Tables to ignore creation: %r" % len(self.ignoreTablesCreate))
118 print("Tables to ignore filling: %r" % len(self.ignoreTablesFill))
119
120
def getTables(self, exceptTables):
    """Return the table names of the schema, minus the ones in *exceptTables*.

    The full list is queried from ``all_tables`` on first use and cached in
    ``self.allTables``; every call returns a fresh copy, so the caller may
    modify the result.

    Args:
        exceptTables: iterable of table names to leave out, or None to get
            all tables.
    """
    if self.allTables is None:
        rows = self.executeQuery("SELECT table_name FROM all_tables WHERE owner=:SCHEMA",
                                 { "SCHEMA" : self.connection.schema })
        self.allTables = [row[0] for row in rows]
        print("All tables: %i" % len(self.allTables))
    remaining = deepcopy(self.allTables)
    if exceptTables is None:
        return remaining
    for excluded in exceptTables:
        try:
            remaining.remove(excluded)
        except ValueError:
            # name is not (or no longer) in the list, nothing to remove
            continue
    return remaining
134
def executeQuery(self, query, data = None):
    """Execute *query* on a new cursor of the connection and return the cursor.

    Args:
        query: SQL statement, optionally containing named bind variables.
        data: dict mapping bind-variable names to values; they are passed to
            cursor.execute() as keyword arguments. None (the default) means
            no bind variables.

    Returns:
        The cursor after execution; iterate it or call fetchone() on it to
        read the result.
    """
    # None instead of a shared mutable {} default
    bindVariables = {} if data is None else data
    cursor = self.connection.cursor()
    # presumably the number of rows fetched per round trip (DB-API arraysize)
    cursor.arraysize = 1000
    cursor.execute( query, **bindVariables )
    return cursor
140
def tableSize(self, tableName):
    """Return the number of rows of *tableName* in the connection's schema."""
    # schema and table are identifiers and cannot be bind variables, hence
    # they are formatted into the statement
    countQuery = "SELECT COUNT(*) FROM {schema}.{tableName}".format(
        schema = self.connection.schema, tableName = tableName)
    firstRow = self.executeQuery( countQuery ).fetchone()
    return firstRow[0]
147
def columnNames(self, tableName):
    """Return the column names and data types of *tableName*.

    The columns are read from ``sys.all_tab_columns`` ordered by column id.

    Returns:
        A pair of tuples ``(names, types)`` of equal length.

    Raises:
        ValueError: if the query yields no rows (the unpacking of the
            transposed result fails).
    """
    rows = self.executeQuery(
        query = "SELECT column_name, data_type FROM sys.all_tab_columns WHERE owner = :SCHEMA AND table_name = :TABLE_NAME ORDER BY column_id",
        data = { "SCHEMA" : self.connection.schema, "TABLE_NAME" : tableName } )
    # transpose [(name, type), ...] into (names...), (types...)
    names, types = zip(*rows)
    return names, types
154
def getIndexes(self):
    """Return the indexes of the schema, grouped by table.

    The information is read once from ``sys.all_ind_columns`` and cached in
    ``self.indexes``.

    Returns:
        dict mapping table name -> dict mapping index name -> indexed
        column(s) as a string. For an index spanning several columns the
        names are joined with "," in index-column order, so the value can
        be placed directly between the parentheses of a CREATE INDEX
        statement.
    """
    if self.indexes is not None:
        return self.indexes
    print("retrieving indexes from Oracle")
    # ordered by column_position so that the columns of a multi-column index
    # are collected in their defined order
    query_Indexes = """
        SELECT table_name, index_name, column_name FROM sys.all_ind_columns WHERE table_owner = :SCHEMA
        ORDER BY table_name, index_name, column_position
        """
    data = { "SCHEMA" : self.connection.schema }
    result = self.executeQuery( query = query_Indexes, data = data )
    # collect every column of an index instead of keeping only the last one
    columnsPerIndex = {}
    for table_name, index_name, column_name in result:
        columnsPerIndex.setdefault(table_name, {}).setdefault(index_name, []).append(column_name)
    indexes = {}
    for table_name, tableIndexes in columnsPerIndex.items():
        indexes[table_name] = { index_name : ",".join(columns) for index_name, columns in tableIndexes.items() }
    # fill the cache only after the query succeeded
    self.indexes = indexes
    return self.indexes
170
def getPrimaryKeys(self):
    """Return the primary-key columns of every table of the schema.

    The constraints are read once from ``sys.all_cons_columns`` /
    ``sys.all_constraints`` and cached in ``self.primaryKeys``.

    Returns:
        dict mapping table name -> list of key column names, in the order
        delivered by the query (sorted by table name and key position).
    """
    if self.primaryKeys is None:
        print("retrieving primary key constraints from Oracle")
        query_PrimaryKeys = """
            SELECT table_name, column_name, position FROM sys.all_cons_columns WHERE owner = :SCHEMA AND constraint_name in
            (SELECT constraint_name FROM sys.all_constraints WHERE owner = :SCHEMA AND constraint_type='P')
            ORDER BY table_name, position
            """
        bindVars = { "SCHEMA" : self.connection.schema }
        keyColumns = {}
        rows = self.executeQuery( query = query_PrimaryKeys, data = bindVars )
        for table, column, _position in rows:
            keyColumns.setdefault(table, []).append(column)
        self.primaryKeys = keyColumns
    return self.primaryKeys
189
def getForeignKeys(self):
    """Return the foreign-key relations of every table of the schema.

    The constraints are read once from ``sys.all_constraints`` joined with
    ``sys.all_cons_columns`` and cached in ``self.foreignKeys``.

    Returns:
        dict mapping table name -> list of dicts with the keys ``col``
        (referencing column), ``rtab`` (referenced table) and ``rcol``
        (referenced column), one entry per row delivered by the query.
    """
    if self.foreignKeys is None:
        print("retrieving foreign key constraints from Oracle")
        query_ForeignKeys = """
            SELECT c.table_name, c.constraint_name, a.column_name, c.r_constraint_name, r.table_name, r.column_name
            FROM sys.all_constraints c
            INNER JOIN
            sys.all_cons_columns a
            on (c.constraint_type='R' and c.constraint_name = a.constraint_name and c.owner = :SCHEMA and a.owner = :SCHEMA)
            INNER JOIN
            sys.all_cons_columns r
            on (c.r_constraint_name = r.constraint_name and r.owner = :SCHEMA)
            """
        bindVars = { "SCHEMA" : self.connection.schema }
        relations = {}
        rows = self.executeQuery( query = query_ForeignKeys, data = bindVars )
        # the constraint names (2nd and 4th field) are not needed
        for table, _, column, _, refTable, refColumn in rows:
            relations.setdefault(table, []).append( { "col" : column, "rtab" : refTable, "rcol" : refColumn } )
        self.foreignKeys = relations
    return self.foreignKeys
213
def sqliteType(self, oraType):
    """Translate an Oracle column data type into the sqlite column type.

    Every type starting with TIMESTAMP as well as VARCHAR2, CHAR and CLOB
    become TEXT; NUMBER becomes INTEGER.

    Raises:
        KeyError: for any other Oracle type.
    """
    if oraType.startswith("TIMESTAMP"):
        return "TEXT"
    if oraType in ("VARCHAR2", "CHAR", "CLOB"):
        return "TEXT"
    if oraType == "NUMBER":
        return "INTEGER"
    raise KeyError(oraType)
223
def tableCreationCommand(self, tableName, primaryKeys, foreignKeys, indexes):
    """Build the sqlite statements that create *tableName* and its indexes.

    Args:
        tableName: name of the table; its columns are looked up with
            self.columnNames() and typed with self.sqliteType().
        primaryKeys: list of primary-key column names; may be empty, in
            which case no PRIMARY KEY clause is written.
        foreignKeys: list of dicts with the keys ``col``, ``rtab``, ``rcol``.
        indexes: mapping index name -> indexed column(s); an empty
            container means no index.

    Returns:
        A string with one CREATE TABLE IF NOT EXISTS statement followed by
        one CREATE INDEX statement per index, each terminated by ";\\n".
    """
    colNames, colTypes = self.columnNames(tableName)
    lines = [ "%s %s" % (colName, self.sqliteType(colType)) for colName, colType in zip(colNames, colTypes) ]
    # "PRIMARY KEY ()" is not valid SQL, so the clause is skipped without keys
    if primaryKeys:
        lines.append( "PRIMARY KEY (%s)" % ",".join(primaryKeys) )
    for fk in foreignKeys:
        lines.append( "FOREIGN KEY (%s) REFERENCES %s (%s)" % (fk["col"], fk["rtab"], fk["rcol"]) )
    parts = [ f"CREATE TABLE IF NOT EXISTS {tableName} (\n ", ",\n ".join(lines), "\n);\n" ]
    for index_name in indexes:
        parts.append( "CREATE INDEX %s on %s(%s);\n" % (index_name, tableName, indexes[index_name]) )
    return "".join(parts)
238
def extractSchema(self, creationFileName):
    """Write the sqlite schema creation script to *creationFileName*.

    For every table not listed in ``self.ignoreTablesCreate`` the output of
    tableCreationCommand() is written to the file, which is overwritten if
    it exists.

    Raises:
        KeyError: if a table has no entry in the primary-key dictionary.
    """
    foreignKeys = self.getForeignKeys()
    primaryKeys = self.getPrimaryKeys()
    allIndexes = self.getIndexes()
    with open(creationFileName, "w") as schemaFile:
        print("Creating schema file for sqlite: %s" % creationFileName)
        for tableName in self.getTables(exceptTables = self.ignoreTablesCreate):
            tableForeignKeys = foreignKeys.get(tableName, list())
            tableIndexes = allIndexes.get(tableName, list())
            command = self.tableCreationCommand( tableName, primaryKeys[tableName], tableForeignKeys, tableIndexes )
            print(command, file = schemaFile)
247
def copyTable(self, tableName, sqliteInserter, size, useBulkInsert = True):
    """Copy all rows of *tableName* from the source db to the sqlite inserter.

    Args:
        tableName: table to copy; its columns are taken from columnNames().
        sqliteInserter: object providing insertBulk(statement, rows) and,
            for the row-wise mode, insert(statement, row) and commit().
        size: number of rows of the table; only used in row-wise mode, for
            the progress printout and as upper limit of rows to copy.
        useBulkInsert: if True (default) the query result is handed to
            insertBulk() in one go; if False the rows are inserted one by
            one with a progress line every 10000 rows and a final commit().
    """
    # build insert statement
    schema = self.connection.schema
    colNames, _ = self.columnNames(tableName)
    colNameList = ",".join(colNames)
    bindVarList = ",".join(len(colNames)*["?"])
    insertStatement = f"INSERT INTO {tableName} ({colNameList}) VALUES ({bindVarList})"
    # select entries from oracle
    selectQuery = f"SELECT {colNameList} FROM {schema}.{tableName}"
    result = self.executeQuery( selectQuery )
    if useBulkInsert:
        sqliteInserter.insertBulk(insertStatement, result)
        return
    # row-wise mode, formerly unreachable behind a hard-coded flag
    for rowsDone, data in enumerate(result):
        if rowsDone % 10000 == 0:
            print("%i / %i" % (rowsDone, size))
        sqliteInserter.insert(insertStatement, data)
        if rowsDone + 1 == size:
            break # just doing some timing measurements
    # flush whatever is left in the inserter's open transaction
    sqliteInserter.commit()
272
273
def main():
    """Copy the source TriggerDB given on the command line into a new sqlite file.

    Steps: parse the command line, create the sqlite inserter, connect to
    the source db, write the schema creation script to a file in the current
    directory and run it on the sqlite side, print the size of every table
    and finally copy the tables one by one.

    Returns:
        1 if the command line is incomplete, otherwise None (exit status 0).

    Raises:
        RuntimeError: propagated from SQLiteInserter when the target file
            already exists.
    """
    args = parseCmdline()
    if args is None:
        return 1

    # instantiate the sqlite inserter with the sqlite db filename
    sqliteInserter = SQLiteInserter(args.createfile)

    try:
        # connect to oracle and do the work
        with DBConnection(args.dbalias) as oraConn:
            oraExp = OracleExporter(oraConn)

            # extract the schema into a temporary file and create it on the sqlite side
            tempTableCreationFileName = "tmpCreateTablesSQLite.sql"
            oraExp.extractSchema(tempTableCreationFileName)
            sqliteInserter.createSchema(tempTableCreationFileName)

            # collect some info about the size, to not run completely blind
            entries = {}
            tablesToCreate = oraExp.getTables(exceptTables = oraExp.ignoreTablesCreate)
            tablesToFill = oraExp.getTables(exceptTables = oraExp.ignoreTablesFill)
            print("Tables to create: %i" % len(tablesToCreate))
            print("Tables to fill: %i" % len(tablesToFill))

            for tableName in tablesToCreate:
                entries[tableName] = oraExp.tableSize(tableName)
                doNotCopy = tableName not in tablesToFill
                print(" table %s has %i entries %s" % (tableName, entries[tableName], ("(will not copy)" if doNotCopy else "") ) )
            totalEntries = sum(entries.values())
            print("\nTotal number of entries: %i" %totalEntries)

            # copy the data one table at the time
            print("Start copying data")
            copiedEntries = 0
            for tableName in tablesToFill:
                print("Copying table %s" % tableName, end = '', flush=True)
                oraExp.copyTable(tableName, sqliteInserter, entries[tableName])
                copiedEntries += entries[tableName]
                # all tables may be empty: avoid dividing by zero in that case
                percentage = 100 * copiedEntries/totalEntries if totalEntries else 100.0
                print(" => done %i / %i (%f%%)" % (copiedEntries, totalEntries, percentage))
    finally:
        # release the sqlite file also when the copy fails half way
        if sqliteInserter.connection is not None:
            sqliteInserter.connection.close()
316
317
# Script entry point: run main() and hand its return value to the shell as exit status.
if __name__ == "__main__":
    import sys
    exitCode = main()
    sys.exit(exitCode)
void print(char *figname, TCanvas *c1)
__exit__(self, type, value, traceback)
executeQuery(self, query, data={})
tableCreationCommand(self, tableName, primaryKeys, foreignKeys, indexes)
copyTable(self, tableName, sqliteInserter, size)
extractSchema(self, creationFileName)
createSchema(self, creationFileName)
insertBulk(self, statement, data)
insert(self, statement, data)