Goroutines
Goroutines are lightweight software threads managed by the Go runtime and can be used to achieve concurrency.
Concurrency means having multiple instruction sequences in progress over overlapping periods of time. Traditionally, the operating system scheduler rapidly context-switches between threads so that all of them make progress. In Go, Goroutines are scheduled by the runtime instead. The runtime scheduler was originally cooperative, and since Go 1.14 it can also preempt running Goroutines asynchronously.
Synchronous Execution
package main

import (
	"fmt"
	"io"
	"os"
)

// check panics on errors the program cannot recover from.
func check(e error) {
	if e != nil {
		panic(e)
	}
}

// copyToDestination copies the file at src to dest and returns dest.
// Errors are returned instead of panicking so the caller can report them.
func copyToDestination(src, dest string) (string, error) {
	source, err := os.Open(src)
	if err != nil {
		return dest, err
	}
	defer source.Close()

	destination, err := os.Create(dest)
	if err != nil {
		return dest, err
	}
	defer destination.Close()

	_, err = io.Copy(destination, source)
	return dest, err
}

func main() {
	files, err := os.ReadDir("./folderA")
	check(err)
	for _, file := range files {
		src := "./folderA/" + file.Name()
		dest := "./folderB/" + file.Name()
		_, err = copyToDestination(src, dest)
		if err != nil {
			fmt.Printf("%s -> %s ❌\n", src, dest)
		} else {
			fmt.Printf("%s -> %s ✅\n", src, dest)
		}
	}
}
Files will be copied one at a time.
Concurrent Execution
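// check, copyToDestination, and the imports are the same as in the synchronous version.
func main() {
	files, err := os.ReadDir("./folderA")
	check(err)
	for _, file := range files {
		go func(file os.DirEntry) {
			src := "./folderA/" + file.Name()
			dest := "./folderB/" + file.Name()
			// Use a goroutine-local err; assigning to the outer err would be a data race.
			_, err := copyToDestination(src, dest)
			if err != nil {
				fmt.Printf("%s -> %s ❌\n", src, dest)
			} else {
				fmt.Printf("%s -> %s ✅\n", src, dest)
			}
		}(file)
	}
	// Nothing waits for the goroutines here, so main may return before they finish.
}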
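In Go, prefixing a function call with the go keyword starts that call in a new Goroutine. Note that main does not wait for Goroutines it has started: when main returns, the program exits, so some copies may never complete.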
In Go versions < 1.22, the variable file has to be passed as an argument to the Goroutine because the loop variable is created once and updated on every iteration. Otherwise, the Goroutines share that single variable and may all see the same file name, typically that of the last file. Starting in Go 1.22, loop variables are created per iteration, so passing the argument is no longer required.
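The capture rule applies to any closure, not only to Goroutines. The following is a minimal sketch of the other common workaround for Go versions < 1.22, shadowing the loop variable inside the loop body. It uses plain closures instead of Goroutines so the result is deterministic; the function name captureNames and the file names are hypothetical and chosen only for illustration.

```go
package main

import "fmt"

// captureNames returns one closure per name. Each closure reports the name
// it captured when it was created.
func captureNames(names []string) []func() string {
	var getters []func() string
	for _, name := range names {
		// Shadow the loop variable with a per-iteration copy.
		// Needed before Go 1.22, redundant but harmless afterwards.
		name := name
		getters = append(getters, func() string { return name })
	}
	return getters
}

func main() {
	for _, get := range captureNames([]string{"a.txt", "b.txt", "c.txt"}) {
		fmt.Println(get())
	}
}
```

Without the shadowing line, a toolchain older than Go 1.22 would have every closure share the same variable, which is the same situation the Goroutines above would be in if file were not passed as an argument.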
Wait Groups
Wait groups (sync.WaitGroup) let one Goroutine, here main, wait for a set of other Goroutines to finish running. They are simply counters: wg.Add() increments the counter, wg.Done() decrements it, and wg.Wait() blocks until it reaches zero.
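// Requires "sync" in the import list in addition to "fmt", "io", and "os".
func main() {
	files, err := os.ReadDir("./folderA")
	check(err)
	var wg sync.WaitGroup
	for _, file := range files {
		wg.Add(1)
		go func(file os.DirEntry) {
			// Defer Done first so the counter is decremented on every exit path.
			defer wg.Done()
			src := "./folderA/" + file.Name()
			dest := "./folderB/" + file.Name()
			// Use a goroutine-local err; assigning to the outer err would be a data race.
			_, err := copyToDestination(src, dest)
			if err != nil {
				fmt.Printf("%s -> %s ❌\n", src, dest)
			} else {
				fmt.Printf("%s -> %s ✅\n", src, dest)
			}
		}(file)
	}
	// Block until every goroutine has called Done.
	wg.Wait()
}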
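The same Add/Done/Wait pattern can also hand results back to the caller instead of printing them inside each Goroutine. The sketch below is one possible arrangement, not the approach used above: runAll and alwaysFail are hypothetical helpers, and each Goroutine writes only to its own slice slot so no further locking is needed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// runAll runs every task in its own goroutine and returns one error slot
// per task, in the same order as the input.
func runAll(tasks []func() error) []error {
	errs := make([]error, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() error) {
			defer wg.Done()
			errs[i] = task() // each goroutine writes only its own slot
		}(i, task)
	}
	wg.Wait() // all writes to errs happen before Wait returns
	return errs
}

// alwaysFail stands in for a copy that goes wrong.
func alwaysFail() error {
	return errors.New("copy failed")
}

func main() {
	errs := runAll([]func() error{
		func() error { return nil },
		alwaysFail,
	})
	for i, err := range errs {
		fmt.Printf("task %d: %v\n", i, err)
	}
}
```

In the file-copy program, each task would be a closure that calls copyToDestination for one file.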
Availability
The source code can be found on GitHub.
Experimental Evaluations
The test files were created with dd if=/dev/zero of=upload_test bs=1M count=<size> and the runs were measured with the time(1) command.
Size | Single (CPU %, elapsed s) | Concurrent (CPU %, elapsed s) |
---|---|---|
50G (10 * 5G) | 86% cpu 22.366 total | 615% cpu 9.431 total |
10G (10 * 1G) | 92% cpu 3.649 total | 732% cpu 1.270 total |
1G (10 * 100M) | 89% cpu 0.445 total | 420% cpu 0.215 total |
100M (10 * 10M) | 81% cpu 0.206 total | 133% cpu 0.152 total |
Conclusion
In the concurrent setup, every 100% of CPU utilization equates to about one core. For the smallest transfers there is little benefit: the 100M run is only slightly faster and uses more CPU. For the largest transfer (50G), however, the run time fell from 22.366 to 9.431, a reduction of about 58%.
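The reduction figure can be checked with a short calculation over the elapsed times in the table. This is a minimal sketch; the reduction helper and the struct layout are introduced here for illustration only, and the numbers are copied from the table above.

```go
package main

import "fmt"

// reduction returns the percentage by which the concurrent run time is
// lower than the single (synchronous) run time.
func reduction(single, concurrent float64) float64 {
	return (single - concurrent) / single * 100
}

func main() {
	// Elapsed times taken from the "total" column of the results table.
	rows := []struct {
		size               string
		single, concurrent float64
	}{
		{"50G", 22.366, 9.431},
		{"10G", 3.649, 1.270},
		{"1G", 0.445, 0.215},
		{"100M", 0.206, 0.152},
	}
	for _, r := range rows {
		fmt.Printf("%-5s %.1f%% faster\n", r.size, reduction(r.single, r.concurrent))
	}
}
```

For the 50G row this works out to roughly 57.8%, which matches the figure of about 58% quoted above.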