Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pubsub functionality via MQTT #88

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,19 @@ endif()

set_property(GLOBAL PROPERTY USE_FOLDERS ON)

set(GEDS_EXTRA_COMPILER_FLAGS -Wall -Wextra -Werror) # -Wpedantic # error: ISO C++ does not support ‘__int128’ for ‘type name’ [-Werror=pedantic]
# Added for pub/sub service
# Requires to remove GEDS_EXTRA_COMPILER_FLAGS -Werror
include(FetchContent)
FetchContent_Declare(MQTT
GIT_REPOSITORY https://github.com/eclipse/paho.mqtt.c.git
GIT_TAG v1.3.8)
FetchContent_MakeAvailable(MQTT)
FetchContent_Declare(MQTT_CXX
GIT_REPOSITORY https://github.com/eclipse/paho.mqtt.cpp
GIT_TAG v1.2.0)
FetchContent_MakeAvailable(MQTT_CXX)

set(GEDS_EXTRA_COMPILER_FLAGS -Wall -Wextra) # -Wpedantic # error: ISO C++ does not support ‘__int128’ for ‘type name’ [-Werror=pedantic]
set(GEDS_EXTRA_LINKER_FLAGS)

# if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
Expand Down
84 changes: 47 additions & 37 deletions doc/BUILDING.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
# Building

## CMake
Install CMake > 3.20.

- Build commands:
```bash
cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR
cmake --build $BUILD_DIR
```

- Test commands:
```bash
cmake --build $BUILD_DIR -t test
```

- Install command:
```bash
cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds
```

## Docker

`build-docker.sh` builds a docker container with GRPC and a build of GEDS in `/usr/local/opt/geds`.

## Dependencies

### MacOS
# Building GEDS

