Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugins里面的alicloud代码跟官方提供的demo代码相比没看到本质变化 #4

Open
loxeed1990 opened this issue Oct 8, 2024 · 2 comments

Comments

@loxeed1990
Copy link

所谓的增加对阿里云的支持是怎么支持的?alicloud_recog_engine.c代码跟官方的demo_recog_engine.c除了改了变量名和加了日志,其他没什么区别,没看到调用阿里云ASR服务的逻辑,请问作者具体是怎么添加的阿里云支持?

@hani-code-explorer
Copy link

hani-code-explorer commented Oct 12, 2024

我这边可以提供 MRCP 与阿里云集成的相关 plugin,下面是一个 TTS 示例,需要阿里云 nls 的 sdk

/*
 * Copyright 2008-2015 Arsen Chaloyan
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/* 
 * Mandatory rules concerning plugin implementation.
 * 1. Each plugin MUST implement a plugin/engine creator function
 *    with the exact signature and name (the main entry point)
 *        MRCP_PLUGIN_DECLARE(mrcp_engine_t*) mrcp_plugin_create(apr_pool_t *pool)
 * 2. Each plugin MUST declare its version number
 *        MRCP_PLUGIN_VERSION_DECLARE
 * 3. One and only one response MUST be sent back to the received request.
 * 4. Methods (callbacks) of the MRCP engine channel MUST not block.
 *   (asynchronous response can be sent from the context of other thread)
 * 5. Methods (callbacks) of the MPF engine stream MUST not block.
 */

extern "C" {
#include "mrcp_synth_engine.h"
#include "mpf_buffer.h"
#include "apt_consumer_task.h"
#include "apt_log.h"
} // extern C

#include <string.h>
#include <sys/io.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include <ctime>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

#include "nlsClient.h"
#include "nlsEvent.h"
#include "nlsToken.h"
#include "speechSynthesizerRequest.h"

#define DEFAULT_STRING_LEN 512
#define AUDIO_TEXT_LENGTH 1024

using namespace AlibabaNlsCommon;
using AlibabaNls::NlsClient;
using AlibabaNls::NlsEvent;
using AlibabaNls::SpeechSynthesizerRequest;
using AlibabaNls::LongTts;
using AlibabaNls::LogDebug;



#define SYNTH_ENGINE_TASK_NAME "Alimrcp Synth Engine"

typedef struct alimrcp_synth_engine_t alimrcp_synth_engine_t;
typedef struct alimrcp_synth_channel_t alimrcp_synth_channel_t;
typedef struct alimrcp_synth_msg_t alimrcp_synth_msg_t;

/** Declaration of synthesizer engine methods */
static apt_bool_t alimrcp_synth_engine_destroy(mrcp_engine_t *engine);
static apt_bool_t alimrcp_synth_engine_open(mrcp_engine_t *engine);
static apt_bool_t alimrcp_synth_engine_close(mrcp_engine_t *engine);
static mrcp_engine_channel_t* alimrcp_synth_engine_channel_create(mrcp_engine_t *engine, apr_pool_t *pool);

static const struct mrcp_engine_method_vtable_t engine_vtable = {
	alimrcp_synth_engine_destroy,
	alimrcp_synth_engine_open,
	alimrcp_synth_engine_close,
	alimrcp_synth_engine_channel_create
};


/** Declaration of synthesizer channel methods */
static apt_bool_t alimrcp_synth_channel_destroy(mrcp_engine_channel_t *channel);
static apt_bool_t alimrcp_synth_channel_open(mrcp_engine_channel_t *channel);
static apt_bool_t alimrcp_synth_channel_close(mrcp_engine_channel_t *channel);
static apt_bool_t alimrcp_synth_channel_request_process(mrcp_engine_channel_t *channel, mrcp_message_t *request);

static const struct mrcp_engine_channel_method_vtable_t channel_vtable = {
	alimrcp_synth_channel_destroy,
	alimrcp_synth_channel_open,
	alimrcp_synth_channel_close,
	alimrcp_synth_channel_request_process
};

/** Declaration of synthesizer audio stream methods */
static apt_bool_t alimrcp_synth_stream_destroy(mpf_audio_stream_t *stream);
static apt_bool_t alimrcp_synth_stream_open(mpf_audio_stream_t *stream, mpf_codec_t *codec);
static apt_bool_t alimrcp_synth_stream_close(mpf_audio_stream_t *stream);
static apt_bool_t alimrcp_synth_stream_read(mpf_audio_stream_t *stream, mpf_frame_t *frame);

static const mpf_audio_stream_vtable_t audio_stream_vtable = {
	alimrcp_synth_stream_destroy,
	alimrcp_synth_stream_open,
	alimrcp_synth_stream_close,
	alimrcp_synth_stream_read,
	NULL,
	NULL,
	NULL,
	NULL
};

/** Declaration of alimrcp synthesizer engine */
struct alimrcp_synth_engine_t {
	apt_consumer_task_t    *task;
};

/** Declaration of alimrcp synthesizer channel */
struct alimrcp_synth_channel_t {
	/** Back pointer to engine */
	alimrcp_synth_engine_t   *alimrcp_engine;
	/** Engine channel base */
	mrcp_engine_channel_t *channel;

	/** Active (in-progress) speak request */
	mrcp_message_t        *speak_request;
	/** Pending stop response */
	mrcp_message_t        *stop_response;
	/** Estimated time to complete */
	apr_size_t             time_to_complete;
	/** Is paused */
	apt_bool_t             paused;
	/** Speech source (used instead of actual synthesis) */
	FILE                  *audio_file;
	mpf_buffer_t          *audio_buffer;
};

typedef enum {
	ALIMRCP_SYNTH_MSG_OPEN_CHANNEL,
	ALIMRCP_SYNTH_MSG_CLOSE_CHANNEL,
	ALIMRCP_SYNTH_MSG_REQUEST_PROCESS
} alimrcp_synth_msg_type_e;

/** Declaration of alimrcp synthesizer task message */
struct alimrcp_synth_msg_t {
	alimrcp_synth_msg_type_e  type;
	mrcp_engine_channel_t *channel; 
	mrcp_message_t        *request;
};


static apt_bool_t alimrcp_synth_msg_signal(alimrcp_synth_msg_type_e type, mrcp_engine_channel_t *channel, mrcp_message_t *request);
static apt_bool_t alimrcp_synth_msg_process(apt_task_t *task, apt_task_msg_t *msg);

/** Declare this macro to set plugin version */
MRCP_PLUGIN_VERSION_DECLARE

/**
 * Declare this macro to use log routine of the server, plugin is loaded from.
 * Enable/add the corresponding entry in logger.xml to set a cutsom log source priority.
 *    <source name="SYNTH-PLUGIN" priority="DEBUG" masking="NONE"/>
 */
MRCP_PLUGIN_LOG_SOURCE_IMPLEMENT(SYNTH_PLUGIN,"SYNTH-PLUGIN")

/** Use custom log source mark */
#define SYNTH_LOG_MARK   APT_LOG_MARK_DECLARE(SYNTH_PLUGIN)


/**
 * 全局维护一个服务鉴权token和其对应的有效期时间戳,
 * 每次调用服务之前,首先判断token是否已经过期,
 * 如果已经过期,则根据AccessKey ID和AccessKey
 * Secret重新生成一个token,并更新这个全局的token和其有效期时间戳。
 *
 * 注意:不要每次调用服务之前都重新生成新token,只需在token即将过期时重新生成即可。所有的服务并发可共用一个token。
 */
// 自定义线程参数
struct ParamStruct {
    char text[AUDIO_TEXT_LENGTH];
    char token[DEFAULT_STRING_LEN];
    char appKey[DEFAULT_STRING_LEN];
    char url[DEFAULT_STRING_LEN];
    char var_file_path[DEFAULT_STRING_LEN];
    char session_id[DEFAULT_STRING_LEN];
    mpf_buffer_t *audio_buffer;
    mrcp_synth_header_t *synth_header;
};


// 自定义事件回调参数
struct ParamCallBack {
public:
    explicit ParamCallBack(ParamStruct *param) {
        tParam = param;
        pthread_mutex_init(&mtxWord, NULL);
        pthread_cond_init(&cvWord, NULL);
    };

    ~ParamCallBack() {
        tParam = NULL;
        pthread_mutex_destroy(&mtxWord);
        pthread_cond_destroy(&cvWord);
    };

    pthread_mutex_t mtxWord;
    pthread_cond_t cvWord;

    ParamStruct *tParam;
};

std::string g_appKey = "";
std::string g_akId = "";
std::string g_akSecret = "";
std::string g_token = "";
std::string g_url = "";
std::string g_voice = "xiaoyun";
std::string g_format = "pcm";
int g_sdkLogLevel = 1;
bool g_save_audio = true;
bool g_encodingUtf8 = true;
int g_volume = 50;
int g_speech_rate = 0;
int g_pitch_rate = 0;
int g_sample_rate = 8000;

int g_cpu = 1;
long g_expireTime = -1;

static bool sysAddrinfo = false;

/**
 * 根据AccessKey ID和AccessKey Secret重新生成一个token,并获取其有效期时间戳
 */
static int generateToken(std::string akId, std::string akSecret, std::string* token, long* expireTime)
{
	NlsToken nlsTokenRequest;
	nlsTokenRequest.setAccessKeyId(akId);
	nlsTokenRequest.setKeySecret(akSecret);

	int ret = nlsTokenRequest.applyNlsToken();
	if (ret < 0) {
		// 获取失败原因。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_ERROR,"generateToken Failed, error code:%d msg:%s",ret,nlsTokenRequest.getErrorMsg());
		return ret;
	}
	*token = nlsTokenRequest.getToken();
	*expireTime = nlsTokenRequest.getExpireTime();
	return 0;
}

static void OnSynthesisStarted(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	if (cbParam && tmpParam->tParam) {
		// 获取消息的状态码,成功为0或者20000000,失败时对应失败的错误码。
		// 当前任务的task id,方便定位问题,作为和服务端交互的唯一标识建议输出。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisStarted: status code=%d, session id = %s, task id=%s",cbEvent->getStatusCode(),tmpParam->tParam->session_id,cbEvent->getTaskId());
	} else {
		// 获取消息的状态码,成功为0或者20000000,失败时对应失败的错误码。
		// 当前任务的task id,方便定位问题,作为和服务端交互的唯一标识建议输出。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisStarted: status code=%d, task id=%s",cbEvent->getStatusCode(),cbEvent->getTaskId());
	}
}

