全国并行应用挑战赛(PAC-2017)指定的官方开源库是Intel开发的基于Spark的开源深度学习库,旨在发挥CPU集群而非GPU集群的计算能力。在过去两年(难道过去很多年?),无论是部分业界同仁还是某些无良媒体使得大数据的概念遍地生花🌹,当然,如今的AI也被某些利益圈子提出了不切实际的期望设定,已经被自动化所的宗成庆老师在CCAI2017上骂了一通。不管怎样,从技术层面来说,大数据是一个有价值的概念。在此,多扯一句,从互联网产品发展角度来看,学界和业界似乎都共同在做一件事,那就是打通信息(数据)流。从早期的门户网站将信息上网到过去的O2O的发展融合线上线下数据,当然这个融合是垂直领域的融合,前不久去参加华为的一个展,看到了数据在多领域的融合,如今的AI,数据挖掘等,则讲究对深层数据的挖掘,能够发现有意义的信息。站在个人角度,看好由于多领域数据融合产生新的业务价值。此外,直播平台的发展,也将逐渐走向健康态,良性发展的道路。

扯淡完毕,回归正题,聊技术。

Hadoop适合离线处理,所谓离线,一个是可以处理更多的数据,另一个是批处理。考虑到和HDFS的结合,持久化到硬盘上也是一个技术点儿,此外,由于离线处理,则在应用场景上适合对时效性要求不高的场景,也就是不要求立即做出反应。

与Hadoop相对应的是Spark的特点,Spark是基于内存计算的,所谓内存计算,是在对中间数据的处理上表现和Hadoop不同,Spark更多的将中间数据落在内存中,而Hadoop则将中间数据落在硬盘上。由于基于内存,自然在I/O上的消耗要远低于Hadoop,适合时效性要求高的场景,比如一些在线系统。此外,这种存储模型更加适合机器学习的迭代优化。当然了,Spark并不具备HDFS的存储能力,要借助HDFS等持久化数据。

关于HDFS的抽象,Master节点称为NameNode, Worker节点称为DataNode。对于一个集群,至少要有一个Master节点,也就是一个NameNode。从单个机器来看,每个机器上可以有多个DataNode,但是至少在一个集群的某台机器上存在一个NameNode,当然了,这台机器上也可以有DataNode的存在。

RDD是Spark中的核心概念,江湖中称弹性分布式数据集,是分布式内存的抽象概念

RDD的特性主要体现在三个方面(敲黑板注意),第一是血统关系图。也就是说,Spark维护着RDDs之间的依赖关系和创建关系。比如说,inputRDD通过filter操作创建errorsRDD和warningsRDD, 而这两个RDD可以通过union操作创建badLinesRDD,也就是说,badLinesRDD依赖于上述两个RDD。这样的好处显而易见,从血统关系图中可以恢复丢失数据,也就是说,血统关系图其实维护了一个数据映射关系。

第二是延迟计算。Spark中针对RDD的操作主要有两种,分别是Transformation和Action。Transformation操作的典型API有map(), filter(),reduceByKey()等,而Action操作的代表性API有count(), collect(), save()等,对于二者提供的API,可以发现二者的区别。Transformation的含义如同其名,强调对数据的转换,而Action则强调对数据结果的获取所谓延迟计算是指对RDD的真正计算发生在对该RDD第一次使用Action操作的时候。一个典型的例子是加载数据,也就是说数据在必要的时候,才会被加载进去。但是要对加载这个操作本身(比如,一行加载数据的代码)进行响应, 是通过Spark内部的metadata记录的,也包括对Transformation操作的响应。类比找老板讨论问题的时候,先去和老板约时间,然后在约定时间直接去找老板,而不是直接去找老板(小心被怼,哈哈)。孰优孰劣,显然!优点是减少数据的传输,也就是省的你白跑多次。

第三是缓存。Spark默认每次在RDD上进行Action操作时,重新计算RDD,也就是要重新跑一遍血统关系图。这个时候很显然的方案是缓存技术,RDD.persist()可以实现对RDD的重复利用,而unpersist()是从缓存中移除。至于为什么要默认每次重新计算?至少这种简单粗暴的方法可以省去很多问题的考虑,比如可能的RDD的一致性问题等。

上述RDD的三个特性也是Spark的精华,每个特性都在某种程度上是为了更好的实现内存计算。关于各种操作的API,具体可以Google,毕竟我也记不住(囧)。

至此,战略层面的东西已经讨论完毕,接下来,休息一下,去看看战术层面的东西,也就是在比赛中我们是如何使用Spark完成既定目标的。

假设对银联产品的整体评价这个标签进行三分类(好,中,差),具体数据和比赛要求可参看官方网站。则我们的训练数据如下:

