
Commit ad12707

bleonard33 authored and makmanalp committed
Refactor HDFMetadata from class to function (#5)
* Refactor HDF metadata from class to function to pass more args directly to hdf_to_postgres rather than through HDFMetadata object, few small bug fixes
* Updated docstrings
* _copy_worker --> copy_worker
* Add the data_formatters back into copy_worker function
* Missed format key
1 parent edbb7f7 commit ad12707

File tree: 5 files changed, +177 −184 lines changed


pandas_to_postgres/__init__.py

+2 −5

@@ -1,12 +1,9 @@
 from .copy_df import DataFrameCopy
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
-from .hdf_to_postgres import (
-    hdf_to_postgres,
-    create_hdf_table_objects,
-)
+from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
 from .utilities import (
     logger,
-    HDFMetadata,
+    hdf_metadata,
     create_file_object,
     df_generator,
     cast_pandas,
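
Taken together, the top-level API re-exported by the new __init__.py after this commit is imported like so (these names are exactly the ones shown in the diff above):

from pandas_to_postgres import (
    hdf_to_postgres,
    create_hdf_table_objects,
    copy_worker,   # formerly _copy_worker
    hdf_metadata,  # function replacing the HDFMetadata class
)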

pandas_to_postgres/_base_copy.py

+8 −3

@@ -89,10 +89,10 @@ def create_fks(self):
         """Create foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
             try:
-                logger.info("Creating foreign key {fk.name}".format(fk.name))
+                logger.info("Creating foreign key {}".format(fk.name))
                 self.conn.execute(AddConstraint(fk))
             except SQLAlchemyError:
-                logger.warn("Error creating foreign key {fk.name}".format(fk.name))
+                logger.warn("Error creating foreign key {}".format(fk.name))

     def truncate(self):
         """TRUNCATE PostgreSQL table"""
@@ -128,12 +128,17 @@ def data_formatting(self, df, functions=[], **kwargs):
         Parameters
         ----------
         df: pandas DataFrame
-            dataframe to format
+            DataFrame to format
         functions: list of functions
             Functions to apply to df. each gets passed df, self as copy_obj, and all
             kwargs passed to data_formatting
         **kwargs
             kwargs to pass on to each function
+
+        Returns
+        -------
+        df: pandas DataFrame
+            formatted DataFrame
         """
         for f in functions:
             df = f(df, copy_obj=self, **kwargs)
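
The Returns section added above makes the formatter contract explicit: each function in the list receives the DataFrame, the copy object as copy_obj, and any kwargs, and must return the DataFrame. A minimal sketch of a custom formatter under that contract (uppercase_columns is a hypothetical name, used alongside the library's cast_pandas):

from pandas_to_postgres import cast_pandas

def uppercase_columns(df, copy_obj=None, **kwargs):
    # Hypothetical formatter: accepts df, copy_obj, and **kwargs,
    # and returns the (possibly modified) DataFrame.
    df.columns = [c.upper() for c in df.columns]
    return df

# e.g. table_copy.copy(data_formatters=[cast_pandas, uppercase_columns])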

pandas_to_postgres/copy_hdf.py

+16 −92

@@ -11,42 +11,46 @@ class HDFTableCopy(BaseCopy):

     def __init__(
         self,
+        file_name,
         hdf_tables,
-        hdf_meta,
         defer_sql_objs=False,
         conn=None,
         table_obj=None,
         sql_table=None,
         csv_chunksize=10 ** 6,
+        hdf_chunksize=10 ** 7,
+        hdf_metadata=None,
     ):
         """
         Parameters
         ----------
+        file_name
         hdf_tables: list of strings
             HDF keys with data corresponding to destination SQL table
             (assumption being that HDF tables:SQL tables is many:one)
-        hdf_meta: HDFMetadata object
-            Information from the HDF file for use in building copy objects
         defer_sql_objs: bool
             multiprocessing has issue with passing SQLALchemy objects, so if
             True, defer attributing these to the object until after pickled by Pool
-        conn: SQLAlchemy connection
+        conn: SQLAlchemy connection or None
             Managed outside of the object
-        table_obj: SQLAlchemy model object
+        table_obj: SQLAlchemy model object or None
             Destination SQL Table
-        sql_table: string
+        sql_table: string or None
             SQL table name
         csv_chunksize: int
             Max rows to keep in memory when generating CSV for COPY
+        hdf_chunksize: int
+            Max rows to keep in memory when reading HDF file
+        hdf_metadata: dict or None
+            Dict of HDF table keys to dict of constant:value pairs. Not actively used by
+            any pre-defined function, but available to data_formatting method
         """
         super().__init__(defer_sql_objs, conn, table_obj, sql_table, csv_chunksize)

         self.hdf_tables = hdf_tables
-
-        # Info from the HDFMetadata object
-        self.hdf_metadata = hdf_meta.metadata_vars
-        self.file_name = hdf_meta.file_name
-        self.hdf_chunksize = hdf_meta.chunksize
+        self.hdf_metadata = hdf_metadata
+        self.file_name = file_name
+        self.hdf_chunksize = hdf_chunksize

     def copy(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         """
@@ -121,46 +125,6 @@ class SmallHDFTableCopy(HDFTableCopy):
     in-memory for both reading from the HDF as well as COPYing using StringIO.
     """

-    def __init__(
-        self,
-        hdf_tables,
-        hdf_meta,
-        defer_sql_objs=False,
-        conn=None,
-        table_obj=None,
-        sql_table=None,
-        csv_chunksize=10 ** 6,
-    ):
-        """
-        Parameters
-        ----------
-        hdf_tables: list of strings
-            HDF keys with data corresponding to destination SQL table
-            (assumption being that HDF tables:SQL tables is many:one)
-        hdf_meta: HDFMetadata object
-            Information from the HDF file for use in building copy objects
-        defer_sql_objs: bool
-            multiprocessing has issue with passing SQLALchemy objects, so if
-            True, defer attributing these to the object until after pickled by Pool
-        conn: SQLAlchemy connection
-            Managed outside of the object
-        table_obj: SQLAlchemy model object
-            Destination SQL Table
-        sql_table: string
-            SQL table name
-        csv_chunksize: int
-            Max rows to keep in memory when generating CSV for COPY
-        """
-        super().__init__(
-            hdf_tables,
-            hdf_meta,
-            defer_sql_objs,
-            conn,
-            table_obj,
-            sql_table,
-            csv_chunksize,
-        )
-
     def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         """
         Copy each HDF table that relates to SQL table to database
@@ -206,46 +170,6 @@ class BigHDFTableCopy(HDFTableCopy):
     pd.read_hdf(..., iterator=True) because we found the performance was much better.
     """

-    def __init__(
-        self,
-        hdf_tables,
-        hdf_meta,
-        defer_sql_objs=False,
-        conn=None,
-        table_obj=None,
-        sql_table=None,
-        csv_chunksize=10 ** 6,
-    ):
-        """
-        Parameters
-        ----------
-        hdf_tables: list of strings
-            HDF keys with data corresponding to destination SQL table
-            (assumption being that HDF tables:SQL tables is many:one)
-        hdf_meta: HDFMetadata object
-            Information from the HDF file for use in building copy objects
-        defer_sql_objs: bool
-            multiprocessing has issue with passing SQLALchemy objects, so if
-            True, defer attributing these to the object until after pickled by Pool
-        conn: SQLAlchemy connection
-            Managed outside of the object
-        table_obj: SQLAlchemy model object
-            Destination SQL Table
-        sql_table: string
-            SQL table name
-        csv_chunksize: int
-            Max rows to keep in memory when generating CSV for COPY
-        """
-        super().__init__(
-            hdf_tables,
-            hdf_meta,
-            defer_sql_objs,
-            conn,
-            table_obj,
-            sql_table,
-            csv_chunksize,
-        )
-
     def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         """
         Copy each HDF table that relates to SQL table to database
@@ -275,7 +199,7 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
             start = 0

             for i in range(n_chunks):
-                logger.info("*** HDF chunk {i + 1} of {} ***".format(n_chunks))
+                logger.info("*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks))
                 logger.info("Reading HDF table")
                 stop = min(start + self.hdf_chunksize, nrows)
                 df = pd.read_hdf(self.file_name, key=hdf_table, start=start, stop=stop)
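
For context, BigHDFTableCopy windows its reads with start/stop offsets instead of pd.read_hdf(..., iterator=True). A standalone sketch of the same pattern, assuming a table-format HDF5 file (file name and key are hypothetical):

import math
import pandas as pd

file_name, hdf_table = "data.h5", "/big_table"  # hypothetical
hdf_chunksize = 10 ** 7

# Row count is available from the store without reading the data
with pd.HDFStore(file_name, mode="r") as store:
    nrows = store.get_storer(hdf_table).nrows

n_chunks = math.ceil(nrows / hdf_chunksize)
start = 0
for i in range(n_chunks):
    stop = min(start + hdf_chunksize, nrows)
    df = pd.read_hdf(file_name, key=hdf_table, start=start, stop=stop)
    # ... format df and COPY it to PostgreSQL here ...
    start = stop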