/**
 * @brief sdk在接收到云端返回合成结束消息时, sdk内部线程上报Completed事件
 * @note 上报Completed事件之后,SDK内部会关闭识别连接通道.
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void OnSynthesisCompleted(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	if (cbParam && tmpParam->tParam && tmpParam->tParam->audio_buffer) {
                mpf_buffer_event_write(tmpParam->tParam->audio_buffer, MEDIA_FRAME_TYPE_EVENT);
		// 获取消息的状态码,成功为0或者20000000,失败时对应失败的错误码。
		// 当前任务的task id,方便定位问题,作为和服务端交互的唯一标识建议输出。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisCompleted: status code=%d, task id=%s",cbEvent->getStatusCode(),cbEvent->getTaskId());
		// 获取服务端返回的全部信息。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisCompleted: all response=%s",cbEvent->getAllResponse());
        }
}

/**
 * @brief 合成过程发生异常时, sdk内部线程上报TaskFailed事件
 * @note 上报TaskFailed事件之后,SDK内部会关闭识别连接通道.
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void OnSynthesisTaskFailed(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	if (tmpParam) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_WARNING,"OnSynthesisTaskFailed: status code=%d, task id=%s, error message=%s",cbEvent->getStatusCode(),cbEvent->getTaskId(),cbEvent->getErrorMessage());
		apt_log(SYNTH_LOG_MARK,APT_PRIO_WARNING,"OnSynthesisTaskFailed: All response:%s",cbEvent->getAllResponse());
	}
}

/**
 * @brief 识别结束或发生异常时,会关闭连接通道,
 * sdk内部线程上报ChannelCloseed事件
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void OnSynthesisChannelClosed(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	if (tmpParam) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisChannelClosed: %s",cbEvent->getAllResponse());
		//通知发送线程, 最终识别结果已经返回, 可以调用stop()
		pthread_mutex_lock(&(tmpParam->mtxWord));
		pthread_cond_signal(&(tmpParam->cvWord));
		pthread_mutex_unlock(&(tmpParam->mtxWord));
    }
}

/**
 * @brief 文本上报服务端之后, 收到服务端返回的二进制音频数据,
 * SDK内部线程通过BinaryDataRecved事件上报给用户
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 * @notice 此处切记不可做block操作,只可做音频数据转存. 若在此回调中做过多操作,
 *         会阻塞后续的数据回调和completed事件回调.
 */
static void OnBinaryDataRecved(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	// apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"OnBinaryDataRecved: %s",tmpParam->binAudioFile.c_str());

	// getBinaryData() 获取文本合成的二进制音频数据
	std::vector<unsigned char> data = cbEvent->getBinaryData();

	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnBinaryDataRecved: status code=%d, task id=%s, data size=%d",cbEvent->getStatusCode(),cbEvent->getTaskId(),data.size());
	if (cbParam && tmpParam->tParam && tmpParam->tParam->audio_buffer) {
		mpf_buffer_audio_write(tmpParam->tParam->audio_buffer, &data[0], data.size());
	}
	if (cbParam && tmpParam->tParam && g_save_audio && data.size() > 0) {
		// 以追加形式将二进制音频数据写入文件
		char var_tts_audio_dir[256] = {0};
		snprintf(var_tts_audio_dir, 256, "%s/tts_audio", tmpParam->tParam->var_file_path);
		if (access(var_tts_audio_dir, 0) == -1) {
			mkdir(var_tts_audio_dir, S_IRWXU);
		}
		char file_name[256] = {0};
		snprintf(file_name, 256, "%s/%s.%s", var_tts_audio_dir, cbEvent->getTaskId(), g_format.c_str());
		FILE *tts_stream = fopen(file_name, "a+");
		if (tts_stream) {
			fwrite((char *) &data[0], data.size(), 1, tts_stream);
			fclose(tts_stream);
		}
	}

}

/**
 * @brief 返回 tts 文本对应的日志信息,增量返回对应的字幕信息
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void OnMetaInfo(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = (ParamCallBack *) cbParam;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnMetaInfo: task id=%s, respose=%s",cbEvent->getTaskId(),cbEvent->getAllResponse());
}

/**
 * @brief 服务端返回的所有信息会通过此回调反馈,
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void onMessage(NlsEvent *cbEvent, void *cbParam) 
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"onMessage: All response:%s",cbEvent->getAllResponse());
}

/**
 * @brief 短链接模式下工作线程
 *        以 createSynthesizerRequest          <----|
 *                   |                              |
 *           request->start()                       |
 *                   |                              |
 *           收到OnSynthesisChannelClosed回调       |
 *                   |                              |
 *           releaseSynthesizerRequest(request) ----|
 *        进行循环。
 */
static void *pthreadFunc(void *arg) 
{
	// 从自定义线程参数中获取token, 配置文件等参数.
	ParamStruct *tst = static_cast<ParamStruct *>(arg);
	if (tst == NULL) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_ERROR,"arg is not valid.");
		return NULL;
	}

	// 初始化自定义回调参数
	ParamCallBack cbParam(tst);

	/*
	 * 1. 创建语音识别SpeechSynthesizerRequest对象.
	 *
	 * 默认为实时短文本语音合成请求, 支持一次性合成300字符以内的文字,
	 * 其中1个汉字、1个英文字母或1个标点均算作1个字符,
	 * 超过300个字符的内容将会报错(或者截断).
	 * 一次性合成超过300字符可考虑长文本语音合成功能.
	 *
	 * 实时短文本语音合成文档详见:
	 * https://help.aliyun.com/document_detail/84435.html
	 * 长文本语音合成文档详见:
	 * https://help.aliyun.com/document_detail/130509.html
	 */
	int chars_cnt = NlsClient::getInstance()->calculateUtf8Chars(tst->text);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"starting synthesizer this text contains %d chars. pid %ld",chars_cnt,pthread_self());
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"Voice Name : %s",tst->synth_header->voice_param.name.buf);

	SpeechSynthesizerRequest *request = NULL;
	if (chars_cnt > 300) {
		// 长文本语音合成
		request = NlsClient::getInstance()->createSynthesizerRequest(LongTts);
	} else {
		// 短文本语音合成
		request = NlsClient::getInstance()->createSynthesizerRequest();
	}
	if (request == NULL) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_ERROR,"createSynthesizerRequest failed.");
		return NULL;
	}

	/*
	 * 2. 设置用于接收结果的回调
	 */
	// 设置音频合成可以开始的回调函数
	request->setOnSynthesisStarted(OnSynthesisStarted, &cbParam);
	// 设置音频合成结束回调函数
	request->setOnSynthesisCompleted(OnSynthesisCompleted, &cbParam);
	// 设置音频合成通道关闭回调函数
	request->setOnChannelClosed(OnSynthesisChannelClosed, &cbParam);
	// 设置异常失败回调函数
	request->setOnTaskFailed(OnSynthesisTaskFailed, &cbParam);
	// 设置文本音频数据接收回调函数
	request->setOnBinaryDataReceived(OnBinaryDataRecved, &cbParam);
	// 设置字幕信息
	request->setOnMetaInfo(OnMetaInfo, &cbParam);
	// 设置所有服务端返回信息回调函数
	// request->setOnMessage(onMessage, &cbParam);
	// 开启所有服务端返回信息回调函数, 其他回调(除了OnBinaryDataRecved)失效
	// request->setEnableOnMessage(true);

	/*
	 * 3. 设置request的相关参数
	 */
	// 设置待合成文本, 必填参数. 文本内容必须为UTF-8编码
	// 一次性合成超过300字符可考虑长文本语音合成功能.
	// 长文本语音合成文档详见:
	// https://help.aliyun.com/document_detail/130509.html
	request->setText(tst->text);
	// 发音人, 包含"xiaoyun", "ruoxi", "xiaogang"等. 可选参数, 默认是xiaoyun
	if (strlen(tst->synth_header->voice_param.name.buf) > 0) {
		request->setVoice(tst->synth_header->voice_param.name.buf);
	} else {
		request->setVoice(g_voice.c_str());
	}

	// 访问个性化音色,访问的Voice必须是个人定制音色
	// request->setPayloadParam("{\"enable_ptts\":true}");
	// 音量, 范围是0~100, 可选参数, 默认50
	request->setVolume(g_volume);
	// 语速, 范围是-500~500, 可选参数, 默认是0
	request->setSpeechRate(g_speech_rate);
	// 语调, 范围是-500~500, 可选参数, 默认是0
	request->setPitchRate(g_pitch_rate);
	// 音频编码格式, 可选参数, 默认是wav. 支持的格式pcm, wav, mp3
	request->setFormat(g_format.c_str());
	// 音频采样率, 包含8000, 16000. 可选参数, 默认是16000
	request->setSampleRate(g_sample_rate);
	// 开启字幕
	request->setEnableSubtitle(false);

	// 设置AppKey, 必填参数, 请参照官网申请
	if (strlen(tst->appKey) > 0) {
		request->setAppKey(tst->appKey);
	}
	// 设置账号校验token, 必填参数
	if (strlen(tst->token) > 0) {
		request->setToken(tst->token);
	}

	if (strlen(tst->url) > 0) {
		request->setUrl(tst->url);
	}
	// 设置链接超时500ms
	request->setTimeout(500);
	// 获取返回文本的编码格式
	const char *output_format = request->getOutputFormat();
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"text format: %s",output_format);

	/*
	 * 4. start()为异步操作。成功则开始返回BinaryRecv事件。失败返回TaskFailed事件。
	 */
	int ret = request->start();
	if (ret < 0) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_WARNING,"start() failed. may be can not connect server. please check network or firewalld");
		// start()失败,释放request对象
		NlsClient::getInstance()->releaseSynthesizerRequest(request);
		return NULL;
	}

	/*
	 * 5. start成功,开始等待接收完所有合成数据。
	 *    stop()为无意义接口,调用与否都会跑完全程.
	 *    cancel()立即停止工作, 且不会有回调返回, 失败返回TaskFailed事件。
	 */
	//    ret = request->cancel();
	//    ret = request->stop();  // always return 0

	/*
	 * 开始等待接收完所有合成数据。
	 */
	struct timeval now;
	struct timespec outtime;
	if (ret == 0) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"wait closed callback.");
		// 语音服务器存在来不及处理当前请求, 10s内不返回任何回调的问题,
		// 然后在10s后返回一个TaskFailed回调, 所以需要设置一个超时机制.
		gettimeofday(&now, NULL);

		/*
		 * 根据文本长短、接收速度和网络环境,接收完所有合成数据的时间无法确定,
		 * 这里设定30s超时只是展示一种超时机制。
		 */
		outtime.tv_sec = now.tv_sec + 30;
		outtime.tv_nsec = now.tv_usec * 1000;
		// 等待closed事件后再进行释放, 否则会出现崩溃
		pthread_mutex_lock(&(cbParam.mtxWord));
		if (ETIMEDOUT == pthread_cond_timedwait(&(cbParam.cvWord), &(cbParam.mtxWord), &outtime)) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"synthesis timeout.");
		}
		pthread_mutex_unlock(&(cbParam.mtxWord));
	} else {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"ret is %d, pid %ld",ret,pthread_self());
	}

	/*
	 * 6. 完成所有工作后释放当前请求。
	 *    请在closed事件(确定完成所有工作)后再释放, 否则容易破坏内部状态机,
	 *    会强制卸载正在运行的请求。
	 */
	NlsClient::getInstance()->releaseSynthesizerRequest(request);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"release Synthesizer success. pid %ld",pthread_self());
	return NULL;
}

