上一章节我们了解了 shuffle 相关的概念和原理后其实可以发现一个问题,那就是 shuffle 比较容易造成数据倾斜的情况。 例如上一节我们看到的图,在这批数据中,hello 这个单词的行占据了绝大部分,当我们执行 groupByKey 的时候触发了 shuffle。这时候大部分的数据 (Hello) 都汇集到了一个 partition 上。这种极端的情况就会造成著名的长尾现象,就是说由于大部分数据都汇集到了一个 partition 而造成了这个 partition 的 task 运行的十分慢。而其他的 task 早已完成,整个任务都在等这个大尾巴 task 的结束。 这种现象破坏了分布式计算的设计初衷,因为最终大部分的计算任务都在一个单点上执行了。所以极端的数据分布就成为了机器学习和大数据处理这类产品的劲敌,我跟我司的研发人员聊的时候,他们也觉得数据倾斜的情况比较难处理,当然我们可以做 repartition(重新分片) 来重新整合 parition 的数量和分布等操作,以及避免或者减少 shuffle 的成本,各家不同的业务有不同的做法。在做这类产品的性能测试的时候,也跟我们以往的互联网模式不同,产品的压力不在于并发量上,而在于数据量和数据分布上。
一般我们需要模拟以下这些情况的数据:
下面是造数工具的架构图:

解释一下原理:
下面给一个使用pyspark构造数据的DEMO:
- from pyspark import SparkContext, SparkConf, SQLContext
-
- import random
-
- from pyspark.sql.types import StructType, StructField, StringType, IntegerType
-
-
-
-
-
- def choice_one_random(sequence):
-
- return random.choice(sequence)
-
-
-
-
-
- def choice_one_with_weights(sequence, weight):
-
- return random.choices(sequence, weights=weight)[0]
-
-
-
-
-
- def randomInt(start, end):
-
- return random.randint(start, end)
-
-
-
-
-
- conf = SparkConf().setMaster("local").setAppName("My App")
-
- sc = SparkContext(conf=conf)
-
- sqlContext = SQLContext(sc)
-
-
-
- title = ['程序员', '教师', '测试人员', '产品经理']
-
- gender = ['男性', '女性']
-
- gender_weights = weights = [0.6, 0.4]
-
-
-
- nums = sc.parallelize(range(10000))
-
- rdd = nums.map(
-
- lambda x: (
-
- randomInt(1, 100),
-
- choice_one_random(title),
-
- choice_one_with_weights(gender, gender_weights)))
-
-
-
- schema = StructType([
-
- StructField("age", IntegerType(), True),
-
- StructField("title", StringType(), True),
-
- StructField("gender", StringType(), True),
-
- ])
-
-
-
-
-
- data = sqlContext.createDataFrame(rdd, schema=schema)
-
- data.show()
原理:
可以参考上面的DEMO来完成自己的造数任务, 可以看到python的库中有很多好用的功能, 比如上面我通过random库不仅可以生成随机的数据, 也可以给存储一个列表,让数据生成的时候从这个列表中选一个并且给不同的值不同的权重来控制数据的分布, 这就可以造出数据倾斜的场景。
下面给一个用java实现的例子(实际的项目中我是用java的,下面就是一个项目中的构造程序):
- package yuanhang;
-
-
-
- import generator.field.random.RandomDateField;
-
- import generator.field.random.RandomIntField;
-
- import generator.field.random.RandomScopeField;
-
- import generator.field.random.RandomStringField;
-
- import generator.table.XRange;
-
- import generator.utils.DateUtil;
-
- import org.apache.spark.SparkConf;
-
- import org.apache.spark.SparkContext;
-
- import org.apache.spark.api.java.JavaRDD;
-
- import org.apache.spark.api.java.JavaSparkContext;
-
- import org.apache.spark.sql.*;
-
- import org.apache.spark.sql.types.DataTypes;
-
- import org.apache.spark.sql.types.StructField;
-
- import org.apache.spark.sql.types.StructType;
-
-
-
- import java.sql.Date;
-
- import java.sql.Timestamp;
-
- import java.text.DateFormat;
-
- import java.text.SimpleDateFormat;
-
- import java.time.ZoneId;
-
- import java.util.ArrayList;
-
- import java.util.List;
-
- import java.util.Properties;
-
- import org.apache.spark.util.LongAccumulator;
-
- import java.time.LocalDate;
-
- import java.util.Random;
-
-
-
-
-
- /**
- */
-
- public class event {
-
-
-
- public static void main(String[] args) {
-
- // SparkConf conf = new SparkConf().setAppName("data produce")
-
- // .setMaster("local");
-
- SparkConf conf = new SparkConf().setAppName("data produce");
-
- JavaSparkContext sc = new JavaSparkContext(conf);
-
-
-
- SparkSession spark = SparkSession
-
- .builder()
-
- .appName("Java Spark SQL basic example")
-
- .getOrCreate();
-
-
-
- // SparkContext sparkSC = spark.sparkContext();
-
- List
fields = new ArrayList<>(); -
- fields.add(DataTypes.createStructField("uin", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("app_key", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("event_time", DataTypes.DateType, true));
-
- fields.add(DataTypes.createStructField("event_code", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("ds", DataTypes.IntegerType, true));
-
-
-
-
-
- fields.add(DataTypes.createStructField("i001", DataTypes.IntegerType, true));
-
- fields.add(DataTypes.createStructField("i002", DataTypes.IntegerType, true));
-
- fields.add(DataTypes.createStructField("s001", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("s002", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("s003", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("s004", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("s005", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("s006", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("s007", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("s008", DataTypes.StringType, true));
-
- fields.add(DataTypes.createStructField("d001", DataTypes.DoubleType, true));
-
-
-
-
-
-
-
- StructType schema = DataTypes.createStructType(fields);
-
-
-
- final LongAccumulator accumulator = sc.sc().longAccumulator();
-
-
-
-
-
- // LocalDate startDate = LocalDate.of(2023, 5, 1);
-
- LocalDate startDate = LocalDate.of(2022, 10, 30);
-
- // LocalDate beginDate = LocalDate.of(2022, 5, 1);
-
- LocalDate endDate = LocalDate.of(2023, 10, 30);
-
- //default time zone
-
- ZoneId defaultZoneId = ZoneId.systemDefault();
-
-
-
- RandomScopeField event_codes = new RandomScopeField();
-
- event_codes.getValues().add("app_jhapp_search_res");
-
- event_codes.getValues().add("app_jhapp_tab_switch");
-
- event_codes.getValues().add("app_jhapp_applnch");
-
- event_codes.getValues().add("app_jhapp_search_ck");
-
- event_codes.getValues().add("app_jhapp_search_page_imp");
-
- event_codes.getValues().add("app_jhapp_search_page_ck");
-
- event_codes.getValues().add("app_jhapp_explore_subtab_imp");
-
- event_codes.getValues().add("app_jhapp_carousels_imp");
-
- event_codes.getValues().add("app_jhapp_carousels_intrct");
-
- event_codes.getValues().add("app_jhapp_first_open");
-
- event_codes.getValues().add("app_jhapp_content_detail_imp");
-
- event_codes.getValues().add("app_jhapp_content_detail_interact");
-
- event_codes.getValues().add("app_jhapp_tab_imp");
-
-
-
-
-
-
-
- while (!startDate.isAfter(endDate)) {
-
- // System.out.println(startDate);
-
-
-
- // List data1 = new XRange(1000);
-
- List data1 = new XRange(274000);
-
- JavaRDD distData = sc.parallelize(data1, 20);
-
- DateFormat dateformat = new SimpleDateFormat("yyyyMMddhh");
-
- Date date = new Date(java.util.Date.from(startDate.atStartOfDay(defaultZoneId).toInstant()).getTime());
-
- // Date bDate = new Date(java.util.Date.from(beginDate.atStartOfDay(defaultZoneId).toInstant()).getTime());
-
- // Date eDate = new Date(java.util.Date.from(endDate.atStartOfDay(defaultZoneId).toInstant()).getTime());
-
- int ds = Integer.parseInt(dateformat.format(date));
-
-
-
- JavaRDD rowRDD = distData.map( record ->{
-
- RandomIntField userId = new RandomIntField();
-
- userId.setMax(1000000);
-
- userId.setMin(1);
-
- // Date date = DateUtil.randomDate("2023-05-01", "2023-06-19");
-
- RandomIntField r = new RandomIntField();
-
- r.setMin(1);
-
- r.setMax(100);
-
- int i001 = Integer.parseInt(r.gen().toString());
-
- int i002 = Integer.parseInt(r.gen().toString());
-
-
-
- String s001 = "C"+ r.gen();
-
- String s002 = "当前一级板" + r.gen();
-
- String s003 = "去向一级板块" + r.gen();
-
- String s004 = "去向一级板块" + r.gen();
-
- String s005 = "去向一级板块" + r.gen();
-
- String s006 = "去向一级板块" + r.gen();
-
- String s007 = "2022090701";
-
- String s008 = "2023090701";
-
- double d001 = Double.parseDouble(r.gen().toString());
-
-
-
-
-
-
-
- return RowFactory.create(userId.gen().toString(), "0WEB05LD02D5FL6K",date,event_codes.gen(),ds,i001,i002,s001,s002,s003,s004,s005,s006,s007,s008,d001);
-
- });
-
-
-
- Dataset dataset =spark.createDataFrame(rowRDD, schema);
-
- dataset.show();
-
- // DataFrameWriter writer = new DataFrameWriter(dataset);
-
-
-
- String jdbcUrl = "jdbc:clickhouse://clickhouse-hs:8123/beacon_olap";
-
- // String jdbcUrl = "jdbc:clickhouse://10.27.20.122:8123/beacon_olap";
-
- Properties ckProperties = new Properties();
-
- // ckProperties.setProperty("user", "beidou");
-
- ckProperties.setProperty("user", "default");
-
- ckProperties.setProperty("password", "QdFx@00700!*");
-
- // ckProperties.setProperty("password", "Beidou@qidian");
-
- ckProperties.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver");
-
- // ckProperties.setProperty("dbtable", "event_record_240");
-
- // ckProperties.setProperty("batchsize", "50000");
-
- // ckProperties.setProperty("isolationLevel", "NONE");
-
- // ckProperties.setProperty("numPartitions", "12");
-
- // ckProperties.setProperty("url", "jdbc:clickhouse://clickhouse-hs:8123/beacon_olap");
-
-
-
- dataset.write().option("batchsize", "50000")
-
- .option("isolationLevel", "NONE")
-
- .option("numPartitions", "10")
-
- .option("truncate", "true")
-
- .option("compression", "snappy")
-
- .mode(SaveMode.Append)
-
- .jdbc(jdbcUrl, "event_record_273", ckProperties);
-
-
-
- // dataset.write().mode(SaveMode.Append).jdbc(jdbcUrl, "event_record_240", ckProperties);
-
- startDate = startDate.plusDays(1);
-
- }
-
-
-
-
-
- }
-
- }
上一篇中也提到了海量的小文件是所有分布式存储设备的天敌, 我们之前在做一个车企的项目的时候, 也是要为客户搭建一个人工智能系统, 但是客户那边有20亿张图片的数据量需要做模型的训练推理(主要是计算机视觉方向,所以都是图片数据)。 所以专门引入了一个适合存储小文件的分布式存储设备(比如可以用ceph), 这时候就需要测试在这样庞大的文件数量下,不仅仅是存储系统,还有我们的产品本身是否能符合客户的性能需要。
海量小文件的构建与之前所讲的构造方式完全不同, spark可以造大量的数据,但这些数据都是在少数文件中的, 它无法构建海量的小文件, 这是因为在spark中每个parition(这里可以理解为一个小文件, 因为如果一个比较大的数据被切分成了很多很小的文件, 那么即便这个文件只有1k,在它读取到内存的时候也会当成一个partition处理)都会生成一个独立的task来计算, 一个task可以理解为一个线程。 所以当文件数量过多时,spark就会启动非常多的线程争抢cpu资源。所以不仅仅是分布式存储系统, 在分布式计算本身,过多的文件数量都是噩梦,试想一下,当一个文件只有100w条数据,但是每条数据都单独保存在一个文件中。 这时候spark就要开启100w个线程来处理这个数据,这是多么可怕的事情。
所以以前的构造方式是无法满足我们的需要的。 就要引入另外一种机制 -- 异步IO,这是一种利用少量线程就可以支撑大并发量的技术。 因为我们常见的普通的同步IO是无法满足我们的需要的,它有如下的缺点:
后面经过讨论, 最后的方案是使用 golang 语言, 用协程 + 异步 IO 来进行造数:
代码实现:
- package main
-
-
-
- import (
-
- "crypto/md5"
-
- "encoding/hex"
-
- "fmt"
-
- "io/ioutil"
-
- "math/rand"
-
- "net/http"
-
- "os"
-
- "path"
-
- "sync"
-
- "time"
-
- )
-
-
-
- var (
-
- fileQueue = make(chan FileInfo, 1000) // 缓冲队列,用来存储文件key, 方便后面的协程取出来插入数据库
-
- sourceFiles = []string{"asfdf.png"} // 复制的源文件
-
- sourceFileCache sync.Map // 缓存源文件内容, 避免每次copy都重新读取源文件
-
- )
-
-
-
- const (
-
- destDir = "file" // 需要复制的目录路径
-
- copyNumber = 10 // 每个协程需要copy的文件数量
-
- )
-
-
-
- // 文件 信息
-
- type FileInfo struct {
-
- key string
-
- createdAt time.Time
-
- }
-
-
-
- // 生成随机字符串
-
- func GetRandomString(n int) string {
-
- str := "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
-
- bytes := []byte(str)
-
- var result []byte
-
- for i := 0; i < n; i++ {
-
- result = append(result, bytes[rand.Intn(len(bytes))])
-
- }
-
- return string(result)
-
- }
-
-
-
- func copyFile(src, dst string) ([]byte, int64, error) {
-
-
-
- var input []byte
-
- if data, ok := sourceFileCache.Load(src); ok {
-
- input = data.([]byte)
-
- } else {
-
- data, err := ioutil.ReadFile(src)
-
- if err != nil {
-
- return []byte{}, 0, err
-
- }
-
- input = data
-
- sourceFileCache.Store(src, input)
-
- }
-
-
-
- err := ioutil.WriteFile(dst, input, 0644)
-
- if err != nil {
-
- return []byte{}, 0, err
-
- }
-
-
-
- fi, err := os.Stat(dst)
-
- if err != nil {
-
- return []byte{}, 0, err
-
- }
-
-
-
- return input, fi.Size(), nil
-
- }
-
-
-
- func copyFiles(wg *sync.WaitGroup) {
-
- defer wg.Done()
-
- for i := 0; i < copyNumber; i++ {
-
- // 随机种子
-
- rand.Seed(time.Now().UnixNano())
-
-
-
- // 从源文件中选择一个文件进行copy
-
- sourceFileCount := len(sourceFiles)
-
- sourceFilePath := sourceFiles[rand.Intn(sourceFileCount)]
-
-
-
- // 生成随机的文件名称
-
- fileName := GetRandomString(30) + ".jpg"
-
- destFilePath := path.Join(destDir, fileName)
-
-
-
- data, size, err := copyFile(sourceFilePath, destFilePath)
-
- if err != nil {
-
- fmt.Printf("copyFile file from %s to %s err, the message is %s", sourceFilePath, destFilePath, err.Error())
-
- }
-
-
-
- key, err := NewUploadFileKey("superadmin", fileName, md5V(string(data)), size)
-
- if err != nil {
-
- fmt.Printf("gen file key error, the message is %s", err.Error())
-
- }
-
- fileQueue <- FileInfo{
-
- key: key,
-
- createdAt: time.Now(),
-
- }
-
- }
-
- }
-
-
-
- func parseBasenameFromURI(uri string) (string, error) {
-
- r, _ := http.NewRequest("GET", uri, nil)
-
- return path.Base(r.URL.Path), nil
-
- }
-
-
-
- func NewUploadFileKey(uin, filename, hash string, size int64) (string, error) {
-
- str := fmt.Sprintf("uin:%s-hash:%s-size:%d", uin, hash, size)
-
- basename, err := parseBasenameFromURI(filename)
-
- if err != nil {
-
- return "", err
-
- }
-
- partHash := md5V(str)
-
- result := md5V(fmt.Sprintf("hash:%s-name:%s", partHash, filename))
-
- content := md5V(fmt.Sprintf("%s-%s-%s", result[8:10], result[10:], basename))
-
- u := fmt.Sprintf("%s%s/%s/%s-%s", "upload/", result[8:10], content[8:10], result[10:], basename)
-
- return u, nil
-
- }
-
-
-
- func md5V(str string) string {
-
- h := md5.New()
-
- h.Write([]byte(str))
-
- return hex.EncodeToString(h.Sum(nil))
-
- }
-
-
-
- func main() {
-
- var wg1 sync.WaitGroup
-
- wg1.Add(10)
-
- for i := 0; i < 10; i++ {
-
- go copyFiles(&wg1)
-
- }
-
-
-
- // 等待所有复制文件的协程结束
-
- go func() {
-
- wg1.Wait()
-
- // 关闭chan, 通知插入数据的协程, 文件都已经复制完毕
-
- close(fileQueue)
-
- fmt.Println("关闭通道")
-
- }()
-
-
-
- // 使用10个线程来插入数据库
-
- var wg2 sync.WaitGroup
-
- wg2.Add(10)
-
- for i := 0; i < 10; i++ {
-
- go insert(&wg2)
-
- }
-
- wg2.Wait()
-
- fmt.Println("数据生成完毕")
-
- }
-
-
-
- // 从chan中取出文件key插入数据库
-
- func insert(wg *sync.WaitGroup) {
-
- defer wg.Done()
-
- var cache []FileInfo
-
-
-
- for fileInfo := range fileQueue {
-
- if len(cache) < 1000 {
-
- cache = append(cache, fileInfo)
-
- //fmt.Println("数据没到1000条, 继续缓存")
-
- } else {
-
- // todo 将1000条数据插入到数据库中
-
- fmt.Println("积累了1k个文件, 开始插入数据库")
-
- }
-
- }
// todo for循环结束说明队列已经被关闭, 所有文件都copy完毕这时候需要缓存中剩余的记录一块插入到数据库中
fmt.Println("通道已经关闭, 现在开始把剩余的插入到数据库中")
//fmt.Println(cache)
for _, fileInfo := range cache{
fmt.Println(fileInfo.key)
}
}
因为golang原生就支持异步IO,实现起来最简单便选择了golang语言,对于go语言不熟悉的同学也可以查找一下python语言的异步io库
更多内容欢迎来到我的知识星球:
