Skip to content

Commit

Permalink
Reformatted code for column-constrained book & corrected errors.
Browse files Browse the repository at this point in the history
- The book has a constrained width, so in many cases it was necessary
  to update code to conform to these boundaries.
- The examples for mutexes were decrementing instead of incrementing
  their counters.
  • Loading branch information
kat-co committed Jul 8, 2017
1 parent 079a907 commit 4e55fd7
Show file tree
Hide file tree
Showing 18 changed files with 141 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func runJob(id string) error {
id,
)} // <1>
} else if isExecutable == false {
return wrapError(nil, "cannot run job %q: requisite binaries are not executable", id)
return wrapError(
nil,
"cannot run job %q: requisite binaries are not executable",
id,
)
}

return exec.Command(jobBinPath, "--id="+id).Run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ func main() {
pulseInterval time.Duration,
) (heartbeat <-chan interface{}) // <1>

newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { // <2>
return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {
newSteward := func(
timeout time.Duration,
startGoroutine startGoroutineFn,
) startGoroutineFn { // <2>
return func(
done <-chan interface{},
pulseInterval time.Duration,
) <-chan interface{} {
heartbeat := make(chan interface{})
go func() {
defer close(heartbeat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ func main() {
pulseInterval time.Duration,
) (heartbeat <-chan interface{}) // <1>

newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { // <2>
return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {
newSteward := func(
timeout time.Duration,
startGoroutine startGoroutineFn,
) startGoroutineFn { // <2>
return func(
done <-chan interface{},
pulseInterval time.Duration,
) <-chan interface{} {
heartbeat := make(chan interface{})
go func() {
defer close(heartbeat)
Expand Down Expand Up @@ -86,7 +92,11 @@ func main() {
return heartbeat
}
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
Expand Down
12 changes: 10 additions & 2 deletions concurrency-at-scale/heartbeats/bad_concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"time"
)

func DoWork(done <-chan interface{}, nums ...int) (<-chan interface{}, <-chan int) {
func DoWork(
done <-chan interface{},
nums ...int,
) (<-chan interface{}, <-chan int) {
heartbeat := make(chan interface{}, 1)
intStream := make(chan int)
go func() {
Expand Down Expand Up @@ -41,7 +44,12 @@ func TestDoWork_GeneratesAllNumbers(t *testing.T) {
select {
case r := <-results:
if r != expected {
t.Errorf("index %v: expected %v, but received %v,", i, expected, r)
t.Errorf(
"index %v: expected %v, but received %v,",
i,
expected,
r,
)
}
case <-time.After(1 * time.Second): // <1>
t.Fatal("test timed out")
Expand Down
5 changes: 4 additions & 1 deletion concurrency-at-scale/heartbeats/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"time"
)

func DoWork(done <-chan interface{}, nums ...int) (<-chan interface{}, <-chan int) {
func DoWork(
done <-chan interface{},
nums ...int,
) (<-chan interface{}, <-chan int) {
heartbeat := make(chan interface{}, 1)
intStream := make(chan int)
go func() {
Expand Down
5 changes: 4 additions & 1 deletion concurrency-at-scale/heartbeats/dowork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"time"
)

func DoWork(done <-chan interface{}, nums ...int) (<-chan interface{}, <-chan int) {
func DoWork(
done <-chan interface{},
nums ...int,
) (<-chan interface{}, <-chan int) {
heartbeat := make(chan interface{}, 1)
intStream := make(chan int)
go func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
)

func main() {
checkStatus := func(done <-chan interface{}, urls ...string) <-chan *http.Response {
checkStatus := func(
done <-chan interface{},
urls ...string,
) <-chan *http.Response {
responses := make(chan *http.Response)
go func() {
defer close(responses)
Expand All @@ -29,7 +32,8 @@ func main() {
done := make(chan interface{})
defer close(done)

for response := range checkStatus(done, "https://www.google.com", "https://badhost") {
urls := []string{"https://www.google.com", "https://badhost"}
for response := range checkStatus(done, urls...) {
fmt.Printf("Response: %v\n", response.Status)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func main() {
done := make(chan interface{})
defer close(done)

for result := range checkStatus(done, "https://www.google.com", "https://badhost") {
urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
if result.Error != nil { // <5>
fmt.Printf("error: %v", result.Error)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func main() {
defer close(done)

errCount := 0
for result := range checkStatus(done, "a", "https://www.google.com", "b", "c", "d") {
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v\n", result.Error)
errCount++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
)

func main() {
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
repeatFn := func(
done <-chan interface{},
fn func() interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
Expand All @@ -23,7 +26,11 @@ func main() {
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
Expand Down Expand Up @@ -76,7 +83,10 @@ func main() {
}()
return primeStream
}
fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} { // <1>
fanIn := func(
done <-chan interface{},
channels ...<-chan interface{},
) <-chan interface{} { // <1>
var wg sync.WaitGroup // <2>
multiplexedStream := make(chan interface{})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
)

func main() {
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
repeatFn := func(
done <-chan interface{},
fn func() interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
Expand All @@ -21,7 +24,11 @@ func main() {
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ func main() {
return intStream
}

multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
multiply := func(
done <-chan interface{},
intStream <-chan int,
multiplier int,
) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
Expand All @@ -35,7 +39,11 @@ func main() {
return multipliedStream
}

add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
add := func(
done <-chan interface{},
intStream <-chan int,
additive int,
) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
)

func main() {
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
repeat := func(
done <-chan interface{},
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
Expand All @@ -21,7 +24,11 @@ func main() {
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
)

func main() {
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
Expand All @@ -19,7 +23,10 @@ func main() {
}()
return takeStream
}
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
repeat := func(
done <-chan interface{},
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
Expand All @@ -35,7 +42,10 @@ func main() {
}()
return valueStream
}
toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
toString := func(
done <-chan interface{},
valueStream <-chan interface{},
) <-chan string {
stringStream := make(chan string)
go func() {
defer close(stringStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package pipelines
import ()

func BenchmarkGeneric(b *testing.B) {
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
repeat := func(
done <-chan interface{},
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
Expand All @@ -19,7 +22,11 @@ func BenchmarkGeneric(b *testing.B) {
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
Expand All @@ -33,7 +40,10 @@ func BenchmarkGeneric(b *testing.B) {
}()
return takeStream
}
toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
toString := func(
done <-chan interface{},
valueStream <-chan interface{},
) <-chan string {
stringStream := make(chan string)
go func() {
defer close(stringStream)
Expand Down Expand Up @@ -73,7 +83,11 @@ func BenchmarkTyped(b *testing.B) {
return valueStream
}

take := func(done <-chan interface{}, valueStream <-chan string, num int) <-chan string {
take := func(
done <-chan interface{},
valueStream <-chan string,
num int,
) <-chan string {
takeStream := make(chan string)
go func() {
defer close(takeStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
)

func main() {
doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} { // <1>
doWork := func(
done <-chan interface{},
strings <-chan string,
) <-chan interface{} { // <1>
terminated := make(chan interface{})
go func() {
defer fmt.Println("doWork exited.")
Expand Down
11 changes: 9 additions & 2 deletions concurrency-patterns-in-go/queuing/buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ func tmpFileOrFatal() *os.File {
}

func performWrite(b *testing.B, writer io.Writer) {
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
repeat := func(
done <-chan interface{},
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
Expand All @@ -43,7 +46,11 @@ func performWrite(b *testing.B, writer io.Writer) {
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
Expand Down
Loading

0 comments on commit 4e55fd7

Please sign in to comment.