Spark MLlib LDA 源代码解析-程序员宅基地

1、Spark MLlib LDA源代码解析

Spark MLlib LDA 应该算是比較难理解的,当中涉及到大量的概率与统计的相关知识,并且还涉及到了Spark GraphX图计算方面的知识。要想明确当中的原理得要下一番功夫。

LDA源代码解析前的基础知识:

1)LDA主题模型的理论知识

參照:LDA数学八卦

2)SparkGraphX 基础知识

http://blog.csdn.net/sunbow0/article/details/47612291

http://blog.csdn.net/sunbow0/article/details/47610481

1.1 LDA源代码解析 

class LDA private (
  private var k: Int,
  private var maxIterations: Int,
  private var docConcentration: Double,
  private var topicConcentration: Double,
  private var seed: Long,
  private var checkpointInterval: Int,
  private var ldaOptimizer: LDAOptimizer) extends Logging {
  /**
   * k: 主题数量
   * maxIterations: 迭代次数
   * docConcentration: 超參alpha
   * topicConcentration: 超參beta
   * seed: 随机种子
   * checkpointInterval: 检查间隔
   * ldaOptimizer: 优化方法 "em" "online"
   *
   */
  def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1,
    seed = Utils.random.nextLong(), checkpointInterval = 10, ldaOptimizer = new EMLDAOptimizer)

  /**
   * Number of topics to infer.  I.e., the number of soft cluster centers.
   */
  // 获取  主题数量
  def getK: Int = k

  /**
   * Number of topics to infer.  I.e., the number of soft cluster centers.
   * (default = 10)
   */
  // 设置  主题数量
  def setK(k: Int): this.type = {
    require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k")
    this.k = k
    this
  }

  /**
   * Concentration parameter (commonly named "alpha") for the prior placed on documents'
   * distributions over topics ("theta").
   *
   * This is the parameter to a symmetric Dirichlet distribution.
   */
  // 获取  超參alpha
  def getDocConcentration: Double = this.docConcentration

  /**
   * Concentration parameter (commonly named "alpha") for the prior placed on documents'
   * distributions over topics ("theta").
   *
   * This is the parameter to a symmetric Dirichlet distribution, where larger values
   * mean more smoothing (more regularization).
   *
   * If set to -1, then docConcentration is set automatically.
   *  (default = -1 = automatic)
   *
   * Optimizer-specific parameter settings:
   *  - EM
   *     - Value should be > 1.0
   *     - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
   *  - Online
   *     - Value should be >= 0
   *     - default = (1.0 / k), following the implementation from
   *       [[https://github.com/Blei-Lab/onlineldavb]].
   */
  // 设置  超參alpha
  def setDocConcentration(docConcentration: Double): this.type = {
    this.docConcentration = docConcentration
    this
  }

  // 获取  超參alpha
  /** Alias for [[getDocConcentration]] */
  def getAlpha: Double = getDocConcentration

  // 设置  超參alpha
  /** Alias for [[setDocConcentration()]] */
  def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)

  /**
   * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
   * distributions over terms.
   *
   * This is the parameter to a symmetric Dirichlet distribution.
   *
   * Note: The topics' distributions over terms are called "beta" in the original LDA paper
   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
   */
  // 获取  超參beta
  def getTopicConcentration: Double = this.topicConcentration

  /**
   * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
   * distributions over terms.
   *
   * This is the parameter to a symmetric Dirichlet distribution.
   *
   * Note: The topics' distributions over terms are called "beta" in the original LDA paper
   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
   *
   * If set to -1, then topicConcentration is set automatically.
   *  (default = -1 = automatic)
   *
   * Optimizer-specific parameter settings:
   *  - EM
   *     - Value should be > 1.0
   *     - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
   *  - Online
   *     - Value should be >= 0
   *     - default = (1.0 / k), following the implementation from
   *       [[https://github.com/Blei-Lab/onlineldavb]].
   */
  // 设置  超參beta
  def setTopicConcentration(topicConcentration: Double): this.type = {
    this.topicConcentration = topicConcentration
    this
  }

  // 获取  超參beta
  /** Alias for [[getTopicConcentration]] */
  def getBeta: Double = getTopicConcentration

  // 设置  超參beta
  /** Alias for [[setTopicConcentration()]] */
  def setBeta(beta: Double): this.type = setTopicConcentration(beta)

  /**
   * Maximum number of iterations for learning.
   */
  // 获取  迭代次数
  def getMaxIterations: Int = maxIterations

  /**
   * Maximum number of iterations for learning.
   * (default = 20)
   */
  // 获取  迭代次数
  def setMaxIterations(maxIterations: Int): this.type = {
    this.maxIterations = maxIterations
    this
  }

  /** Random seed */
  // 获取  随机种子
  def getSeed: Long = seed

  /** Random seed */
  // 设置  随机种子
  def setSeed(seed: Long): this.type = {
    this.seed = seed
    this
  }

  /**
   * Period (in iterations) between checkpoints.
   */
  // 检查间隔
  def getCheckpointInterval: Int = checkpointInterval

  /**
   * Period (in iterations) between checkpoints (default = 10). Checkpointing helps with recovery
   * (when nodes fail). It also helps with eliminating temporary shuffle files on disk, which can be
   * important when LDA is run for many iterations. If the checkpoint directory is not set in
   * [[org.apache.spark.SparkContext]], this setting is ignored.
   *
   * @see [[org.apache.spark.SparkContext#setCheckpointDir]]
   */
  def setCheckpointInterval(checkpointInterval: Int): this.type = {
    this.checkpointInterval = checkpointInterval
    this
  }

  /**
   * :: DeveloperApi ::
   *
   * LDAOptimizer used to perform the actual calculation
   */
  @DeveloperApi
  def getOptimizer: LDAOptimizer = ldaOptimizer

  /**
   * :: DeveloperApi ::
   *
   * LDAOptimizer used to perform the actual calculation (default = EMLDAOptimizer)
   */
  @DeveloperApi
  def setOptimizer(optimizer: LDAOptimizer): this.type = {
    this.ldaOptimizer = optimizer
    this
  }

  /**
   * Set the LDAOptimizer used to perform the actual calculation by algorithm name.
   * Currently "em", "online" are supported.
   */
  // 优化方法
  def setOptimizer(optimizerName: String): this.type = {
    this.ldaOptimizer =
      optimizerName.toLowerCase match {
        case "em" => new EMLDAOptimizer
        case "online" => new OnlineLDAOptimizer
        case other =>
          throw new IllegalArgumentException(s"Only em, online are supported but got $other.")
      }
    this
  }

  /**
   * Learn an LDA model using the given dataset.
   *
   * @param documents  RDD of documents, which are term (word) count vectors paired with IDs.
   *                   The term count vectors are "bags of words" with a fixed-size vocabulary
   *                   (where the vocabulary size is the length of the vector).
   *                   Document IDs must be unique and >= 0.
   * @return  Inferred LDA model
   */
  // LDA 模型開始训练。输入数据是文档的词向量RDD[(Long, Vector)]
  // ldaOptimizer.initialize(documents, this) 是初始化ldaOptimizer
  // state.next()。ldaOptimizer迭代下一步
  // state.getLDAModel 模型生成
  def run(documents: RDD[(Long, Vector)]): LDAModel = {
    val state = ldaOptimizer.initialize(documents, this)
    var iter = 0
    val iterationTimes = Array.fill[Double](maxIterations)(0)
    while (iter < maxIterations) {
      val start = System.nanoTime()
      state.next()
      val elapsedSeconds = (System.nanoTime() - start) / 1e9
      iterationTimes(iter) = elapsedSeconds
      iter += 1
    }
    state.getLDAModel(iterationTimes)
  }

  /** Java-friendly version of [[run()]] */
  def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = {
    run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
  }
}

