ひよこエンジニアの2人にお題を出そう! サイボウズ6のタイムカード入力で楽をする
Aug 14

天下一プログラマー運営委員のinada-nです。
本戦後しばらくお待たせしてしまいましたが、本戦・予選の問題と解説、回答例を順不同で記事にしていこうと思います。

いきなりですが、決勝戦の問題です。

問題

20階建てのビルに、10人乗りのエレベータが一機ある。
このエレベータは、人数に関係なく一回の乗降に10秒かかり、
1階あたり1秒の速度で昇降する。
a 秒時点で b 階に c 階へ行きたい ddd さんが来る
(a秒時点に、c階へ行きたいdddさんがb階に来る) という情報を、

  a, b, c, ddd

というcsv形式で与える。このデータは時刻aで昇順にソートされており、
同時に二人の人物が来た場合は2行に分けられる。

これを元に、入力に出現した全員を目的の階に降ろすまでのエレベータの昇降と
人物の乗降を、各人の所要時間の合計が最小になるようにスケジューリングし、
e 秒時点に f 階で ggg, hhh, iii さんが乗降するというデータを

  e, f, ggg, hhh, iii

という形式で出力せよ。 e は乗降開始の時刻であり、同時に乗降する人物は
一列にまとめよ。出力は e で昇順にソートせよ。同一人物は出力の中に二度
出現し、一度目は乗車、二度目は降車を表す必要がある。

解等フォームには、規定された出力と、各人の所要時間の合計を入力せよ。

なお、各変数の定義域と値域は次の通りである.

* 時点をあらわす a, e は、スケジュール開始時点を 0 とする非負の整数である.
  値域は、 [1, 10000] の範囲である。(スケジュール開始と同時には人は来ない)
* 階をあらわす b, c, f は、1階を1, 20階を20とする正の整数である. 値域は
  [1, 20] である。
* 人物を表す ddd, ggg, hhh, iii は、asciiのアルファベット小文字3つで構成
  される三文字の文字列である。 aaa, aab, ... ,zzz の値をとる。

また、詳細な条件は以下の通りである。

* 入力には、同一人物は一度しか出現しない。
* 出力では、同一人物を必ず二回出現させる必要がある。目的の階に到着するとは
  目的の階で降車することであり、出発階以外の階からの乗車や目的階以外の階への
  降車は認めない。
* スケジューリング開始時点 (時刻 0[sec]) に、エレベータは1階で、すぐに移動可能
  な状態で待機している。 (たとえば、時刻 2[sec] に 3階 で乗降を開始することが出来る)
* 乗降は、乗降終了時点 (=移動開始時点) まで可能である。
  たとえば、20[sec] 時点に 10階に出現する abc が、 10[sec] 時点に
  10 階で乗降を開始したエレベータに乗車し、30[sec] 時点で 20 階で
  降車することが可能である。この場合、abcの所要時間とは、降車時点の30
  から出現時点の20[sec]を引いた10[sec]になる。

入力データはこちらになります。
http://tenka1.klab.jp/resources/schedule.csv

この問題、実は最初はもっと複雑なルールと大きなデータに対してより良い解を提出したチームの勝ちとするつもりだったのですが、時間等を考慮した結果小さいデータに対して最適解を求めるルールにしました。

データ数が少ないため、特別な工夫はあまり必要なく、1時間という限られた時間とプレッシャーの中で正しく状態の表現や遷移を設計・実装できるかという勝負になります。

まずは、私が自分で解いたコードは以下になります。最適解から順番に良い解を出力するプログラムになっているので、実行する場合は数秒でCtrl-Cで止めるなどしてください。

#!/usr/bin/env python
# coding: utf-8

import csv
from heapq import *
from itertools import *
from functools import *
from collections import *
from pprint import pprint
from time import time
import sys

Person = namedtuple('Person', 'time from_floor to_floor name')

DATA_FILE = 'elevator_data.txt'

FLOOR_RANGE = xrange(1, 21)
DATA = None
DATA_LEN = 0
STOP_TIME = 10 # 最低停止時間
RIDE_LIMIT = 10

def load_data():
    return [Person(int(r[0]), int(r[1]), int(r[2]), r[3].strip())
            for r in csv.reader(open(DATA_FILE)) if len(r) == 4]

