-
Notifications
You must be signed in to change notification settings - Fork 12
How to set timestamp in an HBase cell
The HBASEPut operator in streamsx.hbase toolkit provides from version 3.7.0 two new parameters to add time stamp:
-
Timestamp This parameter specifies the timestamp in milliseconds (INT64).
The timestamp allows for versioning of the HBase cells. Every time HBaes make a PUT on a table it set the timestamp.
By default this is the current time in milliseconds, but it is possible to set your own timestamp as well with this parameter.
Cannot be used with TimestampAttrName parameter. -
TimestampAttrName Name of the attribute on the input tuple containing the timestamp in milliseconds.
Cannot be used with Timestamp parameter.
https://github.com/IBMStreams/streamsx.hbase/releases/tag/v3.7.0
The following SPL application demonstrates how to put time stamp in an HBase cell.
Before you start with testing of this SPL application, please perform the following steps:
- 1 - Create a test table on your HBase database.
create 'timestamp-table', 'all'
-
2- Copy the HBase configuration xml file from your HBase server into etc directory.
etc/hbase-site.xml
If your hadoop cluster is 'kerberized':
-
3- copy teh HBase keytab file from your HBase server into etc directory of your SPL project.
etc/hbase.headless.keytab
-
4- Copy the kerberose configuration file
krb5.conf
in root /etc directory. -
5- Check the keytab file with kinit tool.
kinit -k -t etc/hbase.headless.keytab <your-hbase-principal>
-
6- Adapt the following SPL parameters with your your hbase principal.
-
7- Make the application and start it.
In this SPL application:
-
InputStream
creates data. -
putToHbase
puts created data by InputStream into 'timestamp-table' . -
HbaseScan
scans the rows from table 'timestamp-table' .
namespace application ;
use com.ibm.streamsx.hbase::HBASEPut ;
use com.ibm.streamsx.hbase::HBASEScan ;
composite HbaseTimestamp {
param
expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hbase.headless.keytab");
expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal", "[email protected]");
expression<rstring> $hbaseSite : getSubmissionTimeValue("hbaseSite", "etc/hbase-site.xml");
type
HbasePut = rstring table, rstring row, rstring key, rstring value, int64 ts ;
graph
// generates input data (tablename, row, key, valte, timestampe)
stream<HbasePut> InputStream = Custom()
{
logic
onProcess :
{
for(int32 rowId in range(1,4))
{
for(int32 keyId in range(1,4))
{
submit({
table = "timestamp-table",
row = "row_" + (rstring)rowId,
key = "key_" + (rstring)keyId,
value = "value_" + (rstring)(int32)(random()*10000.0),
ts = (int64)(getTimestampInSecs()*1000.0)},
InputStream) ;
}
}
submit(Sys.WindowMarker, InputStream) ;
submit(Sys.FinalMarker, InputStream) ;
}
}
() as printInputStream = Custom(InputStream)
{
logic
onTuple InputStream : printStringLn("InputStream tuple " + (rstring) InputStream) ;
}
// puts data created by InputStreams into hbase table timestamp-table
(stream<boolean success> putToHbase ; stream<rstring errorText> Error)= HBASEPut(InputStream){
param
authPrincipal : $authPrincipal ;
authKeytab : $authKeytab ;
hbaseSite : $hbaseSite ;
tableNameAttribute : table ;
rowAttrName : "row" ;
columnQualifierAttrName : "key" ;
valueAttrName : "value" ;
staticColumnFamily : "all" ;
successAttr : "success" ;
// Timestamp : (int64)(getTimestampInSecs()*1000.0);
TimestampAttrName : "ts" ;
vmArg : "-Djava.security.krb5.conf=/etc/krb5.conf" ;
}
// prints the success value after put operation.
()as printPutToHbase = Custom(putToHbase){
logic
onTuple putToHbase : printStringLn((rstring)putToHbase);
}
// In case of any error it prints the error message.
()as printError = Custom(Error){
logic
onTuple Error : printStringLn((rstring)Error);
}
// scans the rows from 'timestamp-table'
stream<rstring row, int32 numResults,
tuple< list<tuple<rstring value, int64 ts>> key_1,
list<tuple<rstring value, int64 ts>> key_2,
list<tuple<rstring value, int64 ts>> key_3 > results> HbaseScan =
HBASEScan()
{
param
authPrincipal : $authPrincipal ;
authKeytab : $authKeytab ;
hbaseSite : $hbaseSite ;
initDelay: 10.0;
tableName : "timestamp-table" ;
outAttrName : "results" ;
staticColumnFamily : "all" ;
outputCountAttr : "numResults" ;
maxVersions : 0 ;
// We have set maxVersions =0. If we had left it unSet, we'd only get the most recent version.
// endRow : "row_4";
vmArg : "-Djava.security.krb5.conf=/etc/krb5.conf" ;
}
// prints scan results
()as printHbaseScan = Custom(HbaseScan){
logic
onTuple HbaseScan : printStringLn((rstring)HbaseScan);
}
}