Skip to content

Commit

Permalink
splice: refactor pipe pool
Browse files Browse the repository at this point in the history
Sadly we could not use go's internal package, most of the refactor
is from linux_splice.go inside go's internal package.
Also removed unused APIs, we respect most of the APIs.
  • Loading branch information
winglq committed Nov 28, 2024
1 parent ee25178 commit 080c974
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 103 deletions.
4 changes: 2 additions & 2 deletions splice/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func CopyFile(dstName string, srcName string, mode int) error {
}

func CopyFds(dst *os.File, src *os.File) (err error) {
p, err := splicePool.get()
p, err := Get()
if p != nil {
p.Grow(256 * 1024)
_, err := SpliceCopy(dst, src, p)
splicePool.done(p)
Done(p)
return err
} else {
_, err = io.Copy(dst, src)
Expand Down
4 changes: 1 addition & 3 deletions splice/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ func TestSpliceCopy(t *testing.T) {
if maxPipeSize%4096 != 0 || maxPipeSize < 4096 {
t.Error("pipe size should be page size multiple", maxPipeSize)
}
pool := newSplicePairPool()
p, err := pool.get()
p, err := Get()
if p != nil {
p.MaxGrow()
t.Logf("Splice size %d", p.size)
SpliceCopy(dst, src, p)
dst.Close()
src.Close()
p.Close()
} else {
t.Error("Could not open splice: ", err)
}
Expand Down
4 changes: 4 additions & 0 deletions splice/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
type Pair struct {
r, w int
size int

// We want to use a finalizer, so ensure that the size is
// large enough to not use the tiny allocator.
_ [12]byte
}

func (p *Pair) MaxGrow() {
Expand Down
2 changes: 1 addition & 1 deletion splice/pair_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Pair) discard() {
}
}

func (p *Pair) Close() error {
func (p *Pair) close() error {
err1 := syscall.Close(p.r)
err2 := syscall.Close(p.w)
if err1 != nil {
Expand Down
123 changes: 46 additions & 77 deletions splice/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,101 +5,70 @@
package splice

import (
"fmt"
"log"
"runtime"
"sync"
"syscall"
)

var splicePool *pairPool

type pairPool struct {
sync.Mutex
unused []*Pair
usedCount int
}

func ClearSplicePool() {
splicePool.clear()
}

func Get() (*Pair, error) {
return splicePool.get()
}

func Total() int {
return splicePool.total()
}

func Used() int {
return splicePool.used()
}

// Done returns the pipe pair to pool.
func Done(p *Pair) {
splicePool.done(p)
}

// Closes and discards pipe pair.
func Drop(p *Pair) {
splicePool.drop(p)
}

func newSplicePairPool() *pairPool {
return &pairPool{}
var splicePool = sync.Pool{
New: newPoolPipe,
}

func (pp *pairPool) clear() {
pp.Lock()
for _, p := range pp.unused {
p.Close()
func newPoolPipe() interface{} {
// Discard the error which occurred during the creation of pipe buffer,
// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
p := newPipe()
if p == nil {
return nil
}
pp.unused = pp.unused[:0]
pp.Unlock()
runtime.SetFinalizer(p, destroyPipe)
return p
}

func (pp *pairPool) used() (n int) {
pp.Lock()
n = pp.usedCount
pp.Unlock()

return n
// newPipe sets up a pipe for a splice operation.
func newPipe() *Pair {
var fds [2]int
var err error
fds[0], fds[1], err = osPipe()
if err != nil {
log.Printf("Warning: create pipe failed: %v\n", err)
return nil
}
fcntl(uintptr(fds[0]), syscall.F_SETPIPE_SZ, maxPipeSize)
return &Pair{r: fds[0], w: fds[1], size: maxPipeSize}
}

func (pp *pairPool) total() int {
pp.Lock()
n := pp.usedCount + len(pp.unused)
pp.Unlock()
return n
func destroyPipe(p *Pair) {
err := p.close()
if err != nil {
log.Printf("close pipe failed: %v\n", err)
}
}

func (pp *pairPool) drop(p *Pair) {
p.Close()
pp.Lock()
pp.usedCount--
pp.Unlock()
type pairPool struct {
sync.Mutex
unused []*Pair
usedCount int
}

func (pp *pairPool) get() (p *Pair, err error) {
pp.Lock()
defer pp.Unlock()

pp.usedCount++
l := len(pp.unused)
if l > 0 {
p := pp.unused[l-1]
pp.unused = pp.unused[:l-1]
return p, nil
func Get() (*Pair, error) {
p := splicePool.Get()
if p == nil {
return nil, fmt.Errorf("create pipe failed")
}

return newSplicePair()
return p.(*Pair), nil
}

func (pp *pairPool) done(p *Pair) {
// Done returns the pipe pair to pool.
func Done(p *Pair) {
p.discard()
pp.Lock()
pp.usedCount--
pp.unused = append(pp.unused, p)
pp.Unlock()
splicePool.Put(p)
}

func init() {
splicePool = newSplicePairPool()
// Closes and discards pipe pair.
func Drop(p *Pair) {
runtime.SetFinalizer(p, nil)
destroyPipe(p)
}
19 changes: 0 additions & 19 deletions splice/splice.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,3 @@ func init() {

const F_SETPIPE_SZ = 1031
const F_GETPIPE_SZ = 1032

func newSplicePair() (p *Pair, err error) {
p = &Pair{}
p.r, p.w, err = osPipe()
if err != nil {
return nil, err
}
var errNo syscall.Errno
p.size, errNo = fcntl(uintptr(p.r), F_GETPIPE_SZ, 0)
if errNo == syscall.EINVAL {
p.size = DefaultPipeSize
return p, nil
}
if errNo != 0 {
p.Close()
return nil, fmt.Errorf("fcntl getsize: %v", errNo)
}
return p, nil
}
2 changes: 1 addition & 1 deletion splice/splice_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ func fcntl(fd uintptr, cmd int, arg int) (val int, errno syscall.Errno) {

func osPipe() (int, int, error) {
var fds [2]int
err := syscall.Pipe2(fds[:], syscall.O_NONBLOCK)
err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK)
return fds[0], fds[1], err
}

0 comments on commit 080c974

Please sign in to comment.