简介

几个月前,咱们介绍了 Informer 这个模型,相关论文 (Zhou, Haoyi, et al., 2021) 是一篇获得了 AAAI 2021 最佳论文奖的时刻序列论文。咱们也展现了一个运用 Informer 进行多变量概率猜测的比方。在本文中,咱们讨论以下问题: Transformer 模型对时刻序列猜测真的有用吗?咱们给出的答案是,它们真的有用。

首要,咱们将会供给一些试验证据,展现其真正的有用性。咱们的比照试验将标明, DLinear 这个简略线性模型并没有像说的那样比 transformer 好。当咱们在同等模型巨细和相同设定的情况下比照时,咱们发现根据 transformer 的模型在咱们重视的测验规范上体现得更好。其次,咱们将会介绍 Autoformer 模型,相关论文 (Wu, Haixu, et al., 2021) 在 Informer 模型问世后发表在 NeurIPS 2021 上。Autoformer 的模型现在现已能够在 Transformers 中 运用。终究,咱们还会讨论 DLinear 模型,该模型是一个简略的前向网络,运用了 Autoformer 中的分化层 (decomposition layer)。DLinear 模型是在 Are Transformers Effective for Time Series Forecasting? 这篇论文中提出的,文中宣称其功能在时刻序列猜测范畴逾越了 transformer 系列的算法。

下面咱们开端!

评价 Transformer 系列模型 和 DLinear 模型

在 AAAI 2023 的论文 Are Transformers Effective for Time Series Forecasting? 中,作者宣称 transformer 系列模型在时刻序列猜测方面并不有用。他们拿根据 transformer 的模型与一个简略的线性模型 DLinear 作比照。DLinear 运用了 Autoformer 中的 decomposition layer 结构 (下文将会介绍),作者宣称其功能逾越了根据 transformer 的模型。但现实真的是这样吗?咱们接下来看看。

Dataset Autoformer (uni.) MASE DLinear MASE
Traffic 0.910 0.965
Exchange-Rate 1.087 1.690
Electricity 0.751 0.831

上表展现了 Autoformer 和 DLinear 在三个论文中用到的数据集上的体现。成果阐明 Autoformer 在三个数据集上体现都逾越了 DLinear 模型。

接下来,咱们将介绍 Autoformer 和 DLinear 模型,演示咱们怎么在上表 Traffic 数据集上比照它们的功能,并为成果供给一些可解释性。

先说结论: 一个简略的线性模型可能在某些特定情况下更有优势,但可能无法像 transformer 之类的复杂模型那样处理协方差信息。

Autoformer 具体介绍

Autoformer 根据传统的时刻序列办法: 把时刻序列分化为时节性 (seasonality) 以及趋势 – 周期 (trend-cycle) 这些要素。这经过加入分化层 ( Decomposition Layer ) 来完成,以此来增强模型获取这些信息的才能。此外,Autoformer 中还首创了自相关 (auto-correlation) 机制,替换掉了传统 transformer 中的自留意力 (self-attention)。该机制使得模型能够利用留意力机制中周期性的依赖,提升了整体功能。

下面,咱们将深入探讨 Autoformer 的这两大首要贡献: 分化层 ( Decomposition Layer ) 和自相关机制 ( Autocorrelation Mechanism )。相关代码也会供给出来。

分化层

分化是一个时刻序列范畴十分常用的办法,但在 Autoformer 曾经都没有被密布集成入深度学习模型中。咱们先简略介绍这一概念,随后会运用 PyTorch 代码演示这一思路是怎么应用到 Autoformer 中的。

时刻序列分化

在时刻序列分析中,分化 (decomposition) 是把一个时刻序列拆分红三个系统性要素的办法: 趋势周期 (trend-cycle) 、时节性变动 (seasonal variation) 和随机波动 (random fluctuations)。趋势要素代表了时刻序列的长时刻走势方向; 时节要素反映了一些重复出现的方式,例如以一年或一季度为周期出现的方式; 而随机 (无规律) 要素则反映了数据中无法被上述两种要素解释的随机噪声。

有两种主流的分化办法: 加法分化和乘法分化,这在 statsmodels 这个库里都有完成。经过分化时刻序列到这三个要素,咱们能更好地理解和建模数据中潜在的方式。

