Skip to content

Commit

Permalink
Merge branch 'dev' into pwuts/open-2314-fix-up-and-re-introduce-libra…
Browse files Browse the repository at this point in the history
…ry-v2-back-end-work
  • Loading branch information
Pwuts committed Feb 13, 2025
2 parents 97db355 + b5b9a00 commit 91a606e
Show file tree
Hide file tree
Showing 13 changed files with 763 additions and 61 deletions.
56 changes: 41 additions & 15 deletions autogpt_platform/backend/backend/data/execution.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from collections import defaultdict
from datetime import datetime, timezone
from multiprocessing import Manager
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar

from prisma import Json
from prisma.enums import AgentExecutionStatus
from prisma.errors import PrismaError
from prisma.models import (
Expand All @@ -15,7 +16,8 @@
from backend.data.block import BlockData, BlockInput, CompletedBlockOutput
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
from backend.util import json, mock
from backend.server.v2.store.exceptions import DatabaseError
from backend.util import mock, type
from backend.util.settings import Config


Expand Down Expand Up @@ -101,16 +103,16 @@ def from_graph(graph: AgentGraphExecution):
def from_db(execution: AgentNodeExecution):
if execution.executionData:
# Execution that has been queued for execution will persist its data.
input_data = json.loads(execution.executionData, target_type=dict[str, Any])
input_data = type.convert(execution.executionData, dict[str, Any])
else:
# For incomplete execution, executionData will not be yet available.
input_data: BlockInput = defaultdict()
for data in execution.Input or []:
input_data[data.name] = json.loads(data.data)
input_data[data.name] = type.convert(data.data, Type[Any])

output_data: CompletedBlockOutput = defaultdict(list)
for data in execution.Output or []:
output_data[data.name].append(json.loads(data.data))
output_data[data.name].append(type.convert(data.data, Type[Any]))

graph_execution: AgentGraphExecution | None = execution.AgentGraphExecution

Expand Down Expand Up @@ -158,7 +160,7 @@ async def create_graph_execution(
"executionStatus": ExecutionStatus.INCOMPLETE,
"Input": {
"create": [
{"name": name, "data": json.dumps(data)}
{"name": name, "data": Json(data)}
for name, data in node_input.items()
]
},
Expand Down Expand Up @@ -211,7 +213,7 @@ async def upsert_execution_input(
order={"addedTime": "asc"},
include={"Input": True},
)
json_input_data = json.dumps(input_data)
json_input_data = Json(input_data)

if existing_execution:
await AgentNodeExecutionInputOutput.prisma().create(
Expand All @@ -223,7 +225,7 @@ async def upsert_execution_input(
)
return existing_execution.id, {
**{
input_data.name: json.loads(input_data.data)
input_data.name: type.convert(input_data.data, Type[Any])
for input_data in existing_execution.Input or []
},
input_name: input_data,
Expand Down Expand Up @@ -257,7 +259,7 @@ async def upsert_execution_output(
await AgentNodeExecutionInputOutput.prisma().create(
data={
"name": output_name,
"data": json.dumps(output_data),
"data": Json(output_data),
"referencedByOutputExecId": node_exec_id,
}
)
Expand All @@ -282,7 +284,7 @@ async def update_graph_execution_stats(
where={"id": graph_exec_id},
data={
"executionStatus": status,
"stats": json.dumps(stats),
"stats": Json(stats),
},
)
if not res:
Expand All @@ -294,7 +296,7 @@ async def update_graph_execution_stats(
async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
await AgentNodeExecution.prisma().update(
where={"id": node_exec_id},
data={"stats": json.dumps(stats)},
data={"stats": Json(stats)},
)


Expand All @@ -314,8 +316,8 @@ async def update_execution_status(
**({"startedTime": now} if status == ExecutionStatus.RUNNING else {}),
**({"endedTime": now} if status == ExecutionStatus.FAILED else {}),
**({"endedTime": now} if status == ExecutionStatus.COMPLETED else {}),
**({"executionData": json.dumps(execution_data)} if execution_data else {}),
**({"stats": json.dumps(stats)} if stats else {}),
**({"executionData": Json(execution_data)} if execution_data else {}),
**({"stats": Json(stats)} if stats else {}),
}

res = await AgentNodeExecution.prisma().update(
Expand Down Expand Up @@ -366,6 +368,31 @@ async def get_execution_results(graph_exec_id: str) -> list[ExecutionResult]:
return res


async def get_executions_in_timerange(
user_id: str, start_time: str, end_time: str
) -> list[ExecutionResult]:
try:
executions = await AgentGraphExecution.prisma().find_many(
where={
"AND": [
{
"startedAt": {
"gte": datetime.fromisoformat(start_time),
"lte": datetime.fromisoformat(end_time),
}
},
{"userId": user_id},
]
},
include=GRAPH_EXECUTION_INCLUDE,
)
return [ExecutionResult.from_graph(execution) for execution in executions]
except Exception as e:
raise DatabaseError(
f"Failed to get executions in timerange {start_time} to {end_time} for user {user_id}: {e}"
) from e


LIST_SPLIT = "_$_"
DICT_SPLIT = "_#_"
OBJC_SPLIT = "_@_"
Expand Down Expand Up @@ -449,8 +476,7 @@ async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult
where={
"agentNodeId": node_id,
"agentGraphExecutionId": graph_eid,
"executionStatus": {"not": ExecutionStatus.INCOMPLETE},
"executionData": {"not": None}, # type: ignore
"executionStatus": {"not": ExecutionStatus.INCOMPLETE}, # type: ignore
},
order={"queuedTime": "desc"},
include=EXECUTION_RESULT_INCLUDE,
Expand Down
23 changes: 11 additions & 12 deletions autogpt_platform/backend/backend/data/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Literal, Optional, Type

import prisma
from prisma import Json
from prisma.models import (
AgentGraph,
AgentGraphExecution,
Expand All @@ -18,7 +19,7 @@

from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
from backend.util import json
from backend.util import type

from .block import BlockInput, BlockType, get_block, get_blocks
from .db import BaseDbModel, transaction
Expand Down Expand Up @@ -74,8 +75,8 @@ def from_db(node: AgentNode):
obj = NodeModel(
id=node.id,
block_id=node.AgentBlock.id,
input_default=json.loads(node.constantInput, target_type=dict[str, Any]),
metadata=json.loads(node.metadata, target_type=dict[str, Any]),
input_default=type.convert(node.constantInput, dict[str, Any]),
metadata=type.convert(node.metadata, dict[str, Any]),
graph_id=node.agentGraphId,
graph_version=node.agentGraphVersion,
webhook_id=node.webhookId,
Expand Down Expand Up @@ -125,7 +126,7 @@ def from_db(execution: AgentGraphExecution):
total_run_time = duration

try:
stats = json.loads(execution.stats or "{}", target_type=dict[str, Any])
stats = type.convert(execution.stats or {}, dict[str, Any])
except ValueError:
stats = {}

Expand Down Expand Up @@ -402,11 +403,9 @@ def _process_node(node: AgentNode, for_export: bool) -> AgentNode:
if for_export:
# Remove credentials from node input
if node.constantInput:
constant_input = json.loads(
node.constantInput, target_type=dict[str, Any]
)
constant_input = type.convert(node.constantInput, dict[str, Any])
constant_input = GraphModel._hide_node_input_credentials(constant_input)
node.constantInput = json.dumps(constant_input)
node.constantInput = Json(constant_input)

# Remove webhook info
node.webhookId = None
Expand Down Expand Up @@ -654,8 +653,8 @@ async def __create_graph(tx, graph: Graph, user_id: str):
{
"id": node.id,
"agentBlockId": node.block_id,
"constantInput": json.dumps(node.input_default),
"metadata": json.dumps(node.metadata),
"constantInput": Json(node.input_default),
"metadata": Json(node.metadata),
}
for node in graph.nodes
]
Expand Down Expand Up @@ -742,7 +741,7 @@ async def fix_llm_provider_credentials():
raise RuntimeError(f"Impossible state while processing node {node}")

node_id: str = node["node_id"]
node_preset_input: dict = json.loads(node["node_preset_input"])
node_preset_input: dict = node["node_preset_input"]
credentials_meta: dict = node_preset_input["credentials"]

credentials = next(
Expand Down Expand Up @@ -778,5 +777,5 @@ async def fix_llm_provider_credentials():
store.update_creds(user_id, credentials)
await AgentNode.prisma().update(
where={"id": node_id},
data={"constantInput": json.dumps(node_preset_input)},
data={"constantInput": Json(node_preset_input)},
)
Loading

0 comments on commit 91a606e

Please sign in to comment.