Wormhole源码阅读

[TOC]

$\newcommand{\w}{\mathbf{w}} \newcommand{\x}{\mathbf{x}} \newcommand{\v}{\mathbf{v}} \newcommand{\T}{\mathsf{T}}$

核心代码

learn/solver/minibatch_solver.h + iter_solver.h

实现了H/W/S三个角色

MinibatchScheduler : IterScheduler : DataParScheduler : ps::App
MinibatchServer = IterServer : ps::App
MinibatchWorker : IterWorker : DataParWorker : ps::App

wormhole整个train或test均由H驱动,所有任务由WorkloadPool维护,MinibatchScheduler::Run() 是一个runloop,不断把Workload分配给各个空闲的W计算;W从S拉取需要的模型参数,并计算梯度,再去更新S的模型。

MinibatchScheduler::Iterate() 一次pass里的迭代,调用StartDispatch()激活各个W。

DataParScheduler::StartDispatch()

  1. 匹配输入文件,调用WorkloadPool::Add()初始化任务列表
  2. 给所有W发送一个空的workload(确认W都在?)

MinibatchScheduler::ProcessResponse() 不断分配workload给W(根据node id)

相关类:

  • MatchFile 根据输入pattern,正则匹配出相应的文件列表
  • Workload 文件列表,即H派发给W的具体内容,可以序列化到Stream里(Load/Save)
  • WorkloadPool 维护和管理Workload,记录node和已分配Workload的映射关系(shuffle TODO)

scheduler会根据每次任务的平均耗时来判断某workload是否失败,如果失败会把该workload分发到别的worker继续执行。


server相关的两个重要数据结构:

  • AdaGradEntry 模型参数 w, V
  • AdaGradHandler 模型更新Push、模型发送 Pull、超参数配置

AdagradServer() 构造函数里从配置文件初始化 超参数 和 KVStore。

using Server = ps::OnlineServer<float, AdaGradEntry, AdaGradHandle>;
Server s(h);
server_ = s.server();

worker接收workload分配的mini-batch (dmlc::RawBlock),计算梯度。

AsgdWorker::ProcessMinibatch() 一次实际的mini-batch计算,根据workload类型predict/train。

learn/base/minibatch_iter.h

MinibatchIter 接受文件uri,调用Next()读取RowBlock,调用Value()获取之作为一个mini-batch。

根据不同格式做解析,参考data.proto

  • LibSVMParser
  • CriteoParser

learn/base/workload.h

Workload::Serializable 是Scheduler分配给Worker的任务,可序列化的,其中包括:

  • data_pass 当前的pass,从1开始
  • File列表,其中包括待处理的文件名、格式、分片数、处理分片

learn/base/localizer.h

LocalizerRowBlock数据(包含的feature index并不连续)映射到连续索引(从0开始)。

template<typename C = unsigned>
void Localize(const RowBlock<I>& blk,
              data::RowBlockContainer<unsigned> *localized,
              std::vector<I>* uniq_idx = NULL, // unique原始feat_id,升序排列
              std::vector<C>* idx_frq = NULL, // 各feature的出现次数,和uniq_idx顺序一致
              std::vector<unsigned>* field_idx = NULL, // 各feature对应的field_id,和uniq_idx顺序一致
              uint32_t field_bit = 8,
              bool last_bit = false) {
  std::vector<I>* uidx = uniq_idx == NULL ? new std::vector<I>() : uniq_idx;
  CountUniqIndex<C>(blk, uidx, idx_frq, field_idx, field_bit, last_bit);
  // 重新映射blk到localized,新的feature id从1连续递增
  RemapIndex(blk, *uidx, localized);
  if (uniq_idx == NULL) delete uidx;
  Clear();
}

std::vector<Pair> pair_;  // k原始feat_id;i在RowBlock中的下标

这里的关键思路是,只要维护uniq_idx即可,从S拉取相应的参数;然后由于新映射后的RowBlock里feature id和uniq_idx的顺序是完全对应的,所以具体计算梯度时候无需再关心原始的feature id。

learn/ffm/async_sgd.h

struct AdaGradEntry {  // 对应一个维度的参数(一次项1个,二次项参数field*dim个)