/** Create alimrcp synthesizer engine */
MRCP_PLUGIN_DECLARE(mrcp_engine_t*) mrcp_plugin_create(apr_pool_t *pool)
{
	/* create alimrcp engine */
	alimrcp_synth_engine_t *alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(apr_palloc(pool,sizeof(alimrcp_synth_engine_t)));
	apt_task_t *task;
	apt_task_vtable_t *vtable;
	apt_task_msg_pool_t *msg_pool;

	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO, "mrcp_plugin_create");
	/* create task/thread to run alimrcp engine in the context of this task */
	msg_pool = apt_task_msg_pool_create_dynamic(sizeof(alimrcp_synth_msg_t),pool);
	alimrcp_engine->task = apt_consumer_task_create(alimrcp_engine,msg_pool,pool);
	if(!alimrcp_engine->task) {
		return NULL;
	}
	task = apt_consumer_task_base_get(alimrcp_engine->task);
	apt_task_name_set(task,SYNTH_ENGINE_TASK_NAME);
	vtable = apt_task_vtable_get(task);
	if(vtable) {
		vtable->process_msg = alimrcp_synth_msg_process;
	}

	/* create engine base */
	return mrcp_engine_create(
				MRCP_SYNTHESIZER_RESOURCE, /* MRCP resource identifier */
				alimrcp_engine,               /* object to associate */
				&engine_vtable,            /* virtual methods table of engine */
				pool);                     /* pool to allocate memory from */
}

/** Destroy synthesizer engine */
static apt_bool_t alimrcp_synth_engine_destroy(mrcp_engine_t *engine)
{
	alimrcp_synth_engine_t *alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(engine->obj);
	if(alimrcp_engine->task) {
		apt_task_t *task = apt_consumer_task_base_get(alimrcp_engine->task);
		apt_task_destroy(task);
		alimrcp_engine->task = NULL;
	}
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_destroy");
	NlsClient::releaseInstance();
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_close release instance");
	return TRUE;
}

/** Open synthesizer engine */
static apt_bool_t alimrcp_synth_engine_open(mrcp_engine_t *engine)
{
	alimrcp_synth_engine_t *alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(engine->obj);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_open");
	const char *appKey = mrcp_engine_param_get(engine, "appKey");
	if (appKey != NULL) {
		g_appKey = appKey;
	}
	const char *akId = mrcp_engine_param_get(engine, "akId");
	if (akId != NULL) {
		g_akId = akId;
	}
	const char *akSecret = mrcp_engine_param_get(engine, "akSecret");
	if (akSecret != NULL) {
		g_akSecret = akSecret;
	}
	const char *sdkLogLevel = mrcp_engine_param_get(engine, "sdk-log-level");
	if (sdkLogLevel != NULL) {
		g_sdkLogLevel = atoi(sdkLogLevel);
	}
	const char *saveAudio = mrcp_engine_param_get(engine, "save-audio");
	if (saveAudio != NULL) {
		g_save_audio = atoi(saveAudio) == 1;
	}
	const char *encodingGB2312 = mrcp_engine_param_get(engine, "text-encoding-gb2312");
	if (encodingGB2312 != NULL) {
		g_encodingUtf8 = atoi(encodingGB2312) == 0;
	}
	const char *format = mrcp_engine_param_get(engine, "format");
	if (format != NULL) {
		g_format = format;
	}
	const char *voice = mrcp_engine_param_get(engine, "voice");
	if (voice != NULL) {
		g_voice = voice;
	}
	const char *sampleRate = mrcp_engine_param_get(engine, "sample_rate");
	if (sampleRate != NULL) {
		g_sample_rate = atoi(sampleRate);
	}
	const char *volume = mrcp_engine_param_get(engine, "volume");
	if (volume != NULL) {
		g_volume = atoi(volume);
	}
	const char *speechRate = mrcp_engine_param_get(engine, "speech_rate");
	if (speechRate != NULL) {
		g_speech_rate = atoi(speechRate);
	}
	const char *pitchRate = mrcp_engine_param_get(engine, "pitch_rate");
	if (pitchRate != NULL) {
		g_pitch_rate = atoi(pitchRate);
	}

	if(alimrcp_engine->task) {
		apt_task_t *task = apt_consumer_task_base_get(alimrcp_engine->task);
		apt_task_start(task);
	}
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"max_channel_count : %d, appId : %s, akId : %s, akSecret : ******", engine->config->max_channel_count, g_appKey.c_str(), g_akId.c_str());
	// 启动工作线程, 在创建请求和启动前必须调用此函数, 可理解为对NlsClient的初始化
	// 入参为负时, 启动当前系统中可用的核数。
	// 200并发以下推荐入参为1, 更高并发入参推荐可看readme。
	NlsClient::getInstance()->startWorkThread(1);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"Nls Client Version : %s", NlsClient::getInstance()->getVersion());

	return mrcp_engine_open_respond(engine,TRUE);
}

/** Close synthesizer engine */
static apt_bool_t alimrcp_synth_engine_close(mrcp_engine_t *engine)
{
	alimrcp_synth_engine_t *alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(engine->obj);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_close");
	if(alimrcp_engine->task) {
		apt_task_t *task = apt_consumer_task_base_get(alimrcp_engine->task);
		apt_task_terminate(task,TRUE);
	}
	return mrcp_engine_close_respond(engine);
}

/** Create alimrcp synthesizer channel derived from engine channel base */
static mrcp_engine_channel_t* alimrcp_synth_engine_channel_create(mrcp_engine_t *engine, apr_pool_t *pool)
{
	mpf_stream_capabilities_t *capabilities;
	mpf_termination_t *termination; 

	/* create alimrcp synth channel */
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(apr_palloc(pool,sizeof(alimrcp_synth_channel_t)));
	synth_channel->alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(engine->obj);
	synth_channel->speak_request = NULL;
	synth_channel->stop_response = NULL;
	synth_channel->time_to_complete = 0;
	synth_channel->paused = FALSE;
	synth_channel->audio_file = NULL;
	synth_channel->audio_buffer = NULL;
	
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_channel_create");
	capabilities = mpf_source_stream_capabilities_create(pool);
	mpf_codec_capabilities_add(
			&capabilities->codecs,
			MPF_SAMPLE_RATE_8000 | MPF_SAMPLE_RATE_16000,
			"LPCM");

	/* create media termination */
	termination = mrcp_engine_audio_termination_create(
			synth_channel,        /* object to associate */
			&audio_stream_vtable, /* virtual methods table of audio stream */
			capabilities,         /* stream capabilities */
			pool);                /* pool to allocate memory from */

	/* create engine channel base */
	synth_channel->channel = mrcp_engine_channel_create(
			engine,               /* engine */
			&channel_vtable,      /* virtual methods table of engine channel */
			synth_channel,        /* object to associate */
			termination,          /* associated media termination */
			pool);                /* pool to allocate memory from */

	synth_channel->audio_buffer = mpf_buffer_create(pool);
	return synth_channel->channel;
}

/** Destroy engine channel */
static apt_bool_t alimrcp_synth_channel_destroy(mrcp_engine_channel_t *channel)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_destroy");
	/* nothing to destroy */
	return TRUE;
}

/** Open engine channel (asynchronous response MUST be sent)*/
static apt_bool_t alimrcp_synth_channel_open(mrcp_engine_channel_t *channel)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_open");
	return alimrcp_synth_msg_signal(ALIMRCP_SYNTH_MSG_OPEN_CHANNEL,channel,NULL);
}

/** Close engine channel (asynchronous response MUST be sent)*/
static apt_bool_t alimrcp_synth_channel_close(mrcp_engine_channel_t *channel)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_close");
	return alimrcp_synth_msg_signal(ALIMRCP_SYNTH_MSG_CLOSE_CHANNEL,channel,NULL);
}

/** Process MRCP channel request (asynchronous response MUST be sent)*/
static apt_bool_t alimrcp_synth_channel_request_process(mrcp_engine_channel_t *channel, mrcp_message_t *request)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_request_process");
	return alimrcp_synth_msg_signal(ALIMRCP_SYNTH_MSG_REQUEST_PROCESS,channel,request);
}

/** Process SPEAK request */
static apt_bool_t alimrcp_synth_channel_speak(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	char *file_path = NULL;
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	const mpf_codec_descriptor_t *descriptor = mrcp_engine_source_stream_codec_get(channel);

	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"alimrcp_synth_channel_speak");

	if(!descriptor) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_WARNING,"Failed to Get Codec Descriptor " APT_SIDRES_FMT, MRCP_MESSAGE_SIDRES(request));
		response->start_line.status_code = MRCP_STATUS_CODE_METHOD_FAILED;
		return FALSE;
	}
	
	if(!request->body.length) {
		synth_channel->speak_request = NULL;
		response->start_line.status_code = MRCP_STATUS_CODE_MISSING_PARAM;
		return FALSE;
	}

	//获取当前系统时间戳,判断token是否过期。
	std::time_t curTime = std::time(0);
	if (g_expireTime - curTime < 10) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"The token will be expired, please generate new token by AccessKey-ID and AccessKey-Secret.");
		if (generateToken(g_akId, g_akSecret, &g_token, &g_expireTime) < 0) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_ERROR,"Failed to Generate New Token." APT_SIDRES_FMT, MRCP_MESSAGE_SIDRES(request));
			response->start_line.status_code = MRCP_STATUS_CODE_RESOURCE_SPECIFIC_FAILURE;
			return FALSE;
		}
	}

	/* 不要超过AUDIO_TEXT_LENGTH */
	// const char texts[AUDIO_TEXT_LENGTH] = {"今日天气真不错,我想去操场踢足球."};

	ParamStruct pa;

	memset(pa.token, 0, DEFAULT_STRING_LEN);
	memcpy(pa.token, g_token.c_str(), g_token.length());

	memset(pa.appKey, 0, DEFAULT_STRING_LEN);
	memcpy(pa.appKey, g_appKey.c_str(), g_appKey.length());

	/* 不要超过AUDIO_TEXT_LENGTH */
	memset(pa.text, 0, AUDIO_TEXT_LENGTH);
	// memcpy(pa.text, texts, strlen(texts));
	memcpy(pa.text, request->body.buf, strlen(request->body.buf));

	memset(pa.url, 0, DEFAULT_STRING_LEN);
	if (!g_url.empty()) {
		memcpy(pa.url, g_url.c_str(), g_url.length());
	}

	if(g_save_audio && channel->engine) {
		memset(pa.var_file_path, 0, DEFAULT_STRING_LEN);
		char *var_file_path = apt_vardir_filepath_get(channel->engine->dir_layout,"",channel->pool);
		memcpy(pa.var_file_path, var_file_path, strlen(var_file_path));
        }

	// 设置回话 ID
	memset(pa.session_id, 0, DEFAULT_STRING_LEN);
	memcpy(pa.session_id, request->channel_id.session_id.buf, strlen(request->channel_id.session_id.buf));

	pa.audio_buffer = synth_channel->audio_buffer;
	/* get synthesizer header */
	pa.synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_get(request));
	
	pthread_t pthreadId;
	// 启动工作线程
	pthread_create(&pthreadId, NULL, &pthreadFunc, (void *) &(pa));
	pthread_join(pthreadId, NULL);


	synth_channel->time_to_complete = 0;
	if(synth_channel->audio_buffer == NULL && channel->engine) {
		char *file_name = apr_psprintf(channel->pool,"alimrcp-%dkHz.pcm",descriptor->sampling_rate/1000);
		file_path = apt_datadir_filepath_get(channel->engine->dir_layout,file_name,channel->pool);
	}

	if(file_path) {
		synth_channel->audio_file = fopen(file_path,"rb");
		if(synth_channel->audio_file) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"Set [%s] as Speech Source " APT_SIDRES_FMT,file_path,MRCP_MESSAGE_SIDRES(request));
		} else {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"No Speech Source [%s] Found " APT_SIDRES_FMT,file_path,MRCP_MESSAGE_SIDRES(request));
			/* calculate estimated time to complete */
			if(mrcp_generic_header_property_check(request,GENERIC_HEADER_CONTENT_LENGTH) == TRUE) {
				mrcp_generic_header_t *generic_header = mrcp_generic_header_get(request);
				if(generic_header) {
					synth_channel->time_to_complete = generic_header->content_length * 10; /* 10 msec per character */
				}
			}
		}
	}

	response->start_line.request_state = MRCP_REQUEST_STATE_INPROGRESS;
	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	synth_channel->speak_request = request;
	
	return TRUE;
}

