Version info
Python 3.7, tornado==4.3.0
Problem: downloading the same file repeatedly returns a different hash each time.
The download handler that exhibits the problem:
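To make the symptom concrete, the quickest check is to hash each downloaded copy. A minimal sketch (the paths are placeholders for two saved downloads):

```python
import hashlib

def sha256_of(path, chunk_size=65536):
    """Hash a file in fixed-size chunks so large downloads stay out of memory."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

# Two downloads of the same file should hash identically; with the
# buggy handler below they do not:
# print(sha256_of("download1.csv"), sha256_of("download2.csv"))
```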
```python
import tornado.web
from tornado.concurrent import futures
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler


class XXX_Handler(RequestHandler):
    executor = futures.ThreadPoolExecutor()

    @tornado.web.asynchronous
    @run_on_executor
    def get(self):
        # Runs on a ThreadPoolExecutor worker, not on the IOLoop thread.
        data = open("/data/xxx.csv", encoding="utf-8")
        chunk = data.read(65536)
        while chunk:
            self.write(chunk)
            self.flush()
            chunk = data.read(65536)
        data.close()
        self.set_status(200)
        return self.finish()
```
The thread pool looked suspicious, and indeed removing @run_on_executor makes the problem go away.
Out of curiosity I dug into the actual cause; the notes below walk through the relevant Tornado source.
1. Start from RequestHandler's flush method. The data handed to self.stream.write has the same hash on every request, so the corruption happens further down. The write path in Tornado's HTTP/1 connection layer:

```python
else:
    if callback is not None:
        self._write_callback = stack_context.wrap(callback)
    else:
        future = self._write_future = Future()
    data = b"\r\n".join(lines) + b"\r\n\r\n"
    if chunk:
        data += self._format_chunk(chunk)
    self._pending_write = self.stream.write(data)
    self._pending_write.add_done_callback(self._on_write_complete)
return future
```
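As an aside, the _format_chunk call above produces standard HTTP/1.1 chunked transfer encoding. A hypothetical standalone equivalent, to show what is appended around each chunk:

```python
def format_chunk(chunk: bytes) -> bytes:
    """HTTP/1.1 chunked encoding: hex length, CRLF, payload, CRLF."""
    return ("%x" % len(chunk)).encode("ascii") + b"\r\n" + chunk + b"\r\n"

# format_chunk(b"hello") -> b"5\r\nhello\r\n"
# format_chunk(b"")      -> b"0\r\n\r\n"   (the terminating chunk)
```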
2. The server wraps each accepted connection in an IOStream and hands it to the HTTP layer (from TCPServer._handle_connection):

```python
try:
    stream = IOStream(connection, io_loop=self.io_loop,
                      max_buffer_size=self.max_buffer_size,
                      read_chunk_size=self.read_chunk_size)
    future = self.handle_stream(stream, address)
    if future is not None:
        self.io_loop.add_future(future, lambda f: f.result())
except Exception:
    app_log.error("Error in connection callback", exc_info=True)
```
3. So the data goes out through IOStream; look at its write method. It mainly splits the caller's data into pieces of a fixed size and queues them in self._write_buffer.
(1) It then calls self._handle_write to send the chunks held in self._write_buffer:
```python
if not self._connecting:
    self._handle_write()
    if self._write_buffer:
        self._add_io_state(self.io_loop.WRITE)
    self._maybe_add_error_listener()
return future
```
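The splitting step can be pictured as follows. This is a sketch, not Tornado's actual code; the 128 KB piece size is my assumption based on tornado 4.x's WRITE_BUFFER_CHUNK_SIZE constant:

```python
from collections import deque

WRITE_BUFFER_CHUNK_SIZE = 128 * 1024  # assumed piece size, per tornado 4.x

def split_to_write_buffer(data, chunk_size=WRITE_BUFFER_CHUNK_SIZE):
    """Cut one large write into fixed-size pieces, as IOStream.write
    does before appending them to its _write_buffer deque."""
    buf = deque()
    for i in range(0, len(data), chunk_size):
        buf.append(data[i:i + chunk_size])
    return buf
```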
(2) Now look at self._handle_write. Under @run_on_executor it runs on a ThreadPoolExecutor worker. When a send fails with EAGAIN (Resource temporarily unavailable), control falls back to the code above, which registers a WRITE event via self._add_io_state(self.io_loop.WRITE) so that the IOLoop (the main thread) retries the send. If such failures are frequent, multiple threads end up writing self._write_buffer concurrently, and the unsynchronized access reorders or duplicates chunks, which is exactly why the hash differs between downloads.
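The reordering can be demonstrated without Tornado at all. The sketch below deterministically replays two writers draining one shared buffer, with each write split into a pop step and a send step; without a lock, an unlucky interleaving puts chunks on the wire out of order:

```python
from collections import deque

def replay(schedule, chunks):
    """Replay an explicit interleaving of unsynchronized writers.

    Each writer performs a write in two steps: 'pop' takes the next
    chunk off the shared buffer, 'send' puts the held chunk on the wire.
    """
    buf = deque(chunks)
    held = {}   # chunk each writer has popped but not yet sent
    wire = []
    for writer, action in schedule:
        if action == "pop" and buf:
            held[writer] = buf.popleft()
        elif action == "send" and writer in held:
            wire.append(held.pop(writer))
    return wire

chunks = [b"c1", b"c2", b"c3"]

# One writer (the IOLoop alone): order is preserved.
ordered = replay([("A", "pop"), ("A", "send")] * 3, chunks)

# Worker thread A and main thread B overlap: B sends before A does.
racy = replay([("A", "pop"), ("B", "pop"), ("B", "send"),
               ("A", "send"), ("A", "pop"), ("A", "send")], chunks)
# ordered == [b"c1", b"c2", b"c3"], racy == [b"c2", b"c1", b"c3"]
```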
The error path in IOStream._handle_write:

```python
except (socket.error, IOError, OSError) as e:
    if e.args[0] in _ERRNO_WOULDBLOCK:
        self._write_buffer_frozen = True
        break
    else:
        if not self._is_connreset(e):
            # Broken pipe errors are usually caused by connection
            # reset, and its better to not log EPIPE errors to
            # minimize log spam
            gen_log.warning("Write error on %s: %s", self.fileno(), e)
        self.close(exc_info=True)
        return
```
As a brute-force experiment, I changed the failure path so that instead of scheduling a retry on the main thread via a write event, the current thread just keeps retrying. With that change the hash comes out correct every time, which confirms the diagnosis, but the proper fix is simply to stop using @run_on_executor.
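For completeness, this is the non-threaded shape I would use instead: stream on the IOLoop thread and yield each flush, so the coroutine waits for the previous chunk to drain before queueing the next one. A sketch against the tornado 4.x API (path and chunk size carried over from the handler above):

```python
from tornado import gen
from tornado.web import RequestHandler

class DownloadHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        with open("/data/xxx.csv", encoding="utf-8") as data:
            while True:
                chunk = data.read(65536)
                if not chunk:
                    break
                self.write(chunk)
                # Wait for the IOStream to drain before queueing more, so
                # _write_buffer is only ever touched from the IOLoop thread.
                yield self.flush()
        self.finish()
```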