何かを書き留める何か

数学や読んだ本について書く何かです。最近は社会人として生き残りの術を学ぶ日々です。

『 Linuxプログラミングインタフェース』を気合いで読む会 第2回:ファイルIO後半戦

l is for long int

Linuxプログラミングインタフェース』4章の後半戦としてlseek()を使ったプログラミングを行う。 lseek()はファイルオフセットを操作するシステムコールである。 名前のllongの意味で、引数offsetと返り値がlongであった名残だそうである(現在はoff_t型)。 初期のUNIXにはint型のseek()が実装され、long long型のllseek()seek64()も存在するらしい。

$ man -k seek
_llseek (2)          - reposition read/write file offset
fseek (3)            - reposition a stream
fseeko (3)           - seek to or report file position
ftello (3)           - seek to or report file position
llseek (2)           - reposition read/write file offset
lseek (2)            - reposition read/write file offset
lseek64 (3)          - reposition 64-bit read/write file offset
seekdir (3)          - set the position of the next readdir() call in the directory stream.

さて、今回実装するプログラムは、操作するファイルとIOコマンドを渡すとその通りの動作するプログラミングである。 IOコマンドは以下の4つであり、コマンドとパラメータの間のスペースは実際は存在しない。

  • s offsetファイルの先頭からoffsetバイトだけファイルのシークを行う
  • r length 現在位置からlength分読み取ってテキスト表示する
  • R length 現在位置からlength分読み取って16進数表示する
  • w str 現在位置へ文字列strを書き込む

Python

Pythonosモジュールにはos.lseek()がある。 使い方もC言語版(つまりシステムコール)と同じである。

import os
import sys
import stat
import typing


def _seek(fd: int, param):
    try:
        offset = int(param)
        os.lseek(fd, offset, os.SEEK_SET)
    except OSError:
        print("cloud not seek file.")
        return os.EX_IOERR
    else:
        print(f"seek {offset} bytes successed.")


def _write(fd: int, param):
    try:
        write_str = param.encode()
        written_bytes_len = os.write(fd, write_str)
    except OSError:
        print("cloud not seek file.")
        return os.EX_IOERR
    else:
        print(f"wrote {written_bytes_len} bytes.")


def _read_as_str(fd: int, param):
    length = int(param)
    read_bytes = os.read(fd, length)
    if len(read_bytes) == 0:
        print(f"end-of-file.")
    else:
        print(read_bytes)


def _read_as_hex(fd: int, param):
    length = int(param)
    read_bytes = os.read(fd, length)
    if len(read_bytes) == 0:
        print(f"end-of-file.")
    else:
        print(read_bytes.hex())


def seek_file(file: str, operatons: typing.List[str]):
    try:
        open_flags = os.O_RDWR | os.O_CREAT
        open_mode = (
            stat.S_IRUSR
            | stat.S_IWUSR
            | stat.S_IRGRP
            | stat.S_IWGRP
            | stat.S_IROTH
            | stat.S_IWOTH
        )
        fd = os.open(file, open_flags, open_mode)
    except FileNotFoundError:
        print(f"No such file: {file}.")
        return os.EX_NOINPUT
    except PermissionError:
        print(f"input {file} can not open.")
        return os.EX_NOPERM

    for operatoin in operatons:
        cmd, param = operatoin[0], operatoin[1:]
        cmds = {"s": _seek, "w": _write, "r": _read_as_str, "R": _read_as_hex}
        if cmd not in cmds:
            print(f"Argument must start with [{cmds.keys()}].")
        else:
            cmds[cmd](fd, param)

    try:
        os.close(fd)
    except OSError:
        print("cloud not close input file.")
        return os.EX_IOERR

    return os.EX_OK


def main():
    if len(sys.argv) < 3:
        print(f"Usage: {__file__} file {{r<length>|R<length>|w<string>|s<offset>}}")
        sys.exit(-1)
    f, operatons = sys.argv[1], sys.argv[2:]
    ret_code = seek_file(f, operatons)
    sys.exit(ret_code)


