Skip to content

Commit 37e8870

Browse files
pgp-encryption
1 parent 9e60575 commit 37e8870

File tree

7 files changed

+775
-0
lines changed

7 files changed

+775
-0
lines changed

CyberSource/api/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,4 @@
6363
from .reversal_api import ReversalApi
6464
from .taxes_api import TaxesApi
6565
from .void_api import VoidApi
66+
from .batch_upload_with_mtls_api import BatchUploadWithMTLSApi
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
"""
2+
Batch Upload with mTLS API for CyberSource.
3+
4+
This module provides functionality for batch uploads to CyberSource using mTLS authentication,
5+
supporting both file-based and in-memory operations with various configuration options.
6+
"""
7+
8+
from pathlib import Path
9+
from typing import Optional
10+
11+
import CyberSource.logging.log_factory as LogFactory
12+
from CyberSource.utilities.pgpBatchUpload.mutual_auth_upload import MutualAuthUpload
13+
from CyberSource.utilities.pgpBatchUpload.pgp_encryption import PgpEncryption
14+
15+
16+
def _validate_paths(paths: list) -> None:
17+
"""
18+
Validate that all specified paths exist.
19+
20+
Args:
21+
paths: List of tuples containing (path, description)
22+
23+
Raises:
24+
FileNotFoundError: If any of the paths don't exist
25+
"""
26+
for path, description in paths:
27+
if path is None:
28+
continue # Skip validation for None paths
29+
if not path:
30+
raise ValueError(f"{description} path is required")
31+
if not Path(path).exists():
32+
raise FileNotFoundError(f"{description} not found: {path}")
33+
34+
35+
class BatchUploadWithMTLSApi:
36+
"""
37+
A class for handling batch uploads to CyberSource using mTLS authentication.
38+
39+
This class provides methods for encrypting and uploading batch files to CyberSource
40+
using PGP encryption and mutual TLS authentication. It supports various authentication
41+
methods including separate key/cert files.
42+
43+
Attributes:
44+
logger: Logger instance for logging operations and errors
45+
pgp_encryption: PgpEncryption instance for handling PGP encryption
46+
mutual_auth_upload: MutualAuthUpload instance for handling mTLS uploads
47+
"""
48+
49+
_end_point = "/pts/v1/transaction-batch-upload"
50+
51+
def __init__(self, log_config=None):
52+
"""
53+
Initialize the BatchUploadWithMTLSApi.
54+
55+
Args:
56+
log_config: Optional configuration for the logger
57+
"""
58+
self.logger = LogFactory.setup_logger(self.__class__.__name__, log_config)
59+
self.pgp_encryption = PgpEncryption(log_config)
60+
self.mutual_auth_upload = MutualAuthUpload(log_config)
61+
62+
def upload_batch_api_with_separate_key_and_cert_file(
63+
self,
64+
input_file: str,
65+
environment_hostname: str,
66+
pgp_encryption_public_key_path: str,
67+
client_cert_path: str,
68+
client_key_path: str,
69+
server_trust_cert_path: str = None,
70+
client_cert_password: Optional[str] = None,
71+
verify_ssl: bool = True,
72+
) -> None:
73+
"""
74+
Upload a batch file using separate key and certificate files.
75+
76+
This method encrypts the input file using PGP and uploads it to CyberSource
77+
using mutual TLS authentication with separate key and certificate files.
78+
79+
Args:
80+
input_file: Path to the file to upload
81+
environment_hostname: CyberSource environment hostname
82+
pgp_encryption_public_key_path: Path to the PGP public key file
83+
client_cert_path: Path to the client certificate file
84+
client_key_path: Path to the client private key file
85+
server_trust_cert_path: Path to the server trust certificate file (optional)
86+
client_cert_password: Optional password for the client certificate file
87+
verify_ssl: Whether to verify SSL certificates (default: True). Set to False only for development purposes.
88+
89+
Raises:
90+
ValueError: If required parameters are missing or invalid
91+
FileNotFoundError: If any of the required files don't exist
92+
requests.exceptions.RequestException: If there's an error during the upload process
93+
"""
94+
try:
95+
# Step 1: Create endpoint URL
96+
endpoint_url = self.get_base_url(environment_hostname) + self._end_point
97+
98+
# Step 2: Validations
99+
_validate_paths(
100+
[
101+
(input_file, "Input file"),
102+
(pgp_encryption_public_key_path, "PGP public key"),
103+
(client_cert_path, "Client certificate"),
104+
(client_key_path, "Client private key"),
105+
(server_trust_cert_path, "Server trust certificate"),
106+
]
107+
)
108+
109+
# Validate file size (maximum 75 MB)
110+
file_size = Path(input_file).stat().st_size
111+
max_size_bytes = 75 * 1024 * 1024 # 75 MB in bytes
112+
if file_size > max_size_bytes:
113+
error_msg = f"Input file size ({file_size} bytes) exceeds the maximum allowed size of 75 MB ({max_size_bytes} bytes)"
114+
if self.logger is not None:
115+
self.logger.error(error_msg)
116+
raise ValueError(error_msg)
117+
118+
# Validate file extension (.csv)
119+
file_extension = Path(input_file).suffix.lower()
120+
if file_extension != ".csv":
121+
error_msg = (
122+
f"Input file must have a .csv extension, but got '{file_extension}'"
123+
)
124+
if self.logger is not None:
125+
self.logger.error(error_msg)
126+
raise ValueError(error_msg)
127+
128+
# Step 3: PGP encryption
129+
encrypted_pgp_bytes = self.pgp_encryption.handle_encrypt_operation(
130+
input_file, pgp_encryption_public_key_path
131+
)
132+
133+
# Step 4: Upload the encrypted PGP file using mTLS
134+
# Replace the file extension with .pgp
135+
file_name = Path(input_file).stem + ".pgp"
136+
137+
# Log a warning if SSL verification is disabled
138+
if not verify_ssl and self.logger is not None:
139+
self.logger.warning(
140+
"SSL certificate verification is disabled. This should only be used in development environments."
141+
)
142+
143+
self.mutual_auth_upload.handle_upload_operation_using_private_key_and_certs(
144+
encrypted_pgp_bytes=encrypted_pgp_bytes,
145+
endpoint_url=endpoint_url,
146+
file_name=file_name,
147+
client_private_key_path=client_key_path,
148+
client_cert_path=client_cert_path,
149+
server_trust_cert_path=server_trust_cert_path,
150+
client_cert_password=client_cert_password,
151+
verify_ssl=verify_ssl,
152+
)
153+
if self.logger is not None:
154+
self.logger.info("Batch file uploaded successfully")
155+
except ValueError as e:
156+
if self.logger is not None:
157+
self.logger.error(f"Validation error: {str(e)}")
158+
raise
159+
except FileNotFoundError as e:
160+
if self.logger is not None:
161+
self.logger.error(f"File not found: {str(e)}")
162+
raise
163+
except Exception as e:
164+
if self.logger is not None:
165+
self.logger.error(
166+
f"Error in upload_batch_api_with_separate_key_and_cert_file: {str(e)}"
167+
)
168+
raise
169+
170+
@staticmethod
171+
def get_base_url(environment_hostname: str) -> str:
172+
"""
173+
Get the base URL from the environment hostname.
174+
175+
Args:
176+
environment_hostname: CyberSource environment hostname
177+
178+
Returns:
179+
str: The base URL with https:// prefix if not already present
180+
"""
181+
if not environment_hostname:
182+
raise ValueError("Environment hostname is required")
183+
184+
base_url = environment_hostname.strip()
185+
if not base_url.startswith("https://"):
186+
base_url = "https://" + base_url
187+
return base_url
188+
189+
def __enter__(self):
190+
"""
191+
Enter the context manager.
192+
193+
Returns:
194+
BatchUploadWithMTLSApi: The instance itself
195+
"""
196+
return self
197+
198+
def __exit__(self, exc_type, exc_val, exc_tb):
199+
"""
200+
Exit the context manager.
201+
202+
Args:
203+
exc_type: Exception type if an exception was raised
204+
exc_val: Exception value if an exception was raised
205+
exc_tb: Exception traceback if an exception was raised
206+
"""
207+
# Currently no resources to clean up
208+
pass

CyberSource/utilities/pgpBatchUpload/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)