首先找到入口,比如获取最新遥测数据方法getLatestTelemetry,/DEVICE/deviceId/values/timeseries 在 TelemetryController 中:
@RestController @TbCoreComponent // TbUrlConstants.TELEMETRY_URL_PREFIX=="/api/plugins/telemetry" @RequestMapping(TbUrlConstants.TELEMETRY_URL_PREFIX) @Slf4j public class TelemetryController extends BaseController { // 这个组件负责获取设备遥测数据 @Autowired private TimeseriesService tsService; // 这个组件时为了验证当前用户是否有权限去执行当前操作 @Autowired private AccessValidator accessValidator; @Value("${transport.json.max_string_value_length:0}") private int maxStringValueLength; private ExecutorService executor; @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET) @ResponseBody public DeferredResult<ResponseEntity> getLatestTimeseries( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @RequestParam(name = "keys", required = false) String keysStr, @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException { SecurityUser user = getCurrentUser(); return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr, (result, tenantId, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr, useStrictDataTypes)); } private void getLatestTimeseriesValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String keys, Boolean useStrictDataTypes) { ListenableFuture<List<TsKvEntry>> future; if (StringUtils.isEmpty(keys)) { // 如果我们不传入键名列表,则返回所有遥测数据。 future = tsService.findAllLatest(user.getTenantId(), entityId); } else { // 如果传入键名列表,则只查找相关的遥测数据。比如查经纬度要将longitude和latitude传入这个接口 future = tsService.findLatest(user.getTenantId(), entityId, toKeysList(keys)); } Futures.addCallback(future, getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor()); } }
这个类主要是用于判断当前访问者是否有访问遥测数据的权限
validateEntityAndCallback 重载方法很多,一定要注意区分。这里使用的是第一个方法
@Component public class AccessValidator { ........ // 这是第一个 validateEntityAndCallback 方法,这一步加入了出错处理方法 public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, String entityType, String entityIdStr, ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess) throws ThingsboardException { // 调用了第二个 validateEntityAndCallback 方法 return validateEntityAndCallback(currentUser, operation, entityType, entityIdStr, onSuccess, // 加入出错处理方法 (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); } // 这是第二个 validateEntityAndCallback 方法,这一步根据id和类型(DEVICE)产生EntityId,以便找到这个设备 public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, String entityType, String entityIdStr, ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess, BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException { // 调用第四个 validateEntityAndCallback 方法 return validateEntityAndCallback(currentUser, operation, // 根据类型(DEVICE)和id产生EntityId实例,以便查找设备 EntityIdFactory.getByTypeAndId(entityType, entityIdStr),onSuccess, onFailure); } // 这是第三个 validateEntityAndCallback 方法,没有被调用 public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, EntityId entityId, ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess) throws ThingsboardException { return validateEntityAndCallback(currentUser, operation, entityId, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); } // 这是第四个 validateEntityAndCallback 方法 public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, EntityId entityId, ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess, BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException { final DeferredResult<ResponseEntity> response = new DeferredResult<>(); // 调用validate方法,根据当前EntityType来确定使用哪个方法 validate(currentUser, operation, entityId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() { @Override public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { try { onSuccess.accept(response, currentUser.getTenantId(), entityId); } catch (Exception e) { onFailure(e); } } @Override public void onFailure(Throwable t) { onFailure.accept(response, t); } })); return response; } public void validate(SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) { switch (entityId.getEntityType()) { case DEVICE: // EntityType=='DEVICE',调用validateDevice方法,直接返回响应结果到http response validateDevice(currentUser, operation, entityId, callback); return; case DEVICE_PROFILE: validateDeviceProfile(currentUser, operation, entityId, callback); return; /*。。。。。。。。。。。。。。*/ default: //TODO: add support of other entities throw new IllegalStateException("Not Implemented!"); } } private void validateDevice(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) { // 系统管理员没有查看设备遥测数据的权限 if (currentUser.isSystemAdmin()) { callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); } else { // 根据TenantId和DeviceId来查找Device实例 ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(currentUser.getTenantId(), new DeviceId(entityId.getId())); Futures.addCallback(deviceFuture, getCallback(callback, device -> { if (device == null) { return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND); } else { try { // 查找权限列表判断当前用户有无读取设备遥测数据的权限。如果没有权限将直接报错,然后出错处理 accessControlService.checkPermission(currentUser, Resource.DEVICE, operation, entityId, device); } catch (ThingsboardException e) { return ValidationResult.accessDenied(e.getMessage()); } return ValidationResult.ok(device); } }), executor); } } public interface ThreeConsumer<A, B, C> { void accept(A a, B b, C c); } }
@Service @Slf4j public class BaseTimeseriesService implements TimeseriesService { // 这个组件负责查找所有最新遥测数据 @Autowired private TimeseriesLatestDao timeseriesLatestDao; @Override public ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) { validate(entityId); List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size()); keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key))); return Futures.allAsList(futures); } @Override public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) { //校验当前deviceId是否合法 validate(entityId); // 根据tenantId和deviceId返回结果 return timeseriesLatestDao.findAllLatest(tenantId, entityId); } }
@Slf4j @Component @SqlTsLatestAnyDaopublic class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao { @Autowired private SearchTsKvLatestRepository searchTsKvLatestRepository; @Override public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) { return getFindAllLatestFuture(entityId); } protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) { return Futures.immediateFuture(DaoUtil.convertDataList(Lists.newArrayList( searchTsKvLatestRepository.findAllByEntityId(entityId.getId())))); } }
@SqlTsLatestAnyDao@Repositorypublic class SearchTsKvLatestRepository { public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId"; public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, ts_kv_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue," + " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts FROM ts_kv_latest " + "INNER JOIN ts_kv_dictionary ON ts_kv_latest.key = ts_kv_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)"; @PersistenceContext private EntityManager entityManager; public List<TsKvLatestEntity> findAllByEntityId(UUID entityId) { return entityManager.createNamedQuery(FIND_ALL_BY_ENTITY_ID, TsKvLatestEntity.class.setParameter("id",entityId).getResultList()); } }