Skip to content

Commit

Permalink
add multi:// null:// and stdout:// producers
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisaaronland committed Feb 7, 2022
1 parent 6a69ece commit 0d0bc89
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 2 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,6 @@ _TBW_

## See also

* https://www.github.com/whosonfirst/go-whosonfirst-iterate
* https://www.github.com/whosonfirst/go-reader-findingaid
* https://github.com/whosonfirst-data/whosonfirst-findingaids
* https://www.github.com/whosonfirst/go-reader-findingaid
* https://www.github.com/whosonfirst/go-whosonfirst-iterate
133 changes: 133 additions & 0 deletions producer/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package producer

import (
"context"
"fmt"
"github.com/sfomuseum/go-timings"
"net/url"
)

type MultiProducer struct {
Producer
producers []Producer
}

func init() {
ctx := context.Background()
RegisterProducer(ctx, "multi", NewMultiProducer)
}

func NewMultiProducer(ctx context.Context, uri string) (Producer, error) {

u, err := url.Parse(uri)

if err != nil {
return nil, fmt.Errorf("Failed to parse URI, %w", err)
}

q := u.Query()

producer_uris := q["producer"]
producers_count := len(producer_uris)

if producers_count == 0 {
return nil, fmt.Errorf("No ?producer parameters defined")
}

producers := make([]Producer, producers_count)

for idx, u := range producer_uris {

p, err := NewProducer(ctx, u)

if err != nil {
return nil, fmt.Errorf("Failed to create new producer for '%s', %w", u, err)
}

producers[idx] = p
}

p := &MultiProducer{
producers: producers,
}

return p, nil
}

func (p *MultiProducer) PopulateWithIterator(ctx context.Context, monitor timings.Monitor, iterator_uri string, iterator_sources ...string) error {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

err_ch := make(chan error)
done_ch := make(chan bool)

remaining := len(p.producers)

for _, child_p := range p.producers {

go func(child_p Producer) {

err := child_p.PopulateWithIterator(ctx, monitor, iterator_uri, iterator_sources...)

if err != nil {
err_ch <- fmt.Errorf("Failed to iterator with %T, %w", child_p, err)
}

done_ch <- true

}(child_p)

}

for remaining > 0 {

select {
case <-done_ch:
remaining -= 1
case err := <-err_ch:
return err
}
}

return nil
}

func (p *MultiProducer) Close(ctx context.Context) error {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

err_ch := make(chan error)
done_ch := make(chan bool)

remaining := len(p.producers)

for _, child_p := range p.producers {

go func(child_p Producer) {

err := child_p.Close(ctx)

if err != nil {
err_ch <- fmt.Errorf("Failed to close with %T, %w", child_p, err)
}

done_ch <- true

}(child_p)

}

for remaining > 0 {

select {
case <-done_ch:
remaining -= 1
case err := <-err_ch:
return err
}
}

return nil
}
28 changes: 28 additions & 0 deletions producer/null.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package producer

import (
"context"
"github.com/sfomuseum/go-timings"
)

type NullProducer struct {
Producer
}

func init() {
ctx := context.Background()
RegisterProducer(ctx, "null", NewNullProducer)
}

func NewNullProducer(ctx context.Context, uri string) (Producer, error) {
p := &NullProducer{}
return p, nil
}

func (p *NullProducer) PopulateWithIterator(ctx context.Context, monitor timings.Monitor, iterator_uri string, iterator_sources ...string) error {
return nil
}

func (p *NullProducer) Close(ctx context.Context) error {
return nil
}
92 changes: 92 additions & 0 deletions producer/stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package producer

import (
"context"
"fmt"
"github.com/sfomuseum/go-timings"
"github.com/whosonfirst/go-whosonfirst-findingaid/v2"
"github.com/whosonfirst/go-whosonfirst-iterate/v2/iterator"
"github.com/whosonfirst/go-whosonfirst-uri"
"io"
"os"
"sync"
)

type StdoutProducer struct {
Producer
}

func init() {
ctx := context.Background()
RegisterProducer(ctx, "stdout", NewStdoutProducer)
}

func NewStdoutProducer(ctx context.Context, uri string) (Producer, error) {

p := &StdoutProducer{}
return p, nil
}

func (p *StdoutProducer) PopulateWithIterator(ctx context.Context, monitor timings.Monitor, iterator_uri string, iterator_sources ...string) error {

mu := new(sync.RWMutex)

iter_cb := func(ctx context.Context, path string, fh io.ReadSeeker, args ...interface{}) error {

id, uri_args, err := uri.ParseURI(path)

if err != nil {
return fmt.Errorf("Failed to parse %s, %w", path, err)
}

if uri_args.IsAlternate {
return nil
}

// Get wof:repo

body, err := io.ReadAll(fh)

if err != nil {
return fmt.Errorf("Failed to read %s, %w", path, err)
}

mu.Lock()
defer mu.Unlock()

repo, _, err := findingaid.GetRepoWithBytes(ctx, body)

if err != nil {
return fmt.Errorf("Failed to retrieve repo for %s, %w", path, err)
}

repo_name := repo.Name

fmt.Fprintf(os.Stdout, "%d %s\n", id, repo_name)

if err != nil {
return fmt.Errorf("Failed to store %s, %w", path, err)
}

go monitor.Signal(ctx)
return nil
}

iter, err := iterator.NewIterator(ctx, iterator_uri, iter_cb)

if err != nil {
return fmt.Errorf("Failed to create iterator, %w", err)
}

err = iter.IterateURIs(ctx, iterator_sources...)

if err != nil {
return fmt.Errorf("Failed to iterate sources, %w", err)
}

return nil
}

func (p *StdoutProducer) Close(ctx context.Context) error {
return nil
}

0 comments on commit 0d0bc89

Please sign in to comment.