Convert .format()
to f-strings
Committed 437cea
--- a/sqlyte.py
+++ b/sqlyte.py
# XXX # import textwrap
# XXX # print(textwrap.dedent(schema))
# XXX # table_schemas.pop(table)
- # XXX # new_table = "new_{}".format(table)
+ # XXX # new_table = f"new_{table}"
# XXX # self.create(table, schema)
# XXX # self.create(new_table, schema)
# XXX # with self.transaction as cur:
# XXX # old_names = {col[0] for col in old_columns}
# XXX # new_names = {col[0] for col in new_columns}
# XXX # cols = list(old_names.intersection(new_names))
- # XXX # print("Migrating table `{}`..".format(table), end=" ")
+ # XXX # print(f"Migrating table `{table}`..", end=" ")
# XXX # for row in cur.select(table, what=", ".join(cols)):
# XXX # cur.insert(new_table, dict(zip(cols, list(row))))
# XXX # cur.drop(table)
record[column] = float(val)
columns, vals = zip(*record.items())
values.append(vals)
- sql = "{} INTO {}({})".format(
- operation.upper(), table, ", ".join(columns)
- ) + " VALUES ({})".format(", ".join("?" * len(vals)))
+ sql = (
+ f"{operation.upper()} INTO {table} ({', '.join(columns)})"
+ + f' VALUES ({", ".join("?" * len(vals))})'
+ )
if self.debug:
print(sql)
try:
vals=None,
suffix="",
):
- sql_parts = ["SELECT {}".format(what), "FROM {}".format(table)]
+ sql_parts = [f"SELECT {what}", f"FROM {table}"]
if join:
if not isinstance(join, (list, tuple)):
join = [join]
for join_statement in join:
- sql_parts.append("LEFT JOIN {}".format(join_statement))
+ sql_parts.append(f"LEFT JOIN {join_statement}")
if where:
# if vals:
# where = where.format(*[str(adapt(v)) for v in vals])
- sql_parts.append("WHERE {}".format(where))
+ sql_parts.append(f"WHERE {where}")
if group:
- sql_parts.append("GROUP BY {}".format(group))
+ sql_parts.append(f"GROUP BY {group}")
if order:
- sql_parts.append("ORDER BY {}".format(order))
+ sql_parts.append(f"ORDER BY {order}")
if limit:
- limitsql = "LIMIT {}".format(limit)
+ limitsql = f"LIMIT {limit}"
if offset:
- limitsql += " {}".format(offset)
+ limitsql += f" {offset}"
sql_parts.append(limitsql)
- return "({}) {}".format("\n".join(sql_parts), suffix).rstrip()
+ sql_lines = "\n".join(sql_parts)
+ return f"({sql_lines}) {suffix}".rstrip()
def update(self, table, what=None, where=None, vals=None, **record):
"""
Use `what` *or* `record`.
"""
- sql_parts = ["UPDATE {}".format(table)]
+ sql_parts = [f"UPDATE {table}"]
if what:
what_sql = what
else:
keys, values = zip(*record.items())
- what_sql = ", ".join("{}=?".format(key) for key in keys)
+ what_sql = ", ".join(f"{key}=?" for key in keys)
if vals is None:
vals = []
vals = list(values) + vals
- sql_parts.append("SET {}".format(what_sql))
+ sql_parts.append(f"SET {what_sql}")
if where:
- sql_parts.append("WHERE {}".format(where))
+ sql_parts.append(f"WHERE {where}")
sql = "\n".join(sql_parts)
if self.debug:
print(sql)
delete one or more records
"""
- sql_parts = ["DELETE FROM {}".format(table)]
+ sql_parts = [f"DELETE FROM {table}"]
if where:
- sql_parts.append("WHERE {}".format(where))
+ sql_parts.append(f"WHERE {where}")
sql = "\n".join(sql_parts)
if vals:
self.cur.execute(sql, vals)
"""Return columns for given table."""
return [
list(column)[1:]
- for column in self.cur.execute("PRAGMA table_info({})".format(table))
+ for column in self.cur.execute(f"PRAGMA table_info({table})")
]
def add_column(self, table, column_def):