From b5b9a008bf1142ada107169bb4f9636c45264de3 Mon Sep 17 00:00:00 2001
From: Zamil Majdy
Date: Thu, 13 Feb 2025 19:02:53 +0700
Subject: [PATCH] feat(backend): Migrate json encoded string columns into a
 native json column (#9475)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### Changes 🏗️

Due to legacy SQLite usage, some of the JSON columns are actually string columns storing stringified JSON. The scope of this PR is to migrate those columns into native JSON columns.

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  - [ ] ...
Example test plan:
- [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes correctly
- [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
- [ ] Edit an agent from monitor, and confirm it executes correctly
#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my changes
- [ ] I have included a list of my configuration changes in the PR description (under **Changes**)
Examples of configuration changes:
- Changing ports
- Adding new services that need to communicate with each other
- Secrets or environment variable changes
- New infrastructure changes such as databases
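For reviewers, a minimal sketch of what this migration means for calling code. The two helper functions below are illustrative only and not part of this diff; they assume the `AgentNode` model and the `backend.util.type.convert` helper shown in the patch.

```python
# Illustrative sketch — not part of this diff.
from typing import Any

from prisma import Json
from prisma.models import AgentNode

from backend.util import type  # backend.util.type.convert, used throughout this PR


async def read_metadata(node_id: str) -> dict[str, Any]:  # hypothetical helper
    node = await AgentNode.prisma().find_unique(where={"id": node_id})
    assert node is not None
    # Before: metadata was a TEXT column, so callers did json.loads(node.metadata).
    # After: Prisma returns the parsed JSON value; type.convert also unwraps a
    # prisma.Json wrapper if one is passed (see the backend/util/type.py hunk).
    return type.convert(node.metadata, dict[str, Any])


async def write_metadata(node_id: str, metadata: dict[str, Any]) -> None:  # hypothetical helper
    # Before: data={"metadata": json.dumps(metadata)}
    # After: wrap the dict in prisma.Json so the client serializes it natively.
    await AgentNode.prisma().update(
        where={"id": node_id},
        data={"metadata": Json(metadata)},
    )
```

With native `Json` columns, Prisma handles (de)serialization, so the per-call `json.loads`/`json.dumps` round-trips disappear and malformed stored strings can no longer fail at read time.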
---
 .../backend/backend/data/execution.py         | 30 ++++----
 .../backend/backend/data/graph.py             | 23 +++---
 autogpt_platform/backend/backend/util/type.py |  4 +
 .../migration.sql                             | 77 +++++++++++++++++++
 autogpt_platform/backend/schema.prisma        | 15 ++--
 .../backend/test/test_data_creator.py         |  6 +-
 6 files changed, 116 insertions(+), 39 deletions(-)
 create mode 100644 autogpt_platform/backend/migrations/20250213110232_migrate_string_json/migration.sql

diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py
index f9bbccf864bb..4396fcb423d1 100644
--- a/autogpt_platform/backend/backend/data/execution.py
+++ b/autogpt_platform/backend/backend/data/execution.py
@@ -1,8 +1,9 @@
 from collections import defaultdict
 from datetime import datetime, timezone
 from multiprocessing import Manager
-from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
+from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar
 
+from prisma import Json
 from prisma.enums import AgentExecutionStatus
 from prisma.errors import PrismaError
 from prisma.models import (
@@ -16,7 +17,7 @@
 from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
 from backend.data.queue import AsyncRedisEventBus, RedisEventBus
 from backend.server.v2.store.exceptions import DatabaseError
-from backend.util import json, mock
+from backend.util import mock, type
 from backend.util.settings import Config
 
 
@@ -102,16 +103,16 @@ def from_graph(graph: AgentGraphExecution):
     def from_db(execution: AgentNodeExecution):
         if execution.executionData:
             # Execution that has been queued for execution will persist its data.
-            input_data = json.loads(execution.executionData, target_type=dict[str, Any])
+            input_data = type.convert(execution.executionData, dict[str, Any])
         else:
             # For incomplete execution, executionData will not be yet available.
             input_data: BlockInput = defaultdict()
 
         for data in execution.Input or []:
-            input_data[data.name] = json.loads(data.data)
+            input_data[data.name] = type.convert(data.data, Type[Any])
 
         output_data: CompletedBlockOutput = defaultdict(list)
         for data in execution.Output or []:
-            output_data[data.name].append(json.loads(data.data))
+            output_data[data.name].append(type.convert(data.data, Type[Any]))
 
         graph_execution: AgentGraphExecution | None = execution.AgentGraphExecution
@@ -158,7 +159,7 @@ async def create_graph_execution(
                 "executionStatus": ExecutionStatus.INCOMPLETE,
                 "Input": {
                     "create": [
-                        {"name": name, "data": json.dumps(data)}
+                        {"name": name, "data": Json(data)}
                         for name, data in node_input.items()
                     ]
                 },
@@ -210,7 +211,7 @@ async def upsert_execution_input(
         order={"addedTime": "asc"},
         include={"Input": True},
     )
-    json_input_data = json.dumps(input_data)
+    json_input_data = Json(input_data)
 
     if existing_execution:
         await AgentNodeExecutionInputOutput.prisma().create(
@@ -222,7 +223,7 @@
         )
         return existing_execution.id, {
             **{
-                input_data.name: json.loads(input_data.data)
+                input_data.name: type.convert(input_data.data, Type[Any])
                 for input_data in existing_execution.Input or []
             },
             input_name: input_data,
@@ -256,7 +257,7 @@ async def upsert_execution_output(
     await AgentNodeExecutionInputOutput.prisma().create(
         data={
             "name": output_name,
-            "data": json.dumps(output_data),
+            "data": Json(output_data),
             "referencedByOutputExecId": node_exec_id,
         }
     )
@@ -281,7 +282,7 @@ async def update_graph_execution_stats(
         where={"id": graph_exec_id},
         data={
             "executionStatus": status,
-            "stats": json.dumps(stats),
+            "stats": Json(stats),
         },
     )
     if not res:
@@ -293,7 +294,7 @@
 async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
     await AgentNodeExecution.prisma().update(
         where={"id": node_exec_id},
-        data={"stats": json.dumps(stats)},
+        data={"stats": Json(stats)},
     )
 
 
@@ -313,8 +314,8 @@ async def update_execution_status(
         **({"startedTime": now} if status == ExecutionStatus.RUNNING else {}),
         **({"endedTime": now} if status == ExecutionStatus.FAILED else {}),
         **({"endedTime": now} if status == ExecutionStatus.COMPLETED else {}),
-        **({"executionData": json.dumps(execution_data)} if execution_data else {}),
-        **({"stats": json.dumps(stats)} if stats else {}),
+        **({"executionData": Json(execution_data)} if execution_data else {}),
+        **({"stats": Json(stats)} if stats else {}),
     }
 
     res = await AgentNodeExecution.prisma().update(
@@ -473,8 +474,7 @@ async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult
         where={
             "agentNodeId": node_id,
             "agentGraphExecutionId": graph_eid,
-            "executionStatus": {"not": ExecutionStatus.INCOMPLETE},
-            "executionData": {"not": None},  # type: ignore
+            "executionStatus": {"not": ExecutionStatus.INCOMPLETE},  # type: ignore
         },
         order={"queuedTime": "desc"},
         include=EXECUTION_RESULT_INCLUDE,
diff --git a/autogpt_platform/backend/backend/data/graph.py b/autogpt_platform/backend/backend/data/graph.py
index 403a364c18a1..3d9a1076761f 100644
--- a/autogpt_platform/backend/backend/data/graph.py
+++ b/autogpt_platform/backend/backend/data/graph.py
@@ -6,6 +6,7 @@
 from typing import Any, Literal, Optional, Type
 
 import prisma
+from prisma import Json
 from prisma.models import (
     AgentGraph,
     AgentGraphExecution,
@@ -18,7 +19,7 @@
 from backend.blocks.agent import AgentExecutorBlock
 from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
-from backend.util import json
+from backend.util import type
 
 from .block import BlockInput, BlockType, get_block, get_blocks
 from .db import BaseDbModel, transaction
@@ -74,8 +75,8 @@ def from_db(node: AgentNode):
         obj = NodeModel(
             id=node.id,
             block_id=node.AgentBlock.id,
-            input_default=json.loads(node.constantInput, target_type=dict[str, Any]),
-            metadata=json.loads(node.metadata, target_type=dict[str, Any]),
+            input_default=type.convert(node.constantInput, dict[str, Any]),
+            metadata=type.convert(node.metadata, dict[str, Any]),
             graph_id=node.agentGraphId,
             graph_version=node.agentGraphVersion,
             webhook_id=node.webhookId,
@@ -125,7 +126,7 @@ def from_db(execution: AgentGraphExecution):
             total_run_time = duration
 
         try:
-            stats = json.loads(execution.stats or "{}", target_type=dict[str, Any])
+            stats = type.convert(execution.stats or {}, dict[str, Any])
         except ValueError:
             stats = {}
@@ -402,11 +403,9 @@ def _process_node(node: AgentNode, for_export: bool) -> AgentNode:
         if for_export:
             # Remove credentials from node input
             if node.constantInput:
-                constant_input = json.loads(
-                    node.constantInput, target_type=dict[str, Any]
-                )
+                constant_input = type.convert(node.constantInput, dict[str, Any])
                 constant_input = GraphModel._hide_node_input_credentials(constant_input)
-                node.constantInput = json.dumps(constant_input)
+                node.constantInput = Json(constant_input)
 
             # Remove webhook info
             node.webhookId = None
@@ -654,8 +653,8 @@ async def __create_graph(tx, graph: Graph, user_id: str):
             {
                 "id": node.id,
                 "agentBlockId": node.block_id,
-                "constantInput": json.dumps(node.input_default),
-                "metadata": json.dumps(node.metadata),
+                "constantInput": Json(node.input_default),
+                "metadata": Json(node.metadata),
             }
             for node in graph.nodes
         ]
@@ -742,7 +741,7 @@ async def fix_llm_provider_credentials():
             raise RuntimeError(f"Impossible state while processing node {node}")
 
         node_id: str = node["node_id"]
-        node_preset_input: dict = json.loads(node["node_preset_input"])
+        node_preset_input: dict = node["node_preset_input"]
         credentials_meta: dict = node_preset_input["credentials"]
 
         credentials = next(
@@ -778,5 +777,5 @@
             store.update_creds(user_id, credentials)
         await AgentNode.prisma().update(
             where={"id": node_id},
-            data={"constantInput": json.dumps(node_preset_input)},
+            data={"constantInput": Json(node_preset_input)},
         )
diff --git a/autogpt_platform/backend/backend/util/type.py b/autogpt_platform/backend/backend/util/type.py
index 2c480b674d8f..531f973df154 100644
--- a/autogpt_platform/backend/backend/util/type.py
+++ b/autogpt_platform/backend/backend/util/type.py
@@ -1,6 +1,8 @@
 import json
 from typing import Any, Type, TypeVar, cast, get_args, get_origin
 
+from prisma import Json as PrismaJson
+
 
 class ConversionError(ValueError):
     pass
@@ -188,6 +190,8 @@ def type_match(value: Any, target_type: Type[T]) -> T:
 def convert(value: Any, target_type: Type[T]) -> T:
     try:
+        if isinstance(value, PrismaJson):
+            value = value.data
         return cast(T, _try_convert(value, target_type, raise_on_mismatch=False))
     except Exception as e:
         raise ConversionError(f"Failed to convert {value} to {target_type}") from e
diff --git a/autogpt_platform/backend/migrations/20250213110232_migrate_string_json/migration.sql b/autogpt_platform/backend/migrations/20250213110232_migrate_string_json/migration.sql
new file mode 100644
index 000000000000..2d5216aee9ae
--- /dev/null
+++ b/autogpt_platform/backend/migrations/20250213110232_migrate_string_json/migration.sql
@@ -0,0 +1,77 @@
+CREATE OR REPLACE FUNCTION migrate_text_column_to_json(
+    p_table text,                       -- Table name, e.g. 'AgentNodeExecution'
+    p_col text,                         -- Column name to convert, e.g. 'executionData'
+    p_default json DEFAULT '{}'::json,  -- Fallback value when original value is NULL.
+                                        -- Pass NULL here if you prefer to leave NULLs.
+    p_set_nullable boolean DEFAULT true -- If false, the new column will be NOT NULL.
+) RETURNS void AS $$
+DECLARE
+    full_table text;
+    tmp_col text;
+BEGIN
+    -- Build a fully qualified table name using the current schema.
+    full_table := format('%I.%I', current_schema(), p_table);
+    tmp_col := p_col || '_tmp';
+
+    -- 0. Skip the migration if the column is already of type jsonb.
+    IF EXISTS (
+        SELECT 1
+        FROM information_schema.columns
+        WHERE table_schema = current_schema()
+          AND table_name = p_table
+          AND column_name = p_col
+          AND data_type = 'jsonb'
+    ) THEN
+        RAISE NOTICE 'Column %I.%I is already of type jsonb, skipping migration.', full_table, p_col;
+        RETURN;
+    END IF;
+
+    -- 1. Cleanup the original column from invalid JSON characters.
+    EXECUTE format('UPDATE %s SET %I = replace(%I, E''\\u0000'', '''') WHERE %I LIKE ''%%\\u0000%%'';', full_table, p_col, p_col, p_col);
+
+    -- 2. Add the temporary column of type JSON.
+    EXECUTE format('ALTER TABLE %s ADD COLUMN %I jsonb;', full_table, tmp_col);
+
+    -- 3. Convert the data:
+    --    - If p_default IS NOT NULL, use it as the fallback value.
+    --    - Otherwise, keep NULL.
+    IF p_default IS NULL THEN
+        EXECUTE format(
+            'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN NULL ELSE %I::json END;',
+            full_table, tmp_col, p_col, p_col
+        );
+    ELSE
+        EXECUTE format(
+            'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN %L::json ELSE %I::json END;',
+            full_table, tmp_col, p_col, p_default::text, p_col
+        );
+    END IF;
+
+    -- 4. Drop the original text column.
+    EXECUTE format('ALTER TABLE %s DROP COLUMN %I;', full_table, p_col);
+
+    -- 5. Rename the temporary column to the original column name.
+    EXECUTE format('ALTER TABLE %s RENAME COLUMN %I TO %I;', full_table, tmp_col, p_col);
+
+    -- 6. Optionally set a DEFAULT for future inserts if a fallback is provided.
+    IF p_default IS NOT NULL THEN
+        EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET DEFAULT %L::json;',
+                       full_table, p_col, p_default::text);
+    END IF;
+
+    -- 7. Optionally mark the column as NOT NULL.
+    IF NOT p_set_nullable THEN
+        EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET NOT NULL;', full_table, p_col);
+    END IF;
+END;
+$$ LANGUAGE plpgsql;
+
+
+BEGIN;
+    SELECT migrate_text_column_to_json('AgentGraphExecution', 'stats', NULL, true);
+    SELECT migrate_text_column_to_json('AgentNodeExecution', 'stats', NULL, true);
+    SELECT migrate_text_column_to_json('AgentNodeExecution', 'executionData', NULL, true);
+    SELECT migrate_text_column_to_json('AgentNode', 'constantInput', '{}'::json, false);
+    SELECT migrate_text_column_to_json('AgentNode', 'metadata', '{}'::json, false);
+    SELECT migrate_text_column_to_json('AgentNodeExecutionInputOutput', 'data', NULL, false);
+COMMIT;
diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma
index 59db035f5d51..b24f77014913 100644
--- a/autogpt_platform/backend/schema.prisma
+++ b/autogpt_platform/backend/schema.prisma
@@ -209,15 +209,13 @@ model AgentNode {
   // List of produced output, that the child node should be executed.
   Output AgentNodeLink[] @relation("AgentNodeSource")
 
-  // JSON serialized dict[str, str] containing predefined input values.
-  constantInput String @default("{}")
+  constantInput Json @default("{}")
 
   // For webhook-triggered blocks: reference to the webhook that triggers the node
   webhookId String?
   Webhook   IntegrationWebhook? @relation(fields: [webhookId], references: [id])
 
-  // JSON serialized dict[str, str] containing the node metadata.
-  metadata String @default("{}")
+  metadata Json @default("{}")
 
   ExecutionHistory AgentNodeExecution[]
 
@@ -290,7 +288,7 @@ model AgentGraphExecution {
   userId String
   user   User   @relation(fields: [userId], references: [id], onDelete: Cascade)
 
-  stats String? // JSON serialized object
+  stats Json?
 
   AgentPreset   AgentPreset? @relation(fields: [agentPresetId], references: [id])
   agentPresetId String?
@@ -312,14 +310,13 @@ model AgentNodeExecution {
   Output AgentNodeExecutionInputOutput[] @relation("AgentNodeExecutionOutput")
 
   executionStatus AgentExecutionStatus @default(COMPLETED)
 
-  // Final JSON serialized input data for the node execution.
-  executionData String?
+  executionData Json?
   addedTime   DateTime  @default(now())
   queuedTime  DateTime?
   startedTime DateTime?
   endedTime   DateTime?
-  stats String? // JSON serialized object
+  stats Json?
 
   @@index([agentGraphExecutionId])
   @@index([agentNodeId])
@@ -330,7 +327,7 @@ model AgentNodeExecutionInputOutput {
   id   String @id @default(uuid())
   name String
-  data String
+  data Json
   time DateTime @default(now())
 
   // Prisma requires explicit back-references.
diff --git a/autogpt_platform/backend/test/test_data_creator.py b/autogpt_platform/backend/test/test_data_creator.py
index d5235043d355..45596b7d9063 100644
--- a/autogpt_platform/backend/test/test_data_creator.py
+++ b/autogpt_platform/backend/test/test_data_creator.py
@@ -4,7 +4,7 @@
 import prisma.enums
 from faker import Faker
-from prisma import Prisma
+from prisma import Json, Prisma
 
 faker = Faker()
 
@@ -110,8 +110,8 @@ async def main():
                     "agentBlockId": block.id,
                     "agentGraphId": graph.id,
                     "agentGraphVersion": graph.version,
-                    "constantInput": "{}",
-                    "metadata": "{}",
+                    "constantInput": Json({}),
+                    "metadata": Json({}),
                 }
             )
             agent_nodes.append(node)
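As a quick sanity check of the `backend/util/type.py` change, here is an illustrative snippet (not part of the patch) based only on the `isinstance`/`value.data` unwrapping visible in the hunk above:

```python
from typing import Any

from prisma import Json

from backend.util.type import convert

# convert() now unwraps a prisma.Json wrapper before converting, so wrapped
# and plain values should yield the same result (assumed behavior, per the
# isinstance(value, PrismaJson) branch added in this PR).
assert convert(Json({"a": 1}), dict[str, Any]) == {"a": 1}
assert convert({"a": 1}, dict[str, Any]) == {"a": 1}
```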