Java 版目录 watchdog

工作中我们经常会用到监听目录的功能, 当目录中有文件增加删除等操作时, 执行某些任务. 如果是Java来写, 怎么做呢 ?

实现方式

实现这个功能, 我们要使用到WatchServicePath这两个东西, 首先创建一个WatchService:

WatchService watcher = FileSystems.getDefault().newWatchService();

接着就可以注册这个WatchService到某个Path了:

Path dir = ...;
WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
`ENTRY_CREATE`等事件定义在`StandardWatchEventKinds`中.

注册后, 我们使用WatchService.take()就可以获取目录变更的事件了, 若没有, 则wait .

工具类

接下来就是写一个工具类WatchDirUtils, 用来接收和注册监听请求及回调.

相对于WatchDirUtils, 客户端的需求无非就两个, 一个是要监听的目录, 另一个是变更时的Callback. 我们可以封装成一个类.

我们暂且定义WatchPathWatchCallback两个接口:

public interface WatchPath {
public java.nio.file.Path getPath();
}
public interface WatchCallback {
public void handle(WatchEvent<?> event);
}

然后, 定义一个WatchRequest接口, 继承WatchPathWatchCallback, 同时要求返回是否递归监听.

public interface WatchRequest extends WatchPath, WatchCallback {
public boolean isRecursive();
}

接口定义好了, 接下来就是实现了, 我们新建一个DebugWatchRequest实现类,

public class DebugWatchRequest implements WatchRequest {
...
@Override
public void handle(WatchEvent<Path> ev) {
Path name = ev.context();
Path child = dir.resolve(name);
System.out.format("%s: %s\n", event.kind().name(), child);
...
}
...
}

封装好了watch请求, 只需要定义一个静态方法, 把请求传入即可.

WatchDirUtils.register(new DebugWatchRequest(Path dir));
public static void register(WatchRequest watchRequest) throws IOException {
if (started.compareAndSet(false, true)) {
init();
}
if (serviceDown.get()) {
throw new Exception("watch service down.");
}
try {
Path dir = watchRequest.getPath();
if (watchRequest.isRecursive()) {
System.out.format("Scanning %s ...\n", dir);
registerRecursive(dir, watchRequest);
System.out.println("Done.");
} else {
registerNonRecursive(watchRequest);
}
} catch (IOException e) {
e.printStackTrace();
}
}

当该方法第一次被调用时, 会进行初始化:

private static void init() throws IOException {
watcher = FileSystems.getDefault().newWatchService();
keysToWatchRequest = new ConcurrentHashMap<>();
trace = true;

// 启动线程来处理事件
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(new EventProcessor());

serviceDown.set(false);
}

初始化包括新建watcher, 还有keysToWatchRequest(
在前面的介绍中, 我们已经知道, register某个目录后, 会有一个WatchKey返回, 所以需要有一个Map来关联WatchKey和Callback ).

最后启动一个线程来从watcher获取变更的事件, 并调用对应的Callback去handle该事件.

// EventProcessor

private static class EventProcessor implements Runnable {

@Override
public void run() {
while (!Thread.interrupted()) {
// wait for key to be signalled
WatchKey key;
try {
key = watcher.take();
} catch (InterruptedException x) {
serviceDown();
return;
}

WatchRequest watchRequest = keysToWatchRequest.get(key);
if (watchRequest == null) {
System.err.println("WatchKey not recognized (no cb) !!");
continue;
}
Path dir = watchRequest.getPath();
if (dir == null) {
System.err.println("WatchKey not recognized (no path) !!");
continue;
}

for (WatchEvent<?> event: key.pollEvents()) {
WatchEvent.Kind kind = event.kind();
// TBD - provide example of how OVERFLOW event is handled
if (kind == OVERFLOW) {
continue;
}

// handle
watchRequest.handle(event);

// Context for directory entry event is the file name of entry
WatchEvent<Path> ev = cast(event);
Path name = ev.context();
Path child = dir.resolve(name);

// if directory is created, and watching recursively, then
// register it and its sub-directories
if (watchRequest.isRecursive() && (kind == ENTRY_CREATE)) {
try {
if (Files.isDirectory(child, NOFOLLOW_LINKS)) {
registerRecursive(child, new DebugWatchRequest(child).setIsRecursive(true));
}
} catch (IOException x) {
// ignore to keep sample readbale
}
}
}

// reset key and remove from set if directory no longer accessible
boolean valid = key.reset();
if (!valid) {
keysToWatchRequest.remove(key);

// all directories are inaccessible
if (keysToWatchRequest.isEmpty()) {
break;
}
}
}
serviceDown();
}

private void serviceDown() {
serviceDown.set(true);
// TODO REPORT IT
}

}

改进

这个demo实现了一个简单的目录watch功能, 还有许多地方需要改进. 比如:

  • InterruptedException等异常 及 客户端自定义通知方

当发生interrupt时, 应该停止服务释放资源, 进行记录日志并通过一些report通知给中控或相关人员, 关闭WatchService等 不过也应支持一下客户端自定义通知方式.
同时, OVERFLOW等异常也要有对应的处理方式, 这个WatchService也不能100%保证可靠, 可以采取一些定时扫描的其他措施来弥补.

  • 事件的处理和Callback的调用效率

目前的事件处理线程是只有一个的, 而且Callback的handle()时同步调用的.

那能够改成异步的呢 ?

这个要看情况.
如果这个回调操作是次要业务, 那就不能使用异步. 因为如果一瞬间促发了许多回调, 那么这些回调同时运行, 极有可能迅速占用系统功能, 影响主业务, 甚至崩溃.
使用异步是有一定风险的, 不过开多几个线程还是可以考虑的.

  • 支持多个WatchCallback

这个demo支持一个WatchKey对应一个Callback, 当有新的到来时, 就会覆盖旧的, 这个明显不太好. 不过要支持多个WatchCallback也很容易修改.

  • 增加取消注册的功能

参考

https://docs.oracle.com/javase/tutorial/essential/io/notification.html