diff --git a/.travis.yml b/.travis.yml index e288794..e1b98d0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,7 +20,7 @@ jobs: - stage: UnitTests os: linux sudo: false - script: make && make test + script: make check addons: apt: packages: diff --git a/Makefile b/Makefile index 221007e..45a411f 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,10 @@ -CXX=g++ -CXXFLAGS=-g -std=c++11 -# temporary for pnappa +CXX=clang++ +CXXFLAGS=-g -std=c++11 -Wall -pedantic LIBS=-lpthread -.PHONY: all clean -all: demo test +.PHONY: all clean check test_programs +all: demo check clean: rm -fv demo test coverage @@ -13,13 +12,15 @@ clean: demo: demo.cpp subprocess.hpp $(CXX) $(CXXFLAGS) demo.cpp -o demo $(LIBS) +check: test test_programs + valgrind ./test + test: test.cpp subprocess.hpp $(CXX) $(CXXFLAGS) test.cpp -o test $(LIBS) - # run the testsuite (-s makes it nice and verbose) - # XXX: should we make this verbose? - ./test -s - valgrind ./test coverage: test.cpp subprocess.hpp $(CXX) $(CXXFLAGS) -fprofile-arcs -ftest-coverage test.cpp -o coverage $(LIBS) .codecov/run_coverage.sh + +test_programs: + $(MAKE) -C test_programs/ diff --git a/README.md b/README.md index 4e42a83..b174a17 100644 --- a/README.md +++ b/README.md @@ -72,14 +72,17 @@ Unfortunately, I am not as familiar with Windows to write code for it, if you wa ### TODO: ....detail what each of the functions _should_ be used for. ```C++ -old API (current for now): -int execute(const std::string& commandPath, const std::vector& commandArgs, std::list& stringInput, std::function lambda) -std::vector checkOutput(const std::string& commandPath, const std::vector& commandArgs, std::list& stringInput, int& status) -std::future async(const std::string commandPath, const std::vector commandArgs, std::list stringInput, std::function lambda) +// Iterable can be stuff that can be iterated over (std::vector, etc.), all arguments other than commandPath are optional! +int execute(const std::string& commandPath, const Iterable& commandArgs, const Iterable& stdinInput, std::function lambda, const Iterable& environmentVariables); +// accepts iterators, the argument iterators are mandatory, the rest are optional +int execute(const std::string& commandPath, Iterator argsBegin, Iterator argsEnd, Iterator stdinBegin, Iterator stdinEnd, std::function lambda, Iterator envStart, Iterator envEnd); -// ctor for ProcessStream class -class ProcessStream(const std::string& commandPath, const std::vector& commandArgs, std::list& stringInput) +// Iterable can be stuff that can be iterated over (std::vector, etc.), all arguments other than commandPath are optional! +std::vector check_output(const std::string& commandPath, const Iterable& commandArgs, const Iterable& stdinInput, const Iterable& environmentVariables); +// accepts iterators, the argument iterators are mandatory, the rest are optional +std::vector check_output(const std::string& commandPath, Iterator argsBegin, Iterator argsEnd, Iterator stdinBegin, Iterator stdinEnd, Iterator envStart, Iterator envEnd); +// we currently don't have asynchronous, daemon spawning, or iterable interaction yet. coming soon(TM) ``` # License @@ -88,8 +91,7 @@ This is dual-licensed under a MIT and GPLv3 license - so FOSS lovers can use it, I don't know too much about licenses, so if I missed anything, please let me know. # Future Features -Some stuff that I haven't written yet, but I wanna: - - [X] Output streaming. Provide an iterator to allow iteration over the output lines, such that we don't have to load all in memory at once. - - [ ] Thread-safe async lambda interactions. Provide a method to launch a process in async, but still allow writing to the list of stdin without a race condition. - - [ ] A ping-ponging interface. This should allow incrementally providing stdin, then invoking the functor if output is emitted. Note that will likely not be possible if there's not performed asynchronously, or without using select. Using select is a bit annoying, because how do we differentiate between a command taking a while and it providing no input? - - [ ] Provide a way to set environment variables (i can pretty easily do it via using `execvpe`, but what should the API look like?) +Some stuff that I haven't written yet, but I wanna (see [this issue for a more in depth explanation of each](https://github.com/pnappa/subprocesscpp/issues/3)): + - Asynchronous execution + - Daemon spawning helpers (execute process and disown it, only need to consider where stdout should go). + - Interactive processes (manually feed stdin and retrieve stdout) diff --git a/demo.cpp b/demo.cpp index ccbf445..f63b4ef 100644 --- a/demo.cpp +++ b/demo.cpp @@ -9,8 +9,13 @@ void echoString(std::string in) { } int main() { - // execute bc and pass it some equations + std::vector args = {}; std::list inputs = {"1+1\n", "2^333\n", "32-32\n"}; + std::vector env = {}; + subprocess::execute("/usr/bin/bc", args.begin(), args.end(), inputs.begin(), inputs.end(), echoString, env.begin(), env.end()); + + // execute bc and pass it some equations + inputs = {"1+1\n", "2^333\n", "32-32\n"}; subprocess::execute("/usr/bin/bc", {}, inputs, echoString); // grep over some inputs @@ -19,19 +24,18 @@ int main() { // execute a process and extract all lines outputted inputs.clear(); // provide no stdin - int status; - std::vector vec = subprocess::checkOutput("/usr/bin/time", {"sleep", "1"}, inputs, status); + std::vector vec = subprocess::check_output("/usr/bin/time", {"sleep", "1"}, inputs); + //std::vector vec = subprocess::check_output("/usr/bin/time", {"sleep", "1"}, inputs, status); for (const std::string& s : vec) { std::cout << "output: " << s << '\t'; std::cout << "line length:" << s.length() << std::endl; } - std::cout << "process finished with an exit code of: " << status << std::endl; - // execute sleep asynchronously, and block when needing the output - std::future futureStatus = subprocess::async("/bin/sleep", {"3"}, inputs, [](std::string) {}); - // if this wasn't async, this line wouldn't print until after the process finished! - std::cout << "executing sleep..." << std::endl; - std::cout << "sleep executed with exit status: " << futureStatus.get() << std::endl; + //// execute sleep asynchronously, and block when needing the output + //std::future futureStatus = subprocess::async("/bin/sleep", {"3"}, inputs, [](std::string) {}); + //// if this wasn't async, this line wouldn't print until after the process finished! + //std::cout << "executing sleep..." << std::endl; + //std::cout << "sleep executed with exit status: " << futureStatus.get() << std::endl; // simulate pipes between programs: lets launch cat to provide input into a grep process! // Note! This will read all output into a single vector, then provide this as input into the second @@ -39,14 +43,14 @@ int main() { // If you want a function that isn't as memory intensive, consider streamOutput, which provides an // iterator interface inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, world!\n"}; - vec = subprocess::checkOutput("/bin/cat", {}, inputs, status); + vec = subprocess::check_output("/bin/cat", {}, inputs); inputs = std::list(vec.begin(), vec.end()); subprocess::execute("/bin/grep", {"-i", "^Hello, world$"}, inputs, echoString); // stream output from a process - inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, world!\n"}; - subprocess::ProcessStream ps("/bin/grep", {"-i", "^Hello, world$"}, inputs); - for (std::string out : ps) { - std::cout << "received: " << out; - } + //inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, world!\n"}; + //subprocess::ProcessStream ps("/bin/grep", {"-i", "^Hello, world$"}, inputs); + //for (std::string out : ps) { + //std::cout << "received: " << out; + //} } diff --git a/next_prime.cpp b/next_prime.cpp new file mode 100644 index 0000000..ff2a5ac --- /dev/null +++ b/next_prime.cpp @@ -0,0 +1,28 @@ +/** + * Demonstration of recursive subprocess pipes + * + * This program find the next prime for an input number. + */ + + +#include "subprocess.hpp" + +int main() { + subprocess::Process incrementer("./test_programs/increment", {}, [&](std::string s) { std::cout << s; }); + subprocess::Process prime_checker("./test_programs/tee_if_nonprime"); + + incrementer.pipe_to(prime_checker); + prime_checker.pipe_to(incrementer); + prime_checker.output_to_file("prime.out"); + + incrementer.start(); + incrementer << "33\n"; + + incrementer.force_output(); + prime_checker.force_output(); + incrementer.force_output(); + prime_checker.force_output(); + incrementer.force_output(); + prime_checker.force_output(); +} + diff --git a/subprocess.hpp b/subprocess.hpp index f632cc0..18dd347 100644 --- a/subprocess.hpp +++ b/subprocess.hpp @@ -2,14 +2,23 @@ #include #include +#include +#include #include #include #include +#include #include #include #include #include +#include +#include #include +#include +#include +#include +#include // unix process stuff #include @@ -19,7 +28,10 @@ #include #include + namespace subprocess { + + class Process; namespace internal { /** * A TwoWayPipe that allows reading and writing between two processes @@ -78,7 +90,7 @@ class TwoWayPipe { * */ short inPipeState(long wait_ms) { // file descriptor struct to check if pollin bit will be set - struct pollfd fds = {.fd = input_pipe_file_descriptor[0], .events = POLLIN}; + struct pollfd fds = {input_pipe_file_descriptor[0], POLLIN, 0}; // poll with no wait time int res = poll(&fds, 1, wait_ms); @@ -249,56 +261,77 @@ class TwoWayPipe { * connection * */ class Process { + friend class subprocess::Process; + pid_t pid; TwoWayPipe pipe; + const std::string commandPath; + std::vector processArgs; + std::vector envVariables; + + // construct the argument list (unfortunately, the C api wasn't defined with C++ in mind, so + // we have to abuse const_cast) see: https://stackoverflow.com/a/190208 + // this returns a null terminated vector that contains a list of non-const char ptrs + template + static std::vector toNullTerminatedCharIterable(Iter begin, Iter end) { + // TODO: insert test to check if our iterable store strings..? + // well it'll fail in the push_back stage anyway + + std::vector charArrayPlex; + + // the process name must be first for execv + // charArrayPlex.push_back(const_cast(input.c_str())); + for (auto it = begin; it != end; ++it) { + charArrayPlex.push_back(strdup((*it).c_str())); + //charArrayPlex.push_back(const_cast((*it).c_str())); + } + // must be terminated with a nullptr for execv + charArrayPlex.push_back(nullptr); + + return charArrayPlex; + } + public: - Process() = default; + template + Process(const std::string& commandPath, ArgIt argBegin, ArgIt argEnd, EnvIt envBegin, EnvIt envEnd) + : commandPath(commandPath) { + pid = 0; + pipe.initialize(); + + // generate a vector that is able to be passed into exec for the process arguments + processArgs = toNullTerminatedCharIterable(argBegin, argEnd); + // process args must start with the processes name + processArgs.insert(processArgs.begin(), strdup(commandPath.c_str())); + + // ditto for the env variables + envVariables = toNullTerminatedCharIterable(envBegin, envEnd); + } + + ~Process() { + // clean these up because they're created via strdup + for (char* c : processArgs) free(c); + for (char* c : envVariables) free(c); + } /** - * Starts a seperate process with the provided command and - * arguments This also initializes the TwoWayPipe - * @param commandPath - an absolute string to the program path - * @param argsItBegin - the begin iterator to strings that - * will be passed as arguments - * @param argsItEnd - the end iterator to strings that - * will be passed as arguments + * Starts a seperate process with the provided command and arguments * @return TODO return errno returned by child call of execv * (need to use the TwoWayPipe) * */ - template - void start(const std::string& commandPath, InputIT argsItBegin, InputIT argsItEnd) { - pid = 0; - pipe.initialize(); - // construct the argument list (unfortunately, - // the C api wasn't defined with C++ in mind, so - // we have to abuse const_cast) see: - // https://stackoverflow.com/a/190208 - std::vector cargs; - // the process name must be first for execv - cargs.push_back(const_cast(commandPath.c_str())); - while (argsItBegin != argsItEnd) { - cargs.push_back(const_cast((*argsItBegin).c_str())); - argsItBegin++; - } - // must be terminated with a nullptr for execv - cargs.push_back(nullptr); - + void start() { pid = fork(); // child if (pid == 0) { pipe.setAsChildEnd(); - // ask kernel to deliver SIGTERM - // in case the parent dies + // ask kernel to deliver SIGTERM in case the parent dies + // so we don't get zombies prctl(PR_SET_PDEATHSIG, SIGTERM); - execv(commandPath.c_str(), cargs.data()); - // Nothing below this line - // should be executed by child - // process. If so, it means that - // the execl function wasn't - // successfull, so lets exit: + execvpe(commandPath.c_str(), processArgs.data(), envVariables.data()); + // Nothing below this line should be executed by child process. If so, it means that + // the exec function wasn't successful, so lets exit: exit(1); } pipe.setAsParentEnd(); @@ -342,215 +375,524 @@ class Process { return status; } }; -} + +/* hm, I copied this from somewhere, dunno where */ +template +struct is_iterator { + static char test(...); + + template ::difference_type, + typename = typename std::iterator_traits::pointer, + typename = typename std::iterator_traits::reference, + typename = typename std::iterator_traits::value_type, + typename = typename std::iterator_traits::iterator_category> + static long test(U&&); + + constexpr static bool value = std::is_same())), long>::value; +}; +/* begin https://stackoverflow.com/a/29634934 */ +namespace detail { +// To allow ADL with custom begin/end +using std::begin; +using std::end; + +template +auto is_iterable_impl(int) + -> decltype(begin(std::declval()) != end(std::declval()), // begin/end and operator != + ++std::declval()))&>(), // operator ++ + *begin(std::declval()), // operator* + std::true_type{}); + +template +std::false_type is_iterable_impl(...); + +} // namespace detail + +template +using is_iterable = decltype(detail::is_iterable_impl(0)); +/* end https://stackoverflow.com/a/29634934 */ + +// smallest possible iterable for the default arg values for the API functions that accept iterators +using DummyContainer = std::list; +static DummyContainer dummyVec = {}; +} // namespace internal + /** * Execute a subprocess and optionally call a function per line of stdout. * @param commandPath - the path of the executable to execute, e.g. "/bin/cat" - * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"} - * @param stdinInput - a list of inputs that will be piped into the processes' stdin - * @param lambda - a function that is called with every line from the executed process (default NOP function) - * @param env - a list of environment variables that the process will execute with (default nothing) + * @param firstArg - the begin iterator for a list of arguments + * @param lastArg - the end iterator for a list of arguments + * @param stdinBegin - an InputIterator to provide stdin + * @param stdinEnd - the end of the InputIterator range for stdin + * @param lambda - a function that is called with every line from the executed process (default NOP + * function) + * @param envBegin - the begin of an iterator containing process environment variables to set + * @param envEnd - the end of the env iterator */ -int execute(const std::string& commandPath, const std::vector& commandArgs, - const std::vector& stdinInput, +template ::value, void>::type, + typename = typename std::enable_if::value, void>::type, + typename = typename std::enable_if::value, void>::type> +int execute(const std::string& commandPath, ArgIt firstArg, ArgIt lastArg, + StdinIt stdinBegin = internal::dummyVec.begin(), StdinIt stdinEnd = internal::dummyVec.end(), const std::function& lambda = [](std::string) {}, - const std::vector& env = {}); + EnvIt envBegin = internal::dummyVec.begin(), EnvIt envEnd = internal::dummyVec.end()) { + internal::Process childProcess(commandPath, firstArg, lastArg, envBegin, envEnd); + childProcess.start(); + + // write our input to the processes stdin pipe + for (auto it = stdinBegin; it != stdinEnd; ++it) { + childProcess.write(*it); + + // propagate output as we need to ensure the output pipe isn't clogged + while (childProcess.isReady()) { + lambda(childProcess.readLine()); + } + } + + // close the stdin for the process + childProcess.sendEOF(); + + // iterate over each line of remaining output by the child's stdout, and call the functor + std::string processOutput; + while ((processOutput = childProcess.readLine()).size() > 0) { + lambda(processOutput); + } + + return childProcess.waitUntilFinished(); +} /** * Execute a subprocess and optionally call a function per line of stdout. * @param commandPath - the path of the executable to execute, e.g. "/bin/cat" - * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"} - * @param stdinBegin - an InputIterator to provide stdin - * @param stdinEnd - the end of the InputIterator range for stdin - * @param lambda - a function that is called with every line from the executed process (default NOP function) + * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"} (default no + * arguments) + * @param stdinInput - a list of inputs that will be piped into the processes' stdin (default no stdin) + * @param lambda - a function that is called with every line from the executed process (default NOP + * function) * @param env - a list of environment variables that the process will execute with (default nothing) */ -template -int execute(const std::string& commandPath, const std::vector& commandArgs, InputIt stdinBegin, - InputIt stdinEnd, const std::function& lambda = [](std::string) {}, - const std::vector& env = {}); +template , class StdinIterable = std::list, + class EnvIterable = std::list, + typename = typename std::enable_if::value, void>::type, + typename = typename std::enable_if::value, void>::type, + typename = typename std::enable_if::value, void>::type> +int execute(const std::string& commandPath, const ArgIterable& commandArgs = {}, + const StdinIterable& stdinInput = {}, + const std::function& lambda = [](std::string) {}, const EnvIterable& env = {}) { + return execute(commandPath, commandArgs.begin(), commandArgs.end(), stdinInput.begin(), stdinInput.end(), + lambda, env.begin(), env.end()); +} /** * Execute a subprocess and retrieve the output of the command * @param commandPath - the path of the executable to execute, e.g. "/bin/cat" - * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"} - * @param stdinInput - a list of inputs that will be piped into the processes' stdin - * @param env - a list of environment variables that the process will execute with (default nothing) + * @param firstArg - the begin iterator for a list of arguments + * @param lastArg - the end iterator for a list of arguments + * @param stdinBegin - an InputIterator to provide stdin + * @param stdinEnd - the end of the InputIterator range for stdin + * @param lambda - a function that is called with every line from the executed process (default NOP + * function) + * @param envBegin - the begin of an iterator containing process environment variables to set + * @param envEnd - the end of the env iterator */ -std::vector check_output(const std::string& commandPath, - const std::vector& commandArgs, const std::vector& stdioInput, - const std::vector& env = {}); +template ::value, void>::type, + typename = typename std::enable_if::value, void>::type, + typename = typename std::enable_if::value, void>::type> +std::vector check_output(const std::string& commandPath, ArgIt firstArg, ArgIt lastArg, + StdinIt stdinBegin = internal::dummyVec.begin(), StdinIt stdinEnd = internal::dummyVec.end(), + EnvIt envBegin = internal::dummyVec.begin(), EnvIt envEnd = internal::dummyVec.end()) { + std::vector retVec; + // XXX: what's a good way to return the return value, do we throw on non-zero return? + // int status = execute(commandPath, firstArg, lastArg, stdinBegin, stdinEnd, [&](std::string s) { + // retVec.push_back(std::move(s)); }, envBegin, envEnd); + execute(commandPath, firstArg, lastArg, stdinBegin, stdinEnd, + [&](std::string s) { retVec.push_back(std::move(s)); }, envBegin, envEnd); + return retVec; +} /** * Execute a subprocess and retrieve the output of the command * @param commandPath - the path of the executable to execute, e.g. "/bin/cat" * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"} - * @param stdinBegin - an InputIterator to provide stdin - * @param stdinEnd - the end of the InputIterator range for stdin + * @param stdinInput - a list of inputs that will be piped into the processes' stdin * @param env - a list of environment variables that the process will execute with (default nothing) */ -template -std::vector check_output(const std::string& commandPath, - const std::vector& commandArgs, InputIt stdioBegin, InputIt stdioEnd, - const std::vector& env = {}); - -// // TODO: what if the process terminates? consider error handling potentials... -// class ProcessStream { -// public: -// ProcessStream(const std::string& commandPath, const std::vector& commandArgs); - -// // write a line to the subprocess's stdin -// void write(const std::string& inputLine); -// // read a line and block until received (or until timeout reached) -// template -// std::string read(std::chrono::duration timeout=-1); -// // if there is a line for reading -// template -// bool ready(std::chrono::duration timeout=0); - -// ProcessStream& operator<<(const std::string& inputLine); -// ProcessStream& operator>>(std::string& outputLine); -// }; +template , class StdinIterable = std::vector, + class EnvIterable = std::vector, + typename = typename std::enable_if::value, void>::type, + typename = typename std::enable_if::value, void>::type, + typename = typename std::enable_if::value, void>::type> +std::vector check_output(const std::string& commandPath, const ArgIterable& commandArgs = {}, + const StdinIterable& stdinInput = {}, const EnvIterable& env = {}) { + return check_output(commandPath, commandArgs.begin(), commandArgs.end(), stdinInput.begin(), + stdinInput.end(), env.begin(), env.end()); +} +// TODO: what if the process terminates? consider error handling potentials... /** - * Execute a process, inputting stdin and calling the functor with the stdout - * lines. - * @param commandPath - an absolute string to the program path - * @param commandArgs - a vector of arguments that will be passed to the process - * @param stringInput - a feed of strings that feed into the process (you'll typically want to end them with a - * newline) - * @param lambda - the function to execute with every line output by the process - * @return the exit status of the process - * */ -int execute(const std::string& commandPath, const std::vector& commandArgs, - std::list& stringInput /* what pumps into stdin */, - std::function lambda) { - internal::Process childProcess; - childProcess.start(commandPath, commandArgs.begin(), commandArgs.end()); - - // while our string queue is working, - while (!stringInput.empty()) { - // write our input to the process's stdin pipe - std::string newInput = stringInput.front(); - stringInput.pop_front(); - childProcess.write(newInput); - } + * A representation of a process. A process may be piped to one or more other processes or files. + * Currently, this version does not support cyclic pipes, use AsyncProcess for that. + */ +class Process { + // need some list of processes this process is supposed to pipe to (if any) + // XXX: what if the child processes are moved? should we keep a reference to the parent(s) then update us within their vector..? + std::vector successor_processes; + std::vector predecessor_processes; + static_assert(std::is_same::value, "processes must be stored in same container type"); + + // TODO: how should we handle this..? + // The reason why its not trivial is that we may want to have a file pipe to multiple processes, and it feels + // like a waste to read from the file N times to output to N processes. + // std::vector feedin_files; + std::vector feedout_files; + // the function to call every time a line is output + // TODO: somehow make it use the ctor template type + std::function* func = nullptr; + + bool started = false; + bool finished = false; + int retval; + + mutable size_t lines_written = 0; + mutable size_t lines_received = 0; + + static size_t process_id_counter; + + internal::Process owned_proc; + size_t identifier = process_id_counter++; + + std::deque stdin_queue; + std::deque stdout_queue; + + protected: + std::string get_identifier() { + return std::to_string(identifier); + } - childProcess.sendEOF(); + void pump_input() { + assert(started && "error: input propagated for inactive process"); - // iterate over each line output by the child's stdout, and call - // the functor - std::string input; - while ((input = childProcess.readLine()).size() > 0) { - lambda(input); - } + // write any queued stdinput + while (!stdin_queue.empty()) { + this->write(stdin_queue.front()); + stdin_queue.pop_front(); + pump_output(); + } - return childProcess.waitUntilFinished(); -} + // each of the input files to this process has to be pumped too + // for (std::ifstream* ifile : feedin_files) { + // // write as many lines from the input file until we run out + // for (std::string line; std::getline(*ifile, line); ) { + // // we gotta append a newline, getline omits it. + // this->write(line + "\n"); + // pump_output(); + // } + // } + } -/* convenience fn to return a list of outputted strings */ -std::vector checkOutput(const std::string& commandPath, - const std::vector& commandArgs, - std::list& stringInput /* what pumps into stdin */, int& status) { - std::vector retVec; - status = execute( - commandPath, commandArgs, stringInput, [&](std::string s) { retVec.push_back(std::move(s)); }); - return retVec; -} + void pump_output() { + if (finished) return; + assert(started && "error: output propagated for inactive process"); -/* spawn the process in the background asynchronously, and return a future of the status code */ -std::future async(const std::string commandPath, const std::vector commandArgs, - std::list stringInput, std::function lambda) { - // spawn the function async - we must pass the args by value into the async lambda - // otherwise they may destruct before the execute fn executes! - // whew, that was an annoying bug to find... - return std::async(std::launch::async, - [&](const std::string cp, const std::vector ca, std::list si, - std::function l) { return execute(cp, ca, si, l); }, - commandPath, commandArgs, stringInput, lambda); -} + while (owned_proc.isReady()) { + std::string out = owned_proc.readLine(); -/* TODO: refactor up this function so that there isn't duplicated code - most of this is identical to the - * execute fn execute a program and stream the output after each line input this function calls select to - * check if outputs needs to be pumped after each line input. This means that if the line takes too long to - * output, it may be not input into the functor until another line is fed in. You may modify the delay to try - * and wait longer until moving on. This delay must exist, as several programs may not output a line for each - * line input. Consider grep - it will not output a line if no match is made for that input. */ -class ProcessStream { - internal::Process childProcess; + this->write_next(out); + } + } -public: - ProcessStream(const std::string& commandPath, const std::vector& commandArgs, - std::list& stringInput) { - childProcess.start(commandPath, commandArgs.begin(), commandArgs.end()); - - // while our string queue is working, - while (!stringInput.empty()) { - // write our input to the - // process's stdin pipe - std::string newInput = stringInput.front(); - stringInput.pop_front(); - childProcess.write(newInput); + void write_next(const std::string& out) { + assert(started && "error: input propagated for inactive process"); + + // hold onto stdout if we don't have any successors or lambdas to execute + if (successor_processes.empty() && feedout_files.empty() && func == nullptr) { + stdout_queue.push_back(out); + } else { + // call functor + (*func)(out); + + for (Process* succ_process : successor_processes) { + succ_process->write(out); + } + + // TODO: should I throw if cannot write to file..? + for (std::ofstream& succ_file : feedout_files) { + succ_file << out << std::flush; + } + } } - // now we finished chucking in the string, send - // an EOF - childProcess.sendEOF(); - } - ~ProcessStream() { - childProcess.waitUntilFinished(); - } + /** + * Read from this process (and all predecessors) until their and this process is finished. + */ + void read_until_completion() { + // XXX: should we colour this to check that it isn't a cyclic network, and throw an exception? + if (finished) { + return; + } + + // we need to do block and read for the preds first (they must complete before this one can) + for (Process* pred_process : predecessor_processes) { + pred_process->read_until_completion(); + } + + // then block read for us & forward output + std::string processOutput; + while ((processOutput = owned_proc.readLine()).size() > 0) { + this->write_next(processOutput); + finished = true; + } + } + + public: + template> + Process(const std::string& commandPath, const ArgIterable& commandArgs = internal::dummyVec) : + owned_proc(commandPath, commandArgs.begin(), commandArgs.end(), internal::dummyVec.begin(), internal::dummyVec.end()) { + } + template> + Process(const std::string& commandPath, const ArgIterable& commandArgs, Functor func) : + owned_proc(commandPath, commandArgs.begin(), commandArgs.end(), internal::dummyVec.begin(), internal::dummyVec.end()) { + // TODO: change back to new Functor(func), when I'm able to set the member variables type in the ctor. + this->func = new std::function(func); + } + + /** + * Get a graphvis compatible representation of the process network (DOT format) + */ + std::string get_network_topology() { + std::stringstream ret; + + ret << "digraph G {\n"; + + std::set visited_processes; + std::stack to_visit; - struct iterator { - ProcessStream* ps; - bool isFinished = false; - // current read line of the process - std::string cline; + to_visit.emplace(this); - iterator(ProcessStream* ps) : ps(ps) { - // increment this ptr, because nothing exists initially - ++(*this); + while (!to_visit.empty()) { + Process* top = to_visit.top(); + to_visit.pop(); + // ignore the already visited + if (visited_processes.count(top)) continue; + + visited_processes.emplace(top); + + // add the label for this process + ret << top->get_identifier() << " [label=\"" << top->owned_proc.processArgs[0] << "\"];\n"; + + // add edges for each of the parents and children, then queue them up to be visited + // as predecessor_procs and successor_procs are symmetric, we only need to add one + for (Process* proc : top->predecessor_processes) { + ret << proc->get_identifier() << "->" << top->get_identifier() << ";\n"; + to_visit.emplace(proc); + } + + for (Process* proc : top->successor_processes) { + to_visit.emplace(proc); + } + + } + + ret << "}\n"; + + return ret.str(); } - // ctor for end() - iterator(ProcessStream* ps, bool) : ps(ps), isFinished(true) {} - const std::string& operator*() const { - return cline; + virtual ~Process() { + // what needs to be done here is that output from predecessors needs to be forced until completion + + // need to close all predecessors (if any) + for (Process* pred_process : predecessor_processes) { + pred_process->finish(); + } + + // this->owned_proc.sendEOF(); + // process any remaining input/output + finish(); + + // do the same for outputting processes + // XXX: i don't think I need/should do this, right? + // for (Process* succ_process : successor_processes) { + // succ_process->finish(); + // } + + // our function is dynamically allocated (how else do we test for null functors..?) + delete func; } - /* preincrement */ - iterator& operator++() { - // iterate over each line output by the child's stdout, and call the functor - cline = ps->childProcess.readLine(); - if (cline.empty()) { - isFinished = true; + // start the process and prevent any more pipes from being established. + // may throw an exception? + void start() { + // ignore an already started process + if (started) return; + + owned_proc.start(); + started = true; + + // recursively start all predecessor processes + // do this to ensure that if this process relies on a predecessor's input, then it will terminate. + for (auto pred_process : predecessor_processes) { + pred_process->start(); } - return *this; + + // recursively start all successor processes + for (auto successor_proc : successor_processes) { + successor_proc->start(); + } + + // push out any pending input + pump_input(); + // propagate output some more + pump_output(); + } + + int finish() { + if (finished) return this->retval; + + pump_input(); + read_until_completion(); + pump_output(); + + return owned_proc.waitUntilFinished(); + } + + bool is_started() const { return started; } + + // write a line to the subprocess's stdin + void write(const std::string& inputLine) { + if (finished) throw std::runtime_error("cannot write to a finished process"); + + if (is_started()) { + this->lines_written++; + + owned_proc.write(inputLine); + + pump_output(); + + // if it hasn't been started, then we queue up the input for later + } else { + stdin_queue.push_front(inputLine); + } + } + + // read a line and block until received (or until timeout reached) + template + std::string read(std::chrono::duration timeout = std::chrono::duration(-1)){ + std::string outputLine; + + if (!started || finished) { + throw std::runtime_error("cannot read line from inactive process"); + } + + if (successor_processes.size() > 0 || feedout_files.size() > 0 || func != nullptr) { + throw std::runtime_error("manually reading line from process that is piped from/has a functor is prohibited"); + } + + lines_written++; + + // we may have lines of output to "use" from earlier + if (!stdout_queue.empty()) { + outputLine = stdout_queue.front(); + stdout_queue.pop_front(); + } else { + outputLine = owned_proc.readLine(timeout); + } + + return outputLine; + } + // if there is a line for reading (optionally + template + bool ready(std::chrono::duration timeout=0) { + return owned_proc.isReady(timeout); + } + + // pipe some data to the receiver process, and return the receiver process + // we do this so we can have: process1.pipe_to(process2).pipe_to(process3)...etc + // if pipes are set up there are some restrictions on using the << and >> operators. + // if a process is receiving from another process, then they cannot use operator<< anymore + // hmm: what about if its done before .start()? + // if a process is outputting to another, they cannot use operator>> + Process& pipe_to(Process& receiver) { + successor_processes.push_back(&receiver); + receiver.predecessor_processes.push_back(this); + return receiver; + } + // ditto + Process& operator>>(Process& receiver) { return this->pipe_to(receiver); } + // XXX: removed this because the dtor wasn't handled well + // for files + // std::ofstream& pipe_to(std::ofstream& receiver) { + // feedout_files.push_back(&receiver); + // return receiver; + // } + void output_to_file(const std::string& filename) { + // XXX: in some compilers this causes a warning, whilst in others omitting it causes an error. + feedout_files.push_back(std::move(std::ofstream(filename))); + if (!feedout_files.back().good()) throw std::runtime_error("error: file " + filename + " failed to open"); } - /* post increment */ - iterator operator++(int) { - iterator old(*this); - ++(*this); - return old; + // can't seem to get this one working..? + //void output_to_file(std::ofstream&& file) { + // feedout_files.push_back(std::move(file)); + // if (!feedout_files.back().good()) throw std::runtime_error("error: file is invalid"); + //} + + // read a line into this process (so it acts as another line of stdin) + // instead of string, probably should be typename Stringable, and use stringstream and stuff. + Process& operator<<(const std::string& inputLine) { + this->write(inputLine); + return *this; } - bool operator==(const iterator& other) const { - return other.ps == this->ps && this->isFinished == other.isFinished; + // retrieve a line of stdout from this process (blocking) into the string + Process& operator>>(std::string& outputLine) { + outputLine = read(); + return *this; } - bool operator!=(const iterator& other) const { - return !((*this) == other); + // write all stdout to file? + // Process& operator>>(std::ofstream& outfile); + + // some other functions which maybe useful (perhaps take a timeout?) + // returns whether it could terminate + bool terminate(); + // a more...extreme way + bool kill(); + // send arbitrary signals to the subprocess + void signal(int signum); + + class iterator; + + // provide an iterator to iterate over the stdout produced + iterator begin(); + iterator end(); +}; + +// initialise the id counter, dumb c++ standard doesn't allow it +size_t Process::process_id_counter = 0; + +/** + * An async equivalent of Process + * It constantly blocks for input to allow cyclic process flows + */ +class AsyncProcess : Process { + + std::future retval; + + public: + template> + AsyncProcess(const std::string& commandPath, const ArgIterable& commandArgs = internal::dummyVec, Functor func = [](std::string){}) : + Process(commandPath, commandArgs, func) { } + + ~AsyncProcess() { + } - }; - iterator begin() { - return iterator(this); - } + void start() { - iterator end() { - return iterator(this, true); - } + } }; } // end namespace subprocess diff --git a/test.cpp b/test.cpp index 3564bdc..17c36ec 100644 --- a/test.cpp +++ b/test.cpp @@ -9,7 +9,16 @@ #include "subprocess.hpp" -TEST_CASE("basic echo execution", "[subprocess::execute]") { +/* + * Some additional tests I'd like/things to improve: + * - TODO: replace these system binaries with hand written ones in test_programs, + * so we can ensure these are all available across all systems. + * - TODO: add some more tests about lots of output generated for little inputs + * - TODO: add some tests which will fill up the pipe fully (https://unix.stackexchange.com/questions/11946/how-big-is-the-pipe-buffer) + * - TODO: should a process be started in the dtor if it hasn't already been..? + */ + +TEST_CASE("[iterable] basic echo execution", "[subprocess::execute]") { std::list inputs; std::vector outputs; subprocess::execute("/bin/echo", {"hello"}, inputs, [&](std::string s) { outputs.push_back(s); }); @@ -18,7 +27,38 @@ TEST_CASE("basic echo execution", "[subprocess::execute]") { REQUIRE(outputs.front() == "hello\n"); } -TEST_CASE("no trailing output newline echo execution", "[subprocess::execute]") { +TEST_CASE("[iterable] basic echo execution varargs", "[subprocess::execute]") { + // test that execute will compile file with vargs + std::list inputs; + std::vector outputs; + std::vector env = {"LOL=lol"}; + subprocess::execute("/bin/echo", {"hello"}, inputs, [&](std::string s) { outputs.push_back(s); }, env); + REQUIRE(outputs.size() == 1); + // echo appends a newline by default + REQUIRE(outputs.front() == "hello\n"); + + outputs.clear(); + int status = + subprocess::execute("/bin/echo", {"hello"}, inputs, [&](std::string s) { outputs.push_back(s); }); + REQUIRE(outputs.size() == 1); + REQUIRE(status == 0); + + // XXX: this causes a linker error..? well.. it used to. + // I think I need to fix the template for the Iterator one, to be more strict in what it accepts (i think it might actually be able to cast some Iterables to Iterators) + outputs.clear(); + status = subprocess::execute("/bin/echo", {"hello"}, inputs); + REQUIRE(status == 0); + + outputs.clear(); + status = subprocess::execute("/bin/echo", {"hello"}); + REQUIRE(status == 0); + + outputs.clear(); + status = subprocess::execute("/bin/echo"); + REQUIRE(status == 0); +} + +TEST_CASE("[iterable] no trailing output newline echo execution", "[subprocess::execute]") { std::list inputs; std::vector outputs; subprocess::execute("/bin/echo", {"-n", "hello"}, inputs, [&](std::string s) { outputs.push_back(s); }); @@ -27,7 +67,7 @@ TEST_CASE("no trailing output newline echo execution", "[subprocess::execute]") REQUIRE(outputs.front() == "hello"); } -TEST_CASE("non existent executable", "[subprocess::execute]") { +TEST_CASE("[iterable] non existent executable", "[subprocess::execute]") { // try and run a non-existent executable, what should happen..? std::list inputs; std::vector outputs; @@ -39,7 +79,7 @@ TEST_CASE("non existent executable", "[subprocess::execute]") { REQUIRE(outputs.size() == 0); } -TEST_CASE("stdin execute simple cat", "[subprocess::execute]") { +TEST_CASE("[iterable] stdin execute simple cat", "[subprocess::execute]") { std::list inputs = {"henlo wurld\n", "1,2,3,4\n"}; std::vector outputs; int retval = subprocess::execute("/bin/cat", {}, inputs, [&](std::string s) { outputs.push_back(s); }); @@ -50,8 +90,9 @@ TEST_CASE("stdin execute simple cat", "[subprocess::execute]") { REQUIRE(outputs.at(1) == "1,2,3,4\n"); } -TEST_CASE("stdin execute simple cat no trailing newline for last input", "[subprocess::execute]") { - // executing a command with the last one missing a newline still should work the same, as the stdin stream gets closed. +TEST_CASE("[iterable] stdin execute simple cat no trailing newline for last input", "[subprocess::execute]") { + // executing a command with the last one missing a newline still should work the same, as the stdin stream + // gets closed. std::list inputs = {"henlo wurld\n", "1,2,3,4"}; std::vector outputs; int retval = subprocess::execute("/bin/cat", {}, inputs, [&](std::string s) { outputs.push_back(s); }); @@ -62,86 +103,453 @@ TEST_CASE("stdin execute simple cat no trailing newline for last input", "[subpr REQUIRE(outputs.at(1) == "1,2,3,4"); } -TEST_CASE("checkOutput simple case cat", "[subprocess::checkOutput]") { - // execute bc and pass it some equations - std::list inputs = {"1+1\n", "2^333\n", "32-32\n"}; - // this one is interesting, there's more than one line of stdout for each input (bc line breaks after a certain number of characters) - std::vector output_expected = {"2\n", "17498005798264095394980017816940970922825355447145699491406164851279\\\n", "623993595007385788105416184430592\n", "0\n"}; - int retval; - std::vector out = subprocess::checkOutput("/usr/bin/bc", {}, inputs, retval); +TEST_CASE("[iterable] test env variables are sent to program correctly", "[subprocess::execute]") { + // executing a command with the last one missing a newline still should work the same, as the stdin stream + // gets closed. + std::vector outputs; + int retval = subprocess::execute("./test_programs/print_env", {"LOL"}, {}, + [&](std::string s) { outputs.push_back(s); }, {"LOL=lol"}); REQUIRE(retval == 0); - REQUIRE(out.size() == output_expected.size()); - REQUIRE(out == output_expected); + REQUIRE(outputs.size() == 1); + REQUIRE(outputs.at(0) == "LOL,lol\n"); } -TEST_CASE("asynchronous is actually asynchronous", "[subprocess::async]") { +TEST_CASE("[iterator] basic echo execution", "[subprocess::execute]") { std::list inputs; std::vector outputs; + std::vector args = {"hello"}; + std::vector env = {}; + int status; - std::atomic isDone(false); - std::future retval = subprocess::async("/usr/bin/time", {"sleep", "3"}, inputs, [&](std::string s) { isDone.store(true); outputs.push_back(s); }); - // reaching here after the async starts, means that we prrroooooobbbaabbbly (unless the user has a very, very slow computer) won't be finished - REQUIRE(isDone.load() == false); - REQUIRE(retval.get() == 0); + status = subprocess::execute("/bin/echo", args.begin(), args.end(), inputs.begin(), inputs.end(), + [&](std::string s) { outputs.push_back(s); }, env.begin(), env.end()); + REQUIRE(outputs.size() == 1); + // echo appends a newline by default + REQUIRE(outputs.front() == "hello\n"); - // time has different outputs for different OSes, pluuus they will take different times to complete. all we need is some stdout. - REQUIRE(outputs.size() > 0); + // test the optional arguments compile + outputs.clear(); + status = subprocess::execute("/bin/echo", args.begin(), args.end(), inputs.begin(), inputs.end(), + [&](std::string s) { outputs.push_back(s); }); + REQUIRE(outputs.size() == 1); + // echo appends a newline by default + REQUIRE(outputs.front() == "hello\n"); + + status = subprocess::execute("/bin/echo", args.begin(), args.end(), inputs.begin(), inputs.end()); + REQUIRE(status == 0); + + status = subprocess::execute("/bin/echo", args.begin(), args.end(), inputs.begin(), inputs.end()); + REQUIRE(status == 0); + + status = subprocess::execute("/bin/echo", args.begin(), args.end()); + REQUIRE(status == 0); + + status = subprocess::execute("/bin/echo"); + REQUIRE(status == 0); } -TEST_CASE("output iterator contains everything", "[subprocess::ProcessStream]") { - // stream output from a process - std::list inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, world!\n"}; - subprocess::ProcessStream ps("/bin/grep", {"-i", "^Hello, world$"}, inputs); - std::vector expectedOutput = {"hello, world\n", "Hello, world\n"}; +TEST_CASE("[iterator] no trailing output newline echo execution", "[subprocess::execute]") { + std::list inputs; std::vector outputs; - for (std::string out : ps) { - outputs.push_back(out); - } + subprocess::execute("/bin/echo", {"-n", "hello"}, inputs, [&](std::string s) { outputs.push_back(s); }); + + REQUIRE(outputs.size() == 1); + REQUIRE(outputs.front() == "hello"); +} + +TEST_CASE("[iterator] non existent executable", "[subprocess::execute]") { + // try and run a non-existent executable, what should happen..? + std::list args; + std::list inputs; + std::vector outputs; + int retval = subprocess::execute("/bin/wangwang", args.begin(), args.end(), inputs.begin(), inputs.end(), + [](std::string) { FAIL("this functor should never have been called"); }); + + // process should have failed..? + REQUIRE(retval != 0); + REQUIRE(outputs.size() == 0); +} + +TEST_CASE("[iterator] stdin execute simple cat", "[subprocess::execute]") { + std::list args; + std::list inputs = {"henlo wurld\n", "1,2,3,4\n"}; + std::vector outputs; + int retval = subprocess::execute("/bin/cat", args.begin(), args.end(), inputs.begin(), inputs.end(), + [&](std::string s) { outputs.push_back(s); }); - REQUIRE(outputs == expectedOutput); + REQUIRE(retval == 0); + REQUIRE(outputs.size() == 2); + REQUIRE(outputs.at(0) == "henlo wurld\n"); + REQUIRE(outputs.at(1) == "1,2,3,4\n"); } -TEST_CASE("output iterator handles empty output", "[subprocess::ProcessStream]") { - std::list inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, world!\n"}; - subprocess::ProcessStream ps("/bin/grep", {"-i", "^bingo bango bongo$"}, inputs); - std::vector expectedOutput = {}; +TEST_CASE("[iterator] stdin execute simple cat no trailing newline for last input", "[subprocess::execute]") { + // executing a command with the last one missing a newline still should work the same, as the stdin stream + // gets closed. + std::list args; + std::list inputs = {"henlo wurld\n", "1,2,3,4"}; std::vector outputs; - for (std::string out : ps) { - FAIL("why do we have output!!! - " << out); - outputs.push_back(out); + int retval = subprocess::execute("/bin/cat", args.begin(), args.end(), inputs.begin(), inputs.end(), + [&](std::string s) { outputs.push_back(s); }); + + REQUIRE(retval == 0); + REQUIRE(outputs.size() == 2); + REQUIRE(outputs.at(0) == "henlo wurld\n"); + REQUIRE(outputs.at(1) == "1,2,3,4"); +} + +TEST_CASE("[iterable] check_output simple case bc", "[subprocess::check_output]") { + // execute bc and pass it some equations + std::list inputs = {"1+1\n", "2^333\n", "32-32\n"}; + // this one is interesting, there's more than one line of stdout for each input (bc line breaks after a + // certain number of characters) + std::vector output_expected = {"2\n", + "17498005798264095394980017816940970922825355447145699491406164851279\\\n", + "623993595007385788105416184430592\n", "0\n"}; + std::vector out = subprocess::check_output("/usr/bin/bc", {}, inputs); + + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); +} + +TEST_CASE("[iterable] check_output permutations (varargs)", "[subprocess::check_output]") { + // execute echo over a series of lines + std::list inputs = {"line1\n", "line2\n", "line3\n"}; + std::list env = {"LOL=lol"}; + std::vector output_expected = {"line1\n", "line2\n", "line3\n"}; + std::vector out; + + out.clear(); + out = subprocess::check_output("/bin/cat", {}, inputs, env); + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); + + out.clear(); + out = subprocess::check_output("/bin/cat", {}, inputs); + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); + + // now do the same with echo (as we're limited to args only at this point) + out.clear(); + std::vector args = {"value"}; + output_expected = {"value\n"}; + out = subprocess::check_output("/bin/echo", args); + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); + + // and echo without any args just outputs a blank line + out.clear(); + output_expected = {"\n"}; + out = subprocess::check_output("/bin/echo"); + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); +} + +TEST_CASE("[iterator] check_output simple case bc", "[subprocess::check_output]") { + std::vector args; + // execute bc and pass it some equations + std::list inputs = {"1+1\n", "2^333\n", "32-32\n"}; + // this one is interesting, there's more than one line of stdout for each input (bc line breaks after a + // certain number of characters) + std::vector output_expected = {"2\n", + "17498005798264095394980017816940970922825355447145699491406164851279\\\n", + "623993595007385788105416184430592\n", "0\n"}; + std::vector out = subprocess::check_output("/usr/bin/bc", args.begin(), args.end(), inputs.begin(), inputs.end()); + + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); +} + +TEST_CASE("[iterator] check_output permutations (varargs)", "[subprocess::check_output]") { + // execute echo over a series of lines + std::deque args; + std::list inputs = {"line1\n", "line2\n", "line3\n"}; + std::list env = {"LOL=lol"}; + std::vector output_expected = {"line1\n", "line2\n", "line3\n"}; + std::vector out; + + out.clear(); + out = subprocess::check_output("/bin/cat", args.begin(), args.end(), inputs.begin(), inputs.end(), env.begin(), env.end()); + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); + + out.clear(); + out = subprocess::check_output("/bin/cat", args.begin(), args.end(), inputs.begin(), inputs.end()); + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); + + // now do the same with echo (as we're limited to args only at this point) + out.clear(); + args = {"value"}; + output_expected = {"value\n"}; + out = subprocess::check_output("/bin/echo", args.begin(), args.end()); + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); + + // and echo without any args just outputs a blank line + out.clear(); + output_expected = {"\n"}; + out = subprocess::check_output("/bin/echo"); + REQUIRE(out.size() == output_expected.size()); + REQUIRE(out == output_expected); +} + + +// TODO: make all these have timeouts! it's possible that they never terminate +// TODO: somehow ensure that if we try and retrieve more output it fails..? idk, seems annoying +// perhaps we just use the timeouts, with some reasonable duration..? +// TODO: replace these tests as I made having the functor AND being able to extract stdout illegal + +TEST_CASE("basic process instantiation", "[subprocess::Process]") { + subprocess::Process p("/bin/echo", {"henlo world"}); + + p.start(); + std::string line; + p >> line; + + REQUIRE(line == "henlo world\n"); +} + +// handy deferrable functions (executed on dtor) +template +struct Deferrable { + Functor func; + Deferrable(Functor f) : func(f) {} + ~Deferrable() { + func(); } +}; + +TEST_CASE("process functor", "[subprocess::Process]") { + + // requirement from the dead + // just ensure that even after the dtor, the functor isn't invoked again! + std::string line; + size_t func_count = 0; + // as the dtors are invoked in a stack like matter, this fn will be checked after the + // process' termination + auto deferred_assertion = Deferrable>([&]() { + REQUIRE(func_count == 1); + }); + + subprocess::Process p("/bin/echo", {"henlo world"}, [&](std::string s) { + func_count += 1; + REQUIRE(s == "henlo world\n"); + }); - REQUIRE(outputs == expectedOutput); + p.start(); + p.finish(); + + REQUIRE(func_count == 1); +} + +TEST_CASE("pre-emptive process input", "[subprocess::Process]") { + subprocess::Process p("/bin/cat"); + + p << "henlo world\n"; + p.start(); + + std::string line; + p >> line; + + REQUIRE(line == "henlo world\n"); +} + +TEST_CASE("post process start input", "[subprocess::Process]") { + subprocess::Process p("/bin/cat"); + + p.start(); + + p << "henlo world\n"; + + std::string line; + p >> line; + + REQUIRE(line == "henlo world\n"); } -TEST_CASE("output iterator all operator overload testing", "[subprocess::ProcessStream]") { - // stream output from a process - std::list inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, world!\n"}; - subprocess::ProcessStream ps("/bin/grep", {"-i", "Hello, world"}, inputs); - std::list expectedOutput = {"hello, world\n", "Hello, world\n", "line: Hello, world!\n"}; +TEST_CASE("reading from process that itself is a successor proc", "[subprocess::Process]") { + // TODO: add timeout + subprocess::Process p1("/bin/echo", {"high to roam"}); + subprocess::Process p2("/bin/grep", {"-o", "hi"}); + + p1.pipe_to(p2); + + p1.start(); + + std::string line; + // XXX: this line is currently making the test hang. The reason is simple, but the implications are complex. + // When reading from p2, there isn't a line instantly available, as it requires input from p1 + // - but we don't currently call readline in the predecessor. If we do, what if the current + // process *will* output, but is taking a while..? + p2 >> line; - auto beg = ps.begin(); - auto end = ps.end(); + REQUIRE(line == "hi\n"); +} - REQUIRE(beg != end); +TEST_CASE("malordered process RAII", "[subprocess::Process]") { + bool func_called = false; + auto deferred_assertion = Deferrable>([&]() { + REQUIRE(func_called == true); + }); + // test that initialising the processes in the reverse stack order won't bork them + subprocess::Process p2("/bin/grep", {"-o", "hi"}, [&](std::string s) { + REQUIRE(s == "hi\n"); + func_called = true; + }); + subprocess::Process p1("/bin/echo", {"high to roam"}); - REQUIRE(*beg == expectedOutput.front()); - expectedOutput.pop_front(); - REQUIRE(beg != end); + p1.pipe_to(p2); - ++beg; - REQUIRE(*beg == expectedOutput.front()); - expectedOutput.pop_front(); - REQUIRE(beg != end); + p1.start(); +} - beg++; - REQUIRE(*beg == expectedOutput.front()); - expectedOutput.pop_front(); +TEST_CASE("RAII doesn't start non-started process", "[subprocess:Process]") { + subprocess::Process p1("/bin/echo", {"die bart die"}, [&](std::string) { + FAIL("process output when shouldn't have"); + }); +} - beg++; - REQUIRE(beg == end); - REQUIRE(expectedOutput.size() == 0); +TEST_CASE("superfluous input", "[subprocess::Process]") { + // provide input to a process that won't use it. } +TEST_CASE("", "[subprocess::Process]") { + +} + +TEST_CASE("reading from succesor plus functor", "[subprocess::Process]") { + +} + +TEST_CASE("multi-line output", "[subprocess::Process]") { + // test a process that outputs lots of output for each line of stdin + +} + +TEST_CASE("post-start manual process input", "[subprocess::Process]") { + +} + +TEST_CASE("simple process piping", "[subprocess::Process]") { + +} + +TEST_CASE("piping to file", "[subprocess::Process]") { + +} + +TEST_CASE("bifurcated file outputting", "[subprocess::Process]") { + +} + +TEST_CASE("long pipe chain", "[subprocess::Process]") { + +} + +TEST_CASE("complex process piping", "[subprocess::Process]") { + +} + +TEST_CASE("cyclic process piping", "[subprocess::Process]") { + // TODO: fail if this takes too long, that indicates there's a problem + // probably will be the next prime implementation +} + +TEST_CASE("test infinite output cropped via head unix pipe", "[subprocess::Process]") { + +} + +TEST_CASE("", "[subprocess::Process]") { + +} + +TEST_CASE("", "[subprocess::Process]") { + +} + +TEST_CASE("", "[subprocess::Process]") { + +} + +TEST_CASE("test output iterator", "[subprocess::Process]") { + +} + +TEST_CASE("test ctor vargs", "[subprocess::Process]") { + +} + + +/* tests pending API update */ +// TEST_CASE("asynchronous is actually asynchronous", "[subprocess::async]") { +// std::list inputs; +// std::vector outputs; +// +// std::atomic isDone(false); +// std::future retval = subprocess::async("/usr/bin/time", {"sleep", "3"}, inputs, [&](std::string s) +// { isDone.store(true); outputs.push_back(s); }); +// // reaching here after the async starts, means that we prrroooooobbbaabbbly (unless the user has a very, +// very slow computer) won't be finished REQUIRE(isDone.load() == false); REQUIRE(retval.get() == 0); +// +// // time has different outputs for different OSes, pluuus they will take different times to complete. all +// we need is some stdout. REQUIRE(outputs.size() > 0); +//} +// +// TEST_CASE("output iterator contains everything", "[subprocess::ProcessStream]") { +// // stream output from a process +// std::list inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, +// world!\n"}; subprocess::ProcessStream ps("/bin/grep", {"-i", "^Hello, world$"}, inputs); +// std::vector expectedOutput = {"hello, world\n", "Hello, world\n"}; +// std::vector outputs; +// for (std::string out : ps) { +// outputs.push_back(out); +// } +// +// REQUIRE(outputs == expectedOutput); +//} +// +// TEST_CASE("output iterator handles empty output", "[subprocess::ProcessStream]") { +// std::list inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, +// world!\n"}; subprocess::ProcessStream ps("/bin/grep", {"-i", "^bingo bango bongo$"}, inputs); +// std::vector expectedOutput = {}; +// std::vector outputs; +// for (std::string out : ps) { +// FAIL("why do we have output!!! - " << out); +// outputs.push_back(out); +// } +// +// REQUIRE(outputs == expectedOutput); +//} +// +// TEST_CASE("output iterator all operator overload testing", "[subprocess::ProcessStream]") { +// // stream output from a process +// std::list inputs = {"12232\n", "hello, world\n", "Hello, world\n", "line: Hello, +// world!\n"}; subprocess::ProcessStream ps("/bin/grep", {"-i", "Hello, world"}, inputs); +// std::list expectedOutput = {"hello, world\n", "Hello, world\n", "line: Hello, world!\n"}; +// +// auto beg = ps.begin(); +// auto end = ps.end(); +// +// REQUIRE(beg != end); +// +// REQUIRE(*beg == expectedOutput.front()); +// expectedOutput.pop_front(); +// REQUIRE(beg != end); +// +// ++beg; +// REQUIRE(*beg == expectedOutput.front()); +// expectedOutput.pop_front(); +// REQUIRE(beg != end); +// +// beg++; +// REQUIRE(*beg == expectedOutput.front()); +// expectedOutput.pop_front(); +// +// beg++; +// REQUIRE(beg == end); +// REQUIRE(expectedOutput.size() == 0); +//} + // TODO: write more test cases (this seems pretty covering, let's see how coverage looks) diff --git a/test_programs/.gitignore b/test_programs/.gitignore new file mode 100644 index 0000000..d2ed88d --- /dev/null +++ b/test_programs/.gitignore @@ -0,0 +1,3 @@ +print_env +tee_if_nonprime +increment diff --git a/test_programs/Makefile b/test_programs/Makefile new file mode 100644 index 0000000..9c374fe --- /dev/null +++ b/test_programs/Makefile @@ -0,0 +1,23 @@ +CXX=g++ +CXXFLAGS=-g -std=c++11 -Wall -pedantic +LIBS=-lpthread + +.PHONY: all clean + +all: print_env tee_if_nonprime increment large_outputter + +print_env: print_env.cpp + $(CXX) $(CXXFLAGS) print_env.cpp -o print_env $(LIBS) + +tee_if_nonprime: tee_if_nonprime.cpp + $(CXX) $(CXXFLAGS) tee_if_nonprime.cpp -o tee_if_nonprime $(LIBS) + +increment: increment.cpp + $(CXX) $(CXXFLAGS) increment.cpp -o increment $(LIBS) + +large_outputter: large_outputter.cpp + $(CXX) $(CXXFLAGS) large_outputter.cpp -o large_outputter $(LIBS) + +clean: + rm -rvf print_env tee_if_nonprime increment large_outputter + diff --git a/test_programs/README b/test_programs/README new file mode 100644 index 0000000..bb48469 --- /dev/null +++ b/test_programs/README @@ -0,0 +1 @@ +A bunch of programs that do things so we can unit-test different functionalities diff --git a/test_programs/increment.cpp b/test_programs/increment.cpp new file mode 100644 index 0000000..8643d75 --- /dev/null +++ b/test_programs/increment.cpp @@ -0,0 +1,16 @@ + +/** + * Demo program to demonstrate recursive process piping + * All this does is increment the number provided. + */ + +#include + +int main() { + std::string line; + while (std::getline(std::cin, line)) { + std::cout << std::stoi(line) + 1 << std::endl; + } + + return 0; +} diff --git a/test_programs/large_outputter.cpp b/test_programs/large_outputter.cpp new file mode 100644 index 0000000..7e7bd54 --- /dev/null +++ b/test_programs/large_outputter.cpp @@ -0,0 +1,110 @@ +/** + * Demo program to output a large number of characters for a series of switches + * @author Patrick Nappa + * + * The reason for needing this program is to ensure that the pipes are flushed + * timely within the process. The unix pipes only support ~65kB of data within + * so we output several times that amount just to be sure. + * This data must be split up via lines, as output is buffered via lines, + * and it's not really expected that a line of input is larger than 65kB + * We believe this is a limitation that we can't avoid. + * -> https://unix.stackexchange.com/questions/11946/how-big-is-the-pipe-buffer + */ + +#include +#include + +// 1024 length string +constexpr ssize_t line_length = 1024; +const std::string basic_line = std::string(line_length-1, 'A'); + +void output_line(ssize_t& amount) { + // duh, nothing to output! + if (amount <= 0) return; + + if (amount < line_length) { + std::cout << std::string(amount - 1, 'A') << std::endl; + } else { + std::cout << basic_line << std::endl; + } + + amount -= line_length; +} + +void prefixed_churn(const ssize_t amount) { + assert(amount > 0 && "amount must be non-zero and positive"); + std::string line; + + ssize_t c_amount = amount; + + do { + while (c_amount > 0) { + output_line(c_amount); + } + + c_amount = amount; + } while (std::getline(std::cin, line)); +} + +void postfix_churn(const ssize_t amount) { + assert(amount > 0 && "amount must be non-zero and positive"); + std::string line; + ssize_t c_amount = amount; + + while (std::getline(std::cin, line)) { + while (c_amount > 0) { + output_line(c_amount); + } + c_amount = amount; + } +} + +void infinite_churn(const ssize_t amount) { + assert(amount > 0 && "amount must be non-zero and positive"); + ssize_t c_amount = amount; + while (true) { + while (c_amount > 0) { + output_line(c_amount); + } + + c_amount = amount; + } +} + +int main(int argc, char* argv[]) { + if (argc < 2) { + std::cerr << "Usage: " + std::string(argv[0]) + " TYPE [amount]" << std::endl; + std::cerr << "Where TYPE is either PRE, FOREACH, INFINITE" << std::endl; + std::cerr << "PRE means lines with be output before each line of stdin is read" << std::endl; + std::cerr << "FOREACH means output will be emitted after each line of stdin is processed" << std::endl; + std::cerr << "INFINITE means an infinite stream of data will be emitted" << std::endl; + std::cerr << "By default amount is 2^17 bytes, i.e. 131072 characters. This is split up into 1024 character lines (1023 plus newline)." << std::endl; + std::cerr << "Cheers cunt" << std::endl; + + return EXIT_FAILURE; + } + + // default output length + ssize_t num_bytes = 1 << 17; + if (argc >= 3) { + num_bytes = std::stoi(std::string(argv[2])); + if (num_bytes < 0) { + std::cerr << "Amount of bytes emitted must be positive" << std::endl; + return EXIT_FAILURE; + } + } + + std::string execution_type(argv[1]); + if (execution_type == "PRE") { + prefixed_churn(num_bytes); + } else if (execution_type == "FOREACH") { + postfix_churn(num_bytes); + } else if (execution_type == "INFINITE") { + infinite_churn(num_bytes); + } else { + std::cerr << "please provide a valid type of execution" << std::endl; + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} diff --git a/test_programs/print_env.cpp b/test_programs/print_env.cpp new file mode 100644 index 0000000..9dc6f99 --- /dev/null +++ b/test_programs/print_env.cpp @@ -0,0 +1,16 @@ +/** + * Print environment variable values for each arg + * usage: ./print_env SHELL + * -> /usr/bin/bash + */ + +// to allow us to use getenv +#include +#include + +int main(int argc, char* argv[]) { + for (int i = 1; i < argc; ++i) { + char* env_val = getenv(argv[i]); + std::cout << argv[i] << "," << (env_val == nullptr ? "" : env_val) << std::endl; + } +} diff --git a/test_programs/tee_if_nonprime.cpp b/test_programs/tee_if_nonprime.cpp new file mode 100644 index 0000000..daff4f2 --- /dev/null +++ b/test_programs/tee_if_nonprime.cpp @@ -0,0 +1,38 @@ +/** + * Demo program to demonstrate recursive process piping + * All this does is output the number if it's non-prime, + * otherwise close program if it's prime. + * That should trigger the process hierarchy to collapse, + * and the result can be harvested. + */ + +#include +#include + +bool is_prime(int input) { + if (input <= 1) return false; + + // basic primality test, just check if any of the + // numbers up to sqrt(n) divide n + + int int_sqrt = (int) sqrt((double) input); + for (int dividor = 2; dividor <= int_sqrt; ++dividor) { + if (input % dividor == 0) return false; + } + + // reached here? + return true; +} + +int main() { + std::string line; + while (std::getline(std::cin, line)) { + if (!is_prime(std::stoi(line))) { + std::cout << line << std::endl; + } else { + break; + } + } + + return 0; +} diff --git a/testingstuff.cpp b/testingstuff.cpp new file mode 100644 index 0000000..c992b15 --- /dev/null +++ b/testingstuff.cpp @@ -0,0 +1,23 @@ +#include "subprocess.hpp" + +#include + +int main() { + subprocess::Process p("/bin/echo", {"asjdlksaj"}); + p.output_to_file("cool.out"); + subprocess::Process p2("/bin/grep", {"-o", "asj"}); + p2.output_to_file("nekcool.out"); + p.pipe_to(p2); +} + + + +// v1 - pipe_to(Process&& proc); +//p.pipe_to(Process("/bin/grep", {"-i", "cool"})); +//// equivalent to +//p.pipe_to({"/bin/grep", {"-i", "cool"}}); +// +//// v2 - pipe_to(Process& proc); +//Process p3("/bin/grep", {"-i", "cool"}); +//p.pipe_to(p3); +