Monday, June 27, 2016

Verify a million files' MD5 with Spark

Partitioning will not be helpful in all applications; for example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.
Spark’s partitioning is available on all RDDs of key/value pairs, and causes the system to group elements based on a function of each key. Although Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node.
Many of Spark’s operations involve shuffling data by key across the network. All of these will benefit from partitioning. For operations that act on a single RDD, such as reduceByKey(), running on a pre-partitioned RDD will cause all the values for each key to be computed locally on a single machine, requiring only the final, locally reduced value to be sent from each worker node back to the master.
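
A minimal Scala sketch of this (the file path and the partition count are made-up values): the pair RDD is partitioned once, persisted, and then reused by a key-oriented operation.

import org.apache.spark.HashPartitioner

// (md5, line) pairs that will be reused by several key-oriented operations.
val pairs = sc.textFile("/home/ano/source.md5")
  .map(x => (x.split(" ")(0), x))
  .partitionBy(new HashPartitioner(100))
  .persist()   // keep the partitioned RDD so later operations reuse it

// Count duplicate checksums: with the RDD already partitioned, each key's
// values are combined locally and only one small record per key leaves a node.
val dupCounts = pairs.mapValues(_ => 1).reduceByKey(_ + _)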





One issue to watch out for when passing functions is inadvertently serializing the
object containing the function. When you pass a function that is a member of an
object, or contains references to fields in an object (e.g., self.field), Spark sends the
entire object to worker nodes, which can be much larger than the bit of information
you need (see Example 3-19). Sometimes this can also cause your program to fail, if
your class contains objects that Python can’t figure out how to pickle.
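
The same pitfall exists in Scala, where it typically shows up as a “Task not serializable” error or an unexpectedly large closure. A minimal sketch of the usual workaround, copying the needed field into a local variable (the class and method names here are made up):

import org.apache.spark.rdd.RDD

// A hypothetical helper class that is not Serializable.
class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = s.contains(query)

  // Problem: isMatch is a method of this object, so the closure captures
  // "this" and Spark tries to ship the whole object to the workers.
  def getMatchesBad(rdd: RDD[String]): RDD[String] = rdd.filter(isMatch)

  // Workaround: copy just the field we need into a local val, so the closure
  // only captures that small string.
  def getMatchesGood(rdd: RDD[String]): RDD[String] = {
    val localQuery = query
    rdd.filter(line => line.contains(localQuery))
  }
}
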
The set of stages produced for a particular action is termed a job. In each case when
we invoke actions such as count(), we are creating a job composed of one or more
stages.
Once the stage graph is defined, tasks are created and dispatched to an internal
scheduler, which varies depending on the deployment mode being used. Stages in the
physical plan can depend on each other, based on the RDD lineage, so they will be
executed in a specific order. For instance, a stage that outputs shuffle data must occur
before one that relies on that data being present.
A physical stage will launch tasks that each do the same thing but on specific partitions
of data. Each task internally performs the same steps:
1. Fetching its input, either from data storage (if the RDD is an input RDD), an
existing RDD (if the stage is based on already cached data), or shuffle outputs.
2. Performing the operation necessary to compute the RDD(s) that it represents. For
instance, executing filter() or map() functions on the input data, or performing
grouping or reduction.
3. Writing output to a shuffle, to external storage, or back to the driver (if it is the
final RDD of an action such as count()).
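
A small way to watch this happen (the pipeline below is made up, but mirrors the MD5 job further down): toDebugString prints the RDD lineage with its shuffle boundaries, and the count() action turns that plan into a job.

// map() stays in the same stage as the read; reduceByKey() introduces a
// shuffle, so everything after it runs in a second stage.
val perKey = sc.textFile("/home/ano/source.md5")
  .map(x => (x.split(" ")(0), 1))
  .reduceByKey(_ + _)

println(perKey.toDebugString)  // shows the lineage and the stage boundary
perKey.count()                 // the action: one job, two stages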



Choosing an output compression codec can have a big impact on future users of the
data. With distributed systems such as Spark, we normally try to read our data in
from multiple different machines. To make this possible, each worker needs to be
able to find the start of a new record. Some compression formats make this impossible,
requiring a single node to read in all of the data, which can easily become a
bottleneck. Formats that can be easily read from multiple machines are called “splittable.”
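
For example, when writing text output the codec can be chosen explicitly; a minimal sketch (paths are made up), using bzip2 because Hadoop’s BZip2Codec is splittable:

import org.apache.hadoop.io.compress.BZip2Codec

// Writing with a splittable codec keeps the output readable in parallel by
// later jobs; a gzip-compressed file, by contrast, forces a single reader.
sc.textFile("/home/ano/source.md5")
  .saveAsTextFile("/home/ano/output-bz2", classOf[BZip2Codec])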



While Spark’s textFile() method can handle compressed input, it
automatically disables splitting even if the input was compressed
with a codec that could be read in a splittable way. If you find yourself
needing to read in a large single-file compressed input, consider
skipping Spark’s wrapper and instead use either newAPIHadoopFile
or hadoopFile and specify the correct compression codec.
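
A minimal sketch of the newAPIHadoopFile route (the path is made up; bzip2 is used because Hadoop can split it):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// TextInputFormat picks the codec from the file extension; because bzip2 is
// splittable, different tasks can read different blocks of the same file.
val bigInput = sc.newAPIHadoopFile(
    "/home/ano/big-input.txt.bz2",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text])
  .map { case (_, text) => text.toString }   // drop the byte offsets, keep lines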




// md5sum output looks like "<md5>  <path>": splitting on " " leaves the
// checksum at index 0 and the path at index 2 (the double space produces an
// empty field at index 1). The replace() normalizes the source path prefix so
// it matches the destination listing.
val sourceMD5 = sc.textFile("/home/ano/source.md5")
val destMD5 = sc.textFile("/home/ano/oss.md5")
val source_pairs = sourceMD5.map(x => (x.split(" ")(0), x.split(" ")(2).replace("oss1","osstest")))
val dest_pairs = destMD5.map(x => (x.split(" ")(0), x.split(" ")(2)))
// pairs present in the source listing but missing or different on the destination
val invalids = source_pairs.subtract(dest_pairs).count
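
If the count is non-zero, the same pair RDDs can also list which files differ; a small follow-up sketch (the output path is made up):

// Keep the mismatching (md5, path) pairs instead of only counting them.
val badFiles = source_pairs.subtract(dest_pairs)
badFiles.take(20).foreach(println)                 // spot-check a few on the driver
badFiles.saveAsTextFile("/home/ano/invalid_md5")   // or write out the full list
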
Each machine was running at full load; the whole run took three days for 1.8 TB of data.
(raw data × 2.5 × 2, for the fq files)
You can test the standalone speed like this: 1. put a reasonably large file into memory and stop the standalone service; 2. configure job.cfg and delete the jobName setting; 3. then clone multiple jobs with seq 1 100 | xargs -I {} echo "cp job.cfg job.{}.cfg;echo jobName=local_test.{} >> job.{}.cfg" | bash; 4. submit those jobs; 5. start the service and see what speed it reaches once it stabilizes.
# clone 200 job configs, each with its own jobName and destPrefix
seq 1 200 | xargs -I {} echo "cp job.cfg job.{}.cfg;echo jobName=local_test.{} >> job.{}.cfg;echo destPrefix=vpn{}/ >> job.{}.cfg;" | bash
# submit the jobs to ossimport2
seq 1 100 | xargs -I {} java -jar bin/ossimport2.jar -c conf/sys.properties submit jobs/job.{}.cfg
# clean the jobs afterwards
seq 1 100 | xargs -I {} java -jar bin/ossimport2.jar -c conf/sys.properties clean jobs/job.{}.cfg
