PStreams
pstream.h
Go to the documentation of this file.
00001 /* $Id: pstream.h,v 1.112 2010/03/20 14:50:47 redi Exp $
00002 PStreams - POSIX Process I/O for C++
00003 Copyright (C) 2001,2002,2003,2004,2005,2006,2007,2008 Jonathan Wakely
00004 
00005 This file is part of PStreams.
00006 
00007 PStreams is free software; you can redistribute it and/or modify
00008 it under the terms of the GNU Lesser General Public License as published by
00009 the Free Software Foundation; either version 3 of the License, or
00010 (at your option) any later version.
00011 
00012 PStreams is distributed in the hope that it will be useful,
00013 but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 GNU Lesser General Public License for more details.
00016 
00017 You should have received a copy of the GNU Lesser General Public License
00018 along with this program.  If not, see <http://www.gnu.org/licenses/>.
00019 */
00020 
00030 #ifndef REDI_PSTREAM_H_SEEN
00031 #define REDI_PSTREAM_H_SEEN
00032 
00033 #include <ios>
00034 #include <streambuf>
00035 #include <istream>
00036 #include <ostream>
00037 #include <string>
00038 #include <vector>
00039 #include <algorithm>    // for min()
00040 #include <cerrno>       // for errno
00041 #include <cstddef>      // for size_t
00042 #include <cstdlib>      // for exit()
00043 #include <sys/types.h>  // for pid_t
00044 #include <sys/wait.h>   // for waitpid()
00045 #include <sys/ioctl.h>  // for ioctl() and FIONREAD
00046 #if defined(__sun)
00047 # include <sys/filio.h> // for FIONREAD on Solaris 2.5
00048 #endif
00049 #include <unistd.h>     // for pipe() fork() exec() and filedes functions
00050 #include <signal.h>     // for kill()
00051 #include <fcntl.h>      // for fcntl()
00052 #if REDI_EVISCERATE_PSTREAMS
00053 # include <stdio.h>     // for FILE, fdopen()
00054 #endif
00055 
00056 
00058 #define PSTREAMS_VERSION 0x0070   // 0.7.0
00059 
00073 namespace redi
00074 {
00076   struct pstreams
00077   {
00079     typedef std::ios_base::openmode           pmode;
00080 
00082     typedef std::vector<std::string>          argv_type;
00083 
00085     typedef int                               fd_type;
00086 
00087     static const pmode pstdin  = std::ios_base::out; 
00088     static const pmode pstdout = std::ios_base::in;  
00089     static const pmode pstderr = std::ios_base::app; 
00090 
00091   protected:
00092     enum { bufsz = 32 };  
00093     enum { pbsz  = 2 };   
00094   };
00095 
00097   template <typename CharT, typename Traits = std::char_traits<CharT> >
00098     class basic_pstreambuf
00099     : public std::basic_streambuf<CharT, Traits>
00100     , public pstreams
00101     {
00102     public:
00103       // Type definitions for dependent types
00104       typedef CharT                             char_type;
00105       typedef Traits                            traits_type;
00106       typedef typename traits_type::int_type    int_type;
00107       typedef typename traits_type::off_type    off_type;
00108       typedef typename traits_type::pos_type    pos_type;
00110       typedef fd_type                           fd_t;
00111 
00113       basic_pstreambuf();
00114 
00116       basic_pstreambuf(const std::string& command, pmode mode);
00117 
00119       basic_pstreambuf( const std::string& file,
00120                         const argv_type& argv,
00121                         pmode mode );
00122 
00124       ~basic_pstreambuf();
00125 
00127       basic_pstreambuf*
00128       open(const std::string& command, pmode mode);
00129 
00131       basic_pstreambuf*
00132       open(const std::string& file, const argv_type& argv, pmode mode);
00133 
00135       basic_pstreambuf*
00136       close();
00137 
00139       basic_pstreambuf*
00140       kill(int signal = SIGTERM);
00141 
00143       void
00144       peof();
00145 
00147       bool
00148       read_err(bool readerr = true);
00149 
00151       bool
00152       is_open() const;
00153 
00155       bool
00156       exited();
00157 
00158 #if REDI_EVISCERATE_PSTREAMS
00159 
00160       std::size_t
00161       fopen(FILE*& in, FILE*& out, FILE*& err);
00162 #endif
00163 
00165       int
00166       status() const;
00167 
00169       int
00170       error() const;
00171 
00172     protected:
00174       int_type
00175       overflow(int_type c);
00176 
00178       int_type
00179       underflow();
00180 
00182       int_type
00183       pbackfail(int_type c = traits_type::eof());
00184 
00186       int
00187       sync();
00188 
00190       std::streamsize
00191       xsputn(const char_type* s, std::streamsize n);
00192 
00194       std::streamsize
00195       write(const char_type* s, std::streamsize n);
00196 
00198       std::streamsize
00199       read(char_type* s, std::streamsize n);
00200 
00202       std::streamsize
00203       showmanyc();
00204 
00205     protected:
00207       enum buf_read_src { rsrc_out = 0, rsrc_err = 1 };
00208 
00210       pid_t
00211       fork(pmode mode);
00212 
00214       int
00215       wait(bool nohang = false);
00216 
00218       fd_type&
00219       wpipe();
00220 
00222       fd_type&
00223       rpipe();
00224 
00226       fd_type&
00227       rpipe(buf_read_src which);
00228 
00229       void
00230       create_buffers(pmode mode);
00231 
00232       void
00233       destroy_buffers(pmode mode);
00234 
00236       bool
00237       empty_buffer();
00238 
00239       bool
00240       fill_buffer(bool non_blocking = false);
00241 
00243       char_type*
00244       rbuffer();
00245 
00246       buf_read_src
00247       switch_read_buffer(buf_read_src);
00248 
00249     private:
00250       basic_pstreambuf(const basic_pstreambuf&);
00251       basic_pstreambuf& operator=(const basic_pstreambuf&);
00252 
00253       void
00254       init_rbuffers();
00255 
00256       pid_t         ppid_;        // pid of process
00257       fd_type       wpipe_;       // pipe used to write to process' stdin
00258       fd_type       rpipe_[2];    // two pipes to read from, stdout and stderr
00259       char_type*    wbuffer_;
00260       char_type*    rbuffer_[2];
00261       char_type*    rbufstate_[3];
00263       buf_read_src  rsrc_;
00264       int           status_;      // hold exit status of child process
00265       int           error_;       // hold errno if fork() or exec() fails
00266     };
00267 
00269   template <typename CharT, typename Traits = std::char_traits<CharT> >
00270     class pstream_common
00271     : virtual public std::basic_ios<CharT, Traits>
00272     , virtual public pstreams
00273     {
00274     protected:
00275       typedef basic_pstreambuf<CharT, Traits>       streambuf_type;
00276 
00277       typedef pstreams::pmode                       pmode;
00278       typedef pstreams::argv_type                   argv_type;
00279 
00281       pstream_common();
00282 
00284       pstream_common(const std::string& command, pmode mode);
00285 
00287       pstream_common(const std::string& file, const argv_type& argv, pmode mode);
00288 
00290       virtual
00291       ~pstream_common() = 0;
00292 
00294       void
00295       do_open(const std::string& command, pmode mode);
00296 
00298       void
00299       do_open(const std::string& file, const argv_type& argv, pmode mode);
00300 
00301     public:
00303       void
00304       close();
00305 
00307       bool
00308       is_open() const;
00309 
00311       const std::string&
00312       command() const;
00313 
00315       streambuf_type*
00316       rdbuf() const;
00317 
00318 #if REDI_EVISCERATE_PSTREAMS
00319 
00320       std::size_t
00321       fopen(FILE*& in, FILE*& out, FILE*& err);
00322 #endif
00323 
00324     protected:
00325       std::string       command_; 
00326       streambuf_type    buf_;     
00327     };
00328 
00329 
00340   template <typename CharT, typename Traits = std::char_traits<CharT> >
00341     class basic_ipstream
00342     : public std::basic_istream<CharT, Traits>
00343     , public pstream_common<CharT, Traits>
00344     , virtual public pstreams
00345     {
00346       typedef std::basic_istream<CharT, Traits>     istream_type;
00347       typedef pstream_common<CharT, Traits>         pbase_type;
00348 
00349       using pbase_type::buf_;  // declare name in this scope
00350 
00351       pmode readable(pmode mode)
00352       {
00353         if (!(mode & (pstdout|pstderr)))
00354           mode |= pstdout;
00355         return mode;
00356       }
00357 
00358     public:
00360       typedef typename pbase_type::pmode            pmode;
00361 
00363       typedef typename pbase_type::argv_type        argv_type;
00364 
00366       basic_ipstream()
00367       : istream_type(NULL), pbase_type()
00368       { }
00369 
00380       basic_ipstream(const std::string& command, pmode mode = pstdout)
00381       : istream_type(NULL), pbase_type(command, readable(mode))
00382       { }
00383 
00395       basic_ipstream( const std::string& file,
00396                       const argv_type& argv,
00397                       pmode mode = pstdout )
00398       : istream_type(NULL), pbase_type(file, argv, readable(mode))
00399       { }
00400 
00406       ~basic_ipstream()
00407       { }
00408 
00418       void
00419       open(const std::string& command, pmode mode = pstdout)
00420       {
00421         this->do_open(command, readable(mode));
00422       }
00423 
00434       void
00435       open( const std::string& file,
00436             const argv_type& argv,
00437             pmode mode = pstdout )
00438       {
00439         this->do_open(file, argv, readable(mode));
00440       }
00441 
00446       basic_ipstream&
00447       out()
00448       {
00449         this->buf_.read_err(false);
00450         return *this;
00451       }
00452 
00457       basic_ipstream&
00458       err()
00459       {
00460         this->buf_.read_err(true);
00461         return *this;
00462       }
00463     };
00464 
00465 
00475   template <typename CharT, typename Traits = std::char_traits<CharT> >
00476     class basic_opstream
00477     : public std::basic_ostream<CharT, Traits>
00478     , public pstream_common<CharT, Traits>
00479     , virtual public pstreams
00480     {
00481       typedef std::basic_ostream<CharT, Traits>     ostream_type;
00482       typedef pstream_common<CharT, Traits>         pbase_type;
00483 
00484       using pbase_type::buf_;  // declare name in this scope
00485 
00486     public:
00488       typedef typename pbase_type::pmode            pmode;
00489 
00491       typedef typename pbase_type::argv_type        argv_type;
00492 
00494       basic_opstream()
00495       : ostream_type(NULL), pbase_type()
00496       { }
00497 
00508       basic_opstream(const std::string& command, pmode mode = pstdin)
00509       : ostream_type(NULL), pbase_type(command, mode|pstdin)
00510       { }
00511 
00523       basic_opstream( const std::string& file,
00524                       const argv_type& argv,
00525                       pmode mode = pstdin )
00526       : ostream_type(NULL), pbase_type(file, argv, mode|pstdin)
00527       { }
00528 
00534       ~basic_opstream() { }
00535 
00545       void
00546       open(const std::string& command, pmode mode = pstdin)
00547       {
00548         this->do_open(command, mode|pstdin);
00549       }
00550 
00561       void
00562       open( const std::string& file,
00563             const argv_type& argv,
00564             pmode mode = pstdin)
00565       {
00566         this->do_open(file, argv, mode|pstdin);
00567       }
00568     };
00569 
00570 
00584   template <typename CharT, typename Traits = std::char_traits<CharT> >
00585     class basic_pstream
00586     : public std::basic_iostream<CharT, Traits>
00587     , public pstream_common<CharT, Traits>
00588     , virtual public pstreams
00589     {
00590       typedef std::basic_iostream<CharT, Traits>    iostream_type;
00591       typedef pstream_common<CharT, Traits>         pbase_type;
00592 
00593       using pbase_type::buf_;  // declare name in this scope
00594 
00595     public:
00597       typedef typename pbase_type::pmode            pmode;
00598 
00600       typedef typename pbase_type::argv_type        argv_type;
00601 
00603       basic_pstream()
00604       : iostream_type(NULL), pbase_type()
00605       { }
00606 
00617       basic_pstream(const std::string& command, pmode mode = pstdout|pstdin)
00618       : iostream_type(NULL), pbase_type(command, mode)
00619       { }
00620 
00632       basic_pstream( const std::string& file,
00633                      const argv_type& argv,
00634                      pmode mode = pstdout|pstdin )
00635       : iostream_type(NULL), pbase_type(file, argv, mode)
00636       { }
00637 
00643       ~basic_pstream() { }
00644 
00654       void
00655       open(const std::string& command, pmode mode = pstdout|pstdin)
00656       {
00657         this->do_open(command, mode);
00658       }
00659 
00670       void
00671       open( const std::string& file,
00672             const argv_type& argv,
00673             pmode mode = pstdout|pstdin )
00674       {
00675         this->do_open(file, argv, mode);
00676       }
00677 
00682       basic_pstream&
00683       out()
00684       {
00685         this->buf_.read_err(false);
00686         return *this;
00687       }
00688 
00693       basic_pstream&
00694       err()
00695       {
00696         this->buf_.read_err(true);
00697         return *this;
00698       }
00699     };
00700 
00701 
00723   template <typename CharT, typename Traits = std::char_traits<CharT> >
00724     class basic_rpstream
00725     : public std::basic_ostream<CharT, Traits>
00726     , private std::basic_istream<CharT, Traits>
00727     , private pstream_common<CharT, Traits>
00728     , virtual public pstreams
00729     {
00730       typedef std::basic_ostream<CharT, Traits>     ostream_type;
00731       typedef std::basic_istream<CharT, Traits>     istream_type;
00732       typedef pstream_common<CharT, Traits>         pbase_type;
00733 
00734       using pbase_type::buf_;  // declare name in this scope
00735 
00736     public:
00738       typedef typename pbase_type::pmode            pmode;
00739 
00741       typedef typename pbase_type::argv_type        argv_type;
00742 
00744       basic_rpstream()
00745       : ostream_type(NULL), istream_type(NULL), pbase_type()
00746       { }
00747 
00758       basic_rpstream(const std::string& command, pmode mode = pstdout|pstdin)
00759       : ostream_type(NULL) , istream_type(NULL) , pbase_type(command, mode)
00760       { }
00761 
00773       basic_rpstream( const std::string& file,
00774                       const argv_type& argv,
00775                       pmode mode = pstdout|pstdin )
00776       : ostream_type(NULL), istream_type(NULL), pbase_type(file, argv, mode)
00777       { }
00778 
00780       ~basic_rpstream() { }
00781 
00791       void
00792       open(const std::string& command, pmode mode = pstdout|pstdin)
00793       {
00794         this->do_open(command, mode);
00795       }
00796 
00807       void
00808       open( const std::string& file,
00809             const argv_type& argv,
00810             pmode mode = pstdout|pstdin )
00811       {
00812         this->do_open(file, argv, mode);
00813       }
00814 
00820       istream_type&
00821       out()
00822       {
00823         this->buf_.read_err(false);
00824         return *this;
00825       }
00826 
00832       istream_type&
00833       err()
00834       {
00835         this->buf_.read_err(true);
00836         return *this;
00837       }
00838     };
00839 
00840 
00842   typedef basic_pstreambuf<char> pstreambuf;
00844   typedef basic_ipstream<char> ipstream;
00846   typedef basic_opstream<char> opstream;
00848   typedef basic_pstream<char> pstream;
00850   typedef basic_rpstream<char> rpstream;
00851 
00852 
00865   template <typename C, typename T>
00866     inline std::basic_ostream<C,T>&
00867     peof(std::basic_ostream<C,T>& s)
00868     {
00869       typedef basic_pstreambuf<C,T> pstreambuf;
00870       if (pstreambuf* p = dynamic_cast<pstreambuf*>(s.rdbuf()))
00871         p->peof();
00872       return s;
00873     }
00874 
00875 
00876   /*
00877    * member definitions for pstreambuf
00878    */
00879 
00880 
00887   template <typename C, typename T>
00888     inline
00889     basic_pstreambuf<C,T>::basic_pstreambuf()
00890     : ppid_(-1)   // initialise to -1 to indicate no process run yet.
00891     , wpipe_(-1)
00892     , wbuffer_(NULL)
00893     , rsrc_(rsrc_out)
00894     , status_(-1)
00895     , error_(0)
00896     {
00897       init_rbuffers();
00898     }
00899 
00908   template <typename C, typename T>
00909     inline
00910     basic_pstreambuf<C,T>::basic_pstreambuf(const std::string& command, pmode mode)
00911     : ppid_(-1)   // initialise to -1 to indicate no process run yet.
00912     , wpipe_(-1)
00913     , wbuffer_(NULL)
00914     , rsrc_(rsrc_out)
00915     , status_(-1)
00916     , error_(0)
00917     {
00918       init_rbuffers();
00919       open(command, mode);
00920     }
00921 
00931   template <typename C, typename T>
00932     inline
00933     basic_pstreambuf<C,T>::basic_pstreambuf( const std::string& file,
00934                                              const argv_type& argv,
00935                                              pmode mode )
00936     : ppid_(-1)   // initialise to -1 to indicate no process run yet.
00937     , wpipe_(-1)
00938     , wbuffer_(NULL)
00939     , rsrc_(rsrc_out)
00940     , status_(-1)
00941     , error_(0)
00942     {
00943       init_rbuffers();
00944       open(file, argv, mode);
00945     }
00946 
00951   template <typename C, typename T>
00952     inline
00953     basic_pstreambuf<C,T>::~basic_pstreambuf()
00954     {
00955       close();
00956     }
00957 
00985   template <typename C, typename T>
00986     basic_pstreambuf<C,T>*
00987     basic_pstreambuf<C,T>::open(const std::string& command, pmode mode)
00988     {
00989       const char * shell_path = "/bin/sh";
00990 #if 0
00991       const std::string argv[] = { "sh", "-c", command };
00992       return this->open(shell_path, argv_type(argv, argv+3), mode);
00993 #else
00994       basic_pstreambuf<C,T>* ret = NULL;
00995 
00996       if (!is_open())
00997       {
00998         switch(fork(mode))
00999         {
01000         case 0 :
01001           // this is the new process, exec command
01002           ::execl(shell_path, "sh", "-c", command.c_str(), (char*)NULL);
01003 
01004           // can only reach this point if exec() failed
01005 
01006           // parent can get exit code from waitpid()
01007           ::_exit(errno);
01008           // using std::exit() would make static dtors run twice
01009 
01010         case -1 :
01011           // couldn't fork, error already handled in pstreambuf::fork()
01012           break;
01013 
01014         default :
01015           // this is the parent process
01016           // activate buffers
01017           create_buffers(mode);
01018           ret = this;
01019         }
01020       }
01021       return ret;
01022 #endif
01023     }
01024 
01033   inline void
01034   close_fd(pstreams::fd_type& fd)
01035   {
01036     if (fd >= 0 && ::close(fd) == 0)
01037       fd = -1;
01038   }
01039 
01050   template <int N>
01051     inline void
01052     close_fd_array(pstreams::fd_type (&fds)[N])
01053     {
01054       for (std::size_t i = 0; i < N; ++i)
01055         close_fd(fds[i]);
01056     }
01057 
01087   template <typename C, typename T>
01088     basic_pstreambuf<C,T>*
01089     basic_pstreambuf<C,T>::open( const std::string& file,
01090                                  const argv_type& argv,
01091                                  pmode mode )
01092     {
01093       basic_pstreambuf<C,T>* ret = NULL;
01094 
01095       if (!is_open())
01096       {
01097         // constants for read/write ends of pipe
01098         enum { RD, WR };
01099 
01100         // open another pipe and set close-on-exec
01101         fd_type ck_exec[] = { -1, -1 };
01102         if (-1 == ::pipe(ck_exec)
01103             || -1 == ::fcntl(ck_exec[RD], F_SETFD, FD_CLOEXEC)
01104             || -1 == ::fcntl(ck_exec[WR], F_SETFD, FD_CLOEXEC))
01105         {
01106           error_ = errno;
01107           close_fd_array(ck_exec);
01108         }
01109         else
01110         {
01111           switch(fork(mode))
01112           {
01113           case 0 :
01114             // this is the new process, exec command
01115             {
01116               char** arg_v = new char*[argv.size()+1];
01117               for (std::size_t i = 0; i < argv.size(); ++i)
01118               {
01119                 const std::string& src = argv[i];
01120                 char*& dest = arg_v[i];
01121                 dest = new char[src.size()+1];
01122                 dest[ src.copy(dest, src.size()) ] = '\0';
01123               }
01124               arg_v[argv.size()] = NULL;
01125 
01126               ::execvp(file.c_str(), arg_v);
01127 
01128               // can only reach this point if exec() failed
01129 
01130               // parent can get error code from ck_exec pipe
01131               error_ = errno;
01132 
01133               ::write(ck_exec[WR], &error_, sizeof(error_));
01134               ::close(ck_exec[WR]);
01135               ::close(ck_exec[RD]);
01136 
01137               ::_exit(error_);
01138               // using std::exit() would make static dtors run twice
01139             }
01140 
01141           case -1 :
01142             // couldn't fork, error already handled in pstreambuf::fork()
01143             close_fd_array(ck_exec);
01144             break;
01145 
01146           default :
01147             // this is the parent process
01148 
01149             // check child called exec() successfully
01150             ::close(ck_exec[WR]);
01151             switch (::read(ck_exec[RD], &error_, sizeof(error_)))
01152             {
01153             case 0:
01154               // activate buffers
01155               create_buffers(mode);
01156               ret = this;
01157               break;
01158             case -1:
01159               error_ = errno;
01160               break;
01161             default:
01162               // error_ contains error code from child
01163               // call wait() to clean up and set ppid_ to 0
01164               this->wait();
01165               break;
01166             }
01167             ::close(ck_exec[RD]);
01168           }
01169         }
01170       }
01171       return ret;
01172     }
01173 
01190   template <typename C, typename T>
01191     pid_t
01192     basic_pstreambuf<C,T>::fork(pmode mode)
01193     {
01194       pid_t pid = -1;
01195 
01196       // Three pairs of file descriptors, for pipes connected to the
01197       // process' stdin, stdout and stderr
01198       // (stored in a single array so close_fd_array() can close all at once)
01199       fd_type fd[] = { -1, -1, -1, -1, -1, -1 };
01200       fd_type* const pin = fd;
01201       fd_type* const pout = fd+2;
01202       fd_type* const perr = fd+4;
01203 
01204       // constants for read/write ends of pipe
01205       enum { RD, WR };
01206 
01207       // N.B.
01208       // For the pstreambuf pin is an output stream and
01209       // pout and perr are input streams.
01210 
01211       if (!error_ && mode&pstdin && ::pipe(pin))
01212         error_ = errno;
01213 
01214       if (!error_ && mode&pstdout && ::pipe(pout))
01215         error_ = errno;
01216 
01217       if (!error_ && mode&pstderr && ::pipe(perr))
01218         error_ = errno;
01219 
01220       if (!error_)
01221       {
01222         pid = ::fork();
01223         switch (pid)
01224         {
01225           case 0 :
01226           {
01227             // this is the new process
01228 
01229             // for each open pipe close one end and redirect the
01230             // respective standard stream to the other end
01231 
01232             if (*pin >= 0)
01233             {
01234               ::close(pin[WR]);
01235               ::dup2(pin[RD], STDIN_FILENO);
01236               ::close(pin[RD]);
01237             }
01238             if (*pout >= 0)
01239             {
01240               ::close(pout[RD]);
01241               ::dup2(pout[WR], STDOUT_FILENO);
01242               ::close(pout[WR]);
01243             }
01244             if (*perr >= 0)
01245             {
01246               ::close(perr[RD]);
01247               ::dup2(perr[WR], STDERR_FILENO);
01248               ::close(perr[WR]);
01249             }
01250             break;
01251           }
01252           case -1 :
01253           {
01254             // couldn't fork for some reason
01255             error_ = errno;
01256             // close any open pipes
01257             close_fd_array(fd);
01258             break;
01259           }
01260           default :
01261           {
01262             // this is the parent process, store process' pid
01263             ppid_ = pid;
01264 
01265             // store one end of open pipes and close other end
01266             if (*pin >= 0)
01267             {
01268               wpipe_ = pin[WR];
01269               ::close(pin[RD]);
01270             }
01271             if (*pout >= 0)
01272             {
01273               rpipe_[rsrc_out] = pout[RD];
01274               ::close(pout[WR]);
01275             }
01276             if (*perr >= 0)
01277             {
01278               rpipe_[rsrc_err] = perr[RD];
01279               ::close(perr[WR]);
01280             }
01281           }
01282         }
01283       }
01284       else
01285       {
01286         // close any pipes we opened before failure
01287         close_fd_array(fd);
01288       }
01289       return pid;
01290     }
01291 
01301   template <typename C, typename T>
01302     basic_pstreambuf<C,T>*
01303     basic_pstreambuf<C,T>::close()
01304     {
01305       const bool running = is_open();
01306 
01307       sync(); // this might call wait() and reap the child process
01308 
01309       // rather than trying to work out whether or not we need to clean up
01310       // just do it anyway, all cleanup functions are safe to call twice.
01311 
01312       destroy_buffers(pstdin|pstdout|pstderr);
01313 
01314       // close pipes before wait() so child gets EOF/SIGPIPE
01315       close_fd(wpipe_);
01316       close_fd_array(rpipe_);
01317 
01318       do
01319       {
01320         error_ = 0;
01321       } while (wait() == -1 && error() == EINTR);
01322 
01323       return running ? this : NULL;
01324     }
01325 
01329   template <typename C, typename T>
01330     inline void
01331     basic_pstreambuf<C,T>::init_rbuffers()
01332     {
01333       rpipe_[rsrc_out] = rpipe_[rsrc_err] = -1;
01334       rbuffer_[rsrc_out] = rbuffer_[rsrc_err] = NULL;
01335       rbufstate_[0] = rbufstate_[1] = rbufstate_[2] = NULL;
01336     }
01337 
01338   template <typename C, typename T>
01339     void
01340     basic_pstreambuf<C,T>::create_buffers(pmode mode)
01341     {
01342       if (mode & pstdin)
01343       {
01344         delete[] wbuffer_;
01345         wbuffer_ = new char_type[bufsz];
01346         this->setp(wbuffer_, wbuffer_ + bufsz);
01347       }
01348       if (mode & pstdout)
01349       {
01350         delete[] rbuffer_[rsrc_out];
01351         rbuffer_[rsrc_out] = new char_type[bufsz];
01352         rsrc_ = rsrc_out;
01353         this->setg(rbuffer_[rsrc_out] + pbsz, rbuffer_[rsrc_out] + pbsz,
01354             rbuffer_[rsrc_out] + pbsz);
01355       }
01356       if (mode & pstderr)
01357       {
01358         delete[] rbuffer_[rsrc_err];
01359         rbuffer_[rsrc_err] = new char_type[bufsz];
01360         if (!(mode & pstdout))
01361         {
01362           rsrc_ = rsrc_err;
01363           this->setg(rbuffer_[rsrc_err] + pbsz, rbuffer_[rsrc_err] + pbsz,
01364               rbuffer_[rsrc_err] + pbsz);
01365         }
01366       }
01367     }
01368 
01369   template <typename C, typename T>
01370     void
01371     basic_pstreambuf<C,T>::destroy_buffers(pmode mode)
01372     {
01373       if (mode & pstdin)
01374       {
01375         this->setp(NULL, NULL);
01376         delete[] wbuffer_;
01377         wbuffer_ = NULL;
01378       }
01379       if (mode & pstdout)
01380       {
01381         if (rsrc_ == rsrc_out)
01382           this->setg(NULL, NULL, NULL);
01383         delete[] rbuffer_[rsrc_out];
01384         rbuffer_[rsrc_out] = NULL;
01385       }
01386       if (mode & pstderr)
01387       {
01388         if (rsrc_ == rsrc_err)
01389           this->setg(NULL, NULL, NULL);
01390         delete[] rbuffer_[rsrc_err];
01391         rbuffer_[rsrc_err] = NULL;
01392       }
01393     }
01394 
01395   template <typename C, typename T>
01396     typename basic_pstreambuf<C,T>::buf_read_src
01397     basic_pstreambuf<C,T>::switch_read_buffer(buf_read_src src)
01398     {
01399       if (rsrc_ != src)
01400       {
01401         char_type* tmpbufstate[] = {this->eback(), this->gptr(), this->egptr()};
01402         this->setg(rbufstate_[0], rbufstate_[1], rbufstate_[2]);
01403         for (std::size_t i = 0; i < 3; ++i)
01404           rbufstate_[i] = tmpbufstate[i];
01405         rsrc_ = src;
01406       }
01407       return rsrc_;
01408     }
01409 
01426   template <typename C, typename T>
01427     int
01428     basic_pstreambuf<C,T>::wait(bool nohang)
01429     {
01430       int exited = -1;
01431       if (is_open())
01432       {
01433         int status;
01434         switch(::waitpid(ppid_, &status, nohang ? WNOHANG : 0))
01435         {
01436           case 0 :
01437             // nohang was true and process has not exited
01438             exited = 0;
01439             break;
01440           case -1 :
01441             error_ = errno;
01442             break;
01443           default :
01444             // process has exited
01445             ppid_ = 0;
01446             status_ = status;
01447             exited = 1;
01448             // Close wpipe, would get SIGPIPE if we used it.
01449             destroy_buffers(pstdin);
01450             close_fd(wpipe_);
01451             // Must free read buffers and pipes on destruction
01452             // or next call to open()/close()
01453             break;
01454         }
01455       }
01456       return exited;
01457     }
01458 
01469   template <typename C, typename T>
01470     inline basic_pstreambuf<C,T>*
01471     basic_pstreambuf<C,T>::kill(int signal)
01472     {
01473       basic_pstreambuf<C,T>* ret = NULL;
01474       if (is_open())
01475       {
01476         if (::kill(ppid_, signal))
01477           error_ = errno;
01478         else
01479         {
01480 #if 0
01481           // TODO call exited() to check for exit and clean up? leave to user?
01482           if (signal==SIGTERM || signal==SIGKILL)
01483             this->exited();
01484 #endif
01485           ret = this;
01486         }
01487       }
01488       return ret;
01489     }
01490 
01498   template <typename C, typename T>
01499     inline bool
01500     basic_pstreambuf<C,T>::exited()
01501     {
01502       return ppid_ == 0 || wait(true)==1;
01503     }
01504 
01505 
01511   template <typename C, typename T>
01512     inline int
01513     basic_pstreambuf<C,T>::status() const
01514     {
01515       return status_;
01516     }
01517 
01521   template <typename C, typename T>
01522     inline int
01523     basic_pstreambuf<C,T>::error() const
01524     {
01525       return error_;
01526     }
01527 
01532   template <typename C, typename T>
01533     inline void
01534     basic_pstreambuf<C,T>::peof()
01535     {
01536       sync();
01537       destroy_buffers(pstdin);
01538       close_fd(wpipe_);
01539     }
01540 
01551   template <typename C, typename T>
01552     inline bool
01553     basic_pstreambuf<C,T>::is_open() const
01554     {
01555       return ppid_ > 0;
01556     }
01557 
01566   template <typename C, typename T>
01567     inline bool
01568     basic_pstreambuf<C,T>::read_err(bool readerr)
01569     {
01570       buf_read_src src = readerr ? rsrc_err : rsrc_out;
01571       if (rpipe_[src]>=0)
01572       {
01573         switch_read_buffer(src);
01574         return true;
01575       }
01576       return false;
01577     }
01578 
01589   template <typename C, typename T>
01590     typename basic_pstreambuf<C,T>::int_type
01591     basic_pstreambuf<C,T>::overflow(int_type c)
01592     {
01593       if (!empty_buffer())
01594         return traits_type::eof();
01595       else if (!traits_type::eq_int_type(c, traits_type::eof()))
01596         return this->sputc(c);
01597       else
01598         return traits_type::not_eof(c);
01599     }
01600 
01601 
01602   template <typename C, typename T>
01603     int
01604     basic_pstreambuf<C,T>::sync()
01605     {
01606       return !exited() && empty_buffer() ? 0 : -1;
01607     }
01608 
01614   template <typename C, typename T>
01615     std::streamsize
01616     basic_pstreambuf<C,T>::xsputn(const char_type* s, std::streamsize n)
01617     {
01618       if (n < this->epptr() - this->pptr())
01619       {
01620         traits_type::copy(this->pptr(), s, n);
01621         this->pbump(n);
01622         return n;
01623       }
01624       else
01625       {
01626         for (std::streamsize i = 0; i < n; ++i)
01627         {
01628           if (traits_type::eq_int_type(this->sputc(s[i]), traits_type::eof()))
01629             return i;
01630         }
01631         return n;
01632       }
01633     }
01634 
01638   template <typename C, typename T>
01639     bool
01640     basic_pstreambuf<C,T>::empty_buffer()
01641     {
01642       const std::streamsize count = this->pptr() - this->pbase();
01643       if (count > 0)
01644       {
01645         const std::streamsize written = this->write(this->wbuffer_, count);
01646         if (written > 0)
01647         {
01648           if (const std::streamsize unwritten = count - written)
01649             traits_type::move(this->pbase(), this->pbase()+written, unwritten);
01650           this->pbump(-written);
01651           return true;
01652         }
01653       }
01654       return false;
01655     }
01656 
01664   template <typename C, typename T>
01665     typename basic_pstreambuf<C,T>::int_type
01666     basic_pstreambuf<C,T>::underflow()
01667     {
01668       if (this->gptr() < this->egptr() || fill_buffer())
01669         return traits_type::to_int_type(*this->gptr());
01670       else
01671         return traits_type::eof();
01672     }
01673 
01682   template <typename C, typename T>
01683     typename basic_pstreambuf<C,T>::int_type
01684     basic_pstreambuf<C,T>::pbackfail(int_type c)
01685     {
01686       if (this->gptr() != this->eback())
01687       {
01688         this->gbump(-1);
01689         if (!traits_type::eq_int_type(c, traits_type::eof()))
01690           *this->gptr() = traits_type::to_char_type(c);
01691         return traits_type::not_eof(c);
01692       }
01693       else
01694          return traits_type::eof();
01695     }
01696 
01697   template <typename C, typename T>
01698     std::streamsize
01699     basic_pstreambuf<C,T>::showmanyc()
01700     {
01701       int avail = 0;
01702       if (sizeof(char_type) == 1)
01703         avail = fill_buffer(true) ? this->egptr() - this->gptr() : -1;
01704 #ifdef FIONREAD
01705       else
01706       {
01707         if (::ioctl(rpipe(), FIONREAD, &avail) == -1)
01708           avail = -1;
01709         else if (avail)
01710           avail /= sizeof(char_type);
01711       }
01712 #endif
01713       return std::streamsize(avail);
01714     }
01715 
01719   template <typename C, typename T>
01720     bool
01721     basic_pstreambuf<C,T>::fill_buffer(bool non_blocking)
01722     {
01723       const std::streamsize pb1 = this->gptr() - this->eback();
01724       const std::streamsize pb2 = pbsz;
01725       const std::streamsize npb = std::min(pb1, pb2);
01726 
01727       char_type* const rbuf = rbuffer();
01728 
01729       traits_type::move(rbuf + pbsz - npb, this->gptr() - npb, npb);
01730 
01731       std::streamsize rc = -1;
01732 
01733       if (non_blocking)
01734       {
01735         const int flags = ::fcntl(rpipe(), F_GETFL);
01736         if (flags != -1)
01737         {
01738           const bool blocking = !(flags & O_NONBLOCK);
01739           if (blocking)
01740             ::fcntl(rpipe(), F_SETFL, flags | O_NONBLOCK);  // set non-blocking
01741 
01742           error_ = 0;
01743           rc = read(rbuf + pbsz, bufsz - pbsz);
01744 
01745           if (rc == -1 && error_ == EAGAIN)  // nothing available
01746             rc = 0;
01747           else if (rc == 0)  // EOF
01748             rc = -1;
01749 
01750           if (blocking)
01751             ::fcntl(rpipe(), F_SETFL, flags); // restore
01752         }
01753       }
01754       else
01755         rc = read(rbuf + pbsz, bufsz - pbsz);
01756 
01757       if (rc > 0 || (rc == 0 && non_blocking))
01758       {
01759         this->setg( rbuf + pbsz - npb,
01760                     rbuf + pbsz,
01761                     rbuf + pbsz + rc );
01762         return true;
01763       }
01764       else
01765       {
01766         this->setg(NULL, NULL, NULL);
01767         return false;
01768       }
01769     }
01770 
01778   template <typename C, typename T>
01779     inline std::streamsize
01780     basic_pstreambuf<C,T>::write(const char_type* s, std::streamsize n)
01781     {
01782       std::streamsize nwritten = 0;
01783       if (wpipe() >= 0)
01784       {
01785         nwritten = ::write(wpipe(), s, n * sizeof(char_type));
01786         if (nwritten == -1)
01787           error_ = errno;
01788         else
01789           nwritten /= sizeof(char_type);
01790       }
01791       return nwritten;
01792     }
01793 
01801   template <typename C, typename T>
01802     inline std::streamsize
01803     basic_pstreambuf<C,T>::read(char_type* s, std::streamsize n)
01804     {
01805       std::streamsize nread = 0;
01806       if (rpipe() >= 0)
01807       {
01808         nread = ::read(rpipe(), s, n * sizeof(char_type));
01809         if (nread == -1)
01810           error_ = errno;
01811         else
01812           nread /= sizeof(char_type);
01813       }
01814       return nread;
01815     }
01816 
01818   template <typename C, typename T>
01819     inline typename basic_pstreambuf<C,T>::fd_type&
01820     basic_pstreambuf<C,T>::wpipe()
01821     {
01822       return wpipe_;
01823     }
01824 
01826   template <typename C, typename T>
01827     inline typename basic_pstreambuf<C,T>::fd_type&
01828     basic_pstreambuf<C,T>::rpipe()
01829     {
01830       return rpipe_[rsrc_];
01831     }
01832 
01834   template <typename C, typename T>
01835     inline typename basic_pstreambuf<C,T>::fd_type&
01836     basic_pstreambuf<C,T>::rpipe(buf_read_src which)
01837     {
01838       return rpipe_[which];
01839     }
01840 
01842   template <typename C, typename T>
01843     inline typename basic_pstreambuf<C,T>::char_type*
01844     basic_pstreambuf<C,T>::rbuffer()
01845     {
01846       return rbuffer_[rsrc_];
01847     }
01848 
01849 
01850   /*
01851    * member definitions for pstream_common
01852    */
01853 
01863   template <typename C, typename T>
01864     inline
01865     pstream_common<C,T>::pstream_common()
01866     : std::basic_ios<C,T>(NULL)
01867     , command_()
01868     , buf_()
01869     {
01870       this->init(&buf_);
01871     }
01872 
01881   template <typename C, typename T>
01882     inline
01883     pstream_common<C,T>::pstream_common(const std::string& command, pmode mode)
01884     : std::basic_ios<C,T>(NULL)
01885     , command_(command)
01886     , buf_()
01887     {
01888       this->init(&buf_);
01889       do_open(command, mode);
01890     }
01891 
01901   template <typename C, typename T>
01902     inline
01903     pstream_common<C,T>::pstream_common( const std::string& file,
01904                                          const argv_type& argv,
01905                                          pmode mode )
01906     : std::basic_ios<C,T>(NULL)
01907     , command_(file)
01908     , buf_()
01909     {
01910       this->init(&buf_);
01911       do_open(file, argv, mode);
01912     }
01913 
01923   template <typename C, typename T>
01924     inline
01925     pstream_common<C,T>::~pstream_common()
01926     {
01927     }
01928 
01937   template <typename C, typename T>
01938     inline void
01939     pstream_common<C,T>::do_open(const std::string& command, pmode mode)
01940     {
01941       if (!buf_.open((command_=command), mode))
01942         this->setstate(std::ios_base::failbit);
01943     }
01944 
01954   template <typename C, typename T>
01955     inline void
01956     pstream_common<C,T>::do_open( const std::string& file,
01957                                   const argv_type& argv,
01958                                   pmode mode )
01959     {
01960       if (!buf_.open((command_=file), argv, mode))
01961         this->setstate(std::ios_base::failbit);
01962     }
01963 
01965   template <typename C, typename T>
01966     inline void
01967     pstream_common<C,T>::close()
01968     {
01969       if (!buf_.close())
01970         this->setstate(std::ios_base::failbit);
01971     }
01972 
01977   template <typename C, typename T>
01978     inline bool
01979     pstream_common<C,T>::is_open() const
01980     {
01981       return buf_.is_open();
01982     }
01983 
01985   template <typename C, typename T>
01986     inline const std::string&
01987     pstream_common<C,T>::command() const
01988     {
01989       return command_;
01990     }
01991 
01993   // TODO  document behaviour if buffer replaced.
01994   template <typename C, typename T>
01995     inline typename pstream_common<C,T>::streambuf_type*
01996     pstream_common<C,T>::rdbuf() const
01997     {
01998       return const_cast<streambuf_type*>(&buf_);
01999     }
02000 
02001 
02002 #if REDI_EVISCERATE_PSTREAMS
02003 
02035   template <typename C, typename T>
02036     std::size_t
02037     basic_pstreambuf<C,T>::fopen(FILE*& in, FILE*& out, FILE*& err)
02038     {
02039       in = out = err = NULL;
02040       std::size_t open_files = 0;
02041       if (wpipe() > -1)
02042       {
02043         if ((in = ::fdopen(wpipe(), "w")))
02044         {
02045             open_files |= pstdin;
02046         }
02047       }
02048       if (rpipe(rsrc_out) > -1)
02049       {
02050         if ((out = ::fdopen(rpipe(rsrc_out), "r")))
02051         {
02052             open_files |= pstdout;
02053         }
02054       }
02055       if (rpipe(rsrc_err) > -1)
02056       {
02057         if ((err = ::fdopen(rpipe(rsrc_err), "r")))
02058         {
02059             open_files |= pstderr;
02060         }
02061       }
02062       return open_files;
02063     }
02064 
02075   template <typename C, typename T>
02076     inline std::size_t
02077     pstream_common<C,T>::fopen(FILE*& fin, FILE*& fout, FILE*& ferr)
02078     {
02079       return buf_.fopen(fin, fout, ferr);
02080     }
02081 
02082 #endif // REDI_EVISCERATE_PSTREAMS
02083 
02084 
02085 } // namespace redi
02086 
02092 #endif  // REDI_PSTREAM_H_SEEN
02093 
02094 // vim: ts=2 sw=2 expandtab
02095