Skip to main content

server (Source)

#!/usr/bin/python3
"""
Run a server in the current directory, or the first parent containing a .serverc file
The .serverc file contains settings:
    command = <command to run> (default: python3 -m http.server)
    cwd = <working directory to run the command in> (default: .)
    port = <port the server runs on (default: chosen at random from available ports)
    env = a list of environment variables to set, separated by spaces
    open = should a page be opened in the browser?
    open_url = the relative url to open in the browser
    delay = the number of seconds to wait before opening the browser
Once the server is running, a browser is opened.
"""
import argparse
import configparser
from contextlib import closing
import hashlib
from multiprocessing import Process
import os
from pathlib import Path
import socket
import subprocess
import sys
from time import sleep
try:
    from watchmake import WatchMaker
except ImportError as e:
    print(e)
    WatchMaker = None
basic_server_cmd = 'python3 /home/christian/bin/httpserver.py'
def port_is_in_use(port: int) -> bool:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(('localhost', port)) == 0
class ServerRunner(object):
    cwd = '.'
    cmd = None
    open = True
    open_url = '/'
    env_str = ''
    delay = 5
    def __init__(self,options):
        self.options = options
        d = Path('.').resolve()
        got_rc = False
        while d.parent!=d:
            if (d / '.serverc').exists():
                got_rc = True
                break
            d = d.parent
        if not got_rc:
            d = Path('.').resolve()
        self.d = d
        self.rcname = self.d / '.serverc'
    def find_free_port(self):
        h = hashlib.sha256(str(self.d).encode('utf-8')).digest()
        port = 0
        for c in h:
            port = (port*256 + c) % 5000
        port += 2000
        while port_is_in_use(port):
            port += 1
        return port
    def load_config(self):
        self.port = self.find_free_port()
        self.open = self.options.open
        self.config = configparser.ConfigParser()
        if self.rcname.exists():
            with open(self.rcname) as f:
                s = f.read()
                if not s.startswith('['):
                    s = '[settings]\n'+s
            self.config.read_string(s)
            if not self.config.has_section('settings'):
                self.config.add_section('settings')
            self.settings = self.config['settings']
            self.cmd = self.settings.get('command')
            self.cwd = self.settings.get('cwd', self.cwd)
            self.port = self.settings.getint('port',self.port)
            self.env_str = self.settings.get('env',self.env_str)
            self.open = self.settings.getboolean('open', self.open)
            self.open_url = self.settings.get('open_url',self.open_url)
            self.delay = self.settings.getfloat('delay',self.delay)
        else:
            self.config.add_section('settings')
        if self.options.port is not None:
            self.port = self.options.port
        if self.cmd is None:
            self.cmd = basic_server_cmd
            self.delay = 0
            self.cmd += f' {self.port}'
        else:
            self.cmd = self.cmd.format(port=self.port)
    def run(self):
        self.env = os.environ.copy()
        self.env.update({k:v for k,v in [b.split('=') for b in self.env_str.split(' ') if b]})
        self.env['PORT'] = str(self.port)
        self.run_server()
        self.run_watchmaker()
        
        if self.open:
            self.open_browser()
        try:
            self.server_process.wait()
        except KeyboardInterrupt:
            pass
        if self.watchmaker_process:
            self.watchmaker_process.terminate()
    def run_server(self):
        os.chdir(self.d)
        self.server_process = subprocess.Popen(self.cmd.format(**self.env).split(' '), env=self.env, cwd=self.cwd)
    def run_watchmaker(self):
        def target(d):
            try:
                wm = WatchMaker(d)
                wm.run()
            except Exception as e:
                print(e)
                pass
        if WatchMaker is not None and (self.d / '.watchmakerc').exists():
            self.watchmaker_process = Process(target=target,args=(self.d,))
            self.watchmaker_process.start()
        else:
            print("No watchmake")
            print(WatchMaker)
            print((self.d / '.watchmakerc'))
            self.watchmaker_process = None
    def get_full_open_url(self):
        open_url = self.open_url
        if not open_url.startswith('/'):
            open_url = '/' + open_url
        full_open_url = f'http://localhost:{self.port}{open_url}'
        return full_open_url
    def open_browser(self):
        sleep(self.delay)
        full_open_url = self.get_full_open_url()
        print("Opening",full_open_url)
        subprocess.run(['xdg-open',full_open_url])
    def set_config(self,values):
        for line in values:
            name,value = line.split('=')
            self.config.set('settings',name,value)
        with open(self.rcname,'w') as f:
            self.config.write(f)
    def show(self):
        print(self.get_full_open_url())
parser = argparse.ArgumentParser(description='Start a server here.')
parser.add_argument('--port',nargs=1,default=None,type=int, help="Override the port number")
parser.add_argument('-n', '--noopen',action='store_false',dest='open', help="Don't open in the browser")
parser.add_argument('--set',nargs='*', help='Set config options in .serverc')
parser.add_argument('--show', action='store_true', help='Print the URL of this server')
options = parser.parse_args()
runner = ServerRunner(options)
runner.load_config()
if options.set:
    print('set',options.set)
    runner.set_config(options.set)
elif options.show:
    runner.show()
else:
    runner.run()