[ PROMPT_NODE_28144 ]
database_operations
[ SKILL_DOCUMENTATION ]
# 数据库操作模式
**用例**:在工作流中读取、写入、同步和管理数据库数据。
---
## 模式结构
触发器 → [查询/读取] → [转换] → [写入/更新] → [验证/日志]
**关键特性**:数据持久化与同步
---
## 核心组件
### 1. 触发器
**选项**:
- **定时任务 (Schedule)** - 定期同步/维护(最常见)
- **Webhook** - 事件驱动的写入
- **手动 (Manual)** - 一次性操作
### 2. 数据库读取节点
**支持的数据库**:
- Postgres
- MySQL
- MongoDB
- Microsoft SQL
- SQLite
- Redis
- 以及通过社区节点支持的其他数据库
### 3. 转换
**目的**:在不同的数据库架构或格式之间进行映射
**典型节点**:
- **Set** - 字段映射
- **Code** - 复杂转换
- **Merge** - 合并来自多个源的数据
### 4. 数据库写入节点
**操作**:
- INSERT - 创建新记录
- UPDATE - 修改现有记录
- UPSERT - 插入或更新
- DELETE - 删除记录
### 5. 验证
**目的**:确认操作成功
**方法**:
- 查询以验证记录
- 计算受影响的行数
- 记录结果
---
## 常见用例
### 1. 数据同步
**流程**:定时任务 → 读取源数据库 → 转换 → 写入目标数据库 → 日志
**示例**(Postgres 到 MySQL 同步):
1. 定时任务 (每15分钟)
2. Postgres (SELECT * FROM users WHERE updated_at > {{$json.last_sync}})
3. IF (检查记录是否存在)
4. Set (将 Postgres 架构映射到 MySQL 架构)
5. MySQL (INSERT 或 UPDATE users)
6. Postgres (UPDATE sync_log SET last_sync = NOW())
7. Slack (通知: "已同步 X 个用户")
**增量同步查询**:
sql
SELECT *
FROM users
WHERE updated_at > $1
ORDER BY updated_at ASC
LIMIT 1000
**参数**:
javascript
{
"parameters": [
"={{$node['Get Last Sync'].json.last_sync}}"
]
}
### 2. ETL (提取、转换、加载)
**流程**:从多个源提取 → 转换 → 加载到数据仓库
**示例**(整合数据):
1. 定时任务 (每天凌晨2点)
2. [并行分支]
├─ Postgres (SELECT orders)
├─ MySQL (SELECT customers)
└─ MongoDB (SELECT products)
3. Merge (合并所有数据)
4. Code (转换为仓库架构)
5. Postgres (数据仓库 - INSERT into fact_sales)
6. Email (发送摘要报告)
### 3. 数据验证与清理
**流程**:定时任务 → 查询 → 验证 → 更新/删除无效记录
**示例**(清理孤立记录):
1. 定时任务 (每周)
2. Postgres (SELECT users WHERE email IS NULL OR email = '')
3. IF (