private[clustering] object LDA {

  /*
    DEVELOPERS NOTE:

    This implementation uses GraphX, where the graph is bipartite with 2 types of vertices:
     - Document vertices
        - indexed with unique indices >= 0
        - Store vectors of length k (# topics).
     - Term vertices
        - indexed {-1, -2, ..., -vocabSize}
        - Store vectors of length k (# topics).
     - Edges correspond to terms appearing in documents.
        - Edges are directed Document -> Term.
        - Edges are partitioned by documents.

    Info on EM implementation.
     - We follow Section 2.2 from Asuncion et al., 2009.  We use some of their notation.
     - In this implementation, there is one edge for every unique term appearing in a document,
       i.e., for every unique (document, term) pair.
     - Notation:
        - N_{wkj} = count of tokens of term w currently assigned to topic k in document j
        - N_{*} where * is missing a subscript w/k/j is the count summed over missing subscript(s)
        - gamma_{wjk} = P(z_i = k | x_i = w, d_i = j),
          the probability of term x_i in document d_i having topic z_i.
     - Data graph
        - Document vertices store N_{kj}
        - Term vertices store N_{wk}
        - Edges store N_{wj}.
        - Global data N_k
     - Algorithm
        - Initial state:
           - Document and term vertices store random counts N_{wk}, N_{kj}.
        - E-step: For each (document,term) pair i, compute P(z_i | x_i, d_i).
           - Aggregate N_k from term vertices.
           - Compute gamma_{wjk} for each possible topic k, from each triplet.
             using inputs N_{wk}, N_{kj}, N_k.
        - M-step: Compute sufficient statistics for hidden parameters phi and theta
          (counts N_{wk}, N_{kj}, N_k).
           - Document update:
              - N_{kj} <- sum_w N_{wj} gamma_{wjk}
              - N_j <- sum_k N_{kj}  (only needed to output predictions)
           - Term update:
              - N_{wk} <- sum_j N_{wj} gamma_{wjk}
              - N_k <- sum_w N_{wk}

    TODO: Add simplex constraints to allow alpha in (0,1).
          See: Vorontsov and Potapenko. "Tutorial on Probabilistic Topic Modeling : Additive
               Regularization for Stochastic Matrix Factorization." 2014.
   */

  /**
   * Vector over topics (length k) of token counts.
   * The meaning of these counts can vary, and it may or may not be normalized to be a distribution.
   */
  /**
   * 自己定义类别及方法
   * TopicCounts 主题分布统计
   * TokenCount 词汇统计
   * term2index index2term 顶点与词汇id 转换 
   * computePTopic 计算主题分布
   * 
   */
  private[clustering]type TopicCounts = BDV[Double]

  private[clustering]type TokenCount = Double

  /** Term vertex IDs are {-1, -2, ..., -vocabSize} */
  private[clustering] def term2index(term: Int): Long = -(1 + term.toLong)

  private[clustering] def index2term(termIndex: Long): Int = -(1 + termIndex).toInt

  private[clustering] def isDocumentVertex(v: (VertexId, _)): Boolean = v._1 >= 0

  private[clustering] def isTermVertex(v: (VertexId, _)): Boolean = v._1 < 0

  /**
   * Compute gamma_{wjk}, a distribution over topics k.
   */
  // docTopicCounts文章的主题分布, termTopicCounts词汇的主题分布, totalTopicCounts有词的主题分布概率和
  // vocabSize词向量长度,eta alpha 超參
  private[clustering] def computePTopic(
    docTopicCounts: TopicCounts,
    termTopicCounts: TopicCounts,
    totalTopicCounts: TopicCounts,
    vocabSize: Int,
    eta: Double,
    alpha: Double): TopicCounts = {
    val K = docTopicCounts.length
    val N_j = docTopicCounts.data
    val N_w = termTopicCounts.data
    val N = totalTopicCounts.data
    val eta1 = eta - 1.0
    val alpha1 = alpha - 1.0
    val Weta1 = vocabSize * eta1
    var sum = 0.0
    val gamma_wj = new Array[Double](K)
    var k = 0
    while (k < K) {
      val gamma_wjk = (N_w(k) + eta1) * (N_j(k) + alpha1) / (N(k) + Weta1)
      gamma_wj(k) = gamma_wjk
      sum += gamma_wjk
      k += 1
    }
    // normalize
    BDV(gamma_wj) /= sum
  }
}

