diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 346c62f8..36488543 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,6 +56,7 @@ jobs: - name: Demo run: | cd examples + cd tracker_api_example python app.py "localhost:9090" - name: Coveralls diff --git a/CHANGES.txt b/CHANGES.txt index 9a9b82b9..f4ab14eb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,8 @@ +Version 0.15.0 (2023-04-19) +--------------------------- +Use Requests Session for sending events (#221) +Add Redis example app (#322) + Version 0.14.0 (2023-03-21) --------------------------- Adds deprecation warnings for V1 changes (#315) diff --git a/docs/source/conf.py b/docs/source/conf.py index 59bfe9df..d2b51972 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -28,7 +28,7 @@ author = 'Alex Dean, Paul Boocock, Matus Tomlein, Jack Keene' # The full version, including alpha/beta/rc tags -release = "0.14" +release = "0.15" # -- General configuration --------------------------------------------------- diff --git a/examples/redis_example/README.md b/examples/redis_example/README.md new file mode 100644 index 00000000..59b1aeb7 --- /dev/null +++ b/examples/redis_example/README.md @@ -0,0 +1,26 @@ +# Redis Example App + +This example shows how to set up the Python tracker with a Redis database and a Redis worker to forward events to a Snowplow pipeline. + +#### Installation +- Install the Python tracker from the root folder of the project. + +`python setup.py install` + +- Install redis for your machine. More information can be found [here](https://redis.io/docs/getting-started/installation/) + +`brew install redis` + +- Run `redis-server` to check your redis installation, to stop the server enter `ctrl+c`. + +#### Usage +Navigate to the example folder. 
+ +`cd examples/redis_example` + +This example has two programmes, `redis_app.py` tracks events and sends them to a redis database, `redis_worker.py` then forwards these events onto a Snowplow pipeline. + +To send events to your pipeline, run `redis-server`, followed by the `redis_worker.py {{your_collector_endpoint}}` and finally `redis_app.py`. You should see 3 events in your pipeline. + + + diff --git a/examples/redis_example/redis_app.py b/examples/redis_example/redis_app.py new file mode 100644 index 00000000..db7ffc32 --- /dev/null +++ b/examples/redis_example/redis_app.py @@ -0,0 +1,60 @@ +from snowplow_tracker import Tracker +from snowplow_tracker.typing import PayloadDict +import json +import redis +import logging + +# logging +logging.basicConfig() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class RedisEmitter(object): + """ + Sends Snowplow events to a Redis database + """ + + def __init__(self, rdb=None, key: str = "redis_key") -> None: + """ + :param rdb: Optional custom Redis database + :type rdb: redis | None + :param key: The Redis key for the list of events + :type key: string + """ + + if rdb is None: + rdb = redis.StrictRedis() + + self.rdb = rdb + self.key = key + + def input(self, payload: PayloadDict) -> None: + """ + :param payload: The event properties + :type payload: dict(string:*) + """ + logger.info("Pushing event to Redis queue...") + self.rdb.rpush(self.key, json.dumps(payload)) + logger.info("Finished sending event to Redis.") + + def flush(self) -> None: + logger.warning("The RedisEmitter class does not need to be flushed") + return + + def sync_flush(self) -> None: + self.flush() + + +def main(): + emitter = RedisEmitter() + + t = Tracker(emitter) + + t.track_page_view("https://www.snowplow.io", "Homepage") + t.track_page_ping("https://www.snowplow.io", "Homepage") + t.track_link_click("https://www.snowplow.io") + + +if __name__ == "__main__": + main() diff --git a/examples/redis_example/redis_worker.py 
b/examples/redis_example/redis_worker.py new file mode 100644 index 00000000..6a190683 --- /dev/null +++ b/examples/redis_example/redis_worker.py @@ -0,0 +1,74 @@ +import sys +from snowplow_tracker import Emitter +from typing import Any +from snowplow_tracker.typing import PayloadDict +import json +import redis +import signal +import gevent +from gevent.pool import Pool + + +def get_url_from_args(): + if len(sys.argv) != 2: + raise ValueError("Collector Endpoint is required") + return sys.argv[1] + + +class RedisWorker: + def __init__(self, emitter: Emitter, key) -> None: + self.pool = Pool(5) + self.emitter = emitter + self.rdb = redis.StrictRedis() + self.key = key + + signal.signal(signal.SIGTERM, self.request_shutdown) + signal.signal(signal.SIGINT, self.request_shutdown) + signal.signal(signal.SIGQUIT, self.request_shutdown) + + def send(self, payload: PayloadDict) -> None: + """ + Send an event to an emitter + """ + self.emitter.input(payload) + + def pop_payload(self) -> None: + """ + Get a single event from Redis and send it + If the Redis queue is empty, sleep to avoid making continual requests + """ + payload = self.rdb.lpop(self.key) + if payload: + self.pool.spawn(self.send, json.loads(payload.decode("utf-8"))) + else: + gevent.sleep(5) + + def run(self) -> None: + """ + Run indefinitely + """ + self._shutdown = False + while not self._shutdown: + self.pop_payload() + self.pool.join(timeout=20) + + def request_shutdown(self, *args: Any) -> None: + """ + Halt the worker + """ + self._shutdown = True + + +def main(): + collector_url = get_url_from_args() + + # Configure Emitter + emitter = Emitter(collector_url, batch_size=1) + + # Setup worker + worker = RedisWorker(emitter=emitter, key="redis_key") + worker.run() + + +if __name__ == "__main__": + main() diff --git a/examples/redis_example/requirements.txt b/examples/redis_example/requirements.txt new file mode 100644 index 00000000..ac10dd44 --- /dev/null +++ b/examples/redis_example/requirements.txt @@ 
-0,0 +1,2 @@ +redis~=4.5 +gevent~=22.10 \ No newline at end of file diff --git a/examples/snowplow_api_example/README.md b/examples/snowplow_api_example/README.md new file mode 100644 index 00000000..6819757b --- /dev/null +++ b/examples/snowplow_api_example/README.md @@ -0,0 +1,18 @@ +# Snowplow API Example App + +This example shows how to set up the Python tracker with the Snowplow API to send events to a Snowplow pipeline. + +#### Installation +- Install the Python tracker from the root folder of the project. + +`python setup.py install` + +#### Usage +Navigate to the example folder. + +`cd examples/snowplow_api_example` + +To send events to your pipeline, run `snowplow_app.py {{your_collector_endpoint}}`. You should see 6 events in your pipeline. + + + diff --git a/examples/snowplow_app.py b/examples/snowplow_api_example/snowplow_app.py similarity index 100% rename from examples/snowplow_app.py rename to examples/snowplow_api_example/snowplow_app.py diff --git a/examples/tracker_api_example/README.md b/examples/tracker_api_example/README.md new file mode 100644 index 00000000..10392b17 --- /dev/null +++ b/examples/tracker_api_example/README.md @@ -0,0 +1,18 @@ +# Example App + +This example shows how to set up the Python tracker with the tracker API to send events to a Snowplow pipeline. + +#### Installation +- Install the Python tracker from the root folder of the project. + +`python setup.py install` + +#### Usage
Navigate to the example folder. + +`cd examples/tracker_api_example` + +To send events to your pipeline, run `app.py {{your_collector_endpoint}}`. You should see 5 events in your pipeline. 
+ + + diff --git a/examples/app.py b/examples/tracker_api_example/app.py similarity index 90% rename from examples/app.py rename to examples/tracker_api_example/app.py index 973f5a99..cee66b47 100644 --- a/examples/app.py +++ b/examples/tracker_api_example/app.py @@ -1,5 +1,10 @@ from distutils.log import error -from snowplow_tracker import Tracker, Emitter, Subject, SelfDescribingJson +from snowplow_tracker import ( + Tracker, + Emitter, + Subject, + SelfDescribingJson, +) import sys diff --git a/setup.py b/setup.py index b7968c9a..81776ce5 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,7 @@ setup( name="snowplow-tracker", - version="0.14.0", + version="0.15.0", author=authors_str, author_email=authors_email_str, packages=[ diff --git a/snowplow_tracker/_version.py b/snowplow_tracker/_version.py index d9f39c84..69a6ff65 100644 --- a/snowplow_tracker/_version.py +++ b/snowplow_tracker/_version.py @@ -15,6 +15,6 @@ # language governing permissions and limitations there under. # """ -__version_info__ = (0, 14, 0) +__version_info__ = (0, 15, 0) __version__ = ".".join(str(x) for x in __version_info__) __build_version__ = __version__ + "" diff --git a/snowplow_tracker/emitter_configuration.py b/snowplow_tracker/emitter_configuration.py index 87fa6c9e..1cf90238 100644 --- a/snowplow_tracker/emitter_configuration.py +++ b/snowplow_tracker/emitter_configuration.py @@ -18,6 +18,7 @@ from typing import Optional, Union, Tuple, Dict from snowplow_tracker.typing import SuccessCallback, FailureCallback from snowplow_tracker.event_store import EventStore +import requests class EmitterConfiguration(object): @@ -31,6 +32,7 @@ def __init__( buffer_capacity: Optional[int] = None, custom_retry_codes: Dict[int, bool] = {}, event_store: Optional[EventStore] = None, + session: Optional[requests.Session] = None, ) -> None: """ Configuration for the emitter that sends events to the Snowplow collector. 
@@ -57,6 +59,8 @@ def __init__( :type custom_retry_codes: dict :param event_store: Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events. :type event_store: EventStore | None + :param session: Persist parameters across requests by using a session object + :type session: requests.Session | None """ self.batch_size = batch_size @@ -67,6 +71,7 @@ def __init__( self.buffer_capacity = buffer_capacity self.custom_retry_codes = custom_retry_codes self.event_store = event_store + self.session = session @property def batch_size(self) -> Optional[int]: @@ -197,3 +202,14 @@ def event_store(self) -> Optional[EventStore]: @event_store.setter def event_store(self, value: Optional[EventStore]): self._event_store = value + + @property + def session(self) -> Optional[requests.Session]: + """ + Persist parameters across requests using a requests.Session object + """ + return self._session + + @session.setter + def session(self, value: Optional[requests.Session]): + self._session = value diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 19151885..af233566 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -69,6 +69,7 @@ def __init__( buffer_capacity: Optional[int] = None, custom_retry_codes: Dict[int, bool] = {}, event_store: Optional[EventStore] = None, + session: Optional[requests.Session] = None, ) -> None: """ :param endpoint: The collector URL. If protocol is not set in endpoint it will automatically set to "https://" - this is done automatically. @@ -107,6 +108,8 @@ def __init__( :type custom_retry_codes: dict :param event_store: Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events. 
:type event_store: EventStore | None + :param session: Persist parameters across requests by using a session object + :type session: requests.Session | None """ one_of(protocol, PROTOCOLS) one_of(method, METHODS) @@ -153,6 +156,8 @@ def __init__( self.custom_retry_codes = custom_retry_codes logger.info("Emitter initialized with endpoint " + self.endpoint) + self.request_method = requests if session is None else session + @staticmethod def as_collector_uri( endpoint: str, @@ -243,7 +248,7 @@ def http_post(self, data: str) -> int: logger.info("Sending POST request to %s..." % self.endpoint) logger.debug("Payload: %s" % data) try: - r = requests.post( + r = self.request_method.post( self.endpoint, data=data, headers={"Content-Type": "application/json; charset=utf-8"}, @@ -263,7 +268,7 @@ def http_get(self, payload: PayloadDict) -> int: logger.info("Sending GET request to %s..." % self.endpoint) logger.debug("Payload: %s" % payload) try: - r = requests.get( + r = self.request_method.get( self.endpoint, params=payload, timeout=self.request_timeout ) except requests.RequestException as e: @@ -444,6 +449,7 @@ def __init__( buffer_capacity: int = None, custom_retry_codes: Dict[int, bool] = {}, event_store: Optional[EventStore] = None, + session: Optional[requests.Session] = None, ) -> None: """ :param endpoint: The collector URL. If protocol is not set in endpoint it will automatically set to "https://" - this is done automatically. @@ -476,6 +482,8 @@ def __init__( :type buffer_capacity: int :param event_store: Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events. 
:type event_store: EventStore + :param session: Persist parameters across requests by using a session object + :type session: requests.Session | None """ super(AsyncEmitter, self).__init__( endpoint=endpoint, @@ -491,6 +499,7 @@ def __init__( buffer_capacity=buffer_capacity, custom_retry_codes=custom_retry_codes, event_store=event_store, + session=session, ) self.queue = Queue() for i in range(thread_count): diff --git a/snowplow_tracker/snowplow.py b/snowplow_tracker/snowplow.py index b967cdec..953c1587 100644 --- a/snowplow_tracker/snowplow.py +++ b/snowplow_tracker/snowplow.py @@ -81,6 +81,7 @@ def create_tracker( request_timeout=emitter_config.request_timeout, custom_retry_codes=emitter_config.custom_retry_codes, event_store=emitter_config.event_store, + session=emitter_config.session, ) tracker = Tracker(