if __name__ == "__main__":
    main()

よりPythonらしくかこうと思うならば、以下の通り。 ファイルシークもopen()で開いたIOストリームのメソッドseek()を使うべきである。

import io
import os
import sys
import typing


def _seek(f: io.IOBase, param: str):
    try:
        offset = int(param)
        f.seek(offset, os.SEEK_SET)
    except OSError:
        print("cloud not seek file.")
        return os.EX_IOERR
    else:
        print(f"seek {offset} bytes successed.")


def _write(f: io.IOBase, param: str):
    try:
        write_str = param.encode()
        written_bytes_len = f.write(write_str)
    except OSError:
        print("cloud not seek file.")
        return os.EX_IOERR
    else:
        print(f"wrote {written_bytes_len} bytes.")


def _read_as_str(f: io.IOBase, param: str):
    length = int(param)
    read_bytes = f.read(length)
    if len(read_bytes) == 0:
        print(f"end-of-file.")
    else:
        print(read_bytes)


def _read_as_hex(f: io.IOBase, param: str):
    length = int(param)
    read_bytes = f.read(length)
    if len(read_bytes) == 0:
        print(f"end-of-file.")
    else:
        print(read_bytes.hex())


def seek_file(file: str, operatons: typing.List[str]):
    cmds = {"s": _seek, "w": _write, "r": _read_as_str, "R": _read_as_hex}
    try:
        with open(file, "r+b") as f:
            for operatoin in operatons:
                cmd, param = operatoin[0], operatoin[1:]
                if cmd not in cmds:
                    print("Argument must start with [swrR]")
                else:
                    cmds[cmd](f, param)
    except FileNotFoundError:
        print(f"No such file: {file}.")
        return os.EX_NOINPUT
    except PermissionError:
        print(f"input {file} can not open.")
        return os.EX_NOPERM
    else:
        return os.EX_OK


def main():
    if len(sys.argv) < 3:
        print(f"Usage: {__file__} file {{r<length>|R<length>|w<string>|s<offset>}}")
        sys.exit(-1)
    f, operatons = sys.argv[1], sys.argv[2:]
    ret_code = seek_file(f, operatons)
    sys.exit(ret_code)


if __name__ == "__main__":
    main()

Rust

std::io::SeekFromを使う。 file.seek(SeekFrom::Current(offset))?;とすればいい。

use std::fs::OpenOptions;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;

fn seek_io(file: &mut std::fs::File, operations: &[String]) -> std::io::Result<()> {
    for operation in operations {
        let cmd = operation.chars().nth(0).unwrap();
        let param = &operation[1..];

        if cmd == 's' {
            let offset: i64 = param.parse().unwrap();
            file.seek(SeekFrom::Current(offset))?;
        } else if cmd == 'w' {
            write!(file, "{}", param)?;
        } else if cmd == 'r' {
            let length: u64 = param.parse().unwrap();
            let mut buf = String::new();
            file.take(length).read_to_string(&mut buf)?;
            println!("{:?}", buf);
        } else if cmd == 'R' {
            let length: u64 = param.parse().unwrap();
            let mut buf = Vec::new();
            file.take(length).read_to_end(&mut buf)?;
            let hex_string = buf.iter().map(|n| format!("{:02X}", n)).collect::<String>();
            println!("{:X?}", hex_string);
        }
    }

    Ok(())
}

fn main() -> std::io::Result<()> {
    let mut _args = Vec::new();

    for arg in std::env::args().skip(1) {
        _args.push(arg);
    }

    if _args.len() < 2 {
        writeln!(
            std::io::stderr(),
            "Usage: seek_io file {{r<length>|R<length>|w<string>|s<offset>}}."
        )
        .unwrap();
    } else {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&_args[0])?;
        let operations = &_args[1..];

        seek_io(&mut file, &operations)?;
    }

    Ok(())
}