Skip to content

Commit

Permalink
Merge branch 'release/0.15.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack-Keene committed Apr 19, 2023
2 parents 5d33568 + c894d4d commit 033c780
Show file tree
Hide file tree
Showing 16 changed files with 241 additions and 6 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ jobs:
- name: Demo
run: |
cd examples
cd tracker_api_example
python app.py "localhost:9090"
- name: Coveralls
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Version 0.15.0 (2023-04-19)
---------------------------
Use Requests Session for sending events (#221)
Add Redis example app (#322)

Version 0.14.0 (2023-03-21)
---------------------------
Adds deprecation warnings for V1 changes (#315)
Expand Down
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
author = 'Alex Dean, Paul Boocock, Matus Tomlein, Jack Keene'

# The full version, including alpha/beta/rc tags
release = "0.14"
release = "0.15"


# -- General configuration ---------------------------------------------------
Expand Down
26 changes: 26 additions & 0 deletions examples/redis_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Redis Example App

This example shows how to set up the Python tracker with a Redis database and a Redis worker to forward events to a Snowplow pipeline.

#### Installation
- Install the Python tracker from the root folder of the project.

`python setup.py install`

- Install redis for your machine. More information can be found [here](https://redis.io/docs/getting-started/installation/)

`brew install redis`

- Run `redis-server` to check your Redis installation. To stop the server, press `ctrl+c`.

#### Usage
Navigate to the example folder.

`cd examples/redis_example`

This example has two programs: `redis_app.py` tracks events and sends them to a Redis database, and `redis_worker.py` then forwards these events on to a Snowplow pipeline.

To send events to your pipeline, run `redis-server`, followed by `redis_worker.py {{your_collector_endpoint}}` and finally `redis_app.py`. You should see 3 events in your pipeline.



60 changes: 60 additions & 0 deletions examples/redis_example/redis_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from snowplow_tracker import Tracker
from snowplow_tracker.typing import PayloadDict
import json
import redis
import logging

# logging
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class RedisEmitter(object):
    """
    Emitter that queues Snowplow events on a Redis list
    """

    def __init__(self, rdb=None, key: str = "redis_key") -> None:
        """
        :param rdb: Redis client to push events to. When omitted, a
                    redis.StrictRedis() instance is created
        :type  rdb: redis | None
        :param key: The Redis key of the list the events are appended to
        :type  key: string
        """
        self.rdb = redis.StrictRedis() if rdb is None else rdb
        self.key = key

    def input(self, payload: PayloadDict) -> None:
        """
        Serialise the event to JSON and append it to the Redis list

        :param payload: The event properties
        :type  payload: dict(string:*)
        """
        logger.info("Pushing event to Redis queue...")
        serialised = json.dumps(payload)
        self.rdb.rpush(self.key, serialised)
        logger.info("Finished sending event to Redis.")

    def flush(self) -> None:
        """
        Only logs a warning: input() pushes every event straight to Redis,
        so nothing is buffered here
        """
        logger.warning("The RedisEmitter class does not need to be flushed")

    def sync_flush(self) -> None:
        """
        Delegates to flush()
        """
        self.flush()


def main():
    """
    Track a page view, a page ping and a link click through a RedisEmitter
    """
    page_url = "https://www.snowplow.io"
    page_title = "Homepage"

    tracker = Tracker(RedisEmitter())

    tracker.track_page_view(page_url, page_title)
    tracker.track_page_ping(page_url, page_title)
    tracker.track_link_click(page_url)


if __name__ == "__main__":
    main()
74 changes: 74 additions & 0 deletions examples/redis_example/redis_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import sys
from snowplow_tracker import Emitter
from typing import Any
from snowplow_tracker.typing import PayloadDict
import json
import redis
import signal
import gevent
from gevent.pool import Pool


def get_url_from_args():
    """
    Return the collector endpoint given as the only command line argument

    :raises ValueError: if the script was not called with exactly one argument
    :rtype: string
    """
    argv = sys.argv
    if len(argv) == 2:
        return argv[1]
    raise ValueError("Collector Endpoint is required")


class RedisWorker:
    """
    Pops Snowplow events from a Redis list and hands them to an emitter,
    using a gevent Pool of size 5 to run the sends
    """

    def __init__(self, emitter: Emitter, key) -> None:
        """
        :param emitter: Emitter that every popped event is passed to
        :type  emitter: Emitter
        :param key: The Redis key for the list of events
        :type  key: string
        """
        self.pool = Pool(5)
        self.emitter = emitter
        self.rdb = redis.StrictRedis()
        self.key = key
        # Initialised here rather than in run() so that a shutdown requested
        # before run() starts is honoured instead of being overwritten.
        self._shutdown = False

        # Not every platform defines all of these signals (SIGQUIT is missing
        # on Windows), so register only the ones this interpreter provides.
        for signal_name in ("SIGTERM", "SIGINT", "SIGQUIT"):
            signum = getattr(signal, signal_name, None)
            if signum is not None:
                signal.signal(signum, self.request_shutdown)

    def send(self, payload: PayloadDict) -> None:
        """
        Send an event to an emitter

        :param payload: The event properties
        :type  payload: dict(string:*)
        """
        self.emitter.input(payload)

    def pop_payload(self) -> None:
        """
        Get a single event from Redis and send it
        If the Redis queue is empty, sleep to avoid making continual requests
        """
        payload = self.rdb.lpop(self.key)
        if payload:
            # The popped value is decoded as UTF-8 JSON before being sent
            self.pool.spawn(self.send, json.loads(payload.decode("utf-8")))
        else:
            gevent.sleep(5)

    def run(self) -> None:
        """
        Poll Redis until a shutdown is requested, then wait up to 20 seconds
        on the pool before returning
        """
        while not self._shutdown:
            self.pop_payload()
        self.pool.join(timeout=20)

    def request_shutdown(self, *args: Any) -> None:
        """
        Halt the worker

        Registered as a signal handler in __init__; the arguments are ignored
        """
        self._shutdown = True


def main():
    """
    Run a RedisWorker that forwards events stored under "redis_key" to the
    collector endpoint given on the command line
    """
    endpoint = get_url_from_args()

    worker = RedisWorker(
        emitter=Emitter(endpoint, batch_size=1),
        key="redis_key",
    )
    worker.run()


if __name__ == "__main__":
    main()
2 changes: 2 additions & 0 deletions examples/redis_example/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
redis~=4.5
gevent~=22.10
18 changes: 18 additions & 0 deletions examples/snowplow_api_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Snowplow API Example App

This example shows how to set up the Python tracker with the Snowplow API to send events to a Snowplow pipeline.

#### Installation
- Install the Python tracker from the root folder of the project.

`python setup.py install`

#### Usage
Navigate to the example folder.

`cd examples/snowplow_api_example`

To send events to your pipeline, run `snowplow_app.py {{your_collector_endpoint}}`. You should see 6 events in your pipeline.



File renamed without changes.
18 changes: 18 additions & 0 deletions examples/tracker_api_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Example App

This example shows how to set up the Python tracker with the tracker API to send events to a Snowplow pipeline.

#### Installation
- Install the Python tracker from the root folder of the project.

`python setup.py install`

#### Usage
Navigate to the example folder.

`cd examples/tracker_api_example`

To send events to your pipeline, run `app.py {{your_collector_endpoint}}`. You should see 5 events in your pipeline.



7 changes: 6 additions & 1 deletion examples/app.py → examples/tracker_api_example/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from distutils.log import error
from snowplow_tracker import Tracker, Emitter, Subject, SelfDescribingJson
from snowplow_tracker import (
Tracker,
Emitter,
Subject,
SelfDescribingJson,
)
import sys


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

setup(
name="snowplow-tracker",
version="0.14.0",
version="0.15.0",
author=authors_str,
author_email=authors_email_str,
packages=[
Expand Down
2 changes: 1 addition & 1 deletion snowplow_tracker/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
# language governing permissions and limitations there under.
# """

__version_info__ = (0, 14, 0)
__version_info__ = (0, 15, 0)
__version__ = ".".join(str(x) for x in __version_info__)
__build_version__ = __version__ + ""
16 changes: 16 additions & 0 deletions snowplow_tracker/emitter_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Optional, Union, Tuple, Dict
from snowplow_tracker.typing import SuccessCallback, FailureCallback
from snowplow_tracker.event_store import EventStore
import requests


class EmitterConfiguration(object):
Expand All @@ -31,6 +32,7 @@ def __init__(
buffer_capacity: Optional[int] = None,
custom_retry_codes: Dict[int, bool] = {},
event_store: Optional[EventStore] = None,
session: Optional[requests.Session] = None,
) -> None:
"""
Configuration for the emitter that sends events to the Snowplow collector.
Expand All @@ -57,6 +59,8 @@ def __init__(
:type custom_retry_codes: dict
:param event_store: Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events.
:type event_store: EventStore | None
:param session: Persist parameters across requests by using a session object
:type session: requests.Session | None
"""

self.batch_size = batch_size
Expand All @@ -67,6 +71,7 @@ def __init__(
self.buffer_capacity = buffer_capacity
self.custom_retry_codes = custom_retry_codes
self.event_store = event_store
self.session = session

@property
def batch_size(self) -> Optional[int]:
Expand Down Expand Up @@ -197,3 +202,14 @@ def event_store(self) -> Optional[EventStore]:
@event_store.setter
def event_store(self, value: Optional[EventStore]):
self._event_store = value

@property
def session(self) -> Optional[requests.Session]:
    """
    Persist parameters across requests using a requests.Session object

    Returns the session stored on this configuration, or None when no
    session was set.
    """
    return self._session

@session.setter
def session(self, value: Optional[requests.Session]) -> None:
    """
    Store the requests.Session object (or None) on this configuration

    :param value: The session to keep, or None
    :type  value: requests.Session | None
    """
    self._session = value
13 changes: 11 additions & 2 deletions snowplow_tracker/emitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
buffer_capacity: Optional[int] = None,
custom_retry_codes: Dict[int, bool] = {},
event_store: Optional[EventStore] = None,
session: Optional[requests.Session] = None,
) -> None:
"""
:param endpoint: The collector URL. If protocol is not set in endpoint it will automatically set to "https://" - this is done automatically.
Expand Down Expand Up @@ -107,6 +108,8 @@ def __init__(
:type custom_retry_codes: dict
:param event_store: Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events.
:type event_store: EventStore | None
:param session: Persist parameters across requests by using a session object
:type session: requests.Session | None
"""
one_of(protocol, PROTOCOLS)
one_of(method, METHODS)
Expand Down Expand Up @@ -153,6 +156,8 @@ def __init__(
self.custom_retry_codes = custom_retry_codes
logger.info("Emitter initialized with endpoint " + self.endpoint)

self.request_method = requests if session is None else session

@staticmethod
def as_collector_uri(
endpoint: str,
Expand Down Expand Up @@ -243,7 +248,7 @@ def http_post(self, data: str) -> int:
logger.info("Sending POST request to %s..." % self.endpoint)
logger.debug("Payload: %s" % data)
try:
r = requests.post(
r = self.request_method.post(
self.endpoint,
data=data,
headers={"Content-Type": "application/json; charset=utf-8"},
Expand All @@ -263,7 +268,7 @@ def http_get(self, payload: PayloadDict) -> int:
logger.info("Sending GET request to %s..." % self.endpoint)
logger.debug("Payload: %s" % payload)
try:
r = requests.get(
r = self.request_method.get(
self.endpoint, params=payload, timeout=self.request_timeout
)
except requests.RequestException as e:
Expand Down Expand Up @@ -444,6 +449,7 @@ def __init__(
buffer_capacity: int = None,
custom_retry_codes: Dict[int, bool] = {},
event_store: Optional[EventStore] = None,
session: Optional[requests.Session] = None,
) -> None:
"""
:param endpoint: The collector URL. If protocol is not set in endpoint it will automatically set to "https://" - this is done automatically.
Expand Down Expand Up @@ -476,6 +482,8 @@ def __init__(
:type buffer_capacity: int
:param event_store: Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events.
:type event_store: EventStore
:param session: Persist parameters across requests by using a session object
:type session: requests.Session | None
"""
super(AsyncEmitter, self).__init__(
endpoint=endpoint,
Expand All @@ -491,6 +499,7 @@ def __init__(
buffer_capacity=buffer_capacity,
custom_retry_codes=custom_retry_codes,
event_store=event_store,
session=session,
)
self.queue = Queue()
for i in range(thread_count):
Expand Down
1 change: 1 addition & 0 deletions snowplow_tracker/snowplow.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def create_tracker(
request_timeout=emitter_config.request_timeout,
custom_retry_codes=emitter_config.custom_retry_codes,
event_store=emitter_config.event_store,
session=emitter_config.session,
)

tracker = Tracker(
Expand Down

0 comments on commit 033c780

Please sign in to comment.