Skip to content

Commit 9ec082c

Browse files
authored
feat: Pass additional partition projection params to wr.s3.to_parquet & cat… (#1627)
* Pass additional partition projection params to wr.s3.to_parquet & catalog methods * Support the same args in wr.s3.write_(text)
1 parent 1ddffb1 commit 9ec082c

File tree

3 files changed

+91
-8
lines changed

3 files changed

+91
-8
lines changed

awswrangler/catalog/_create.py

+32-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def _create_table( # pylint: disable=too-many-branches,too-many-statements,too-
4444
projection_values: Optional[Dict[str, str]],
4545
projection_intervals: Optional[Dict[str, str]],
4646
projection_digits: Optional[Dict[str, str]],
47+
projection_formats: Optional[Dict[str, str]],
4748
projection_storage_location_template: Optional[str],
4849
catalog_id: Optional[str],
4950
) -> None:
@@ -67,11 +68,13 @@ def _create_table( # pylint: disable=too-many-branches,too-many-statements,too-
6768
projection_values = projection_values if projection_values else {}
6869
projection_intervals = projection_intervals if projection_intervals else {}
6970
projection_digits = projection_digits if projection_digits else {}
71+
projection_formats = projection_formats if projection_formats else {}
7072
projection_types = {sanitize_column_name(k): v for k, v in projection_types.items()}
7173
projection_ranges = {sanitize_column_name(k): v for k, v in projection_ranges.items()}
7274
projection_values = {sanitize_column_name(k): v for k, v in projection_values.items()}
7375
projection_intervals = {sanitize_column_name(k): v for k, v in projection_intervals.items()}
7476
projection_digits = {sanitize_column_name(k): v for k, v in projection_digits.items()}
77+
projection_formats = {sanitize_column_name(k): v for k, v in projection_formats.items()}
7578
for k, v in projection_types.items():
7679
dtype: Optional[str] = partitions_types.get(k)
7780
if dtype is None and projection_storage_location_template is None:
@@ -98,6 +101,10 @@ def _create_table( # pylint: disable=too-many-branches,too-many-statements,too-
98101
mode = _update_if_necessary(
99102
dic=table_input["Parameters"], key=f"projection.{k}.digits", value=str(v), mode=mode
100103
)
104+
for k, v in projection_formats.items():
105+
mode = _update_if_necessary(
106+
dic=table_input["Parameters"], key=f"projection.{k}.format", value=str(v), mode=mode
107+
)
101108
mode = _update_if_necessary(
102109
table_input["Parameters"],
103110
key="storage.location.template",
@@ -266,6 +273,7 @@ def _create_parquet_table(
266273
projection_values: Optional[Dict[str, str]],
267274
projection_intervals: Optional[Dict[str, str]],
268275
projection_digits: Optional[Dict[str, str]],
276+
projection_formats: Optional[Dict[str, str]],
269277
projection_storage_location_template: Optional[str],
270278
boto3_session: Optional[boto3.Session],
271279
catalog_table_input: Optional[Dict[str, Any]],
@@ -318,6 +326,7 @@ def _create_parquet_table(
318326
projection_values=projection_values,
319327
projection_intervals=projection_intervals,
320328
projection_digits=projection_digits,
329+
projection_formats=projection_formats,
321330
projection_storage_location_template=projection_storage_location_template,
322331
catalog_id=catalog_id,
323332
)
@@ -350,6 +359,7 @@ def _create_csv_table( # pylint: disable=too-many-arguments,too-many-locals
350359
projection_values: Optional[Dict[str, str]],
351360
projection_intervals: Optional[Dict[str, str]],
352361
projection_digits: Optional[Dict[str, str]],
362+
projection_formats: Optional[Dict[str, str]],
353363
projection_storage_location_template: Optional[str],
354364
catalog_table_input: Optional[Dict[str, Any]],
355365
catalog_id: Optional[str],
@@ -398,6 +408,7 @@ def _create_csv_table( # pylint: disable=too-many-arguments,too-many-locals
398408
projection_values=projection_values,
399409
projection_intervals=projection_intervals,
400410
projection_digits=projection_digits,
411+
projection_formats=projection_formats,
401412
projection_storage_location_template=projection_storage_location_template,
402413
catalog_id=catalog_id,
403414
)
@@ -428,6 +439,7 @@ def _create_json_table( # pylint: disable=too-many-arguments
428439
projection_values: Optional[Dict[str, str]],
429440
projection_intervals: Optional[Dict[str, str]],
430441
projection_digits: Optional[Dict[str, str]],
442+
projection_formats: Optional[Dict[str, str]],
431443
projection_storage_location_template: Optional[str],
432444
catalog_table_input: Optional[Dict[str, Any]],
433445
catalog_id: Optional[str],
@@ -474,6 +486,7 @@ def _create_json_table( # pylint: disable=too-many-arguments
474486
projection_values=projection_values,
475487
projection_intervals=projection_intervals,
476488
projection_digits=projection_digits,
489+
projection_formats=projection_formats,
477490
projection_storage_location_template=projection_storage_location_template,
478491
catalog_id=catalog_id,
479492
)
@@ -676,6 +689,7 @@ def create_parquet_table(
676689
projection_values: Optional[Dict[str, str]] = None,
677690
projection_intervals: Optional[Dict[str, str]] = None,
678691
projection_digits: Optional[Dict[str, str]] = None,
692+
projection_formats: Optional[Dict[str, str]] = None,
679693
projection_storage_location_template: Optional[str] = None,
680694
boto3_session: Optional[boto3.Session] = None,
681695
) -> None:
@@ -741,6 +755,10 @@ def create_parquet_table(
741755
Dictionary of partitions names and Athena projections digits.
742756
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
743757
(e.g. {'col_name': '1', 'col2_name': '2'})
758+
projection_formats: Optional[Dict[str, str]]
759+
Dictionary of partitions names and Athena projections formats.
760+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
761+
(e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
744762
projection_storage_location_template: Optional[str]
745763
Value which allows Athena to properly map partition values if the S3 file locations do not follow
746764
a typical `.../column=value/...` pattern.
@@ -796,14 +814,15 @@ def create_parquet_table(
796814
projection_values=projection_values,
797815
projection_intervals=projection_intervals,
798816
projection_digits=projection_digits,
817+
projection_formats=projection_formats,
799818
projection_storage_location_template=projection_storage_location_template,
800819
boto3_session=boto3_session,
801820
catalog_table_input=catalog_table_input,
802821
)
803822

804823

805824
@apply_configs
806-
def create_csv_table( # pylint: disable=too-many-arguments
825+
def create_csv_table( # pylint: disable=too-many-arguments,too-many-locals
807826
database: str,
808827
table: str,
809828
path: str,
@@ -830,6 +849,7 @@ def create_csv_table( # pylint: disable=too-many-arguments
830849
projection_values: Optional[Dict[str, str]] = None,
831850
projection_intervals: Optional[Dict[str, str]] = None,
832851
projection_digits: Optional[Dict[str, str]] = None,
852+
projection_formats: Optional[Dict[str, str]] = None,
833853
projection_storage_location_template: Optional[str] = None,
834854
catalog_id: Optional[str] = None,
835855
) -> None:
@@ -908,6 +928,10 @@ def create_csv_table( # pylint: disable=too-many-arguments
908928
Dictionary of partitions names and Athena projections digits.
909929
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
910930
(e.g. {'col_name': '1', 'col2_name': '2'})
931+
projection_formats: Optional[Dict[str, str]]
932+
Dictionary of partitions names and Athena projections formats.
933+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
934+
(e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
911935
projection_storage_location_template: Optional[str]
912936
Value which allows Athena to properly map partition values if the S3 file locations do not follow
913937
a typical `.../column=value/...` pattern.
@@ -967,6 +991,7 @@ def create_csv_table( # pylint: disable=too-many-arguments
967991
projection_values=projection_values,
968992
projection_intervals=projection_intervals,
969993
projection_digits=projection_digits,
994+
projection_formats=projection_formats,
970995
projection_storage_location_template=projection_storage_location_template,
971996
boto3_session=boto3_session,
972997
catalog_table_input=catalog_table_input,
@@ -1003,6 +1028,7 @@ def create_json_table( # pylint: disable=too-many-arguments
10031028
projection_values: Optional[Dict[str, str]] = None,
10041029
projection_intervals: Optional[Dict[str, str]] = None,
10051030
projection_digits: Optional[Dict[str, str]] = None,
1031+
projection_formats: Optional[Dict[str, str]] = None,
10061032
projection_storage_location_template: Optional[str] = None,
10071033
catalog_id: Optional[str] = None,
10081034
) -> None:
@@ -1077,6 +1103,10 @@ def create_json_table( # pylint: disable=too-many-arguments
10771103
Dictionary of partitions names and Athena projections digits.
10781104
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
10791105
(e.g. {'col_name': '1', 'col2_name': '2'})
1106+
projection_formats: Optional[Dict[str, str]]
1107+
Dictionary of partitions names and Athena projections formats.
1108+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
1109+
(e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
10801110
projection_storage_location_template: Optional[str]
10811111
Value which allows Athena to properly map partition values if the S3 file locations do not follow
10821112
a typical `.../column=value/...` pattern.
@@ -1135,6 +1165,7 @@ def create_json_table( # pylint: disable=too-many-arguments
11351165
projection_values=projection_values,
11361166
projection_intervals=projection_intervals,
11371167
projection_digits=projection_digits,
1168+
projection_formats=projection_formats,
11381169
projection_storage_location_template=projection_storage_location_template,
11391170
boto3_session=boto3_session,
11401171
catalog_table_input=catalog_table_input,

awswrangler/s3/_write_parquet.py

+29-3
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ def to_parquet( # pylint: disable=too-many-arguments,too-many-locals,too-many-b
225225
projection_values: Optional[Dict[str, str]] = None,
226226
projection_intervals: Optional[Dict[str, str]] = None,
227227
projection_digits: Optional[Dict[str, str]] = None,
228+
projection_formats: Optional[Dict[str, str]] = None,
229+
projection_storage_location_template: Optional[str] = None,
228230
catalog_id: Optional[str] = None,
229231
) -> Dict[str, Union[List[str], Dict[str, List[str]]]]:
230232
"""Write Parquet file or dataset on Amazon S3.
@@ -356,6 +358,15 @@ def to_parquet( # pylint: disable=too-many-arguments,too-many-locals,too-many-b
356358
Dictionary of partitions names and Athena projections digits.
357359
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
358360
(e.g. {'col_name': '1', 'col2_name': '2'})
361+
projection_formats: Optional[Dict[str, str]]
362+
Dictionary of partitions names and Athena projections formats.
363+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
364+
(e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
365+
projection_storage_location_template: Optional[str]
366+
Value which allows Athena to properly map partition values if the S3 file locations do not follow
367+
a typical `.../column=value/...` pattern.
368+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-setting-up.html
369+
(e.g. s3://bucket/table_root/a=${a}/${b}/some_static_subdirectory/${c}/)
359370
catalog_id : str, optional
360371
The ID of the Data Catalog from which to retrieve Databases.
361372
If none is provided, the AWS account ID is used by default.
@@ -622,7 +633,8 @@ def to_parquet( # pylint: disable=too-many-arguments,too-many-locals,too-many-b
622633
projection_values=projection_values,
623634
projection_intervals=projection_intervals,
624635
projection_digits=projection_digits,
625-
projection_storage_location_template=None,
636+
projection_formats=projection_formats,
637+
projection_storage_location_template=projection_storage_location_template,
626638
catalog_id=catalog_id,
627639
catalog_table_input=catalog_table_input,
628640
)
@@ -685,7 +697,8 @@ def to_parquet( # pylint: disable=too-many-arguments,too-many-locals,too-many-b
685697
projection_values=projection_values,
686698
projection_intervals=projection_intervals,
687699
projection_digits=projection_digits,
688-
projection_storage_location_template=None,
700+
projection_formats=projection_formats,
701+
projection_storage_location_template=projection_storage_location_template,
689702
catalog_id=catalog_id,
690703
catalog_table_input=catalog_table_input,
691704
)
@@ -718,7 +731,7 @@ def to_parquet( # pylint: disable=too-many-arguments,too-many-locals,too-many-b
718731

719732

720733
@apply_configs
721-
def store_parquet_metadata( # pylint: disable=too-many-arguments
734+
def store_parquet_metadata( # pylint: disable=too-many-arguments,too-many-locals
722735
path: str,
723736
database: str,
724737
table: str,
@@ -743,6 +756,8 @@ def store_parquet_metadata( # pylint: disable=too-many-arguments
743756
projection_values: Optional[Dict[str, str]] = None,
744757
projection_intervals: Optional[Dict[str, str]] = None,
745758
projection_digits: Optional[Dict[str, str]] = None,
759+
projection_formats: Optional[Dict[str, str]] = None,
760+
projection_storage_location_template: Optional[str] = None,
746761
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
747762
boto3_session: Optional[boto3.Session] = None,
748763
) -> Tuple[Dict[str, str], Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]:
@@ -839,6 +854,15 @@ def store_parquet_metadata( # pylint: disable=too-many-arguments
839854
Dictionary of partitions names and Athena projections digits.
840855
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
841856
(e.g. {'col_name': '1', 'col2_name': '2'})
857+
projection_formats: Optional[Dict[str, str]]
858+
Dictionary of partitions names and Athena projections formats.
859+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
860+
(e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
861+
projection_storage_location_template: Optional[str]
862+
Value which allows Athena to properly map partition values if the S3 file locations do not follow
863+
a typical `.../column=value/...` pattern.
864+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-setting-up.html
865+
(e.g. s3://bucket/table_root/a=${a}/${b}/some_static_subdirectory/${c}/)
842866
s3_additional_kwargs : Optional[Dict[str, Any]]
843867
Forwarded to botocore requests.
844868
e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}
@@ -907,6 +931,8 @@ def store_parquet_metadata( # pylint: disable=too-many-arguments
907931
projection_values=projection_values,
908932
projection_intervals=projection_intervals,
909933
projection_digits=projection_digits,
934+
projection_formats=projection_formats,
935+
projection_storage_location_template=projection_storage_location_template,
910936
boto3_session=session,
911937
catalog_id=catalog_id,
912938
)

0 commit comments

Comments
 (0)