- [Workflow](#workflow)
- [Instructions for MacOS](#instructions-for-macos)
- [Instructions for Windows](#instructions-for-windows)
- [Instructions for Linux](#instructions-for-linux)
- [Deploying via Docker](#deploying-via-docker)
- [Deploying via Ansible](#deploying-via-ansible)

## Workflow <a name="workflow"></a>
The general workflow of building GEDS from source is:
1. Pull GEDS repository: `git pull https://github.com/IBM/GEDS.git`
2. Install dependencies, e.g. `cmake` version > 3.20 (check via `cmake --version`)
3. Create `build` and `install` directory in the GEDS folder and set environment variables: `export $BUILD_DIR=~/GEDS/build` & `export $INSTALL_DIR=~/GEDS/bin`
4. Build Boost
5. Build AWS SDK
6. Build GEDS
7. Install GEDS

## Instructions for MacOS <a name="instructions-for-macos"></a>

Install the following dependencies through homebrew:

Expand Down Expand Up @@ -54,23 +46,41 @@ Finally build it with:
cmake --build . --target all
```

### Linux
## Instructions for Windows <a name="instructions-for-windows"></a>
Coming

Install the following dependencies:
## Instructions for Linux <a name="instructions-for-linux"></a>
Install GEDS dependencies:

```
apt-get install -y \
clang \
curl wget \
build-essential gcc ninja-build \
openjdk-11-jdk \
python3.9 python3.9-dev python3-distutils
sudo apt install -y clang curl wget build-essential gcc ninja-build openjdk-11-jdk python3-dev python3-distutils cmake
```

and a recent version (>= 3.20) of CMake:
CMake version >= 3.20:
```
CMAKE_VERSION=3.22.4
wget --quiet -O cmake.tar.gz https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.tar.gz \
&& tar xf cmake.tar.gz --strip-components=1 -C /usr/local/ \
&& rm cmake.tar.gz
```

Install AWS SDK dependecies:
```
sudo apt install libcurl4-openssl-dev libssl-de uuid-dev zlib1g-dev libpulse-dev
```

Build AWS SDK: `/bin/bash build-aws-sdk.sh`

Build Boost: `/bin/bash build-boost.sh`

Build GEDS:
1. Check if environment variables are correctly set via `printenv | grep BUILD_DIR` and `printenv | grep INSTALL_DIR`
2. `cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR`
3. `cmake --build $BUILD_DIR -j 4` (-j specifies the number of cores to use)
4. `cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds`

## Deploying via Docker <a name="deploying-via-docker"></a>
`build-docker.sh` builds a docker container with GRPC and a build of GEDS in `/usr/local/opt/geds`.

## Deploying via Ansible <a name="deploying-via-ansible"></a>
We offer an Ansible playbook to automate GEDS building from source on multiple clients.
123 changes: 123 additions & 0 deletions doc/geds_ansible.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
---
- hosts: geds
name: Update all apt packages
become: false
vars:
ansible_python_interpreter: /usr/bin/python3
remote_home: "{{ ansible_env.HOME }}"

tasks:
- name: Update and upgrade
tags: update
become: true
apt:
upgrade: yes
update_cache: yes

- name: Reboot
tags: reboot
become: true
reboot:

- name: Install GEDS dependencies
tags: dependencies
become: true
apt:
pkg:
- clang
- curl
- wget
- build-essential
- gcc
- ninja-build
- openjdk-11-jdk
- python3-dev
- python3-distutils
- cmake
state: latest
update_cache: yes

- name: Create GEDS directory
tags: git
become: false
file:
path: "{{ remote_home }}/GEDS"
state: directory

- name: Git clone GEDS
tags: git
become: false
ansible.builtin.git:
repo: "https://github.com/IBM/GEDS.git"
dest: "{{ remote_home }}/GEDS/"

- name: AWS dependencies
tags: aws
become: true
apt:
pkg:
- libcurl4-openssl-dev
- libssl-dev
- uuid-dev
- zlib1g-dev
- libpulse-dev
state: latest
update_cache: yes

- name: Build AWS
tags: aws
become: false
ansible.builtin.command: /bin/bash build-aws-sdk.sh
async: 3600
poll: 30
args:
chdir: "{{ remote_home }}/GEDS"

- name: Build boost
tags: boost
become: false
ansible.builtin.command: /bin/bash build-boost.sh
args:
chdir: "{{ remote_home }}/GEDS"

- name: Create GEDS build directory
tags: geds
become: false
file:
path: "{{ remote_home }}/GEDS/build"
state: directory

- name: Build GEDS
tags: geds
become: false
ansible.builtin.command: cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR
args:
chdir: "{{ remote_home }}/GEDS"
environment:
BUILD_DIR: "{{ remote_home }}/GEDS/build"

- name: Build GEDS
tags: geds
become: false
ansible.builtin.command: cmake --build $BUILD_DIR -j 4
args:
chdir: "{{ remote_home }}/GEDS"
environment:
BUILD_DIR: "{{ remote_home }}/GEDS/build"

- name: Create GEDS install directory
tags: geds
become: false
file:
path: "{{ remote_home }}/GEDS/bin"
state: directory

- name: Install GEDS
tags: geds
become: false
ansible.builtin.command: cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds
args:
chdir: "{{ remote_home }}/GEDS"
environment:
BUILD_DIR: "{{ remote_home }}/GEDS/build"
INSTALL_DIR: "{{ remote_home }}/GEDS/bin"
2 changes: 2 additions & 0 deletions src/metadataservice/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set(SOURCES
ObjectStoreHandler.h
S3Helper.cpp
S3Helper.h
PubSubMQTT.h
)

add_library(libmetadataservice STATIC ${SOURCES})
Expand All @@ -19,6 +20,7 @@ target_link_libraries(libmetadataservice
geds_utility
geds_proto
geds_s3
paho-mqttpp3
)
target_compile_options(libmetadataservice PUBLIC ${GEDS_EXTRA_COMPILER_FLAGS})
target_compile_definitions(libmetadataservice
Expand Down
130 changes: 130 additions & 0 deletions src/metadataservice/PubSubMQTT.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright 2022- IBM Inc. All rights reserved
* SPDX-License-Identifier: Apache-2.0
*/

#include "mqtt/async_client.h"


// https://github.com/eclipse/paho.mqtt.cpp/issues/141
mqtt::async_client_ptr createClient(std::string serverAddress,
std::string clientID){
// Use MQTT v5 to enable no_local:
// This flag avoids receiving messages from the same host by telling
// the broker not to send messages received with a client ID to a
// subscriber with the same client ID

mqtt::create_options createOpts = mqtt::create_options();
createOpts.set_mqtt_verison(5);
std::string persistDir= "None";
auto client_ptr = std::make_shared<mqtt::async_client>(serverAddress,
clientID,
createOpts,
persistDir);
std::cout << "Created MQTT client to " + serverAddress + " and ID " + clientID << std::endl;
return client_ptr;
}

mqtt::async_client_ptr connectClient(std::shared_ptr<mqtt::async_client> client_ptr,
std::string node){
auto connOpts = std::make_shared<mqtt::connect_options>();

if (node == "server"){
connOpts->set_keep_alive_interval(20);
connOpts->set_clean_session(false);
connOpts->set_automatic_reconnect(true);
connOpts->set_mqtt_version(5);
}
if (node == "client"){
connOpts->set_clean_session(false);
connOpts->set_mqtt_version(5);
}
client_ptr->connect(*connOpts)->wait();
std::cout << "Connected MQTT client" << std::endl;
return client_ptr;
}

void publishData(mqtt::async_client_ptr client_ptr,
std::string topic,
int QoS,
std::string data){
mqtt::topic top(*client_ptr, topic, QoS, true);
mqtt::message_ptr message = mqtt::make_message(topic, data);
message->set_qos(QoS);
message->set_payload("A single message");

client_ptr->publish(message);
std::cout << "Published data to " + topic << std::endl;
}

mqtt::async_client_ptr subscribe(mqtt::async_client_ptr client_ptr,
std::string topic,
int QoS){
mqtt::subscribe_options subOpts;
subOpts.set_no_local(true); // Only works with MQTT v5

client_ptr->subscribe(topic, QoS, subOpts)->wait();
std::cout << "Subscribed to " + topic << std::endl;
return client_ptr;
}

void unsubscribe(mqtt::async_client_ptr client_ptr,
std::string topic){
client_ptr->unsubscribe(topic)->wait();
client_ptr->stop_consuming();
std::cout << "Unsubscribed from " + topic << std::endl;
}

std::tuple<std::string, std::string> consumeMessage(mqtt::async_client_ptr client_ptr){
client_ptr->start_consuming();
auto msg = client_ptr->consume_message();
// msg->get_payload()
return std::make_tuple(msg->get_topic(), msg->to_string());
}

void disconnectClient(mqtt::async_client_ptr client_ptr){
client_ptr->disconnect()->wait();
std::cout << "Disconnected client" << std::endl;
}


// Example publisher
// ---------------------------------------------------------------
// #include "PubSub.h"
//
// int main(int argc, char **argv) {
// mqtt::async_client_ptr client_ptr = createClient("tcp://localhost:1883",
// "mds_server");
// std::string node_type = "server";
// mqtt::async_client_ptr connected_client_ptr = connectClient(client_ptr,
// node_type);
// publishData(connected_client_ptr, "home/file1", 1, "Hello World");
// disconnectClient(connected_client_ptr);
// return 0;
// }

// Example subscriber
// ---------------------------------------------------------------
// #include "PubSub.h"
//
// int main(void)
// {
// mqtt::async_client_ptr client_ptr = createClient("tcp://localhost:1883",
// "mds_client");
// std::string node_type = "client";
// mqtt::async_client_ptr connected_client_ptr = connectClient(client_ptr,
// node_type);
// mqtt::async_client_ptr subscribed_client_ptr = subscribe(connected_client_ptr,
// "home/file1", 1);
// while (true) {
// auto msg = consumeMessage(subscribed_client_ptr);
// std::string topic = std::get<0>(msg);
// std::string payload = std::get<1>(msg);
//
// if (payload.empty()){
// break;
// }
// std::cout << topic + " " + payload << std::endl;
// }
// return 0;
// }
Loading