|
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import
TensorDataset, DataLoader
from sklearn.preprocessing import
StandardScaler
from sklearn.metrics import
accuracy_score, roc_auc_score
# ── 随机种子 ──────────────────────────────────────────────────────
torch.manual_seed(42)
np.random.seed(42)
## 原始 CSV 是按时间倒序存储的(最新日期在前),用 iloc[::-1] 翻转为正序,
## 再
reset_index 重建从 0 开始的行索引。
df =
pd.read_csv("d:\\C3_1_WTI.csv")
df =
df.iloc[::-1].reset_index(drop=True)
## 原始交易量带有单位后缀(如 "12.5K" "3.2M"),需要统一转换为数值。
## 涨跌幅带有百分号(如
"-0.44%"),去掉百分号后转 float 即为小数形式的涨跌比例。
def parse_volume(s):
s = str(s).strip()
try:
if s.endswith("K"):
return float(s[:-1]) * 1_000
elif s.endswith("M"):
return float(s[:-1]) * 1_000_000
elif s.endswith("B"):
return float(s[:-1]) *
1_000_000_000
else:
return float(s)
except ValueError:
return np.nan
df["真实交易量"]
= df["交易量"].apply(parse_volume)
df["真实涨跌幅"]
= df["涨跌幅"].str.replace("%",
"").astype(float)
# ── 3. 数据修复
───────────────────────────────────────────────────
base_cols = ["开盘", "高", "低", "收盘", "真实交易量", "真实涨跌幅"]
df[base_cols] = df[base_cols].replace([np.inf,
-np.inf], np.nan)
nan_counts =
df[base_cols].isna().sum()
if nan_counts.any():
df[base_cols] = df[base_cols].ffill().bfill()
# ── 4. 技术指标特征
───────────────────────────────────────────────
close = df["收盘"]
high
= df["高"]
low
= df["低"]
vol
= df["真实交易量"]
#为了让特征更容易被学习,我们增加了部分技术特征,如布林带,动量,均线等,和LSTM本身无关
# --- 动量 / 均线 ---
df["ma5"] = close.rolling(5).mean()
df["ma10"] = close.rolling(10).mean()
df["ma20"] = close.rolling(20).mean()
df["ema12"] = close.ewm(span=12, adjust=False).mean()
df["ema26"] = close.ewm(span=26, adjust=False).mean()
# --- MACD ---
df["macd"] = df["ema12"] -
df["ema26"]
df["macd_signal"] =
df["macd"].ewm(span=9, adjust=False).mean()
df["macd_hist"] = df["macd"] -
df["macd_signal"]
# --- RSI(14) ---
delta = close.diff()
gain
= delta.clip(lower=0).rolling(14).mean()
loss
= (-delta.clip(upper=0)).rolling(14).mean()
df["rsi14"] = 100 - 100 / (1
+ gain / loss.replace(0, np.nan))
# --- Bollinger Bands (20) ---
bb_mid = close.rolling(20).mean()
bb_std = close.rolling(20).std()
df["bb_upper"] = bb_mid + 2
* bb_std
df["bb_lower"] = bb_mid - 2
* bb_std
df["bb_width"] =
(df["bb_upper"] - df["bb_lower"]) / bb_mid
df["bb_pos"] = (close - df["bb_lower"]) /
(df["bb_upper"] - df["bb_lower"] + 1e-9)
# --- ATR(14):衡量波动性 ---
tr = pd.concat([
high - low,
(high - close.shift()).abs(),
(low - close.shift()).abs(),
], axis=1).max(axis=1)
df["atr14"] =
tr.rolling(14).mean()
# --- 收益率特征
---
df["ret1"] = close.pct_change(1)
df["ret3"] = close.pct_change(3)
df["ret5"] = close.pct_change(5)
# --- 成交量变化率
---
df["vol_ma5"] = vol.rolling(5).mean()
df["vol_ratio"] = vol / (df["vol_ma5"] + 1e-9)
# ── 5. 预测目标
───────────────────────────────────────────────────
df["Target"] =
(close.shift(-1) > close).astype(int)
# 删掉因滚动窗口产生的
NaN 行,以及最后一行(Target 为 NaN)
df =
df.dropna().reset_index(drop=True)
print(f"[数据] 有效行数: {len(df)}")
# ── 6. 特征列
─────────────────────────────────────────────────────
features = [
# 原始价格相对特征
"真实涨跌幅",
"ret1", "ret3", "ret5",
# 均线比率(价格 / 均线,反映偏离程度)
"ma5", "ma10", "ma20",
# MACD
"macd", "macd_signal", "macd_hist",
# RSI
"rsi14",
# 布林带
"bb_width", "bb_pos",
# 波动性
"atr14",
# 成交量
"vol_ratio",
]
n_feat = len(features)
print(f"[特征] 共 {n_feat} 个:
{features}")
# ── 7. 滑窗构造样本
───────────────────────────────────────────────
## LSTM 需要输入形如
(样本数, 时间步, 特征数) 的三维张量。
## 滑动窗口:取连续 window_size
天的特征作为一个样本的输入,
## 对应的标签是第
window_size+1 天的涨跌。
## 窗口每次向后移动一天,因此总样本数 = len(df) - window_size。
window_size = 60
X_raw, y_raw = [], []
for i in range(len(df) - window_size):
X_raw.append(df[features].iloc[i : i + window_size].values)
y_raw.append(df["Target"].iloc[i + window_size])
X_raw = np.array(X_raw,
dtype=np.float64)
y_raw = np.array(y_raw)
print(f"[样本] X: {X_raw.shape}, y: {y_raw.shape}")
print(f"[标签] 上涨比例: {y_raw.mean():.3f}")
# ── 8. 划分训练 / 测试集 ──────────────────────────────────────────
## 时序数据必须按时间顺序划分,前 80% 训练,后 20% 测试,
## 不能随机打乱——否则未来数据会泄露到训练集,导致评估虚高。
split = int(len(X_raw) * 0.8)
X_train_raw, X_test_raw =
X_raw[:split], X_raw[split:]
y_train, y_test = y_raw[:split], y_raw[split:]
# ── 9. 标准化────────────────────────────────────
## StandardScaler 将每个特征变换为均值=0、方差=1 的分布,
## 消除不同特征之间量级差异(如价格 vs RSI vs 成交量比率)对梯度的影响。
##
## 关键:只在训练集上
fit(计算均值/方差),
## 再用同一套参数
transform 测试集。
## 若在全量数据上 fit,测试集的统计信息会渗入训练过程,造成数据泄露。
##
## 因为 X 是三维数组 (N, T, F),StandardScaler 只接受二维输入,
## 所以先 reshape
成 (N*T, F) 做 fit/transform,再 reshape 回去。
scaler = StandardScaler()
X_train_s = scaler.fit_transform(
X_train_raw.reshape(-1, n_feat)
).reshape(len(X_train_raw),
window_size, n_feat)
X_test_s = scaler.transform(
X_test_raw.reshape(-1, n_feat)
).reshape(len(X_test_raw),
window_size, n_feat)
assert not (np.isnan(X_train_s).any()
or np.isinf(X_train_s).any()), \
"标准化后 X_train 仍含
NaN/inf"
# ── 10. 转为 Tensor ───────────────────────────────────────────────
X_train = torch.tensor(X_train_s,
dtype=torch.float32)
X_test
= torch.tensor(X_test_s,
dtype=torch.float32)
y_train_t = torch.tensor(y_train,
dtype=torch.float32)
y_test_t = torch.tensor(y_test, dtype=torch.float32)
# ── 11.
DataLoader ────────────────────────────────────────────────
train_loader = DataLoader(
TensorDataset(X_train, y_train_t),
batch_size=64,
shuffle=False,
)
# ── 12. 模型:双层 LSTM + Dropout ─────────────────────────────────
## LSTM 内部有四个门:输入门、遗忘门、输出门、候选记忆。
## - 遗忘门:决定上一时刻记忆向量中哪些信息需要丢弃。
## - 输入门:决定当前输入中哪些信息值得写入记忆。
## - 输出门:决定当前时刻从记忆中读取哪些信息作为输出。
##
## num_layers=2:堆叠两层 LSTM,第一层的输出作为第二层的输入,
## 可以捕捉更高层次的时序抽象(类似深层 CNN 捕捉高阶纹理)。
## dropout=0.3:在两层 LSTM 之间随机丢弃 30% 的神经元,防止过拟合。
## 全连接头:LSTM 最后时间步的隐状态
→ Linear(64→32) → ReLU → Dropout →
Linear(32→1)
## 输出一个标量
logit,配合 BCEWithLogitsLoss 完成二分类。
class OilLSTM(nn.Module):
def __init__(self, input_size, hidden_size=64, num_layers=2,
dropout=0.3):
super().__init__()
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout, # 层间 dropout(num_layers>1 才生效)
)
self.head = nn.Sequential(
nn.Linear(hidden_size, 32),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(32, 1),
)
def forward(self, x):
out, _ = self.lstm(x)
return self.head(out[:, -1, :])
model = OilLSTM(input_size=n_feat)
criterion = nn.BCEWithLogitsLoss()
optimizer =
torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler =
torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", factor=0.5, patience=3, verbose=True
)
# ── 13. 训练
──────────────────────────────────────────────────────
## BCEWithLogitsLoss:数值上等价于 Sigmoid + 二元交叉熵,
## 合并计算更稳定(避免 Sigmoid 饱和区的数值误差)。
## Adam:自适应学习率优化器,收敛比 SGD 更快,适合大多数深度学习任务。
## weight_decay=1e-4:L2 正则化,惩罚过大的权重,减轻过拟合。
## ReduceLROnPlateau:当训练 loss 连续 3 个 epoch
不下降时,
## 将学习率乘以 0.5,帮助跳出平台区。
epochs = 50
for epoch in range(epochs):
model.train()
total_loss = 0.0
for batch_x, batch_y in train_loader:
optimizer.zero_grad()
loss = criterion(model(batch_x).squeeze(), batch_y)
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) # 梯度裁剪
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
scheduler.step(avg_loss)
if (epoch + 1) % 5 == 0:
print(f"Epoch [{epoch + 1:>2}/{epochs}] Loss: {avg_loss:.4f}")
# ── 14. 评估
──────────────────────────────────────────────────────
## model.eval():关闭 Dropout,使推理结果确定性。
## torch.no_grad():关闭梯度计算,节省显存和计算量。
##
## Accuracy:预测正确的样本比例,直观但对不平衡数据不够敏感。
## AUC(ROC 曲线下面积):衡量模型区分正负样本的综合能力,
##
0.5 = 随机猜,1.0 = 完美分类,不受分类阈值影响,更稳健。
model.eval()
with torch.no_grad():
outputs = model(X_test)
preds = (torch.sigmoid(outputs.squeeze()) > 0.5).int()
probs = torch.sigmoid(outputs).squeeze().numpy()
acc = accuracy_score(y_test,
preds.numpy())
auc = roc_auc_score(y_test, probs)
print(f"\nAccuracy :
{acc:.4f}")
print(f"AUC : {auc:.4f}")
|