在《运用 Transformers 进行概率时刻序列猜测》的第一部分里,咱们为大家介绍了传统时刻序列猜测和依据 Transformers 的方法,也一步步预备好了练习所需的数据集并定义了环境、模型、转化和 InstanceSplitter。本篇内容将包括从数据加载器,到前向传达、练习、推理和展望未来开展等精彩内容。

创立 PyTorch 数据加载器

有了数据,下一步需求创立 PyTorch DataLoaders。它答应咱们批量处理成对的 (输入, 输出) 数据,即 (past_values , future_values)。

from gluonts.itertools import Cyclic, IterableSlice, PseudoShuffled
from gluonts.torch.util import IterableDataset
from torch.utils.data import DataLoader
from typing import Iterable
def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "static_categorical_features",
        "static_real_features",
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
        ]
    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
        ]
    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    # we initialize a Training instance
    instance_splitter = create_instance_splitter(
        config, "train"
    ) + SelectFields(TRAINING_INPUT_NAMES)
    # the instance splitter will sample a window of 
    # context length + lags + prediction length (from the 366 possible transformed time series)
    # randomly from within the target time series and return an iterator.
    training_instances = instance_splitter.apply(
        Cyclic(transformed_data)
        if shuffle_buffer_length is None
        else PseudoShuffled(
            Cyclic(transformed_data), 
            shuffle_buffer_length=shuffle_buffer_length,
        )
    )
    # from the training instances iterator we now return a Dataloader which will 
    # continue to sample random windows for as long as it is called
    # to return batch_size of the appropriate tensors ready for training!
    return IterableSlice(
        iter(
            DataLoader(
                IterableDataset(training_instances),
                batch_size=batch_size,
                **kwargs,
            )
        ),
        num_batches_per_epoch,
    )
def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "static_categorical_features",
        "static_real_features",
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
        ]
    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)
    # we create a Test Instance splitter which will sample the very last 
    # context window seen during training only for the encoder.
    instance_splitter = create_instance_splitter(
        config, "test"
    ) + SelectFields(PREDICTION_INPUT_NAMES)
    # we apply the transformations in test mode
    testing_instances = instance_splitter.apply(transformed_data, is_train=False)
    # This returns a Dataloader which will go over the dataset once.
    return DataLoader(IterableDataset(testing_instances), batch_size=batch_size, **kwargs)
train_dataloader = create_train_dataloader(
    config=config, 
    freq=freq, 
    data=train_dataset, 
    batch_size=256, 
    num_batches_per_epoch=100,
)
test_dataloader = create_test_dataloader(
    config=config, 
    freq=freq, 
    data=test_dataset,
    batch_size=64,
)

让咱们检查第一批:

batch = next(iter(train_dataloader))
for k,v in batch.items():
  print(k,v.shape, v.type())
>>> static_categorical_features torch.Size([256, 1]) torch.LongTensor
    static_real_features torch.Size([256, 1]) torch.FloatTensor
    past_time_features torch.Size([256, 181, 2]) torch.FloatTensor
    past_values torch.Size([256, 181]) torch.FloatTensor
    past_observed_mask torch.Size([256, 181]) torch.FloatTensor
    future_time_features torch.Size([256, 24, 2]) torch.FloatTensor
    future_values torch.Size([256, 24]) torch.FloatTensor
    future_observed_mask torch.Size([256, 24]) torch.FloatTensor

能够看出,咱们没有将 input_idsattention_mask 供给给编码器 (练习 NLP 模型时也是这种状况),而是供给 past_values,以及 past_observed_maskpast_time_featuresstatic_categorical_featuresstatic_real_features 几项数据。

解码器的输入包括 future_valuesfuture_observed_maskfuture_time_featuresfuture_values 能够看作等同于 NLP 练习中的 decoder_input_ids

咱们能够参阅 Time Series Transformer 文档 以取得对它们中每一个的详细解释。

前向传达

让咱们对刚刚创立的批次履行一次前向传达:

