Skip to content

Commit

Permalink
Added MapMany Rx test
Browse files Browse the repository at this point in the history
  • Loading branch information
timyates committed Jun 13, 2013
1 parent f9ff445 commit 342be90
Showing 1 changed file with 64 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class JavaBasedRxTest extends TestVerticle {

private RxEventBus rxEventBus ;

private JsonObject createMsg, insertMsg, selectMsg, dropMsg ;

private void appReady() {
super.start() ;
}
Expand All @@ -34,12 +36,73 @@ public void start() {
container.deployModule(System.getProperty("vertx.modulename"), config, 1, new AsyncResultHandler<String>() {
@Override
public void handle(AsyncResult<String> event1) {
createMsg = new JsonObject() {{
putString( "action", "execute" ) ;
putString( "stmt", "CREATE TABLE IF NOT EXISTS test ( id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 1 INCREMENT BY 1) NOT NULL, name VARCHAR(80), age INTEGER, CONSTRAINT testid PRIMARY KEY ( id ) )" ) ;
}} ;
insertMsg = new JsonObject() {{
putString( "action", "insert" ) ;
putString( "stmt", "INSERT INTO test ( name, age ) VALUES ( ?, ? )" ) ;
putArray( "values", new JsonArray( new ArrayList() {{
add( new ArrayList() {{ add( "tim" ) ; add( 65 ) ; }} ) ;
add( new ArrayList() {{ add( "dave" ) ; add( 29 ) ; }} ) ;
add( new ArrayList() {{ add( "mike" ) ; add( 42 ) ; }} ) ;
}} ) ) ;
}} ;
selectMsg = new JsonObject() {{
putString( "action", "select" ) ;
putString( "stmt", "SELECT * FROM test ORDER BY age ASC" ) ;
}} ;
dropMsg = new JsonObject() {{
putString( "action", "execute" ) ;
putString( "stmt", "DROP TABLE test" ) ;
}} ;
appReady() ;
}
} ) ;
}


@Test
public void testCreateInsertSelectAndDropMapMany() {
rxEventBus.send( "test.persistor", createMsg )
.mapMany( new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
@Override
public Observable<RxMessage<JsonObject>> call(RxMessage<JsonObject> message) {
assertEquals( message.body().getString( "status" ), "ok" ) ;
return rxEventBus.send( "test.persistor", insertMsg ) ;
}
} )
.mapMany( new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
@Override
public Observable<RxMessage<JsonObject>> call(RxMessage<JsonObject> message) {
assertEquals( message.body().getString( "status" ), "ok" ) ;
JsonArray result = message.body().getArray( "result" ) ;
assertEquals( ((JsonObject)result.get( 0 )).getNumber( "ID" ), 1 ) ;
assertEquals( ((JsonObject)result.get( 1 )).getNumber( "ID" ), 2 ) ;
assertEquals( ((JsonObject)result.get( 2 )).getNumber( "ID" ), 3 ) ;
return rxEventBus.send( "test.persistor", selectMsg ) ;
}
} )
.mapMany( new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
@Override
public Observable<RxMessage<JsonObject>> call(RxMessage<JsonObject> message) {
assertEquals( message.body().getString( "status" ), "ok" ) ;
JsonArray result = message.body().getArray( "result" ) ;
assertEquals( ((JsonObject)result.get( 0 )).getNumber( "AGE" ), 29 ) ;
assertEquals( ((JsonObject)result.get( 1 )).getNumber( "AGE" ), 42 ) ;
assertEquals( ((JsonObject)result.get( 2 )).getNumber( "AGE" ), 65 ) ;
return rxEventBus.send( "test.persistor", dropMsg ) ;
}
} )
.subscribe( new Action1<RxMessage<JsonObject>>() {
@Override
public void call( RxMessage<JsonObject> message ) {
assertEquals( message.body().getString( "status" ), "ok" ) ;
testComplete() ;
}
} ) ;
}

@Test
public void testCreateInsertSelectAndDrop() {
// Last subscription in the chain; drop the table
Expand All @@ -59,11 +122,6 @@ public void call( RxMessage<JsonObject> message ) {
assertEquals( ((JsonObject)result.get( 0 )).getNumber( "AGE" ), 29 ) ;
assertEquals( ((JsonObject)result.get( 1 )).getNumber( "AGE" ), 42 ) ;
assertEquals( ((JsonObject)result.get( 2 )).getNumber( "AGE" ), 65 ) ;

final JsonObject dropMsg = new JsonObject() {{
putString( "action", "execute" ) ;
putString( "stmt", "DROP TABLE test" ) ;
}} ;
rxEventBus.send( "test.persistor", dropMsg ).subscribe( dropSubscription ) ;
}
} ;
Expand All @@ -76,11 +134,6 @@ public void call( RxMessage<JsonObject> message ) {
assertEquals( ((JsonObject)result.get( 0 )).getNumber( "ID" ), 1 ) ;
assertEquals( ((JsonObject)result.get( 1 )).getNumber( "ID" ), 2 ) ;
assertEquals( ((JsonObject)result.get( 2 )).getNumber( "ID" ), 3 ) ;

final JsonObject selectMsg = new JsonObject() {{
putString( "action", "select" ) ;
putString( "stmt", "SELECT * FROM test ORDER BY age ASC" ) ;
}} ;
rxEventBus.send( "test.persistor", selectMsg ).subscribe( selectSubscription ) ;
}
} ;
Expand All @@ -90,17 +143,6 @@ public void call( RxMessage<JsonObject> message ) {
@Override
public void call( RxMessage<JsonObject> message ) {
assertEquals( message.body().getString( "status" ), "ok" ) ;

final JsonArray insertValues = new JsonArray( new ArrayList() {{
add( new ArrayList() {{ add( "tim" ) ; add( 65 ) ; }} ) ;
add( new ArrayList() {{ add( "dave" ) ; add( 29 ) ; }} ) ;
add( new ArrayList() {{ add( "mike" ) ; add( 42 ) ; }} ) ;
}} ) ;
final JsonObject insertMsg = new JsonObject() {{
putString( "action", "insert" ) ;
putString( "stmt", "INSERT INTO test ( name, age ) VALUES ( ?, ? )" ) ;
putArray( "values", insertValues ) ;
}} ;
rxEventBus.send( "test.persistor", insertMsg ).subscribe( insertSubscription ) ;
}
} ;
Expand Down

0 comments on commit 342be90

Please sign in to comment.