但怎样把分化集成进 transformer 结构呢?咱们能够参阅参阅 Autoformer 的做法。

Autoformer 中的分化

据说,Transformer 不能有效地进行时间序列预测?
Autoformer 结构 (来自论文)

Autoformer 把分化作为一个内部核算操作集成到模型中,如上图所示。能够看到,编码器和解码器都运用了分化模块来调集 trend-cyclical 信息,并从序列中渐进地提取 seasonal 信息。这种内部分化的概念现已从 Autoformer 中展现了其有用性。所以很多其它的时刻序列论文也开端选用这一办法,例如 FEDformer (Zhou, Tian, et al., ICML 2022) 和 DLinear (Zeng, Ailing, et al., AAAI 2023),这更阐明了其在时刻序列建模中的意义。

现在,咱们正式地给分化层做出界说:

对一个长度为 LL 的序列 X∈RLd\mathcal{X} \in \mathbb{R}^{L \times d},分化层回来的 Xtrend和Xseasonal\mathcal{X}_\textrm{trend} 和 \mathcal{X}_\textrm{seasonal} 界说如下:

Xtrend=AvgPool(Padding(X))Xseasonal=X−Xtrend\mathcal{X}_\textrm{trend} = \textrm{AvgPool(Padding(} \mathcal{X} \textrm{))} \\ \mathcal{X}_\textrm{seasonal} = \mathcal{X} – \mathcal{X}_\textrm{trend}

对应的 PyTorch 代码完成是:

import torch
from torch import nn
class DecompositionLayer(nn.Module):
    """
    Returns the trend and the seasonal parts of the time series.
    """
    def __init__(self, kernel_size):
        super().__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=1, padding=0) # moving average
    def forward(self, x):
        """Input shape: Batch x Time x EMBED_DIM"""
        # padding on the both ends of time series
        num_of_pads = (self.kernel_size - 1) // 2
        front = x[:, 0:1, :].repeat(1, num_of_pads, 1)
        end = x[:, -1:, :].repeat(1, num_of_pads, 1)
        x_padded = torch.cat([front, x, end], dim=1)
        # calculate the trend and seasonal part of the series
        x_trend = self.avg(x_padded.permute(0, 2, 1)).permute(0, 2, 1)
        x_seasonal = x - x_trend
        return x_seasonal, x_trend

可见,代码十分简略,能够很方便地用在其它模型中,正如 DLinear 那样。下面,咱们解说第二个立异点: 留意力 (自相关) 机制

留意力 (自相关) 机制

据说,Transformer 不能有效地进行时间序列预测?
最原始的留意力机制和自相关机制 (图片来自论文)

除了分化层之外,Autoformer 还运用了一个原创的自相关 (autocorrelation) 机制,能够完美替换自留意力 (self-attention) 机制。在 最原始的时刻序列 transformer 模型 中,留意力权重是在时域核算并逐点聚合的。而从上图中能够看出,Autoformer 不同的是它在频域核算这些 (运用 快速傅立叶改换),然后经过期延聚合它们。

接下来部分,咱们深入细节,并运用代码作出解说。

时域的留意力机制

据说,Transformer 不能有效地进行时间序列预测?
借助 FFT 在频域核算留意力权重 (图片来自论文)

理论上讲,给定一个时刻延迟 \tau,一个离散变量的 自相关性 yy 能够用来衡量这个变量当时时刻 tt 的值和过去时刻 t−t-\tau 的值之间的“关系”(皮尔逊相关性,pearson correlation):

Autocorrelation()=Corr(yt,yt−)\textrm{Autocorrelation}(\tau) = \textrm{Corr}(y_t, y_{t-\tau})

运用自相关性,Autoformer 提取了 query 和 key 之间根据频域的相互依赖,而不是像之前那样两两之间的点乘。能够把这个操作看成是自留意力中 QKTQK^T 的替换。

实际操作中,query 和 key 之间的自相关是经过 FFT 一次性针对 一切时刻延迟 核算出来的。经过这种办法,自相关机制达到了 O(Llog⁡L)O(L \log L) 的时刻复杂度 ( LL 是输入时刻长度),这个速度和 Informer 的 ProbSparse attention 接近。值得一提的是,运用 FFT 核算自相关性的理论基础是 Wiener–Khinchin theorem,这儿咱们不细讲了。