1.2 LDAModel源代码解析 

abstract class LDAModel private[clustering] {

  /** Number of topics */
  // 主题数
  def k: Int

  /** Vocabulary size (number of terms or terms in the vocabulary) */
  // 词向量长度
  def vocabSize: Int

  /**
   * Inferred topics, where each topic is represented by a distribution over terms.
   * This is a matrix of size vocabSize x k, where each column is a topic.
   * No guarantees are given about the ordering of the topics.
   */
  // 主题分布矩阵
  def topicsMatrix: Matrix

  /**
   * Return the topics described by weighted terms.
   *
   * This limits the number of terms per topic.
   * This is approximate; it may not return exactly the top-weighted terms for each topic.
   * To get a more precise set of top terms, increase maxTermsPerTopic.
   *
   * @param maxTermsPerTopic  Maximum number of terms to collect for each topic.
   * @return  Array over topics.  Each topic is represented as a pair of matching arrays:
   *          (term indices, term weights in topic).
   *          Each topic's terms are sorted in order of decreasing weight.
   */
  // 每个主题的词权重排序
  def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])]

  /**
   * Return the topics described by weighted terms.
   *
   * WARNING: If vocabSize and k are large, this can return a large object!
   *
   * @return  Array over topics.  Each topic is represented as a pair of matching arrays:
   *          (term indices, term weights in topic).
   *          Each topic's terms are sorted in order of decreasing weight.
   */
  def describeTopics(): Array[(Array[Int], Array[Double])] = describeTopics(vocabSize)

  /* TODO (once LDA can be trained with Strings or given a dictionary)
   * Return the topics described by weighted terms.
   *
   * This is similar to [[describeTopics()]] but returns String values for terms.
   * If this model was trained using Strings or was given a dictionary, then this method returns
   * terms as text.  Otherwise, this method returns terms as term indices.
   *
   * This limits the number of terms per topic.
   * This is approximate; it may not return exactly the top-weighted terms for each topic.
   * To get a more precise set of top terms, increase maxTermsPerTopic.
   *
   * @param maxTermsPerTopic  Maximum number of terms to collect for each topic.
   * @return  Array over topics.  Each topic is represented as a pair of matching arrays:
   *          (terms, term weights in topic) where terms are either the actual term text
   *          (if available) or the term indices.
   *          Each topic's terms are sorted in order of decreasing weight.
   */
  // def describeTopicsAsStrings(maxTermsPerTopic: Int): Array[(Array[Double], Array[String])]

  /* TODO (once LDA can be trained with Strings or given a dictionary)
   * Return the topics described by weighted terms.
   *
   * This is similar to [[describeTopics()]] but returns String values for terms.
   * If this model was trained using Strings or was given a dictionary, then this method returns
   * terms as text.  Otherwise, this method returns terms as term indices.
   *
   * WARNING: If vocabSize and k are large, this can return a large object!
   *
   * @return  Array over topics.  Each topic is represented as a pair of matching arrays:
   *          (terms, term weights in topic) where terms are either the actual term text
   *          (if available) or the term indices.
   *          Each topic's terms are sorted in order of decreasing weight.
   */
  // def describeTopicsAsStrings(): Array[(Array[Double], Array[String])] =
  //  describeTopicsAsStrings(vocabSize)

  /* TODO
   * Compute the log likelihood of the observed tokens, given the current parameter estimates:
   *  log P(docs | topics, topic distributions for docs, alpha, eta)
   *
   * Note:
   *  - This excludes the prior.
   *  - Even with the prior, this is NOT the same as the data log likelihood given the
   *    hyperparameters.
   *
   * @param documents  RDD of documents, which are term (word) count vectors paired with IDs.
   *                   The term count vectors are "bags of words" with a fixed-size vocabulary
   *                   (where the vocabulary size is the length of the vector).
   *                   This must use the same vocabulary (ordering of term counts) as in training.
   *                   Document IDs must be unique and >= 0.
   * @return  Estimated log likelihood of the data under this model
   */
  // def logLikelihood(documents: RDD[(Long, Vector)]): Double

  /* TODO
   * Compute the estimated topic distribution for each document.
   * This is often called 'theta' in the literature.
   *
   * @param documents  RDD of documents, which are term (word) count vectors paired with IDs.
   *                   The term count vectors are "bags of words" with a fixed-size vocabulary
   *                   (where the vocabulary size is the length of the vector).
   *                   This must use the same vocabulary (ordering of term counts) as in training.
   *                   Document IDs must be unique and >= 0.
   * @return  Estimated topic distribution for each document.
   *          The returned RDD may be zipped with the given RDD, where each returned vector
   *          is a multinomial distribution over topics.
   */
  // def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)]

}

