# Module setup: imports, the Flask application object, and the numeric
# constants used by the register I/O and angle-estimation code in this file.
import atexit
import glob
import hashlib
import hmac
import json
import math
import mmap
import os
import struct
import sys
import tempfile
import threading
import time

from flask import Flask, request, Response, send_file

# scipy is optional: without it the service still runs, but the endpoints
# that build .mat files report an error instead.
try:
    from scipy.io import savemat
    SCIPY_OK = True
except Exception:
    # Deliberately broad (a broken scipy install can fail with more than
    # ImportError), but no longer swallows KeyboardInterrupt/SystemExit.
    SCIPY_OK = False

app = Flask(__name__)

# --- Timing (seconds) -------------------------------------------------------
MUX_SETTLE_SEC = 0.00001   # pause after selecting a mux channel
PWM_PERIOD_SEC = 0.001     # sleep between PWM loop iterations
LOG_PERIOD_SEC = 0.001     # sleep between log samples while recording
TWO_PI_OVER_3 = 2.0 * math.pi / 3.0

# --- Memory-mapped register window -----------------------------------------
# NOTE(review): the names suggest an Altera/Intel SoC lightweight
# HPS-to-FPGA bridge layout; confirm against the board's address map.
ALT_STM_OFST = 0xFC000000
ALT_LWFPGASLVS_OFST = 0xFF200000
HW_REGS_BASE = ALT_STM_OFST
HW_REGS_SPAN = 0x04000000
HW_REGS_MASK = HW_REGS_SPAN - 1

# Offsets added to ALT_LWFPGASLVS_OFST by vaddr().
INPORT_BASE = 0x50000
OUTPORT_BASE = 0x60000
MUX_INPORT_BASE = 0x52000
MUX_OUTPORT_BASE = 0x51000

# --- Sensor phase model used by clarke_corregido / clarke_corregido2 --------
# Phase angles (degrees) of sensors 2 and 3; sensor 1 is taken at phase 0.
PH1_DEG = 111.8185
PH2_DEG = -68.1815
P1 = PH1_DEG * math.pi / 180.0
P2 = PH2_DEG * math.pi / 180.0
CP1 = math.cos(P1)
CP2 = math.cos(P2)
SP1 = math.sin(P1)
SP2 = math.sin(P2)
# Sums over the three phases (0, P1, P2) of cos^2, sin*cos and sin^2.
C1 = 1.0 + CP1 * CP1 + CP2 * CP2
C2 = SP1 * CP1 + SP2 * CP2
C3 = SP1 * SP1 + SP2 * SP2

# --- Same model with +/-120 degree phases, used by clarke_corregido3 --------
PH1_DEGp = 120
PH2_DEGp = -120
Pp1 = PH1_DEGp * math.pi / 180.0
Pp2 = PH2_DEGp * math.pi / 180.0
CPp1 = math.cos(Pp1)
CPp2 = math.cos(Pp2)
SPp1 = math.sin(Pp1)
SPp2 = math.sin(Pp2)
Cp1 = 1.0 + CPp1 * CPp1 + CPp2 * CPp2
Cp2 = SPp1 * CPp1 + SPp2 * CPp2
Cp3 = SPp1 * SPp1 + SPp2 * SPp2

# Angle (radians) subtracted from every estimated angle; may be overridden
# by the first command-line argument.
ANGLE_OFFSET = float(sys.argv[1]) if len(sys.argv) > 1 else 0.15

# Gain, per-axis scale, per-axis offset and rotation applied by
# clarke_corregido2 before taking atan2.
# NOTE(review): presumably fitted calibration values -- confirm their origin.
g = 0.5026
sx = 1.6682
sy = 0.9982
ox = 0.0130419
oy = -0.001193
wwerot = 33.3775 * (math.pi / 180.0)
wwCP = math.cos(wwerot)
wwSP = math.sin(wwerot)

# Directory where log files are written and looked up.
LOG_DIR = "."
PREFIX = "adc_log"          # file-name prefix matched by list_log_files()
MAX_EXPERIMENT_SEC = 30.0   # loop_pwm() stops a running experiment after this

