Skip to content

Commit

Permalink
#2379 added activemq dependency; implemented event publishing to Acti…
Browse files Browse the repository at this point in the history
…veMQ
  • Loading branch information
tjamakeev committed Apr 11, 2018
1 parent 85539d4 commit 8a9f05e
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@
<artifactId>nimbus-jose-jwt</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.3</version>
</dependency>

<!-- Testing -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.subutai.core.identity.api.IdentityManager;
import io.subutai.core.identity.api.exception.TokenCreateException;
import io.subutai.core.peer.api.PeerManager;
import io.subutai.hub.share.Utils;
import io.subutai.hub.share.dto.environment.EnvironmentInfoDto;
import io.subutai.hub.share.event.Event;
import io.subutai.hub.share.json.JsonUtil;
Expand All @@ -30,14 +31,16 @@ public class EnvironmentMetadataManagerImpl implements EnvironmentMetadataManage
private final IdentityManager identityManager;
private PeerManager peerManager;
private EnvironmentManager environmentManager;
private String brokerURL;


public EnvironmentMetadataManagerImpl( PeerManager peerManager, EnvironmentManager environmentManager,
IdentityManager identityManager )
IdentityManager identityManager, final String brokerURL )
{
this.peerManager = peerManager;
this.environmentManager = environmentManager;
this.identityManager = identityManager;
this.brokerURL = brokerURL;
}


Expand All @@ -62,9 +65,8 @@ public void issueToken( String containerIp ) throws TokenCreateException
String environmentId = container.getEnvironmentId().getId();
String containerId = container.getContainerId().getId();
String peerId = container.getPeerId();
String origin = String.format( "%s.%s.%s", peerId, containerId, environmentId );
String origin = Utils.buildSubutaiOrigin( environmentId, peerId, containerId );
final String token = identityManager.issueJWTToken( origin );
// environmentManager.placeTokenToContainer( environmentId, containerIp, token );

placeTokenIntoContainer( container, token );
}
Expand All @@ -91,15 +93,26 @@ public void pushEvent( final Event event )
{
try
{
LOG.debug( "Event received: {} {}", event, JsonUtil.toJson( event ) );
String jsonEvent = JsonUtil.toJson( event );
LOG.debug( "Event received: {} {}", event, jsonEvent );
LOG.debug( "OS: {}", event.getCustomMetaByKey( "OS" ) );
LOG.debug( "Nature: {}", event.getPayload().getNature() );
String destination = "events." + event.getOrigin().getId();

thread( new EventProducer( brokerURL, destination, jsonEvent ), true );
}
catch ( JsonProcessingException e )
{
LOG.error( e.getMessage(), e );
}
// TODO: send event to consumers
}


private void thread( Runnable runnable, boolean daemon )
{
Thread brokerThread = new Thread( runnable );
brokerThread.setDaemon( daemon );
brokerThread.start();
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.subutai.core.environment.metadata.impl;


import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.activemq.ActiveMQConnectionFactory;


public class EventProducer implements Runnable
{
private static final Logger LOG = LoggerFactory.getLogger( EventProducer.class );
private final String destination;
private String message;

private String brokerURL;
private ActiveMQConnectionFactory connectionFactory;


public EventProducer( final String brokerURL, final String destination, final String message )
{
this.brokerURL = brokerURL;
this.destination = destination;
this.message = message;
// Create a ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory( this.brokerURL );
}


@Override
public void run()
{
// Create a Connection
Connection connection = null;
try
{
connection = connectionFactory.createConnection();

connection.start();

// Create a Session
Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );

// Create the destination (Topic or Queue)
Destination destination = session.createTopic( this.destination );

// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer( destination );
producer.setDeliveryMode( DeliveryMode.NON_PERSISTENT );

// Create a messages
TextMessage message = session.createTextMessage( this.message );

// Tell the producer to send the message
producer.send( message );

// Clean up
session.close();
connection.close();
}
catch ( JMSException e )
{
LOG.error( e.getMessage(), e );
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0"
default-activation="eager"
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xsi:schemaLocation="http://www.w3.org/201/XMLSchema-instance http://www.w3.org/2001/XMLSchema-instance
0"
<blueprint
xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0"
default-activation="eager"
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
>
<!-- Allow the use of system properties -->
<ext:property-placeholder placeholder-prefix="$[" placeholder-suffix="]" />
Expand All @@ -23,6 +21,7 @@
<argument ref="peerManager" />
<argument ref="environmentManager" />
<argument ref="identityManager" />
<argument value="tcp://brokerHost:61616" />
</bean>

<service ref="environmentMetadataManager"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.subutai.hub.share;


public class Utils
{
public static String buildSubutaiOrigin( final String environmentId, final String peerId,
final String containerId )
{
return String.format( "%s.%s.%s", environmentId, peerId, containerId, environmentId );
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
package io.subutai.hub.share.event.meta;


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

import io.subutai.hub.share.Utils;


public class OriginMeta implements Meta
{
private String id;

@JsonIgnore
@JsonProperty
private String peerId;

@JsonIgnore
@JsonProperty
private String containerId;

@JsonIgnore
@JsonProperty
private String environmentId;


@JsonCreator
public OriginMeta( final String subutaiOrigin )
{
Preconditions.checkNotNull( subutaiOrigin );
this.id = subutaiOrigin;
prase();
final String[] parts = subutaiOrigin.split( "\\." );
if ( parts.length != 3 )
{
throw new IllegalArgumentException( "Invalid origin argument." );
}
this.environmentId = parts[0];
this.peerId = parts[1];
this.containerId = parts[2];
}


Expand All @@ -34,24 +38,9 @@ private OriginMeta()
}


private void prase()
{
final String[] splittedOrigins = this.id.split( ":" );
this.peerId = splittedOrigins[0];
if ( splittedOrigins.length > 1 )
{
this.containerId = splittedOrigins[1];
if ( splittedOrigins.length > 2 )
{
this.environmentId = splittedOrigins[2];
}
}
}


public String getId()
{
return id;
return Utils.buildSubutaiOrigin( this.environmentId, this.peerId, this.containerId );
}


Expand All @@ -76,6 +65,6 @@ public String getContainerId()
@Override
public String toString()
{
return String.format( "%s:%s:%s", peerId, environmentId, containerId );
return this.getId();
}
}

0 comments on commit 8a9f05e

Please sign in to comment.