import http.server import logging import os import sqlite3 import tempfile import db import templates import utils logger = logging.getLogger(__name__) conn = sqlite3.connect('db.sqlite3') files_directory = 'download' authorized_key = os.environ['KEY'] class MyServer(http.server.BaseHTTPRequestHandler): def do_GET(self): match self.path: case '/': self._serve_str(templates.index, 200, 'text/html') case '/main.js': self._serve_file('public/main.js', 'application/javascript') case '/main.css': self._serve_file('public/main.css', 'text/css') case path: prefix = f'/{files_directory}/' if path.startswith(prefix): file_id = path[len(prefix):] res = db.get_file(conn, file_id) if res is None: self._serve_str(templates.not_found, 404, 'text/html') else: filename, _, content_length = res path = os.path.join(files_directory, file_id) headers = [ ('Content-Disposition', f'attachment; filename={filename}'), ('Content-Length', content_length) ] self._serve_file(path, 'application/octet-stream', headers) else: file_id = path[1:] res = db.get_file(conn, file_id) if res is None: self._serve_str(templates.not_found, 404, 'text/html') else: filename, expires, _ = res href = os.path.join(files_directory, file_id) self._serve_str(templates.download(href, filename, expires), 200, 'text/html') def do_POST(self): key = self.headers['X-Key'] if not key == authorized_key: logging.info('Unauthorized to upload file: wrong key') self._serve_str('Unauthorized', 401) else: logging.info('Uploading file') content_length = int(self.headers['content-length']) filename = utils.sanitize_filename(self.headers['X-FileName']) expiration = self.headers['X-Expiration'] with tempfile.NamedTemporaryFile(delete = False) as tmp: utils.transfer(self.rfile, tmp, content_length = content_length) logging.info('File uploaded') file_id = db.insert_file(conn, filename, expiration, content_length) os.makedirs(files_directory, exist_ok=True) os.rename(tmp.name, os.path.join(files_directory, file_id)) self._serve_str(file_id, 200) def _serve_str(self, s, code, content_type='text/plain'): self.send_response(code) self.send_header('Content-type', content_type) self.end_headers() self.wfile.write(bytes(s, 'utf-8')) def _serve_file(self, filename, content_type, headers = []): self.send_response(200) self.send_header('Content-type', content_type) for header_name, header_value in headers: self.send_header(header_name, header_value) self.end_headers() with open(filename, 'rb') as f: utils.transfer(f, self.wfile)