# Shared secrets for HMAC request signing, keyed by the X-Auth-Key header.
# NOTE(review): the secret is hard-coded and looks like a placeholder; it
# should be replaced and loaded from outside the source tree before deployment.
AUTH_KEYS = {
    "admin1": "CAMBIA_ESTA_CLAVE_SUPER_LARGA_123456789"
}
AUTH_WINDOW_SEC = 30   # maximum |now - X-Auth-Timestamp| accepted
NONCE_TTL_SEC = 90     # how long an accepted nonce is remembered

run_flag = True            # cleared by close_all() to end the worker loops
threads_started = False

io_lock = threading.Lock()          # serialises mux-select + register access
state_lock = threading.Lock()       # guards `state`
auth_lock = threading.Lock()        # guards `used_nonces`
experiment_lock = threading.Lock()  # serialises /api/start requests

used_nonces = {}          # nonce -> time.time() at which it was accepted
current_log_file = None   # name of the most recently opened log file

# Shared runtime state; always access under state_lock.
state = {
    "amp": 200.0,
    "frq": 1.0,
    "typ": 1,
    "run": False,
    "record": False,
    "theta": 0.0,
    "theta_u": 0.0,
    "theta_prev": 0.0,
    "theta_vel": 0.0,
    "adc_s1": 0,
    "adc_s2": 0,
    "adc_s3": 0,
    "pwm_a": 512.0,
    "pwm_b": 512.0,
    "pwm_c": 512.0,
    "busy": False,
    "exp_start": 0.0,
    "exp_owner": "",
    "last_file": ""
}

# Column order of the log file; also the keys returned by parse_log_file().
_LOG_COLUMNS = (
    "t_ms",
    "A137", "A138", "A139", "A140", "A141", "A142", "A143", "A144",
    "theta", "theta_u", "theta_vel",
    "pwm_a", "pwm_b", "pwm_c",
)

# Map the register window into this process (requires access to /dev/mem).
fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC)
mem = mmap.mmap(
    fd,
    HW_REGS_SPAN,
    mmap.MAP_SHARED,
    mmap.PROT_READ | mmap.PROT_WRITE,
    offset=HW_REGS_BASE,
)


def vaddr(base):
    """Return the offset inside `mem` for a peripheral at offset `base`."""
    return (ALT_LWFPGASLVS_OFST + base) & HW_REGS_MASK


def write_reg(addr, value):
    """Write `value` (truncated to 32 bits) at offset `addr` of `mem`.

    Not atomic on its own (seek + write); callers hold io_lock.
    """
    mem.seek(addr)
    mem.write(struct.pack('I', int(value) & 0xFFFFFFFF))


def read_reg(addr):
    """Read an unsigned 32-bit value from offset `addr` of `mem`.

    Not atomic on its own (seek + read); callers hold io_lock.
    """
    mem.seek(addr)
    return struct.unpack('I', mem.read(4))[0]


input_port = vaddr(INPORT_BASE)
output_port = vaddr(OUTPORT_BASE)
mux_in = vaddr(MUX_INPORT_BASE)
mux_out = vaddr(MUX_OUTPORT_BASE)


def json_response(obj, status=200):
    """Serialise `obj` as JSON and wrap it in a Flask Response."""
    return Response(json.dumps(obj), mimetype="application/json", status=status)


def clip(v):
    """Convert `v` to int and clamp it to the range 0..1023."""
    v = int(v)
    if v < 0:
        return 0
    if v > 1023:
        return 1023
    return v


def wrap_pi(x):
    """Wrap an angle in radians into the interval [-pi, pi]."""
    while x > math.pi:
        x -= 2.0 * math.pi
    while x < -math.pi:
        x += 2.0 * math.pi
    return x


def read_adc(ch):
    """Select input mux channel `ch`, wait for it to settle, read the input port.

    The caller must hold io_lock.
    """
    write_reg(mux_in, ch)
    time.sleep(MUX_SETTLE_SEC)
    return read_reg(input_port)