/**
 * :: Experimental ::
 *
 * Local LDA model.
 * This model stores only the inferred topics.
 * It may be used for computing topics for new documents, but it may give less accurate answers
 * than the [[DistributedLDAModel]].
 *
 * @param topics Inferred topics (vocabSize x k matrix).
 */
// Local模式的LDA模型
@Experimental
class LocalLDAModel private[clustering] (
    private val topics: Matrix) extends LDAModel with Serializable {

  override def k: Int = topics.numCols

  override def vocabSize: Int = topics.numRows

  override def topicsMatrix: Matrix = topics

  override def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])] = {
    val brzTopics = topics.toBreeze.toDenseMatrix
    Range(0, k).map { topicIndex =>
      val topic = normalize(brzTopics(::, topicIndex), 1.0)
      val (termWeights, terms) =
        topic.toArray.zipWithIndex.sortBy(-_._1).take(maxTermsPerTopic).unzip
      (terms.toArray, termWeights.toArray)
    }.toArray
  }

  // TODO
  // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???

// TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??

? } /** * :: Experimental :: * * Distributed LDA model. * This model stores the inferred topics, the full training dataset, and the topic distributions. * When computing topics for new documents, it may give more accurate answers * than the [[LocalLDAModel]]. */ // 分布式的LDA模型 @Experimental class DistributedLDAModel private ( private val graph: Graph[LDA.TopicCounts, LDA.TokenCount], private val globalTopicTotals: LDA.TopicCounts, val k: Int, val vocabSize: Int, private val docConcentration: Double, private val topicConcentration: Double, private[spark] val iterationTimes: Array[Double]) extends LDAModel { import LDA._ private[clustering] def this(state: EMLDAOptimizer, iterationTimes: Array[Double]) = { this(state.graph, state.globalTopicTotals, state.k, state.vocabSize, state.docConcentration, state.topicConcentration, iterationTimes) } /** * Convert model to a local model. * The local model stores the inferred topics but not the topic distributions for training * documents. */ def toLocal: LocalLDAModel = new LocalLDAModel(topicsMatrix) /** * Inferred topics, where each topic is represented by a distribution over terms. * This is a matrix of size vocabSize x k, where each column is a topic. * No guarantees are given about the ordering of the topics. * * WARNING: This matrix is collected from an RDD. Beware memory usage when vocabSize, k are large. */ // 主题的概率分布矩阵,列代表主题。行代表词典。每一行代表词的主题分布概率 override lazy val topicsMatrix: Matrix = { // Collect row-major topics val termTopicCounts: Array[(Int, TopicCounts)] = graph.vertices.filter(_._1 < 0).map { case (termIndex, cnts) => (index2term(termIndex), cnts) }.collect() // Convert to Matrix val brzTopics = BDM.zeros[Double](vocabSize, k) termTopicCounts.foreach { case (term, cnts) => var j = 0 while (j < k) { brzTopics(term, j) = cnts(j) j += 1 } } Matrices.fromBreeze(brzTopics) } // 每个主题的词典权重排序。格式(词汇id(依照权重由大到小排序),词在主题上的权重) override def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])] = { val numTopics = k // Note: N_k is not needed to find the top terms, but it is needed to normalize weights // to a distribution over terms. val N_k: TopicCounts = globalTopicTotals val topicsInQueues: Array[BoundedPriorityQueue[(Double, Int)]] = graph.vertices.filter(isTermVertex) .mapPartitions { termVertices => // For this partition, collect the most common terms for each topic in queues: // queues(topic) = queue of (term weight, term index). // Term weights are N_{wk} / N_k. val queues = Array.fill(numTopics)(new BoundedPriorityQueue[(Double, Int)](maxTermsPerTopic)) for ((termId, n_wk) <- termVertices) { var topic = 0 while (topic < numTopics) { queues(topic) += (n_wk(topic) / N_k(topic) -> index2term(termId.toInt)) topic += 1 } } Iterator(queues) }.reduce { (q1, q2) => q1.zip(q2).foreach { case (a, b) => a ++= b} q1 } topicsInQueues.map { q => val (termWeights, terms) = q.toArray.sortBy(-_._1).unzip (terms.toArray, termWeights.toArray) } } // TODO // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???