  /// #appearence of this feature in the data
  unsigned fea_cnt = 0;
  /// length of w. if size == 1, then using w itself to store the value to save
  /// memory and avoid unnecessary new (see w_0())
  /// 很tricky,用float*(8字节)一半来保存实际的float(4字节),导出模型的时候要注意!
  int size = 1;
  bool first_order_only = false;
  /// w and V
  float *w = NULL;
  /// square root of the cumulative gradient
  float *sqc_grad = NULL;

为什么sqc_grad 要分配size+1的空间?额外保存z。 一次项用ftrl求解,sqc_grad[0]用来保存$\sqrt{n}$,sqc_grad[1]用来保存z; 二次项用adagrad求解,sqc_grad[2]开始的位置用来保存$\sqrt{G}$。

struct AdaGradHandle : public ISGDHandle {
  inline void Resize(AdaGradEntry& val) {
    // 哪些情况会给参数分配空间呢?需要计算二次项时。
    if (val.fea_cnt > V.thr  // 该特征出现次数大于threshold
        && val.size < V.dim*V.field_number + 1 // 原来空间小于align+1
        && (!l1_shrk       // 没有w=0则V=0的约束(即一次项为0,二次项也为0)
            || val.w_0() != 0)  // 或w!=0
        && (!V.lastBitForFirstOrder  // 取反:feature id最后一位bit作为一次项标示
            || !val.first_order_only)) {  // 或取反:该参数只用作一次项计算
      // 又有点tricky,在交叉项情况下,分配了align+1的参数长度。why?
      // 因为AdaGradEntry复用一个w向量来保存一次项和二次项参数,第一维一次项参数,后面是二次项。
      // 这个参数长度会在loss.h里Data::Load()加载二次项参数时用到(model_siz[i] == align + 1)
      val.Resize(V.dim*V.field_number + 1);
virtual void ProcessMinibatch(const Minibatch& mb, const Workload& wl) {
  pull_w_opt.callback = [this, data, feaid, val, val_siz, fieldidx, wl]() {

可以看出每个mini-batch只会拉取该mini-batch样本所用到的特征feaid和参数val(只是整个模型一个子集,可以单机内存里运行)。

如何辨别仅有一次项的参数呢? 在Handle::Pull(FeaID key, ...) 里判断这个参数id是否属于仅有一次项,是则是只返回一次项。

learn/ffm/loss.h

加载mini-batch的数据和模型。对输入分两次加载,分别到wV

Loss(const RowBlock<unsigned>& data,
       const std::vector<T>& model,
       const std::vector<int>& model_siz,
       const std::vector<unsigned>& field_idx,
       const Config& conf) {
    // init w
    w.Load(0, data, model, model_siz, field_idx, 0);
    // init V
    V.Load(cf.dim(), data, model, model_siz, field_idx, align);

【注意】AdaGradEntry复用一个w数组(即AdaGradEntry::w,由AdaGradHandle::Pull(..)从S发送到W)来保存一次项和二次项参数。但在loss.h从S获取参数时候,Data::Load(data, model,...)会从输入modeldata里分别取出 一次项参数+所有特征 保存到w,取出 二次项参数+对应特征 保存到V,并且所有参数会重新映射到从0开始在weight数组里依次存储。

/// \brief store data and model w (dim==0) and V (dim >= 1)
struct Data {
  /// \brief get data and model
  void Load(int d,  // dim
            const RowBlock<unsigned>& data, // mini-batch数据
            const std::vector<T>& model, // 需要的模型参数
            const std::vector<int>& model_siz,  // 各参数的长度
            const std::vector<unsigned>& field_idx,
            int align_) { // align_ = field_size * dim
  std::vector<unsigned> col_map;
  if (dim == 0) { // w
      // 一次项参数,参数个数即等于model_siz.size()
      // 如果 model_siz[i] == 0,说明该参数不存在,设置pos[i]=-1,Save时会跳过;
      // 否则 weight[i] = model[p]; pos[i] = p; p是该一次项在实际model里的offset。
  } else { // V
      col_map.resize(model_siz.size());
      unsigned k = 0, p = 0;
      for (size_t i = 0; i < model_siz.size(); ++i) {
        // 需要判断 model_siz[i] == align + 1,才包含二次项参数
        if (model_siz[i] == align + 1) {
          // 跳过第一维(一次项参数),才是二次项在model中的offset
          pos.push_back(p+1);  // skip the first dim
          // 重新映射model中二次项参数,比如 model_size = [1, 5, 0, 5] 第2、3参数包含二次项,
          // 则 col_map = [0, 1, 0, 2],其中 i 代表feature index(remap后),k代表新的feature index(压缩了)。
          // 用于重新生成X。
          col_map[i] = ++ k;
        }
        p += model_siz[i];
      }
  }
  int dim;
  int align; // field_num*dim
  RowBlock<unsigned> X;  // 本次mini-batch的训练样本
  std::vector<T> weight;  // 需要的参数(仅包含一次项或二次项),长度= pos.size() * align
  std::vector<unsigned> pos; // 搭配weight使用,这些参数在原模型model中的offset
  std::vector<unsigned> final_field_idx;  // 借助col_map重新压缩的feature_id -> field_id映射

  T dropout = 0; // config.proto
  T grad_clipping = 0;  // config.proto
  T grad_normalization = 0; // config.proto
 private:
  std::vector<T> val_;  // X.value
  std::vector<size_t> os;  // X.offset
  std::vector<unsigned> idx;  // X.index
};
Data w, V;  // 一次项参数和交叉项参数分开处理

由于在async_sgd.h里调用loss.CalcGrad(val)之前,先调用了loss.Evaluate(&prog),所以实际计算梯度的时候,复用了py_数组,其中已经保存了Evaluate()中预估的y值。

void CalcGrad(std::vector<T>* grad) {
  // p = ... (reuse py_)
  CHECK_EQ(py_.size(), w.X.size) << "call *evaluate* first";
#pragma omp parallel for num_threads(nt_)
  for (size_t i = 0; i < py_.size(); ++i) {
    T y = w.X.label[i] > 0 ? 1 : -1;
    py_[i] = - y / ( 1 + exp ( y * py_[i] ));
  }

  // grad_w = ...
  SpMV::TransTimes(w.X, py_, &w.weight, nt_);
  w.Save(grad);
  // 以上计算一次项参数梯度,也可以参考learn/linear/loss.h

  // 然后计算二次项参数梯度,公式如下。

对二次项参数求偏导,假设 第$i$维特征和第$j$维特征(属于$f_j$域)交叉隐向量$\v$中第$m$项$\v_m$ 偏导:

learn/linear/loss.h

template <typename V>
class LogitLoss : public BinClassLoss<V> {
    virtual void CalcGrad(std::vector<V>* grad) {
        std::vector<V> dual(data_.size);
        for (size_t i = 0; i < data_.size; ++i) {
            V y = data_.label[i] > 0 ? 1 : -1;
            dual[i] = - y / ( 1 + exp ( y * Xw_[i] ));    // part1: py = -y * sigmoid(- y * Xw)
        }
        SpMV::TransTimes(data_, dual, grad, nt_);  // part2: g = X^T * py
    }

logloss: $ \ell = \log \left( 1 + \exp(-y \langle \x, \w\rangle) \right)$

对logloss求导,使用链式法则,可得:$g = \nabla_\w \ell = \left(\sigma(y\w^T \x) -1\right) y\x = -y\sigma(-y\w^T\x) \cdot \x$

其中,Xw_ScalarLoss::Init()函数里生成。

运行逻辑

运行代码在 tracker 目录,以最简单的 dmlc_local.py 为例。

tracker/dmlc_local.py -n 1 -s 1 bin/ffm.dmlc ffm.conf

实际执行文件只有一个,即 bin/ffm.dmlc,只是通过不同的环境变量扮演三种不同的角色。

这里假设只有一个worker,一个server,当然scheduler始终只有一个。

worker (W)

env['DMLC_TASK_ID'] = 0
env['DMLC_ROLE'] = 'worker'
env['DMLC_NUM_ATTEMPT'] = $nrep

执行逻辑在 dmlc_local.py:exec_cmd。实际执行的是一段bash循环,当执行完毕后返回254退出循环:

nrep=0
rc=254
while [ $rc -eq 254 ];
do
    export DMLC_NUM_ATTEMPT=$nrep
    ./bin/ffm.dmlc
    rc=$?;
    nrep=$((nrep+1));
done

server (S)

env['DMLC_TASK_ID'] = 1
env['DMLC_ROLE'] = 'worker'
env['DMLC_NUM_ATTEMPT'] = $nrep

执行逻辑同worker。

scheduler (H)

env['DMLC_ROLE'] = 'scheduler'
env['DMLC_PS_ROOT_URI'] = '127.0.0.1'
env['DMLC_PS_ROOT_PORT'] = '8401'
env['DMLC_NUM_WORKER'] = 1
env['DMLC_NUM_SERVER'] = 1

执行逻辑在tracker.py:PSTracker,直接执行 bin/ffm.dmlc 在指定的ip/port上伺服。

参考

  • https://github.com/dmlc/wormhole