def read_adc8():
    """Read mux channels 137..144 in order and return the eight values."""
    with io_lock:
        return [read_adc(ch) for ch in range(137, 145)]


def write_pwm(a, b, c):
    """Write three PWM values to the output port through mux channel 134.

    Each value is clamped to 0..1023 and packed into one word:
    `a` in bits 0-9, `b` in bits 10-19, `c` in bits 20-29.
    """
    a = clip(a)
    b = clip(b)
    c = clip(c)
    value = (c << 20) | (b << 10) | a
    with io_lock:
        write_reg(mux_out, 134)
        time.sleep(MUX_SETTLE_SEC)
        write_reg(output_port, value)


def center():
    """Drive all three PWM outputs to the mid value 512."""
    write_pwm(512, 512, 512)


def clarke_corregido(s1, s2, s3):
    """Estimate the angle from three sensor signals using phases P1/P2.

    Returns atan2(u, v) - ANGLE_OFFSET, where (u, v) is built from the
    projections of the signals on cos/sin of the phases (0, P1, P2).
    """
    a = s1 + s2 * CP1 + s3 * CP2
    b = s2 * SP1 + s3 * SP2
    u = C3 * a - C2 * b
    v = -C2 * a + C1 * b
    return math.atan2(u, v) - ANGLE_OFFSET


def clarke_corregido2(s1, s2, s3):
    """Like clarke_corregido, with an extra rotation/scale/offset correction.

    The (u, v) pair is rotated by `wwerot`, scaled by g*sx / g*sy and shifted
    by ox / oy before atan2 is taken.
    """
    a = s1 + s2 * CP1 + s3 * CP2
    b = s2 * SP1 + s3 * SP2
    u_orig = C3 * a - C2 * b
    v_orig = -C2 * a + C1 * b
    u_rot = wwCP * u_orig - wwSP * v_orig
    v_rot = wwSP * u_orig + wwCP * v_orig
    uu = g * (sx * u_rot) + ox
    vv = g * (sy * v_rot) + oy
    return math.atan2(uu, vv) - ANGLE_OFFSET


def clarke_corregido3(s1, s2, s3):
    """Same estimate as clarke_corregido, using the +/-120 degree phases."""
    a = s1 + s2 * CPp1 + s3 * CPp2
    b = s2 * SPp1 + s3 * SPp2
    u = Cp3 * a - Cp2 * b
    v = -Cp2 * a + Cp1 * b
    return math.atan2(u, v) - ANGLE_OFFSET


def update_theta_from_adc(raw1, raw2, raw3, dt):
    """Update the angle fields of `state` from three raw ADC readings.

    Each reading is mapped linearly so that its (min, max) pair below lands
    on (-1, +1), the angle is estimated with clarke_corregido(), and the
    wrapped angle, unwrapped angle and velocity are stored in `state`.

    Args:
        raw1, raw2, raw3: raw ADC values of the three sensors.
        dt: seconds elapsed since the previous sample; velocity is 0.0 when
            dt is not positive.

    Returns:
        (theta, theta_unwrapped, velocity).
    """
    # NOTE(review): the min/max pairs look like per-sensor calibration
    # limits -- confirm where they come from.
    s1 = (((float(raw1) - 783) / (2024 - 783)) * 2) - 1
    s2 = (((float(raw2) - 620) / (1580 - 620)) * 2) - 1
    s3 = (((float(raw3) - 712) / (1748 - 712)) * 2) - 1
    th = clarke_corregido(s1, s2, s3)

    # One critical section for the whole read-modify-write: several threads
    # call this function, and reading and writing under separate lock
    # acquisitions lets a slow caller overwrite a newer sample.
    with state_lock:
        prev = state["theta"]
        d = wrap_pi(th - prev)
        th_u = state["theta_u"] + d
        vel = d / dt if dt > 0.0 else 0.0
        state["theta_prev"] = prev
        state["theta"] = th
        state["theta_u"] = th_u
        state["theta_vel"] = vel
        state["adc_s1"] = raw1
        state["adc_s2"] = raw2
        state["adc_s3"] = raw3
    return th, th_u, vel