/** Process STOP request */
static apt_bool_t alimrcp_synth_channel_stop(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	/* store the request, make sure there is no more activity and only then send the response */
	synth_channel->stop_response = response;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_stop");
	return TRUE;
}

/** Process PAUSE request */
static apt_bool_t alimrcp_synth_channel_pause(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	synth_channel->paused = TRUE;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_pause");
	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	return TRUE;
}

/** Process RESUME request */
static apt_bool_t alimrcp_synth_channel_resume(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	synth_channel->paused = FALSE;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_resume");
	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	return TRUE;
}

/** Process SET-PARAMS request */
static apt_bool_t alimrcp_synth_channel_set_params(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	mrcp_synth_header_t *req_synth_header;
	/* get synthesizer header */
	req_synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_get(request));
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_set_params");
	if(req_synth_header) {
		/* check voice age header */
		if(mrcp_resource_header_property_check(request,SYNTHESIZER_HEADER_VOICE_AGE) == TRUE) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"Set Voice Age [%" APR_SIZE_T_FMT "]",
				req_synth_header->voice_param.age);
		}
		/* check voice name header */
		if(mrcp_resource_header_property_check(request,SYNTHESIZER_HEADER_VOICE_NAME) == TRUE) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"Set Voice Name [%s]",
				req_synth_header->voice_param.name.buf);
		}
	}
	
	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	return TRUE;
}

/** Process GET-PARAMS request */
static apt_bool_t alimrcp_synth_channel_get_params(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	mrcp_synth_header_t *req_synth_header;
	/* get synthesizer header */
	req_synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_get(request));
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_get_params");
	if(req_synth_header) {
		mrcp_synth_header_t *res_synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_prepare(response));
		/* check voice age header */
		if(mrcp_resource_header_property_check(request,SYNTHESIZER_HEADER_VOICE_AGE) == TRUE) {
			res_synth_header->voice_param.age = 25;
			mrcp_resource_header_property_add(response,SYNTHESIZER_HEADER_VOICE_AGE);
		}
		/* check voice name header */
		if(mrcp_resource_header_property_check(request,SYNTHESIZER_HEADER_VOICE_NAME) == TRUE) {
			apt_string_set(&res_synth_header->voice_param.name,"David");
			mrcp_resource_header_property_add(response,SYNTHESIZER_HEADER_VOICE_NAME);
		}
	}

	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	return TRUE;
}

/** Dispatch MRCP request */
static apt_bool_t alimrcp_synth_channel_request_dispatch(mrcp_engine_channel_t *channel, mrcp_message_t *request)
{
	apt_bool_t processed = FALSE;
	mrcp_message_t *response = mrcp_response_create(request,request->pool);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_request_dispatch");
	switch(request->start_line.method_id) {
		case SYNTHESIZER_SET_PARAMS:
			processed = alimrcp_synth_channel_set_params(channel,request,response);
			break;
		case SYNTHESIZER_GET_PARAMS:
			processed = alimrcp_synth_channel_get_params(channel,request,response);
			break;
		case SYNTHESIZER_SPEAK:
			processed = alimrcp_synth_channel_speak(channel,request,response);
			break;
		case SYNTHESIZER_STOP:
			processed = alimrcp_synth_channel_stop(channel,request,response);
			break;
		case SYNTHESIZER_PAUSE:
			processed = alimrcp_synth_channel_pause(channel,request,response);
			break;
		case SYNTHESIZER_RESUME:
			processed = alimrcp_synth_channel_resume(channel,request,response);
			break;
		case SYNTHESIZER_BARGE_IN_OCCURRED:
			processed = alimrcp_synth_channel_stop(channel,request,response);
			break;
		case SYNTHESIZER_CONTROL:
			break;
		case SYNTHESIZER_DEFINE_LEXICON:
			break;
		default:
			break;
	}
	if(processed == FALSE) {
		/* send asynchronous response for not handled request */
		mrcp_engine_channel_message_send(channel,response);
	}
	return TRUE;
}

/** Callback is called from MPF engine context to destroy any additional data associated with audio stream */
static apt_bool_t alimrcp_synth_stream_destroy(mpf_audio_stream_t *stream)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_stream_destroy");
	return TRUE;
}

/** Callback is called from MPF engine context to perform any action before open */
static apt_bool_t alimrcp_synth_stream_open(mpf_audio_stream_t *stream, mpf_codec_t *codec)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_stream_open");
	return TRUE;
}

/** Callback is called from MPF engine context to perform any action after close */
static apt_bool_t alimrcp_synth_stream_close(mpf_audio_stream_t *stream)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_stream_close");
	return TRUE;
}

/** Callback is called from MPF engine context to read/get new frame */
static apt_bool_t alimrcp_synth_stream_read(mpf_audio_stream_t *stream, mpf_frame_t *frame)
{
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(stream->obj);
	// apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_stream_read");
	/* check if STOP was requested */
	if(synth_channel->stop_response) {
		/* send asynchronous response to STOP request */
		mrcp_engine_channel_message_send(synth_channel->channel,synth_channel->stop_response);
		synth_channel->stop_response = NULL;
		synth_channel->speak_request = NULL;
		synth_channel->paused = FALSE;
		if(synth_channel->audio_buffer) {
			// TODO: close
			synth_channel->audio_buffer = NULL;
		}
		if(synth_channel->audio_file) {
			fclose(synth_channel->audio_file);
			synth_channel->audio_file = NULL;
		}
		return TRUE;
	}

	/* check if there is active SPEAK request and it isn't in paused state */
	if(synth_channel->speak_request && synth_channel->paused == FALSE) {
		/* normal processing */
		apt_bool_t completed = FALSE;
	
		// apt_log(APT_LOG_MARK, APT_PRIO_INFO, "alimrcp_synth_stream_read synth_channel->speak_request");
		if(synth_channel->audio_buffer) {
			// apt_log(APT_LOG_MARK, APT_PRIO_INFO, "alimrcp_synth_stream_read synth_channel->audio_buffer to frame");
			/* read speech from buffer */
			mpf_buffer_frame_read(synth_channel->audio_buffer,frame);
			if((frame->type & MEDIA_FRAME_TYPE_EVENT) == MEDIA_FRAME_TYPE_EVENT) {
				frame->type &= ~MEDIA_FRAME_TYPE_EVENT;
				completed = TRUE;
			}
		} else if(synth_channel->audio_file) {
			/* read speech from file */
			apr_size_t size = frame->codec_frame.size;
			if(fread(frame->codec_frame.buffer,1,size,synth_channel->audio_file) == size) {
				frame->type |= MEDIA_FRAME_TYPE_AUDIO;
			} else {
				completed = TRUE;
			}
		} else {
			/* fill with silence in case no file available */
			if(synth_channel->time_to_complete >= stream->rx_descriptor->frame_duration) {
				memset(frame->codec_frame.buffer,0,frame->codec_frame.size);
				frame->type |= MEDIA_FRAME_TYPE_AUDIO;
				synth_channel->time_to_complete -= stream->rx_descriptor->frame_duration;
			} else {
				completed = TRUE;
			}
		}

		if(completed) {
			/* raise SPEAK-COMPLETE event */
			mrcp_message_t *message = mrcp_event_create(
								synth_channel->speak_request,
								SYNTHESIZER_SPEAK_COMPLETE,
								synth_channel->speak_request->pool);
			if(message) {
				/* get/allocate synthesizer header */
				mrcp_synth_header_t *synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_prepare(message));
				if(synth_header) {
					/* set completion cause */
					synth_header->completion_cause = SYNTHESIZER_COMPLETION_CAUSE_NORMAL;
					mrcp_resource_header_property_add(message,SYNTHESIZER_HEADER_COMPLETION_CAUSE);
				}
				/* set request state */
				message->start_line.request_state = MRCP_REQUEST_STATE_COMPLETE;

				synth_channel->speak_request = NULL;
				if(synth_channel->audio_file) {
					fclose(synth_channel->audio_file);
					synth_channel->audio_file = NULL;
				}
				if(synth_channel->audio_buffer) {
					// TODO: fclose(synth_channel->audio_file);
					synth_channel->audio_buffer = NULL;
				}
				/* send asynch event */
				mrcp_engine_channel_message_send(synth_channel->channel,message);
			}
		}
	}
	return TRUE;
}

static apt_bool_t alimrcp_synth_msg_signal(alimrcp_synth_msg_type_e type, mrcp_engine_channel_t *channel, mrcp_message_t *request)
{
	apt_bool_t status = FALSE;
	alimrcp_synth_channel_t *alimrcp_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	alimrcp_synth_engine_t *alimrcp_engine = alimrcp_channel->alimrcp_engine;
	apt_task_t *task = apt_consumer_task_base_get(alimrcp_engine->task);
	apt_task_msg_t *msg = apt_task_msg_get(task);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_msg_signal");
	if(msg) {
		alimrcp_synth_msg_t *alimrcp_msg;
		msg->type = TASK_MSG_USER;
		alimrcp_msg = (alimrcp_synth_msg_t*) msg->data;

		alimrcp_msg->type = type;
		alimrcp_msg->channel = channel;
		alimrcp_msg->request = request;
		status = apt_task_msg_signal(task,msg);
	}
	return status;
}

static apt_bool_t alimrcp_synth_msg_process(apt_task_t *task, apt_task_msg_t *msg)
{
	alimrcp_synth_msg_t *alimrcp_msg = (alimrcp_synth_msg_t*)msg->data;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_msg_process");
	switch(alimrcp_msg->type) {
		case ALIMRCP_SYNTH_MSG_OPEN_CHANNEL:
			/* open channel and send asynch response */
			mrcp_engine_channel_open_respond(alimrcp_msg->channel,TRUE);
			break;
		case ALIMRCP_SYNTH_MSG_CLOSE_CHANNEL:
			/* close channel, make sure there is no activity and send asynch response */
			mrcp_engine_channel_close_respond(alimrcp_msg->channel);
			break;
		case ALIMRCP_SYNTH_MSG_REQUEST_PROCESS:
			alimrcp_synth_channel_request_dispatch(alimrcp_msg->channel,alimrcp_msg->request);
			break;
		default:
			break;
	}
	return TRUE;
}

@Alone749-i
Copy link

我这边可以提供 MRCP 与阿里云集成的相关 plugin,下面是一个 TTS 示例,需要阿里云 nls 的 sdk

