Describe your environment
OS: macOS
Python version: 3.12.10
Package version: opentelemetry-instrumentation 0.59b0
What happened?
I get `TypeError: the JSON object must be str, bytes or bytearray, not dict` while the Bedrock instrumentation parses the tool use input in `_decode_tool_use`.
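The failing statement in `opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py` is `tool_use["input"] = json.loads(tool_use["input"])` (see the traceback below). A minimal sketch of the failure mode, assuming the streamed content block can carry an `input` that is already a decoded dict, e.g. for a tool call with empty arguments (the block shape here is hypothetical):

```python
import json

# Hypothetical content block: "input" is already a decoded dict
# (an empty-args tool call), so json.loads() cannot parse it again.
tool_use = {"name": "salesforce_search", "input": {}}
tool_use["input"] = json.loads(tool_use["input"])
# TypeError: the JSON object must be str, bytes or bytearray, not dict
```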
Steps to Reproduce
- Create `run_with_otel.sh` (see below)
- Create `bedrock_empty_tool_args.py` (see below)
- In one terminal, run `./run_with_otel.sh`
- In a second terminal, run `curl -X POST http://localhost:8080/invocations -H "Content-Type: application/json" -d '{"input": "what are my accounts?"}'`

Callout: if I run with no instrumentation (`python bedrock_empty_tool_args.py`), the call works just fine.
Scripts:
run_with_otel.sh:

```bash
#!/bin/bash

# Install dependencies
pip freeze | xargs pip uninstall -y
pip install boto3 langchain-aws langgraph bedrock-agentcore
pip install opentelemetry-distro==0.59b0
opentelemetry-bootstrap -a install

# Set AWS credentials
export AWS_ACCESS_KEY_ID=*********
export AWS_SECRET_ACCESS_KEY=*********
export AWS_SESSION_TOKEN=*********

# Set environment variables and run with instrumentation
OTEL_TRACES_EXPORTER=console \
OTEL_METRICS_EXPORTER=none \
OTEL_LOGS_EXPORTER=none \
opentelemetry-instrument python bedrock_empty_tool_args.py
```
bedrock_empty_tool_args.py:

```python
import asyncio
import os

from bedrock_agentcore import BedrockAgentCoreApp
from langchain_aws import ChatBedrock
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.graph import MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.runnables import RunnableConfig
from langgraph.graph.message import BaseMessage
from langgraph.graph.state import CompiledStateGraph
from langgraph.types import Command
from langchain_core.messages.ai import AIMessageChunk
from langgraph.typing import ContextT, OutputT, StateT, InputT
import logging

logger = logging.getLogger(__name__)


class StreamingQueue:
    """Simple async queue for streaming responses."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._finished = False

    async def put(self, data: str) -> None:
        """Add an item to the queue."""
        await self._queue.put({"event": "message", "data": data})

    async def finish(self) -> None:
        """Mark the queue as finished and add sentinel value."""
        self._finished = True
        await self._queue.put(None)

    async def stream(self):
        """Stream items from the queue until finished."""
        while True:
            item = await self._queue.get()
            if item is None and self._finished:
                break
            yield item


async def stream_messages(
    graph: CompiledStateGraph[StateT, ContextT, InputT, OutputT],
    streaming_queue: StreamingQueue,
    graph_input: InputT | Command,
    graph_config: RunnableConfig | None = None,
) -> None:
    """Stream graph response with stream mode 'messages'"""
    async for chunk in graph.astream(
        input=graph_input,
        stream_mode="messages",
        config=graph_config,
    ):
        # Handle the streaming chunk properly - it's a tuple of (message, metadata)
        if chunk and len(chunk) > 0:
            message_chunk: BaseMessage = chunk[0]  # type: ignore
            logger.info("Chunk: %s", message_chunk)
            if isinstance(message_chunk, AIMessageChunk):
                # Do not send the tool messages to the user
                await streaming_queue.put(message_chunk.text())
        else:
            logger.info("Invalid chunk: %s", chunk)
            await streaming_queue.put("We cannot get an answer at this moment. Try again later.")


app = BedrockAgentCoreApp()
queue = StreamingQueue()


@tool
def salesforce_search():
    """Tool used when you need to search for an account name in our Salesforce system"""
    search_results = {"records": [{"account_name": "Test account", "account_id": "ABC123XYZ"}]}
    return search_results


@tool
def salesforce_account_details(account_id: str):
    """Tool used when you need to search for an account id in our Salesforce system"""
    search_results = {
        "account_name": "Test account",
        "account_id": f"{account_id}",
        "arr": 1312.00,
    }
    return search_results


def create_agent():
    """Create and configure the LangGraph agent"""
    # Initialize your LLM (adjust model and parameters as needed)
    llm = ChatBedrock(
        model="us.anthropic.claude-sonnet-4-20250514-v1:0",
        model_kwargs={"temperature": 0.1},
        region=os.environ.get("AWS_REGION_NAME", "us-east-1"),
    )

    # Bind tools to the LLM
    tools = [salesforce_search, salesforce_account_details]
    llm_with_tools = llm.bind_tools(tools)

    # System message
    system_message = """You are the Salesforce Search Assistant, you can help users find the right details for their accounts.
CAPABILITIES:
- You can search for matching accounts with the salesforce_search tool
- You can look up account details with the salesforce_account_details tool
"""

    # Define the chatbot node
    async def chatbot(state: MessagesState):
        raw_messages = state["messages"]
        # Remove any existing system messages to avoid duplicates or misplacement
        non_system_messages = [msg for msg in raw_messages if not isinstance(msg, SystemMessage)]
        # Always ensure SystemMessage is first
        messages = [SystemMessage(content=system_message), *non_system_messages]
        # Get response from model with tools bound
        response = await llm_with_tools.ainvoke(messages)
        # Append response to full message history
        return {"messages": [*raw_messages, response]}

    # Create the graph
    graph_builder = StateGraph(MessagesState)

    # Add nodes
    graph_builder.add_node("chatbot", chatbot)
    graph_builder.add_node("tools", ToolNode(tools))

    # Add edges
    graph_builder.add_conditional_edges(
        "chatbot",
        tools_condition,
    )
    graph_builder.add_edge("tools", "chatbot")

    # Set entry point
    graph_builder.set_entry_point("chatbot")

    # Compile the graph
    return graph_builder.compile()


async def agent_task(payload):
    """Invoke the agent with a payload"""
    await queue.put("Begin agent execution")
    try:
        user_input = payload.get(
            "input",
            "No user input provided",
        )
        agent = create_agent()
        # response = await agent.ainvoke({"messages": [HumanMessage(content=user_input)]})
        await stream_messages(agent, queue, {"messages": [HumanMessage(content=user_input)]})
        # await queue.put(response["messages"][-1].content)
    except Exception as e:
        logger.exception(e)
        await queue.put(f"An exception occurred: {e!s}")
    finally:
        await queue.finish()


@app.entrypoint
async def agent_invocation(payload, context):
    """Main entrypoint for agent invocation."""
    # Create and start the agent task
    task = asyncio.create_task(agent_task(payload=payload))

    async def stream_with_task():
        """Stream results while ensuring task completion."""
        async for item in queue.stream():
            yield item
        await task  # Ensure task completes

    return stream_with_task()


if __name__ == "__main__":
    logger.info("Starting Agent")
    app.run()
```
Expected Result
- In the second terminal:

```
thp@842f577dc4b0 Downloads % curl -X POST http://localhost:8080/invocations -H "Content-Type: application/json" -d '{"input": "what are my accounts?"}'
data: {"event": "message", "data": "Begin agent execution"}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": "I"}
data: {"event": "message", "data": "'ll help you find your accounts in Sales"}
data: {"event": "message", "data": "force. Let me search for them"}
data: {"event": "message", "data": " now."}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": "Base"}
data: {"event": "message", "data": "d on the search results, I"}
data: {"event": "message", "data": " found one account associated with you:\n\n**"}
data: {"event": "message", "data": "Account Name:** Test account "}
data: {"event": "message", "data": "\n**Account ID:** ABC123X"}
data: {"event": "message", "data": "YZ\n\nWould you like me to"}
data: {"event": "message", "data": " get more detailed information about this account,"}
data: {"event": "message", "data": " or do you need help with anything else regarding"}
data: {"event": "message", "data": " your Salesforce accounts?"}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": ""}
```
Actual Result
- In the second terminal:

```
curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"input": "what are my accounts?"}'
data: {"event": "message", "data": "Begin agent execution"}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": "I'll help you find your accounts in"}
data: {"event": "message", "data": " Salesforce. Let me search for"}
data: {"event": "message", "data": " them now."}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": ""}
data: {"event": "message", "data": "An exception occurred: the JSON object must be str, bytes or bytearray, not dict"}
```
- In the first terminal:

```
the JSON object must be str, bytes or bytearray, not dict
Traceback (most recent call last):
File "<sanitized>/bedrock_empty_tool_args.py", line 160, in agent_task
await stream_messages(agent, queue, {"messages": [HumanMessage(content=user_input)]})
File "<sanitized>/bedrock_empty_tool_args.py", line 53, in stream_messages
async for chunk in graph.astream(
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langgraph/pregel/main.py", line 2993, in astream
async for _ in runner.atick(
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langgraph/pregel/_runner.py", line 401, in atick
_panic_or_proceed(
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langgraph/pregel/_runner.py", line 511, in _panic_or_proceed
raise exc
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langgraph/pregel/_retry.py", line 137, in arun_with_retry
return await task.proc.ainvoke(task.input, config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langgraph/_internal/_runnable.py", line 706, in ainvoke
input = await asyncio.create_task(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/opentelemetry/instrumentation/asyncio/__init__.py", line 299, in trace_coroutine
return await coro
^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langgraph/_internal/_runnable.py", line 474, in ainvoke
ret = await self.afunc(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>/bedrock_empty_tool_args.py", line 122, in chatbot
response = await llm_with_tools.ainvoke(messages)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_core/runnables/base.py", line 5724, in ainvoke
return await self.bound.ainvoke(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_core/language_models/chat_models.py", line 417, in ainvoke
llm_result = await self.agenerate_prompt(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_core/language_models/chat_models.py", line 1036, in agenerate_prompt
return await self.agenerate(
^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_core/language_models/chat_models.py", line 994, in agenerate
raise exceptions[0]
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/opentelemetry/instrumentation/asyncio/__init__.py", line 299, in trace_coroutine
return await coro
^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_core/language_models/chat_models.py", line 1153, in _agenerate_with_cache
async for chunk in self._astream(messages, stop=stop, **kwargs):
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_core/language_models/chat_models.py", line 1281, in _astream
item = await run_in_executor(
^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_core/runnables/config.py", line 611, in run_in_executor
return await asyncio.get_running_loop().run_in_executor(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/concurrent/futures/thread.py", line 59, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/opentelemetry/instrumentation/threading/__init__.py", line 171, in wrapped_func
return original_func(*func_args, **func_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_core/runnables/config.py", line 602, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_aws/chat_models/bedrock.py", line 869, in _stream
for chunk in self._prepare_input_and_invoke_stream(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_aws/llms/bedrock.py", line 1165, in _prepare_input_and_invoke_stream
for chunk in LLMInputOutputAdapter.prepare_output_stream(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/langchain_aws/llms/bedrock.py", line 509, in prepare_output_stream
for event in stream:
^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py", line 211, in __iter__
self._process_event(event)
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py", line 233, in _process_event
self._process_anthropic_claude_chunk(chunk)
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py", line 373, in _process_anthropic_claude_chunk
_decode_tool_use(self._content_block)
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/site-packages/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py", line 43, in _decode_tool_use
tool_use["input"] = json.loads(tool_use["input"])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<sanitized>.local/share/mise/installs/python/3.12.10/lib/python3.12/json/__init__.py", line 339, in loads
raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not dict
During task with name 'chatbot' and id '5ca8992d-7886-79b4-af0e-86f7e4e8518b'
```
Additional context
See #3843
Would you like to implement a fix?
Yes
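A minimal sketch of the kind of guard I have in mind, assuming `_decode_tool_use` in `bedrock_utils.py` keeps its current shape (this is a suggestion, not the final patch):

```python
import json

def _decode_tool_use(tool_use):
    # The accumulated tool_use "input" may be a JSON string or may already
    # be a decoded dict (e.g. a tool call with no arguments), so only
    # attempt json.loads() on string-like values.
    if isinstance(tool_use.get("input"), (str, bytes, bytearray)):
        try:
            tool_use["input"] = json.loads(tool_use["input"])
        except json.JSONDecodeError:
            pass  # leave unparsable payloads as-is
    return tool_use
```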