# cgit navigation: about | summary | refs | log | tree | commit | diff
# path: root/neb/std/fs.py
# blob: 8500140f05f2621fec57783ed513a5a22d0df583 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from .. import TypeEnum, Environment, Arg, Builtin, evaluate, InterpretPanic, MultiFunction
from ..structs import *
from pathlib import Path
from glob import glob
from io import UnsupportedOperation
import sys

# Environment that collects every builtin registered by this module.
FS = Environment()

# Publish the interpreter's standard streams as ready-made handles.
for _stream_symbol, _stream in (
    ("_stdin_", sys.stdin),
    ("_stdout_", sys.stdout),
    ("_stderr_", sys.stderr),
):
    FS.register(_stream_symbol, Handle(_stream))
del _stream_symbol, _stream  # keep the module namespace unchanged

def interpretExists(symbol, args, env, ns):
    """Report whether the path named by ``args[0]`` exists.

    The string value of the first argument is turned into a ``Path`` and
    resolved before the existence check.  ``symbol``, ``env`` and ``ns``
    are not used.

    Returns:
        A ``Bool`` wrapping the result of the check.
    """
    candidate = Path(args[0].value).resolve()
    found = candidate.exists()
    return Bool(found)

# "exists?": one string argument ("filename"), declared to return :bool.
exists_func = Builtin(
    "exists?",
    interpretExists,
    [Arg("filename", TypeEnum.STRING)],
    return_type=Type(":bool"),
)
exists_multi = MultiFunction("exists?")
exists_multi.register(exists_func)
FS.register("exists?", exists_multi)

def interpretGlob(symbol, args, env, ns):
    """Expand the glob pattern held in ``args[0]``.

    The pattern is passed unchanged to ``glob.glob``; each resulting path
    is wrapped in a ``String`` and the whole result is returned as a
    ``List``.  ``symbol``, ``env`` and ``ns`` are not used.
    """
    pattern = args[0].value
    return List(list(map(String, glob(pattern))))

# "glob": one string argument.  The argument is a glob pattern (the
# registered name "regex" is kept as-is for compatibility).
# interpretGlob returns a List of Strings, so the declared return type is
# :list, matching how "read-lines" declares the same kind of result.
glob_func = Builtin(
    "glob",
    interpretGlob,
    [Arg("regex", TypeEnum.STRING)],
    return_type=Type(":list"),
)
glob_multi = MultiFunction("glob")
glob_multi.register(glob_func)
FS.register("glob", glob_multi)

def interpretUnlink(symbol, args, env, ns):
    """Delete the file named by ``args[0]``.

    The string value of the first argument is resolved to a path, checked
    for existence and then unlinked.  ``env`` and ``ns`` are not used.

    Returns:
        ``Nil()`` once the file has been removed.

    Raises:
        InterpretPanic: if the resolved path does not exist, or if the
            operating system refuses the removal (for example the path is
            a directory, permissions are insufficient, or the file
            disappeared between the check and the unlink).
    """
    target_path = Path(args[0].value).resolve()
    if not target_path.exists():
        raise InterpretPanic(symbol, "target file does not exist", target_path)
    try:
        target_path.unlink()
    except OSError as err:
        # Surface OS-level failures as interpreter errors instead of
        # letting a raw Python exception escape the builtin.
        raise InterpretPanic(symbol, f"cannot unlink file: {err}", target_path) from err
    return Nil()

# "unlink": one string argument ("filename"), declared to return :nil.
unlink_func = Builtin(
    "unlink",
    interpretUnlink,
    [Arg("filename", TypeEnum.STRING)],
    return_type=Type(":nil"),
)
unlink_multi = MultiFunction("unlink")
unlink_multi.register(unlink_func)
FS.register("unlink", unlink_multi)

def interpretWrite(symbol, args, env, ns):
    """Write the string in ``args[0]`` to the handle in ``args[1]``.

    ``env`` and ``ns`` are not used.

    Returns:
        ``Nil()`` after a successful write.

    Raises:
        InterpretPanic: if the underlying file rejects the write with
            ``UnsupportedOperation`` (reported as not writable) or with
            ``ValueError`` (reported as closed).
    """
    text, handle = args[0], args[1]
    try:
        handle.file.write(text.value)
    # UnsupportedOperation is itself a ValueError, so it must be tested first.
    except UnsupportedOperation:
        raise InterpretPanic(symbol, f"{handle} is not writable!")
    except ValueError:
        raise InterpretPanic(symbol, f"{handle} is closed")
    else:
        return Nil()

# "write": a string followed by a handle, declared to return :nil.
write_func = Builtin(
    "write",
    interpretWrite,
    [
        Arg("string", TypeEnum.STRING),
        Arg("handle", TypeEnum.HANDLE),
    ],
    return_type=Type(":nil"),
)
write_multi = MultiFunction("write")
write_multi.register(write_func)
FS.register("write", write_multi)

def interpretOpenRead(symbol, args, env, ns):
    """Open the file named by ``args[0]`` for reading and return a Handle.

    ``env`` and ``ns`` are not used.

    Returns:
        A ``Handle`` wrapping the file object opened in mode ``"r"``.

    Raises:
        InterpretPanic: if the path does not exist, or if opening it fails.
    """
    fil = Path(args[0].value)
    if not fil.exists():
        raise InterpretPanic(symbol, "file does not exist", fil)
    try:
        f = open(str(fil), "r")
    except (OSError, ValueError) as err:
        # The message is an f-string so the offending path is actually
        # shown; previously the literal text "{fil}" was reported.
        raise InterpretPanic(symbol, f"cannot open {fil} for reading") from err
    return Handle(f)