/*
 * Copyright 2008-2015 Arsen Chaloyan
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/* 
 * Mandatory rules concerning plugin implementation.
 * 1. Each plugin MUST implement a plugin/engine creator function
 *    with the exact signature and name (the main entry point)
 *        MRCP_PLUGIN_DECLARE(mrcp_engine_t*) mrcp_plugin_create(apr_pool_t *pool)
 * 2. Each plugin MUST declare its version number
 *        MRCP_PLUGIN_VERSION_DECLARE
 * 3. One and only one response MUST be sent back to the received request.
 * 4. Methods (callbacks) of the MRCP engine channel MUST not block.
 *   (asynchronous response can be sent from the context of other thread)
 * 5. Methods (callbacks) of the MPF engine stream MUST not block.
 */

extern "C" {
#include "mrcp_synth_engine.h"
#include "mpf_buffer.h"
#include "apt_consumer_task.h"
#include "apt_log.h"
} // extern C

#include <string.h>
#include <sys/io.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include <ctime>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

#include "nlsClient.h"
#include "nlsEvent.h"
#include "nlsToken.h"
#include "speechSynthesizerRequest.h"

#define DEFAULT_STRING_LEN 512
#define AUDIO_TEXT_LENGTH 1024

using namespace AlibabaNlsCommon;
using AlibabaNls::NlsClient;
using AlibabaNls::NlsEvent;
using AlibabaNls::SpeechSynthesizerRequest;
using AlibabaNls::LongTts;
using AlibabaNls::LogDebug;



#define SYNTH_ENGINE_TASK_NAME "Alimrcp Synth Engine"

typedef struct alimrcp_synth_engine_t alimrcp_synth_engine_t;
typedef struct alimrcp_synth_channel_t alimrcp_synth_channel_t;
typedef struct alimrcp_synth_msg_t alimrcp_synth_msg_t;

/** Declaration of synthesizer engine methods */
static apt_bool_t alimrcp_synth_engine_destroy(mrcp_engine_t *engine);
static apt_bool_t alimrcp_synth_engine_open(mrcp_engine_t *engine);
static apt_bool_t alimrcp_synth_engine_close(mrcp_engine_t *engine);
static mrcp_engine_channel_t* alimrcp_synth_engine_channel_create(mrcp_engine_t *engine, apr_pool_t *pool);

static const struct mrcp_engine_method_vtable_t engine_vtable = {
	alimrcp_synth_engine_destroy,
	alimrcp_synth_engine_open,
	alimrcp_synth_engine_close,
	alimrcp_synth_engine_channel_create
};


/** Declaration of synthesizer channel methods */
static apt_bool_t alimrcp_synth_channel_destroy(mrcp_engine_channel_t *channel);
static apt_bool_t alimrcp_synth_channel_open(mrcp_engine_channel_t *channel);
static apt_bool_t alimrcp_synth_channel_close(mrcp_engine_channel_t *channel);
static apt_bool_t alimrcp_synth_channel_request_process(mrcp_engine_channel_t *channel, mrcp_message_t *request);

static const struct mrcp_engine_channel_method_vtable_t channel_vtable = {
	alimrcp_synth_channel_destroy,
	alimrcp_synth_channel_open,
	alimrcp_synth_channel_close,
	alimrcp_synth_channel_request_process
};

/** Declaration of synthesizer audio stream methods */
static apt_bool_t alimrcp_synth_stream_destroy(mpf_audio_stream_t *stream);
static apt_bool_t alimrcp_synth_stream_open(mpf_audio_stream_t *stream, mpf_codec_t *codec);
static apt_bool_t alimrcp_synth_stream_close(mpf_audio_stream_t *stream);
static apt_bool_t alimrcp_synth_stream_read(mpf_audio_stream_t *stream, mpf_frame_t *frame);

static const mpf_audio_stream_vtable_t audio_stream_vtable = {
	alimrcp_synth_stream_destroy,
	alimrcp_synth_stream_open,
	alimrcp_synth_stream_close,
	alimrcp_synth_stream_read,
	NULL,
	NULL,
	NULL,
	NULL
};

/** Declaration of alimrcp synthesizer engine */
struct alimrcp_synth_engine_t {
	apt_consumer_task_t    *task;
};

/** Declaration of alimrcp synthesizer channel */
struct alimrcp_synth_channel_t {
	/** Back pointer to engine */
	alimrcp_synth_engine_t   *alimrcp_engine;
	/** Engine channel base */
	mrcp_engine_channel_t *channel;

	/** Active (in-progress) speak request */
	mrcp_message_t        *speak_request;
	/** Pending stop response */
	mrcp_message_t        *stop_response;
	/** Estimated time to complete */
	apr_size_t             time_to_complete;
	/** Is paused */
	apt_bool_t             paused;
	/** Speech source (used instead of actual synthesis) */
	FILE                  *audio_file;
	mpf_buffer_t          *audio_buffer;
};

typedef enum {
	ALIMRCP_SYNTH_MSG_OPEN_CHANNEL,
	ALIMRCP_SYNTH_MSG_CLOSE_CHANNEL,
	ALIMRCP_SYNTH_MSG_REQUEST_PROCESS
} alimrcp_synth_msg_type_e;

/** Declaration of alimrcp synthesizer task message */
struct alimrcp_synth_msg_t {
	alimrcp_synth_msg_type_e  type;
	mrcp_engine_channel_t *channel; 
	mrcp_message_t        *request;
};


static apt_bool_t alimrcp_synth_msg_signal(alimrcp_synth_msg_type_e type, mrcp_engine_channel_t *channel, mrcp_message_t *request);
static apt_bool_t alimrcp_synth_msg_process(apt_task_t *task, apt_task_msg_t *msg);

/** Declare this macro to set plugin version */
MRCP_PLUGIN_VERSION_DECLARE

/**
 * Declare this macro to use log routine of the server, plugin is loaded from.
 * Enable/add the corresponding entry in logger.xml to set a cutsom log source priority.
 *    <source name="SYNTH-PLUGIN" priority="DEBUG" masking="NONE"/>
 */
MRCP_PLUGIN_LOG_SOURCE_IMPLEMENT(SYNTH_PLUGIN,"SYNTH-PLUGIN")

/** Use custom log source mark */
#define SYNTH_LOG_MARK   APT_LOG_MARK_DECLARE(SYNTH_PLUGIN)


/**
 * 全局维护一个服务鉴权token和其对应的有效期时间戳,
 * 每次调用服务之前,首先判断token是否已经过期,
 * 如果已经过期,则根据AccessKey ID和AccessKey
 * Secret重新生成一个token,并更新这个全局的token和其有效期时间戳。
 *
 * 注意:不要每次调用服务之前都重新生成新token,只需在token即将过期时重新生成即可。所有的服务并发可共用一个token。
 */
// 自定义线程参数
struct ParamStruct {
    char text[AUDIO_TEXT_LENGTH];
    char token[DEFAULT_STRING_LEN];
    char appKey[DEFAULT_STRING_LEN];
    char url[DEFAULT_STRING_LEN];
    char var_file_path[DEFAULT_STRING_LEN];
    char session_id[DEFAULT_STRING_LEN];
    mpf_buffer_t *audio_buffer;
    mrcp_synth_header_t *synth_header;
};


// 自定义事件回调参数
struct ParamCallBack {
public:
    explicit ParamCallBack(ParamStruct *param) {
        tParam = param;
        pthread_mutex_init(&mtxWord, NULL);
        pthread_cond_init(&cvWord, NULL);
    };

    ~ParamCallBack() {
        tParam = NULL;
        pthread_mutex_destroy(&mtxWord);
        pthread_cond_destroy(&cvWord);
    };

    pthread_mutex_t mtxWord;
    pthread_cond_t cvWord;

    ParamStruct *tParam;
};

std::string g_appKey = "";
std::string g_akId = "";
std::string g_akSecret = "";
std::string g_token = "";
std::string g_url = "";
std::string g_voice = "xiaoyun";
std::string g_format = "pcm";
int g_sdkLogLevel = 1;
bool g_save_audio = true;
bool g_encodingUtf8 = true;
int g_volume = 50;
int g_speech_rate = 0;
int g_pitch_rate = 0;
int g_sample_rate = 8000;

int g_cpu = 1;
long g_expireTime = -1;

static bool sysAddrinfo = false;

/**
 * 根据AccessKey ID和AccessKey Secret重新生成一个token,并获取其有效期时间戳
 */
static int generateToken(std::string akId, std::string akSecret, std::string* token, long* expireTime)
{
	NlsToken nlsTokenRequest;
	nlsTokenRequest.setAccessKeyId(akId);
	nlsTokenRequest.setKeySecret(akSecret);

	int ret = nlsTokenRequest.applyNlsToken();
	if (ret < 0) {
		// 获取失败原因。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_ERROR,"generateToken Failed, error code:%d msg:%s",ret,nlsTokenRequest.getErrorMsg());
		return ret;
	}
	*token = nlsTokenRequest.getToken();
	*expireTime = nlsTokenRequest.getExpireTime();
	return 0;
}

static void OnSynthesisStarted(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	if (cbParam && tmpParam->tParam) {
		// 获取消息的状态码,成功为0或者20000000,失败时对应失败的错误码。
		// 当前任务的task id,方便定位问题,作为和服务端交互的唯一标识建议输出。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisStarted: status code=%d, session id = %s, task id=%s",cbEvent->getStatusCode(),tmpParam->tParam->session_id,cbEvent->getTaskId());
	} else {
		// 获取消息的状态码,成功为0或者20000000,失败时对应失败的错误码。
		// 当前任务的task id,方便定位问题,作为和服务端交互的唯一标识建议输出。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisStarted: status code=%d, task id=%s",cbEvent->getStatusCode(),cbEvent->getTaskId());
	}
}

/**
 * @brief sdk在接收到云端返回合成结束消息时, sdk内部线程上报Completed事件
 * @note 上报Completed事件之后,SDK内部会关闭识别连接通道.
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void OnSynthesisCompleted(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	if (cbParam && tmpParam->tParam && tmpParam->tParam->audio_buffer) {
                mpf_buffer_event_write(tmpParam->tParam->audio_buffer, MEDIA_FRAME_TYPE_EVENT);
		// 获取消息的状态码,成功为0或者20000000,失败时对应失败的错误码。
		// 当前任务的task id,方便定位问题,作为和服务端交互的唯一标识建议输出。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisCompleted: status code=%d, task id=%s",cbEvent->getStatusCode(),cbEvent->getTaskId());
		// 获取服务端返回的全部信息。
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisCompleted: all response=%s",cbEvent->getAllResponse());
        }
}

/**
 * @brief 合成过程发生异常时, sdk内部线程上报TaskFailed事件
 * @note 上报TaskFailed事件之后,SDK内部会关闭识别连接通道.
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void OnSynthesisTaskFailed(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	if (tmpParam) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_WARNING,"OnSynthesisTaskFailed: status code=%d, task id=%s, error message=%s",cbEvent->getStatusCode(),cbEvent->getTaskId(),cbEvent->getErrorMessage());
		apt_log(SYNTH_LOG_MARK,APT_PRIO_WARNING,"OnSynthesisTaskFailed: All response:%s",cbEvent->getAllResponse());
	}
}

/**
 * @brief 识别结束或发生异常时,会关闭连接通道,
 * sdk内部线程上报ChannelCloseed事件
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void OnSynthesisChannelClosed(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	if (tmpParam) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnSynthesisChannelClosed: %s",cbEvent->getAllResponse());
		//通知发送线程, 最终识别结果已经返回, 可以调用stop()
		pthread_mutex_lock(&(tmpParam->mtxWord));
		pthread_cond_signal(&(tmpParam->cvWord));
		pthread_mutex_unlock(&(tmpParam->mtxWord));
    }
}

/**
 * @brief 文本上报服务端之后, 收到服务端返回的二进制音频数据,
 * SDK内部线程通过BinaryDataRecved事件上报给用户
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 * @notice 此处切记不可做block操作,只可做音频数据转存. 若在此回调中做过多操作,
 *         会阻塞后续的数据回调和completed事件回调.
 */