现在,咱们来看看相应的 PyTorch 代码:

import torch
def autocorrelation(query_states, key_states):
    """
    Computes autocorrelation(Q,K) using `torch.fft`.
    Think about it as a replacement for the QK^T in the self-attention.
    Assumption: states are resized to same shape of [batch_size, time_length, embedding_dim].
    """
    query_states_fft = torch.fft.rfft(query_states, dim=1)
    key_states_fft = torch.fft.rfft(key_states, dim=1)
    attn_weights = query_states_fft * torch.conj(key_states_fft)
    attn_weights = torch.fft.irfft(attn_weights, dim=1)
    return attn_weights

代码十分简洁! 请留意这只是 autocorrelation(Q,K) 的部分完成,完好完成请参阅 Transformers 中的代码。

接下来,咱们将看到怎么运用时延值聚合咱们的 attn_weights ,这个进程被称为时延聚合 ( Time Delay Aggregation )。

时延聚合

据说,Transformer 不能有效地进行时间序列预测?
经过期延来聚合,图片来自 Autoformer 论文

咱们用 RQ,K\mathcal{R_{Q,K}} 来表明自相关 (即 attn_weights )。那么问题是: 咱们应该怎么聚合这些 RQ,K(1),RQ,K(2),…,RQ,K(k)\mathcal{R_{Q,K}}(\tau_1), \mathcal{R_{Q,K}}(\tau_2), …, \mathcal{R_{Q,K}}(\tau_k)V\mathcal{V} 上面?在规范的自留意力机制中,这种聚合经过点乘完结。但在 Autoformer 中,咱们运用了一种不同的办法。首要咱们在时延 1,2,…k\tau_1, \tau_2, … \tau_k 上对齐 V\mathcal{V},核算在这些时延下它对应的值,这个操作叫作 Rolling 。接下来,咱们将对齐的 V\mathcal{V} 和自相关的值进行逐点的乘法运算。在上图中,你能够看到在左边是根据时延对 V\mathcal{V} 进行的 Rolling 操作; 而右边就展现了与自相关进行的逐点乘法。

整个进程能够用以下公式总结:

1,2,…k=argTop-k(RQ,K())RQ,K(1),RQ,K(2),…,RQ,K(k)=Softmax(RQ,K(1),RQ,K(2),…,RQ,K(k))Autocorrelation-Attention=∑i=1kRoll(V,i)⋅RQ,K(i)\tau_1, \tau_2, … \tau_k = \textrm{arg Top-k}(\mathcal{R_{Q,K}}(\tau)) \ \hat{\mathcal{R}}\mathcal{ _{Q,K}}(\tau _1), \hat{\mathcal{R}}\mathcal{_ {Q,K}}(\tau _2), …, \hat{\mathcal{R}}\mathcal{_ {Q,K}}(\tau _k) = \textrm{Softmax}(\mathcal{R_ {Q,K}}(\tau _1), \mathcal{R_ {Q,K}}(\tau_2), …, \mathcal{R_ {Q,K}}(\tau_k)) \ \textrm{Autocorrelation-Attention} = \sum_{i=1}^k \textrm{Roll}(\mathcal{V}, \tau_i) \cdot \hat{\mathcal{R}}\mathcal{_{Q,K}}(\tau _i)

便是这样!需求留意的是,kk 是一个超参数,咱们称之为 autocorrelation_factor (相似于 Informer 里的 sampling_factor ) ; 而 softmax 是在乘法操作之前运用到自相关上面的。

现在,咱们现已能够看看终究的代码了:

import torch
import math
def time_delay_aggregation(attn_weights, value_states, autocorrelation_factor=2):
    """
    Computes aggregation as value_states.roll(delay)* top_k_autocorrelations(delay).
    The final result is the autocorrelation-attention output.
    Think about it as a replacement of the dot-product between attn_weights and value states.
    The autocorrelation_factor is used to find top k autocorrelations delays.
    Assumption: value_states and attn_weights shape: [batch_size, time_length, embedding_dim]
    """
    bsz, num_heads, tgt_len, channel = ...
    time_length = value_states.size(1)
    autocorrelations = attn_weights.view(bsz, num_heads, tgt_len, channel)
    # find top k autocorrelations delays
    top_k = int(autocorrelation_factor * math.log(time_length))
    autocorrelations_mean = torch.mean(autocorrelations, dim=(1, -1)) # bsz x tgt_len
    top_k_autocorrelations, top_k_delays = torch.topk(autocorrelations_mean, top_k, dim=1)
    # apply softmax on the channel dim
    top_k_autocorrelations = torch.softmax(top_k_autocorrelations, dim=-1) # bsz x top_k
    # compute aggregation: value_states.roll(delay)* top_k_autocorrelations(delay)
    delays_agg = torch.zeros_like(value_states).float() # bsz x time_length x channel
    for i in range(top_k):
        value_states_roll_delay = value_states.roll(shifts=-int(top_k_delays[i]), dims=1)
        top_k_at_delay = top_k_autocorrelations[:, i]
        # aggregation
        top_k_resized = top_k_at_delay.view(-1, 1, 1).repeat(num_heads, tgt_len, channel)
        delays_agg += value_states_roll_delay * top_k_resized
    attn_output = delays_agg.contiguous()
    return attn_output

完结!Autoformer 模型现在现已能够在 Transformers 中 运用 了,名字就叫 AutoformerModel

针对这个模型,咱们要比照单变量 transformer 模型与 DLinear 的功能,DLinear 实质也是单变量的。后面咱们也会展现两个多变量 transformer 模型的功能 (在同一数据上练习的)。

DLinear 具体介绍

实际上,DLinear 结构十分简略,仅仅是从 Autoformer 的 DecompositionLayer 上衔接全衔接层。它运用 DecompositionLayer 来分化输入的国际序列到残差部分 (时节性) 和趋势部分。前向进程中,每个部分都被输入到各自的线性层,并被映射成 prediction_length 长度的输出。终究的输出便是两个输入的和:

def forward(self, context):
    seasonal, trend = self.decomposition(context)
    seasonal_output = self.linear_seasonal(seasonal)
    trend_output = self.linear_trend(trend)
    return seasonal_output + trend_output

在这种设定下,首要咱们把输入的序列映射成 prediction-length * hidden 维度 (经过 linear_seasonallinear_trend 两个层) ; 得到的成果会被相加起来,并转换为 (prediction_length, hidden) 形状; 终究,维度为 hidden 的隐性表征会被映射到某种散布的参数上。

在咱们的测评中,咱们运用 GluonTS 中 DLinear 的完成。

示例: Traffic 数据集

咱们期望用试验成果展现库中根据 transformer 模型的功能,这儿咱们运用 Traffic 数据集,该数据集有 862 条时刻序列数据。咱们将在每条时刻序列上练习一个共享的模型 (单变量设定)。每个时刻序列都代表了一个传感器的占有率值,值的范围在 0 到 1 之间。下面的这些超参数咱们将在一切模型中保持一致。

# Traffic prediction_length is 24. Reference:
# https://github.com/awslabs/gluonts/blob/6605ab1278b6bf92d5e47343efcf0d22bc50b2ec/src/gluonts/dataset/repository/_lstnet.py#L105
prediction_length = 24
context_length = prediction_length*2
batch_size = 128
num_batches_per_epoch = 100
epochs = 50
scaling = "std"

运用的 transformer 模型都很小:

encoder_layers=2
decoder_layers=2
d_model=16

这儿咱们不再解说怎么用 Autoformer 练习模型,读者能够参阅之前两篇博客 (TimeSeriesTransformer 和 Informer) 并替换模型为 Autoformer 、替换数据集为 traffic 。咱们也练习了现成的模型放在 HuggingFace Hub 上,稍后的评测将会运用这儿的模型。

载入数据集

首要安装必要的库:

!pip install -q transformers datasets evaluate accelerate "gluonts[torch]" ujson tqdm

traffic 数据集 (Lai et al. (2017)) 包含了旧金山的交通数据。它包含 862 条以小时为时刻单位的时刻序列,代表了路途占有率的数值,其数值范围为 [0,1][0, 1],记录了旧金山湾区高速公路从 2015 年到 2016 年的数据。

from gluonts.dataset.repository.datasets import get_dataset
dataset = get_dataset("traffic")
freq = dataset.metadata.freq
prediction_length = dataset.metadata.prediction_length

咱们可视化一条时刻序列看看,并画出练习和测验集的划分:

import matplotlib.pyplot as plt
train_example = next(iter(dataset.train))
test_example = next(iter(dataset.test))
num_of_samples = 4*prediction_length
figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
    test_example["target"][-num_of_samples - prediction_length :],
    color="red",
    alpha=0.5,
)
plt.show()

据说,Transformer 不能有效地进行时间序列预测?

界说练习和测验集划分:

train_dataset = dataset.train
test_dataset = dataset.test

界说数据改换

接下来,咱们界说数据的改换,尤其是时刻相关特征的制造 (根据数据集本身和一些普适做法)。

咱们界说一个 Chain ,代表 GluonTS 中一系列的改换 (这相似图像里 torchvision.transforms.Compose )。这让咱们将一系列改换集成到一个处理流水线中。

下面代码中,每个改换都增加了注释,用以阐明它们的作用。从更高层次讲,咱们将遍历每一个时刻序列,并增加或删去一些特征:

from transformers import PretrainedConfig
from gluonts.time_feature import time_features_from_frequency_str
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)
def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    # create a list of fields to remove later
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)
    return Chain(
        # step 1: remove static/dynamic fields if not specified
        [RemoveFields(field_names=remove_field_names)]
        # step 2: convert the data to NumPy (potentially not needed)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # we expect an extra dim for the multivariate case:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # step 3: handle the NaN's by filling in the target with zero
            # and return the mask (which is in the observed values)
            # true for observed values, false for nan's
            # the decoder uses this mask (no loss is incurred for unobserved values)
            # see loss_weights inside the xxxForPrediction model
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # step 4: add temporal features based on freq of the dataset
            # these serve as positional encodings
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # step 5: add another temporal feature (just a single number)
            # tells the model where in the life the value of the time series is
            # sort of running counter
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # step 6: vertically stack all the temporal features into the key FEAT_TIME
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # step 7: rename to match HuggingFace names
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

界说 InstanceSplitter

咱们需求创立一个 InstanceSplitter ,用来给练习、验证和测验集供给采样窗口,得到一段时刻的内的时刻序列 (咱们不行能把完好的整段数据输入给模型,究竟时刻太长,并且也有内存限制)。

这个实例分割东西每一次将会随机选取 context_length 长度的数据,以及紧随其后的 prediction_length 长度的窗口,并为相应的窗口标注 past_future_ 。这样能够确保 values 能被分为 past_values 和随后的 future_values ,各自作为编码器和解码器的输入。除了 values ,关于 time_series_fields 中的其它 key 对应的数据也是相同。

from gluonts.transform import InstanceSplitter
from gluonts.transform.sampler import InstanceSampler
from typing import Optional
def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    assert mode in ["train", "validation", "test"]
    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=config.prediction_length),
        "test": TestSplitSampler(),
    }[mode]
    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )

创立 PyTorch 的 DataLoader

接下来就该创立 PyTorch DataLoader 了: 这让咱们能把数据整理成 batch 的方式,即 (input, output) 对的方式,或者说是 ( past_values , future_values ) 的方式。

from typing import Iterable
import torch
from gluonts.itertools import Cyclic, Cached
from gluonts.dataset.loader import as_stacked_batches
def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    cache_data: bool = True,
 **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")
    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")
    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
    ]
    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    if cache_data:
        transformed_data = Cached(transformed_data)
    # we initialize a Training instance
    instance_splitter = create_instance_splitter(config, "train")
    # the instance splitter will sample a window of
    # context length + lags + prediction length (from the 366 possible transformed time series)
    # randomly from within the target time series and return an iterator.
    stream = Cyclic(transformed_data).stream()
    training_instances = instance_splitter.apply(stream, is_train=True)
    return as_stacked_batches(
        training_instances,
        batch_size=batch_size,
        shuffle_buffer_length=shuffle_buffer_length,
        field_names=TRAINING_INPUT_NAMES,
        output_type=torch.tensor,
        num_batches_per_epoch=num_batches_per_epoch,
    )
def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
 **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")
    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")
    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)
    # we create a Test Instance splitter which will sample the very last
    # context window seen during training only for the encoder.
    instance_sampler = create_instance_splitter(config, "test")
    # we apply the transformations in test mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=False)
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

在 Autoformer 上评测

咱们现已在这个数据集上预练习了一个 Autoformer 了,所以咱们能够直接拿来模型在测验集上测一下:

from transformers import AutoformerConfig, AutoformerForPrediction
config = AutoformerConfig.from_pretrained("kashif/autoformer-traffic-hourly")
model = AutoformerForPrediction.from_pretrained("kashif/autoformer-traffic-hourly")
test_dataloader = create_test_dataloader(
    config=config,
    freq=freq,
    data=test_dataset,
    batch_size=64,
)

在推理时,咱们运用模型的 generate() 办法来猜测 prediction_length 步的未来数据,根据最近运用的对应时刻序列的窗口长度。

from accelerate import Accelerator
accelerator = Accelerator()
device = accelerator.device
model.to(device)
model.eval()
forecasts_ = []
for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts_.append(outputs.sequences.cpu().numpy())

模型输出的数据形状是 ( batch_size , number of samples , prediction length , input_size )。

在下面这个比方中,咱们为猜测接下来 24 小时的交通数据而得到了 100 条可能的数值,而 batch size 是 64:

forecasts_[0].shape
>>> (64, 100, 24)

咱们在垂直方向把它们堆叠起来 (运用 numpy.vstack 函数),以此获取一切测验集时刻序列的猜测: 咱们有 7 个滚动的窗口,所以有 7 * 862 = 6034 个猜测。

import numpy as np
forecasts = np.vstack(forecasts_)
print(forecasts.shape)
>>> (6034, 100, 24)

咱们能够把猜测成果和 ground truth 做个比照。为此,咱们运用 Evaluate 这个库,它里边包含了 MASE 的衡量办法。

咱们对每个时刻序列用这一衡量规范核算相应的值,并算出其平均值:

from tqdm.autonotebook import tqdm
from evaluate import load
from gluonts.time_feature import get_seasonality
mase_metric = load("evaluate-metric/mase")
forecast_median = np.median(forecasts, 1)
mase_metrics = []
for item_id, ts in enumerate(tqdm(test_dataset)):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id],
        references=np.array(ground_truth),
        training=np.array(training_data),
        periodicity=get_seasonality(freq))
    mase_metrics.append(mase["mase"])

所以 Autoformer 模型的成果是:

print(f"Autoformer univariate MASE: {np.mean(mase_metrics):.3f}")
>>> Autoformer univariate MASE: 0.910

咱们还能够画出恣意时刻序列猜测针对其 ground truth 的比照,这需求以下函数:

import matplotlib.dates as mdates
import pandas as pd
test_ds = list(test_dataset)
def plot(ts_index):
    fig, ax = plt.subplots()
    index = pd.period_range(
        start=test_ds[ts_index][FieldName.START],
        periods=len(test_ds[ts_index][FieldName.TARGET]),
        freq=test_ds[ts_index][FieldName.START].freq,
    ).to_timestamp()
    ax.plot(
        index[-5*prediction_length:],
        test_ds[ts_index]["target"][-5*prediction_length:],
        label="actual",
    )
    plt.plot(
        index[-prediction_length:],
        np.median(forecasts[ts_index], axis=0),
        label="median",
    )
    plt.gcf().autofmt_xdate()
    plt.legend(loc="best")
    plt.show()

比方,测验会集第四个时刻序列的成果比照,画出来是这样:

plot(4)

据说,Transformer 不能有效地进行时间序列预测?

在 DLinear 上评测

gluonts 供给了一种 DLinear 的完成,咱们将运用这个完成区练习、测评该算法:

from gluonts.torch.model.d_linear.estimator import DLinearEstimator
# Define the DLinear model with the same parameters as the Autoformer model
estimator = DLinearEstimator(
    prediction_length=dataset.metadata.prediction_length,
    context_length=dataset.metadata.prediction_length*2,
    scaling=scaling,
    hidden_dimension=2,
    batch_size=batch_size,
    num_batches_per_epoch=num_batches_per_epoch,
    trainer_kwargs=dict(max_epochs=epochs)
)

练习模型:

predictor = estimator.train(
    training_data=train_dataset,
    cache_data=True,
    shuffle_buffer_length=1024
)
>>> INFO:pytorch_lightning.callbacks.model_summary:
      | Name  | Type         | Params
    ---------------------------------------
    0 | model | DLinearModel | 4.7 K 
    ---------------------------------------
    4.7 K     Trainable params
    0 Non-trainable params
    4.7 K     Total params
    0.019 Total estimated model params size (MB)
    Training: 0it [00:00, ?it/s]
    ...
    INFO:pytorch_lightning.utilities.rank_zero:Epoch 49, global step 5000: 'train_loss' was not in top 1
    INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=50` reached.

在测验集上评测:

from gluonts.evaluation import make_evaluation_predictions, Evaluator
forecast_it, ts_it = make_evaluation_predictions(
    dataset=dataset.test,
    predictor=predictor,
)
d_linear_forecasts = list(forecast_it)
d_linear_tss = list(ts_it)
evaluator = Evaluator()
agg_metrics, _ = evaluator(iter(d_linear_tss), iter(d_linear_forecasts))

所以 DLinear 对应的成果是:

dlinear_mase = agg_metrics["MASE"]
print(f"DLinear MASE: {dlinear_mase:.3f}")
>>> DLinear MASE: 0.965

同样地,咱们画出猜测成果与 ground truth 的比照曲线图:

def plot_gluonts(index):
    plt.plot(d_linear_tss[index][-4 * dataset.metadata.prediction_length:].to_timestamp(), label="target")
    d_linear_forecasts[index].plot(show_label=True, color='g')
    plt.legend()
    plt.gcf().autofmt_xdate()
    plt.show()
plot_gluonts(4)

据说,Transformer 不能有效地进行时间序列预测?

实际上, traffic 数据集在平日和周末会出现传感器中方式的散布偏移。那咱们还应该怎么做呢?由于 DLinear 没有满足的才能去处理协方差信息,或者说是任何的日期时刻的特征,咱们给出的窗口巨细无法掩盖全面,使得让模型有满足信息去知道当时是在猜测平日数据仍是周末数据。因此模型只会去猜测更为普适的成果,这就导致其猜测散布倾向平日数据,因此导致对周末数据的猜测变得更差。当然,如果咱们给一个满足大的窗口,一个线性模型也能够识别出周末的方式,但当咱们的数据中存在以月或以季度为单位的方式散布时,那就需求更大的窗口了。

总结

所以 transformer 模型和线性模型比照的结论是什么呢?不同模型在测验集上的 MASE 指标如下所示:

Dataset Transformer (uni.) Transformer (mv.) Informer (uni.) Informer (mv.) Autoformer (uni.) DLinear
Traffic 0.876 1.046 0.924 1.131 0.910 0.965

能够看到,咱们去年引进的 最原始的 Transformer 模型 获得了最好的功能指标。其次,多变量模型一般都比对应的单变量模型更差,原因在于序列间的相关性关系一般都较难猜测。额定增加的波动一般会损坏猜测成果,或者模型可能会学到一些过错的相关性信息。最近的一些论文,如 CrossFormer (ICLR 23) 和 CARD 也在测验解决这些 transformer 模型中的问题。 多变量模型一般在练习数据满足大的时候才会体现得好。但当咱们与单变量模型在小的公开数据集上比照时,一般单变量模型会体现得更好。相关于线性模型,一般其相应尺度的单变量 transformer 模型或其它神经网络类模型会体现得更好。

总结来讲,transformer 模型在时刻序列猜测范畴,远没有达到要被筛选的境地。 但是大规模练习数据对它巨大潜力的挖掘是至关重要的,这一点不像 CV 或 NLP 范畴,时刻序列猜测缺乏大规模公开数据集。 当时绝大多数的时刻序列预练习模型也不过是在比如 UCR & UEA 这样的少量样本上练习的。 即便这些基准数据集为时刻序列猜测的开展前进供给了柱石,其较小的规模和泛化性的缺失使得大规模预练习依然面临诸多困难。

所以关于时刻序列猜测范畴来讲,开展大规模、强泛化性的数据集 (就像 CV 范畴的 ImageNet 相同) 是当时最重要的事情。这将会极大地促进时刻序列分析范畴与练习模型的开展研讨,提升与练习模型在时刻序列猜测方面的才能。

声明

咱们诚挚感谢 Lysandre Debut 和 Pedro Cuenca 供给的深刻见解和对本项目的协助。 ❤️


英文原文: hf.co/blog/autofo…

作者: Eli Simhayev, Kashif Rasul, Niels Rogge

译者: Hoi2022

审校/排版: zhongdongy (阿东)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。