V3IO Frames ("Frames") is a multi-model open-source data-access library that provides a unified high-performance DataFrame API for working with different types of data sources (backends). The library was developed by Iguazio to simplify working with data in the Iguazio Data Science Platform ("the platform"), but it can be extended to support additional backend types.
Note: For a full API reference of the Frames platform backends, including detailed examples, see the Frames API reference in the platform documentation.
- Overview
- User Authentication
- Client Constructor
- Common Client Method Parameters
- create Method
- write Method
- read Method
- delete Method
- execute Method
The current version of Frames supports Python 3.6 and 3.7.
To use Frames, you first need to import the v3io_frames Python library. For example:
import v3io_frames as v3f
Then, you need to create and initialize an instance of the Client class; see Client Constructor.
You can then use the client methods to perform different data operations on the supported backend types.
All Frames client methods receive a backend parameter for setting the Frames backend type.
Frames currently supports the following backend types:
- nosql | kv: a platform NoSQL (key/value) table. See the platform NoSQL backend API reference.
  Note: The documentation uses the "nosql" alias to the "kv" type, which was added in Frames v0.6.10-v0.9.13; "kv" is still supported for backwards compatibility with earlier releases.
- stream: a platform data stream [Tech Preview]. See the platform streaming backend API reference.
- tsdb: a time-series database (TSDB). See the platform TSDB backend API reference.
- csv: a comma-separated-value (CSV) file. This backend type is used only for testing purposes.
The Client class features the following methods for supporting operations on a data collection, such as a NoSQL or TSDB table or a data stream:
- create: creates a new collection.
- delete: deletes a collection or specific items of the collection.
- read: reads data from a collection into pandas DataFrames.
- write: writes data from pandas DataFrames to a collection.
- execute: executes a backend-specific command on a collection. Each backend may support multiple commands.
Note: Some methods or method parameters are backend-specific, as detailed in this reference.
When creating a Frames client, you must provide valid credentials for accessing the backend data, which Frames uses to identify the user. You can do this by using either of the following alternative methods (documented in order of precedence). For more information about the user authentication for the platform backends, see the platform documentation:
- Provide the authentication credentials in the call to the Client constructor, either by setting the token parameter to a valid authentication token (access key) or by setting the user and password parameters to a username and password. Note that you cannot set the token parameter concurrently with the username and password parameters.
- Provide the authentication credentials in environment variables, either by setting the V3IO_ACCESS_KEY variable to an authentication token or by setting the V3IO_USERNAME and V3IO_PASSWORD variables to a username and password.
  Note:
  - When V3IO_ACCESS_KEY is defined, V3IO_USERNAME and V3IO_PASSWORD are ignored.
  - When the client constructor is called with authentication parameters (the first option), the authentication-credentials environment variables (if defined) are ignored.
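The precedence rules above can be modeled in a few lines of plain Python. This is only an illustrative sketch: `resolve_credentials` is a hypothetical helper, not part of the Frames API, and it assumes that the constructor arguments and environment variables behave exactly as described in this section.

```python
import os


def resolve_credentials(token="", user="", password="", env=None):
    """Return the credentials that would take effect under the documented precedence.

    Hypothetical helper for illustration only; Frames performs its own resolution.
    """
    env = os.environ if env is None else env

    # Option 1: explicit constructor arguments win over environment variables.
    if token and (user or password):
        raise ValueError("set either token or user/password, not both")
    if token:
        return {"token": token}
    if user or password:
        if not (user and password):
            raise ValueError("user and password must be set together")
        return {"user": user, "password": password}

    # Option 2: environment variables; the access key hides username/password.
    if env.get("V3IO_ACCESS_KEY"):
        return {"token": env["V3IO_ACCESS_KEY"]}
    if env.get("V3IO_USERNAME") and env.get("V3IO_PASSWORD"):
        return {"user": env["V3IO_USERNAME"], "password": env["V3IO_PASSWORD"]}

    raise ValueError("no authentication credentials were provided")
```

The returned dictionary could then be unpacked into the constructor call, for example `v3f.Client("framesd:8081", container="users", **resolve_credentials())`.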
All Frames operations are executed via an object of the Client class.
Client(address=""[, data_url=""], container=""[, user="", password="", token=""])
- address: The address of the Frames service (framesd). Use the grpc:// prefix for gRPC (default; recommended) or the http:// prefix for HTTP. When running locally on the platform, set this parameter to framesd:8081 to use gRPC (recommended) or to framesd:8080 to use HTTP; for more information, see the platform documentation.
  - Type: str
  - Requirement: Required
- data_url: A web-API base URL for accessing the backend data. By default, the client uses the data URL that's configured for the Frames service; for the platform backends, this is typically the HTTPS URL of the web-APIs service of the parent tenant.
  - Type: str
  - Requirement: Optional
- container: The name of the data container that contains the backend data. For example, "bigdata" or "users".
  - Type: str
  - Requirement: Required
- user: The username of a user with permissions to access the backend data. See User Authentication.
- password: A valid password for the user configured in the user parameter. See User Authentication.
  - Type: str
  - Requirement: Required when the user parameter is set.
- token: A valid token that allows access to the backend data, such as a platform access key for the platform backends. See User Authentication.
Returns a new Frames Client data object.
The following examples, for local platform execution, both create a Frames client for accessing data in the "users" container by using the authentication credentials of user "iguazio"; the first example uses token (access-key) authentication while the second example uses username and password authentication (see User Authentication):
import v3io_frames as v3f
client = v3f.Client("framesd:8081", token="e8bd4ca2-537b-4175-bf01-8c74963e90bf", container="users")
import v3io_frames as v3f
client = v3f.Client("framesd:8081", user="iguazio", password="mypass", container="users")
All client methods receive the following common parameters; additional, method-specific parameters are described for each method.
- backend: The backend data type for the operation. See Backend Types.
  - Type: str
  - Requirement: Required
  - Valid Values: "nosql" | "stream" | "tsdb" | "csv" (for testing)
- table: The relative path to a data collection of the specified backend type in the target data container (as configured for the client object). For example, "mytable" or "/examples/tsdb/my_metrics".
  - Type: str
  - Requirement: Required unless otherwise specified in the method-specific documentation
Creates a new data collection in the configured client data container, according to the specified backend type.
Note: The create method isn't applicable to the nosql backend, because NoSQL tables in the platform don't need to be created prior to ingestion; when ingesting data into a table that doesn't exist, the table is automatically created.
create(backend, table, schema=None, if_exists=FAIL, **kw)
All Frames backends that support the create method support the following common parameters:
- if_exists: Determines whether to raise an error when the specified collection (table) already exists.
  - Type: pb.ErrorOptions enumeration. To use the enumeration, import the frames_pb2 module; for example:
    from v3io_frames import frames_pb2 as fpb
  - Requirement: Optional
  - Valid Values: FAIL to raise an error when the specified collection already exists; IGNORE to ignore this
  - Default Value: FAIL
- schema: A schema for describing unstructured collection data. This parameter is intended to be used only for testing purposes with the csv backend.
  - Type: Backend-specific or None
  - Requirement: Optional
  - Default Value: None
- kw: This parameter is used for passing a variable-length list of additional keyword (named) arguments. For more information, see the backend-specific method parameters.
  - Type: ** (variable-length keyword arguments list)
  - Requirement: Optional
The following create parameters are specific to the tsdb backend and are passed as keyword arguments via the kw parameter; for more information and examples, see the platform's Frames TSDB-backend reference:
- rate: Metric-samples ingestion rate.
  - Type: str
  - Requirement: Required
  - Valid Values: A string of the format "[0-9]+/[smh]", where 's' = seconds, 'm' = minutes, and 'h' = hours. For example, "1/s" (one sample per second), "20/m" (20 samples per minute), or "50/h" (50 samples per hour).
- aggregates: A list of aggregation functions for real-time aggregation during the samples ingestion ("pre-aggregation").
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing a comma-separated list of supported aggregation functions: avg | count | last | max | min | rate | stddev | stdvar | sum. For example, "count,avg,min,max".
- aggregation_granularity: Aggregation granularity; applicable when the aggregates parameter is set.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string of the format "[0-9]+[mhd]", where 'm' = minutes, 'h' = hours, and 'd' = days. For example, "30m" (30 minutes), "2h" (2 hours), or "1d" (1 day).
  - Default Value: "1h" (1 hour)
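The rate and aggregation_granularity string formats above are simple enough to check on the client side before calling create. The following sketch uses hypothetical helper names (`samples_per_hour`, `granularity_seconds`) that aren't part of Frames; it only encodes the two documented patterns.

```python
import re

# Patterns taken from the documented formats "[0-9]+/[smh]" and "[0-9]+[mhd]".
_RATE_RE = re.compile(r"([0-9]+)/([smh])")
_GRANULARITY_RE = re.compile(r"([0-9]+)([mhd])")
_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def samples_per_hour(rate):
    """Convert a rate string such as "20/m" to an expected number of samples per hour."""
    match = _RATE_RE.fullmatch(rate)
    if match is None:
        raise ValueError(f"invalid rate: {rate!r}")
    count, unit = int(match.group(1)), match.group(2)
    return count * 3600 / _SECONDS[unit]


def granularity_seconds(granularity):
    """Convert a granularity string such as "30m" to a number of seconds."""
    match = _GRANULARITY_RE.fullmatch(granularity)
    if match is None:
        raise ValueError(f"invalid aggregation granularity: {granularity!r}")
    return int(match.group(1)) * _SECONDS[match.group(2)]
```

For example, `samples_per_hour("1/s")` evaluates to `3600.0`, which makes the difference between "1/s" and "1/m" easy to see before a table is created with the wrong rate.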
The following create parameters are specific to the stream backend and are passed as keyword arguments via the kw parameter; for more information and examples, see the platform's Frames streaming-backend reference:
- shards: The number of stream shards to create.
  - Type: int
  - Requirement: Optional
  - Default Value: 1
  - Valid Values: A positive integer (>= 1). For example, 100.
- retention_hours: The stream's retention period, in hours.
  - Type: int
  - Requirement: Optional
  - Default Value: 24
  - Valid Values: A positive integer (>= 1). For example, 2 (2 hours).
client.create("tsdb", table="mytsdb", rate="10/m")
client.create("tsdb", table="/tsdb/my_metrics", rate="1/s", aggregates="count,avg,min,max", aggregation_granularity="1h")
client.create("stream", table="/mystream", shards=3)
client.create("stream", table="/my_streams/stream1", retention_hours=2)
Writes data from a DataFrame to a data collection, according to the specified backend type.
write(backend, table, dfs, expression='', condition='', labels=None,
      max_rows_in_msg=0, index_cols=None, save_mode='createNewItemsOnly',
      partition_keys=None)
Note: The expression and partition_keys parameters aren't supported in the current release.
All Frames backends that support the write method support the following common parameters:
- dfs: One or more DataFrames containing the data to write.
  - Type: A single DataFrame, a list of DataFrames, or a DataFrames iterator
  - Requirement: Required
- index_cols: A list of column (attribute) names to be used as index columns for the write operation, regardless of any index-column definitions in the DataFrame. By default, the DataFrame's index columns are used.
  Note: The significance and supported number of index columns is backend specific. For example, the nosql backend supports only a single index column for the primary-key item attribute, while the tsdb backend supports additional index columns for metric labels.
  - Type: []str
  - Requirement: Optional
  - Default Value: None
- labels: This parameter is currently applicable only to the tsdb backend (although it's available for all backends) and is therefore documented as part of the write method's tsdb backend parameters.
  - Type: dict
  - Requirement: Optional
- save_mode: This parameter is currently applicable only to the nosql backend, and is therefore documented as part of the write method's nosql backend parameters.
  - Type: str
  - Requirement: Optional
- max_rows_in_msg: Maximum number of rows to write in each message (write chunk size).
  - Type: int
  - Requirement: Optional
  - Default Value: 0
The following write parameters are specific to the nosql backend; for more information and examples, see the platform's Frames NoSQL-backend reference:
- condition: A platform condition expression that defines conditions for performing the write operation.
  - Type: str
  - Requirement: Optional
- save_mode: Save mode, which determines in which circumstances to write new items to the table.
  - Type: str
  - Requirement: Optional
  - Valid Values:
    - "createNewItemsOnly": write only new items; don't replace or update any existing table item with the same name (primary-key attribute value) as a written item.
    - "updateItem": update items; add new items and update the attributes of existing table items.
    - "overwriteItem": overwrite items; add new items and replace any existing table item with the same name as a written item.
    - "errorIfTableExists": create a new table only; only write items if the target table doesn't already exist.
    - "overwriteTable": overwrite the table; replace all existing table items (if any) with the written items.
  - Default Value: "createNewItemsOnly"
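To make the differences between the save modes concrete, the following sketch models a table as a plain dictionary that maps item names to attribute dictionaries. It is a toy model of the descriptions above, not the platform's implementation; `apply_save_mode` is a hypothetical name, and passing `None` for the table stands for "the table doesn't exist".

```python
SAVE_MODES = ("createNewItemsOnly", "updateItem", "overwriteItem",
              "errorIfTableExists", "overwriteTable")


def apply_save_mode(table, items, save_mode="createNewItemsOnly"):
    """Return the table contents after writing items under the given save mode."""
    if save_mode not in SAVE_MODES:
        raise ValueError(f"unsupported save mode: {save_mode!r}")
    written = {name: dict(attrs) for name, attrs in items.items()}

    if save_mode == "errorIfTableExists":
        if table is not None:
            raise ValueError("table already exists")
        return written
    if save_mode == "overwriteTable":
        return written  # all existing items are replaced

    result = {name: dict(attrs) for name, attrs in (table or {}).items()}
    for name, attrs in written.items():
        if save_mode == "createNewItemsOnly":
            result.setdefault(name, attrs)           # keep existing items untouched
        elif save_mode == "updateItem":
            result.setdefault(name, {}).update(attrs)  # merge attributes
        else:  # "overwriteItem"
            result[name] = attrs                     # replace the whole item
    return result
```

For instance, writing `{"tom": {"age": 11}}` over an existing item `{"tom": {"age": 10, "city": "TLV"}}` leaves the item unchanged with "createNewItemsOnly", keeps the city attribute with "updateItem", and drops it with "overwriteItem".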
The following write parameter descriptions are specific to the tsdb backend; for more information and examples, see the platform's Frames TSDB-backend reference:
- labels: A dictionary of metric labels of the format {<label>: <value>[, <label>: <value>, ...]} to apply to all the DataFrame rows. For example, {"os": "linux", "arch": "x86"}.
  - Type: dict
  - Requirement: Optional
  - Default Value: None
import pandas as pd

# NoSQL: write three items, using the "name" column as the index (primary key)
data = [["tom", 10, "TLV"], ["nick", 15, "Berlin"], ["juli", 14, "NY"]]
df = pd.DataFrame(data, columns=["name", "age", "city"])
df.set_index("name", inplace=True)
client.write(backend="nosql", table="mytable", dfs=df, condition="age>14")

# TSDB: write one sample, indexed by the sample time and a "node" label
from datetime import datetime
df = pd.DataFrame(data=[[30.1, 12.7]], index=[[datetime.now()], ["1"]],
                  columns=["cpu", "disk"])
df.index.names = ["time", "node"]
client.write(backend="tsdb", table="mytsdb", dfs=df)

# Stream: write nine rows of random values
import numpy as np
df = pd.DataFrame(np.random.rand(9, 3) * 100,
                  columns=["cpu", "mem", "disk"])
client.write("stream", table="mystream", dfs=df)
Reads data from a data collection to a DataFrame, according to the specified backend type.
- Syntax
- Common parameters
- nosql backend read parameters
- tsdb backend read parameters
- stream backend read parameters
- Return Value
- Examples
read(backend='', table='', query='', columns=None, filter='', group_by='',
limit=0, data_format='', row_layout=False, max_rows_in_msg=0, marker='',
iterator=False, get_raw=False, **kw)
Note: The limit, data_format, row_layout, and marker parameters aren't supported in the current release, and get_raw is for internal use only.
All Frames backends that support the read method support the following common parameters:
- iterator: Set to True to return a pandas DataFrames iterator; False (default) returns a single DataFrame.
  - Type: bool
  - Requirement: Optional
  - Default Value: False
- filter: A query filter. For example, filter="col1=='my_value'".
  This parameter is currently applicable only to the nosql and tsdb backends, and cannot be used concurrently with the query parameter of the tsdb backend.
  - Type: str
  - Requirement: Optional
- columns: A list of attributes (columns) to return.
  This parameter is currently applicable only to the nosql and tsdb backends, and cannot be used concurrently with the query parameter of the tsdb backend.
  - Type: []str
  - Requirement: Optional
- kw: This parameter is used for passing a variable-length list of additional keyword (named) arguments. For more information, see the backend-specific method parameters.
  - Type: ** (variable-length keyword arguments list)
  - Requirement: Optional
The following read parameters are specific to the nosql backend; for more information and examples, see the platform's Frames NoSQL-backend reference:
The following parameters are passed as keyword arguments via the kw parameter:
- reset_index: Set to True to reset the index column of the returned DataFrame and use the auto-generated pandas range-index column; False (default) sets the index column to the table's primary-key attribute.
  - Type: bool
  - Requirement: Optional
  - Default Value: False
- sharding_keys [Tech Preview]: A list of specific sharding keys to query, for range-scan formatted tables only.
  - Type: []str
  - Requirement: Optional
The following read parameters are specific to the tsdb backend; for more information and examples, see the platform's Frames TSDB-backend reference:
- group_by [Tech Preview]: A group-by query string.
  This parameter cannot be used concurrently with the query parameter.
  - Type: str
  - Requirement: Optional
- query [Tech Preview]: A query string in SQL format.
  Note:
  - When setting the query parameter, you must provide the path to the TSDB table as part of the FROM clause in the query string and not in the read method's table parameter.
  - This parameter cannot be set concurrently with the aggregators, columns, filter, or group_by parameters.
  - Type: str
  - Requirement: Optional
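The restrictions on the query parameter can be checked before a read call is issued. The helper below is a hypothetical sketch (`tsdb_read_kwargs` isn't a Frames function); it rejects the combinations that the notes above describe as unsupported and otherwise returns the keyword arguments for a tsdb read.

```python
# Parameters that the documentation says must not accompany "query".
_QUERY_CONFLICTS = ("table", "aggregators", "columns", "filter", "group_by")


def tsdb_read_kwargs(**kw):
    """Validate and return keyword arguments for a tsdb read call."""
    if kw.get("query"):
        clashes = sorted(name for name in _QUERY_CONFLICTS if kw.get(name))
        if clashes:
            raise ValueError("query cannot be combined with: " + ", ".join(clashes))
    return {"backend": "tsdb", **kw}
```

A call such as `client.read(**tsdb_read_kwargs(query="select avg(cpu) from 'mytsdb'", start="now-1d"))` would pass the check, while adding `filter=...` or `table=...` to the same call would raise an error locally.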
The following parameters are passed as keyword arguments via the kw parameter:
- start: Start (minimum) time for the read operation.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866"; "now-90m"; "0".
  - Default Value: <end time> - 1h
- end: End (maximum) time for the read operation.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2018-09-26T14:10:20Z"; "1537971006000"; "now-3h"; "now-7d".
  - Default Value: now
- step: The query aggregation or downsampling step. The default step is the query's time range, which can be configured via the start and end parameters.
  - Type: str
  - Requirement: Optional
- aggregators: Aggregation information to return, as a comma-separated list of supported aggregation functions ("aggregators").
  This parameter cannot be used concurrently with the query parameter.
  - Type: str
  - Requirement: Optional
  - Valid Values: The following aggregation functions are supported for over-time aggregation (across each unique label set); for cross-series aggregation (across all metric labels), add "_all" to the end of the function name:
    avg | count | last | max | min | rate | stddev | stdvar | sum
- aggregation_window [Tech Preview]: Aggregation interval for applying over-time aggregation functions, if set in the aggregators or query parameters.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string of the format "[0-9]+[mhd]", where 'm' = minutes, 'h' = hours, and 'd' = days. For example, "30m" (30 minutes), "2h" (2 hours), or "1d" (1 day).
  - Default Value: The query's aggregation step
- multi_index: Set to True to display labels as index columns in the read results; False (default) displays only the metric's sample time as an index column.
  - Type: bool
  - Requirement: Optional
  - Default Value: False
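The relative time format accepted by start and end ("now" or "now-[0-9]+[mhd]") can be resolved to an absolute time as follows. This is a client-side illustration only: `resolve_relative_time` is a hypothetical helper, it handles just the relative form (not RFC 3339 strings or Unix timestamps), and the Frames service performs its own parsing.

```python
import re
from datetime import datetime, timedelta, timezone

_RELATIVE_RE = re.compile(r"now(?:-([0-9]+)([mhd]))?")
_UNIT_TO_ARG = {"m": "minutes", "h": "hours", "d": "days"}


def resolve_relative_time(value, now=None):
    """Resolve "now" or "now-<N><m|h|d>" to an absolute datetime."""
    now = datetime.now(timezone.utc) if now is None else now
    match = _RELATIVE_RE.fullmatch(value)
    if match is None:
        raise ValueError(f"not a relative time string: {value!r}")
    if match.group(1) is None:  # plain "now"
        return now
    offset = timedelta(**{_UNIT_TO_ARG[match.group(2)]: int(match.group(1))})
    return now - offset
```

Passing an explicit `now` value keeps the result reproducible, which is convenient when checking that a start/end pair covers the intended time range.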
The following read parameters are specific to the stream backend and are passed as keyword arguments via the kw parameter; for more information and examples, see the platform's Frames streaming-backend reference:
- seek: Seek type.
  When the "seq" or "sequence" seek type is set, you must set the sequence parameter to the desired record sequence number.
  When the "time" seek type is set, you must set the start parameter to the desired seek start time.
  - Type: str
  - Requirement: Required
  - Valid Values: "time" | "seq" | "sequence" | "latest" | "earliest"
- shard_id: The ID of the stream shard from which to read.
  - Type: str
  - Requirement: Required
  - Valid Values: "0" ... "<stream shard count> - 1"
- sequence: The sequence number of the record from which to start reading.
  - Type: int64
  - Requirement: Required when seek = "seq" or "sequence"
- start: The earliest record ingestion time from which to start reading.
  - Type: str
  - Requirement: Required when seek = "time"
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866"; "now-90m"; "0".
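The dependencies between seek, sequence, and start described above can be captured in a small builder. `stream_read_kwargs` is a hypothetical helper written for illustration; it only enforces the rules stated in this section and returns keyword arguments that could be passed to read.

```python
_SEEK_TYPES = ("time", "seq", "sequence", "latest", "earliest")


def stream_read_kwargs(seek, shard_id, sequence=None, start=None):
    """Build the backend-specific keyword arguments for a stream read."""
    if seek not in _SEEK_TYPES:
        raise ValueError(f"unsupported seek type: {seek!r}")
    kw = {"seek": seek, "shard_id": str(shard_id)}  # shard_id is documented as a string
    if seek in ("seq", "sequence"):
        if sequence is None:
            raise ValueError("sequence is required for the seq/sequence seek type")
        kw["sequence"] = int(sequence)
    elif seek == "time":
        if not start:
            raise ValueError("start is required for the time seek type")
        kw["start"] = start
    return kw
```

For example, `client.read("stream", table="mystream", **stream_read_kwargs("time", 0, start="now-90m"))` would read shard "0" from records ingested in the last 90 minutes.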
- When the value of the iterator parameter is False (default): returns a single DataFrame.
- When the value of the iterator parameter is True: returns a DataFrames iterator.
df = client.read(backend="nosql", table="mytable", filter="col1>666")
df = client.read("tsdb", table="mytsdb", start="0", multi_index=True)
df = client.read(backend="tsdb", query="select avg(cpu) as cpu, avg(disk) from 'mytsdb' where node='1'", start="now-1d", end="now", step="2h")
df = client.read(backend="stream", table="mystream", seek="latest", shard_id="5")
Deletes a data collection or specific collection items, according to the specified backend type.
delete(backend, table, filter='', start='', end='', if_missing=FAIL)
- if_missing: Determines whether to raise an error when the specified collection (table) doesn't exist.
  - Type: pb.ErrorOptions enumeration. To use the enumeration, import the frames_pb2 module; for example:
    from v3io_frames import frames_pb2 as fpb
  - Requirement: Optional
  - Valid Values: FAIL to raise an error when the specified collection doesn't exist; IGNORE to ignore this
  - Default Value: FAIL
The following delete parameters are specific to the nosql backend; for more information and examples, see the platform's Frames NoSQL-backend reference:
- filter: A filter expression that identifies specific items to delete.
  - Type: str
  - Requirement: Optional
  - Default Value: "" (delete the entire table and its schema file)
The following delete parameters are specific to the tsdb backend; for more information and examples, see the platform's Frames TSDB-backend reference:
- start: Start (minimum) time for the delete operation; i.e., delete only items whose data sample time is at or after (>=) the specified start time.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866"; "now-90m"; "0".
  - Default Value: "" when neither start nor end is set (delete the entire table and its schema file, .schema); 0 when end is set
- end: End (maximum) time for the delete operation; i.e., delete only items whose data sample time is before or at (<=) the specified end time.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2018-09-26T14:10:20Z"; "1537971006000"; "now-3h"; "now-7d".
  - Default Value: "" when neither start nor end is set (delete the entire table and its schema file, .schema); 0 when start is set
Note:
- When neither the start nor the end parameter is set, the entire TSDB table and its schema file are deleted.
- Only full table partitions within the specified time frame (as determined by the start and end parameters) are deleted. Items within the specified time frame that reside within partitions that begin before the delete start time or end after the delete end time aren't deleted. The partition interval is calculated automatically based on the table's ingestion rate and is stored in the TSDB's partitionerInterval schema field (see the .schema file).
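The partition-granularity rule in the note above is easy to misread, so here is a small model of it. The function name `fully_covered_partitions` and the representation of partitions as (start, end) pairs of millisecond timestamps are assumptions made for this sketch, as is the treatment of both boundaries as inclusive; the actual behavior is determined by the TSDB backend.

```python
def fully_covered_partitions(partitions, start, end):
    """Return the partitions that lie entirely within the [start, end] delete range.

    partitions: iterable of (partition_start, partition_end) pairs, in milliseconds.
    Partitions that begin before start or end after end are kept, together with
    any items in them that fall inside the delete range.
    """
    return [
        (p_start, p_end)
        for p_start, p_end in partitions
        if p_start >= start and p_end <= end
    ]


# Three consecutive partitions; the delete range only fully covers the middle one.
partitions = [(0, 100), (100, 200), (200, 300)]
deleted = fully_covered_partitions(partitions, start=50, end=250)
```

In this model, samples at times 50 to 100 and 200 to 250 survive the delete even though they fall inside the requested range, because their partitions are only partially covered.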
client.delete(backend="nosql", table="mytable", filter="age > 40")
client.delete(backend="tsdb", table="mytsdb", start="now-1d", end="now-5h")
from v3io_frames import frames_pb2 as fpb
client.delete(backend="stream", table="mystream", if_missing=fpb.IGNORE)
Extends the basic CRUD functionality of the other client methods via backend-specific commands for performing operations on a data collection.
Note: Currently, no execute commands are available for the tsdb backend.
execute(backend, table, command="", args=None)
All Frames backends that support the execute method support the following common parameters:
- command: The command to execute.
  - Type: str
  - Requirement: Required
  - Valid Values: Backend-specific
- args: A dictionary of <argument name>: <value> pairs for passing command-specific parameters (arguments).
  - Type: dict
  - Requirement and Valid Values: Backend-specific
  - Default Value: None
The following execute commands are specific to the nosql backend; for more information and examples, see the platform's Frames NoSQL-backend reference:
- infer | infer_schema: Infers the data schema of a given NoSQL table and creates a schema file for the table.
  Example:
client.execute(backend="nosql", table="mytable", command="infer")
The following execute commands are specific to the stream backend; for more information and examples, see the platform's Frames streaming-backend reference:
- put: Adds records to a stream shard.
  Example:
client.execute('stream', table="mystream", command='put', args={'data': '{"cpu": 12.4, "mem": 31.1, "disk": 12.7}', "client_info": "my custom info", "partition": "PK1"})
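In the put example above, the data argument is a JSON string rather than a dictionary. When the record starts out as a Python object, it can be serialized with the standard json module instead of being written by hand. `stream_put_args` is a hypothetical convenience function; the argument names (data, client_info, partition) are taken from the example above.

```python
import json


def stream_put_args(record, client_info=None, partition=None):
    """Build the args dictionary for the stream backend's put command."""
    args = {"data": json.dumps(record)}  # the record is passed as a JSON string
    if client_info is not None:
        args["client_info"] = client_info
    if partition is not None:
        args["partition"] = partition
    return args


args = stream_put_args({"cpu": 12.4, "mem": 31.1, "disk": 12.7},
                       client_info="my custom info", partition="PK1")
```

The result could then be passed as `client.execute("stream", table="mystream", command="put", args=args)`.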
To contribute to V3IO Frames, you need to be aware of the following:
The following components are required for building Frames code:
- Go server with support for both the gRPC and HTTP protocols
- Go client
- Python client
The core is written in Go.
The development is done on the development branch and then released to the master branch.
Before submitting changes, test the code:
- To execute the Go tests, run make test.
- To execute the Python tests, run make test-python.
- If you add Go dependencies, run make update-go-deps.
- If you add Python dependencies, update clients/py/Pipfile and run make update-py-deps.
Integration tests are run on Travis CI. See .travis.yml for details.
The following environment variables are defined in the Travis settings:
- Docker Container Registry (Quay.io)
  - DOCKER_PASSWORD: a password for pushing images to Quay.io.
  - DOCKER_USERNAME: a username for pushing images to Quay.io.
- Python Package Index (PyPI)
  - V3IO_PYPI_PASSWORD: a password for pushing a new release to PyPI.
  - V3IO_PYPI_USER: a username for pushing a new release to PyPI.
- Iguazio Data Science Platform
  - V3IO_SESSION: a JSON-encoded map with session information for running tests. For example:
    '{"url":"45.39.128.5:8081","container":"mitzi","user":"daffy","password":"rabbit season"}'
    Note: Make sure to embed the JSON object within single quotes ('{...}').
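Hand-writing the V3IO_SESSION value invites quoting mistakes, so it can be generated instead. The sketch below uses only the Python standard library; the function names are hypothetical, and the keys (url, container, user, password) are the ones shown in the example above.

```python
import json
import shlex


def session_value(url, container, user, password):
    """Return the JSON-encoded session map expected in V3IO_SESSION."""
    return json.dumps({"url": url, "container": container,
                       "user": user, "password": password})


def shell_export(value):
    """Return a shell command that sets V3IO_SESSION, with the JSON safely quoted."""
    # shlex.quote wraps the JSON in single quotes, as the note above requires.
    return "export V3IO_SESSION=" + shlex.quote(value)


value = session_value("45.39.128.5:8081", "mitzi", "daffy", "rabbit season")
command = shell_export(value)
```

Printing `command` gives a line that can be pasted into a shell for running the tests locally.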
Use the following command to build the Docker image:
make build-docker
Use the following command to run the Docker image:
docker run \
-v /path/to/config.yaml:/etc/framesd.yaml \
quay.io/v3io/frames:unstable