/** * Log likelihood of the observed tokens in the training set, * given the current parameter estimates: * log P(docs | topics, topic distributions for docs, alpha, eta) * * Note: * - This excludes the prior; for that, use [[logPrior]]. * - Even with [[logPrior]], this is NOT the same as the data log likelihood given the * hyperparameters. */ // 对数似然log P(docs | topics, topic distributions for docs, alpha, eta) lazy val logLikelihood: Double = { val eta = topicConcentration val alpha = docConcentration assert(eta > 1.0) assert(alpha > 1.0) val N_k = globalTopicTotals val smoothed_N_k: TopicCounts = N_k + (vocabSize * (eta - 1.0)) // Edges: Compute token log probability from phi_{wk}, theta_{kj}. val sendMsg: EdgeContext[TopicCounts, TokenCount, Double] => Unit = (edgeContext) => { val N_wj = edgeContext.attr val smoothed_N_wk: TopicCounts = edgeContext.dstAttr + (eta - 1.0) val smoothed_N_kj: TopicCounts = edgeContext.srcAttr + (alpha - 1.0) val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0) val tokenLogLikelihood = N_wj * math.log(phi_wk.dot(theta_kj)) edgeContext.sendToDst(tokenLogLikelihood) } graph.aggregateMessages[Double](sendMsg, _ + _) .map(_._2).fold(0.0)(_ + _) } /** * Log probability of the current parameter estimate: * log P(topics, topic distributions for docs | alpha, eta) */ // 对数概率log P(topics, topic distributions for docs | alpha, eta) lazy val logPrior: Double = { val eta = topicConcentration val alpha = docConcentration // Term vertices: Compute phi_{wk}. Use to compute prior log probability. // Doc vertex: Compute theta_{kj}. Use to compute prior log probability. val N_k = globalTopicTotals val smoothed_N_k: TopicCounts = N_k + (vocabSize * (eta - 1.0)) val seqOp: (Double, (VertexId, TopicCounts)) => Double = { case (sumPrior: Double, vertex: (VertexId, TopicCounts)) => if (isTermVertex(vertex)) { val N_wk = vertex._2 val smoothed_N_wk: TopicCounts = N_wk + (eta - 1.0) val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k (eta - 1.0) * brzSum(phi_wk.map(math.log)) } else { val N_kj = vertex._2 val smoothed_N_kj: TopicCounts = N_kj + (alpha - 1.0) val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0) (alpha - 1.0) * brzSum(theta_kj.map(math.log)) } } graph.vertices.aggregate(0.0)(seqOp, _ + _) } /** * For each document in the training set, return the distribution over topics for that document * ("theta_doc"). * * @return RDD of (document ID, topic distribution) pairs */ // 返回训练文档的主题分布概率 def topicDistributions: RDD[(Long, Vector)] = { graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) => (docID.toLong, Vectors.fromBreeze(normalize(topicCounts, 1.0))) } } /** Java-friendly version of [[topicDistributions]] */ def javaTopicDistributions: JavaPairRDD[java.lang.Long, Vector] = { JavaPairRDD.fromRDD(topicDistributions.asInstanceOf[RDD[(java.lang.Long, Vector)]]) } // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??