def list_log_files():
    """Return the sorted base names of PREFIX*.txt / PREFIX*.csv in LOG_DIR."""
    pattern1 = os.path.join(LOG_DIR, PREFIX + "*.txt")
    pattern2 = os.path.join(LOG_DIR, PREFIX + "*.csv")
    files = glob.glob(pattern1) + glob.glob(pattern2)
    return sorted(os.path.basename(x) for x in files)


def parse_log_file(path):
    """Parse a comma-separated log file into a dict of column lists.

    The first non-blank line is the header. Rows whose field count differs
    from the header, or that contain a non-numeric field, are skipped.

    Returns:
        A dict with one list of floats per name in `_LOG_COLUMNS`; columns
        missing from the file stay empty, extra columns are ignored.
    """
    with open(path, "r") as f:
        lines = [line.strip() for line in f if line.strip()]

    result = dict((name, []) for name in _LOG_COLUMNS)
    if not lines:
        return result

    header = [x.strip() for x in lines[0].split(",")]
    data = dict((h, []) for h in header)

    for line in lines[1:]:
        parts = [x.strip() for x in line.split(",")]
        if len(parts) != len(header):
            continue
        try:
            vals = [float(p) for p in parts]
        except ValueError:
            continue  # row with a non-numeric field
        for name, val in zip(header, vals):
            data[name].append(val)

    for k in result:
        if k in data:
            result[k] = data[k]
    return result


def build_mat_file_from_log(safe_name):
    """Convert log file `safe_name` (inside LOG_DIR) into a .mat file.

    Returns:
        (path_to_mat, None) on success, or (None, error_message) when scipy
        is unavailable or the log file does not exist. The .mat file is
        written to the system temp directory under the log's base name.
    """
    if not SCIPY_OK:
        return None, "scipy no esta instalado"
    path = os.path.join(LOG_DIR, safe_name)
    if not os.path.isfile(path):
        return None, "archivo no encontrado"

    data = parse_log_file(path)
    mat_data = dict((name, data[name]) for name in _LOG_COLUMNS)

    base = os.path.splitext(safe_name)[0]
    temp_path = os.path.join(tempfile.gettempdir(), base + ".mat")
    savemat(temp_path, mat_data)
    return temp_path, None


def get_state_copy():
    """Return a snapshot of `state` with every field coerced to a plain type."""
    with state_lock:
        return {
            "amp": float(state["amp"]),
            "frq": float(state["frq"]),
            "typ": int(state["typ"]),
            "run": bool(state["run"]),
            "record": bool(state["record"]),
            "theta": float(state["theta"]),
            "theta_u": float(state["theta_u"]),
            "theta_prev": float(state["theta_prev"]),
            "theta_vel": float(state["theta_vel"]),
            "adc_s1": int(state["adc_s1"]),
            "adc_s2": int(state["adc_s2"]),
            "adc_s3": int(state["adc_s3"]),
            "pwm_a": float(state["pwm_a"]),
            "pwm_b": float(state["pwm_b"]),
            "pwm_c": float(state["pwm_c"]),
            "busy": bool(state["busy"]),
            "exp_start": float(state["exp_start"]),
            "exp_owner": str(state["exp_owner"]),
            "last_file": str(state["last_file"])
        }


def cleanup_nonces():
    """Forget nonces older than NONCE_TTL_SEC. Caller must hold auth_lock."""
    now = time.time()
    expired = [n for n, ts in used_nonces.items() if now - ts > NONCE_TTL_SEC]
    for nonce in expired:
        del used_nonces[nonce]


def get_raw_body():
    """Return the raw request body, or b"" when there is none."""
    data = request.get_data()
    if data is None:
        return b""
    return data