# "open-read": one string argument ("filename"), declared to return a handle.
openread_func = Builtin(
    "open-read",
    interpretOpenRead,
    [Arg("filename", TypeEnum.STRING)],
    return_type=TypeEnum.HANDLE,
)
openread_multi = MultiFunction("open-read")
openread_multi.register(openread_func)
FS.register("open-read", openread_multi)

def interpretOpenWrite(symbol, args, env, ns):
    """Open the file named by ``args[0]`` for writing and return a Handle.

    The file is opened in mode ``"w"``.  ``env`` and ``ns`` are not used.

    Returns:
        A ``Handle`` wrapping the opened file object.

    Raises:
        InterpretPanic: if the file cannot be opened.
    """
    fil = Path(args[0].value)
    try:
        f = open(str(fil), "w")
    except (OSError, ValueError) as err:
        # The message is an f-string so the offending path is actually
        # shown; previously the literal text "{fil}" was reported.
        raise InterpretPanic(symbol, f"cannot open {fil} for writing") from err
    return Handle(f)

# "open-write": one string argument ("filename"), declared to return a handle.
openwrite_func = Builtin(
    "open-write",
    interpretOpenWrite,
    [Arg("filename", TypeEnum.STRING)],
    return_type=TypeEnum.HANDLE,
)
openwrite_multi = MultiFunction("open-write")
openwrite_multi.register(openwrite_func)
FS.register("open-write", openwrite_multi)

def interpretOpenAppend(symbol, args, env, ns):
    """Open the file named by ``args[0]`` for appending and return a Handle.

    The file is opened in mode ``"a"``.  ``env`` and ``ns`` are not used.

    Returns:
        A ``Handle`` wrapping the opened file object.

    Raises:
        InterpretPanic: if the file cannot be opened.
    """
    fil = Path(args[0].value)
    try:
        f = open(str(fil), "a")
    except (OSError, ValueError) as err:
        # The message is an f-string so the offending path is actually
        # shown; previously the literal text "{fil}" was reported.
        raise InterpretPanic(symbol, f"cannot open {fil} for appending") from err
    return Handle(f)

# "open-append": one string argument ("filename"), declared to return a handle.
openappend_func = Builtin(
    "open-append",
    interpretOpenAppend,
    [Arg("filename", TypeEnum.STRING)],
    return_type=TypeEnum.HANDLE,
)
openappend_multi = MultiFunction("open-append")
openappend_multi.register(openappend_func)
FS.register("open-append", openappend_multi)

def interpretClose(symbol, args, env, ns):
    """Close the file wrapped by the handle in ``args[0]``.

    ``env`` and ``ns`` are not used.

    Returns:
        ``Nil()`` once the file has been closed.

    Raises:
        InterpretPanic: if closing the underlying file raises.
    """
    handle = args[0]
    try:
        handle.file.close()
        # TODO ideally we'd be able to remove it from the env
        # but by this point, we don't know its symbol
        # though it may not be in the env, e.g.
        # (close (print (read (open-read "fil.txt"))))
    except Exception as err:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit still propagate.  The message is an f-string so
        # the handle is actually shown instead of the literal "{args[0]}".
        raise InterpretPanic(symbol, f"cannot close {handle}") from err
    return Nil()

# "close": one handle argument, declared to return :nil.
close_func = Builtin(
    "close",
    interpretClose,
    [Arg("handle", TypeEnum.HANDLE)],
    return_type=Type(":nil"),
)
close_multi = MultiFunction("close")
close_multi.register(close_func)
FS.register("close", close_multi)

def interpretRead(symbol, args, env, ns):
    """Read everything from the handle in ``args[0]`` and return it.

    ``env`` and ``ns`` are not used.

    Returns:
        A ``String`` wrapping the result of ``read()`` on the handle's file.

    Raises:
        InterpretPanic: if the underlying file rejects the read with
            ``UnsupportedOperation`` (reported as not readable) or with
            ``ValueError`` (reported as closed).
    """
    handle = args[0]
    try:
        inp = handle.file.read()
    # UnsupportedOperation is itself a ValueError, so it must be tested first.
    except UnsupportedOperation:
        # A failed read means the handle is not *readable*; the earlier
        # "not writable" wording was copied from the write builtin.
        raise InterpretPanic(symbol, f"{handle} is not readable!")
    except ValueError:
        raise InterpretPanic(symbol, f"{handle} is closed")
    return String(inp)

# "read": one handle argument, declared to return a string.
read_func = Builtin(
    "read",
    interpretRead,
    [Arg("handle", TypeEnum.HANDLE)],
    return_type=TypeEnum.STRING,
)
read_multi = MultiFunction("read")
read_multi.register(read_func)
FS.register("read", read_multi)

def interpretReadLines(symbol, args, env, ns):
    """Read the file named by ``args[0]`` and return its lines.

    The path is resolved first.  Lines are returned exactly as
    ``readlines()`` yields them, each wrapped in a ``String``.  ``env``
    and ``ns`` are not used.

    Returns:
        A ``List`` of ``String`` values, one per line.

    Raises:
        InterpretPanic: if the resolved path does not exist.
    """
    source = Path(args[0].value).resolve()
    if not source.exists():
        raise InterpretPanic(symbol, "no such file", source)

    with open(source, "r") as stream:
        raw_lines = stream.readlines()

    # Every line becomes a String element of the returned List.
    wrapped = [String(line) for line in raw_lines]
    return List(wrapped)

# "read-lines": one string argument ("filename"), declared to return :list.
readlines_func = Builtin(
    "read-lines",
    interpretReadLines,
    [Arg("filename", TypeEnum.STRING)],
    return_type=Type(":list"),
)
readlines_multi = MultiFunction("read-lines")
readlines_multi.register(readlines_func)
FS.register("read-lines", readlines_multi)