何かを書き留める何か

数学や読んだ本について書く何かです。最近は社会人として生き残りの術を学ぶ日々です。

『 Linuxプログラミングインタフェース』を気合いで読む会 第2回:ファイルIO後半戦

l is for long int

Linuxプログラミングインタフェース』4章の後半戦としてlseek()を使ったプログラミングを行う。 lseek()はファイルオフセットを操作するシステムコールである。 名前のllongの意味で、引数offsetと返り値がlongであった名残だそうである(現在はoff_t型)。 初期のUNIXにはint型のseek()が実装され、long long型のllseek()seek64()も存在するらしい。

$ man -k seek
_llseek (2)          - reposition read/write file offset
fseek (3)            - reposition a stream
fseeko (3)           - seek to or report file position
ftello (3)           - seek to or report file position
llseek (2)           - reposition read/write file offset
lseek (2)            - reposition read/write file offset
lseek64 (3)          - reposition 64-bit read/write file offset
seekdir (3)          - set the position of the next readdir() call in the directory stream.

さて、今回実装するプログラムは、操作するファイルとIOコマンドを渡すとその通りの動作するプログラミングである。 IOコマンドは以下の4つであり、コマンドとパラメータの間のスペースは実際は存在しない。

  • s offsetファイルの先頭からoffsetバイトだけファイルのシークを行う
  • r length 現在位置からlength分読み取ってテキスト表示する
  • R length 現在位置からlength分読み取って16進数表示する
  • w str 現在位置へ文字列strを書き込む

Python

Pythonosモジュールにはos.lseek()がある。 使い方もC言語版(つまりシステムコール)と同じである。

import os
import sys
import stat
import typing


def _seek(fd: int, param):
    try:
        offset = int(param)
        os.lseek(fd, offset, os.SEEK_SET)
    except OSError:
        print("cloud not seek file.")
        return os.EX_IOERR
    else:
        print(f"seek {offset} bytes successed.")


def _write(fd: int, param):
    try:
        write_str = param.encode()
        written_bytes_len = os.write(fd, write_str)
    except OSError:
        print("cloud not seek file.")
        return os.EX_IOERR
    else:
        print(f"wrote {written_bytes_len} bytes.")


def _read_as_str(fd: int, param):
    length = int(param)
    read_bytes = os.read(fd, length)
    if len(read_bytes) == 0:
        print(f"end-of-file.")
    else:
        print(read_bytes)


def _read_as_hex(fd: int, param):
    length = int(param)
    read_bytes = os.read(fd, length)
    if len(read_bytes) == 0:
        print(f"end-of-file.")
    else:
        print(read_bytes.hex())


def seek_file(file: str, operatons: typing.List[str]):
    try:
        open_flags = os.O_RDWR | os.O_CREAT
        open_mode = (
            stat.S_IRUSR
            | stat.S_IWUSR
            | stat.S_IRGRP
            | stat.S_IWGRP
            | stat.S_IROTH
            | stat.S_IWOTH
        )
        fd = os.open(file, open_flags, open_mode)
    except FileNotFoundError:
        print(f"No such file: {file}.")
        return os.EX_NOINPUT
    except PermissionError:
        print(f"input {file} can not open.")
        return os.EX_NOPERM

    for operatoin in operatons:
        cmd, param = operatoin[0], operatoin[1:]
        cmds = {"s": _seek, "w": _write, "r": _read_as_str, "R": _read_as_hex}
        if cmd not in cmds:
            print(f"Argument must start with [{cmds.keys()}].")
        else:
            cmds[cmd](fd, param)

    try:
        os.close(fd)
    except OSError:
        print("cloud not close input file.")
        return os.EX_IOERR

    return os.EX_OK


def main():
    if len(sys.argv) < 3:
        print(f"Usage: {__file__} file {{r<length>|R<length>|w<string>|s<offset>}}")
        sys.exit(-1)
    f, operatons = sys.argv[1], sys.argv[2:]
    ret_code = seek_file(f, operatons)
    sys.exit(ret_code)


if __name__ == "__main__":
    main()

よりPythonらしくかこうと思うならば、以下の通り。 ファイルシークもopen()で開いたIOストリームのメソッドseek()を使うべきである。

import io
import os
import sys
import typing


def _seek(f: io.IOBase, param: str):
    try:
        offset = int(param)
        f.seek(offset, os.SEEK_SET)
    except OSError:
        print("cloud not seek file.")
        return os.EX_IOERR
    else:
        print(f"seek {offset} bytes successed.")


