Skip to content

fixed Invalid base64-encoded error on ec2 cfn template #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions streamlit_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,26 @@ def format_response(response_body):
response_data = None

try:
# Extract the response and trace data
all_data = format_response(response_data['response'])
the_response = response_data['trace_data']
except:
# Check if response_data is None or contains error
if response_data is None:
all_data = "No response data"
the_response = "Failed to get response from agent"
elif 'error' in response_data:
all_data = "Error occurred"
the_response = f"Agent error: {response_data['error']}"
elif 'response' in response_data and 'trace_data' in response_data:
# Extract the response and trace data
all_data = format_response(response_data['response'])
the_response = response_data['trace_data']
else:
all_data = f"Unexpected response format: {list(response_data.keys()) if response_data else 'None'}"
the_response = f"Response data: {response_data}"
except Exception as e:
print(f"Error extracting response data: {e}")
print(f"Response data: {response_data}")
print(f"Full response: {response}")
all_data = "..."
the_response = "Apologies, but an error occurred. Please rerun the application"
the_response = f"Error occurred: {str(e)}"

# Use trace_data and formatted_response as needed
st.sidebar.text_area("", value=all_data, height=300)
Expand Down
83 changes: 60 additions & 23 deletions streamlit_app/invoke_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from botocore.credentials import Credentials
from requests import request

ssm = boto3.client('ssm')
# Region configuration moved here to be available for all clients
theRegion = "us-west-2"
os.environ["AWS_REGION"] = theRegion

ssm = boto3.client('ssm', region_name=theRegion)

# ---------------------------------------------------------------------
# Replace with your actual Agent ID and Alias ID below:
Expand All @@ -25,11 +29,7 @@
#agentAliasId = ssm.get_parameter(Name='/alias-id', WithDecryption=True)['Parameter']['Value'] #valid if CFN infrastructure templates were ran


# ---------------------------------------------------------------------
# REGION CONFIGURATION:
# ---------------------------------------------------------------------
theRegion = "us-west-2"
os.environ["AWS_REGION"] = theRegion
# Region configuration moved to top of file

# ---------------------------------------------------------------------
# HELPER FUNCTION TO GET AWS CREDENTIALS SAFELY
Expand Down Expand Up @@ -157,27 +157,64 @@ def decode_response(response):
final_response = ""
for idx in range(len(split_response)):
if "bytes" in split_response[idx]:
encoded_last_response = split_response[idx].split("\"")[3]
decoded = base64.b64decode(encoded_last_response)
final_response_chunk = decoded.decode('utf-8')
print(final_response_chunk)
try:
# More robust extraction of base64 content
segment = split_response[idx]
# Find the bytes field and extract the base64 string
bytes_start = segment.find('"bytes":"') + len('"bytes":"')
bytes_end = segment.find('"', bytes_start)
if bytes_start > len('"bytes":"') - 1 and bytes_end > bytes_start:
encoded_last_response = segment[bytes_start:bytes_end]
# Add padding if needed for base64 decoding
missing_padding = len(encoded_last_response) % 4
if missing_padding:
encoded_last_response += '=' * (4 - missing_padding)
decoded = base64.b64decode(encoded_last_response)
final_response_chunk = decoded.decode('utf-8')
print(final_response_chunk)
final_response += final_response_chunk
else:
print(f"Could not extract base64 from: {segment}")
except (base64.binascii.Error, UnicodeDecodeError) as e:
print(f"Error decoding base64 at index {idx}: {e}")
print(f"Raw data: {split_response[idx]}")
else:
print(f"No bytes at index {idx}")
print(split_response[idx])

# Attempt to parse the last part for finalResponse
last_response = split_response[-1]
print(f"Last Response: {last_response}")
if "bytes" in last_response:
print("Bytes in last response")
encoded_last_response = last_response.split("\"")[3]
decoded = base64.b64decode(encoded_last_response)
final_response = decoded.decode('utf-8')
else:
print("No bytes in last response")
part1 = string[string.find('finalResponse')+len('finalResponse":'):]
part2 = part1[:part1.find('"}')+2]
final_response = json.loads(part2)['text']
# If we didn't get a response from the loop above, try the final response parsing
if not final_response:
last_response = split_response[-1]
print(f"Last Response: {last_response}")
if "bytes" in last_response:
print("Bytes in last response")
try:
# More robust extraction of base64 content
bytes_start = last_response.find('"bytes":"') + len('"bytes":"')
bytes_end = last_response.find('"', bytes_start)
if bytes_start > len('"bytes":"') - 1 and bytes_end > bytes_start:
encoded_last_response = last_response[bytes_start:bytes_end]
# Add padding if needed for base64 decoding
missing_padding = len(encoded_last_response) % 4
if missing_padding:
encoded_last_response += '=' * (4 - missing_padding)
decoded = base64.b64decode(encoded_last_response)
final_response = decoded.decode('utf-8')
else:
print(f"Could not extract base64 from last response: {last_response}")
final_response = "Could not parse final response"
except (base64.binascii.Error, UnicodeDecodeError) as e:
print(f"Error decoding final response: {e}")
final_response = f"Error decoding response: {e}"
else:
print("No bytes in last response")
try:
part1 = string[string.find('finalResponse')+len('finalResponse":'):]
part2 = part1[:part1.find('"}')+2]
final_response = json.loads(part2)['text']
except (json.JSONDecodeError, KeyError) as e:
print(f"Error parsing final response JSON: {e}")
final_response = f"Error parsing response: {e}"

# Cleanup the final response
final_response = final_response.replace("\"", "")
Expand Down