static void OnBinaryDataRecved(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = static_cast<ParamCallBack *>(cbParam);
	// apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"OnBinaryDataRecved: %s",tmpParam->binAudioFile.c_str());

	// getBinaryData() 获取文本合成的二进制音频数据
	std::vector<unsigned char> data = cbEvent->getBinaryData();

	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnBinaryDataRecved: status code=%d, task id=%s, data size=%d",cbEvent->getStatusCode(),cbEvent->getTaskId(),data.size());
	if (cbParam && tmpParam->tParam && tmpParam->tParam->audio_buffer) {
		mpf_buffer_audio_write(tmpParam->tParam->audio_buffer, &data[0], data.size());
	}
	if (cbParam && tmpParam->tParam && g_save_audio && data.size() > 0) {
		// 以追加形式将二进制音频数据写入文件
		char var_tts_audio_dir[256] = {0};
		snprintf(var_tts_audio_dir, 256, "%s/tts_audio", tmpParam->tParam->var_file_path);
		if (access(var_tts_audio_dir, 0) == -1) {
			mkdir(var_tts_audio_dir, S_IRWXU);
		}
		char file_name[256] = {0};
		snprintf(file_name, 256, "%s/%s.%s", var_tts_audio_dir, cbEvent->getTaskId(), g_format.c_str());
		FILE *tts_stream = fopen(file_name, "a+");
		if (tts_stream) {
			fwrite((char *) &data[0], data.size(), 1, tts_stream);
			fclose(tts_stream);
		}
	}

}

/**
 * @brief 返回 tts 文本对应的日志信息,增量返回对应的字幕信息
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void OnMetaInfo(NlsEvent *cbEvent, void *cbParam) 
{
	ParamCallBack *tmpParam = (ParamCallBack *) cbParam;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"OnMetaInfo: task id=%s, respose=%s",cbEvent->getTaskId(),cbEvent->getAllResponse());
}

/**
 * @brief 服务端返回的所有信息会通过此回调反馈,
 * @param cbEvent 回调事件结构, 详见nlsEvent.h
 * @param cbParam 回调自定义参数,默认为NULL, 可以根据需求自定义参数
 * @return
 */
static void onMessage(NlsEvent *cbEvent, void *cbParam) 
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"onMessage: All response:%s",cbEvent->getAllResponse());
}

/**
 * @brief 短链接模式下工作线程
 *        以 createSynthesizerRequest          <----|
 *                   |                              |
 *           request->start()                       |
 *                   |                              |
 *           收到OnSynthesisChannelClosed回调       |
 *                   |                              |
 *           releaseSynthesizerRequest(request) ----|
 *        进行循环。
 */
static void *pthreadFunc(void *arg) 
{
	// 从自定义线程参数中获取token, 配置文件等参数.
	ParamStruct *tst = static_cast<ParamStruct *>(arg);
	if (tst == NULL) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_ERROR,"arg is not valid.");
		return NULL;
	}

	// 初始化自定义回调参数
	ParamCallBack cbParam(tst);

	/*
	 * 1. 创建语音识别SpeechSynthesizerRequest对象.
	 *
	 * 默认为实时短文本语音合成请求, 支持一次性合成300字符以内的文字,
	 * 其中1个汉字、1个英文字母或1个标点均算作1个字符,
	 * 超过300个字符的内容将会报错(或者截断).
	 * 一次性合成超过300字符可考虑长文本语音合成功能.
	 *
	 * 实时短文本语音合成文档详见:
	 * https://help.aliyun.com/document_detail/84435.html
	 * 长文本语音合成文档详见:
	 * https://help.aliyun.com/document_detail/130509.html
	 */
	int chars_cnt = NlsClient::getInstance()->calculateUtf8Chars(tst->text);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"starting synthesizer this text contains %d chars. pid %ld",chars_cnt,pthread_self());
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"Voice Name : %s",tst->synth_header->voice_param.name.buf);

	SpeechSynthesizerRequest *request = NULL;
	if (chars_cnt > 300) {
		// 长文本语音合成
		request = NlsClient::getInstance()->createSynthesizerRequest(LongTts);
	} else {
		// 短文本语音合成
		request = NlsClient::getInstance()->createSynthesizerRequest();
	}
	if (request == NULL) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_ERROR,"createSynthesizerRequest failed.");
		return NULL;
	}

	/*
	 * 2. 设置用于接收结果的回调
	 */
	// 设置音频合成可以开始的回调函数
	request->setOnSynthesisStarted(OnSynthesisStarted, &cbParam);
	// 设置音频合成结束回调函数
	request->setOnSynthesisCompleted(OnSynthesisCompleted, &cbParam);
	// 设置音频合成通道关闭回调函数
	request->setOnChannelClosed(OnSynthesisChannelClosed, &cbParam);
	// 设置异常失败回调函数
	request->setOnTaskFailed(OnSynthesisTaskFailed, &cbParam);
	// 设置文本音频数据接收回调函数
	request->setOnBinaryDataReceived(OnBinaryDataRecved, &cbParam);
	// 设置字幕信息
	request->setOnMetaInfo(OnMetaInfo, &cbParam);
	// 设置所有服务端返回信息回调函数
	// request->setOnMessage(onMessage, &cbParam);
	// 开启所有服务端返回信息回调函数, 其他回调(除了OnBinaryDataRecved)失效
	// request->setEnableOnMessage(true);

	/*
	 * 3. 设置request的相关参数
	 */
	// 设置待合成文本, 必填参数. 文本内容必须为UTF-8编码
	// 一次性合成超过300字符可考虑长文本语音合成功能.
	// 长文本语音合成文档详见:
	// https://help.aliyun.com/document_detail/130509.html
	request->setText(tst->text);
	// 发音人, 包含"xiaoyun", "ruoxi", "xiaogang"等. 可选参数, 默认是xiaoyun
	if (strlen(tst->synth_header->voice_param.name.buf) > 0) {
		request->setVoice(tst->synth_header->voice_param.name.buf);
	} else {
		request->setVoice(g_voice.c_str());
	}

	// 访问个性化音色,访问的Voice必须是个人定制音色
	// request->setPayloadParam("{\"enable_ptts\":true}");
	// 音量, 范围是0~100, 可选参数, 默认50
	request->setVolume(g_volume);
	// 语速, 范围是-500~500, 可选参数, 默认是0
	request->setSpeechRate(g_speech_rate);
	// 语调, 范围是-500~500, 可选参数, 默认是0
	request->setPitchRate(g_pitch_rate);
	// 音频编码格式, 可选参数, 默认是wav. 支持的格式pcm, wav, mp3
	request->setFormat(g_format.c_str());
	// 音频采样率, 包含8000, 16000. 可选参数, 默认是16000
	request->setSampleRate(g_sample_rate);
	// 开启字幕
	request->setEnableSubtitle(false);

	// 设置AppKey, 必填参数, 请参照官网申请
	if (strlen(tst->appKey) > 0) {
		request->setAppKey(tst->appKey);
	}
	// 设置账号校验token, 必填参数
	if (strlen(tst->token) > 0) {
		request->setToken(tst->token);
	}

	if (strlen(tst->url) > 0) {
		request->setUrl(tst->url);
	}
	// 设置链接超时500ms
	request->setTimeout(500);
	// 获取返回文本的编码格式
	const char *output_format = request->getOutputFormat();
	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"text format: %s",output_format);

	/*
	 * 4. start()为异步操作。成功则开始返回BinaryRecv事件。失败返回TaskFailed事件。
	 */
	int ret = request->start();
	if (ret < 0) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_WARNING,"start() failed. may be can not connect server. please check network or firewalld");
		// start()失败,释放request对象
		NlsClient::getInstance()->releaseSynthesizerRequest(request);
		return NULL;
	}

	/*
	 * 5. start成功,开始等待接收完所有合成数据。
	 *    stop()为无意义接口,调用与否都会跑完全程.
	 *    cancel()立即停止工作, 且不会有回调返回, 失败返回TaskFailed事件。
	 */
	//    ret = request->cancel();
	//    ret = request->stop();  // always return 0

	/*
	 * 开始等待接收完所有合成数据。
	 */
	struct timeval now;
	struct timespec outtime;
	if (ret == 0) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"wait closed callback.");
		// 语音服务器存在来不及处理当前请求, 10s内不返回任何回调的问题,
		// 然后在10s后返回一个TaskFailed回调, 所以需要设置一个超时机制.
		gettimeofday(&now, NULL);

		/*
		 * 根据文本长短、接收速度和网络环境,接收完所有合成数据的时间无法确定,
		 * 这里设定30s超时只是展示一种超时机制。
		 */
		outtime.tv_sec = now.tv_sec + 30;
		outtime.tv_nsec = now.tv_usec * 1000;
		// 等待closed事件后再进行释放, 否则会出现崩溃
		pthread_mutex_lock(&(cbParam.mtxWord));
		if (ETIMEDOUT == pthread_cond_timedwait(&(cbParam.cvWord), &(cbParam.mtxWord), &outtime)) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"synthesis timeout.");
		}
		pthread_mutex_unlock(&(cbParam.mtxWord));
	} else {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"ret is %d, pid %ld",ret,pthread_self());
	}

	/*
	 * 6. 完成所有工作后释放当前请求。
	 *    请在closed事件(确定完成所有工作)后再释放, 否则容易破坏内部状态机,
	 *    会强制卸载正在运行的请求。
	 */
	NlsClient::getInstance()->releaseSynthesizerRequest(request);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"release Synthesizer success. pid %ld",pthread_self());
	return NULL;
}

/** Create alimrcp synthesizer engine */
MRCP_PLUGIN_DECLARE(mrcp_engine_t*) mrcp_plugin_create(apr_pool_t *pool)
{
	/* create alimrcp engine */
	alimrcp_synth_engine_t *alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(apr_palloc(pool,sizeof(alimrcp_synth_engine_t)));
	apt_task_t *task;
	apt_task_vtable_t *vtable;
	apt_task_msg_pool_t *msg_pool;

	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO, "mrcp_plugin_create");
	/* create task/thread to run alimrcp engine in the context of this task */
	msg_pool = apt_task_msg_pool_create_dynamic(sizeof(alimrcp_synth_msg_t),pool);
	alimrcp_engine->task = apt_consumer_task_create(alimrcp_engine,msg_pool,pool);
	if(!alimrcp_engine->task) {
		return NULL;
	}
	task = apt_consumer_task_base_get(alimrcp_engine->task);
	apt_task_name_set(task,SYNTH_ENGINE_TASK_NAME);
	vtable = apt_task_vtable_get(task);
	if(vtable) {
		vtable->process_msg = alimrcp_synth_msg_process;
	}

	/* create engine base */
	return mrcp_engine_create(
				MRCP_SYNTHESIZER_RESOURCE, /* MRCP resource identifier */
				alimrcp_engine,               /* object to associate */
				&engine_vtable,            /* virtual methods table of engine */
				pool);                     /* pool to allocate memory from */
}