def _write(f: io.IOBase, param: str):
    try:
        write_str = param.encode()
        written_bytes_len = f.write(write_str)
    except OSError:
        print("cloud not seek file.")
        return os.EX_IOERR
    else:
        print(f"wrote {written_bytes_len} bytes.")


def _read_as_str(f: io.IOBase, param: str):
    length = int(param)
    read_bytes = f.read(length)
    if len(read_bytes) == 0:
        print(f"end-of-file.")
    else:
        print(read_bytes)


def _read_as_hex(f: io.IOBase, param: str):
    length = int(param)
    read_bytes = f.read(length)
    if len(read_bytes) == 0:
        print(f"end-of-file.")
    else:
        print(read_bytes.hex())


def seek_file(file: str, operatons: typing.List[str]):
    cmds = {"s": _seek, "w": _write, "r": _read_as_str, "R": _read_as_hex}
    try:
        with open(file, "r+b") as f:
            for operatoin in operatons:
                cmd, param = operatoin[0], operatoin[1:]
                if cmd not in cmds:
                    print("Argument must start with [swrR]")
                else:
                    cmds[cmd](f, param)
    except FileNotFoundError:
        print(f"No such file: {file}.")
        return os.EX_NOINPUT
    except PermissionError:
        print(f"input {file} can not open.")
        return os.EX_NOPERM
    else:
        return os.EX_OK


def main():
    if len(sys.argv) < 3:
        print(f"Usage: {__file__} file {{r<length>|R<length>|w<string>|s<offset>}}")
        sys.exit(-1)
    f, operatons = sys.argv[1], sys.argv[2:]
    ret_code = seek_file(f, operatons)
    sys.exit(ret_code)


if __name__ == "__main__":
    main()

Rust

std::io::SeekFromを使う。 file.seek(SeekFrom::Current(offset))?;とすればいい。

use std::fs::OpenOptions;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;

fn seek_io(file: &mut std::fs::File, operations: &[String]) -> std::io::Result<()> {
    for operation in operations {
        let cmd = operation.chars().nth(0).unwrap();
        let param = &operation[1..];

        if cmd == 's' {
            let offset: i64 = param.parse().unwrap();
            file.seek(SeekFrom::Current(offset))?;
        } else if cmd == 'w' {
            write!(file, "{}", param)?;
        } else if cmd == 'r' {
            let length: u64 = param.parse().unwrap();
            let mut buf = String::new();
            file.take(length).read_to_string(&mut buf)?;
            println!("{:?}", buf);
        } else if cmd == 'R' {
            let length: u64 = param.parse().unwrap();
            let mut buf = Vec::new();
            file.take(length).read_to_end(&mut buf)?;
            let hex_string = buf.iter().map(|n| format!("{:02X}", n)).collect::<String>();
            println!("{:X?}", hex_string);
        }
    }

    Ok(())
}

fn main() -> std::io::Result<()> {
    let mut _args = Vec::new();

    for arg in std::env::args().skip(1) {
        _args.push(arg);
    }

    if _args.len() < 2 {
        writeln!(
            std::io::stderr(),
            "Usage: seek_io file {{r<length>|R<length>|w<string>|s<offset>}}."
        )
        .unwrap();
    } else {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&_args[0])?;
        let operations = &_args[1..];

        seek_io(&mut file, &operations)?;
    }

    Ok(())
}

『 Linuxプログラミングインタフェース』を気合いで読む会 第1回:ファイルIO前半戦

転回

Unix/Linuxプログラミング 理論と実践』をダシにしてRustのお勉強、システムプログラミングよりのPythonのお勉強をしようと思い立ったものの、『Unix/Linuxプログラミング 理論と実践』の who コマンド実装でmacOSとDocker上のUbuntu Linuxの違いで疲れ果ててしまった。 また、いい本だと聞いて積ん読になっていた『 Linuxプログラミングインタフェース』には who参照元であった utmpx 構造体について詳しく書いてあるなどうれしいことが多かったので、転回して『 Linuxプログラミングインタフェース』を読もうと思い立った。

概要

4章からシステムコールAPIの説明が始まり、最初はファイルIOから説明される。 UNIXシステムではデバイスもファイルの一種として扱うことができるので、ファイルの取り扱いは重要かつ基礎となる。 ファイルを開くとそのファイルにファイルディスクリプタを割り当てる。 ファイルIOで重要になるのが4つのシステムコールopenread, write, closeである。

