Skip to content

Commit

Permalink
Future/data pipeline (#14)
Browse files Browse the repository at this point in the history
* reimplemented DataStream to DataPipeline
  • Loading branch information
Mararok authored Sep 8, 2016
1 parent cad06a6 commit 60900bc
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 125 deletions.
30 changes: 30 additions & 0 deletions src/SAREhub/Commons/DataPipeline/DataSink.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace SAREhub\Commons\DataPipeline;

/**
* Represents place where data can be write like file, database or socket
*/
abstract class DataSink implements Filter {

public function pipe(Filter $output) {
// noop for sink
}

public function unpipe(Filter $output = null) {
// noop for sink
}

/**
* @param $data
*/
public abstract function write($data);

public function onPipe(DataSource $source) {

}

public function onUnpipe(DataSource $source) {

}
}
22 changes: 22 additions & 0 deletions src/SAREhub/Commons/DataPipeline/DataSource.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace SAREhub\Commons\DataPipeline;

/**
* Represents place where data come from like file, database or socket
*/
interface DataSource {

/**
* @param Filter $output
* @return Filter
*/
public function pipe(Filter $output);

/**
* @param Filter|null $output
* @return Filter
*/
public function unpipe(Filter $output = null);

}
22 changes: 22 additions & 0 deletions src/SAREhub/Commons/DataPipeline/Filter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace SAREhub\Commons\DataPipeline;

/**
* Transfrom, process, filter data and can output it to next Filter.
*/
interface Filter extends DataSource {

/**
* @param $data
*/
public function write($data);

/**
* @param DataSource $source
* @return
*/
public function onPipe(DataSource $source);

public function onUnpipe(DataSource $source);
}
11 changes: 11 additions & 0 deletions src/SAREhub/Commons/DataPipeline/Filters.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

namespace SAREhub\Commons\DataPipeline;


class Filters {

public static function transform(callable $transfromer) {
return new Transformer($transfromer);
}
}
14 changes: 14 additions & 0 deletions src/SAREhub/Commons/DataPipeline/NullSink.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace SAREhub\Commons\DataPipeline;

/**
* Represents linux dev/null All written data will be lost.
*/
class NullSink extends DataSink {

public function write($data) {

}

}
39 changes: 39 additions & 0 deletions src/SAREhub/Commons/DataPipeline/Transformer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

namespace SAREhub\Commons\DataPipeline;

class Transformer implements Filter {

protected $transformer;
protected $output;

public function __construct(callable $transfromer) {
$this->transformer = $transfromer;
$this->output = new NullSink();
}

public function write($data) {
$transfromer = $this->transformer;
$this->output->write($transfromer($data));
}

public function pipe(Filter $output) {
$this->unpipe();
$this->output = $output;
$this->output->onPipe($this);
return $this->output;
}

public function unpipe(Filter $output = null) {
$this->output->onUnpipe($this);
$this->output = new NullSink();
}

public function onPipe(DataSource $source) {

}

public function onUnpipe(DataSource $source) {

}
}
19 changes: 0 additions & 19 deletions src/SAREhub/Commons/DataStream/DataStreamSink.php

This file was deleted.

15 changes: 0 additions & 15 deletions src/SAREhub/Commons/DataStream/DataStreamSource.php

This file was deleted.

13 changes: 0 additions & 13 deletions src/SAREhub/Commons/DataStream/NullDataStreamSink.php

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

namespace SAREhub\Commons\Protobuf;

use Protobuf\Message;
use Protobuf\Stream;
use SAREhub\Commons\DataStream\DataStreamSink;
use SAREhub\Commons\DataStream\DataStreamSource;
use SAREhub\Commons\DataPipeline\DataSink;

/**
* Class for writing Protocol Buffers messages to file.
*/
class FileProtbufMessageStreamSink implements DataStreamSink {
class FileProtbufMessageSink extends DataSink {

/** @var \SplFileObject */
protected $file;
Expand All @@ -37,9 +35,4 @@ public function write($data) {
);
}

public function onPipe(DataStreamSource $source) {
}

public function onUnpipe(DataStreamSource $source) {
}
}
46 changes: 46 additions & 0 deletions src/SAREhub/Commons/Protobuf/FileProtobufMessageSource.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

namespace SAREhub\Commons\Protobuf;

use Protobuf\Stream;
use SAREhub\Commons\DataPipeline\DataSource;
use SAREhub\Commons\DataPipeline\Filter;
use SAREhub\Commons\DataPipeline\NullSink;

class FileProtobufMessageSource implements DataSource {

/** @var \SplFileObject */
protected $file;

protected $sizeInfoBytes;

/** @var string */
protected $sizeInfoPackFormat;

/** @var Filter */
protected $output;

public function __construct(\SplFileObject $file, ProtobufMessagesFileHeader $header) {
$this->file = $file;
$this->sizeInfoPackFormat = $header->getMessageSizeInfoPackFormat();
$this->sizeInfoBytes = $header->getMessageSizeInfoBytes();
$this->output = new NullSink();
}

public function flow() {
$size = unpack($this->sizeInfoPackFormat, $this->file->fread($this->sizeInfoBytes))[1];
$data = $this->file->fread($size);
$this->output->write(Stream::fromString($data, $size));
}

public function pipe(Filter $output) {
$this->unpipe();
$this->output = $output;
$this->output->onPipe($this);
}

public function unpipe(Filter $output = null) {
$this->output->onUnpipe($this);
$this->output = new NullSink();
}
}
50 changes: 0 additions & 50 deletions src/SAREhub/Commons/Protobuf/FileProtobufMessageStreamSource.php

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@
use PHPUnit\Framework\TestCase;
use Protobuf\Stream;

class FileProtbufMessageStreamSinkTest extends TestCase {
class FileProtbufMessageSinkTest extends TestCase {

/** @var vfsStreamDirectory */
private $root;
private $filename = 'test.fpe';
/** @var FileProtbufMessageStreamSink */
/** @var FileProtbufMessageSink */
private $sink;

protected function setUp() {
$this->root = vfsStream::setup('tmp');
$path = $this->root->url().'/'.$this->filename;
$file = new \SplFileObject($path, 'a');
$this->sink = new FileProtbufMessageStreamSink($file, 'N');
$this->sink = new FileProtbufMessageSink($file, 'N');
}

public function testWrite() {
Expand Down
Loading

0 comments on commit 60900bc

Please sign in to comment.