• 《Go Web 编程》之第9章 发挥Go的并发优势


    第9章 发挥Go的并发优势

    9.1 并发与并行的区别

    并发(concurrency),多个任务同一时间段内启动、运行并结束,任务间可能会互动,使用和分享相同的资源。
    并行(parallelism),多个任务同时启动并执行(大任务分割成小任务),需要独立的资源(CPU等),相互平行无重叠。
    并发是同时处理多项任务,并行是同时执行多项任务。

    9.2 goroutine

    goroutine是与某个函数关联的轻量级执行单元,由Go运行时复用(multiplex)到少量操作系统线程上运行;启动时只需要很小的栈,栈会按需扩大和缩小。
    当某个goroutine被阻塞并导致其所在线程也被阻塞时,运行时环境(runtime)会将该线程上等待运行的其它goroutine移动到其它未阻塞的线程上继续运行。

    9.2.1 使用goroutine

    goroutine.go

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // printNumbers1 writes the integers 0 through 9 to standard output, each
    // followed by a single space.
    func printNumbers1() {
    	const count = 10
    	for n := 0; n < count; n++ {
    		fmt.Printf("%d ", n)
    	}
    }
    
    // printLetters1 writes the letters A through J to standard output, each
    // followed by a single space.
    func printLetters1() {
    	for letter := 'A'; letter <= 'J'; letter++ {
    		fmt.Printf("%c ", letter)
    	}
    }
    
    // printNumbers2 behaves like printNumbers1 but sleeps for one microsecond
    // before printing each value.
    func printNumbers2() {
    	const count = 10
    	for n := 0; n < count; n++ {
    		time.Sleep(time.Microsecond)
    		fmt.Printf("%d ", n)
    	}
    }
    
    // printLetters2 behaves like printLetters1 but sleeps for one microsecond
    // before printing each letter.
    func printLetters2() {
    	for letter := 'A'; letter <= 'J'; letter++ {
    		time.Sleep(time.Microsecond)
    		fmt.Printf("%c ", letter)
    	}
    }
    
    // print1 runs printNumbers1 and then printLetters1 on the calling goroutine.
    func print1() {
    	for _, step := range []func(){printNumbers1, printLetters1} {
    		step()
    	}
    }
    
    // goPrint1 starts printNumbers1 and printLetters1 as separate goroutines and
    // returns immediately without waiting for either of them.
    func goPrint1() {
    	for _, task := range []func(){printNumbers1, printLetters1} {
    		go task()
    	}
    }
    
    // print2 runs printNumbers2 and then printLetters2 on the calling goroutine.
    func print2() {
    	for _, step := range []func(){printNumbers2, printLetters2} {
    		step()
    	}
    }
    
    // goPrint2 starts printNumbers2 and printLetters2 as separate goroutines and
    // returns immediately without waiting for either of them.
    func goPrint2() {
    	for _, task := range []func(){printNumbers2, printLetters2} {
    		go task()
    	}
    }
    
    // main is intentionally empty; the functions above are driven from the tests.
    func main() {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    goroutine_test.go

    package main
    
    import (
    	"testing"
    	"time"
    )
    
    // Test cases
    
    // TestPrint1 runs the number and letter printers one after the other.
    func TestPrint1(t *testing.T) {
    	print1()
    }
    
    // TestGoPrint1 starts both printers as goroutines, then pauses so they have
    // time to produce their output before the test returns.
    func TestGoPrint1(t *testing.T) {
    	const grace = 100 * time.Millisecond
    	goPrint1()
    	time.Sleep(grace)
    }
    
    // TestPrint2 runs the sleeping variants of the printers one after the other.
    func TestPrint2(t *testing.T) {
    	print2()
    }
    
    // TestGoPrint2 starts the sleeping variants as goroutines, then pauses so
    // they have time to produce their output before the test returns.
    func TestGoPrint2(t *testing.T) {
    	const grace = 100 * time.Millisecond
    	goPrint2()
    	time.Sleep(grace)
    }
    
    // Benchmark cases
    
    /*
    // normal run
    func BenchmarkPrint1(b *testing.B) {
    	for i := 0; i < b.N; i++ {
    		print1()
    	}
    }
    
    // run with goroutines
    func BenchmarkGoPrint1(b *testing.B) {
    	for i := 0; i < b.N; i++ {
    		goPrint1()
    	}
    }
    
    // run with some work
    func BenchmarkPrint2(b *testing.B) {
    	for i := 0; i < b.N; i++ {
    		print2()
    	}
    }
    
    // run with goroutines and some work
    func BenchmarkGoPrint2(b *testing.B) {
    	for i := 0; i < b.N; i++ {
    		goPrint2()
    	}
    }
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    go test -v
    go test -run x -bench . -cpu 1
    
    • 1
    • 2

    9.2.2 goroutine与性能

    goroutine.go

    package main
    
    import "time"
    
    // printNumbers1 counts from 0 to 9. The print statement is commented out in
    // this variant, so the loop body does nothing.
    func printNumbers1() {
    	const count = 10
    	for n := 0; n < count; n++ {
    		// fmt.Printf("%d ", n)
    	}
    }
    
    // printLetters1 steps through the letters A to J. The print statement is
    // commented out in this variant, so the loop body does nothing.
    func printLetters1() {
    	for letter := 'A'; letter <= 'J'; letter++ {
    		// fmt.Printf("%c ", letter)
    	}
    }
    
    // printNumbers2 counts from 0 to 9, sleeping one microsecond per step.
    // Printing is commented out.
    func printNumbers2() {
    	const count = 10
    	for n := 0; n < count; n++ {
    		time.Sleep(time.Microsecond)
    		// fmt.Printf("%d ", n)
    	}
    }
    
    // printLetters2 steps through the letters A to J, sleeping one microsecond
    // per step. Printing is commented out.
    func printLetters2() {
    	for letter := 'A'; letter <= 'J'; letter++ {
    		time.Sleep(time.Microsecond)
    		// fmt.Printf("%c ", letter)
    	}
    }
    
    // print1 runs printNumbers1 and then printLetters1 on the calling goroutine.
    func print1() {
    	for _, step := range []func(){printNumbers1, printLetters1} {
    		step()
    	}
    }
    
    // goPrint1 starts printNumbers1 and printLetters1 as separate goroutines and
    // returns without waiting for them.
    func goPrint1() {
    	for _, task := range []func(){printNumbers1, printLetters1} {
    		go task()
    	}
    }
    
    // print2 runs printNumbers2 and then printLetters2 on the calling goroutine.
    func print2() {
    	for _, step := range []func(){printNumbers2, printLetters2} {
    		step()
    	}
    }
    
    // goPrint2 starts printNumbers2 and printLetters2 as separate goroutines and
    // returns without waiting for them.
    func goPrint2() {
    	for _, task := range []func(){printNumbers2, printLetters2} {
    		go task()
    	}
    }
    
    // main is intentionally empty; the functions above are driven from the
    // tests and benchmarks.
    func main() {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    goroutine_test.go

    package main
    
    import (
    	"testing"
    	"time"
    )
    
    // Test cases
    
    // TestPrint1 runs the number and letter loops one after the other.
    func TestPrint1(t *testing.T) {
    	print1()
    }
    
    // TestGoPrint1 starts both loops as goroutines, then pauses so they can
    // finish before the test returns.
    func TestGoPrint1(t *testing.T) {
    	const grace = 100 * time.Millisecond
    	goPrint1()
    	time.Sleep(grace)
    }
    
    // TestPrint2 runs the sleeping variants of the loops one after the other.
    func TestPrint2(t *testing.T) {
    	print2()
    }
    
    // TestGoPrint2 starts the sleeping variants as goroutines, then pauses so
    // they can finish before the test returns.
    func TestGoPrint2(t *testing.T) {
    	const grace = 100 * time.Millisecond
    	goPrint2()
    	time.Sleep(grace)
    }
    
    // Benchmark cases
    
    // BenchmarkPrint1 measures the sequential run without any sleeping.
    func BenchmarkPrint1(b *testing.B) {
    	for remaining := b.N; remaining > 0; remaining-- {
    		print1()
    	}
    }
    
    // BenchmarkGoPrint1 measures the goroutine-based run without any sleeping.
    // goPrint1 does not wait for its goroutines, so each iteration only covers
    // the cost of starting them.
    func BenchmarkGoPrint1(b *testing.B) {
    	for remaining := b.N; remaining > 0; remaining-- {
    		goPrint1()
    	}
    }
    
    // BenchmarkPrint2 measures the sequential run with a sleep in every step.
    func BenchmarkPrint2(b *testing.B) {
    	for remaining := b.N; remaining > 0; remaining-- {
    		print2()
    	}
    }
    
    // BenchmarkGoPrint2 measures the goroutine-based run with a sleep in every
    // step. goPrint2 does not wait for its goroutines.
    func BenchmarkGoPrint2(b *testing.B) {
    	for remaining := b.N; remaining > 0; remaining-- {
    		goPrint2()
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    go test -run x -bench . -cpu 1
    go test -run x -bench . -cpu 2
    go test -run x -bench . -cpu 4
    
    • 1
    • 2
    • 3

    启动goroutine有一定代价。
    在多个CPU上调度和运行任务需要消耗一定的资源,如果使用多个CPU带来的性能优势不足以抵消随之而来的额外消耗,那么程序的性能就会不升反降。

    9.2.3 等待goroutine

    等待组(WaitGroup)机制:

    • 声明等待组;
    • Add为等待组计数器设置值;
    • 一个goroutine结束时,Done方法让等待组计数器减一;
    • Wait阻塞,直到等待组计数器为0。
    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    // printNumbers2 prints 0 through 9, sleeping one microsecond before each
    // value, and marks itself done on wg when it returns.
    func printNumbers2(wg *sync.WaitGroup) {
    	defer wg.Done()
    	const count = 10
    	for n := 0; n < count; n++ {
    		time.Sleep(time.Microsecond)
    		fmt.Printf("%d ", n)
    	}
    }
    
    // printLetters2 prints A through J, sleeping one microsecond before each
    // letter, and marks itself done on wg when it returns.
    func printLetters2(wg *sync.WaitGroup) {
    	defer wg.Done()
    	for letter := 'A'; letter <= 'J'; letter++ {
    		time.Sleep(time.Microsecond)
    		fmt.Printf("%c ", letter)
    	}
    }
    
    // main starts both printers as goroutines and blocks until each of them has
    // called Done on the shared WaitGroup.
    func main() {
    	wg := new(sync.WaitGroup)
    	printers := []func(*sync.WaitGroup){printNumbers2, printLetters2}
    	wg.Add(len(printers))
    	for _, p := range printers {
    		go p(wg)
    	}
    	wg.Wait()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    9.3 通道

    channel,带有类型的值(typed value),让goroutine相互通信。

    //默认无缓冲通道(unbuffered channel),同步。
    ch := make(chan int)
    
    //有缓冲通道,buffered channel
    ch := make(chan int, 10)
    
    ch <- 1
    i := <-ch
    
    //默认双向(bidirectional);chan<- T 为只写(只能发送),<-chan T 为只读(只能接收)
    ch := make(chan<- string)
    ch := make( <-chan string)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    9.3.1 通过通道实现同步

    channel_wait.go

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // printNumbers prints 0 through 9, sleeping one microsecond before each
    // value, then sends true on w to signal completion.
    func printNumbers(w chan bool) {
    	const count = 10
    	for n := 0; n < count; n++ {
    		time.Sleep(time.Microsecond)
    		fmt.Printf("%d ", n)
    	}
    	w <- true
    }
    
    // printLetters prints A through J, sleeping one microsecond before each
    // letter, then sends true on w to signal completion.
    func printLetters(w chan bool) {
    	for letter := 'A'; letter <= 'J'; letter++ {
    		time.Sleep(time.Microsecond)
    		fmt.Printf("%c ", letter)
    	}
    	w <- true
    }
    
    // main runs both printers as goroutines and waits for the completion signal
    // of the number printer first, then that of the letter printer.
    func main() {
    	numbersDone := make(chan bool)
    	lettersDone := make(chan bool)
    	go printNumbers(numbersDone)
    	go printLetters(lettersDone)
    	for _, done := range []chan bool{numbersDone, lettersDone} {
    		<-done
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    9.3.2 通过通道实现消息传递

    channel_message.go

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // thrower sends the integers 0 through 4 on c, logging each value after the
    // send has completed.
    func thrower(c chan int) {
    	const throws = 5
    	for n := 0; n < throws; n++ {
    		c <- n
    		fmt.Println("Threw  >>", n)
    	}
    }
    
    // catcher receives five values from c and logs each one.
    func catcher(c chan int) {
    	const catches = 5
    	for received := 0; received < catches; received++ {
    		fmt.Println("Caught <<", <-c)
    	}
    }
    
    // main connects a thrower and a catcher through an unbuffered channel and
    // sleeps for 100ms to give them time to finish.
    func main() {
    	pipe := make(chan int)
    	go thrower(pipe)
    	go catcher(pipe)
    	time.Sleep(100 * time.Millisecond)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    9.3.3 有缓冲通道

    channel_buffered.go

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // thrower sends the integers 0 through 4 on c, logging each value after the
    // send has completed.
    func thrower(c chan int) {
    	for n := 0; n != 5; n++ {
    		c <- n
    		fmt.Println("Threw  >>", n)
    	}
    }
    
    // catcher receives five values from c and logs each one.
    func catcher(c chan int) {
    	for remaining := 5; remaining > 0; remaining-- {
    		value := <-c
    		fmt.Println("Caught <<", value)
    	}
    }
    
    // main connects a thrower and a catcher through a channel with a buffer of
    // three values and sleeps for 100ms to give them time to finish.
    func main() {
    	const capacity = 3
    	pipe := make(chan int, capacity)
    	go thrower(pipe)
    	go catcher(pipe)
    	time.Sleep(100 * time.Millisecond)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    9.3.4 从多个通道中选择

    channel_select.go

    package main
    
    import (
    	"fmt"
    	//"time"
    )
    
    // callerA sends a single greeting on c and then closes it.
    func callerA(c chan string) {
    	c <- "Hello World!"
    	close(c)
    }
    
    // callerB sends a single greeting on c and then closes it.
    func callerB(c chan string) {
    	c <- "Hola Mundo!"
    	close(c)
    }
    
    // main receives from both callers until each channel has been closed,
    // printing every message together with its origin.
    func main() {
    	a, b := make(chan string), make(chan string)
    	go callerA(a)
    	go callerB(b)
    	// A closed channel is always ready to receive, so leaving it in the
    	// select would make the loop spin until the other channel closes.
    	// Replacing it with nil disables its case: receiving from a nil channel
    	// blocks forever.
    	for a != nil || b != nil {
    		select {
    		case msg, open := <-a:
    			if !open {
    				a = nil
    				continue
    			}
    			fmt.Printf("%s from A\n", msg)
    		case msg, open := <-b:
    			if !open {
    				b = nil
    				continue
    			}
    			fmt.Printf("%s from B\n", msg)
    		}
    	}
    }
    /*
    func main() {
    	a, b := make(chan string), make(chan string)
    	go callerA(a)
    	go callerB(b)
    	msg1, msg2 := "A", "B"
    	for {
    		time.Sleep(1 * time.Microsecond)
    
    		select {
    		case msg1 = <-a:
    			fmt.Printf("%s from A\n", msg1)
    		case msg2 = <-b:
    			fmt.Printf("%s from B\n", msg2)
    		default:
    			fmt.Println("Default")
    		}
    		if msg1 == "" && msg2 == "" {
    			break
    		}
    
    	}
    }
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    9.3.5 通过通道共享数据

    map不是并发安全的:多个goroutine同时读写同一个map时,必须加锁,或者通过通道把对map的所有访问串行化到同一个goroutine中。

    channel_shared.go

    package main
    
    import (
    	"fmt"
    	"time"
    	"runtime"
    )
    
    // getRequest asks the goroutine that owns a Store's map for the value stored
    // under key; the answer is delivered on reply.
    type getRequest struct {
    	key   string
    	reply chan string
    }
    
    // Store is a string key/value store whose map is read and written by a
    // single owner goroutine. Callers reach it only through channels, so the map
    // is never touched by two goroutines at once.
    type Store struct {
    	hash map[string]string
    	in   chan [2]string  // key/value pairs to insert
    	get  chan getRequest // lookups to answer
    }
    
    // DB is the package-wide store; StoreInit must be called before it is used.
    var DB Store
    
    // StoreInit (re)initialises DB and starts the goroutine that owns its map.
    // That goroutine runs for the rest of the program.
    func StoreInit() {
    	DB = Store{
    		hash: make(map[string]string),
    		in:   make(chan [2]string),
    		get:  make(chan getRequest),
    	}
    	// Capture this store's map and channels so that the goroutine keeps
    	// serving the store it was started for even if StoreInit runs again.
    	hash, in, get := DB.hash, DB.in, DB.get
    	go func() {
    		for {
    			select {
    			case pair := <-in:
    				hash[pair[0]] = pair[1]
    			case req := <-get:
    				req.reply <- hash[req.key]
    			}
    		}
    	}()
    }
    
    // Get returns the value stored under key, or the empty string if the key is
    // absent. The lookup is performed by the owner goroutine, so it cannot race
    // with writes, and it observes every Add that returned before Get was
    // called. err is always nil.
    func (store *Store) Get(key string) (value string, err error) {
    	// Buffered so the owner goroutine never blocks while replying.
    	reply := make(chan string, 1)
    	store.get <- getRequest{key: key, reply: reply}
    	return <-reply, nil
    }
    
    // Add hands the key/value pair to the owner goroutine, blocking until it has
    // been accepted. err is always nil.
    func (store *Store) Add(key string, value string) (err error) {
    	store.in <- [2]string{key, value}
    	return nil
    }
    
    // Set is an unimplemented placeholder: it does nothing and returns nil.
    func (store *Store) Set(key string, value string) (err error) {
    	return nil
    }
    
    // Del is an unimplemented placeholder: it does nothing and returns nil.
    func (store *Store) Del(key string) (err error) {
    	return nil
    }
    
    // Pop is an unimplemented placeholder: it returns an empty value and nil.
    func (store *Store) Pop(key string) (value string, err error) {
    	return "", nil
    }
    
    // main initialises the store and then, ten times over, launches three
    // goroutines that each write a different value under the key "a", sleeps
    // for one microsecond, and prints whatever value is stored at that moment.
    func main() {
    	runtime.GOMAXPROCS(4)
    	StoreInit()
    	for round := 0; round < 10; round++ {
    		for _, v := range []string{"A", "B", "C"} {
    			go DB.Add("a", v)
    		}
    
    		time.Sleep(time.Microsecond)
    
    		current, _ := DB.Get("a")
    		fmt.Printf("%s ", current)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    9.4 Web应用中使用并发

    9.4.1 创建马赛克图片

    • mosaic_original.go
    package main
    
    import (
    	"fmt"
    	"image"
    	"image/color"
    	"io/ioutil"
    	"math"
    	"os"
    )
    
    // sq returns the square of n.
    func sq(n float64) float64 {
    	squared := n * n
    	return squared
    }
    
    // distance returns the Euclidean distance between the three-component
    // points p1 and p2.
    func distance(p1 [3]float64, p2 [3]float64) float64 {
    	d0 := p2[0] - p1[0]
    	d1 := p2[1] - p1[1]
    	d2 := p2[2] - p1[2]
    	return math.Sqrt(sq(d0) + sq(d1) + sq(d2))
    }
    
    // nearest returns the key in *db whose colour is closest (by Euclidean
    // distance) to target, and removes that entry from the map so the same tile
    // is not handed out twice. It returns the empty string, leaving the map
    // untouched, when no entry qualifies (for example when the map is empty).
    func nearest(target [3]float64, db *map[string][3]float64) string {
    	var filename string
    	// Start from the largest finite float64 rather than an arbitrary cut-off
    	// so that no candidate is excluded merely for being far away.
    	smallest := math.MaxFloat64
    	for k, v := range *db {
    		if dist := distance(target, v); dist < smallest {
    			filename, smallest = k, dist
    		}
    	}
    	if filename == "" {
    		return ""
    	}
    	delete(*db, filename)
    	return filename
    }
    
    // averageColor returns the mean red, green and blue values over all pixels
    // of img, using the 16-bit-per-channel values reported by Color.RGBA. An
    // image with no pixels yields all zeros.
    func averageColor(img image.Image) [3]float64 {
    	bounds := img.Bounds()
    	if bounds.Empty() {
    		// Avoid dividing by zero, which would produce NaN components.
    		return [3]float64{}
    	}
    	totalPixels := float64(bounds.Dx() * bounds.Dy())
    
    	r, g, b := 0.0, 0.0, 0.0
    	// An image's bounds need not start at (0, 0), so iterate from Min to Max
    	// instead of from zero.
    	for y := bounds.Min.Y; y < bounds.Max.Y; y++ {
    		for x := bounds.Min.X; x < bounds.Max.X; x++ {
    			r1, g1, b1, _ := img.At(x, y).RGBA()
    			r, g, b = r+float64(r1), g+float64(g1), b+float64(b1)
    		}
    	}
    	return [3]float64{r / totalPixels, g / totalPixels, b / totalPixels}
    }
    
    // resize returns a copy of in scaled to newWidth x newHeight pixels using
    // nearest-neighbour sampling. The result's bounds start at (0, 0). The first
    // and last rows and columns of the output map to the first and last rows and
    // columns of the input.
    func resize(in image.Image, newWidth int, newHeight int) image.Image {
    	bounds := in.Bounds()
    	width := bounds.Dx()
    	height := bounds.Dy()
    	newBounds := image.Rect(0, 0, newWidth, newHeight)
    	out := image.NewNRGBA64(newBounds)
    
    	// sourceOffset maps a destination index to an offset within the source.
    	// A destination that is a single pixel wide or tall samples offset 0,
    	// which avoids the division by zero in the general formula.
    	sourceOffset := func(dst, srcLen, dstLen int) int {
    		if dstLen <= 1 {
    			return 0
    		}
    		return dst * (srcLen - 1) / (dstLen - 1)
    	}
    
    	for j := 0; j < newHeight; j++ {
    		y := bounds.Min.Y + sourceOffset(j, height, newHeight)
    		for i := 0; i < newWidth; i++ {
    			// The source bounds need not start at (0, 0).
    			x := bounds.Min.X + sourceOffset(i, width, newWidth)
    			// Color.RGBA reports alpha-premultiplied values, whereas NRGBA64
    			// stores non-premultiplied ones; convert through the colour
    			// model so translucent pixels keep their colour.
    			px := color.NRGBA64Model.Convert(in.At(x, y)).(color.NRGBA64)
    			out.SetNRGBA64(i, j, px)
    		}
    	}
    	return out
    }
    
    // tilesDB builds the tile database: for every file in the "tiles" directory
    // that can be opened and decoded as an image, it records the image's average
    // colour under the file's path. Files that fail to open or decode are
    // reported on standard output and skipped.
    func tilesDB() map[string][3]float64 {
    	fmt.Println("Start populating tiles db ...")
    	db := make(map[string][3]float64)
    	files, err := ioutil.ReadDir("tiles")
    	if err != nil {
    		// Report the problem; the (possibly empty) listing is still processed.
    		fmt.Println("cannot read tiles directory when populating tiles db:", err)
    	}
    	for _, f := range files {
    		name := "tiles/" + f.Name()
    		file, err := os.Open(name)
    		if err != nil {
    			fmt.Println("cannot open file", name, "when populating tiles db:", err)
    			continue
    		}
    		img, _, err := image.Decode(file)
    		// Close right away instead of deferring, so descriptors do not pile
    		// up until the whole directory has been processed.
    		file.Close()
    		if err != nil {
    			fmt.Println("error in populating tiles db:", err, name)
    			continue
    		}
    		db[name] = averageColor(img)
    	}
    	fmt.Println("Finished populating tiles db.")
    	return db
    }
    
    // TILESDB maps a tile image's path to its average colour.
    var TILESDB map[string][3]float64
    
    // cloneTilesDB returns an independent copy of TILESDB, so that entries can be
    // removed from the copy without affecting the shared database.
    func cloneTilesDB() map[string][3]float64 {
    	snapshot := make(map[string][3]float64, len(TILESDB))
    	for name, avg := range TILESDB {
    		snapshot[name] = avg
    	}
    	return snapshot
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    9.4.2 马赛克图片Web应用

    main.go

    package main
    
    import (
    	"bytes"
    	"encoding/base64"
    	"fmt"
    	"html/template"
    	"image"
    	"image/draw"
    	"image/jpeg"
    	"net/http"
    	"os"
    	"strconv"
    	"time"
    	// "runtime"
    )
    
    // main registers the static file server and the upload and mosaic handlers,
    // builds the tile database, and serves HTTP on 127.0.0.1:8080 until the
    // server fails.
    func main() {
    	// runtime.GOMAXPROCS(runtime.NumCPU())
    	mux := http.NewServeMux()
    	files := http.FileServer(http.Dir("public"))
    	mux.Handle("/static/", http.StripPrefix("/static/", files))
    	mux.HandleFunc("/", upload)
    	mux.HandleFunc("/mosaic", mosaic)
    	server := &http.Server{
    		Addr:    "127.0.0.1:8080",
    		Handler: mux,
    	}
    	// building up the source tile database
    	TILESDB = tilesDB()
    	fmt.Println("Mosaic server started.")
    	// ListenAndServe only returns on failure (for example when the port is
    	// already in use); report it instead of exiting silently.
    	if err := server.ListenAndServe(); err != nil {
    		fmt.Println("mosaic server stopped:", err)
    		os.Exit(1)
    	}
    }
    
    // upload renders upload.html, read from the working directory, as the
    // response. If the template cannot be loaded it replies with 500 Internal
    // Server Error instead of dereferencing a nil template.
    func upload(w http.ResponseWriter, r *http.Request) {
    	t, err := template.ParseFiles("upload.html")
    	if err != nil {
    		fmt.Println("error loading upload.html:", err)
    		http.Error(w, "cannot load upload page", http.StatusInternalServerError)
    		return
    	}
    	if err := t.Execute(w, nil); err != nil {
    		// Part of the response may already have been written, so the error
    		// can only be reported on the server side.
    		fmt.Println("error rendering upload.html:", err)
    	}
    }
    
    // mosaic handles the POSTed upload form. It reads the "image" file and the
    // "tile_size" value, builds a photo mosaic of the image out of the tiles in
    // the tile database, and renders results.html with the original image, the
    // mosaic (both as base64-encoded JPEG) and the processing time.
    //
    // Malformed requests are answered with 400 Bad Request and internal
    // failures with 500 Internal Server Error.
    func mosaic(w http.ResponseWriter, r *http.Request) {
    	t0 := time.Now()
    
    	// get the content from the POSTed form
    	if err := r.ParseMultipartForm(10485760); err != nil { // max body in memory is 10MB
    		http.Error(w, "invalid multipart form", http.StatusBadRequest)
    		return
    	}
    
    	file, _, err := r.FormFile("image")
    	if err != nil {
    		http.Error(w, "missing image file", http.StatusBadRequest)
    		return
    	}
    	defer file.Close()
    
    	// A tile size below 1 would never advance the loops below.
    	tileSize, err := strconv.Atoi(r.FormValue("tile_size"))
    	if err != nil || tileSize < 1 {
    		http.Error(w, "tile_size must be a positive integer", http.StatusBadRequest)
    		return
    	}
    
    	// decode and get original image
    	original, _, err := image.Decode(file)
    	if err != nil {
    		http.Error(w, "cannot decode uploaded image", http.StatusBadRequest)
    		return
    	}
    	bounds := original.Bounds()
    	width := bounds.Dx()
    	height := bounds.Dy()
    
    	// create a new image for the mosaic
    	newimage := image.NewNRGBA(image.Rect(0, 0, width, height))
    
    	// private copy of the tiles database; nearest removes the tiles it uses
    	db := cloneTilesDB()
    
    	// source point for each tile, which starts with 0, 0 of each tile
    	sp := image.Point{0, 0}
    	for y := 0; y < height; y += tileSize {
    		for x := 0; x < width; x += tileSize {
    			// use the top left most pixel as the average color
    			cr, cg, cb, _ := original.At(bounds.Min.X+x, bounds.Min.Y+y).RGBA()
    			target := [3]float64{float64(cr), float64(cg), float64(cb)}
    			// get the closest tile from the tiles DB
    			name := nearest(target, &db)
    			tileFile, err := os.Open(name)
    			if err != nil {
    				fmt.Println("error:", err, name)
    				continue
    			}
    			img, _, err := image.Decode(tileFile)
    			// Close immediately; a defer would keep every tile open until
    			// the handler returns.
    			tileFile.Close()
    			if err != nil {
    				fmt.Println("error:", err, name)
    				continue
    			}
    			// resize the tile to the correct size
    			tile := resize(img, tileSize, tileSize)
    			tileBounds := image.Rect(x, y, x+tileSize, y+tileSize)
    			// draw the tile into the mosaic
    			draw.Draw(newimage, tileBounds, tile, sp, draw.Src)
    		}
    	}
    
    	buf1 := new(bytes.Buffer)
    	if err := jpeg.Encode(buf1, original, nil); err != nil {
    		http.Error(w, "cannot encode original image", http.StatusInternalServerError)
    		return
    	}
    	originalStr := base64.StdEncoding.EncodeToString(buf1.Bytes())
    
    	buf2 := new(bytes.Buffer)
    	if err := jpeg.Encode(buf2, newimage, nil); err != nil {
    		http.Error(w, "cannot encode mosaic image", http.StatusInternalServerError)
    		return
    	}
    	mosaicStr := base64.StdEncoding.EncodeToString(buf2.Bytes())
    
    	t1 := time.Now()
    	images := map[string]string{
    		"original": originalStr,
    		"mosaic":   mosaicStr,
    		"duration": fmt.Sprintf("%v ", t1.Sub(t0)),
    	}
    	t, err := template.ParseFiles("results.html")
    	if err != nil {
    		fmt.Println("error loading results.html:", err)
    		http.Error(w, "cannot load results page", http.StatusInternalServerError)
    		return
    	}
    	if err := t.Execute(w, images); err != nil {
    		fmt.Println("error rendering results.html:", err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114

    9.4.3 并发版马赛克图片生成Web应用

    mosaic_concurrent.go

    package main
    
    import (
    	"fmt"
    	"image"
    	"image/color"
    	"io/ioutil"
    	"math"
    	"os"
    	"path/filepath"
    	"sync"
    )
    
    // sq returns n multiplied by itself.
    func sq(n float64) float64 {
    	product := n * n
    	return product
    }
    
    // distance returns the Euclidean distance between p1 and p2, treating each
    // array as a point in three-dimensional space.
    func distance(p1 [3]float64, p2 [3]float64) float64 {
    	first, second, third := p2[0]-p1[0], p2[1]-p1[1], p2[2]-p1[2]
    	sumOfSquares := sq(first) + sq(second) + sq(third)
    	return math.Sqrt(sumOfSquares)
    }
    
    // averageColor returns the mean red, green and blue values over all pixels
    // of img, using the 16-bit-per-channel values reported by Color.RGBA. An
    // image with no pixels yields all zeros.
    func averageColor(img image.Image) [3]float64 {
    	bounds := img.Bounds()
    	if bounds.Empty() {
    		// Avoid dividing by zero, which would produce NaN components.
    		return [3]float64{}
    	}
    	totalPixels := float64(bounds.Dx() * bounds.Dy())
    
    	r, g, b := 0.0, 0.0, 0.0
    	// An image's bounds need not start at (0, 0), so iterate from Min to Max
    	// instead of from zero.
    	for y := bounds.Min.Y; y < bounds.Max.Y; y++ {
    		for x := bounds.Min.X; x < bounds.Max.X; x++ {
    			r1, g1, b1, _ := img.At(x, y).RGBA()
    			r, g, b = r+float64(r1), g+float64(g1), b+float64(b1)
    		}
    	}
    	return [3]float64{r / totalPixels, g / totalPixels, b / totalPixels}
    }
    
    // scale returns a copy of in scaled to newWidth x newHeight pixels using
    // nearest-neighbour sampling. The result's bounds start at (0, 0). The first
    // and last rows and columns of the output map to the first and last rows and
    // columns of the input.
    func scale(in image.Image, newWidth int, newHeight int) image.Image {
    	bounds := in.Bounds()
    	width := bounds.Dx()
    	height := bounds.Dy()
    	newBounds := image.Rect(0, 0, newWidth, newHeight)
    	out := image.NewNRGBA64(newBounds)
    
    	// sourceOffset maps a destination index to an offset within the source.
    	// A destination that is a single pixel wide or tall samples offset 0,
    	// which avoids the division by zero in the general formula.
    	sourceOffset := func(dst, srcLen, dstLen int) int {
    		if dstLen <= 1 {
    			return 0
    		}
    		return dst * (srcLen - 1) / (dstLen - 1)
    	}
    
    	for j := 0; j < newHeight; j++ {
    		y := bounds.Min.Y + sourceOffset(j, height, newHeight)
    		for i := 0; i < newWidth; i++ {
    			// The source bounds need not start at (0, 0).
    			x := bounds.Min.X + sourceOffset(i, width, newWidth)
    			// Color.RGBA reports alpha-premultiplied values, whereas NRGBA64
    			// stores non-premultiplied ones; convert through the colour
    			// model so translucent pixels keep their colour.
    			px := color.NRGBA64Model.Convert(in.At(x, y)).(color.NRGBA64)
    			out.SetNRGBA64(i, j, px)
    		}
    	}
    	return out
    }
    
    // DB is a tile database (tile path -> average colour) whose map is guarded
    // by a mutex so that nearest can be called from several goroutines.
    type DB struct {
    	mutex *sync.Mutex
    	store map[string][3]float64
    }
    
    // nearest returns the key whose colour is closest (by Euclidean distance) to
    // target and removes that entry from the store so the same tile is not
    // handed out twice. It returns the empty string, leaving the store
    // untouched, when no entry qualifies (for example when the store is empty).
    func (db *DB) nearest(target [3]float64) string {
    	db.mutex.Lock()
    	defer db.mutex.Unlock()
    
    	var filename string
    	// Start from the largest finite float64 rather than an arbitrary cut-off
    	// so that no candidate is excluded merely for being far away.
    	smallest := math.MaxFloat64
    	for k, v := range db.store {
    		if dist := distance(target, v); dist < smallest {
    			filename, smallest = k, dist
    		}
    	}
    	if filename == "" {
    		return ""
    	}
    	delete(db.store, filename)
    	return filename
    }
    
    // TILESDB maps a tile image's path to its average colour.
    var TILESDB map[string][3]float64
    
    // cloneTilesDB copies TILESDB into a new DB with its own mutex, so that
    // entries can be removed from the copy without affecting the shared
    // database.
    func cloneTilesDB() DB {
    	snapshot := make(map[string][3]float64, len(TILESDB))
    	for name, avg := range TILESDB {
    		snapshot[name] = avg
    	}
    	return DB{
    		mutex: &sync.Mutex{},
    		store: snapshot,
    	}
    }
    
    // lock guards writes to the tile map shared by concurrent oneDB calls; a map
    // must not be written by several goroutines without synchronisation.
    var lock sync.Mutex
    
    // oneDB opens and decodes the image file name and stores its average colour
    // in *db under that name. Files that cannot be opened or decoded are
    // reported on standard output and skipped.
    func oneDB(db *map[string][3]float64, name string) {
    	file, err := os.Open(name)
    	if err != nil {
    		fmt.Println("cannot open file", name, "when populating tiles db:", err)
    		return
    	}
    	defer file.Close()
    
    	img, _, err := image.Decode(file)
    	if err != nil {
    		fmt.Println("error in populating tiles db:", err, name)
    		return
    	}
    
    	// Compute the average before taking the lock: it is the expensive part,
    	// and holding the lock while computing it would make concurrent callers
    	// run one at a time.
    	avg := averageColor(img)
    	lock.Lock()
    	(*db)[name] = avg
    	lock.Unlock()
    }
    
    // tilesDB populates a tiles database in memory: every file in the "tiles"
    // directory is processed by oneDB in its own goroutine, with at most 20 of
    // them running at a time. It returns once all of them have finished.
    func tilesDB() map[string][3]float64 {
    	fmt.Println("Start populating tiles db ...")
    	db := make(map[string][3]float64)
    
    	// wait for all goroutines to finish
    	var wg sync.WaitGroup
    
    	// limits the number of goroutines running at the same time
    	tokens := make(chan struct{}, 20)
    
    	files, err := ioutil.ReadDir("tiles")
    	if err != nil {
    		// Report the problem; the (possibly empty) listing is still processed.
    		fmt.Println("cannot read tiles directory when populating tiles db:", err)
    	}
    	for _, f := range files {
    		name := filepath.Join("tiles", f.Name())
    
    		wg.Add(1)
    		tokens <- struct{}{} // acquire a slot; blocks while 20 are in use
    		go func(name string) {
    			defer wg.Done()
    			// Deferred so the slot is released even if oneDB panics.
    			defer func() { <-tokens }()
    
    			oneDB(&db, name)
    		}(name)
    	}
    
    	wg.Wait()
    	fmt.Println("Finished populating tiles db.")
    	return db
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146

    main.go

    package main
    
    import (
    	"bytes"
    	"encoding/base64"
    	"fmt"
    	"html/template"
    	"image"
    	"image/draw"
    	"image/jpeg"
    	"net/http"
    	"os"
    	"strconv"
    	"sync"
    	"time"
    )
    
    // main registers the static file server and the upload and mosaic handlers,
    // builds the tile database, and serves HTTP on 127.0.0.1:8080 until the
    // server fails.
    func main() {
    	fmt.Println("Starting mosaic server ...")
    	mux := http.NewServeMux()
    	files := http.FileServer(http.Dir("public"))
    	mux.Handle("/static/", http.StripPrefix("/static/", files))
    
    	mux.HandleFunc("/", upload)
    	mux.HandleFunc("/mosaic", mosaic)
    
    	server := &http.Server{
    		Addr:    "127.0.0.1:8080",
    		Handler: mux,
    	}
    	TILESDB = tilesDB()
    
    	fmt.Println("Mosaic server started.")
    	// ListenAndServe only returns on failure (for example when the port is
    	// already in use); report it instead of exiting silently.
    	if err := server.ListenAndServe(); err != nil {
    		fmt.Println("mosaic server stopped:", err)
    		os.Exit(1)
    	}
    }
    
    // upload renders upload.html, read from the working directory, as the
    // response. If the template cannot be loaded it replies with 500 Internal
    // Server Error instead of dereferencing a nil template.
    func upload(w http.ResponseWriter, r *http.Request) {
    	t, err := template.ParseFiles("upload.html")
    	if err != nil {
    		fmt.Println("error loading upload.html:", err)
    		http.Error(w, "cannot load upload page", http.StatusInternalServerError)
    		return
    	}
    	if err := t.Execute(w, nil); err != nil {
    		// Part of the response may already have been written, so the error
    		// can only be reported on the server side.
    		fmt.Println("error rendering upload.html:", err)
    	}
    }
    
    // oneTile loads the tile image stored in the file nearest, scales it to
    // tileSize x tileSize and draws it into newimage with its top-left corner at
    // (x, y). A tile that cannot be opened or decoded is reported on standard
    // output and left undrawn.
    func oneTile(nearest string, tileSize int, x int, y int, newimage *image.NRGBA) {
    	file, err := os.Open(nearest)
    	if err != nil {
    		fmt.Println("error:", err, nearest)
    		return
    	}
    	defer file.Close()
    
    	img, _, err := image.Decode(file)
    	if err != nil {
    		fmt.Println("error:", err, nearest)
    		return
    	}
    
    	// resize the tile to the correct size
    	tile := scale(img, tileSize, tileSize)
    	tileBounds := image.Rect(x, y, x+tileSize, y+tileSize)
    	// draw the tile into the mosaic, starting from the tile's own origin
    	draw.Draw(newimage, tileBounds, tile, image.Point{0, 0}, draw.Src)
    }
    
    // mosaic is the fan-out/fan-in handler for the POSTed upload form. It reads
    // the "image" file and the "tile_size" value, picks the closest tile for
    // every cell on the calling goroutine, and draws the tiles concurrently
    // (at most 20 at a time). Once all tiles are drawn it renders results.html
    // with the original image, the mosaic (both as base64-encoded JPEG) and the
    // processing time.
    //
    // Malformed requests are answered with 400 Bad Request and internal
    // failures with 500 Internal Server Error.
    func mosaic(w http.ResponseWriter, r *http.Request) {
    	t0 := time.Now()
    	// get the content from the POSTed form
    	if err := r.ParseMultipartForm(10485760); err != nil { // max body in memory is 10MB
    		http.Error(w, "invalid multipart form", http.StatusBadRequest)
    		return
    	}
    	file, _, err := r.FormFile("image")
    	if err != nil {
    		http.Error(w, "missing image file", http.StatusBadRequest)
    		return
    	}
    	defer file.Close()
    
    	// A tile size below 1 would never advance the loops below.
    	tileSize, err := strconv.Atoi(r.FormValue("tile_size"))
    	if err != nil || tileSize < 1 {
    		http.Error(w, "tile_size must be a positive integer", http.StatusBadRequest)
    		return
    	}
    
    	// decode and get original image
    	original, _, err := image.Decode(file)
    	if err != nil {
    		http.Error(w, "cannot decode uploaded image", http.StatusBadRequest)
    		return
    	}
    	bounds := original.Bounds()
    	width := bounds.Dx()
    	height := bounds.Dy()
    	newimage := image.NewNRGBA(image.Rect(0, 0, width, height))
    
    	// private copy of the tiles database; nearest removes the tiles it uses
    	db := cloneTilesDB()
    
    	// wait for all goroutines to finish
    	var wg sync.WaitGroup
    
    	// limits the number of goroutines running at the same time
    	tokens := make(chan struct{}, 20)
    
    	for y := 0; y < height; y += tileSize {
    		for x := 0; x < width; x += tileSize {
    			// use the top left most pixel as the average color
    			cr, cg, cb, _ := original.At(bounds.Min.X+x, bounds.Min.Y+y).RGBA()
    			target := [3]float64{float64(cr), float64(cg), float64(cb)}
    			// get the closest tile from the tiles DB
    			name := db.nearest(target)
    
    			wg.Add(1)
    			tokens <- struct{}{} // acquire a slot; blocks while 20 are in use
    			go func(name string, x int, y int) {
    				defer wg.Done()
    				// Deferred so the slot is released even if oneTile panics.
    				defer func() { <-tokens }()
    
    				oneTile(name, tileSize, x, y, newimage)
    			}(name, x, y)
    		}
    	}
    	wg.Wait()
    
    	buf1 := new(bytes.Buffer)
    	if err := jpeg.Encode(buf1, original, nil); err != nil {
    		http.Error(w, "cannot encode original image", http.StatusInternalServerError)
    		return
    	}
    	originalStr := base64.StdEncoding.EncodeToString(buf1.Bytes())
    
    	buf2 := new(bytes.Buffer)
    	if err := jpeg.Encode(buf2, newimage, nil); err != nil {
    		http.Error(w, "cannot encode mosaic image", http.StatusInternalServerError)
    		return
    	}
    	mosaicStr := base64.StdEncoding.EncodeToString(buf2.Bytes())
    
    	t1 := time.Now()
    	images := map[string]string{
    		"original": originalStr,
    		"mosaic":   mosaicStr,
    		"duration": fmt.Sprintf("%v ", t1.Sub(t0)),
    	}
    
    	t, err := template.ParseFiles("results.html")
    	if err != nil {
    		fmt.Println("error loading results.html:", err)
    		http.Error(w, "cannot load results page", http.StatusInternalServerError)
    		return
    	}
    	if err := t.Execute(w, images); err != nil {
    		fmt.Println("error rendering results.html:", err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125

    main.go.bak

    package main
    
    import (
    	"bytes"
    	"encoding/base64"
    	"fmt"
    	"html/template"
    	"image"
    	"image/draw"
    	"image/jpeg"
    	"net/http"
    	"os"
    	"strconv"
    	"sync"
    	"time"
    )
    
    // main wires up the mosaic web server and blocks while serving requests.
    //
    // Routes:
    //   - /static/ serves files from the "public" directory
    //   - /        renders the upload form (upload)
    //   - /mosaic  builds the photo mosaic from the posted image (mosaic)
    //
    // The server listens on 127.0.0.1:8080. If it cannot start or stops with an
    // error, the error is written to stderr and the process exits with status 1.
    func main() {
    	fmt.Println("Starting mosaic server ...")
    	mux := http.NewServeMux()
    	files := http.FileServer(http.Dir("public"))
    	mux.Handle("/static/", http.StripPrefix("/static/", files))
    
    	mux.HandleFunc("/", upload)
    	mux.HandleFunc("/mosaic", mosaic)
    
    	server := &http.Server{
    		Addr:    "127.0.0.1:8080",
    		Handler: mux,
    	}
    	// Fill the package-level tile database before any request is accepted.
    	TILESDB = tilesDB()
    
    	fmt.Println("Mosaic server started.")
    	// ListenAndServe always returns a non-nil error (e.g. the port is already
    	// in use); surface it instead of exiting silently with status 0.
    	if err := server.ListenAndServe(); err != nil {
    		fmt.Fprintln(os.Stderr, "mosaic server stopped:", err)
    		os.Exit(1)
    	}
    }
    
    // upload renders the upload form from the "upload.html" template.
    //
    // If the template cannot be loaded or parsed the handler answers with
    // 500 Internal Server Error instead of dereferencing a nil template.
    func upload(w http.ResponseWriter, r *http.Request) {
    	t, err := template.ParseFiles("upload.html")
    	if err != nil {
    		fmt.Println("error loading upload.html:", err)
    		http.Error(w, "upload form unavailable", http.StatusInternalServerError)
    		return
    	}
    	// The response may already be partially written at this point, so an
    	// Execute failure can only be logged.
    	if err := t.Execute(w, nil); err != nil {
    		fmt.Println("error rendering upload.html:", err)
    	}
    }
    
    // cut builds the mosaic for the region [x1,x2) x [y1,y2) of original in its
    // own goroutine and delivers the finished region, un-encoded, on the
    // returned channel. Exactly one image is sent; the channel is not closed.
    //
    // The region is walked in tileSize steps. For each step the pixel at the
    // tile's top-left corner is sampled, db.nearest picks the tile file for that
    // colour, and the file is decoded, scaled to tileSize x tileSize and drawn
    // into place. A tile that cannot be opened or decoded is logged and left
    // blank.
    //
    // A non-positive tileSize would never advance the loops, so in that case
    // the blank region is delivered immediately.
    func cut(original image.Image, db *DB, tileSize, x1, y1, x2, y2 int) <-chan image.Image {
    	c := make(chan image.Image)
    	go func() {
    		newimage := image.NewNRGBA(image.Rect(x1, y1, x2, y2))
    		if tileSize <= 0 {
    			c <- newimage
    			return
    		}
    		for y := y1; y < y2; y += tileSize {
    			for x := x1; x < x2; x += tileSize {
    				r, g, b, _ := original.At(x, y).RGBA()
    				rgb := [3]float64{float64(r), float64(g), float64(b)}
    				nearest := db.nearest(rgb)
    
    				file, err := os.Open(nearest)
    				if err != nil {
    					fmt.Println("error opening file when creating mosaic:", nearest, err)
    					continue
    				}
    				img, _, err := image.Decode(file)
    				// Close right after decoding: only a successfully opened
    				// file is closed, and no descriptors pile up across tiles.
    				file.Close()
    				if err != nil {
    					fmt.Println("error in decoding nearest", err, nearest)
    					continue
    				}
    
    				tile := scale(img, tileSize, tileSize)
    				tileBounds := image.Rect(x, y, x+tileSize, y+tileSize)
    				// The scaled tile is read from its (0,0) corner.
    				draw.Draw(newimage, tileBounds, tile, image.Point{}, draw.Src)
    			}
    		}
    		c <- newimage
    	}()
    
    	return c
    }
    
    // combine is the fan-in stage: it receives one image from each of c1..c4,
    // draws every image into a new canvas with bounds r at the position given by
    // the image's own bounds, JPEG-encodes the canvas and delivers the
    // base64-encoded result on the returned channel.
    //
    // Exactly one string is sent on the returned channel. An input channel that
    // is closed without delivering an image is skipped and its area stays blank.
    // If JPEG encoding fails the error is logged and an empty string is sent, so
    // the receiver is never left blocked.
    func combine(r image.Rectangle, c1, c2, c3, c4 <-chan image.Image) <-chan string {
    	c := make(chan string)
    	go func() {
    		newimage := image.NewNRGBA(r)
    
    		// One receiver per input so the parts are drawn in whatever order
    		// they become available.
    		var wg sync.WaitGroup
    		for _, in := range []<-chan image.Image{c1, c2, c3, c4} {
    			wg.Add(1)
    			go func(in <-chan image.Image) {
    				defer wg.Done()
    				part, ok := <-in
    				if !ok || part == nil {
    					return
    				}
    				// Align the part's own top-left corner with the same
    				// coordinates on the canvas. Deriving the source point from
    				// r instead misplaces parts whenever r does not start at
    				// the origin.
    				b := part.Bounds()
    				draw.Draw(newimage, b, part, b.Min, draw.Src)
    			}(in)
    		}
    		// wait till all parts have been drawn
    		wg.Wait()
    
    		buf := new(bytes.Buffer)
    		if err := jpeg.Encode(buf, newimage, nil); err != nil {
    			fmt.Println("error encoding mosaic:", err)
    			c <- ""
    			return
    		}
    		c <- base64.StdEncoding.EncodeToString(buf.Bytes())
    	}()
    	return c
    }
    
    // mosaic is the fan-out / fan-in handler registered for /mosaic.
    //
    // It reads the uploaded "image" file and the "tile_size" form value, splits
    // the image into four quadrants that are each processed by cut, merges them
    // with combine, and renders "results.html" with the base64-encoded original,
    // the base64-encoded mosaic and the elapsed time.
    //
    // Malformed requests (unparsable form, missing image, non-positive or
    // non-numeric tile_size, undecodable image) are answered with 400; failures
    // while encoding or loading the result template are answered with 500.
    func mosaic(w http.ResponseWriter, r *http.Request) {
    	t0 := time.Now()
    	// get the content from the POSTed form
    	if err := r.ParseMultipartForm(10485760); err != nil { // max body in memory is 10MB
    		http.Error(w, "invalid multipart form", http.StatusBadRequest)
    		return
    	}
    	file, _, err := r.FormFile("image")
    	if err != nil {
    		http.Error(w, "missing image upload", http.StatusBadRequest)
    		return
    	}
    	defer file.Close()
    	// A tile size of zero or less would make cut loop without advancing.
    	tileSize, err := strconv.Atoi(r.FormValue("tile_size"))
    	if err != nil || tileSize <= 0 {
    		http.Error(w, "tile_size must be a positive integer", http.StatusBadRequest)
    		return
    	}
    
    	// decode and get original image
    	original, _, err := image.Decode(file)
    	if err != nil {
    		http.Error(w, "uploaded file is not a supported image", http.StatusBadRequest)
    		return
    	}
    	bounds := original.Bounds()
    	db := cloneTilesDB()
    	width := bounds.Dx()
    	height := bounds.Dy()
    	// fan-out
    	c1 := cut(original, &db, tileSize, 0, 0, width/2, height/2)
    	c2 := cut(original, &db, tileSize, width/2, 0, width, height/2)
    	c3 := cut(original, &db, tileSize, 0, height/2, width/2, height)
    	c4 := cut(original, &db, tileSize, width/2, height/2, width, height)
    
    	// fan-in
    	c := combine(bounds, c1, c2, c3, c4)
    
    	// Encode the original while the quadrants are being processed.
    	buf1 := new(bytes.Buffer)
    	encodeErr := jpeg.Encode(buf1, original, nil)
    	originalStr := base64.StdEncoding.EncodeToString(buf1.Bytes())
    
    	// Receive the mosaic before reading the clock so the reported duration
    	// covers the mosaic generation, and before any early return so the
    	// combine goroutine is not left blocked on its send.
    	mosaicStr := <-c
    	if encodeErr != nil {
    		http.Error(w, "could not encode original image", http.StatusInternalServerError)
    		return
    	}
    
    	images := map[string]string{
    		"original": originalStr,
    		"mosaic":   mosaicStr,
    		"duration": fmt.Sprintf("%v ", time.Since(t0)),
    	}
    
    	t, err := template.ParseFiles("results.html")
    	if err != nil {
    		fmt.Println("error loading results.html:", err)
    		http.Error(w, "result page unavailable", http.StatusInternalServerError)
    		return
    	}
    	// The response may already be partially written, so only log here.
    	if err := t.Execute(w, images); err != nil {
    		fmt.Println("error rendering results.html:", err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
  • 相关阅读:
    E 排队(排列组合)[牛客小*白月赛61]
    细说从0开始挖掘CMS
    echart 两个柱状图并列,共用一个y轴的label
    python LeetCode 刷题记录 14
    如何使用jenkins、ant、selenium、testng搭建自动化测试框架
    使用python中的pandas对csv文件进行拆分
    IP地址追踪具体位置:技术和隐私考虑
    【SpringBoot】秒杀业务:redis+拦截器+自定义注解+验证码简单实现限流
    递归回溯实战+思想
    前端demo: 实现对图片进行上传前的压缩功能
  • 原文地址:https://blog.csdn.net/oqqyx1234567/article/details/126708377