texts = 评论文本,整体评价

第一步生成RDD:

data_rdd = sc.parallelize(texts)

默认情况下,SparkContext(sc)会根据集群的情况自动设定slice的数值,也就是数据分成多少份了,当然可以自己设定。这里我们采用默认。

第二步生成关键词:生成关键词的逻辑是这样的,对所有评论文本进行停用词过滤后进行词频统计,然后去掉前10个频率最高的词汇,然后按照词频递减的顺序选择990个词汇作为最终的关键词。为啥?因为词频越高意味着在评论文本中出现次数越多,也就是特征了。

word_to_ic = analyze_texts(data_rdd,stopwords)

其中,stopwords就是停用词表。analyze_texts的实现逻辑如下:

def analyze_texts(data_rdd,stopwords):
      def index(w_c_i):
          ((w, c), i) = w_c_i
          return (w, (i + 1, c))
    return data_rdd.flatMap(lambda text_label: text_to_words(text_label[0],stopwords)) \
          .map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) \
          .sortBy(lambda w_c: - w_c[1]).zipWithIndex() \
          .map(lambda w_c_i: index(w_c_i)).collect()

def text_to_words(review_text,stopwords):
     words = list(jieba.cut(review_text.replace('\n','')))
     # Filter stop words
     words = filterCmt(words,stopwords)
     return words

analyze_texts的返回结果是(关键词,(关键词索引序号,词频)),关键词索引序号是按照词频递减依次增加。

sortBy(lambda w_c:-w_c[1])表示按照词频递减排序,其中w_c的结构是(关键词,词频),’_‘的作用更像是占位符,这种用法在内部函数index的实现中同样有体现(迷之微笑)。关于index给出一个例子如下:

w_c_i = ( ('hello',234),2) )
res = index(w_c_i)
res = ('hello',(3,234))

zipWithIndex()函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键值对。例如:

scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
scala> rdd2.zipWithIndex().collect

Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

其中text_to_words的逻辑是针对每条评论,先去掉所有换行符,然后使用分词工具jieba进行中文分词,然后返回分词后的一个列表。

第三步:将关键词分发到每个节点上。

bword_to_ic = sc.broadcast(word_to_ic)

broadcast即广播协议。

第四步:将词向量分发到每个节点上。

bfiltered_w2v = sc.broadcast(filtered_w2v)

注意这里的词向量是和关键词对应的词向量,词向量预先是针对所有评论文本训练得来。也就是说,此时集群上同时存在关键词和关键词对应的词向量。

第五步:将训练数据转换成下述格式

评论文本的关键词列表,整体评价

tokens_rdd = data_rdd.map(lambda text_label:([w for w in text_to_words(text_label[0],stopwords)
    if w in bword_to_ic.value], text_label[1]))

第六步:关键词填充。为啥呢?有的评论文本长,有的评论文本短,这样通过填充一些没有信息量的符号,使得所有的关键词列表长度一致,达到处理上的方便,比如说在给模型喂数据的时候。

padded_tokens_rdd = tokens_rdd.map(lambda tokens_label: (pad(tokens_label[0], "##", sequence_len), 
tokens_label[1]))

第七步:将训练数据转换成下述格式

评论文本的关键词向量列表,整体评价

vector_rdd = padded_tokens_rdd.map(lambda tokens_label:([to_vec(w, bfiltered_w2v.value,embedding_dim)
    for w in tokens_label[0]], tokens_label[1]))

第八步:将关键词向量列表转换成评论文本矩阵

sample_rdd = vector_rdd.map(
     lambda vectors_label: to_sample(vectors_label[0], vectors_label[1],embedding_dim))

其中,to_sample的实现逻辑如下:

def to_sample(vectors, label, embedding_dim):
  # flatten nested list
  flatten_features = list(itertools.chain(*vectors))
  features = np.array(flatten_features, dtype='float').reshape(
      [sequence_len, embedding_dim])
  return Sample.from_ndarray(features, np.array(label))

语法注释:其中chain()可以把一组迭代对象串联起来,形成一个更大的迭代器。int不是可迭代对象。*vectors表示访问一个可迭代对象集合。

第九步:分割数据集

train_rdd, val_rdd = sample_rdd.randomSplit([training_split,1-training_split])

至此,基于Spark的数据处理工作结束。欢迎开启Spark之旅!


参考:

1.一个实际PySpark项目性能调优

2.PySpark内部实现

讨论了RDD,序列化和反序列化,以及PySpark中的几个经典操作。

3.Python命令行参数处理

4.Spark默认并行度

5.sklearn中两种模型持久化的方式

6.Spark Streaming

BigDL官方给出的Spark Streaming例子,采用netcat作为数据流的产生工具,利用streaming接收数据流并进行词频统计。