
Commit b8c8439

bleonard33 authored and makmanalp committed
Better logging for multiprocessing (#6)
1 parent ad12707 commit b8c8439

6 files changed (+75 -59 lines)

pandas_to_postgres/__init__.py (+1 -1)

@@ -2,9 +2,9 @@
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
 from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
 from .utilities import (
-    logger,
     hdf_metadata,
     create_file_object,
     df_generator,
     cast_pandas,
+    get_logger,
 )

pandas_to_postgres/_base_copy.py (+18 -14)

@@ -1,4 +1,4 @@
-from .utilities import logger
+from .utilities import get_logger
 from sqlalchemy.schema import AddConstraint, DropConstraint
 from sqlalchemy.exc import SQLAlchemyError

@@ -36,14 +36,15 @@ def __init__(
         self.csv_chunksize = csv_chunksize

         if not defer_sql_objs:
-            self.instantiate_sql_objs(conn, table_obj)
+            self.instantiate_attrs(conn, table_obj)
         else:
             self.sql_table = sql_table

-    def instantiate_sql_objs(self, conn, table_obj):
+    def instantiate_attrs(self, conn, table_obj):
         """
-        When using multiprocessing, pickling of SQLAlchemy objects in __init__ causes
-        issues, so allow for deferring until after the pickling to fetch SQLAlchemy objs
+        When using multiprocessing, pickling of logger and SQLAlchemy objects in
+        __init__ causes issues, so allow for deferring until after the pickling to fetch
+        SQLAlchemy objs

         Parameters
         ----------
@@ -55,6 +56,7 @@ def instantiate_sql_objs(self, conn, table_obj):
         self.conn = conn
         self.table_obj = table_obj
         self.sql_table = table_obj.name
+        self.logger = get_logger(self.sql_table)
         self.primary_key = table_obj.primary_key
         self.foreign_keys = table_obj.foreign_key_constraints

@@ -63,45 +65,47 @@ def drop_pk(self):
         Drop primary key constraints on PostgreSQL table as well as CASCADE any other
         constraints that may rely on the PK
         """
-        logger.info("Dropping {} primary key".format(self.sql_table))
+        self.logger.info("Dropping {} primary key".format(self.sql_table))
         try:
             with self.conn.begin_nested():
                 self.conn.execute(DropConstraint(self.primary_key, cascade=True))
         except SQLAlchemyError:
-            logger.info("{} primary key not found. Skipping".format(self.sql_table))
+            self.logger.info(
+                "{} primary key not found. Skipping".format(self.sql_table)
+            )

     def create_pk(self):
         """Create primary key constraints on PostgreSQL table"""
-        logger.info("Creating {} primary key".format(self.sql_table))
+        self.logger.info("Creating {} primary key".format(self.sql_table))
         self.conn.execute(AddConstraint(self.primary_key))

     def drop_fks(self):
         """Drop foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
-            logger.info("Dropping foreign key {}".format(fk.name))
+            self.logger.info("Dropping foreign key {}".format(fk.name))
             try:
                 with self.conn.begin_nested():
                     self.conn.execute(DropConstraint(fk))
             except SQLAlchemyError:
-                logger.warn("Foreign key {} not found".format(fk.name))
+                self.logger.warn("Foreign key {} not found".format(fk.name))

     def create_fks(self):
         """Create foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
             try:
-                logger.info("Creating foreign key {}".format(fk.name))
+                self.logger.info("Creating foreign key {}".format(fk.name))
                 self.conn.execute(AddConstraint(fk))
             except SQLAlchemyError:
-                logger.warn("Error creating foreign key {}".format(fk.name))
+                self.logger.warn("Error creating foreign key {}".format(fk.name))

     def truncate(self):
         """TRUNCATE PostgreSQL table"""
-        logger.info("Truncating {}".format(self.sql_table))
+        self.logger.info("Truncating {}".format(self.sql_table))
         self.conn.execute("TRUNCATE TABLE {};".format(self.sql_table))

     def analyze(self):
         """Run ANALYZE on PostgreSQL table"""
-        logger.info("Analyzing {}".format(self.sql_table))
+        self.logger.info("Analyzing {}".format(self.sql_table))
         self.conn.execute("ANALYZE {};".format(self.sql_table))

     def copy_from_file(self, file_object):
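
The renamed instantiate_attrs and its docstring describe the core constraint here: objects handed to worker processes get pickled, and attributes such as loggers (on older Pythons) and live SQLAlchemy connections do not survive pickling. A minimal self-contained sketch of the failure and of the deferral pattern, using a threading.Lock as a stand-in for any unpicklable attribute (the class names are illustrative, not from the commit):

import pickle
import threading


class Eager:
    def __init__(self):
        # unpicklable attribute created up front
        self.lock = threading.Lock()


class Deferred:
    def __init__(self):
        # nothing unpicklable yet; safe to send to a worker process
        self.lock = None

    def instantiate_attrs(self):
        # called inside the worker, after unpickling
        self.lock = threading.Lock()


pickle.dumps(Deferred())  # fine
try:
    pickle.dumps(Eager())
except TypeError as exc:
    print(exc)  # cannot pickle '_thread.lock' object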

pandas_to_postgres/copy_df.py (+5 -5)

@@ -1,4 +1,4 @@
-from .utilities import create_file_object, df_generator, logger, cast_pandas
+from .utilities import create_file_object, df_generator, cast_pandas
 from ._base_copy import BaseCopy


@@ -38,17 +38,17 @@ def copy(self, functions=[cast_pandas]):
         with self.conn.begin():
             self.truncate()

-            logger.info("Creating generator for chunking dataframe")
+            self.logger.info("Creating generator for chunking dataframe")
             for chunk in df_generator(self.df, self.csv_chunksize):

-                logger.info("Creating CSV in memory")
+                self.logger.info("Creating CSV in memory")
                 fo = create_file_object(chunk)

-                logger.info("Copying chunk to database")
+                self.logger.info("Copying chunk to database")
                 self.copy_from_file(fo)
                 del fo

-            logger.info("All chunks copied ({} rows)".format(self.rows))
+            self.logger.info("All chunks copied ({} rows)".format(self.rows))

         self.create_pk()
         self.create_fks()

pandas_to_postgres/copy_hdf.py (+34 -28)

@@ -1,6 +1,6 @@
-from .utilities import create_file_object, df_generator, logger, cast_pandas
-from ._base_copy import BaseCopy
 import pandas as pd
+from .utilities import create_file_object, df_generator, cast_pandas
+from ._base_copy import BaseCopy


 class HDFTableCopy(BaseCopy):
@@ -90,33 +90,35 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         data_formatter_kwargs: list of kwargs to pass to data_formatters functions
         """
         if self.hdf_tables is None:
-            logger.warn("No HDF table found for SQL table {}".format(self.sql_table))
+            self.logger.warn(
+                "No HDF table found for SQL table {}".format(self.sql_table)
+            )
             return

         for hdf_table in self.hdf_tables:
-            logger.info("*** {} ***".format(hdf_table))
+            self.logger.info("*** {} ***".format(hdf_table))

-            logger.info("Reading HDF table")
+            self.logger.info("Reading HDF table")
             df = pd.read_hdf(self.file_name, key=hdf_table)
             self.rows += len(df)

             data_formatter_kwargs["hdf_table"] = hdf_table
-            logger.info("Formatting data")
+            self.logger.info("Formatting data")
             df = self.data_formatting(
                 df, functions=data_formatters, **data_formatter_kwargs
             )

-            logger.info("Creating generator for chunking dataframe")
-            for chunk in df_generator(df, self.csv_chunksize):
+            self.logger.info("Creating generator for chunking dataframe")
+            for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):

-                logger.info("Creating CSV in memory")
+                self.logger.info("Creating CSV in memory")
                 fo = create_file_object(chunk)

-                logger.info("Copying chunk to database")
+                self.logger.info("Copying chunk to database")
                 self.copy_from_file(fo)
                 del fo
             del df
-        logger.info("All chunks copied ({} rows)".format(self.rows))
+        self.logger.info("All chunks copied ({} rows)".format(self.rows))


 class SmallHDFTableCopy(HDFTableCopy):
@@ -136,29 +138,29 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         data_formatter_kwargs: list of kwargs to pass to data_formatters functions
         """
         if self.hdf_tables is None:
-            logger.warn("No HDF table found for SQL table {self.sql_table}")
+            self.logger.warn("No HDF table found for SQL table {self.sql_table}")
             return

         for hdf_table in self.hdf_tables:
-            logger.info("*** {} ***".format(hdf_table))
-            logger.info("Reading HDF table")
+            self.logger.info("*** {} ***".format(hdf_table))
+            self.logger.info("Reading HDF table")
             df = pd.read_hdf(self.file_name, key=hdf_table)
             self.rows += len(df)

             data_formatter_kwargs["hdf_table"] = hdf_table
-            logger.info("Formatting data")
+            self.logger.info("Formatting data")
             df = self.data_formatting(
                 df, functions=data_formatters, **data_formatter_kwargs
             )

-            logger.info("Creating CSV in memory")
+            self.logger.info("Creating CSV in memory")
             fo = create_file_object(df)

-            logger.info("Copying table to database")
+            self.logger.info("Copying table to database")
             self.copy_from_file(fo)
             del df
             del fo
-        logger.info("All chunks copied ({} rows)".format(self.rows))
+        self.logger.info("All chunks copied ({} rows)".format(self.rows))


 class BigHDFTableCopy(HDFTableCopy):
@@ -181,11 +183,13 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         data_formatter_kwargs: list of kwargs to pass to data_formatters functions
         """
         if self.hdf_tables is None:
-            logger.warn("No HDF table found for SQL table {}".format(self.sql_table))
+            self.logger.warn(
+                "No HDF table found for SQL table {}".format(self.sql_table)
+            )
             return

         for hdf_table in self.hdf_tables:
-            logger.info("*** {} ***".format(hdf_table))
+            self.logger.info("*** {} ***".format(hdf_table))

             with pd.HDFStore(self.file_name) as store:
                 nrows = store.get_storer(hdf_table).nrows
@@ -199,26 +203,28 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
             start = 0

             for i in range(n_chunks):
-                logger.info("*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks))
-                logger.info("Reading HDF table")
+                self.logger.info(
+                    "*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks)
+                )
+                self.logger.info("Reading HDF table")
                 stop = min(start + self.hdf_chunksize, nrows)
                 df = pd.read_hdf(self.file_name, key=hdf_table, start=start, stop=stop)

                 start += self.hdf_chunksize

                 data_formatter_kwargs["hdf_table"] = hdf_table
-                logger.info("Formatting data")
+                self.logger.info("Formatting data")
                 df = self.data_formatting(
                     df, functions=data_formatters, **data_formatter_kwargs
                 )

-                logger.info("Creating generator for chunking dataframe")
-                for chunk in df_generator(df, self.csv_chunksize):
-                    logger.info("Creating CSV in memory")
+                self.logger.info("Creating generator for chunking dataframe")
+                for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
+                    self.logger.info("Creating CSV in memory")
                     fo = create_file_object(chunk)

-                    logger.info("Copying chunk to database")
+                    self.logger.info("Copying chunk to database")
                     self.copy_from_file(fo)
                     del fo
                 del df
-            logger.info("All chunks copied ({} rows)".format(self.rows))
+            self.logger.info("All chunks copied ({} rows)".format(self.rows))
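
BigHDFTableCopy reads the HDF table in windows via pd.read_hdf's start/stop arguments instead of loading it whole, using the storer's nrows to know when to stop. A hedged standalone sketch of that read pattern; the file name, key, and ceiling-division step are assumptions, since the commit's own n_chunks computation lies outside these hunks:

import pandas as pd

# assumes a table-format HDF5 file "data.h5" containing key "table"
chunksize = 100_000
with pd.HDFStore("data.h5", mode="r") as store:
    nrows = store.get_storer("table").nrows  # row count without reading the data

n_chunks = -(-nrows // chunksize)  # ceiling division (assumed)
start = 0
for i in range(n_chunks):
    stop = min(start + chunksize, nrows)
    df = pd.read_hdf("data.h5", key="table", start=start, stop=stop)
    # ... format and COPY this window here ...
    start += chunksize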

pandas_to_postgres/hdf_to_postgres.py (+5 -2)

@@ -1,7 +1,10 @@
 from multiprocessing import Pool
 from sqlalchemy import MetaData, create_engine
 from .copy_hdf import HDFTableCopy
-from .utilities import cast_pandas
+from .utilities import cast_pandas, get_logger
+
+
+logger = get_logger("hdf_to_postgres")


 def create_hdf_table_objects(
@@ -92,7 +95,7 @@ def copy_worker(
     if table_obj is None:
         raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))

-    copy_obj.instantiate_sql_objs(conn, table_obj)
+    copy_obj.instantiate_attrs(conn, table_obj)

     # Run the task
     copy_obj.copy(
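
copy_worker now calls instantiate_attrs inside the worker process, so each copy object builds its per-table logger only after crossing the process boundary. A rough sketch of the resulting behavior with a pool, mirroring the shape of this commit's get_logger (the worker function and table names are made up):

import logging
from multiprocessing import Pool


def get_logger(name):
    # same shape as the commit's helper: configure once, return a named logger
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s %(message)s",
        datefmt="%Y-%m-%d,%H:%M:%S",
    )
    return logging.getLogger(name)


def worker(table_name):
    # the logger is created in the worker, never pickled
    logger = get_logger(table_name)
    logger.info("copying table")
    return table_name


if __name__ == "__main__":
    with Pool(2) as pool:
        pool.map(worker, ["cities", "countries"])

Because the logger name carries the table, interleaved output from concurrent workers stays attributable, which is the "better logging" the commit title refers to.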

pandas_to_postgres/utilities.py (+12 -9)

@@ -1,16 +1,17 @@
 import logging
-from pandas import HDFStore, isna
 from collections import defaultdict
+from pandas import isna, HDFStore
 from io import StringIO


-logging.basicConfig(
-    level=logging.DEBUG,
-    format="%(levelname)s %(asctime)s.%(msecs)03d %(message)s",
-    datefmt="%Y-%m-%d,%H:%M:%S",
-)
+def get_logger(name):
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s %(message)s",
+        datefmt="%Y-%m-%d,%H:%M:%S",
+    )

-logger = logging.getLogger("pandas_to_postgres")
+    return logging.getLogger(name)


 def hdf_metadata(file_name, keys=None, metadata_attr=None, metadata_keys=[]):
@@ -42,6 +43,7 @@ def hdf_metadata(file_name, keys=None, metadata_attr=None, metadata_keys=[]):

     sql_to_hdf = defaultdict(set)
     metadata_vars = defaultdict(dict)
+    logger = get_logger("hdf_metadata")

     with HDFStore(file_name, mode="r") as store:
         keys = keys or store.keys()
@@ -90,7 +92,7 @@ def create_file_object(df):
     return file_object


-def df_generator(df, chunksize=10 ** 6):
+def df_generator(df, chunksize=10 ** 6, logger=None):
     """
     Create a generator to iterate over chunks of a dataframe

@@ -108,7 +110,8 @@ def df_generator(df, chunksize=10 ** 6):
     n_chunks = (df.shape[0] // chunksize) + 1

     for i in range(n_chunks):
-        logger.info("Chunk {i}/{n}".format(i=i + 1, n=n_chunks))
+        if logger:
+            logger.info("Chunk {i}/{n}".format(i=i + 1, n=n_chunks))
         yield df.iloc[rows : rows + chunksize]
         rows += chunksize
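
With the new signatures above, df_generator only logs chunk progress when a logger is passed in explicitly. A small usage sketch under those signatures; the DataFrame and logger name are made up, while get_logger and df_generator are exported by the package per the __init__.py diff:

import pandas as pd
from pandas_to_postgres import get_logger, df_generator

logger = get_logger("my_table")  # one named logger per table
df = pd.DataFrame({"a": range(10)})

for chunk in df_generator(df, chunksize=4, logger=logger):
    logger.info("got a chunk of {} rows".format(len(chunk)))

# passing no logger keeps df_generator silent:
for chunk in df_generator(df, chunksize=4):
    pass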
