# HG changeset patch # User Bill Welliver # Date 1324792277 18000 # Node ID aee55c69457369abdebf9bbd1a0609a5346cab30 # Parent 43849dab1e8207e0e37095277e14f0352355ec1d notify optimization is partially working (an event currently triggers a full scan) also reworked some problems with database storage that was causing entries to be duplicated. record insertion should be much faster now, as well. diff -r 43849dab1e8207e0e37095277e14f0352355ec1d -r aee55c69457369abdebf9bbd1a0609a5346cab30 bin/start.sh --- a/bin/start.sh Thu Dec 22 22:56:36 2011 -0500 +++ b/bin/start.sh Sun Dec 25 00:51:17 2011 -0500 @@ -9,4 +9,4 @@ fi cd `dirname $0`/../.. - exec pike $PIKE_ARGS -x fins start tunesd $* + exec pike $PIKE_ARGS -x fins start tunesd-new $* diff -r 43849dab1e8207e0e37095277e14f0352355ec1d -r aee55c69457369abdebf9bbd1a0609a5346cab30 classes/check.pike --- a/classes/check.pike Thu Dec 22 22:56:36 2011 -0500 +++ b/classes/check.pike Sun Dec 25 00:51:17 2011 -0500 @@ -299,6 +299,7 @@ void file_exists(string p, Stdio.Stat s) { +log->debug("adding file %s", p); exists_queue->write(({p, s})); } diff -r 43849dab1e8207e0e37095277e14f0352355ec1d -r aee55c69457369abdebf9bbd1a0609a5346cab30 classes/db.pike --- a/classes/db.pike Thu Dec 22 22:56:36 2011 -0500 +++ b/classes/db.pike Sun Dec 25 00:51:17 2011 -0500 @@ -18,7 +18,7 @@ constant songs_fields = ({ ({"id", "integer primary key"}), ({"title", "string not null"}), - ({"path", "string not null"}), + ({"path", "string not null unique"}), // ({"hash", "string not null"}), ({"artist", "string"}), ({"album", "string"}), @@ -156,7 +156,7 @@ void add(mapping ent) { - log->debug("adding %O", ent); +// log->debug("adding %O", ent); change_queue->write(ent); } @@ -179,19 +179,31 @@ if(in_processing_changes) return; in_processing_changes = 1; log->info("flushing changes to db\n"); + array checks = ({}); + + sql->query("BEGIN TRANSACTION"); + while(!change_queue->is_empty()) { mapping ent = change_queue->read(); - if(has_entry(sql, ent)) {log->debug("skipping %s", ent->path); continue; } - // werror("adding " + ent->path + "\n"); - // ent->id = ++id; - if(!ent->title) ent->title = basename(ent->path); - ent->format = lower_case((ent->path/".")[-1] || "mp3"); - // ent->hash = String.string2hex(Crypto.MD5()->hash(Stdio.read_file(ent->path))); - // songs[ent->id] = ent; - ent->batch = gsc+1; - write_entry_to_db(sql, ent); - had_changes++; + checks += ({ent}); + if(sizeof(checks) >= 10 || change_queue->is_empty()) + { + checks = has_entry(sql, checks); + foreach(checks;; ent) + { + werror("adding " + ent->path + "\n"); + // ent->id = ++id; + if(!ent->title) ent->title = basename(ent->path); + ent->format = lower_case((ent->path/".")[-1] || "mp3"); + // ent->hash = String.string2hex(Crypto.MD5()->hash(Stdio.read_file(ent->path))); + // songs[ent->id] = ent; + ent->batch = gsc+1; + write_entry_to_db(sql, ent); + had_changes++; + } + checks = ({}); + } } while(!remove_queue->is_empty()) { @@ -206,31 +218,59 @@ gsc++; had_changes = 0; } + sql->query("COMMIT"); in_processing_changes = 0; } -int has_entry(Sql.Sql sql, mapping entry) -{ - array a = sql->query("select * from songs where path = %s", entry->path); - if(sizeof(a)) return 1; - else return 0; +array has_entry(Sql.Sql sql, array(mapping) entry) +{ + array paths = ({}); + + foreach(entry;;mapping e) + { + paths += ({ "'" + sql->quote(e->path) + "'" }); + } + +// werror("query: %O\n", query); + array a = sql->query("select path from songs where path IN(" + ( paths * "," ) + ")"); + + if(sizeof(a) == sizeof(entry)) return ({}); + + array res = ({}); + + foreach(entry;;mapping e) + { + int hadit = 0; + foreach(a;; mapping row) + { +//werror("%O, %O\n", e->path, row->path); + if(e->path == row->path) + { + hadit++; + break; + } + } + if(!hadit) res += ({e}); +// else werror("disqualifying " + e->path + "\n"); + } + return res; } void write_entry_to_db(Sql.Sql sql, mapping entry) { - array vc = ({}); foreach(entry; string key; mixed val) { // if(!val) m_delete(entry, key); if(intp(val)) - vc += ({"%d"}); + vc += ({(string)val}); else - vc += ({"%s"}); + vc += ({"'" + sql->quote(val) + "'"}); } string q = "INSERT INTO songs (" + (indices(entry)* ", ") + ", added) VALUES(" + (vc * ", ") + ", 'now')"; +//werror("QUERY: %O\n", q); //werror(q + sprintf("%O\n", values(entry))); - sql->query(q, @values(entry)); + sql->query(q /*, @values(entry)*/); // werror("failed to write entry for %s: %O\n", entry->path, entry); } diff -r 43849dab1e8207e0e37095277e14f0352355ec1d -r aee55c69457369abdebf9bbd1a0609a5346cab30 classes/server.pike --- a/classes/server.pike Thu Dec 22 22:56:36 2011 -0500 +++ b/classes/server.pike Sun Dec 25 00:51:17 2011 -0500 @@ -47,7 +47,7 @@ void start() { musicpath = replace(config["music"]["path"], "$HOME", getenv()["HOME"]); - +werror("********\n*******\n"); // the db is actually loaded by fins into "model", but for the sake of code already written, we keep db as an alias. db = model; model->start(); @@ -60,9 +60,10 @@ { db = model; bonjour = Protocols.DNS_SD.Service(db->get_name(), +// "_daap._tcp", "", (int)8001); "_daap._tcp", "", (int)__fin_serve->my_port); - log->info("Advertising tunesd via Bonjour."); + log->info("Advertising tunesd/DAAP via Bonjour."); } diff -r 43849dab1e8207e0e37095277e14f0352355ec1d -r aee55c69457369abdebf9bbd1a0609a5346cab30 config/dev.cfg --- a/config/dev.cfg Thu Dec 22 22:56:36 2011 -0500 +++ b/config/dev.cfg Sun Dec 25 00:51:17 2011 -0500 @@ -6,7 +6,7 @@ [model] class=db debug=1 -datasource=sqlite://tunesd/config/tunesd.sqlite3 +datasource=sqlite://tunesd-new/config/tunesd.sqlite3 [controller] class=controller @@ -20,4 +20,7 @@ class=server [music] -path=$HOME/Music/iTunes/iTunes Media/Music +path=/c/media/iTunes/iTunes Music + +[web] +port=3689 diff -r 43849dab1e8207e0e37095277e14f0352355ec1d -r aee55c69457369abdebf9bbd1a0609a5346cab30 config/log_dev.cfg --- a/config/log_dev.cfg Thu Dec 22 22:56:36 2011 -0500 +++ b/config/log_dev.cfg Sun Dec 25 00:51:17 2011 -0500 @@ -5,7 +5,7 @@ [logger.default] appender=default_console appender=default_debuglog -level=INFO +level=DEBUG [logger.access] appender=access_log @@ -16,6 +16,11 @@ appender=default_console appender=default_debuglog +[logger.scanner] +level=DEBUG +appender=default_console +appender=default_debuglog + # this is the base logger for fins [logger.fins] level=INFO diff -r 43849dab1e8207e0e37095277e14f0352355ec1d -r aee55c69457369abdebf9bbd1a0609a5346cab30 modules/FS.pmod/Monitor.pmod/basic.pike --- a/modules/FS.pmod/Monitor.pmod/basic.pike Thu Dec 22 22:56:36 2011 -0500 +++ b/modules/FS.pmod/Monitor.pmod/basic.pike Sun Dec 25 00:51:17 2011 -0500 @@ -4,6 +4,10 @@ #define HAVE_EVENTSTREAM 1 #endif +#if constant(Public.System.___Inotify) +#define HAVE_INOTIFY 1 +#endif + #if HAVE_EVENTSTREAM import Public.System; FSEvents.EventStream eventstream = FSEvents.EventStream(({}), 3.0, FSEvents.kFSEventStreamEventIdSinceNow, FSEvents.kFSEventStreamCreateFlagNone); @@ -18,7 +22,43 @@ } else check(0); } -#endif +#elseif HAVE_INOTIFY +import Public.System; + object instance; + object file; + + void inotify_parse(mixed id, string data) + { + while (sizeof(data)) { + array a; + mixed err = catch { + a = Inotify.parse_event(data); + }; + + if (err) { + // TODO: might have a partial even struct here which gets completed + // by the next call?? maybe add an internal buffer. + werror("Could not parse inotify event: %s\n", describe_error(err)); + return; + } + string path; +werror("%O\n", a); + path = a[3]; + if(path && monitors[path]) + { +werror("inotify: %O\n", path); + monitors[path]->check(0); + } + else + { check(0); // no need to look at the others if we're going to do a full scan. + return; + } + + data = data[a[4]..]; + } + } + +#endif /* HAVE_EVENTSTREAM */ // // Basic filesystem monitor. @@ -197,6 +237,10 @@ int last_change = 0x7fffffff; // Future... array(string) files; +#ifdef HAVE_INOTIFY + int wd; +#endif + int `<(mixed m) { return next_poll < m; } int `>(mixed m) { return next_poll > m; } @@ -216,7 +260,23 @@ eventstream->stop(); eventstream->add_path(path); eventstream->start(); -#endif +#elseif HAVE_INOTIFY +//werror("***** inotify\n"); + wd = instance->add_watch(path, + Inotify.IN_MOVED_FROM | Inotify.IN_UNMOUNT | + Inotify.IN_MOVED_TO | Inotify.IN_MASK_ADD | + Inotify.IN_MOVE_SELF | Inotify.IN_DELETE | + Inotify.IN_MOVE | Inotify.IN_MODIFY | + Inotify.IN_ATTRIB | Inotify.IN_DELETE_SELF | + Inotify.IN_CREATE); +#endif + } + + void destroy() + { +#if HAVE_INOTIFY + instance->rm_watch(wd); +#endif /* HAVE_INOTIFY */ } //! Call a notification callback. @@ -429,6 +489,7 @@ } this_program::files = files; foreach(new_files, string file) { + if(filter_file(file)) continue; file = canonic_path(Stdio.append_path(path, file)); Monitor m2 = monitors[file]; mixed err = catch { @@ -516,7 +577,7 @@ //! @[file_deleted()], @[stable_data_change()] int(0..1) check(MonitorFlags|void flags) { - werror("Checking monitor %O...\n", this); + //werror("Checking monitor %O...\n", this); Stdio.Stat st = file_stat(path, 1); Stdio.Stat old_st = this_program::st; int orig_flags = this_program::flags; @@ -530,6 +591,7 @@ this_program::files = files; foreach(files, string file) { file = canonic_path(Stdio.append_path(path, file)); + if(filter_file(file)) continue; if (monitors[file]) { // There's already a monitor for the file. // Assume it has already notified about existance. @@ -697,7 +759,12 @@ { #if HAVE_EVENTSTREAM eventstream->callback_func = eventstream_callback; -#endif +#elseif HAVE_INOTIFY + instance = Inotify._Instance(); + file = Stdio.File(instance->get_fd(), "r"); + file->set_nonblocking(); + file->set_read_callback(inotify_parse); +#endif if (max_dir_check_interval > 0) { this_program::max_dir_check_interval = max_dir_check_interval; @@ -769,7 +836,7 @@ int(0..)|void file_interval_factor, int(0..)|void stable_time) { - werror("monitor_factory(%O)\n", path); + // werror("monitor_factory(%O)\n", path); return Monitor(path, flags, max_dir_check_interval, file_interval_factor, stable_time); } @@ -812,8 +879,8 @@ int(0..)|void file_interval_factor, int(0..)|void stable_time) { -werror("monitor\n"); path = canonic_path(path); + if(filter_file(path)) return; Monitor m = monitors[path]; if (m) { if (!(flags & MF_AUTO)) { @@ -839,6 +906,15 @@ } } +int filter_file(string path) +{ + array x = path/"/"; + foreach(x;; string pc) + if(pc && strlen(pc) && pc[0]=='.') { werror("skipping %O\n", path); return 1; } + + return 0; +} + //! Release a @[path] from monitoring. //! //! @param path @@ -1071,9 +1147,11 @@ }; #if HAVE_EVENTSTREAM // if we are using FSEvents, we don't want to run this check more than once to prime the pumps. +#elseif HAVE_INOTIFY +// if we are using Inotify, we don't want to run this check more than once to prime the pumps. #else set_nonblocking(t); -#endif +#endif /* HAVE_EVENTSTREAM */ if (err) throw(err); } diff -r 43849dab1e8207e0e37095277e14f0352355ec1d -r aee55c69457369abdebf9bbd1a0609a5346cab30 modules/FS.pmod/Monitor.pmod/symlinks.pike --- a/modules/FS.pmod/Monitor.pmod/symlinks.pike Thu Dec 22 22:56:36 2011 -0500 +++ b/modules/FS.pmod/Monitor.pmod/symlinks.pike Sun Dec 25 00:51:17 2011 -0500 @@ -443,9 +443,10 @@ protected void monitor(string path, int flags, int max_dir_interval, int file_interval_factor, int stable_time) { + object m; ::monitor(path, flags, max_dir_check_interval, file_interval_factor, stable_time); - monitors[path]->symlinks |= symlinks; + if((m = monitors[path])) m->symlinks |= symlinks; } //! Called when the status has changed for an existing file.