/** Destroy synthesizer engine */
static apt_bool_t alimrcp_synth_engine_destroy(mrcp_engine_t *engine)
{
	alimrcp_synth_engine_t *alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(engine->obj);
	if(alimrcp_engine->task) {
		apt_task_t *task = apt_consumer_task_base_get(alimrcp_engine->task);
		apt_task_destroy(task);
		alimrcp_engine->task = NULL;
	}
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_destroy");
	NlsClient::releaseInstance();
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_close release instance");
	return TRUE;
}

/** Open synthesizer engine */
static apt_bool_t alimrcp_synth_engine_open(mrcp_engine_t *engine)
{
	alimrcp_synth_engine_t *alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(engine->obj);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_open");
	const char *appKey = mrcp_engine_param_get(engine, "appKey");
	if (appKey != NULL) {
		g_appKey = appKey;
	}
	const char *akId = mrcp_engine_param_get(engine, "akId");
	if (akId != NULL) {
		g_akId = akId;
	}
	const char *akSecret = mrcp_engine_param_get(engine, "akSecret");
	if (akSecret != NULL) {
		g_akSecret = akSecret;
	}
	const char *sdkLogLevel = mrcp_engine_param_get(engine, "sdk-log-level");
	if (sdkLogLevel != NULL) {
		g_sdkLogLevel = atoi(sdkLogLevel);
	}
	const char *saveAudio = mrcp_engine_param_get(engine, "save-audio");
	if (saveAudio != NULL) {
		g_save_audio = atoi(saveAudio) == 1;
	}
	const char *encodingGB2312 = mrcp_engine_param_get(engine, "text-encoding-gb2312");
	if (encodingGB2312 != NULL) {
		g_encodingUtf8 = atoi(encodingGB2312) == 0;
	}
	const char *format = mrcp_engine_param_get(engine, "format");
	if (format != NULL) {
		g_format = format;
	}
	const char *voice = mrcp_engine_param_get(engine, "voice");
	if (voice != NULL) {
		g_voice = voice;
	}
	const char *sampleRate = mrcp_engine_param_get(engine, "sample_rate");
	if (sampleRate != NULL) {
		g_sample_rate = atoi(sampleRate);
	}
	const char *volume = mrcp_engine_param_get(engine, "volume");
	if (volume != NULL) {
		g_volume = atoi(volume);
	}
	const char *speechRate = mrcp_engine_param_get(engine, "speech_rate");
	if (speechRate != NULL) {
		g_speech_rate = atoi(speechRate);
	}
	const char *pitchRate = mrcp_engine_param_get(engine, "pitch_rate");
	if (pitchRate != NULL) {
		g_pitch_rate = atoi(pitchRate);
	}

	if(alimrcp_engine->task) {
		apt_task_t *task = apt_consumer_task_base_get(alimrcp_engine->task);
		apt_task_start(task);
	}
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"max_channel_count : %d, appId : %s, akId : %s, akSecret : ******", engine->config->max_channel_count, g_appKey.c_str(), g_akId.c_str());
	// 启动工作线程, 在创建请求和启动前必须调用此函数, 可理解为对NlsClient的初始化
	// 入参为负时, 启动当前系统中可用的核数。
	// 200并发以下推荐入参为1, 更高并发入参推荐可看readme。
	NlsClient::getInstance()->startWorkThread(1);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"Nls Client Version : %s", NlsClient::getInstance()->getVersion());

	return mrcp_engine_open_respond(engine,TRUE);
}

/** Close synthesizer engine */
static apt_bool_t alimrcp_synth_engine_close(mrcp_engine_t *engine)
{
	alimrcp_synth_engine_t *alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(engine->obj);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_close");
	if(alimrcp_engine->task) {
		apt_task_t *task = apt_consumer_task_base_get(alimrcp_engine->task);
		apt_task_terminate(task,TRUE);
	}
	return mrcp_engine_close_respond(engine);
}

/** Create alimrcp synthesizer channel derived from engine channel base */
static mrcp_engine_channel_t* alimrcp_synth_engine_channel_create(mrcp_engine_t *engine, apr_pool_t *pool)
{
	mpf_stream_capabilities_t *capabilities;
	mpf_termination_t *termination; 

	/* create alimrcp synth channel */
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(apr_palloc(pool,sizeof(alimrcp_synth_channel_t)));
	synth_channel->alimrcp_engine = static_cast<alimrcp_synth_engine_t *>(engine->obj);
	synth_channel->speak_request = NULL;
	synth_channel->stop_response = NULL;
	synth_channel->time_to_complete = 0;
	synth_channel->paused = FALSE;
	synth_channel->audio_file = NULL;
	synth_channel->audio_buffer = NULL;
	
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_engine_channel_create");
	capabilities = mpf_source_stream_capabilities_create(pool);
	mpf_codec_capabilities_add(
			&capabilities->codecs,
			MPF_SAMPLE_RATE_8000 | MPF_SAMPLE_RATE_16000,
			"LPCM");

	/* create media termination */
	termination = mrcp_engine_audio_termination_create(
			synth_channel,        /* object to associate */
			&audio_stream_vtable, /* virtual methods table of audio stream */
			capabilities,         /* stream capabilities */
			pool);                /* pool to allocate memory from */

	/* create engine channel base */
	synth_channel->channel = mrcp_engine_channel_create(
			engine,               /* engine */
			&channel_vtable,      /* virtual methods table of engine channel */
			synth_channel,        /* object to associate */
			termination,          /* associated media termination */
			pool);                /* pool to allocate memory from */

	synth_channel->audio_buffer = mpf_buffer_create(pool);
	return synth_channel->channel;
}

/** Destroy engine channel */
static apt_bool_t alimrcp_synth_channel_destroy(mrcp_engine_channel_t *channel)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_destroy");
	/* nothing to destroy */
	return TRUE;
}

/** Open engine channel (asynchronous response MUST be sent)*/
static apt_bool_t alimrcp_synth_channel_open(mrcp_engine_channel_t *channel)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_open");
	return alimrcp_synth_msg_signal(ALIMRCP_SYNTH_MSG_OPEN_CHANNEL,channel,NULL);
}

/** Close engine channel (asynchronous response MUST be sent)*/
static apt_bool_t alimrcp_synth_channel_close(mrcp_engine_channel_t *channel)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_close");
	return alimrcp_synth_msg_signal(ALIMRCP_SYNTH_MSG_CLOSE_CHANNEL,channel,NULL);
}

/** Process MRCP channel request (asynchronous response MUST be sent)*/
static apt_bool_t alimrcp_synth_channel_request_process(mrcp_engine_channel_t *channel, mrcp_message_t *request)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_request_process");
	return alimrcp_synth_msg_signal(ALIMRCP_SYNTH_MSG_REQUEST_PROCESS,channel,request);
}

/** Process SPEAK request */
static apt_bool_t alimrcp_synth_channel_speak(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	char *file_path = NULL;
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	const mpf_codec_descriptor_t *descriptor = mrcp_engine_source_stream_codec_get(channel);

	apt_log(SYNTH_LOG_MARK,APT_PRIO_DEBUG,"alimrcp_synth_channel_speak");

	if(!descriptor) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_WARNING,"Failed to Get Codec Descriptor " APT_SIDRES_FMT, MRCP_MESSAGE_SIDRES(request));
		response->start_line.status_code = MRCP_STATUS_CODE_METHOD_FAILED;
		return FALSE;
	}
	
	if(!request->body.length) {
		synth_channel->speak_request = NULL;
		response->start_line.status_code = MRCP_STATUS_CODE_MISSING_PARAM;
		return FALSE;
	}

	//获取当前系统时间戳,判断token是否过期。
	std::time_t curTime = std::time(0);
	if (g_expireTime - curTime < 10) {
		apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"The token will be expired, please generate new token by AccessKey-ID and AccessKey-Secret.");
		if (generateToken(g_akId, g_akSecret, &g_token, &g_expireTime) < 0) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_ERROR,"Failed to Generate New Token." APT_SIDRES_FMT, MRCP_MESSAGE_SIDRES(request));
			response->start_line.status_code = MRCP_STATUS_CODE_RESOURCE_SPECIFIC_FAILURE;
			return FALSE;
		}
	}

	/* 不要超过AUDIO_TEXT_LENGTH */
	// const char texts[AUDIO_TEXT_LENGTH] = {"今日天气真不错,我想去操场踢足球."};

	ParamStruct pa;

	memset(pa.token, 0, DEFAULT_STRING_LEN);
	memcpy(pa.token, g_token.c_str(), g_token.length());

	memset(pa.appKey, 0, DEFAULT_STRING_LEN);
	memcpy(pa.appKey, g_appKey.c_str(), g_appKey.length());

	/* 不要超过AUDIO_TEXT_LENGTH */
	memset(pa.text, 0, AUDIO_TEXT_LENGTH);
	// memcpy(pa.text, texts, strlen(texts));
	memcpy(pa.text, request->body.buf, strlen(request->body.buf));

	memset(pa.url, 0, DEFAULT_STRING_LEN);
	if (!g_url.empty()) {
		memcpy(pa.url, g_url.c_str(), g_url.length());
	}

	if(g_save_audio && channel->engine) {
		memset(pa.var_file_path, 0, DEFAULT_STRING_LEN);
		char *var_file_path = apt_vardir_filepath_get(channel->engine->dir_layout,"",channel->pool);
		memcpy(pa.var_file_path, var_file_path, strlen(var_file_path));
        }

	// 设置回话 ID
	memset(pa.session_id, 0, DEFAULT_STRING_LEN);
	memcpy(pa.session_id, request->channel_id.session_id.buf, strlen(request->channel_id.session_id.buf));

	pa.audio_buffer = synth_channel->audio_buffer;
	/* get synthesizer header */
	pa.synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_get(request));
	
	pthread_t pthreadId;
	// 启动工作线程
	pthread_create(&pthreadId, NULL, &pthreadFunc, (void *) &(pa));
	pthread_join(pthreadId, NULL);


	synth_channel->time_to_complete = 0;
	if(synth_channel->audio_buffer == NULL && channel->engine) {
		char *file_name = apr_psprintf(channel->pool,"alimrcp-%dkHz.pcm",descriptor->sampling_rate/1000);
		file_path = apt_datadir_filepath_get(channel->engine->dir_layout,file_name,channel->pool);
	}

	if(file_path) {
		synth_channel->audio_file = fopen(file_path,"rb");
		if(synth_channel->audio_file) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"Set [%s] as Speech Source " APT_SIDRES_FMT,file_path,MRCP_MESSAGE_SIDRES(request));
		} else {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"No Speech Source [%s] Found " APT_SIDRES_FMT,file_path,MRCP_MESSAGE_SIDRES(request));
			/* calculate estimated time to complete */
			if(mrcp_generic_header_property_check(request,GENERIC_HEADER_CONTENT_LENGTH) == TRUE) {
				mrcp_generic_header_t *generic_header = mrcp_generic_header_get(request);
				if(generic_header) {
					synth_channel->time_to_complete = generic_header->content_length * 10; /* 10 msec per character */
				}
			}
		}
	}

	response->start_line.request_state = MRCP_REQUEST_STATE_INPROGRESS;
	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	synth_channel->speak_request = request;
	
	return TRUE;
}

