Skip to content

Commit

Permalink
fix: streaming plugin stdout to koji
Browse files Browse the repository at this point in the history
  • Loading branch information
TurtIeSocks committed Dec 29, 2023
1 parent f46c158 commit ab601f4
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 51 deletions.
3 changes: 2 additions & 1 deletion or-tools/tsp/tsp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ int main(int argc, char *argv[])

for (auto point : routes)
{
std::cout << stringPoints[point] << " ";
std::cout << stringPoints[point] << std::endl
<< std::flush;
}

return EXIT_SUCCESS;
Expand Down
91 changes: 41 additions & 50 deletions server/algorithms/src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::fmt::Display;
use std::io::Write;
use std::io::{self, BufRead, BufReader};
use std::path::Path;
use std::process::{Command, Stdio};
use std::time::Instant;

use crate::s2::create_cell_map;
use crate::utils;
use model::api::single_vec::SingleVec;
use rayon::iter::{Either, IntoParallelIterator, ParallelIterator};
use rayon::iter::{IntoParallelIterator, ParallelIterator};

#[derive(Debug)]
pub enum Folder {
Expand Down Expand Up @@ -184,71 +184,62 @@ impl Plugin {
Err(err) => return Err(err),
};

let mut stdin = match child.stdin.take() {
Some(stdin) => stdin,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"failed to open stdin",
));
let stdout = child
.stdout
.take()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Could not capture stdout"))?;

let mut results = vec![];
let mut invalid = vec![];
let reader = BufReader::new(stdout);
for line in reader.lines() {
match line {
Ok(line) => {
let mut iter: std::str::Split<'_, &str> = line.trim().split(",");
let lat = iter.parse_next_coord();
let lng = iter.parse_next_coord();
if lat.is_none() || lng.is_none() {
invalid.push(line)
} else {
results.push([lat.unwrap(), lng.unwrap()])
}
}
Err(e) => {
log::error!("Error reading line: {}", e);
}
}
};
}

match stdin.write_all(input.as_bytes()) {
Ok(_) => match stdin.flush() {
Ok(_) => {}
Err(err) => {
log::error!("failed to flush stdin: {}", err);
}
},
Err(err) => {
log::error!("failed to write to stdin: {}", err)
match child.wait()? {
status if status.success() => {}
status => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("child process exited with status: {}", status),
))
}
};
}

let output = match child.wait_with_output() {
Ok(result) => result,
Err(err) => return Err(err),
};
let output = String::from_utf8_lossy(&output.stdout);
// let mut output_indexes = output
// .split(",")
// .filter_map(|s| s.trim().parse::<usize>().ok())
// .collect::<Vec<usize>>();
let (invalid, mut output_result): (Vec<&str>, SingleVec) = output
.split_ascii_whitespace()
.into_iter()
.collect::<Vec<_>>()
.into_par_iter()
.partition_map(|s| {
let mut iter: std::str::Split<'_, &str> = s.trim().split(",");
let lat = iter.parse_next_coord();
let lng = iter.parse_next_coord();
if lat.is_none() || lng.is_none() {
Either::Left(s)
} else {
Either::Right([lat.unwrap(), lng.unwrap()])
}
});
if let Some(first) = output_result.first() {
if let Some(last) = output_result.last() {
if let Some(first) = results.first() {
if let Some(last) = results.last() {
if first == last {
output_result.pop();
results.pop();
}
}
}

if !invalid.is_empty() {
log::warn!(
"Some invalid results were returned from the plugin: `{}`",
invalid.join(", ")
);
}
if output_result.is_empty() {
if results.is_empty() {
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!(
"no valid output from child process \n{}\noutput should return points in the following format: `lat,lng lat,lng`",
output
invalid.join(", ")
),
))
} else {
Expand All @@ -258,7 +249,7 @@ impl Plugin {
time.elapsed().as_secs_f32()
);
// Ok(output_indexes.into_iter().map(|i| points[i]).collect())
Ok(output_result)
Ok(results)
}
}
}

0 comments on commit ab601f4

Please sign in to comment.