Wormhole源码阅读

[TOC]

$\newcommand{\w}{\mathbf{w}} \newcommand{\x}{\mathbf{x}} \newcommand{\v}{\mathbf{v}} \newcommand{\T}{\mathsf{T}}$

核心代码

learn/solver/minibatch_solver.h + iter_solver.h

实现了H/W/S三个角色

MinibatchScheduler : IterScheduler : DataParScheduler : ps::App
MinibatchServer = IterServer : ps::App
MinibatchWorker : IterWorker : DataParWorker : ps::App

wormhole整个train或test均由H驱动,所有任务由WorkloadPool维护,MinibatchScheduler::Run() 是一个runloop,不断把Workload分配给各个空闲的W计算;W从S拉取需要的模型参数,并计算梯度,再去更新S的模型。

MinibatchScheduler::Iterate() 一次pass里的迭代,调用StartDispatch()激活各个W

DataParScheduler::StartDispatch()

  1. 加载所有匹配的文件
  2. 给所有W发送一个空的workload(确认W都在?)

MinibatchScheduler::ProcessResponse() 不断分配workload给W(根据node id)

相关类: MatchFile 根据输入pattern,正则匹配出相应的文件列表 Workload 文件列表,即H派发给W的具体内容,可以序列化到Stream里(Load/Save) WorkloadPool 维护和管理Workload,记录节点和已分配Workload的对应关系(shuffle TODO)

scheduler会根据每次任务的平均耗时来判断某workload是否失败,如果失败会把该workload分发到别的worker继续执行。


server相关的两个重要数据结构:

AdaGradEntry 模型参数 w, V AdaGradHandler 模型更新Push、模型发送 Pull、超参数配置

AdagradServer() 构造函数里从配置文件初始化超参数 和 KVStore

using Server = ps::OnlineServer<float, AdaGradEntry, AdaGradHandle>;
Server s(h);
server_ = s.server();

worker接收workload分配的mini-batch (dmlc::RawBlock),计算梯度。

AsgdWorker::ProcessMinibatch() 一次实际的mini-batch计算,根据workload类型predict/train。

learn/base/minibatch_iter.h

MinibatchIter 接受文件uri,调用Next()读取RowBlock,调用Value()获取之作为一个mini-batch。

根据不同格式做解析,参考data.proto

  • LibSVMParser
  • CriteoParser

learn/base/workload.h

Workload ::Serializable 是Scheduler分配给Worker的任务,可序列化的,其中包括:

  • data_pass 当前的pass,从1开始
  • File列表,其中包括待处理的文件名、格式、分片数、处理分片

learn/base/localizer.h

LocalizerRowBlock数据(包含的feature index并不连续)映射到连续索引(从0开始)。

  template<typename C = unsigned>
  void Localize(const RowBlock<I>& blk,
                data::RowBlockContainer<unsigned> *localized,
                std::vector<I>* uniq_idx = NULL, // unique原始feat_id,升序排列
                std::vector<C>* idx_frq = NULL, // 各feature的出现次数,和uniq_idx顺序一致
                std::vector<unsigned>* field_idx = NULL, // 各feature对应的field_id,和uniq_idx顺序一致
                uint32_t field_bit = 8,
                bool last_bit = false) {
    std::vector<I>* uidx = uniq_idx == NULL ? new std::vector<I>() : uniq_idx;
    CountUniqIndex<C>(blk, uidx, idx_frq, field_idx, field_bit, last_bit);
    // 重新映射blk到localized,新的feature id从1连续递增
    RemapIndex(blk, *uidx, localized);
    if (uniq_idx == NULL) delete uidx;
    Clear();
  }

  std::vector<Pair> pair_;  // k原始feat_id;i在RowBlock中的下标

这里的关键思路是,只要维护uniq_idx即可,从S拉取相应的参数;然后由于新映射后的RowBlock里feature id和uniq_idx的顺序是完全对应的,所以具体计算梯度时候无需再关心原始的feature id。