? }

1.3 LDAOptimizer源代码解析 

sealed trait LDAOptimizer {

  /*
    DEVELOPERS NOTE:

    An LDAOptimizer contains an algorithm for LDA and performs the actual computation, which
    stores internal data structure (Graph or Matrix) and other parameters for the algorithm.
    The interface is isolated to improve the extensibility of LDA.
   */

  /**
   * Initializer for the optimizer. LDA passes the common parameters to the optimizer and
   * the internal structure can be initialized properly.
   */
  private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer

  private[clustering] def next(): LDAOptimizer

  private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel
}

/**
 * :: DeveloperApi ::
 *
 * Optimizer for EM algorithm which stores data + parameter graph, plus algorithm parameters.
 *
 * Currently, the underlying implementation uses Expectation-Maximization (EM), implemented
 * according to the Asuncion et al. (2009) paper referenced below.
 *
 * References:
 *  - Original LDA paper (journal version):
 *    Blei, Ng, and Jordan.  "Latent Dirichlet Allocation."  JMLR, 2003.
 *     - This class implements their "smoothed" LDA model.
 *  - Paper which clearly explains several algorithms, including EM:
 *    Asuncion, Welling, Smyth, and Teh.
 *    "On Smoothing and Inference for Topic Models."  UAI, 2009.
 *
 */
@DeveloperApi
final class EMLDAOptimizer extends LDAOptimizer {

  import LDA._

  /**
   * The following fields will only be initialized through the initialize() method
   */
  private[clustering] var graph: Graph[TopicCounts, TokenCount] = null
  private[clustering] var k: Int = 0
  private[clustering] var vocabSize: Int = 0
  private[clustering] var docConcentration: Double = 0
  private[clustering] var topicConcentration: Double = 0
  private[clustering] var checkpointInterval: Int = 10
  private var graphCheckpointer: PeriodicGraphCheckpointer[TopicCounts, TokenCount] = null

  /**
   * Compute bipartite term/doc graph.
   */
  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {

    val docConcentration = lda.getDocConcentration
    val topicConcentration = lda.getTopicConcentration
    val k = lda.getK

    // Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
    // but values in (0,1) are not yet supported.
    require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" +
      s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration")
    require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration " +
      s"must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration")

    this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration
    this.topicConcentration = if (topicConcentration == -1) 1.1 else topicConcentration
    val randomSeed = lda.getSeed

    // For each document, create an edge (Document -> Term) for each unique term in the document.
    // 创建文章与词汇的edge。格式:(文章id,词汇id,词频)。对每一个词向量的文档依照此格式创建edge,当中过滤词频为0的词汇。
    val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
      // Add edges for terms with non-zero counts.
      termCounts.toBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
        Edge(docID, term2index(term), cnt)
      }
    }

    // Create vertices.
    // Initially, we use random soft assignments of tokens to topics (random gamma).
    // edge.attr 是边的属性,edge.srcId 是边的起点,edge.dstId 是边的终点
    // gamma 是生成主题分布的随机向量
    // 返回格式:(顶点。主题分布随机向量)
    // 每一个词节点存储一些权重值。表示这个词语和哪个主题相关。每篇文章节点存储当前文章讨论主题的预计。