def verify_hmac():
    """Authenticate the current request from its X-Auth-* headers.

    The client signs, with HMAC-SHA256 and the secret for X-Auth-Key, the
    newline-joined string: METHOD, path, timestamp, nonce, sha256(body) hex.

    The signature is checked *before* the nonce is recorded, so requests
    that do not carry a valid signature can neither grow `used_nonces` nor
    consume a nonce.

    Returns:
        (True, "ok") on success -- and sets request.auth_key_id -- or
        (False, reason) on failure.
    """
    key_id = request.headers.get("X-Auth-Key", "")
    ts_str = request.headers.get("X-Auth-Timestamp", "")
    nonce = request.headers.get("X-Auth-Nonce", "")
    client_sig = request.headers.get("X-Auth-Signature", "")

    if not (key_id and ts_str and nonce and client_sig):
        return False, "faltan headers de autenticacion"
    if key_id not in AUTH_KEYS:
        return False, "key invalida"

    try:
        ts = int(ts_str)
    except ValueError:
        return False, "timestamp invalido"
    if abs(int(time.time()) - ts) > AUTH_WINDOW_SEC:
        return False, "timestamp fuera de ventana"

    body = get_raw_body()
    if not isinstance(body, bytes):
        try:
            body = body.encode("utf-8")
        except (AttributeError, UnicodeError):
            body = b""
    body_hash = hashlib.sha256(body).hexdigest()

    payload = "\n".join([
        request.method.upper(),
        request.path,
        ts_str,
        nonce,
        body_hash
    ]).encode("utf-8")
    secret = AUTH_KEYS[key_id].encode("utf-8")
    expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which a hostile header could supply.
    if not hmac.compare_digest(expected.encode("ascii"),
                               client_sig.encode("utf-8")):
        return False, "firma invalida"

    # Replay protection: each nonce is accepted once within its TTL.
    with auth_lock:
        cleanup_nonces()
        if nonce in used_nonces:
            return False, "nonce repetido"
        used_nonces[nonce] = time.time()

    request.auth_key_id = key_id
    return True, "ok"


@app.before_request
def require_auth():
    """Reject every request that fails verify_hmac() with a 401 response."""
    ok, msg = verify_hmac()
    if not ok:
        return json_response({"ok": False, "error": msg}, 401)


def close_all():
    """Stop the worker loops, centre the PWM outputs and release /dev/mem.

    Every step is best-effort so that a failure in one does not prevent the
    following ones from running.
    """
    global run_flag
    run_flag = False
    time.sleep(0.05)  # give the worker loops a moment to observe run_flag
    try:
        center()
    except Exception as exc:
        print("close_all: no se pudo centrar el PWM:", exc)
    # Closed separately so that a failure closing the mapping does not leak
    # the file descriptor.
    try:
        mem.close()
    except Exception as exc:
        print("close_all: error cerrando mmap:", exc)
    try:
        os.close(fd)
    except OSError as exc:
        print("close_all: error cerrando /dev/mem:", exc)


def _set_pwm_state(a, b, c):
    """Record the last commanded PWM values in `state`."""
    with state_lock:
        state["pwm_a"] = float(a)
        state["pwm_b"] = float(b)
        state["pwm_c"] = float(c)