class State(object):
    __slots__ = "next_data_index current_time current_floor in_elevator in_floor total_cost prev_state".split()

    def make_child(self):
        return State(self.next_data_index,
                     self.current_time,
                     self.current_floor,
                     self.in_elevator[:],
                     self.in_floor[:],
                     self.total_cost,
                     self)

    def __init__(self, next_data_index=0, current_time=0, current_floor=1, in_elevator=[], in_floor=[], total_cost=0, prev_state=None):
        self.next_data_index = next_data_index
        self.current_time = current_time
        self.current_floor = current_floor
        self.in_elevator = in_elevator
        self.in_floor = in_floor
        self.total_cost = total_cost
        self.prev_state = prev_state

    def __str__(self):
        s = ["%d[sec] %d[floor] %d[cost] elevator(" % (
             self.current_time, self.current_floor, self.total_cost)]
        s.append(','.join(p.name for p in self.in_elevator))
        s.append(') floor(')
        s.append(','.join(p.name for p in self.in_floor))
        s.append(')')
        return ''.join(s)

    __repr__ = __str__

    def goal(self):
        return (self.next_data_index >= DATA_LEN and
                not self.in_elevator and
                not self.in_floor)

    def __cmp__(self, other):
        # これで小さいと判断された状態が優先して探索される.
        # ここを改良するだけで A* に変更可能(だけどコスト計算ムズー)
        ret = self.total_cost - other.total_cost   # cost は低いほうを優先
        if ret:
            return ret
        ret = other.current_time - self.current_time # 時間は進んでいるほう
        return ret

    def next_states(self):
        states = []

        # 動かずに人が来るのを待つ場合
        next_state = self.make_child()
        for j in xrange(self.next_data_index, DATA_LEN):
            d = DATA[j]
            if d.from_floor != self.current_floor:
                next_state.in_floor.append(d)
            else:
                next_state.in_elevator.append(d)
                next_state.next_data_index = j+1
                next_state.current_time = d.time
                time_diff = next_state.current_time - self.current_time

                for p in next_state.in_elevator:
                    next_state.total_cost += min(time_diff, next_state.current_time-p.time)
                for p in next_state.in_floor:
                    next_state.total_cost += min(time_diff, next_state.current_time-p.time)
                states.append(next_state)
                break

        # 昇降する場合
        for to_floor in FLOOR_RANGE:
            if self.current_floor == to_floor:
                continue
            do_something = False
            next_time = self.current_time + STOP_TIME + abs(to_floor - self.current_floor)
            time_diff = next_time - self.current_time
            next_state = self.make_child()

            next_state.current_time = next_time
            next_state.current_floor = to_floor

            # エレベータに乗っている人のコスト計算と降車処理.
            for p in self.in_elevator:
                if p.to_floor == to_floor:
                    next_state.total_cost += (time_diff - STOP_TIME)
                    next_state.in_elevator.remove(p)
                    do_something = True
                else:
                    next_state.total_cost += time_diff

            # フロアにいる人のコスト計算と乗車処理.
            for p in self.in_floor:
                next_state.total_cost += time_diff
                if p.from_floor == to_floor and len(next_state.in_elevator) < RIDE_LIMIT:
                    next_state.in_floor.remove(p)
                    next_state.in_elevator.append(p)
                    do_something = True

            # フロアに到着した人の(ry
            j = DATA_LEN
            for j in xrange(next_state.next_data_index, DATA_LEN):
                d = DATA[j]
                if d.time > next_time:
                    next_state.next_data_index = j
                    break
                next_state.total_cost += next_time - d.time

                if d.from_floor == to_floor and len(next_state.in_elevator) < RIDE_LIMIT:
                    next_state.in_elevator.append(d)
                    do_something = True
                else:
                    next_state.in_floor.append(d)
            else:
                next_state.next_data_index = j+1

            if not do_something:
                # 何もすることが無い場合、そのフロアに次の人が来るのを待って乗せる.
                for j in xrange(next_state.next_data_index, DATA_LEN):
                    d = DATA[j]
                    if d.from_floor == to_floor:
                        base_time = next_time
                        time_diff = d.time - base_time
                        next_state.current_time = next_time = d.time
                        next_state.next_data_index = j+1

                        # 追加コスト計算
                        next_state.total_cost += time_diff * len(next_state.in_elevator)
                        next_state.in_elevator.append(d)

                        for p in next_state.in_floor:
                            next_state.total_cost += min(time_diff, next_time - p.time)

                        break
                    else:
                        next_state.in_floor.append(d)
                else:
                    continue # 本気で何もすることが無い場合はスキップ
            states.append(next_state)

        return states

def _init():
    global DATA
    global DATA_LEN
    DATA = load_data()
    DATA_LEN = len(DATA)

def search():
    queue = [State()]
    cnt = 0
    while queue:
        state = heappop(queue)
        cnt += 1
        if cnt % 100 == 0:
            print >>sys.stderr, state
        if state.goal():
            yield state
        else:
            for s in state.next_states():
                heappush(queue, s)

def print_result(result):
    u"""結果を規定されたフォーマットで出力する"""
    print result[-1].total_cost
    for prev,curr in zip(result, result[1:]):
        time = curr.current_time
        floor = curr.current_floor
        if floor != prev.current_floor:
            time_s = curr.current_time - STOP_TIME
        else:
            time_s = time

        res_dic = defaultdict(list)
        for p in prev.in_elevator:
            if p not in curr.in_elevator:
                res_dic[time_s].append(p.name)

        for p in curr.in_elevator:
            if p not in prev.in_elevator:
                if p.time < time_s:
                    res_dic[time_s].append(p.name)
                else:
                    assert p.time <= time
                    res_dic[p.time].append(p.name)

        for t in sorted(res_dic.keys()):
            res = [str(t), str(floor)] + res_dic[t]
            print ", ".join(str(x) for x in [t, floor]+res_dic[t])

def main():
    _init()
    pprint(DATA, stream=sys.stderr)
    print >>sys.stderr
    for result in search():
        lis = []
        while result:
            lis.append(result)
            result = result.prev_state
        lis.reverse()

        for s in lis:
            print >>sys.stderr, s
        print '-----'
        print_result(lis)

if __name__ == '__main__':
    import gc
    gc.disable()
    start = time()
    main()
    end = time()
    print >>sys.stderr, (end-start), '[sec]'

探索自体はシンプルな best first search になっています。本体ループは search() 関数の部分で、状態を priority queue に放り込んで、一番コストが小さい状態を取り出しては次の状態を展開するだけです。ここは特に難しくありませんね。

本題はStateクラスです。Stateクラスには、現時刻・エレベータ内にいる人、エレベータを待っている人、エレベータの現在位置、その状態までの合計コストを持っています。next_states()メソッドを見れば判るように、ある状態から「そのままの階にいて次に人が来るのを待つ」か「別の階に移動して10秒待つ」した状態を次の状態と定義することによって、Stateクラスが常に移動可能な状態だけを表現するようにして10秒ルールをできるだけシンプルに実装しています。

それでは、この問題を見事43分で正答された優勝者のコードを見てみましょう。

import static java.lang.Math.*;
import static java.util.Arrays.*;
import java.util.*;

public class Main {
	public static void main(String[] args) {
		new Main().run();
	}
	String[] ss = {
			"1, 16, 1, aaa",
			"20, 20, 10, aab",
			"75, 1, 6, aac",
			"101, 15, 1, aad",
			"127, 13, 1, aae",
			"156, 13, 1, aaf",
			"185, 4, 1, aag",
			"213, 17, 10, aah",
			"226, 1, 7, aai",
			"286, 1, 5, aaj",
			"302, 16, 10, aak",
			"373, 8, 18, aal",
			"432, 13, 1, aam",
			"498, 12, 10, aan",
			"573, 13, 1, aao",
	};
	int N = ss.length;
	void run() {
		times = new int[N];
		from = new int[N];
		to = new int[N];
		name = new String[N];
		for (int i = 0; i < N; i++) {
			String[] s = ss[i].split("[, ]+");
			times[i] = Integer.parseInt(s[0]);
			from[i] = Integer.parseInt(s[1]);
			to[i] = Integer.parseInt(s[2]);
			name[i] = s[3];
		}
		TreeSet set = new TreeSet();
		for (int i = 0; i < N; i++) {
			set.add(from[i]);
			set.add(to[i]);
		}
		set.add(1);
		ps = toi(set.toArray(new Integer[0]));
		set.clear();
		for (int i = 0; i < N; i++) {
			set.add(times[i]);
		}
		ts = toi(set.toArray(new Integer[0]));
		State res = astar();
		res.output();
	}

	int[] times;
	int[] from;
	int[] to;
	String[] name;
	int[] ps;
	int[] ts;