learn/ffm/async_sgd.h

struct AdaGradEntry {  // 对应一个维度的参数(一次项1个,二次项参数field*dim个)
  // ...

  /// #appearence of this feature in the data
  unsigned fea_cnt = 0;
  /// length of w. if size == 1, then using w itself to store the value to save
  /// memory and avoid unnecessary new (see w_0())
  /// 很tricky,用float*(8字节)一半来保存实际的float(4字节),导出模型的时候要注意!
  int size = 1;
  bool first_order_only = false;
  /// w and V
  float *w = NULL;
  /// square root of the cumulative gradient
  float *sqc_grad = NULL;

为什么sqc_grad 要分配size+1的空间?额外保存z。 一次项用ftrl求解,sqc_grad[0]用来保存$\sqrt{n}$,sqc_grad[1]用来保存z; 二次项用adagrad求解,sqc_grad[2]开始的位置用来保存$\sqrt{G}$。

struct AdaGradHandle : public ISGDHandle {
  inline void Resize(AdaGradEntry& val) {
    // 哪些情况会给参数分配空间呢?需要计算二次项时。
    if (val.fea_cnt > V.thr  // 该特征出现次数大于threshold
        && val.size < V.dim*V.field_number + 1 // 原来空间小于align+1
        && (!l1_shrk       // 没有w=0则V=0的约束(即一次项为0,二次项也为0)
            || val.w_0() != 0)  // 或w!=0
        && (!V.lastBitForFirstOrder  // 取反:feature id最后一位bit作为一次项标示
            || !val.first_order_only)) {  // 或取反:该参数只用作一次项计算
      // 又有点tricky,在交叉项情况下,分配了align+1的参数长度。why?
      // 因为AdaGradEntry复用一个w向量来保存一次项和二次项参数,第一维一次项参数,后面是二次项。
      // 这个参数长度会在loss.h里Data::Load()加载二次项参数时用到(model_siz[i] == align + 1)
      val.Resize(V.dim*V.field_number + 1);
virtual void ProcessMinibatch(const Minibatch& mb, const Workload& wl) {
  pull_w_opt.callback = [this, data, feaid, val, val_siz, fieldidx, wl]() {

可以看出每个mini-batch只会拉取该mini-batch样本所用到的特征feaid和参数val(只是整个模型一个子集,可以单机内存里运行)。

如何辨别仅有一次项的参数呢? 在Handle::Pull(FeaID key, ...) 里判断这个参数id是否属于仅有一次项,是则是只返回一次项。

learn/ffm/loss.h

加载mini-batch的数据和模型。对输入分两次加载,分别到wV

Loss(const RowBlock<unsigned>& data,
       const std::vector<T>& model,
       const std::vector<int>& model_siz,
       const std::vector<unsigned>& field_idx,
       const Config& conf) {
    // init w
    w.Load(0, data, model, model_siz, field_idx, 0);
    // init V
    V.Load(cf.dim(), data, model, model_siz, field_idx, align);

【注意】AdaGradEntry复用一个w数组(即AdaGradEntry::w,由AdaGradHandle::Pull(..)从S发送到W)来保存一次项和二次项参数。但在loss.h从S获取参数时候,Data::Load(data, model,...)会从输入modeldata里分别取出 一次项参数+所有特征 保存到w,取出 二次项参数+对应特征 保存到V,并且所有参数会重新映射到从0开始在weight数组里依次存储。

  /// \brief store data and model w (dim==0) and V (dim >= 1)
  struct Data {
    /// \brief get data and model
    void Load(int d,  // dim
              const RowBlock<unsigned>& data, // mini-batch数据
              const std::vector<T>& model, // 需要的模型参数
              const std::vector<int>& model_siz,  // 各参数的长度
              const std::vector<unsigned>& field_idx,
              int align_) { // align_ = field_size * dim
    std::vector<unsigned> col_map;
    if (dim == 0) { // w
        // 一次项参数,参数个数即等于model_siz.size()
        // 如果 model_siz[i] == 0,说明该参数不存在,设置pos[i]=-1,Save时会跳过;
        // 否则 weight[i] = model[p]; pos[i] = p; p是该一次项在实际model里的offset。
    } else { // V
        col_map.resize(model_siz.size());
        unsigned k = 0, p = 0;
        for (size_t i = 0; i < model_siz.size(); ++i) {
          // 需要判断 model_siz[i] == align + 1,才包含二次项参数
          if (model_siz[i] == align + 1) {
            // 跳过第一维(一次项参数),才是二次项在model中的offset
            pos.push_back(p+1);  // skip the first dim
            // 重新映射model中二次项参数,比如 model_size = [1, 5, 0, 5] 第2、3参数包含二次项,
            // 则 col_map = [0, 1, 0, 2],其中 i 代表feature index(remap后),k代表新的feature index(压缩了)。
            // 用于重新生成X。
            col_map[i] = ++ k;
          }
          p += model_siz[i];
        }
    }
    int dim;
    int align; // field_num*dim
    RowBlock<unsigned> X;  // 本次mini-batch的训练样本
    std::vector<T> weight;  // 需要的参数(仅包含一次项或二次项),长度= pos.size() * align
    std::vector<unsigned> pos; // 搭配weight使用,这些参数在原模型model中的offset
    std::vector<unsigned> final_field_idx;  // 借助col_map重新压缩的feature_id -> field_id映射

    T dropout = 0; // config.proto
    T grad_clipping = 0;  // config.proto
    T grad_normalization = 0; // config.proto
   private:
    std::vector<T> val_;  // X.value
    std::vector<size_t> os;  // X.offset
    std::vector<unsigned> idx;  // X.index
  };
  Data w, V;  // 一次项参数和交叉项参数分开处理

由于在async_sgd.h里调用loss.CalcGrad(val)之前,先调用了loss.Evaluate(&prog),所以实际计算梯度的时候,复用了py_数组,其中已经保存了Evaluate()中预估的y值。

  void CalcGrad(std::vector<T>* grad) {
    // p = ... (reuse py_)
    CHECK_EQ(py_.size(), w.X.size) << "call *evaluate* first";
#pragma omp parallel for num_threads(nt_)
    for (size_t i = 0; i < py_.size(); ++i) {
      T y = w.X.label[i] > 0 ? 1 : -1;
      py_[i] = - y / ( 1 + exp ( y * py_[i] ));
    }

    // grad_w = ...
    SpMV::TransTimes(w.X, py_, &w.weight, nt_);
    w.Save(grad);
    // 以上计算一次项参数梯度,也可以参考learn/linear/loss.h

    // 然后计算二次项参数梯度,公式如下。

对二次项参数求偏导,假设 第$i$维特征和第$j$维特征(属于$f_j$域)交叉隐向量$\v$中第$m$项$\v_m$ 偏导:

learn/linear/loss.h

template <typename V>
class LogitLoss : public BinClassLoss<V> {
    virtual void CalcGrad(std::vector<V>* grad) {
        std::vector<V> dual(data_.size);
        for (size_t i = 0; i < data_.size; ++i) {
            V y = data_.label[i] > 0 ? 1 : -1;
            dual[i] = - y / ( 1 + exp ( y * Xw_[i] ));    // part1: py = -y * sigmoid(- y * Xw)
        }
        SpMV::TransTimes(data_, dual, grad, nt_);  // part2: g = X^T * py
    }

logloss: $ \ell = \log \left( 1 + \exp(-y \langle \x, \w\rangle) \right)$

对logloss求导,使用链式法则,可得:$g = \nabla_\w \ell = \left(\sigma(y\w^T \x) -1\right) y\x = -y\sigma(-y\w^T\x) \cdot \x$

其中,Xw_ScalarLoss::Init()函数里生成。

参考

  • https://github.com/dmlc/wormhole