ATLAS Offline Software
AtlTriggerDBCopy.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 
5 # script to produce a one-to-one copy of the oracle Run 2 MC TriggerDB to sqlite
6 
7 from TrigConfIO.TriggerConfigAccessBase import ConfigDBLoader
8 from copy import deepcopy
9 import sqlite3
10 
12  import argparse
13  parser = argparse.ArgumentParser(prog = "AtlTriggerDBCopy.py", description="Example: %(prog)s -d TRIGGERDBMC -c triggerDBMC_Run2.db")
14  parser.add_argument("-d", "--dbalias", dest="dbalias", help="TriggerDB connection alias for the source DB")
15  parser.add_argument("-c", "--create", dest="createfile", help="create sqlite db file")
16  parser.add_argument("-v", help="increase output verbosity", action="count", default=0)
17  args = parser.parse_args()
18  if not args.dbalias:
19  print("No source Trigger DB specified")
20  parser.print_help()
21  return None
22  if not args.createfile:
23  print("No sqlite target file specified")
24  parser.print_help(parser)
25  return None
26  return args
27 
28 # class to handle the oracle db connection
29 # it uses the CORAL authentication method implemented in the TrigConfIO.TriggerConfigAccessBase (ConfigDBLoader)
31  def __init__(self, dbalias):
32  self.dbalias = dbalias
33  self.connection = None
34  self.schema = None
35 
36  def __enter__(self):
37  credentials = ConfigDBLoader.getConnectionParameters(self.dbalias)
38  self.connection, self.schema = ConfigDBLoader.getConnection(credentials)
39  print(f"Opening connection to {self.dbalias}")
40  return self
41 
42  def __exit__(self, type, value, traceback):
43  self.connection.close()
44  print(f"Closing connection to {self.dbalias}" )
45 
46  def cursor(self):
47  return self.connection.cursor()
48 
49  def schema(self):
50  return self.schema
51 
52 # class to handle the insertion of schema and data on the sqlite side
54 
55  def __init__(self, filename = None):
56  self.filename = filename
57  self.connection = None
58  self.activeTransaction = False
59  self.insCountInTrans = 0
61  if self.filename is None:
62  print("Not connected to an output file, will print the insert commands instead")
63  import os
64  if filename and os.path.exists(filename):
65  print(f"Target file {filename} exists already, will abort in order to prevent overwriting")
66  raise RuntimeError("Target file already exists")
67  if self.filename is not None:
68  self.connection = sqlite3.connect(self.filename)
69  self.cursor = self.connection.cursor()
70  def commit(self):
71  if self.activeTransaction:
72  self.cursor.execute('COMMIT')
73  self.activeTransaction = False
74  self.insCountInTrans = 0
75 
76  def insert(self, statement, data):
77  if self.connection is None:
78  return
79  if not self.activeTransaction:
80  self.cursor.execute('BEGIN TRANSACTION')
81  self.activeTransaction = True
82  self.cursor.execute(statement, data)
83  self.insCountInTrans += 1
85  self.commit()
86 
87  def insertBulk(self, statement, data):
88  if self.connection is None:
89  return
90  self.cursor.executemany(statement, iter(data))
91  self.connection.commit()
92 
93  def createSchema(self, creationFileName):
94  if self.connection is None:
95  return
96  with open(creationFileName) as fh:
97  print("Creating sqlite db %s from %s" % (self.filename, creationFileName) )
98  self.cursor.executescript( fh.read() )
99 
100 
101 # class to handle the extraction of schema and data on the oracle side
103 
104  def __init__(self, connection):
105  self.connection = connection
106  self.primaryKeys = None
107  self.foreignKeys = None
108  self.indexes = None
109  self.allTables = None
110  self.ignoreTablesCreate = [ 'TT_WRITELOCK', 'HLT_SMT_TO_HRE', 'HLT_PARAMETER', 'HLT_RELEASE', 'TRIGGER_LOG',
111  'DBCOPY_SOURCE_DATABASE', 'HLT_RULE_SET', 'TEMP', 'HLT_RULE_PARAMETER', 'HLT_RULE_COMPONENT',
112  'HLT_SETUP', 'TT_USERS', 'HLT_RULE', "HLT_HRC_TO_HRP", "HLT_HRE_TO_HRS",
113  "HLT_HRS_TO_HRU", "HLT_HRU_TO_HRC", "HLT_PRESCALE_SET_ALIAS", "HLT_PRESCALE_SET_COLL",
114  "PRESCALE_SET_ALIAS", "TRIGGER_ALIAS", "L1_PRESCALE_SET_ALIAS", "L1_CALO_SIN_COS",
115  "L1_CI_TO_CSC", "L1_JET_INPUT", "L1_MUON_THRESHOLD_SET"]
117  print("Tables to ignore creation: %r" % len(self.ignoreTablesCreate))
118  print("Tables to ignore filling: %r" % len(self.ignoreTablesFill))
119 
120 
121  def getTables(self, exceptTables):
122  if self.allTables is None:
123  query_ListAllTables = "SELECT table_name FROM all_tables WHERE owner=:SCHEMA"
124  data = { "SCHEMA" : self.connection.schema }
125  result = self.executeQuery(query_ListAllTables, data)
126  self.allTables = [x[0] for x in result]
127  print("All tables: %i" % len(self.allTables))
128  tables = deepcopy(self.allTables)
129  if exceptTables is not None:
130  for x in exceptTables:
131  if x in tables:
132  tables.remove(x)
133  return tables
134 
135  def executeQuery(self, query, data = {}):
136  cursor = self.connection.cursor()
137  cursor.arraysize = 1000
138  cursor.execute( query, **data )
139  return cursor
140 
141  def tableSize(self,tableName):
142  query_TableSize = "SELECT COUNT(*) FROM {schema}.{tableName}"
143  qdict = { "schema" : self.connection.schema, "tableName" : tableName }
144  result = self.executeQuery( query_TableSize.format(**qdict) )
145  row = result.fetchone()
146  return row[0]
147 
148  def columnNames(self,tableName):
149  query_ColumnNames = "SELECT column_name, data_type FROM sys.all_tab_columns WHERE owner = :SCHEMA AND table_name = :TABLE_NAME ORDER BY column_id"
150  data = { "SCHEMA" : self.connection.schema, "TABLE_NAME" : tableName }
151  result = self.executeQuery( query = query_ColumnNames, data = data )
152  colNames, colTypes = zip(*result)
153  return colNames, colTypes
154 
155  def getIndexes(self):
156  if self.indexes is not None:
157  return self.indexes
158  self.indexes = {}
159  print("retrieving indexes from Oracle")
160  query_Indexes = """
161  SELECT table_name, index_name, column_name FROM sys.all_ind_columns WHERE table_owner = :SCHEMA
162  """
163  data = { "SCHEMA" : self.connection.schema }
164  result = self.executeQuery( query = query_Indexes, data = data )
165  for table_name, index_name, column_name in result:
166  if table_name not in self.indexes:
167  self.indexes[table_name] = {}
168  self.indexes[table_name][index_name] = column_name
169  return self.indexes
170 
171  def getPrimaryKeys(self):
172  if self.primaryKeys is not None:
173  return self.primaryKeys
174  print("retrieving primary key constraints from Oracle")
175  query_PrimaryKeys = """
176  SELECT table_name, column_name, position FROM sys.all_cons_columns WHERE owner = :SCHEMA AND constraint_name in
177  (SELECT constraint_name FROM sys.all_constraints WHERE owner = :SCHEMA AND constraint_type='P')
178  ORDER BY table_name, position
179  """
180  data = { "SCHEMA" : self.connection.schema }
181  pk = {}
182  result = self.executeQuery( query = query_PrimaryKeys, data = data )
183  for table_name, column_name, _ in result:
184  if table_name not in pk:
185  pk[table_name] = []
186  pk[table_name] += [ column_name ]
187  self.primaryKeys = pk
188  return self.primaryKeys
189 
190  def getForeignKeys(self):
191  if self.foreignKeys is not None:
192  return self.foreignKeys
193  print("retrieving foreign key constraints from Oracle")
194  query_ForeignKeys = """
195  SELECT c.table_name, c.constraint_name, a.column_name, c.r_constraint_name, r.table_name, r.column_name
196  FROM sys.all_constraints c
197  INNER JOIN
198  sys.all_cons_columns a
199  on (c.constraint_type='R' and c.constraint_name = a.constraint_name and c.owner = :SCHEMA and a.owner = :SCHEMA)
200  INNER JOIN
201  sys.all_cons_columns r
202  on (c.r_constraint_name = r.constraint_name and r.owner = :SCHEMA)
203  """
204  data = { "SCHEMA" : self.connection.schema }
205  fk = {}
206  result = self.executeQuery( query = query_ForeignKeys, data = data )
207  for table, _, fkcol, _, rtable, rcol in result:
208  if table not in fk:
209  fk[table] = []
210  fk[table] += [ { "col" : fkcol, "rtab" : rtable, "rcol" : rcol } ]
211  self.foreignKeys = fk
212  return self.foreignKeys
213 
214  def sqliteType(self, oraType):
215  if oraType.startswith("TIMESTAMP"):
216  return "TEXT"
217  d = { "VARCHAR2" : "TEXT",
218  "CHAR" : "TEXT",
219  "NUMBER" : "INTEGER",
220  "CLOB" : "TEXT"
221  }
222  return d[oraType]
223 
224  def tableCreationCommand(self, tableName, primaryKeys, foreignKeys, indexes):
225  colNames, colTypes = self.columnNames(tableName)
226  lines = []
227  for colName, colType in zip(colNames,colTypes):
228  lines.append( "%s %s" % (colName, self.sqliteType(colType)) )
229  lines.append( "PRIMARY KEY (%s)" % ",".join(primaryKeys) )
230  for fk in foreignKeys:
231  lines.append( "FOREIGN KEY (%s) REFERENCES %s (%s)" % (fk["col"], fk["rtab"], fk["rcol"]) )
232  creationCommand = f"CREATE TABLE IF NOT EXISTS {tableName} (\n "
233  creationCommand += ",\n ".join(lines)
234  creationCommand += "\n);\n"
235  for index_name in indexes:
236  creationCommand += "CREATE INDEX %s on %s(%s);\n" % (index_name, tableName, indexes[index_name])
237  return creationCommand
238 
239  def extractSchema(self, creationFileName):
240  fk = self.getForeignKeys()
241  pk = self.getPrimaryKeys()
242  indexes = self.getIndexes()
243  with open(creationFileName, "w") as fh:
244  print("Creating schema file for sqlite: %s" % creationFileName)
245  for tableName in self.getTables(exceptTables = self.ignoreTablesCreate):
246  print(self.tableCreationCommand( tableName, pk[tableName], fk.get(tableName, list()), indexes.get(tableName, list())), file = fh)
247 
248  def copyTable(self, tableName, sqliteInserter, size):
249  # build insert statement
250  schema = self.connection.schema
251  colNames,_ = self.columnNames(tableName)
252  colNameList = ",".join(colNames)
253  bindVarList = ",".join(len(colNames)*["?"])
254  insertStatement = f"INSERT INTO {tableName} ({colNameList}) VALUES ({bindVarList})"
255  # select entries from oracle
256  selectQuery = f"SELECT {colNameList} FROM {schema}.{tableName}"
257  #print(selectQuery)
258  result = self.executeQuery( selectQuery )
259  useBulkInsert = True
260  if useBulkInsert:
261  sqliteInserter.insertBulk(insertStatement, result)
262  else:
263  c = 0
264  for data in result:
265  if c%10000==0:
266  print("%i / %i" % (c, size))
267  c+=1
268  sqliteInserter.insert(insertStatement, data)
269  if c==size:
270  break # just doing some timing measurements
271  sqliteInserter.commit()
272 
273 
274 def main():
275  args = parseCmdline()
276  if args is None:
277  return 1
278 
279  # instantiate the sqlite inserter with the sqlite db filename
280  sqliteInserter = SQLiteInserter(args.createfile)
281 
282  # connect to oracle and do the work
283  with DBConnection(args.dbalias) as oraConn:
284  oraExp = OracleExporter(oraConn)
285 
286  # extract the schema into a temporary file and create it on the sqlite side
287  tempTableCreationFileName = "tmpCreateTablesSQLite.sql"
288  oraExp.extractSchema(tempTableCreationFileName)
289  sqliteInserter.createSchema(tempTableCreationFileName)
290 
291  # collect some info about the size, to not run completely blind
292  entries = {}
293  tablesToCreate = oraExp.getTables(exceptTables = oraExp.ignoreTablesCreate)
294  tablesToFill = oraExp.getTables(exceptTables = oraExp.ignoreTablesFill)
295  print("Tables to create: %i" % len(tablesToCreate))
296  print("Tables to fill: %i" % len(tablesToFill))
297 
298 
299  for tableName in tablesToCreate:
300  entries[tableName] = oraExp.tableSize(tableName)
301  doNotCopy = tableName not in tablesToFill
302  print(" table %s has %i entries %s" % (tableName, entries[tableName], ("(will not copy)" if doNotCopy else "") ) )
303  totalEntries = sum(entries.values())
304  print("\nTotal number of entries: %i" %totalEntries)
305 
306  # copy the data one table at the time
307  print("Start copying data")
308  copiedEntries = 0
309  for tableName in tablesToFill:
310  print("Copying table %s" % tableName, end = '', flush=True)
311  oraExp.copyTable(tableName, sqliteInserter, entries[tableName])
312  copiedEntries += entries[tableName]
313  print(" => done %i / %i (%f%%)" % (copiedEntries, totalEntries, 100 * copiedEntries/totalEntries))
314 
315  sqliteInserter.connection.close()
316 
317 
318 if __name__ == "__main__":
319  import sys
320  sys.exit(main())
AtlTriggerDBCopy.OracleExporter.sqliteType
def sqliteType(self, oraType)
Definition: AtlTriggerDBCopy.py:214
AtlTriggerDBCopy.OracleExporter.__init__
def __init__(self, connection)
Definition: AtlTriggerDBCopy.py:104
AtlTriggerDBCopy.parseCmdline
def parseCmdline()
Definition: AtlTriggerDBCopy.py:11
AtlTriggerDBCopy.OracleExporter.allTables
allTables
Definition: AtlTriggerDBCopy.py:109
AtlTriggerDBCopy.OracleExporter.copyTable
def copyTable(self, tableName, sqliteInserter, size)
Definition: AtlTriggerDBCopy.py:248
AtlTriggerDBCopy.SQLiteInserter.cursor
cursor
Definition: AtlTriggerDBCopy.py:69
AtlTriggerDBCopy.SQLiteInserter.insert
def insert(self, statement, data)
Definition: AtlTriggerDBCopy.py:76
AtlTriggerDBCopy.OracleExporter.tableSize
def tableSize(self, tableName)
Definition: AtlTriggerDBCopy.py:141
AtlTriggerDBCopy.DBConnection
Definition: AtlTriggerDBCopy.py:30
AtlTriggerDBCopy.OracleExporter
Definition: AtlTriggerDBCopy.py:102
AtlTriggerDBCopy.OracleExporter.connection
connection
Definition: AtlTriggerDBCopy.py:105
LArG4FSStartPointFilterLegacy.execute
execute
Definition: LArG4FSStartPointFilterLegacy.py:20
AtlTriggerDBCopy.OracleExporter.executeQuery
def executeQuery(self, query, data={})
Definition: AtlTriggerDBCopy.py:135
AtlTriggerDBCopy.OracleExporter.ignoreTablesFill
ignoreTablesFill
Definition: AtlTriggerDBCopy.py:116
AtlTriggerDBCopy.SQLiteInserter.commit
def commit(self)
Definition: AtlTriggerDBCopy.py:70
AtlTriggerDBCopy.DBConnection.dbalias
dbalias
Definition: AtlTriggerDBCopy.py:32
convertTimingResiduals.sum
sum
Definition: convertTimingResiduals.py:55
AtlTriggerDBCopy.OracleExporter.foreignKeys
foreignKeys
Definition: AtlTriggerDBCopy.py:107
AtlTriggerDBCopy.main
def main()
Definition: AtlTriggerDBCopy.py:274
AtlTriggerDBCopy.SQLiteInserter.insCountInTrans
insCountInTrans
Definition: AtlTriggerDBCopy.py:59
AtlTriggerDBCopy.OracleExporter.getPrimaryKeys
def getPrimaryKeys(self)
Definition: AtlTriggerDBCopy.py:171
AtlTriggerDBCopy.OracleExporter.extractSchema
def extractSchema(self, creationFileName)
Definition: AtlTriggerDBCopy.py:239
AtlTriggerDBCopy.SQLiteInserter.insertBulk
def insertBulk(self, statement, data)
Definition: AtlTriggerDBCopy.py:87
AtlTriggerDBCopy.OracleExporter.getTables
def getTables(self, exceptTables)
Definition: AtlTriggerDBCopy.py:121
AtlTriggerDBCopy.DBConnection.schema
schema
Definition: AtlTriggerDBCopy.py:34
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
AtlTriggerDBCopy.OracleExporter.ignoreTablesCreate
ignoreTablesCreate
Definition: AtlTriggerDBCopy.py:110
AtlTriggerDBCopy.SQLiteInserter.activeTransaction
activeTransaction
Definition: AtlTriggerDBCopy.py:58
AtlTriggerDBCopy.SQLiteInserter
Definition: AtlTriggerDBCopy.py:53
AtlTriggerDBCopy.DBConnection.cursor
def cursor(self)
Definition: AtlTriggerDBCopy.py:46
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
AtlTriggerDBCopy.SQLiteInserter.filename
filename
Definition: AtlTriggerDBCopy.py:56
AtlTriggerDBCopy.OracleExporter.primaryKeys
primaryKeys
Definition: AtlTriggerDBCopy.py:106
AtlTriggerDBCopy.SQLiteInserter.maxInsertsPerTransaction
maxInsertsPerTransaction
Definition: AtlTriggerDBCopy.py:60
AtlTriggerDBCopy.SQLiteInserter.__init__
def __init__(self, filename=None)
Definition: AtlTriggerDBCopy.py:55
Trk::open
@ open
Definition: BinningType.h:40
AtlTriggerDBCopy.OracleExporter.getIndexes
def getIndexes(self)
Definition: AtlTriggerDBCopy.py:155
AtlTriggerDBCopy.DBConnection.connection
connection
Definition: AtlTriggerDBCopy.py:33
AtlTriggerDBCopy.SQLiteInserter.createSchema
def createSchema(self, creationFileName)
Definition: AtlTriggerDBCopy.py:93
query_example.cursor
cursor
Definition: query_example.py:21
AtlTriggerDBCopy.DBConnection.__enter__
def __enter__(self)
Definition: AtlTriggerDBCopy.py:36
AtlTriggerDBCopy.SQLiteInserter.connection
connection
Definition: AtlTriggerDBCopy.py:57
AtlTriggerDBCopy.OracleExporter.tableCreationCommand
def tableCreationCommand(self, tableName, primaryKeys, foreignKeys, indexes)
Definition: AtlTriggerDBCopy.py:224
Muon::print
std::string print(const MuPatSegment &)
Definition: MuonTrackSteering.cxx:28
AtlTriggerDBCopy.DBConnection.__exit__
def __exit__(self, type, value, traceback)
Definition: AtlTriggerDBCopy.py:42
AtlTriggerDBCopy.DBConnection.__init__
def __init__(self, dbalias)
Definition: AtlTriggerDBCopy.py:31
AtlTriggerDBCopy.OracleExporter.columnNames
def columnNames(self, tableName)
Definition: AtlTriggerDBCopy.py:148
AtlTriggerDBCopy.OracleExporter.indexes
indexes
Definition: AtlTriggerDBCopy.py:108
AtlTriggerDBCopy.OracleExporter.getForeignKeys
def getForeignKeys(self)
Definition: AtlTriggerDBCopy.py:190