def loop_pwm():
    """Control loop: sample the sensors, update the angle and drive the PWM.

    Runs until `run_flag` is cleared. While state["run"] is set, the three
    outputs are 512 + amp * envelope * sin(theta + k * 2*pi/3), with the
    envelope chosen by state["typ"] (1: constant, 2: sine, 3: triangle-like
    asin(sin(.))/1.57). An experiment running longer than
    MAX_EXPERIMENT_SEC is stopped automatically.
    """
    last_t = time.time()
    ts = 0  # iteration counter driving the envelope of typ 2 and 3
    while run_flag:
        now = time.time()
        dt = now - last_t
        last_t = now

        vals = read_adc8()
        th = update_theta_from_adc(vals[3], vals[6], vals[7], dt)[0]

        with state_lock:
            active = state["run"]
            amp = state["amp"]
            typ = state["typ"]
            frq = state["frq"]
            exp_start = state["exp_start"]

        if active:
            timed_out = (
                exp_start > 0.0
                and (time.time() - exp_start) >= MAX_EXPERIMENT_SEC
            )
            if timed_out:
                with state_lock:
                    state["run"] = False
                    state["record"] = False
                    state["busy"] = False
                    state["exp_start"] = 0.0
                    state["exp_owner"] = ""
                center()
                _set_pwm_state(512.0, 512.0, 512.0)
                time.sleep(PWM_PERIOD_SEC)
                continue

            ts += 1
            if typ == 1:
                envelope = 1
            elif typ == 2:
                envelope = math.sin(ts * frq * 0.004672)
            elif typ == 3:
                envelope = math.asin(math.sin(ts * frq * 0.004672)) / 1.57
            else:
                envelope = None

            if envelope is None:
                a = b = c = 512
            else:
                scale = amp * envelope
                a = 512 + scale * math.sin(th)
                b = 512 + scale * math.sin(th + TWO_PI_OVER_3)
                c = 512 + scale * math.sin(th - TWO_PI_OVER_3)

            write_pwm(a, b, c)
            _set_pwm_state(a, b, c)
        else:
            center()
            _set_pwm_state(512.0, 512.0, 512.0)

        time.sleep(PWM_PERIOD_SEC)


def loop_log():
    """Logging loop: while state["record"] is set, append samples to a file.

    A new file named adc_log_<timestamp>.txt is opened in LOG_DIR when
    recording starts and closed when it stops; its name is stored in
    state["last_file"]. Runs until `run_flag` is cleared.
    """
    global current_log_file
    f = None
    t0 = 0.0
    row_fmt = "%d,%d,%d,%d,%d,%d,%d,%d,%d,%.9f,%.9f,%.9f,%.9f,%.9f,%.9f\n"
    try:
        while run_flag:
            with state_lock:
                rec = state["record"]

            if rec and f is None:
                name = "adc_log_%s.txt" % time.strftime("%Y%m%d_%H%M%S")
                path = os.path.join(LOG_DIR, name)
                f = open(path, "w")
                f.write(",".join(_LOG_COLUMNS) + "\n")
                t0 = time.time()
                current_log_file = name
                with state_lock:
                    state["last_file"] = name
                print("grabando en:", name)

            if (not rec) and f is not None:
                f.close()
                f = None
                print("grabacion detenida")

            if f is None:
                time.sleep(0.1)  # idle: poll the record flag slowly
                continue

            vals = read_adc8()
            t_ms = int((time.time() - t0) * 1000.0)
            update_theta_from_adc(vals[3], vals[6], vals[7], LOG_PERIOD_SEC)
            with state_lock:
                th = state["theta"]
                th_u = state["theta_u"]
                vel = state["theta_vel"]
                pwm_a = state["pwm_a"]
                pwm_b = state["pwm_b"]
                pwm_c = state["pwm_c"]
            f.write(row_fmt % (
                t_ms,
                vals[0], vals[1], vals[2], vals[3],
                vals[4], vals[5], vals[6], vals[7],
                th, th_u, vel,
                pwm_a, pwm_b, pwm_c
            ))
            # Flushed per row so a reader sees the data as soon as recording
            # stops.
            f.flush()
            time.sleep(LOG_PERIOD_SEC)
    finally:
        # Also reached when the loop raises, so the file is never leaked.
        if f is not None:
            f.close()


def start_background_threads():
    """Start the PWM and logging loops as daemon threads (only once)."""
    global threads_started
    if threads_started:
        return
    for target in (loop_pwm, loop_log):
        worker = threading.Thread(target=target)
        worker.daemon = True
        worker.start()
    threads_started = True


def _parse_number(raw):
    """Return `raw` as a float, or None if it is not a usable number.

    NaN is rejected because min()/max() clamping would silently turn it
    into the upper limit.
    """
    try:
        value = float(raw)
    except (TypeError, ValueError, OverflowError):
        return None
    if math.isnan(value):
        return None
    return value