# perform forward pass
outputs = model(
    past_values=batch["past_values"],
    past_time_features=batch["past_time_features"],
    past_observed_mask=batch["past_observed_mask"],
    static_categorical_features=batch["static_categorical_features"],
    static_real_features=batch["static_real_features"],
    future_values=batch["future_values"],
    future_time_features=batch["future_time_features"],
    future_observed_mask=batch["future_observed_mask"],
    output_hidden_states=True
)
print("Loss:", outputs.loss.item())
>>> Loss: 9.141253471374512

现在,该模型返回了丢失值。这是因为解码器会主动将 future_values 向右移动一个位置以取得标签。这答应计算猜测结果和标签值之间的误差。

另请留意,解码器运用 Causal Mask 来避免猜测未来,因为它需求猜测的值在 future_values 张量中。

练习模型

是时分练习模型了!咱们将运用标准的 PyTorch 练习循环。

这里咱们用到了 Accelerate 库,它会主动将模型、优化器和数据加载器放置在恰当的 device 上。

from accelerate import Accelerator
from torch.optim import Adam
accelerator = Accelerator()
device = accelerator.device
model.to(device)
optimizer = Adam(model.parameters(), lr=1e-3)
model, optimizer, train_dataloader = accelerator.prepare(
    model, optimizer, train_dataloader, 
)
for epoch in range(40):
    model.train()
    for batch in train_dataloader:
        optimizer.zero_grad()
        outputs = model(
            static_categorical_features=batch["static_categorical_features"].to(device),
            static_real_features=batch["static_real_features"].to(device),
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            future_values=batch["future_values"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
            future_observed_mask=batch["future_observed_mask"].to(device),
        )
        loss = outputs.loss
        # Backpropagation
        accelerator.backward(loss)
        optimizer.step()
        print(loss.item())

推理

在推理时,建议运用 generate() 方法进行自回归生成,类似于 NLP 模型。

猜测的进程会从测验实例采样器中取得数据。采样器会将数据集的每个时刻序列的终究 context_length 那么长时刻的数据采样出来,然后输入模型。请留意,这里需求把提前已知的 future_time_features 传递给解码器。

该模型将从猜测散布中自回归采样必定数量的值,并将它们传回解码器终究得到猜测输出:

model.eval()
forecasts = []
for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device),
        static_real_features=batch["static_real_features"].to(device),
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts.append(outputs.sequences.cpu().numpy())

该模型输出一个标明结构的张量 (batch_size, number of samples, prediction length)。

下面的输出阐明: 关于巨细为 64 的批次中的每个示例,咱们将取得接下来 24 个月内的 100 个可能的值:

forecasts[0].shape
>>> (64, 100, 24)

咱们将笔直堆叠它们,以取得测验数据会集一切时刻序列的猜测:

forecasts = np.vstack(forecasts)
print(forecasts.shape)
>>> (366, 100, 24)

咱们能够依据测验会集存在的样本值,依据真实状况评价生成的猜测。这里咱们运用数据会集的每个时刻序列的 MASE 和 sMAPE 目标 (metrics) 来评价:

from evaluate import load
from gluonts.time_feature import get_seasonality
mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")
forecast_median = np.median(forecasts, 1)
mase_metrics = []
smape_metrics = []
for item_id, ts in enumerate(test_dataset):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
        training=np.array(training_data), 
        periodicity=get_seasonality(freq))
    mase_metrics.append(mase["mase"])
    smape = smape_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
    )
    smape_metrics.append(smape["smape"])
print(f"MASE: {np.mean(mase_metrics)}")
>>> MASE: 1.361636922541396
print(f"sMAPE: {np.mean(smape_metrics)}")
>>> sMAPE: 0.17457818831512306

咱们还能够单独制作数据会集每个时刻序列的结果目标,并观察到其间少数时刻序列对终究测验目标的影响很大:

plt.scatter(mase_metrics, smape_metrics, alpha=0.3)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()

下篇 | 使用  Transformers 进行概率时间序列预测

为了依据根本现实测验数据制作任何时刻序列的猜测,咱们定义了以下辅助绘图函数:

