更新时间:2024-10-24 GMT+08:00

MoXing进阶用法的样例代码

如果您已经熟悉了常用操作,同时熟悉MoXing Framework API文档以及常用的Python编码,您可以参考本章节使用MoXing Framework的一些进阶用法。

读取完毕后将文件关闭

当读取OBS文件时,实际调用的是HTTP连接读去网络流,注意要记得在读取完毕后将文件关闭。为了防止忘记文件关闭操作,推荐使用with语句,在with语句退出时会自动调用mox.file.File对象的close()方法:

1
2
3
import moxing as mox
with mox.file.File('obs://bucket_name/obs_file.txt', 'r') as f:
  data = f.readlines()

利用pandas读或写一个OBS文件

  • 利用pandas读一个OBS文件。
    1
    2
    3
    4
    import pandas as pd
    import moxing as mox
    with mox.file.File("obs://bucket_name/b.txt", "r") as f:
      csv = pd.read_csv(f)
    
  • 利用pandas写一个OBS文件。
    1
    2
    3
    4
    5
    import pandas as pd
    import moxing as mox
    df = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
    with mox.file.File("obs://bucket_name/b.txt", "w") as f:
      df.to_csv(f)
    

利用文件对象读取图片

使用opencv打开一张图片时,无法传入一个OBS路径,需要利用文件对象读取,考虑以下代码是无法读取到该图片的。

1
2
import cv2
cv2.imread('obs://bucket_name/xxx.jpg', cv2.IMREAD_COLOR)

修改为如下代码:

1
2
3
4
import cv2
import numpy as np
import moxing as mox
img = cv2.imdecode(np.fromstring(mox.file.read('obs://bucket_name/xxx.jpg', binary=True), np.uint8), cv2.IMREAD_COLOR)

将一个不支持OBS路径的API改造成支持OBS路径的API

pandas中对h5的文件读写to_hdfread_hdf既不支持OBS路径,也不支持输入一个文件对象,考虑以下代码会出现错误。

1
2
3
4
import pandas as pd
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]}, index=['a', 'b', 'c'])
df.to_hdf('obs://wolfros-net/hdftest.h5', key='df', mode='w')
pd.read_hdf('obs://wolfros-net/hdftest.h5')

通过重写pandas源码API的方式,将该API改造成支持OBS路径的形式。

  • 写h5到OBS = 写h5到本地缓存 + 上传本地缓存到OBS + 删除本地缓存
  • 从OBS读h5 = 下载h5到本地缓存 + 读取本地缓存 + 删除本地缓存

即将以下代码写在运行脚本的最前面,就能使运行过程中的to_hdfread_hdf支持OBS路径。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
import moxing as mox
import pandas as pd
from pandas.io import pytables
from pandas.core.generic import NDFrame

to_hdf_origin = getattr(NDFrame, 'to_hdf')
read_hdf_origin = getattr(pytables, 'read_hdf')


def to_hdf_override(self, path_or_buf, key, **kwargs):
  tmp_dir = '/cache/hdf_tmp'
  file_name = os.path.basename(path_or_buf)
  mox.file.make_dirs(tmp_dir)
  local_file = os.path.join(tmp_dir, file_name)
  to_hdf_origin(self, local_file, key, **kwargs)
  mox.file.copy(local_file, path_or_buf)
  mox.file.remove(local_file)


def read_hdf_override(path_or_buf, key=None, mode='r', **kwargs):
  tmp_dir = '/cache/hdf_tmp'
  file_name = os.path.basename(path_or_buf)
  mox.file.make_dirs(tmp_dir)
  local_file = os.path.join(tmp_dir, file_name)
  mox.file.copy(path_or_buf, local_file)
  result = read_hdf_origin(local_file, key, mode, **kwargs)
  mox.file.remove(local_file)
  return result

setattr(NDFrame, 'to_hdf', to_hdf_override)
setattr(pytables, 'read_hdf', read_hdf_override)
setattr(pd, 'read_hdf', read_hdf_override)

利用MoXing使h5py.File支持OBS

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import os
import h5py
import numpy as np
import moxing as mox

h5py_File_class = h5py.File

class OBSFile(h5py_File_class):
  def __init__(self, name, *args, **kwargs):
    self._tmp_name = None
    self._target_name = name
    if name.startswith('obs://'):
      self._tmp_name = name.replace('/', '_')
      if mox.file.exists(name):
        mox.file.copy(name, os.path.join('cache', 'h5py_tmp', self._tmp_name))
      name = self._tmp_name

    super(OBSFile, self).__init__(name, *args, **kwargs)

  def close(self):
    if self._tmp_name:
      mox.file.copy(self._tmp_name, self._target_name)

    super(OBSFile, self).close()


setattr(h5py, 'File', OBSFile)

arr = np.random.randn(1000)
with h5py.File('obs://bucket/random.hdf5', 'r') as f:
  f.create_dataset("default", data=arr)

with h5py.File('obs://bucket/random.hdf5', 'r') as f:
  print(f.require_dataset("default", dtype=np.float32, shape=(1000,)))