val docTermVertices: RDD[(VertexId, TopicCounts)] = { val verticesTMP: RDD[(VertexId, TopicCounts)] = edges.mapPartitionsWithIndex { case (partIndex, partEdges) => val random = new Random(partIndex + randomSeed) partEdges.flatMap { edge => val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0) val sum = gamma * edge.attr Seq((edge.srcId, sum), (edge.dstId, sum)) } } verticesTMP.reduceByKey(_ + _) } // Partition such that edges are grouped by document // 创建graph,依据上面生成的顶点docTermVertices和边edges // partitionBy图的分布式存储採用点切割模式 // computeGlobalTopicTotals。计算全部词的主题分布概率和 this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) this.k = k this.vocabSize = docs.take(1).head._2.size this.checkpointInterval = lda.getCheckpointInterval this.graphCheckpointer = new PeriodicGraphCheckpointer[TopicCounts, TokenCount](graph, checkpointInterval) this.globalTopicTotals = computeGlobalTopicTotals() this } override private[clustering] def next(): EMLDAOptimizer = { require(graph != null, "graph is null, EMLDAOptimizer not initialized.") val eta = topicConcentration val W = vocabSize val alpha = docConcentration val N_k = globalTopicTotals // sendMsg: 发消息函数 // computePTopic计算主题分布 // sendToSrc sendToDst 是发送信息到源和目的属性 // 计算N_{wj} gamma_{wjk} // N_{wj} 词汇w在文档中的频次,gamma_{wjk} 词汇w在文档j中分配给主题k的概率 val sendMsg: EdgeContext[TopicCounts, TokenCount, (Boolean, TopicCounts)] => Unit = (edgeContext) => { // Compute N_{wj} gamma_{wjk} // attr边属性,srcAttr dstAttr 顶点属性 val N_wj = edgeContext.attr // E-STEP: Compute gamma_{wjk} (smoothed topic distributions), scaled by token count // N_{wj}. val scaledTopicDistribution: TopicCounts = computePTopic(edgeContext.srcAttr, edgeContext.dstAttr, N_k, W, eta, alpha) *= N_wj edgeContext.sendToDst((false, scaledTopicDistribution)) edgeContext.sendToSrc((false, scaledTopicDistribution)) } // This is a hack to detect whether we could modify the values in-place. // TODO: Add zero/seqOp/combOp option to aggregateMessages. (SPARK-5438) // mergeMsg:合并消息函数 // 用于Map阶段,每一个edge分区中每一个点收到的消息合并,以及reduce阶段。合并不同分区的消息。

合并vertexId同样的消息。 val mergeMsg: ((Boolean, TopicCounts), (Boolean, TopicCounts)) => (Boolean, TopicCounts) = (m0, m1) => { val sum = if (m0._1) { m0._2 += m1._2 } else if (m1._1) { m1._2 += m0._2 } else { m0._2 + m1._2 } (true, sum) } // M-STEP: Aggregation computes new N_{kj}, N_{wk} counts. // 每一个节点通过收集邻居数据来更新主题权重数据 val docTopicDistributions: VertexRDD[TopicCounts] = graph.aggregateMessages[(Boolean, TopicCounts)](sendMsg, mergeMsg) .mapValues(_._2) // Update the vertex descriptors with the new counts. // 依据最新顶点数据更新图 val newGraph = GraphImpl.fromExistingRDDs(docTopicDistributions, graph.edges) graph = newGraph graphCheckpointer.updateGraph(newGraph) globalTopicTotals = computeGlobalTopicTotals() this } /** * Aggregate distributions over topics from all term vertices. * * Note: This executes an action on the graph RDDs. */ private[clustering] var globalTopicTotals: TopicCounts = null // computeGlobalTopicTotals,计算全部词的主题分布概率和 private def computeGlobalTopicTotals(): TopicCounts = { val numTopics = k graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _) } // 生成LDA模型 override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = { require(graph != null, "graph is null, EMLDAOptimizer not initialized.") this.graphCheckpointer.deleteAllCheckpoints() new DistributedLDAModel(this, iterationTimes) } }<span style="font-family: 'Microsoft YaHei'; background-color: rgb(255, 255, 255);"> </span>

转载请注明出处:

http://blog.csdn.net/sunbow0

 

转载于:https://www.cnblogs.com/zhchoutai/p/7091437.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/aoe41606/article/details/102035399

智能推荐

874计算机科学基础综合,2018年四川大学874计算机科学专业基础综合之计算机操作系统考研仿真模拟五套题...-程序员宅基地

文章浏览阅读1.1k次。一、选择题1. 串行接口是指( )。A. 接口与系统总线之间串行传送,接口与I/0设备之间串行传送B. 接口与系统总线之间串行传送,接口与1/0设备之间并行传送C. 接口与系统总线之间并行传送,接口与I/0设备之间串行传送D. 接口与系统总线之间并行传送,接口与I/0设备之间并行传送【答案】C2. 最容易造成很多小碎片的可变分区分配算法是( )。A. 首次适应算法B. 最佳适应算法..._874 计算机科学专业基础综合题型

XShell连接失败:Could not connect to '192.168.191.128' (port 22): Connection failed._could not connect to '192.168.17.128' (port 22): c-程序员宅基地

文章浏览阅读9.7k次,点赞5次,收藏15次。连接xshell失败,报错如下图,怎么解决呢。1、通过ps -e|grep ssh命令判断是否安装ssh服务2、如果只有客户端安装了,服务器没有安装,则需要安装ssh服务器,命令:apt-get install openssh-server3、安装成功之后,启动ssh服务,命令:/etc/init.d/ssh start4、通过ps -e|grep ssh命令再次判断是否正确启动..._could not connect to '192.168.17.128' (port 22): connection failed.

杰理之KeyPage【篇】_杰理 空白芯片 烧入key文件-程序员宅基地

文章浏览阅读209次。00000000_杰理 空白芯片 烧入key文件

一文读懂ChatGPT,满足你对chatGPT的好奇心_引发对chatgpt兴趣的表述-程序员宅基地

