Skip to content

Commit

Permalink
[SYSTEMDS-3794] Fix multi-threaded sparse matrix-vector elementwise ops
Browse files Browse the repository at this point in the history
There was a regression where all sparse matrix-vector elementwise
operations are now only executed single-threaded. This patch fixes
the most important branch for sparse-safe matrix-vector operations,
but in subsequent task we also need to fix all the other cases.

When running connected components on the Europe road network, the
individual binary multiply operations improved by 10-20x on a box with
48 vcores. End-to-end the entire components() invocation with 20
iterations improved from 282s (246s for b(*)) to 112s (75s for b(*)).
The 10x improvements do not carry fully through because the output MCSR
is converted to CSR when appending to the buffer pool (57s of 75s).
  • Loading branch information
mboehm7 committed Nov 24, 2024
1 parent 4e00aa1 commit cdc2e2c
Showing 1 changed file with 23 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public static MatrixBlock bincellOpScalar(MatrixBlock m1, MatrixBlock ret, Scala
public static MatrixBlock bincellOp(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, BinaryOperator op) {
try{

// Timing time = new Timing(true);
//Timing time = new Timing(true);
isValidDimensionsBinary(m1, m2);
op = replaceOpWithSparseSafeIfApplicable(m1, m2, op);

Expand Down Expand Up @@ -212,11 +212,13 @@ public static MatrixBlock bincellOp(MatrixBlock m1, MatrixBlock m2, MatrixBlock
int k = op.getNumThreads();

// fallback to sequential computation for specialized operations
// TODO fix all variants to be feasible for multi-threading
if(k <= 1 || m1.isEmpty() || m2.isEmpty()
|| ret.getLength() < PAR_NUMCELL_THRESHOLD2
|| ((op.sparseSafe || isSparseSafeDivideOrPow(op, m2))
&& !(atype == BinaryAccessType.MATRIX_MATRIX
|| atype.isMatrixVector() && isAllDense(m1, m2, ret)))
|| (atype.isMatrixVector() && isAllDense(m1, m2, ret))
|| (atype.isMatrixVector() && m1.sparse && (m2.sparse || ret.sparse))))
|| !CommonThreadPool.useParallelismOnThread())
{
bincellOpMatrixSingle(m1, m2, ret, op, atype);
Expand All @@ -227,7 +229,7 @@ public static MatrixBlock bincellOp(MatrixBlock m1, MatrixBlock m2, MatrixBlock

if(ret.isEmptyBlock(false))
ret.examSparsity(k);
// System.out.println("BinCell " + op + " " + m1.getNumRows() + ", " + m1.getNumColumns() + ", " + m1.getNonZeros()
//System.out.println("BinCell " + op + " " + m1.getNumRows() + ", " + m1.getNumColumns() + ", " + m1.getNonZeros()
// + " -- " + m2.getNumRows() + ", " + m2.getNumColumns() + " " + m2.getNonZeros() + "\t\t" + time.stop());

return ret;
Expand Down Expand Up @@ -732,7 +734,7 @@ else if( m1.sparse && !m2.sparse && !m2.isEmpty() && !ret.sparse
&& atype == BinaryAccessType.MATRIX_ROW_VECTOR)
safeBinaryMVSparseDenseRow(m1, m2, ret, op);
else if( m1.sparse ) //SPARSE m1
safeBinaryMVSparseLeft(m1, m2, ret, op);
return safeBinaryMVSparseLeft(m1, m2, ret, op, rl, ru);
else if( !m1.sparse && !m2.sparse && ret.sparse && op.fn instanceof Multiply
&& atype == BinaryAccessType.MATRIX_COL_VECTOR
&& (long)m1.rlen * m2.clen < Integer.MAX_VALUE )
Expand Down Expand Up @@ -977,39 +979,39 @@ else if( !skipEmpty && m2.isEmptyBlock(false) && (op.fn instanceof Minus || op.f
ret.nonZeros = nnz;
}

private static void safeBinaryMVSparseLeft(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, BinaryOperator op) {
private static long safeBinaryMVSparseLeft(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,
BinaryOperator op, int rl, int ru)
{
boolean isMultiply = (op.fn instanceof Multiply);
boolean skipEmpty = (isMultiply || isSparseSafeDivideOrPow(op, m2));
BinaryAccessType atype = getBinaryAccessType(m1, m2);

// early abort on skip and empty
if(skipEmpty && (m1.isEmptyBlock(false) || m2.isEmptyBlock(false)))
return; // skip entire empty block

return 0; // skip entire empty block

if(atype == BinaryAccessType.MATRIX_COL_VECTOR)
safeBinaryMVSparseLeftColVector(m1, m2, ret, op);
safeBinaryMVSparseLeftColVector(m1, m2, ret, op, rl, ru);
else if(atype == BinaryAccessType.MATRIX_ROW_VECTOR)
safeBinaryMVSparseLeftRowVector(m1, m2, ret, op);

ret.recomputeNonZeros();
safeBinaryMVSparseLeftRowVector(m1, m2, ret, op, rl, ru);

return ret.recomputeNonZeros(rl, ru-1);
}

@SuppressWarnings("null")
private static void safeBinaryMVSparseLeftColVector(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, BinaryOperator op) {
private static void safeBinaryMVSparseLeftColVector(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,
BinaryOperator op, int rl, int ru)
{
final boolean isMultiply = (op.fn instanceof Multiply);
final boolean skipEmpty = (isMultiply || isSparseSafeDivideOrPow(op, m2));

final int rlen = m1.rlen;
final int clen = m1.clen;
final SparseBlock a = m1.sparseBlock;
final boolean aNull = a == null;
if(skipEmpty && a == null)
return;
if(ret.isInSparseFormat()){
final SparseBlockMCSR rb = (SparseBlockMCSR) ret.getSparseBlock();
for(int i = 0; i < rlen; i++) {
for(int i = rl; i < ru; i++) {
final double v2 = m2.get(i, 0);
final boolean emptyRow = !aNull ? a.isEmpty(i) : true;
if((skipEmpty && (emptyRow || v2 == 0)) // skip empty one side zero
Expand All @@ -1029,7 +1031,7 @@ else if(!fill)
}
else{
final DenseBlock db = ret.getDenseBlock();
for(int i = 0; i < rlen; i++) {
for(int i = rl; i < ru; i++) {
final double v2 = m2.get(i, 0);

final boolean emptyRow = !aNull ? a.isEmpty(i) : true;
Expand All @@ -1045,7 +1047,6 @@ else if(!fill)
safeBinaryMVSparseColVectorRowNoFill(a, i, db, v2, emptyRow, op);
else // GENERAL CASE
safeBinaryMVSparseColVectorRowWithFill(a, i, db, vz, v2, clen, emptyRow, op);

}
}
}
Expand Down Expand Up @@ -1141,18 +1142,17 @@ private static final void fillZeroValuesScalar( double v, DenseBlock ret,
ret.set(rpos, rpos + 1, cpos, len, v);
}

@SuppressWarnings("null")
private static void safeBinaryMVSparseLeftRowVector(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, BinaryOperator op) {
private static void safeBinaryMVSparseLeftRowVector(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,
BinaryOperator op, int rl, int ru)
{
boolean isMultiply = (op.fn instanceof Multiply);
boolean skipEmpty = (isMultiply || isSparseSafeDivideOrPow(op, m2));

int rlen = m1.rlen;
int clen = m1.clen;
SparseBlock a = m1.sparseBlock;
if(ret.isInSparseFormat()){
SparseBlock sb = ret.getSparseBlock();
long nnz = 0;
for(int i = 0; i < rlen; i++) {
for(int i = rl; i < ru; i++) {
if(skipEmpty && (a == null || a.isEmpty(i)))
continue; // skip empty rows
if(skipEmpty && ret.sparse)
Expand All @@ -1170,18 +1170,16 @@ private static void safeBinaryMVSparseLeftRowVector(MatrixBlock m1, MatrixBlock
double v2 = m2.get(0, aix[j]);
double v = op.fn.execute(avals[j], v2);
sb.append(i, aix[j], v);
nnz += v != 0 ? 1 : 0;
lastIx = aix[j];
}
}
// empty left
fillZeroValues(op, m2, ret, skipEmpty, i, lastIx + 1, clen);
}
ret.setNonZeros(nnz);
}
else{
DenseBlock db = ret.getDenseBlock();
for(int i = 0; i < rlen; i++){
for(int i = rl; i < ru; i++){
if(skipEmpty && (a == null || a.isEmpty(i)))
continue; // skip empty rows
if(skipEmpty && ret.sparse)
Expand Down

0 comments on commit cdc2e2c

Please sign in to comment.