import matplotlib.dates as mdates
def plot(ts_index):
    fig, ax = plt.subplots()
    index = pd.period_range(
        start=test_dataset[ts_index][FieldName.START],
        periods=len(test_dataset[ts_index][FieldName.TARGET]),
        freq=freq,
    ).to_timestamp()
    # Major ticks every half year, minor ticks every month,
    ax.xaxis.set_major_locator(mdates.MonthLocator(bymonth=(1, 7)))
    ax.xaxis.set_minor_locator(mdates.MonthLocator())
    ax.plot(
        index[-2*prediction_length:], 
        test_dataset[ts_index]["target"][-2*prediction_length:],
        label="actual",
    )
    plt.plot(
        index[-prediction_length:], 
        np.median(forecasts[ts_index], axis=0),
        label="median",
    )
    plt.fill_between(
        index[-prediction_length:],
        forecasts[ts_index].mean(0) - forecasts[ts_index].std(axis=0), 
        forecasts[ts_index].mean(0) + forecasts[ts_index].std(axis=0), 
        alpha=0.3, 
        interpolate=True,
        label="+/- 1-std",
    )
    plt.legend()
    plt.show()

例如:

plot(334)

下篇 | 使用  Transformers 进行概率时间序列预测

咱们如何与其他模型进行比较? Monash Time Series Repository 有一个测验集 MASE 目标的比较表。咱们能够将自己的结果添加到其间作比较:

Dataset SES Theta TBATS ETS (DHR-)ARIMA PR CatBoost FFNN DeepAR N-BEATS WaveNet Transformer (Our)
Tourism Monthly 3.306 1.649 1.751 1.526 1.589 1.678 1.699 1.582 1.409 1.574 1.482 1.361

请留意,咱们的模型打败了一切已知的其他模型 (另请参见相应 论文 中的表 2) ,而且咱们没有做任何超参数优化。咱们只是花了 40 个完整练习调参周期来练习 Transformer。

当然,咱们应该谦虚。从前史开展的视点来看,现在以为神经网络处理时刻序列猜测问题是正途,就比如当年的论文得出了 “你需求的就是 XGBoost” 的定论。咱们只是很好奇,想看看神经网络能带咱们走多远,以及 Transformer 是否会在这个范畴发挥作用。这个特定的数据集好像标明它肯定值得探索。

下一步

咱们鼓舞读者测验咱们的 Jupyter Notebook 和来自 Hugging Face Hub 的其他时刻序列数据集,并替换恰当的频率和猜测长度参数。关于您的数据集,需求将它们转化为 GluonTS 的惯用格式,在他们的 文档 里有非常明晰的阐明。咱们还预备了一个示例 Notebook,向您展示如何将数据集转化为 Hugging Face 数据集格式。

正如时刻序列研究人员所知,人们对“将依据 Transformer 的模型应用于时刻序列”问题很感兴趣。传统 vanilla Transformer 只是很多依据留意力 (Attention) 的模型之一,因而需求向库中弥补更多模型。

现在没有什么能妨碍咱们持续探索对多变量时刻序列 (multivariate time series) 进行建模,但是为此需求运用多变量散布头 (multivariate distribution head) 来实例化模型。现在已经支持了对角独立散布 (diagonal independent distributions),后续会增加其他多元散布支持。请持续重视未来的博客文章以及其间的教程。

路线图上的另一件事是时刻序列分类。这需求将带有分类头的时刻序列模型添加到库中,例如用于反常检测这类使命。

当前的模型会假设日期时刻和时刻序列值都存在,但在现实中这可能不能完全满足。例如 WOODS 给出的神经科学数据集。因而,咱们还需求对当前模型进行泛化,使某些输入在整个流水线中可选。

终究,NLP/CV 范畴从大型预练习模型 中获益匪浅,但据咱们所知,时刻序列范畴并非如此。依据 Transformer 的模型好像是这一研究方向的必然之选,咱们刻不容缓地想看看研究人员和从业者会发现哪些打破!


英文原文: Probabilistic Time Series Forecasting with Transformers

译者、排版: zhongdongy (阿东)