/** Process STOP request */
static apt_bool_t alimrcp_synth_channel_stop(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	/* store the request, make sure there is no more activity and only then send the response */
	synth_channel->stop_response = response;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_stop");
	return TRUE;
}

/** Process PAUSE request */
static apt_bool_t alimrcp_synth_channel_pause(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	synth_channel->paused = TRUE;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_pause");
	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	return TRUE;
}

/** Process RESUME request */
static apt_bool_t alimrcp_synth_channel_resume(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	synth_channel->paused = FALSE;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_resume");
	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	return TRUE;
}

/** Process SET-PARAMS request */
static apt_bool_t alimrcp_synth_channel_set_params(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	mrcp_synth_header_t *req_synth_header;
	/* get synthesizer header */
	req_synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_get(request));
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_set_params");
	if(req_synth_header) {
		/* check voice age header */
		if(mrcp_resource_header_property_check(request,SYNTHESIZER_HEADER_VOICE_AGE) == TRUE) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"Set Voice Age [%" APR_SIZE_T_FMT "]",
				req_synth_header->voice_param.age);
		}
		/* check voice name header */
		if(mrcp_resource_header_property_check(request,SYNTHESIZER_HEADER_VOICE_NAME) == TRUE) {
			apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"Set Voice Name [%s]",
				req_synth_header->voice_param.name.buf);
		}
	}
	
	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	return TRUE;
}

/** Process GET-PARAMS request */
static apt_bool_t alimrcp_synth_channel_get_params(mrcp_engine_channel_t *channel, mrcp_message_t *request, mrcp_message_t *response)
{
	mrcp_synth_header_t *req_synth_header;
	/* get synthesizer header */
	req_synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_get(request));
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_get_params");
	if(req_synth_header) {
		mrcp_synth_header_t *res_synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_prepare(response));
		/* check voice age header */
		if(mrcp_resource_header_property_check(request,SYNTHESIZER_HEADER_VOICE_AGE) == TRUE) {
			res_synth_header->voice_param.age = 25;
			mrcp_resource_header_property_add(response,SYNTHESIZER_HEADER_VOICE_AGE);
		}
		/* check voice name header */
		if(mrcp_resource_header_property_check(request,SYNTHESIZER_HEADER_VOICE_NAME) == TRUE) {
			apt_string_set(&res_synth_header->voice_param.name,"David");
			mrcp_resource_header_property_add(response,SYNTHESIZER_HEADER_VOICE_NAME);
		}
	}

	/* send asynchronous response */
	mrcp_engine_channel_message_send(channel,response);
	return TRUE;
}

/** Dispatch MRCP request */
static apt_bool_t alimrcp_synth_channel_request_dispatch(mrcp_engine_channel_t *channel, mrcp_message_t *request)
{
	apt_bool_t processed = FALSE;
	mrcp_message_t *response = mrcp_response_create(request,request->pool);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_channel_request_dispatch");
	switch(request->start_line.method_id) {
		case SYNTHESIZER_SET_PARAMS:
			processed = alimrcp_synth_channel_set_params(channel,request,response);
			break;
		case SYNTHESIZER_GET_PARAMS:
			processed = alimrcp_synth_channel_get_params(channel,request,response);
			break;
		case SYNTHESIZER_SPEAK:
			processed = alimrcp_synth_channel_speak(channel,request,response);
			break;
		case SYNTHESIZER_STOP:
			processed = alimrcp_synth_channel_stop(channel,request,response);
			break;
		case SYNTHESIZER_PAUSE:
			processed = alimrcp_synth_channel_pause(channel,request,response);
			break;
		case SYNTHESIZER_RESUME:
			processed = alimrcp_synth_channel_resume(channel,request,response);
			break;
		case SYNTHESIZER_BARGE_IN_OCCURRED:
			processed = alimrcp_synth_channel_stop(channel,request,response);
			break;
		case SYNTHESIZER_CONTROL:
			break;
		case SYNTHESIZER_DEFINE_LEXICON:
			break;
		default:
			break;
	}
	if(processed == FALSE) {
		/* send asynchronous response for not handled request */
		mrcp_engine_channel_message_send(channel,response);
	}
	return TRUE;
}

/** Callback is called from MPF engine context to destroy any additional data associated with audio stream */
static apt_bool_t alimrcp_synth_stream_destroy(mpf_audio_stream_t *stream)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_stream_destroy");
	return TRUE;
}

/** Callback is called from MPF engine context to perform any action before open */
static apt_bool_t alimrcp_synth_stream_open(mpf_audio_stream_t *stream, mpf_codec_t *codec)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_stream_open");
	return TRUE;
}

/** Callback is called from MPF engine context to perform any action after close */
static apt_bool_t alimrcp_synth_stream_close(mpf_audio_stream_t *stream)
{
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_stream_close");
	return TRUE;
}

/** Callback is called from MPF engine context to read/get new frame */
static apt_bool_t alimrcp_synth_stream_read(mpf_audio_stream_t *stream, mpf_frame_t *frame)
{
	alimrcp_synth_channel_t *synth_channel = static_cast<alimrcp_synth_channel_t *>(stream->obj);
	// apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_stream_read");
	/* check if STOP was requested */
	if(synth_channel->stop_response) {
		/* send asynchronous response to STOP request */
		mrcp_engine_channel_message_send(synth_channel->channel,synth_channel->stop_response);
		synth_channel->stop_response = NULL;
		synth_channel->speak_request = NULL;
		synth_channel->paused = FALSE;
		if(synth_channel->audio_buffer) {
			// TODO: close
			synth_channel->audio_buffer = NULL;
		}
		if(synth_channel->audio_file) {
			fclose(synth_channel->audio_file);
			synth_channel->audio_file = NULL;
		}
		return TRUE;
	}

	/* check if there is active SPEAK request and it isn't in paused state */
	if(synth_channel->speak_request && synth_channel->paused == FALSE) {
		/* normal processing */
		apt_bool_t completed = FALSE;
	
		// apt_log(APT_LOG_MARK, APT_PRIO_INFO, "alimrcp_synth_stream_read synth_channel->speak_request");
		if(synth_channel->audio_buffer) {
			// apt_log(APT_LOG_MARK, APT_PRIO_INFO, "alimrcp_synth_stream_read synth_channel->audio_buffer to frame");
			/* read speech from buffer */
			mpf_buffer_frame_read(synth_channel->audio_buffer,frame);
			if((frame->type & MEDIA_FRAME_TYPE_EVENT) == MEDIA_FRAME_TYPE_EVENT) {
				frame->type &= ~MEDIA_FRAME_TYPE_EVENT;
				completed = TRUE;
			}
		} else if(synth_channel->audio_file) {
			/* read speech from file */
			apr_size_t size = frame->codec_frame.size;
			if(fread(frame->codec_frame.buffer,1,size,synth_channel->audio_file) == size) {
				frame->type |= MEDIA_FRAME_TYPE_AUDIO;
			} else {
				completed = TRUE;
			}
		} else {
			/* fill with silence in case no file available */
			if(synth_channel->time_to_complete >= stream->rx_descriptor->frame_duration) {
				memset(frame->codec_frame.buffer,0,frame->codec_frame.size);
				frame->type |= MEDIA_FRAME_TYPE_AUDIO;
				synth_channel->time_to_complete -= stream->rx_descriptor->frame_duration;
			} else {
				completed = TRUE;
			}
		}

		if(completed) {
			/* raise SPEAK-COMPLETE event */
			mrcp_message_t *message = mrcp_event_create(
								synth_channel->speak_request,
								SYNTHESIZER_SPEAK_COMPLETE,
								synth_channel->speak_request->pool);
			if(message) {
				/* get/allocate synthesizer header */
				mrcp_synth_header_t *synth_header = static_cast<mrcp_synth_header_t *>(mrcp_resource_header_prepare(message));
				if(synth_header) {
					/* set completion cause */
					synth_header->completion_cause = SYNTHESIZER_COMPLETION_CAUSE_NORMAL;
					mrcp_resource_header_property_add(message,SYNTHESIZER_HEADER_COMPLETION_CAUSE);
				}
				/* set request state */
				message->start_line.request_state = MRCP_REQUEST_STATE_COMPLETE;

				synth_channel->speak_request = NULL;
				if(synth_channel->audio_file) {
					fclose(synth_channel->audio_file);
					synth_channel->audio_file = NULL;
				}
				if(synth_channel->audio_buffer) {
					// TODO: fclose(synth_channel->audio_file);
					synth_channel->audio_buffer = NULL;
				}
				/* send asynch event */
				mrcp_engine_channel_message_send(synth_channel->channel,message);
			}
		}
	}
	return TRUE;
}

static apt_bool_t alimrcp_synth_msg_signal(alimrcp_synth_msg_type_e type, mrcp_engine_channel_t *channel, mrcp_message_t *request)
{
	apt_bool_t status = FALSE;
	alimrcp_synth_channel_t *alimrcp_channel = static_cast<alimrcp_synth_channel_t *>(channel->method_obj);
	alimrcp_synth_engine_t *alimrcp_engine = alimrcp_channel->alimrcp_engine;
	apt_task_t *task = apt_consumer_task_base_get(alimrcp_engine->task);
	apt_task_msg_t *msg = apt_task_msg_get(task);
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_msg_signal");
	if(msg) {
		alimrcp_synth_msg_t *alimrcp_msg;
		msg->type = TASK_MSG_USER;
		alimrcp_msg = (alimrcp_synth_msg_t*) msg->data;

		alimrcp_msg->type = type;
		alimrcp_msg->channel = channel;
		alimrcp_msg->request = request;
		status = apt_task_msg_signal(task,msg);
	}
	return status;
}

static apt_bool_t alimrcp_synth_msg_process(apt_task_t *task, apt_task_msg_t *msg)
{
	alimrcp_synth_msg_t *alimrcp_msg = (alimrcp_synth_msg_t*)msg->data;
	apt_log(SYNTH_LOG_MARK,APT_PRIO_INFO,"alimrcp_synth_msg_process");
	switch(alimrcp_msg->type) {
		case ALIMRCP_SYNTH_MSG_OPEN_CHANNEL:
			/* open channel and send asynch response */
			mrcp_engine_channel_open_respond(alimrcp_msg->channel,TRUE);
			break;
		case ALIMRCP_SYNTH_MSG_CLOSE_CHANNEL:
			/* close channel, make sure there is no activity and send asynch response */
			mrcp_engine_channel_close_respond(alimrcp_msg->channel);
			break;
		case ALIMRCP_SYNTH_MSG_REQUEST_PROCESS:
			alimrcp_synth_channel_request_dispatch(alimrcp_msg->channel,alimrcp_msg->request);
			break;
		default:
			break;
	}
	return TRUE;
}
pthread_t pthreadId;
// 启动工作线程
pthread_create(&pthreadId, NULL, &pthreadFunc, (void *) &(pa));
pthread_join(pthreadId, NULL);

大佬请问一下这个工作线程启动后为什么不进行分离?如果join等待结束,第二路通道发送语音合成请求时,将会一直等待这路的请求结束掉。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants