
Commit

feat(backend): Migrate json encoded string columns into a native json column (#9475)

### Changes 🏗️

Due to the legacy of SQLite usage, some of the JSON columns are actually
string columns storing stringified JSON.
The scope of this PR is to migrate those columns into native JSON
columns.
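
To illustrate the pattern this migration enables (a hedged sketch, not code from this PR; the `stats` payload below is made up):

```python
# Sketch of the before/after pattern; see the diffs below for real call sites.
from typing import Any

from prisma import Json
from backend.util import type

stats: dict[str, Any] = {"nodes_executed": 3, "duration_s": 1.25}

# Before: columns were text, so values were stringified on write...
#     data={"stats": json.dumps(stats)}
# ...and parsed back on read:
#     stats = json.loads(row.stats, target_type=dict[str, Any])

# After: columns are native JSON; wrap on write, coerce on read.
write_payload = {"stats": Json(stats)}
parsed = type.convert(stats, dict[str, Any])  # no string round-trip
```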

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
  - [ ] Import an agent from file upload, and confirm it executes correctly
  - [ ] Upload agent to marketplace
  - [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my changes
- [ ] I have included a list of my configuration changes in the PR description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New infrastructure changes, such as databases
</details>
majdyz authored Feb 13, 2025
1 parent 7e04fbd commit b5b9a00
Showing 6 changed files with 116 additions and 39 deletions.
30 changes: 15 additions & 15 deletions autogpt_platform/backend/backend/data/execution.py
@@ -1,8 +1,9 @@
from collections import defaultdict
from datetime import datetime, timezone
from multiprocessing import Manager
-from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
+from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar

+from prisma import Json
from prisma.enums import AgentExecutionStatus
from prisma.errors import PrismaError
from prisma.models import (
@@ -16,7 +17,7 @@
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
from backend.server.v2.store.exceptions import DatabaseError
-from backend.util import json, mock
+from backend.util import mock, type
from backend.util.settings import Config


@@ -102,16 +103,16 @@ def from_graph(graph: AgentGraphExecution):
def from_db(execution: AgentNodeExecution):
if execution.executionData:
# Execution that has been queued for execution will persist its data.
-input_data = json.loads(execution.executionData, target_type=dict[str, Any])
+input_data = type.convert(execution.executionData, dict[str, Any])
else:
# For incomplete execution, executionData will not be yet available.
input_data: BlockInput = defaultdict()
for data in execution.Input or []:
-input_data[data.name] = json.loads(data.data)
+input_data[data.name] = type.convert(data.data, Type[Any])

output_data: CompletedBlockOutput = defaultdict(list)
for data in execution.Output or []:
-output_data[data.name].append(json.loads(data.data))
+output_data[data.name].append(type.convert(data.data, Type[Any]))

graph_execution: AgentGraphExecution | None = execution.AgentGraphExecution

@@ -158,7 +159,7 @@ async def create_graph_execution(
"executionStatus": ExecutionStatus.INCOMPLETE,
"Input": {
"create": [
{"name": name, "data": json.dumps(data)}
{"name": name, "data": Json(data)}
for name, data in node_input.items()
]
},
@@ -210,7 +211,7 @@ async def upsert_execution_input(
order={"addedTime": "asc"},
include={"Input": True},
)
-json_input_data = json.dumps(input_data)
+json_input_data = Json(input_data)

if existing_execution:
await AgentNodeExecutionInputOutput.prisma().create(
@@ -222,7 +223,7 @@
)
return existing_execution.id, {
**{
-input_data.name: json.loads(input_data.data)
+input_data.name: type.convert(input_data.data, Type[Any])
for input_data in existing_execution.Input or []
},
input_name: input_data,
@@ -256,7 +257,7 @@ async def upsert_execution_output(
await AgentNodeExecutionInputOutput.prisma().create(
data={
"name": output_name,
"data": json.dumps(output_data),
"data": Json(output_data),
"referencedByOutputExecId": node_exec_id,
}
)
@@ -281,7 +282,7 @@ async def update_graph_execution_stats(
where={"id": graph_exec_id},
data={
"executionStatus": status,
"stats": json.dumps(stats),
"stats": Json(stats),
},
)
if not res:
@@ -293,7 +294,7 @@
async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
await AgentNodeExecution.prisma().update(
where={"id": node_exec_id},
data={"stats": json.dumps(stats)},
data={"stats": Json(stats)},
)


@@ -313,8 +314,8 @@ async def update_execution_status(
**({"startedTime": now} if status == ExecutionStatus.RUNNING else {}),
**({"endedTime": now} if status == ExecutionStatus.FAILED else {}),
**({"endedTime": now} if status == ExecutionStatus.COMPLETED else {}),
**({"executionData": json.dumps(execution_data)} if execution_data else {}),
**({"stats": json.dumps(stats)} if stats else {}),
**({"executionData": Json(execution_data)} if execution_data else {}),
**({"stats": Json(stats)} if stats else {}),
}

res = await AgentNodeExecution.prisma().update(
@@ -473,8 +474,7 @@ async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult
where={
"agentNodeId": node_id,
"agentGraphExecutionId": graph_eid,
"executionStatus": {"not": ExecutionStatus.INCOMPLETE},
"executionData": {"not": None}, # type: ignore
"executionStatus": {"not": ExecutionStatus.INCOMPLETE}, # type: ignore
},
order={"queuedTime": "desc"},
include=EXECUTION_RESULT_INCLUDE,
23 changes: 11 additions & 12 deletions autogpt_platform/backend/backend/data/graph.py
@@ -6,6 +6,7 @@
from typing import Any, Literal, Optional, Type

import prisma
+from prisma import Json
from prisma.models import (
AgentGraph,
AgentGraphExecution,
@@ -18,7 +19,7 @@

from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
-from backend.util import json
+from backend.util import type

from .block import BlockInput, BlockType, get_block, get_blocks
from .db import BaseDbModel, transaction
@@ -74,8 +75,8 @@ def from_db(node: AgentNode):
obj = NodeModel(
id=node.id,
block_id=node.AgentBlock.id,
-input_default=json.loads(node.constantInput, target_type=dict[str, Any]),
-metadata=json.loads(node.metadata, target_type=dict[str, Any]),
+input_default=type.convert(node.constantInput, dict[str, Any]),
+metadata=type.convert(node.metadata, dict[str, Any]),
graph_id=node.agentGraphId,
graph_version=node.agentGraphVersion,
webhook_id=node.webhookId,
@@ -125,7 +126,7 @@ def from_db(execution: AgentGraphExecution):
total_run_time = duration

try:
-stats = json.loads(execution.stats or "{}", target_type=dict[str, Any])
+stats = type.convert(execution.stats or {}, dict[str, Any])
except ValueError:
stats = {}

@@ -402,11 +403,9 @@ def _process_node(node: AgentNode, for_export: bool) -> AgentNode:
if for_export:
# Remove credentials from node input
if node.constantInput:
-constant_input = json.loads(
-    node.constantInput, target_type=dict[str, Any]
-)
+constant_input = type.convert(node.constantInput, dict[str, Any])
constant_input = GraphModel._hide_node_input_credentials(constant_input)
-node.constantInput = json.dumps(constant_input)
+node.constantInput = Json(constant_input)

# Remove webhook info
node.webhookId = None
@@ -654,8 +653,8 @@ async def __create_graph(tx, graph: Graph, user_id: str):
{
"id": node.id,
"agentBlockId": node.block_id,
"constantInput": json.dumps(node.input_default),
"metadata": json.dumps(node.metadata),
"constantInput": Json(node.input_default),
"metadata": Json(node.metadata),
}
for node in graph.nodes
]
@@ -742,7 +741,7 @@ async def fix_llm_provider_credentials():
raise RuntimeError(f"Impossible state while processing node {node}")

node_id: str = node["node_id"]
-node_preset_input: dict = json.loads(node["node_preset_input"])
+node_preset_input: dict = node["node_preset_input"]
credentials_meta: dict = node_preset_input["credentials"]

credentials = next(
@@ -778,5 +777,5 @@ async def fix_llm_provider_credentials():
store.update_creds(user_id, credentials)
await AgentNode.prisma().update(
where={"id": node_id},
data={"constantInput": json.dumps(node_preset_input)},
data={"constantInput": Json(node_preset_input)},
)
4 changes: 4 additions & 0 deletions autogpt_platform/backend/backend/util/type.py
@@ -1,6 +1,8 @@
import json
from typing import Any, Type, TypeVar, cast, get_args, get_origin

+from prisma import Json as PrismaJson


class ConversionError(ValueError):
pass
@@ -188,6 +190,8 @@ def type_match(value: Any, target_type: Type[T]) -> T:

def convert(value: Any, target_type: Type[T]) -> T:
try:
+if isinstance(value, PrismaJson):
+    value = value.data
return cast(T, _try_convert(value, target_type, raise_on_mismatch=False))
except Exception as e:
raise ConversionError(f"Failed to convert {value} to {target_type}") from e
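
A hedged usage sketch of the new branch (the wrapped payload below is invented; a value that already matches the target type is assumed to pass through unchanged):

```python
# Sketch: convert() now unwraps prisma.Json wrappers before coercing.
from typing import Any

from prisma import Json
from backend.util.type import convert

wrapped = Json({"temperature": 0.7})  # made-up payload
assert convert(wrapped, dict[str, Any]) == {"temperature": 0.7}

# A plain value that already matches the target type is returned as-is.
plain = {"a": 1}
assert convert(plain, dict[str, Any]) == {"a": 1}
```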
@@ -0,0 +1,77 @@
CREATE OR REPLACE FUNCTION migrate_text_column_to_json(
p_table text, -- Table name, e.g. 'AgentNodeExecution'
p_col text, -- Column name to convert, e.g. 'executionData'
p_default json DEFAULT '{}'::json, -- Fallback value when original value is NULL.
-- Pass NULL here if you prefer to leave NULLs.
p_set_nullable boolean DEFAULT true -- If false, the new column will be NOT NULL.
) RETURNS void AS $$
DECLARE
full_table text;
tmp_col text;
BEGIN
-- Build a fully qualified table name using the current schema.
full_table := format('%I.%I', current_schema(), p_table);
tmp_col := p_col || '_tmp';

-- 0. Skip the migration if the column is already of type jsonb.
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = current_schema()
AND table_name = p_table
AND column_name = p_col
AND data_type = 'jsonb'
) THEN
RAISE NOTICE 'Column %.% is already of type jsonb, skipping migration.', full_table, p_col;
RETURN;
END IF;

-- 1. Cleanup the original column from invalid JSON characters.
EXECUTE format('UPDATE %s SET %I = replace(%I, E''\\u0000'', '''') WHERE %I LIKE ''%%\\u0000%%'';', full_table, p_col, p_col, p_col);

-- 2. Add the temporary column of type jsonb.
EXECUTE format('ALTER TABLE %s ADD COLUMN %I jsonb;', full_table, tmp_col);

-- 3. Convert the data:
-- - If p_default IS NOT NULL, use it as the fallback value.
-- - Otherwise, keep NULL.
IF p_default IS NULL THEN
EXECUTE format(
'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN NULL ELSE %I::json END;',
full_table, tmp_col, p_col, p_col
);
ELSE
EXECUTE format(
'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN %L::json ELSE %I::json END;',
full_table, tmp_col, p_col, p_default::text, p_col
);
END IF;

-- 4. Drop the original text column.
EXECUTE format('ALTER TABLE %s DROP COLUMN %I;', full_table, p_col);

-- 5. Rename the temporary column to the original column name.
EXECUTE format('ALTER TABLE %s RENAME COLUMN %I TO %I;', full_table, tmp_col, p_col);

-- 6. Optionally set a DEFAULT for future inserts if a fallback is provided.
IF p_default IS NOT NULL THEN
EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET DEFAULT %L::json;',
full_table, p_col, p_default::text);
END IF;

-- 7. Optionally mark the column as NOT NULL.
IF NOT p_set_nullable THEN
EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET NOT NULL;', full_table, p_col);
END IF;
END;
$$ LANGUAGE plpgsql;


BEGIN;
SELECT migrate_text_column_to_json('AgentGraphExecution', 'stats', NULL, true);
SELECT migrate_text_column_to_json('AgentNodeExecution', 'stats', NULL, true);
SELECT migrate_text_column_to_json('AgentNodeExecution', 'executionData', NULL, true);
SELECT migrate_text_column_to_json('AgentNode', 'constantInput', '{}'::json, false);
SELECT migrate_text_column_to_json('AgentNode', 'metadata', '{}'::json, false);
SELECT migrate_text_column_to_json('AgentNodeExecutionInputOutput', 'data', NULL, false);
COMMIT;
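
A possible post-migration sanity check (an assumption, not part of this PR): confirm the affected columns now report `jsonb` in `information_schema`.

```python
# Hedged verification sketch using Prisma Client Python's raw-query API.
# Table/column names come from the migration above; the check itself is
# an assumption, not shipped code.
import asyncio

from prisma import Prisma

async def verify_json_columns() -> None:
    db = Prisma()
    await db.connect()
    rows = await db.query_raw(
        """
        SELECT table_name, column_name, data_type
        FROM information_schema.columns
        WHERE table_name IN ('AgentGraphExecution', 'AgentNodeExecution',
                             'AgentNode', 'AgentNodeExecutionInputOutput')
          AND column_name IN ('stats', 'executionData', 'constantInput',
                              'metadata', 'data')
        """
    )
    for row in rows:
        # Every migrated column should now be native jsonb.
        assert row["data_type"] == "jsonb", row
    await db.disconnect()

asyncio.run(verify_json_columns())
```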
15 changes: 6 additions & 9 deletions autogpt_platform/backend/schema.prisma
@@ -209,15 +209,13 @@ model AgentNode {
// List of produced output, that the child node should be executed.
Output AgentNodeLink[] @relation("AgentNodeSource")
-// JSON serialized dict[str, str] containing predefined input values.
-constantInput String @default("{}")
+constantInput Json @default("{}")
// For webhook-triggered blocks: reference to the webhook that triggers the node
webhookId String?
Webhook IntegrationWebhook? @relation(fields: [webhookId], references: [id])
-// JSON serialized dict[str, str] containing the node metadata.
-metadata String @default("{}")
+metadata Json @default("{}")
ExecutionHistory AgentNodeExecution[]
@@ -290,7 +288,7 @@ model AgentGraphExecution {
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
-stats String? // JSON serialized object
+stats Json?
AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id])
agentPresetId String?
@@ -312,14 +310,13 @@ model AgentNodeExecution {
Output AgentNodeExecutionInputOutput[] @relation("AgentNodeExecutionOutput")
executionStatus AgentExecutionStatus @default(COMPLETED)
-// Final JSON serialized input data for the node execution.
-executionData String?
+executionData Json?
addedTime DateTime @default(now())
queuedTime DateTime?
startedTime DateTime?
endedTime DateTime?
-stats String? // JSON serialized object
+stats Json?
@@index([agentGraphExecutionId])
@@index([agentNodeId])
@@ -330,7 +327,7 @@ model AgentNodeExecutionInputOutput {
id String @id @default(uuid())
name String
-data String
+data Json
time DateTime @default(now())
// Prisma requires explicit back-references.
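
With the columns declared as `Json`, the Python client accepts and returns structured values directly. A minimal sketch (an assumption, not code from this PR; the node id and connection details are placeholders):

```python
# Hedged sketch: writing and reading a native Json column via Prisma Client
# Python, using the AgentNode model from the schema above.
import asyncio

from prisma import Json, Prisma

async def demo() -> None:
    db = Prisma()
    await db.connect()
    node = await db.agentnode.update(
        where={"id": "some-node-id"},  # placeholder id
        data={"metadata": Json({"x": 0, "y": 0})},
    )
    if node:
        print(node.metadata)  # parsed JSON value, not a string
    await db.disconnect()

asyncio.run(demo())
```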
6 changes: 3 additions & 3 deletions autogpt_platform/backend/test/test_data_creator.py
@@ -4,7 +4,7 @@

import prisma.enums
from faker import Faker
-from prisma import Prisma
+from prisma import Json, Prisma

faker = Faker()

@@ -110,8 +110,8 @@ async def main():
"agentBlockId": block.id,
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"constantInput": "{}",
"metadata": "{}",
"constantInput": Json({}),
"metadata": Json({}),
}
)
agent_nodes.append(node)