def apply_config(data):
    """Validate and apply the optional "amp", "frq" and "typ" settings.

    Values are clamped to amp in [-250, 250], frq in [0, 10] and typ in
    [1, 3]. All fields are validated before anything is written, so a
    rejected request leaves `state` untouched.

    Args:
        data: the decoded JSON body of the request.

    Returns:
        (True, changed_dict) on success or (False, error_message).
    """
    if not isinstance(data, dict):
        return False, "configuracion invalida"

    changed = {}

    if "amp" in data:
        amp = _parse_number(data.get("amp"))
        if amp is None:
            return False, "amplitud invalida"
        changed["amp"] = max(-250.0, min(250.0, amp))

    if "frq" in data:
        frq = _parse_number(data.get("frq"))
        if frq is None:
            return False, "frecuencia invalida"
        changed["frq"] = max(0.0, min(10.0, frq))

    if "typ" in data:
        typ = _parse_number(data.get("typ"))
        try:
            typ = int(typ)
        except (TypeError, ValueError, OverflowError):
            return False, "tipo invalido"
        changed["typ"] = max(1, min(3, typ))

    if changed:
        with state_lock:
            state.update(changed)
    return True, changed


def stop_experiment_internal():
    """Clear the experiment flags in `state` and centre the PWM outputs."""
    with state_lock:
        state["run"] = False
        state["record"] = False
        state["busy"] = False
        state["exp_start"] = 0.0
        state["exp_owner"] = ""
        state["pwm_a"] = 512.0
        state["pwm_b"] = 512.0
        state["pwm_c"] = 512.0
    center()


@app.route("/api/status", methods=["GET"])
def api_status():
    """Return a snapshot of the shared state."""
    return json_response({
        "ok": True,
        "state": get_state_copy()
    })


@app.route("/api/read_now", methods=["GET"])
def api_read_now():
    """Sample the eight ADC channels now and return them with the state."""
    vals = read_adc8()
    update_theta_from_adc(vals[3], vals[6], vals[7], LOG_PERIOD_SEC)
    out = get_state_copy()
    for offset, value in enumerate(vals):
        out["A%d" % (137 + offset)] = value
    return json_response({
        "ok": True,
        "state": out
    })
def _send_mat(mat_path, log_name):
    """Send the .mat file at `mat_path` as a download named after `log_name`.

    Flask 2.0 renamed send_file()'s `attachment_filename` argument to
    `download_name` and Flask 2.2 removed the old name, so the new name is
    tried first and the old one is used when it is rejected.
    """
    download = os.path.splitext(os.path.basename(log_name))[0] + ".mat"
    try:
        return send_file(
            mat_path,
            as_attachment=True,
            download_name=download,
            mimetype="application/octet-stream"
        )
    except TypeError:
        # Older Flask: `download_name` is not a known keyword argument.
        return send_file(
            mat_path,
            as_attachment=True,
            attachment_filename=download,
            mimetype="application/octet-stream"
        )


@app.route("/api/config", methods=["POST"])
def api_config():
    """Apply the amp/frq/typ settings found in the JSON body."""
    data = request.get_json(silent=True) or {}
    ok, result = apply_config(data)
    if not ok:
        return json_response({"ok": False, "error": result}, 400)
    return json_response({
        "ok": True,
        "changed": result,
        "state": get_state_copy()
    })


@app.route("/api/start", methods=["POST"])
def api_start():
    """Start an experiment: apply the optional config, then run and record.

    Responds 409 when an experiment is already running (state["busy"]) or
    another start request is being processed, 400 on invalid configuration
    and 500 on an unexpected error.
    """
    body = request.get_json(silent=True) or {}

    def busy_response():
        return json_response({
            "ok": False,
            "error": "ya hay un experimento en ejecucion",
            "state": get_state_copy()
        }, 409)

    # experiment_lock only serialises concurrent start requests; whether an
    # experiment is running is tracked by state["busy"], which is what must
    # be checked to refuse a second start.
    if not experiment_lock.acquire(False):
        return busy_response()
    try:
        with state_lock:
            already_running = state["busy"]
        if already_running:
            return busy_response()

        ok, result = apply_config(body)
        if not ok:
            return json_response({"ok": False, "error": result}, 400)

        owner = getattr(request, "auth_key_id", "")
        with state_lock:
            state["busy"] = True
            state["exp_owner"] = owner
            state["exp_start"] = time.time()
            state["run"] = True
            state["record"] = True

        return json_response({
            "ok": True,
            "msg": "experimento iniciado",
            "state": get_state_copy()
        })
    except Exception as e:
        return json_response({
            "ok": False,
            "error": str(e),
            "state": get_state_copy()
        }, 500)
    finally:
        experiment_lock.release()