文章浏览阅读475次。2023年初,“ChatGPT”一词在社交媒体上引起了热议,人们纷纷探讨它的本质和对社会的影响。就连央视新闻也对此进行了报道。作为新传专业的前沿人士,我们当然不能忽视这一热点。本文将全面解析ChatGPT,打开“技术黑箱”,探讨它对新闻与传播领域的影响。_引发对chatgpt兴趣的表述

中文字符频率统计python_用Python数据分析方法进行汉字声调频率统计分析-程序员宅基地

文章浏览阅读259次。用Python数据分析方法进行汉字声调频率统计分析木合塔尔·沙地克;布合力齐姑丽·瓦斯力【期刊名称】《电脑知识与技术》【年(卷),期】2017(013)035【摘要】该文首先用Python程序,自动获取基本汉字字符集中的所有汉字,然后用汉字拼音转换工具pypinyin把所有汉字转换成拼音,最后根据所有汉字的拼音声调,统计并可视化拼音声调的占比.【总页数】2页(13-14)【关键词】数据分析;数据可..._汉字声调频率统计

linux输出信息调试信息重定向-程序员宅基地

文章浏览阅读64次。最近在做一个android系统移植的项目,所使用的开发板com1是调试串口,就是说会有uboot和kernel的调试信息打印在com1上(ttySAC0)。因为后期要使用ttySAC0作为上层应用通信串口,所以要把所有的调试信息都给去掉。参考网上的几篇文章,自己做了如下修改,终于把调试信息重定向到ttySAC1上了,在这做下记录。参考文章有:http://blog.csdn.net/longt..._嵌入式rootfs 输出重定向到/dev/console

随便推点

uniapp 引入iconfont图标库彩色symbol教程_uniapp symbol图标-程序员宅基地

文章浏览阅读1.2k次,点赞4次,收藏12次。1,先去iconfont登录,然后选择图标加入购物车 2,点击又上角车车添加进入项目我的项目中就会出现选择的图标 3,点击下载至本地,然后解压文件夹,然后切换到uniapp打开终端运行注:要保证自己电脑有安装node(没有安装node可以去官网下载Node.js 中文网)npm i -g iconfont-tools(mac用户失败的话在前面加个sudo,password就是自己的开机密码吧)4,终端切换到上面解压的文件夹里面,运行iconfont-tools 这些可以默认也可以自己命名(我是自己命名的_uniapp symbol图标

C、C++ 对于char*和char[]的理解_c++ char*-程序员宅基地

文章浏览阅读1.2w次,点赞25次,收藏192次。char*和char[]都是指针,指向第一个字符所在的地址,但char*是常量的指针,char[]是指针的常量_c++ char*

Sublime Text2 使用教程-程序员宅基地

文章浏览阅读930次。代码编辑器或者文本编辑器,对于程序员来说,就像剑与战士一样,谁都想拥有一把可以随心驾驭且锋利无比的宝剑,而每一位程序员,同样会去追求最适合自己的强大、灵活的编辑器,相信你和我一样,都不会例外。我用过的编辑器不少,真不少~ 但却没有哪款让我特别心仪的,直到我遇到了 Sublime Text 2 !如果说“神器”是我能给予一款软件最高的评价,那么我很乐意为它封上这么一个称号。它小巧绿色且速度非

对10个整数进行按照从小到大的顺序排序用选择法和冒泡排序_对十个数进行大小排序java-程序员宅基地

文章浏览阅读4.1k次。一、选择法这是每一个数出来跟后面所有的进行比较。2.冒泡排序法,是两个相邻的进行对比。_对十个数进行大小排序java

物联网开发笔记——使用网络调试助手连接阿里云物联网平台(基于MQTT协议)_网络调试助手连接阿里云连不上-程序员宅基地

文章浏览阅读2.9k次。物联网开发笔记——使用网络调试助手连接阿里云物联网平台(基于MQTT协议)其实作者本意是使用4G模块来实现与阿里云物联网平台的连接过程,但是由于自己用的4G模块自身的限制,使得阿里云连接总是无法建立,已经联系客服返厂检修了,于是我在此使用网络调试助手来演示如何与阿里云物联网平台建立连接。一.准备工作1.MQTT协议说明文档(3.1.1版本)2.网络调试助手(可使用域名与服务器建立连接)PS:与阿里云建立连解释,最好使用域名来完成连接过程,而不是使用IP号。这里我跟阿里云的售后工程师咨询过,表示对应_网络调试助手连接阿里云连不上

<<<零基础C++速成>>>_无c语言基础c++期末速成-程序员宅基地

文章浏览阅读544次,点赞5次,收藏6次。运算符与表达式任何高级程序设计语言中,表达式都是最基本的组成部分,可以说C++中的大部分语句都是由表达式构成的。_无c语言基础c++期末速成