	class State implements Comparable {
		int d, h;
		List output;
		int in;
		int out;
		int time;
		int pos;
		State() {
			d = 0;
			output = new ArrayList();
			in = 0;
			out = 0;
			time = 0;
			pos = 1;
			calcH();
		}
		State(int d, int in, int out, int time, int pos, List output) {
			this.d = d;
			this.in = in;
			this.out = out;
			this.time = time;
			this.pos = pos;
			this.output = output;
			calcH();
		}
		void calcH() {
			h = 0;
			for (int i = 0; i < N; i++) if ((out >> i & 1) == 0 && times[i] < time) {
				h += time - times[i];
			}
			for (int i = 0; i < N; i++) if ((out >> i & 1) == 0 && (in >> i & 1) == 0) {
				h += 10 + abs(to[i] - from[i]);
			}
		}
		boolean ok() {
			return out == (1 << N) - 1;
		}
		List next() {
			List list = new Stack();
			for (int p : ps) {
				int t = time + abs(p - pos);
				State s = next(t, t + 10, p);
				if (s != null) list.add(s);
				for (int t2 : ts) if (t2 > t + 10) {
					s = next(t, t2, p);
					if (s != null) list.add(s);
				}
			}
			return list;
		}
		State next(int t, int t2, int p) {
			int in2 = in;
			int out2 = out;
			int d2 = d;
			List list = new ArrayList();
			List output2 = new ArrayList(output);
			for (int i = 0; i < N; i++) if ((in >> i & 1) != 0 && to[i] == p) {
				in2 ^= 1 << i;
				out2 ^= 1 << i;
				d2 += t - times[i];
				list.add(i);
			}
			if (list.size() > 0) {
				String str = String.format("%d, %d", t, p);
				for (int i : list) str += ", " + name[i];
				output2.add(str);
			}
			list.clear();
			for (int i = 0; i < N; i++) if ((in >> i & 1) == 0 && ((out >> i) & 1) == 0 && from[i] == p && times[i] <= t2) {
				in2 ^= 1 << i;
				list.add(i);
			}
			if (list.size() > 0) {
				String str = String.format("%d, %d", t2, p);
				for (int i : list) str += ", " + name[i];
				output2.add(str);
			}
			if (in == in2 && out == out2) return null;
			return new State(d2, in2, out2, t2, p, output2);
		}
		void output() {
			System.out.println(d);
			for (String s : output) System.out.println(s);
			System.out.println();
		}
		public int compareTo(State o) {
			return (d + h) - (o.d + o.h);
		}
		public String toString() {
			return String.format("time=%d, pos=%d, d=%d, in=%d, out=%d", time, pos, d, Integer.bitCount(in), Integer.bitCount(out));
		}
	}

	State astar() {
		PriorityQueue que = new PriorityQueue();
		que.offer(new State());
		while (!que.isEmpty()) {
			State s = que.poll();
			if (s.ok()) return s;
			debug(s);
			List next = s.next();
			for (State t : next) que.offer(t);
		}
		return null;
	}

	int[] toi(Integer[] Is) {
		int n = Is.length;
		int[] is = new int[n];
		for (int i = 0; i < n; i++) is[i] = Is[i];
		return is;
	}
	void debug(Object...os) {
		System.err.println(deepToString(os));
	}
}

aster() 部分が A* 探索のメインループになっていますね。私の search() 関数と同じです。

A* 探索はbest first searchの一種です。A*になっていない私のプログラムとの違いは、状態のコストを比較するときにその状態になるまでにかかったコストだけで比較するか、その状態からゴールに到達するまでのコストの下界も足したコストを比較するかだけなので、違いが出るのはメインループではなく比較関数になります。(もちろん、A*の方が最適解に到達するまでに展開する状態数が少なくて済むので効率的です)

では、状態の中身をみてみましょう。メンバ d, h がそれぞれその状態になるまでにかかったコストと将来のコストの予測値ですね。timeが現時刻でposがエレベータの位置、inとoutはそれぞれエレベータに乗っている人と目的階に到達した人を各ビットで表現しています。

next()メソッドがある状態から次の状態を展開するところですが、私が各状態で出現している人全員のコストを計算しているのに対して、ゴールする人のコストのみを計算することで圧倒的にシンプルになっています。もちろん最終的には全員がゴールしないといけないので、これで正しい答えにたどり着けます。

ゴールしていない人のコストは、出現していない人のコストと一緒にcalcH()で計算しています。私のプログラムよりも圧倒的にシンプルに表現できています。

とても簡潔で効率的なプログラムで、厳しい時間制限とプレッシャーの中で書かれたとは思えません。感服しました。

Trackback URL

Leave a Reply