@app.route("/api/stop", methods=["POST"])
def api_stop():
    """Stop the experiment and return its log converted to a .mat download.

    When no experiment is active, the most recent log (if any) is returned
    instead. The experiment is stopped even when scipy is missing.
    """
    if not SCIPY_OK:
        stop_experiment_internal()
        return json_response({
            "ok": False,
            "error": "scipy no esta instalado, no se puede generar .mat",
            "state": get_state_copy()
        }, 500)

    with state_lock:
        busy = state["busy"]
        last_file_before = state["last_file"]

    if not busy:
        stop_experiment_internal()
        if not last_file_before:
            return json_response({
                "ok": True,
                "msg": "no habia experimento activo",
                "state": get_state_copy()
            })
        mat_path, err = build_mat_file_from_log(last_file_before)
        if err:
            return json_response({
                "ok": False,
                "error": err,
                "state": get_state_copy()
            }, 500)
        return _send_mat(mat_path, last_file_before)

    stop_experiment_internal()
    # Give the logging thread time to notice the cleared record flag and
    # close the file before it is read back.
    time.sleep(0.4)

    with state_lock:
        last_file = state["last_file"]
    if not last_file:
        return json_response({
            "ok": False,
            "error": "no se genero archivo de log",
            "state": get_state_copy()
        }, 500)

    mat_path, err = build_mat_file_from_log(last_file)
    if err:
        return json_response({
            "ok": False,
            "error": err,
            "file": last_file,
            "state": get_state_copy()
        }, 500)

    # experiment_lock is intentionally not released here: this handler never
    # acquires it, and releasing it could free a lock that a concurrent
    # /api/start request is holding.
    return _send_mat(mat_path, last_file)


@app.route("/api/files", methods=["GET"])
def api_files():
    """List the available log files."""
    return json_response({
        "ok": True,
        "files": list_log_files()
    })


@app.route("/api/data", methods=["GET"])
def api_data():
    """Return the parsed contents of the log file named by ?file=."""
    name = request.args.get("file", "")
    if not name:
        return json_response({"ok": False, "error": "falta archivo"}, 400)

    # basename() drops any directory component, confining access to LOG_DIR.
    safe_name = os.path.basename(name)
    path = os.path.join(LOG_DIR, safe_name)
    if not os.path.isfile(path):
        return json_response({"ok": False, "error": "archivo no encontrado"}, 404)

    result = parse_log_file(path)
    result["file"] = safe_name
    return json_response({"ok": True, "data": result})


@app.route("/api/export_mat", methods=["GET"])
def api_export_mat():
    """Return the log file named by ?file= converted to a .mat download."""
    if not SCIPY_OK:
        return Response("scipy no esta instalado", mimetype="text/plain", status=500)

    name = request.args.get("file", "")
    if not name:
        return Response("falta archivo", mimetype="text/plain", status=400)

    # basename() drops any directory component, confining access to LOG_DIR.
    safe_name = os.path.basename(name)
    mat_path, err = build_mat_file_from_log(safe_name)
    if err:
        return Response(err, mimetype="text/plain", status=404)
    return _send_mat(mat_path, safe_name)


@atexit.register
def _shutdown():
    """Release the hardware at interpreter exit; must never raise."""
    try:
        close_all()
    except Exception:
        pass


if __name__ == "__main__":
    start_background_threads()
    app.run(host="0.0.0.0", port=5000, debug=False, threaded=True, use_reloader=False)