最初は概観を掴むために、cpコマンドの簡易版を実装する。

Python

Pythonでは通常、ファイルを開く際は組み込み関数open()を使うが、osモジュールを駆使してシステムコールopenread, write, closeを使ったファイルオープンおよび処理もできる。ファイルモードに関してはstatモジュールも援用する。

import argparse
import os
import stat
import sys

BUF_SIZE = 1024


def copy_file(src, dst):
    try:
        src_fd = os.open(src, os.O_RDONLY)
    except FileNotFoundError:
        print(f"No such file: {src}.")
        return os.EX_NOINPUT
    except PermissionError:
        print(f"input {src} can not open.")
        return os.EX_NOPERM

    dst_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
    dst_mode = (
        stat.S_IRUSR
        | stat.S_IWUSR
        | stat.S_IRGRP
        | stat.S_IWGRP
        | stat.S_IROTH
        | stat.S_IWOTH
    )
    try:
        dst_fd = os.open(dst, dst_flag, dst_mode)
    except OSError:
        print(f"output {dst} can not open.")
        return os.EX_CANTCREAT

    while True:
        read_bytes = os.read(src_fd, BUF_SIZE)
        if len(read_bytes) == 0:
            break
        written_bytes_len = os.write(dst_fd, read_bytes)
        if len(read_bytes) != written_bytes_len:
            print("could not write whole buffer.")
            return os.EX_IOERR
    try:
        os.close(src_fd)
    except OSError:
        print("cloud not close input file.")
        return os.EX_IOERR
    try:
        os.close(dst_fd)
    except OSError:
        print("cloud not close output file.")
        return os.EX_IOERR

    return os.EX_OK


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("source_file")
    parser.add_argument("target_file")
    args = parser.parse_args()
    ret_code = copy_file(args.source_file, args.target_file)
    sys.exit(ret_code)


if __name__ == "__main__":
    main()

よりPythonらしくかこうと思うならば、以下の通り。 C言語でファイルを扱う際に、システムコールopenの代わりにfopenを使うのが自然であるのと同様に、Pythonでファイルを扱う際はos.openではなくopenを使うのが自然である。 また、with文を使えば自動的にファイルを閉じることができるので記述量が減り、かつ安心して書くことができる。

import argparse
import os
import sys
import stat


BUF_SIZE = 1024


def copy_file(src, dst):
    with open(src, "rb") as src_f, open(dst, "wb") as dst_f:
        while True:
            read_bytes = src_f.read(BUF_SIZE)
            if len(read_bytes) == 0:
                break
            try:
                written_bytes_len = dst_f.write(read_bytes)
            except OSError:
                print("could not write whole buffer.")
                return os.EX_IOERR
            if len(read_bytes) != written_bytes_len:
                print("could not write whole buffer.")
                return os.EX_IOERR
    try:
        dst_mode = (
            stat.S_IRUSR
            | stat.S_IWUSR
            | stat.S_IRGRP
            | stat.S_IWGRP
            | stat.S_IROTH
            | stat.S_IWOTH
        )
        os.chmod(dst, dst_mode)
    except OSError:
        print("could not change mode.")
        return os.EX_IOERR

    return os.EX_OK


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("source_file")
    parser.add_argument("target_file")
    args = parser.parse_args()
    ret_code = copy_file(args.source_file, args.target_file)
    sys.exit(ret_code)


if __name__ == "__main__":
    main()

Rust

RustのIO周りはRead, BufRead, Writeという3つのトレイトが基本となる。 closeに関しては多くのreader, writerが自動で閉じるように実装されているらしい。

以下の実装は『プログラミングRust』P.420の実装を大いに参考にしている。

use std::io::Write;

const BUF_SIZE: usize = 8 * 1024;

fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> std::io::Result<u64>
where
    R: std::io::Read,
    W: std::io::Write,
{
    let mut buf = [0; BUF_SIZE];
    let mut written = 0;
    loop {
        let len = match reader.read(&mut buf) {
            Ok(0) => return Ok(written),
            Ok(len) => len,
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        writer.write_all(&buf[..len])?;
        written += len as u64;
    }
}

fn main() -> std::io::Result<()> {
    let mut _args = Vec::new();

    for arg in std::env::args().skip(1) {
        _args.push(arg);
    }

    if _args.len() != 2 {
        writeln!(std::io::stderr(), "Usage: copy source_file target_file").unwrap();
    } else {
        // let source_file_path = Path::new(&_args[0]);
        // let target_file_path = Path::new(&_args[1]);
        // std::fs::copy(source_file_path, target_file_path)?;
        let mut source_file = std::fs::File::open(&_args[0])?;
        let mut target_file = std::fs::File::create(&_args[1])?;
        copy(&mut source_file, &mut target_file)?;
    }

    Ok(())
}

なお、コメントにある通り、ファイルをコピーするならばstd::fs::copyを使うのが普通のようである。

『Unix/Linuxプログラミング 理論と実践』をダシにしてRustのお勉強をする会

些細なる第一歩

プログラミング言語Rustの勉強をしたいと常日頃考えていたが、中々手を出せずにいた。 ずっと仕事ではPythonを使っているが、それしかできないのは流石に幅が狭かろう、と。 少しだけ『The Rust Programming Language 2nd edition』を読んでみたが、最初の章のサンプル止まりで仕事が忙しくなって放ったらかしにしていた。 気づくとその邦訳も発売されてしまった。 『プログラミングRust』の冒頭に、何かプログラミングしながら学びましょうという旨の記述があり、それならばずっと積ん読になっていた『Unix/Linuxプログラミング 理論と実践』をダシにしてRustのお勉強、システムプログラミングよりのPythonのお勉強をしようと思い立った。このエントリはその1回目である。

第1章 Unixシステムプログラミングの全体像

Unix/Linuxプログラミング 理論と実践』の中身を詳しく書くと著作権的にアレになるので、この本の大まかな流れのみにする。

  1. 実際のプログラムを観察する
  2. 使われているシステムコールを使う
  3. 自分で書いてみる

第1章ではmoreコマンドの実装が取り上げられている。 まず、標準入力のみを使ったバージョンから。

Python版は、C言語版をそのまま翻訳したものである。

import sys
import typing

PAGELEN = 24


def main():
    if len(sys.argv[1:]) == 0:
        do_more(sys.stdin)
    else:
        for f in sys.argv[1:]:
            with open(f) as in_f:
                do_more(in_f)


def do_more(f: typing.TextIO):
    """
    PAGELEN行読み込み、ユーザーの入力を待つ。
    """

    num_of_lines: int = 0

    for line in f:
        if num_of_lines == PAGELEN:
            reply: int = see_more()
            if reply == 0:
                break
            num_of_lines -= reply
        print(line, end="")
        num_of_lines += 1


def see_more() -> int:
    """
    メッセージを出力して応答を待ち、入力値に応じて進める行数を返す。

    qならば終了、スペースならば次のページ、エンターならば次の行分進める。
    """
    print("\033[7m more? \033[m")
    c: str = input()
    if c == "q":
        return 0
    if c == " ":
        return PAGELEN
    if c == "":
        return 1
    return 0


if __name__ == "__main__":
    main()

次に、Rust版。

use std::io::BufRead;

const PAGELEN: i32 = 24;

fn main() -> std::io::Result<()> {
    let mut _args = Vec::new();

    for arg in std::env::args().skip(1) {
        _args.push(arg);
    }

    if _args.len() == 0 {
        let stdin = std::io::stdin();
        do_more(stdin.lock())?;
    } else {
        for arg in &_args {
            let file = std::fs::File::open(arg)?;
            let buffered_reader = std::io::BufReader::new(file);
            do_more(buffered_reader)?;
        }
    }
    Ok(())
}

fn do_more<R>(reader: R) -> std::io::Result<()>
where
    R: BufRead,
{
    let mut num_of_lines = 0;
    for line_result in reader.lines() {
        let line = line_result?;
        if num_of_lines == PAGELEN {
            let reply = see_more();
            if reply == 0 {
                break;
            }
            num_of_lines -= reply
        }
        println!("{}", line);
        num_of_lines += 1;
    }
    Ok(())
}

fn see_more() -> i32 {
    println!("more?");
    let mut c = String::new();
    std::io::stdin().read_line(&mut c).ok().expect("Failed");
    if c == "q\n" {
        return 0;
    }
    if c == " \n" {
        return PAGELEN;
    }
    if c == "\n" {
        return 1;
    }
    return 0;
}

これらの実装は、標準入力しか使っていないので、キーボード入力と標準入力の区別がついていない。 そこで/dev/tty経由でキーボードの入力を取得する。

Python版では標準モジュールtermiosttyを使う。 以下の記事を大いに参考にした。 qiita.com

import sys
import termios
import tty
import typing

PAGELEN = 24


def main():
    if len(sys.argv[1:]) == 0:
        do_more(sys.stdin)
    else:
        for f in sys.argv[1:]:
            with open(f) as in_f:
                do_more(in_f)


def do_more(f: typing.TextIO):
    """
    PAGELEN行読み込み、ユーザーの入力を待つ。
    """

    num_of_lines: int = 0

    for line in f:
        if num_of_lines == PAGELEN:
            reply: int = see_more()
            if reply == 0:
                break
            num_of_lines -= reply
        print(line, end="")
        num_of_lines += 1


def see_more() -> int:
    """
    メッセージを出力して応答を待ち、入力値に応じて進める行数を返す。

    qならば終了、スペースならば次のページ、エンターならば次の行分進める。
    """
    print("\033[7m more? \033[m")
    fd = sys.stdin.fileno()
    old = termios.tcgetattr(fd)
    try:
        tty.setcbreak(sys.stdin.fileno())
        c = sys.stdin.read(1)
    finally:
        termios.tcsetattr(fd, termios.TCSANOW, old)
    if c == "q":
        return 0
    if c == " ":
        return PAGELEN
    if c == "\n":
        return 1
    return 0


if __name__ == "__main__":
    main()

次に、Rust版。以下のcrateを使った。

dcuddeback.github.io

tty周りの設定はPythonのttyモジュールの設定を大いに参考にした。

use std::io::BufRead;
use std::os::unix::prelude::*;
use termios::*;

const PAGELEN: i32 = 24;

fn main() -> std::io::Result<()> {
    let mut _args = Vec::new();

    for arg in std::env::args().skip(1) {
        _args.push(arg);
    }

    if _args.len() == 0 {
        let stdin = std::io::stdin();
        do_more(stdin.lock())?;
    } else {
        for arg in &_args {
            let file = std::fs::File::open(arg)?;
            let buffered_reader = std::io::BufReader::new(file);
            do_more(buffered_reader)?;
        }
    }
    Ok(())
}

fn do_more<R>(reader: R) -> std::io::Result<()>
where
    R: BufRead,
{
    let mut num_of_lines = 0;
    for line_result in reader.lines() {
        let line = line_result?;
        if num_of_lines == PAGELEN {
            let reply = see_more();
            if reply == 0 {
                break;
            }
            num_of_lines -= reply
        }
        println!("{}", line);
        num_of_lines += 1;
    }
    Ok(())
}

fn see_more() -> i32 {
    println!("more?");
    let stdin = std::io::stdin();
    let stdin_fd = stdin.as_raw_fd();
    let mut old_termios = Termios::from_fd(stdin_fd).unwrap();
    match tcgetattr(stdin_fd, &mut old_termios) {
        Ok(termios) => termios,
        Err(_error) => panic!("I Cannot open terminal attributes!!!"),
    };

    let mut new_termios = Termios::from_fd(stdin_fd).unwrap();
    new_termios.c_lflag = new_termios.c_lflag & !(ECHO | ICANON);
    new_termios.c_cc[VMIN] = 1;
    new_termios.c_cc[VTIME] = 0;
    match tcsetattr(stdin_fd, TCSAFLUSH, &mut new_termios) {
        Ok(termios) => termios,
        Err(_error) => panic!("I Cannot set terminal attributes!!!"),
    };

    let mut c = String::new();
    std::io::stdin()
        .read_line(&mut c)
        .ok()
        .expect("I Cannot read from stdin!!!");
    match tcsetattr(stdin_fd, TCSAFLUSH, &mut old_termios) {
        Ok(termios) => termios,
        Err(_error) => panic!("I Cannot set terminal attributes!!!"),
    };

    if c == "q\n" {
        return 0;
    }
    if c == " \n" {
        return PAGELEN;
    }
    if c == "\n" {
        return 1;
    }
    return 0;
}

/dev/tty経由でキーボードの入力を取得する。」と書いたが、実は嘘である。 C言語版では/dev/ttyfopenで開いているが、上記のPythonやRustではtermiosの値を変更しているので意味合いが違う。

感想

  • システムプログラミング感はないが、第2章でfopenではないopenシステムコールが登場する予定。
  • Python版は比較的スラスラかけたが、tty周りは厳しかった。
  • Rustは非常に難儀した。まず、Result型の処理の仕方が甘い。

これからも頑張っていきたい(小並感)。